In [1]:
import numpy as np
import pandas as pd
from mlaas_tools2.db_tool import DatabaseConnections
from mlaas_tools2.config_info import ConfigPass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DateType, StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql.functions import create_map, lit
from pyspark.sql.functions import expr
from pyspark.ml.feature import StringIndexer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from datetime import datetime,  timedelta
import os

# DB連線

In [2]:
# from mlaas_tools.config_build import config_set
# from mlaas_tools.config_info import ConfigPass
# from mlaas_tools.db_tool import DatabaseConnections
# # config_set()
# configs = ConfigPass()._configsection
# conns = DatabaseConnections(configs)
# # rawdata_conn = conns.get_rawdata_db_conn()
# # feature_conn = conns.get_feature_db_conn()
# raw_conn = conns.get_rawdata_db_conn()
# raw_cur = raw_conn.cursor()
# ftr_conn = conns.get_feature_db_conn()
# ftr_cur = ftr_conn.cursor()

# Spark 連線

In [2]:
# Spark / S3A settings for this notebook.
# The S3 access key, secret key and endpoint can be supplied through the
# S3A_ACCESS_KEY / S3A_SECRET_KEY / S3A_ENDPOINT environment variables; the
# previous literals are kept as fallbacks so existing runs behave as before.
# TODO(security): provision the environment variables and delete the literal fallbacks.
default = {
    "spark.driver.memory": '16g',
    "fs.s3a.access.key": os.environ.get("S3A_ACCESS_KEY", "smartchannel"),
    "fs.s3a.secret.key": os.environ.get("S3A_SECRET_KEY", "smartchannel"),
    "fs.s3a.endpoint": os.environ.get("S3A_ENDPOINT", "http://10.240.205.23:9000"),
    "fs.s3a.connection.ssl.enabled": False,
    "fs.s3a.path.style.access": True,
}

# Create (or reuse) the session with the settings above under the app name "T".
spark = SparkSession.builder.config(
    conf=SparkConf().setAppName("T").setAll(default.items())
).getOrCreate()

# Display the session summary as the cell output.
spark

# 從feature group 以及 layer1 讀資料

In [4]:
# Date window as 'YYYYMMDD' strings: training start, training end, test day.
train_start_date, train_end_date, test_date = '20211224', '20211225', '20211226'
# test_date = '20220131'

In [4]:
# Alternative date window ('YYYYMMDD' strings): training start, training end, test day.
train_start_date, train_end_date, test_date = '20211217', '20211226', '20211227'
# test_date = '20220131'

In [26]:
# Date window ('YYYYMMDD' strings): training start, training end, test day.
# This is the last assignment of these names in the notebook, so on a
# top-to-bottom run these are the values every later cell sees.
train_start_date, train_end_date, test_date = '20220227', '20220227', '20220228'
# test_date = '20220131'

## 1. item-subtag (sc_item)

In [9]:
# Load the layer-1 item table, keeping only smart_channel rows whose date
# falls inside the training window (both bounds inclusive).
item_sdf = (
    spark.read
    .parquet("s3a://df-smart-channel/recsys-dataset/beta_v2/layer1/sc_item")
    .filter(F.col('service') == 'smart_channel')
    .filter(F.col("date").between(train_start_date, train_end_date))
    # Column pruning is currently disabled:
    # .select(['item_id', 'date','subtag_list'])
)

In [10]:
# One row per (item_id, date, subtag_list element); the exploded element is
# exposed under Spark's default column name `col`.
item_subtag_exploded = item_sdf.select("item_id", "date", F.explode("subtag_list"))

In [30]:
# Keep only subtags whose Chinese description starts with '03_', then pull the
# subtag fields out of the exploded struct into top-level columns.
item_subtag_details = (
    item_subtag_exploded
    .filter(F.col('col.subtag_chinese_desc').startswith('03_'))
    .select('item_id', 'date', 'col.subtag', 'col.subtag_chinese_desc', 'col.subtag_eng_desc')
)
# Slim frame used downstream: item, date and the English subtag description.
item_subtag_sdf = item_subtag_details.select('item_id', 'date', 'subtag_eng_desc')

In [31]:
# Preview two rows of the item/subtag frame. The frame built in the previous
# cell is named `item_subtag_sdf`; `item_subtag_net` is not defined in this notebook.
item_subtag_sdf.show(2)

+--------------------+--------+------------------+
|             item_id|    date|   subtag_eng_desc|
+--------------------+--------+------------------+
|8236E47D-EC12-4B4...|20211226|03_wealth_view_ind|
|8236E47D-EC12-4B4...|20211226| 03_investment_ind|
+--------------------+--------+------------------+
only showing top 2 rows



