In [100]:
import os
import sys

# Project root containing `constant.py` and the `ctr_model` package.
# Set the REEMO_MINI_ROOT environment variable to use this notebook on
# another machine instead of editing the hard-coded default below.
PROJECT_ROOT = os.environ.get('REEMO_MINI_ROOT', 'C:/Users/hoang/PycharmProjects/ReeMo_mini')
# Guard so that re-running this cell does not append duplicate entries.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [101]:
import constant
from pyspark.sql import SparkSession
import ctr_model.support_function as support_function

In [102]:
# Path of the CTR training CSV, resolved by the project config module.
training_file = constant.get_ctr_training_data_file()

# getOrCreate() returns the already running session if there is one.
# The Kryo serializer buffer ceiling is raised to 1g.
spark = (
    SparkSession.builder
    .appName("train_ctr_model")
    .config('spark.kryoserializer.buffer.max', '1g')
    .getOrCreate()
)

# The first CSV row supplies column names; Spark infers the column types.
df = spark.read.csv(training_file, header=True, inferSchema=True)

# Feature column names as selected by the project helper.
names_features = support_function.get_names_features(df)

In [103]:
# Display the feature column names returned by support_function.get_names_features.
names_features

['hr',
 'dow',
 'fq',
 'recency',
 'inview_fq',
 'inview_recency',
 'elapsed_time_rt',
 'is_rt_any',
 'is_same_domain',
 'creative_type',
 'os',
 'prob_man',
 'ctr_user',
 'inview_ratio',
 'ctr_slot',
 'iv_ctr_slot',
 'slot_category',
 'slot_site_type',
 'ctr_sp_slot',
 'iv_ctr_sp_slot']

In [105]:
# Preview the first rows of the raw CSV frame (long cells are truncated).
df.show(n=20, truncate=True)

+---+---+---+-------+---------+--------------+---------------+---------+--------------+-------------+---+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------+--------------------+--------------------+----------+-------+--------+
| hr|dow| fq|recency|inview_fq|inview_recency|elapsed_time_rt|is_rt_any|is_same_domain|creative_type| os|           prob_man|            ctr_user|        inview_ratio|            ctr_slot|         iv_ctr_slot|slot_category|slot_site_type|         ctr_sp_slot|      iv_ctr_sp_slot|sponsor_id|slot_id|is_click|
+---+---+---+-------+---------+--------------+---------------+---------+--------------+-------------+---+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------+--------------------+--------------------+----------+-------+--------+
|  7|  0|  0|     26|        0|             0|             -1|        0| 

In [106]:
# Apply the project's preprocessing helper to the raw frame; the result is
# bound to a new name, `df` itself is not rebound by this cell.
df_processed = support_function.preproc(df)

In [107]:
# Peek at two preprocessed rows.
df_processed.show(2)

+---+---+---+-------+---------+--------------+---------------+---------+--------------+-------------+---+------------------+--------+-------------------+--------------------+--------------------+-------------+--------------+--------------------+--------------------+----------+-------+--------+
| hr|dow| fq|recency|inview_fq|inview_recency|elapsed_time_rt|is_rt_any|is_same_domain|creative_type| os|          prob_man|ctr_user|       inview_ratio|            ctr_slot|         iv_ctr_slot|slot_category|slot_site_type|         ctr_sp_slot|      iv_ctr_sp_slot|sponsor_id|slot_id|is_click|
+---+---+---+-------+---------+--------------+---------------+---------+--------------+-------------+---+------------------+--------+-------------------+--------------------+--------------------+-------------+--------------+--------------------+--------------------+----------+-------+--------+
|  7|  0|  0|     26|        0|            -1|             -1|        0|             2|           14|  2|0.63099805

In [108]:
# Share of the data passed to mk_train_test_data as the test split.
test_size = 0.05
# A fixed seed makes the split reproducible across kernel restarts
# (seed = None gave a different split on every run).
# NOTE(review): assumes mk_train_test_data forwards `seed` to its random
# split; set back to None to deliberately draw a fresh split.
seed = 42
train_data, test_data, dic_convmap = support_function.mk_train_test_data(
    df_processed, constant.MAX_CATEGORIES, test_size, seed)

In [109]:
# Peek at two training rows; the output shows each as a `features` vector plus an `is_click` label.
train_data.take(2)

[Row(features=SparseVector(20, {0: 6.0, 3: -1.0, 5: -1.0, 6: -1.0, 8: 3.0, 9: 0.0, 10: 1.0, 11: 0.6718, 13: 0.0188, 14: 0.0001, 15: 0.0029, 16: 2.0}), is_click=0.0),
 Row(features=SparseVector(20, {0: 6.0, 3: -1.0, 5: -1.0, 6: -1.0, 8: 3.0, 9: 0.0, 10: 1.0, 11: 0.869, 13: 0.0188, 14: 0.0001, 15: 0.0029, 16: 2.0}), is_click=0.0)]

