In [14]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.sql.functions import col
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F
import json
import pickle
import preproc_data

In [2]:
MODEL_VERSION_INFO = {
    'v2p1_7_40':{
        'model_location': 'C:/Users/hoang/Desktop/check_25f_abnormal/',
        # 'model_location': 'gs://reemo/models/dev/test_duong/test_j14/',
        'model_folder': '20180430_v2p1_7_40',
        'func_feature_names': preproc_data.get_names_features_v2p1,
        'func_extract_column': preproc_data.extract_column_to_feature
    },
    'v2p1p2_10_80_25f':{
        'model_location': 'C:/Users/hoang/Desktop/check_25f_abnormal/',
        # 'model_location': 'gs://reemo/models/dev/test_duong/test_j14/',
        'model_folder': '20180430_v2p1p2_10_80_25f_remake',
        'func_feature_names': preproc_data.get_names_features_v2p1p2,
        'func_extract_column': preproc_data.extract_column_to_feature
    },
    'v2p1p2_8_80_25f':{
        # 'model_location': 'C:/Users/hoang/Desktop/check_25f_abnormal/',
        'model_location': 'gs://reemo/models/dev/test_duong/test_j14/',
        'model_folder': '20180430_v2p1p2_8_80_25f',
        'func_feature_names': preproc_data.get_names_features_v2p1p2,
        'func_extract_column': preproc_data.extract_column_to_feature
    }
}

In [3]:
MODEL_FOLDER = 'ctr_model/model/'

In [4]:
def make_feature(df, version_name, convmap):
    for k in convmap:
        df = df.withColumn(k + '_' + version_name, categorical_conv(convmap[k]))


def get_convmap_dics(model_version, model_date):
    model_location = _get_model_version_folder(model_version)
    fn = 'convmap_ctr_model_%s.json' % model_date
    local_fn = '%s_%s' % (model_version, fn)
    gcs_fn = os.path.join(model_location, fn).replace('\\', '/')
    print(local_fn)
    print(gcs_fn)
    os.system('gsutil cp %s %s' % (gcs_fn, local_fn))
    return json.load(open(local_fn))


def convert_name_features(name_features, version, category_fields):
    return [field if field not in category_fields else field + '_' + version for field in name_features]


def _get_model_version_folder(model_version):
    version_infor = MODEL_VERSION_INFO[model_version]
    return os.path.join(
        version_infor['model_location'], MODEL_FOLDER, version_infor['model_folder']
    ).replace('\\', '/')


def get_model(model_version, spid, model_date):
    model_version_location = _get_model_version_folder(model_version)
    model_path = os.path.join(model_version_location, 'ctr_model_spid%d_%s' % (spid, model_date)).replace('\\', '/')
    return RandomForestClassificationModel.load(model_path)


def categorical_conv(convmap):
    f = lambda x: convmap.get(str(x), -1.0)
    return UserDefinedFunction(f, DoubleType())


def predict_with_multiple_version(df, versions, model_date, spid):
    columns = df.columns
    for version_name in versions:
        version_infor = MODEL_VERSION_INFO[version_name]
        convmaps = get_convmap_dics(version_name, model_date)
        for k in convmaps[str(spid)].keys():
            df = df.withColumn(k + '_' + version_name, categorical_conv(convmaps[str(spid)][k])(col(k)))
        name_features = version_infor['func_feature_names'](df)
        name_features = convert_name_features(name_features, version_name, list(convmaps[str(spid)]))
        df = VectorAssembler(inputCols=name_features, outputCol='features_%s' % version_name).transform(df)
    print(df.columns)
    predicted_list = []
    for version_name in versions:
        model = get_model(version_name, spid, model_date)
        prob_col_name = 'prob_%s' % version_name
        df = df.withColumn('features', col('features_%s' % version_name))
        df = model.transform(df).withColumn(prob_col_name, UserDefinedFunction(lambda x: x.tolist()[1], DoubleType())(
            col('probability')))
        predicted_list.append(version_name)
        df = df.select(columns + ['prob_%s' % v for v in predicted_list] + ['features_%s' % v for v in versions])
    df = df.select(columns + ['prob_%s' % v for v in versions])
    return df

In [6]:
spark = SparkSession.builder.appName("calc_pred_ctr_slot").getOrCreate()
versions = ['v2p1_7_40']
sponsor_id = 46
# model_name = '20180430_v2p1p2_8_80_25f'
model_date = '20180430'

df = spark.read.csv(
    'C:/Users/hoang/Desktop/check_25f_abnormal/ctr_model/training_data/spid%d/20180430_20180506' % sponsor_id,
    header=True, inferSchema=True)
df = preproc_data.preproc_v2p1(df)

df = predict_with_multiple_version(df=df, versions=versions, model_date=model_date, spid=sponsor_id).cache()