In [32]:
# Item/subtag rows for the last training day only. The filter column is taken
# from `item_subtag_sdf` itself; `item_subtag_net` is not defined in this notebook.
item_subtag_sdf_train = item_subtag_sdf.filter(item_subtag_sdf.date == train_end_date)
# item_subtag_net_test = item_subtag_net.filter(item_subtag_net.date == test_date)

In [33]:
# Preview two rows of the training-day item/subtag frame.
item_subtag_sdf_train.show(2)

+--------------------+--------+------------------+
|             item_id|    date|   subtag_eng_desc|
+--------------------+--------+------------------+
|8236E47D-EC12-4B4...|20211225|03_wealth_view_ind|
|8236E47D-EC12-4B4...|20211225| 03_investment_ind|
+--------------------+--------+------------------+
only showing top 2 rows



## 2. user-item (context)

In [23]:
# Load the layer-1 context table restricted to the training window
# (both bounds inclusive).
context_sdf = (
    spark.read
    .parquet("s3a://df-smart-channel/recsys-dataset/beta_v2/layer1/context")
    .filter(F.col("date").between(train_start_date, train_end_date))
)

In [26]:
# Drop context rows that have no item_click_list at all.
clean_context_sdf = context_sdf.where(col('item_click_list').isNotNull())
print('去除沒有點擊紀錄後context的數目:', clean_context_sdf.count())
# Explode item_click_list: one row per list element, exposed as struct column `col`.
clean_context_sdf = clean_context_sdf.select(
    clean_context_sdf.cust_no,
    clean_context_sdf.date,
    F.explode(clean_context_sdf.item_click_list),
)
# Flatten the exploded struct into top-level columns.
flatten_clean_context_sdf = clean_context_sdf.select(F.col('cust_no'), F.col('date'), F.col('col.*'))
print('展開item後context的數目:', flatten_clean_context_sdf.count())
# Keep smart_channel events only.
flatten_clean_context_sdf = flatten_clean_context_sdf.where(col('hits_eventinfo_eventcategory')=='smart_channel')
print('選 df_smart_channel後context的數目:', flatten_clean_context_sdf.count())
# Left-join the item table (without its own click/show columns) on item and date.
flatten_clean_context_sdf = flatten_clean_context_sdf.join(item_sdf.drop('click', 'show'), on=['item_id', 'date'], how='left')
# The public-content filter is disabled, so the count printed next is simply
# the row count after the join above.
# flatten_clean_context_sdf = flatten_clean_context_sdf.where(col('service')!='public_content')
print('過濾掉公版後context的數目:', flatten_clean_context_sdf.count())
flatten_clean_context_sdf = flatten_clean_context_sdf.select(
    F.col('item_id'), F.col('cust_no'), F.col('date'),
    F.col('visitdatetime'), F.col('eventdatetime'), F.col('click'), F.col('show'),
)
# Number the events of each (customer, date, item) by eventdatetime so the
# earliest one can be kept (the same item may appear several times in a day).
event_keys = ['cust_no', 'date', 'item_id']
flatten_clean_context_sdf = flatten_clean_context_sdf.withColumn(
    "day_order",
    F.row_number().over(Window.partitionBy(event_keys).orderBy(flatten_clean_context_sdf['eventdatetime'])),
)
# Per (customer, date, item) click and show totals, computed in a single
# aggregation instead of two separate group-bys over the same keys.
cust_totals_bydate = flatten_clean_context_sdf.groupby(event_keys).agg(
    F.sum('click').alias('click'),
    F.sum('show').alias('show'),
)
# Kept under their original names for any later cell that refers to them.
cust_click_bydate = cust_totals_bydate.select(*event_keys, 'click')
cust_show_bydate = cust_totals_bydate.select(*event_keys, 'show')
# Keep the earliest event per (customer, date, item) and attach the totals
# with one join; the resulting columns and their order match the two-join form.
flatten_clean_context_sdf = flatten_clean_context_sdf.where(col('day_order')==1).drop('click', 'show', 'day_order')
flatten_clean_context_sdf = flatten_clean_context_sdf.join(cust_totals_bydate, on=event_keys, how='left')
# Rank clicked items per (customer, date) by eventdatetime; rows without a
# click receive a null rank from the left join.
click_sdf = flatten_clean_context_sdf.where(col('click')>0)
click_sdf = click_sdf.withColumn("rank", F.row_number().over(Window.partitionBy(['cust_no', 'date']).orderBy(click_sdf['eventdatetime'])))
flatten_clean_context_sdf = flatten_clean_context_sdf.join(click_sdf.select('click', 'cust_no', 'date', 'item_id', 'rank'), on=['click', 'cust_no', 'date', 'item_id'], how='left')
print('Total Context number:', flatten_clean_context_sdf.count())