In [110]:
# === Test preprocess ============

In [111]:
# A row is dropped when both user signals are missing (no gender probability
# AND no / ~zero user CTR), or when the slot's inview ratio is unknown.
null_gender = df.prob_man.isNull()
null_ctr_user = df.ctr_user.isNull() | (df.ctr_user < 1e-10)
unknown_slot = df.inview_ratio.isNull()
mask_drop = (null_gender & null_ctr_user) | unknown_slot

In [112]:
# Display the combined drop condition as a Spark Column expression.
mask_drop

Column<b'(((prob_man IS NULL) AND ((ctr_user IS NULL) OR (ctr_user < 1.0E-10))) OR (inview_ratio IS NULL))'>

In [43]:
# Feature columns = every column except the two id columns and the label.
# Excluding them by name (instead of slicing df.columns[:-3]) keeps this
# correct even if the CSV column order changes.
# NOTE(review): this is a separately computed copy of `names_features`
# (note the singular name, run out of order as In [43]); nothing in the
# visible notebook reads it.
name_features = [c for c in df.columns if c not in ('sponsor_id', 'slot_id', 'is_click')]

In [113]:
# Apply the drop rule from the previous cell. Previously the filtered frame
# was only displayed and then discarded, so the following cells kept working
# on the unfiltered rows. Re-running is harmless: a second filter drops nothing.
df = df.filter(~mask_drop)
df

DataFrame[hr: int, dow: int, fq: int, recency: int, inview_fq: int, inview_recency: int, elapsed_time_rt: int, is_rt_any: int, is_same_domain: int, creative_type: int, os: int, prob_man: double, ctr_user: double, inview_ratio: double, ctr_slot: double, iv_ctr_slot: double, slot_category: int, slot_site_type: int, ctr_sp_slot: double, iv_ctr_sp_slot: double, sponsor_id: int, slot_id: int, is_click: int]

In [116]:
from pyspark.sql import functions as F

df = (
    df
    # Missing prob_man / ctr_user values become the sentinel -1.0.
    .na.fill({'prob_man': -1.0, 'ctr_user': -1.0})
    # Recency below 1e-10 (i.e. 0 or negative for these int columns) becomes -1.
    .withColumn('recency',
                F.when(F.col('recency') < 1e-10, -1).otherwise(F.col('recency')))
    .withColumn('inview_recency',
                F.when(F.col('inview_recency') < 1e-10, -1).otherwise(F.col('inview_recency')))
)

In [117]:
# === Make feature vector ===

In [118]:
# List the column names of the cleaned frame.
list(df.columns)

['hr',
 'dow',
 'fq',
 'recency',
 'inview_fq',
 'inview_recency',
 'elapsed_time_rt',
 'is_rt_any',
 'is_same_domain',
 'creative_type',
 'os',
 'prob_man',
 'ctr_user',
 'inview_ratio',
 'ctr_slot',
 'iv_ctr_slot',
 'slot_category',
 'slot_site_type',
 'ctr_sp_slot',
 'iv_ctr_sp_slot',
 'sponsor_id',
 'slot_id',
 'is_click']

In [119]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Cast the label to double, and move the raw slot_category to
# slot_category_org so the indexed version can be written under the
# original name by the StringIndexer cell.
df_convert = (
    df
    .withColumn('is_click', col('is_click').cast(DoubleType()))
    .withColumnRenamed('slot_category', 'slot_category_org')
)
df_convert.dtypes

[('hr', 'int'),
 ('dow', 'int'),
 ('fq', 'int'),
 ('recency', 'int'),
 ('inview_fq', 'int'),
 ('inview_recency', 'int'),
 ('elapsed_time_rt', 'int'),
 ('is_rt_any', 'int'),
 ('is_same_domain', 'int'),
 ('creative_type', 'int'),
 ('os', 'int'),
 ('prob_man', 'double'),
 ('ctr_user', 'double'),
 ('inview_ratio', 'double'),
 ('ctr_slot', 'double'),
 ('iv_ctr_slot', 'double'),
 ('slot_category_org', 'int'),
 ('slot_site_type', 'int'),
 ('ctr_sp_slot', 'double'),
 ('iv_ctr_sp_slot', 'double'),
 ('sponsor_id', 'int'),
 ('slot_id', 'int'),
 ('is_click', 'double')]

In [120]:
# Preview the raw (pre-index) slot category codes.
df_convert.select('slot_category_org').show(n=20)

+-----------------+
|slot_category_org|
+-----------------+
|               17|
|               14|
|               27|
|               22|
|               14|
|               17|
|               14|
|               17|
|               17|
|                1|
|               14|
|                1|
|               27|
|                1|
|               27|
|                1|
|               17|
|               21|
|                1|
|               14|
+-----------------+
only showing top 20 rows