v2p1_7_40_convmap_ctr_model_20180430.json
C:/Users/hoang/Desktop/check_25f_abnormal/ctr_model/model/20180430_v2p1_7_40/convmap_ctr_model_20180430.json


['hr', 'dow', 'fq', 'recency', 'inview_fq', 'inview_recency', 'elapsed_time_rt', 'is_rt_any', 'is_same_domain', 'creative_type', 'os', 'prob_man', 'ctr_user', 'inview_ratio', 'ctr_slot', 'iv_ctr_slot', 'slot_category', 'slot_site_type', 'ctr_sp_slot', 'iv_ctr_sp_slot', 'prob_man_stats', 'uu_ratio', 'ctr_user_avg', 'hour_stats', 'dow_stats', 'slot_sponsor_rt_stats', 'slot_sponsor_cv_stats', 'sponsor_rt_probman_stats', 'sponsor_cv_probman_stats', 'sponsor_id', 'ssp_id', 'slot_id', 'dsp_id', 'is_click', 'creative_type_v2p1_7_40', 'is_rt_any_v2p1_7_40', 'os_v2p1_7_40', 'is_same_domain_v2p1_7_40', 'slot_category_v2p1_7_40', 'slot_site_type_v2p1_7_40', 'features_v2p1_7_40']


In [9]:
df.columns

['hr',
 'dow',
 'fq',
 'recency',
 'inview_fq',
 'inview_recency',
 'elapsed_time_rt',
 'is_rt_any',
 'is_same_domain',
 'creative_type',
 'os',
 'prob_man',
 'ctr_user',
 'inview_ratio',
 'ctr_slot',
 'iv_ctr_slot',
 'slot_category',
 'slot_site_type',
 'ctr_sp_slot',
 'iv_ctr_sp_slot',
 'prob_man_stats',
 'uu_ratio',
 'ctr_user_avg',
 'hour_stats',
 'dow_stats',
 'slot_sponsor_rt_stats',
 'slot_sponsor_cv_stats',
 'sponsor_rt_probman_stats',
 'sponsor_cv_probman_stats',
 'sponsor_id',
 'ssp_id',
 'slot_id',
 'dsp_id',
 'is_click',
 'prob_v2p1_7_40']

In [12]:
df.select('ssp_id', 'slot_id', 'prob_%s'%versions[0]).groupBy('ssp_id', 'slot_id').avg('prob_%s'%versions[0]).show()

+------+-------+--------------------+
|ssp_id|slot_id| avg(prob_v2p1_7_40)|
+------+-------+--------------------+
|     1|  38205|9.831650944377621E-4|
|     1|  40293|0.002503490912265927|
|     1|  37893|1.274543035737063E-4|
|     1|  23534|1.518064553194205...|
|     1|  30036|3.318074029513365E-4|
|     1|  33195|1.217768771785300...|
|     1|  19607|8.758319749406891E-5|
|     1|  36063|0.001548419034140...|
|     1|  30234| 7.06011883398962E-4|
|     1|  31977|0.001561437993093...|
|     1|  21739|1.886742777528509...|
|     1|  27833|0.001159563807774...|
|     1|  28678|8.080484093612346E-4|
|     1|  36142|0.001090145112101...|
|     1|  32899|0.001310493652108...|
|     1|  27973|0.001628243178627917|
|     1|  29426|1.005185086303680...|
|     1|  28679|8.744358974527624E-5|
|     1|  37894|2.032425638188467...|
|     1|  41436|5.375235095516819E-4|
+------+-------+--------------------+
only showing top 20 rows



In [15]:
df.select('ssp_id', 'slot_id', 'prob_%s'%versions[0]).groupBy('ssp_id', 'slot_id').agg(F.avg(col('prob_%s'%versions[0])).alias('ctr_avg')).show()

+------+-------+--------------------+
|ssp_id|slot_id|             ctr_avg|
+------+-------+--------------------+
|     1|  38205|9.831650944377621E-4|
|     1|  40293|0.002503490912265927|
|     1|  37893|1.274543035737063E-4|
|     1|  23534|1.518064553194205...|
|     1|  30036|3.318074029513365E-4|
|     1|  33195|1.217768771785300...|
|     1|  19607|8.758319749406891E-5|
|     1|  36063|0.001548419034140...|
|     1|  30234| 7.06011883398962E-4|
|     1|  31977|0.001561437993093...|
|     1|  21739|1.886742777528509...|
|     1|  27833|0.001159563807774...|
|     1|  28678|8.080484093612346E-4|
|     1|  36142|0.001090145112101...|
|     1|  32899|0.001310493652108...|
|     1|  27973|0.001628243178627917|
|     1|  29426|1.005185086303680...|
|     1|  28679|8.744358974527624E-5|
|     1|  37894|2.032425638188467...|
|     1|  41436|5.375235095516819E-4|
+------+-------+--------------------+
only showing top 20 rows