去除沒有點擊紀錄後context的數目: 294509
展開item後context的數目: 629533
選 df_smart_channel後context的數目: 485314
過濾掉公版後context的數目: 485314
Total Context number: 485314


In [27]:
# Training context: every row dated on or before the last training day.
context_sdf_train = flatten_clean_context_sdf.where(F.col('date') <= train_end_date)
# context_sdf_test = flatten_clean_context_sdf.filter(col('date') == test_date)

In [28]:
# Pull the distinct customer numbers of the training context to the driver.
uniqueUsersObserved = context_sdf_train.select('cust_no').dropDuplicates().collect()

In [29]:
# Plain Python list of the customer numbers held in the collected rows.
unique_users_list = [user_row['cust_no'] for user_row in uniqueUsersObserved]

In [None]:
# unique_users_list

In [56]:
## filter unobserved in test
# context_sdf_test = context_sdf_test.filter(context_sdf_test.cust_no.isin(unique_users_list))

## 3. user-subtag (user)

In [34]:
# Load the layer-1 user table for the last training day, keep the selected
# columns, and restrict it to customers present in the training context.
user_sdf = (
    spark.read
    .parquet("s3a://df-smart-channel/recsys-dataset/beta_v2/layer1/user")
    .filter(F.col("date") == train_end_date)
    .drop('click', 'show')
    .select(
        'cust_no', 'date', 'mobile_login_90', 'dd_my', 'dd_md',
        'onlymd_ind', 'onlycc_ind', 'efingo_card_ind', 'cl_cpa_amt', 'fc_ind',
    )
    .filter(F.col('cust_no').isin(unique_users_list))
)
print('User Sdf數目:', user_sdf.count())

User Sdf數目: 249725


In [108]:
# Users restricted to customers observed in the training context.
# NOTE(review): user_sdf is already filtered by the same list where it is
# built, so this second filter looks redundant - confirm before removing.
user_sdf_sub = user_sdf.where(F.col('cust_no').isin(unique_users_list))

In [None]:
# user_df_train = user_sdf.filter(user_sdf.date == train_end_date).toPandas()
# user_df_test = user_sdf_sub.filter(user_sdf_sub.date == test_date).toPandas()

In [None]:
# export_file(user_df_test, test_date, 'user_subtag.csv')

## phase2

In [None]:
# Show the column names of the item/subtag frame.
item_subtag_sdf.columns

In [None]:
# Show the column names of the training context frame.
context_sdf_train.columns

In [None]:
# Show the column names of the user frame.
user_sdf.columns

In [110]:
# Alias only: user_subtag refers to the same Spark DataFrame as user_sdf_sub.
user_subtag = user_sdf_sub

### get user-item

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
# Encode customer numbers as integer labels with scikit-learn's LabelEncoder.
# NOTE(review): `context_train` is not assigned anywhere in the visible notebook,
# and `user_subtag` is assigned from a Spark DataFrame (user_sdf_sub), which does
# not support item assignment. This cell looks like a leftover from a
# pandas-based version that the StringIndexer cells below replace - confirm
# before running it.
le_user = LabelEncoder()
context_train.cust_no = le_user.fit_transform(context_train.cust_no)
# context_test.cust_no = le_user.transform(context_test.cust_no)
user_subtag['cust_no'] = le_user.transform(user_subtag['cust_no'])

In [54]:
# Fit a string indexer that maps cust_no to a numeric cust_id, using the
# customers found in the training context.
cust_indexer = StringIndexer(
    inputCol="cust_no",
    outputCol="cust_id",
)
cust_transformer = cust_indexer.fit(context_sdf_train)

In [55]:
# Add the cust_id column to the training context using the fitted customer indexer.
context_sdf_train_indexed = cust_transformer.transform(context_sdf_train) 

In [56]:
# Fit a string indexer that maps item_id to a numeric item_ind, using the
# items found in the training context.
item_indexer = StringIndexer(
    inputCol="item_id",
    outputCol="item_ind",
)
item_transformer = item_indexer.fit(context_sdf_train)