In [121]:
from pyspark.ml.feature import StringIndexer

# Re-encode slot_category_org as a label index in a new slot_category column.
# StringIndexer's default ordering gives the most frequent label index 0.0.
td1 = (
    StringIndexer(inputCol='slot_category_org', outputCol='slot_category')
    .fit(df_convert)
    .transform(df_convert)
)
td1.select('slot_category').show()
td1.show()

+-------------+
|slot_category|
+-------------+
|          1.0|
|          2.0|
|          3.0|
|          4.0|
|          2.0|
|          1.0|
|          2.0|
|          1.0|
|          1.0|
|          0.0|
|          2.0|
|          0.0|
|          3.0|
|          0.0|
|          3.0|
|          0.0|
|          1.0|
|          5.0|
|          0.0|
|          2.0|
+-------------+
only showing top 20 rows

+---+---+---+-------+---------+--------------+---------------+---------+--------------+-------------+---+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------+--------------------+--------------------+----------+-------+--------+-------------+
| hr|dow| fq|recency|inview_fq|inview_recency|elapsed_time_rt|is_rt_any|is_same_domain|creative_type| os|           prob_man|            ctr_user|        inview_ratio|            ctr_slot|         iv_ctr_slot|slot_category_org|slot_site_type|         ctr_sp_slot|

In [122]:
from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into one vector column; slot_category here is
# the indexed column produced by the StringIndexer cell.
td2 = (
    VectorAssembler(inputCols=names_features, outputCol='features_inter')
    .transform(td1)
)
td2.select('features_inter').show()