In [57]:
# Index items on top of the customer-indexed context so the result carries both
# cust_id and item_ind. Transforming the raw context_sdf_train here would
# overwrite the customer-indexed frame and lose cust_id. Both transformers are
# applied to the raw frame in one expression, so re-running this cell is safe.
context_sdf_train_indexed = item_transformer.transform(
    cust_transformer.transform(context_sdf_train)
)
# Apply the same item index to the training-day item/subtag frame.
item_subtag_sdf_train_indexed = item_transformer.transform(item_subtag_sdf_train)

In [None]:
# modelPath = temp_path + "/string-indexer-model"
# model.save(modelPath)

In [None]:
# loadedTransformer = StringIndexerModel.load(modelPath)

### get user-subtag

In [111]:
# Row count of the user frame.
user_subtag.count()

249725

In [None]:
# 行銀活躍用戶
mobile_active = user_subtag.mobile_login_90
# 有外幣帳戶
forex_digital_account = user_subtag.dd_my
# 存戶
account = user_subtag.dd_md
# 純存戶
account_only = user_subtag.onlymd_ind
# 純卡戶
credit_card_only = user_subtag.onlycc_ind
# 卡存戶有e.Fingo指定卡
card_pi_only_ubear = user_subtag.efingo_card_ind
# 有信貸者
personal_loan_account_cust = user_subtag.cl_cpa_amt.notna()
# 無理專顧客
no_fc_cust = user_subtag.fc_ind == 0

In [None]:
# Sort users by cust_no.
# NOTE(review): sort_values is pandas API; presumably user_subtag is expected to
# be a pandas DataFrame at this point - confirm.
user_subtag = user_subtag.sort_values('cust_no')

In [None]:
# Build {cust_no: [subtag codes]} from the per-customer flag series.
# The position of a series in this tuple is the subtag code appended when the
# flag is truthy for that customer.
subtag_flags = (
    mobile_active,               # 0
    forex_digital_account,       # 1
    account,                     # 2
    account_only,                # 3
    credit_card_only,            # 4
    card_pi_only_ubear,          # 5
    personal_loan_account_cust,  # 6
    no_fc_cust,                  # 7
)
user_subtag_dict = {}
# Look flags up by index label, not by loop position. The flag series were
# derived before user_subtag was sorted, and sort_values keeps the original
# labels, so a positional counter used as a label can read another customer's row.
# NOTE(review): assumes user_subtag and the flag series are pandas objects
# sharing one index - confirm.
for row_label, ids in user_subtag.cust_no.items():
    user_subtag_dict[ids] = [
        code for code, flag in enumerate(subtag_flags) if flag.loc[row_label]
    ]

### get item-subtag

In [58]:
# Read the 'mapping' sheet of the subtag mapping workbook into pandas.
mapping_df = pd.read_excel(
    'subtag_map.xlsx',
    sheet_name='mapping',
)

In [63]:
# Preview five rows of the indexed item/subtag frame.
item_subtag_sdf_train_indexed.show(5)

+--------------------+--------+------------------+--------+
|             item_id|    date|   subtag_eng_desc|item_ind|
+--------------------+--------+------------------+--------+
|8236E47D-EC12-4B4...|20211225|03_wealth_view_ind|     9.0|
|8236E47D-EC12-4B4...|20211225| 03_investment_ind|     9.0|
|8236E47D-EC12-4B4...|20211225|   03_account_only|     9.0|
|0266753B-F0CD-460...|20211225|03_wealth_view_ind|    17.0|
|0266753B-F0CD-460...|20211225|     03_no_fc_cust|    17.0|
+--------------------+--------+------------------+--------+
only showing top 5 rows



In [71]:
def lstrip(column, n):
    """Build a Spark SQL expression that drops the first ``n`` characters of a column.

    Args:
        column: Name of the column as a string; it is placed inside backticks
            in the generated SQL.
        n: Number of leading characters to remove; a non-negative integer.

    Returns:
        The result of ``expr`` for
        ``substring(`column`, n+1, length(`column`)-n)``.

    Raises:
        TypeError: If ``column`` is not a string or ``n`` is not an integer.
        ValueError: If ``n`` is negative.
    """
    import operator

    if not isinstance(column, str):
        raise TypeError("column must be a column name (str), got %r" % type(column).__name__)
    # operator.index accepts any integer-like value and raises TypeError for
    # floats, strings and other non-integers.
    n = operator.index(n)
    if n < 0:
        raise ValueError("n must be non-negative, got %d" % n)
    return expr("substring(`{col}`, {n}+1, length(`{col}`)-{n})".format(col=column, n=n))

# Overwrite subtag_eng_desc with its value minus the first three characters.
item_subtag_sdf_train_indexed = item_subtag_sdf_train_indexed.withColumn(
    "subtag_eng_desc",
    lstrip("subtag_eng_desc", 3),
)

In [73]:
# Preview two rows after stripping the prefix.
item_subtag_sdf_train_indexed.show(2)

+--------------------+--------+---------------+--------+-------------------+
|             item_id|    date|subtag_eng_desc|item_ind|new_subtag_eng_desc|
+--------------------+--------+---------------+--------+-------------------+
|8236E47D-EC12-4B4...|20211225|wealth_view_ind|     9.0|    wealth_view_ind|
|8236E47D-EC12-4B4...|20211225| investment_ind|     9.0|     investment_ind|
+--------------------+--------+---------------+--------+-------------------+
only showing top 2 rows



In [None]:
# Subtag names that have an entry in the mapping sheet (blank cells ignored).
available_items = mapping_df.subtag_03.dropna().unique()
# Remove the "03_" prefix only where it is still present. An unconditional
# 3-character strip here would cut real characters off names that the earlier
# lstrip cell has already stripped, and the isin filter below would then match
# nothing. This form gives the same result whether or not that cell has run.
item_subtag_sdf_train_indexed = item_subtag_sdf_train_indexed.withColumn(
    "subtag_eng_desc",
    F.regexp_replace("subtag_eng_desc", r"^03_", ""),
)
# Keep only subtags that can be mapped to a code.
item_subtag_sdf_train_indexed = item_subtag_sdf_train_indexed.filter(
    item_subtag_sdf_train_indexed.subtag_eng_desc.isin(list(available_items))
)

In [85]:
# Preview five rows after restricting to subtags present in the mapping sheet.
item_subtag_sdf_train_indexed.show(5)

+--------------------+--------+--------------------+--------+--------------------+
|             item_id|    date|     subtag_eng_desc|item_ind| new_subtag_eng_desc|
+--------------------+--------+--------------------+--------+--------------------+
|8236E47D-EC12-4B4...|20211225|        account_only|     9.0|        account_only|
|0266753B-F0CD-460...|20211225|          no_fc_cust|    17.0|          no_fc_cust|
|0266753B-F0CD-460...|20211225|             account|    17.0|             account|
|B105ECE3-B0EA-4F0...|20211225|       mobile_active|    18.0|       mobile_active|
|B105ECE3-B0EA-4F0...|20211225|forex_digital_acc...|    18.0|forex_digital_acc...|
+--------------------+--------+--------------------+--------+--------------------+
only showing top 5 rows



In [95]:
# Spark copy of the pandas mapping table, so it can be joined with Spark frames.
mapping_sdf = spark.createDataFrame(
    mapping_df,
)

DataFrame[item_id: string, date: int, subtag_eng_desc: string, item_ind: double, new_subtag_eng_desc: string]

In [99]:
# Attach the mapping columns to every item/subtag row; the left join keeps
# rows that have no match in the mapping table.
item_subtags = item_subtag_sdf_train_indexed.join(
    mapping_sdf,
    on=item_subtag_sdf_train_indexed.subtag_eng_desc == mapping_sdf.subtag_03,
    how='left',
)

In [104]:
# Collect the joined item/subtag rows to the driver as a pandas DataFrame.
item_subtags_df = item_subtags.toPandas()

In [105]:
# item_subtags = pd.merge(item_subtag, mapping_df, how = 'left', left_on = 'subtag_eng_desc', right_on = 'subtag_03', copy = False)
# For each item index, the distinct mapping codes attached to it.
subtags_by_item = item_subtags_df.groupby('item_ind')['code'].unique()
# Same data as a plain dict: {item_ind: [code, ...]}.
item_subtag_dict = {
    item_ind: list(codes) for item_ind, codes in subtags_by_item.items()
}

In [106]:
# Display the item index -> subtag codes mapping.
item_subtag_dict

{1.0: [2, 7],
 2.0: [4],
 3.0: [1],
 4.0: [1],
 5.0: [6, 0, 1],
 6.0: [5, 4],
 7.0: [2, 7],
 8.0: [2, 7],
 9.0: [3],
 12.0: [2, 4],
 13.0: [2],
 14.0: [2, 3],
 15.0: [5, 3],
 16.0: [4, 3],
 17.0: [2, 7],
 18.0: [6, 0, 1],
 19.0: [2, 4, 3],
 20.0: [5, 4]}

In [None]:
# export_file(context_df_train, test_date, 'context_train.csv')
# export_file(context_df_test, test_date, 'context_test.csv')