+--------------------+
|      features_inter|
+--------------------+
|[7.0,0.0,0.0,26.0...|
|[4.0,0.0,0.0,-1.0...|
|[0.0,0.0,12.0,-1....|
|[21.0,0.0,6.0,2.0...|
|[23.0,0.0,0.0,178...|
|[8.0,0.0,5.0,-1.0...|
|[23.0,0.0,1.0,18....|
|[20.0,0.0,4.0,129...|
|[11.0,0.0,0.0,817...|
|[10.0,0.0,4.0,249...|
|[17.0,0.0,0.0,-1....|
|[21.0,0.0,7.0,531...|
|[6.0,0.0,0.0,3412...|
|[21.0,0.0,2.0,33....|
|[22.0,0.0,1.0,216...|
|[19.0,0.0,3.0,1.0...|
|[0.0,0.0,0.0,-1.0...|
|[10.0,0.0,4.0,112...|
|[9.0,0.0,0.0,-1.0...|
|[8.0,0.0,0.0,-1.0...|
+--------------------+
only showing top 20 rows



In [123]:
from pyspark.ml.feature import VectorIndexer

# Use the same category threshold that was passed to mk_train_test_data
# (constant.MAX_CATEGORIES) instead of a hard-coded 5, so this hand-built
# pipeline is comparable with the library one. VectorIndexer treats a vector
# slot as categorical when it has at most maxCategories distinct values.
data_proc = (
    VectorIndexer(inputCol='features_inter', outputCol='features',
                  maxCategories=constant.MAX_CATEGORIES)
    .fit(td2)
    .transform(td2)
    .select('features', 'is_click')
)

In [124]:
# Preview the final feature vectors and labels.
data_proc.show(n=20)

+--------------------+--------+
|            features|is_click|
+--------------------+--------+
|[7.0,0.0,0.0,26.0...|     0.0|
|[4.0,0.0,0.0,-1.0...|     0.0|
|[0.0,0.0,12.0,-1....|     0.0|
|[21.0,0.0,6.0,2.0...|     0.0|
|[23.0,0.0,0.0,178...|     0.0|
|[8.0,0.0,5.0,-1.0...|     0.0|
|[23.0,0.0,1.0,18....|     0.0|
|[20.0,0.0,4.0,129...|     0.0|
|[11.0,0.0,0.0,817...|     0.0|
|[10.0,0.0,4.0,249...|     0.0|
|[17.0,0.0,0.0,-1....|     0.0|
|[21.0,0.0,7.0,531...|     0.0|
|[6.0,0.0,0.0,3412...|     0.0|
|[21.0,0.0,2.0,33....|     0.0|
|[22.0,0.0,1.0,216...|     0.0|
|[19.0,0.0,3.0,1.0...|     0.0|
|[0.0,0.0,0.0,-1.0...|     0.0|
|[10.0,0.0,4.0,112...|     0.0|
|[9.0,0.0,0.0,-1.0...|     0.0|
|[8.0,0.0,0.0,-1.0...|     0.0|
+--------------------+--------+
only showing top 20 rows



In [126]:
# Display the schema summary of the final frame: the `features` vector and the `is_click` label.
data_proc

DataFrame[features: vector, is_click: double]

In [127]:
# === Check convmap ===

In [137]:
# ML attribute metadata attached to the `features` column, looked up by name
# rather than by position in the schema.
data_proc.schema['features'].metadata['ml_attr']

{'attrs': {'binary': [{'idx': 7, 'name': 'is_rt_any', 'vals': ['0.0', '1.0']}],
  'nominal': [{'idx': 1, 'name': 'dow', 'ord': False, 'vals': ['0.0']},
   {'idx': 8,
    'name': 'is_same_domain',
    'ord': False,
    'vals': ['0.0', '1.0', '2.0', '3.0', '4.0']},
   {'idx': 9, 'name': 'creative_type', 'ord': False, 'vals': ['14.0']},
   {'idx': 10,
    'name': 'os',
    'ord': False,
    'vals': ['0.0', '1.0', '2.0', '3.0', '4.0']},
   {'idx': 16,
    'name': 'slot_category',
    'vals': ['1',
     '17',
     '14',
     '27',
     '22',
     '21',
     '11',
     '12',
     '7',
     '23',
     '25',
     '29',
     '9',
     '8',
     '10',
     '13',
     '15',
     '16',
     '24']},
   {'idx': 17, 'name': 'slot_site_type', 'ord': False, 'vals': ['0.0']}],
  'numeric': [{'idx': 0, 'name': 'hr'},
   {'idx': 2, 'name': 'fq'},
   {'idx': 3, 'name': 'recency'},
   {'idx': 4, 'name': 'inview_fq'},
   {'idx': 5, 'name': 'inview_recency'},
   {'idx': 6, 'name': 'elapsed_time_rt'},
   {'idx

In [215]:
# Rebuild the category -> index mapping from the ML attribute metadata on
# the `features` column produced by the VectorIndexer stage.
# NOTE: this rebinds `dic_convmap`, replacing the mapping returned by
# mk_train_test_data earlier in the notebook.
dic_attr = data_proc.schema['features'].metadata['ml_attr']['attrs']
# Spark may omit an attribute group (or an attribute's 'vals') when it is
# empty, so default to empty lists instead of raising KeyError.
list_attr = dic_attr.get('binary', []) + dic_attr.get('nominal', [])
# Metadata values are numeric strings such as '0.0' or '17'; parse them with
# float() rather than eval(), which would execute arbitrary expressions.
dic_convmap = {
    d['name']: {int(float(v)): float(i) for i, v in enumerate(d.get('vals', []))}
    for d in list_attr
}
dic_convmap

{'creative_type': {14: 0.0},
 'dow': {0: 0.0},
 'is_rt_any': {0: 0.0, 1: 1.0},
 'is_same_domain': {0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0},
 'os': {0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0},
 'slot_category': {1: 0.0,
  7: 8.0,
  8: 13.0,
  9: 12.0,
  10: 14.0,
  11: 6.0,
  12: 7.0,
  13: 15.0,
  14: 2.0,
  15: 16.0,
  16: 17.0,
  17: 1.0,
  21: 5.0,
  22: 4.0,
  23: 9.0,
  24: 18.0,
  25: 10.0,
  27: 3.0,
  29: 11.0},
 'slot_site_type': {0: 0.0}}

In [220]:
import ast

# Same mapping as dic_convmap, but the category keys keep the literal type
# found in the metadata ('0.0' -> 0.0, '17' -> 17). ast.literal_eval only
# accepts Python literals, unlike eval, which would execute arbitrary code.
# Missing attribute groups / 'vals' default to empty lists.
list_attr = dic_attr.get('binary', []) + dic_attr.get('nominal', [])
conv = {
    d['name']: {ast.literal_eval(category_name): float(index)
                for index, category_name in enumerate(d.get('vals', []))}
    for d in list_attr
}
conv

{'creative_type': {14.0: 0.0},
 'dow': {0.0: 0.0},
 'is_rt_any': {0.0: 0.0, 1.0: 1.0},
 'is_same_domain': {0.0: 0.0, 1.0: 1.0, 2.0: 2.0, 3.0: 3.0, 4.0: 4.0},
 'os': {0.0: 0.0, 1.0: 1.0, 2.0: 2.0, 3.0: 3.0, 4.0: 4.0},
 'slot_category': {1: 0.0,
  7: 8.0,
  8: 13.0,
  9: 12.0,
  10: 14.0,
  11: 6.0,
  12: 7.0,
  13: 15.0,
  14: 2.0,
  15: 16.0,
  16: 17.0,
  17: 1.0,
  21: 5.0,
  22: 4.0,
  23: 9.0,
  24: 18.0,
  25: 10.0,
  27: 3.0,
  29: 11.0},
 'slot_site_type': {0.0: 0.0}}