# init

## imports

In [1]:
import matplotlib.pyplot as plt
import pandas as pd
import warnings

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import Window as W

warnings.filterwarnings('ignore')

## configs

In [2]:
SPARK_THREADS = 16
SPARK_MEMORY = 16
SPARK_TIMEZONE = 'America/Vancouver'

In [3]:
BASE_PATH = ('/home/shaghayegh/class/ad_click/data/')
USER_DATA_PATH = BASE_PATH + 'raw_data/user_profile.csv'
AD_CLICK_DATA_PATH = BASE_PATH + 'raw_data/raw_sample.csv'
AD_INFO_DATA_PATH = BASE_PATH + 'raw_data/ad_feature.csv'

MERGED_DATA_PATH = BASE_PATH + 'merged.parquet'
DATASET_PATH = BASE_PATH + 'dataset.parquet'

## spark instantiation

In [4]:
spark = (
    SparkSession 
    .builder 
    .master('local[{}]'.format(SPARK_THREADS)) 
    .config('spark.driver.memory', '{}g'.format(SPARK_MEMORY)) 
    .config('spark.sql.session.timeZone', SPARK_TIMEZONE) 
    .getOrCreate()
)

24/04/08 08:59:19 WARN Utils: Your hostname, shaghayegh-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.0.66 instead (on interface wlp92s0)
24/04/08 08:59:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/08 08:59:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## util

In [5]:
def summerize_df(df):
    print(df.count())
    df.show(3)

# load data

In [6]:
user_df = spark.read.csv(USER_DATA_PATH, header = True, inferSchema=True)
summerize_df(user_df)

1061768
+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |
+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|   234|        0|           5|                2|        5|        NULL|             3|         0|                    3|
|   523|        5|           2|                2|        2|           1|             3|         1|                    2|
|   612|        0|           8|                1|        2|           2|             3|         0|                 NULL|
+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
only showing top 3 rows



In [7]:
ad_info_df = spark.read.csv(AD_INFO_DATA_PATH, header = True, inferSchema=True)
summerize_df(ad_info_df)

846811
+----------+-------+-----------+--------+-----+-----+
|adgroup_id|cate_id|campaign_id|customer|brand|price|
+----------+-------+-----------+--------+-----+-----+
|     63133|   6406|      83237|       1|95471|170.0|
|    313401|   6406|      83237|       1|87331|199.0|
|    248909|    392|      83237|       1|32233| 38.0|
+----------+-------+-----------+--------+-----+-----+
only showing top 3 rows



In [8]:
ad_click_df = spark.read.csv(AD_CLICK_DATA_PATH, header = True, inferSchema=True)
summerize_df(ad_click_df)

[Stage 14:>                                                       (0 + 16) / 16]

26557961
+------+----------+----------+-----------+------+---+
|  user|time_stamp|adgroup_id|        pid|nonclk|clk|
+------+----------+----------+-----------+------+---+
|581738|1494137644|         1|430548_1007|     1|  0|
|449818|1494638778|         3|430548_1007|     1|  0|
|914836|1494650879|         4|430548_1007|     1|  0|
+------+----------+----------+-----------+------+---+
only showing top 3 rows



                                                                                

# join

In [10]:
ad_click_df = ad_click_df.withColumnRenamed('user', 'userid')

ad_click_user_df = ad_click_df.join(user_df, on='userid')
merged_df = ad_click_user_df.join(ad_info_df, on='adgroup_id')

merged_df.write.partitionBy('age_level', 'final_gender_code').parquet(MERGED_DATA_PATH)
summerize_df(merged_df)

                                                                                

25029435




+----------+-------+----------+-----------+------+---+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+-------+-----------+--------+------+-----+
|adgroup_id| userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand|price|
+----------+-------+----------+-----------+------+---+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+-------+-----------+--------+------+-----+
|        31| 989702|1494097392|430539_1007|     1|  0|       79|          10|                1|        4|           2|             3|         0|                    1|   8633|     398003|   52607|  NULL| 11.0|
|        34| 838097|1494478055|430539_1007|     1|  0|       81|          10|                1|        4|           2|             3|         0|                    

                                                                                

In [7]:
merged_df = spark.read.parquet(MERGED_DATA_PATH).withColumn('time', F.from_unixtime('time_stamp'))
merged_df.persist()
summerize_df(merged_df)

                                                                                

25029435
+----------+------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+-----+-----+---------+-----------------+-------------------+
|adgroup_id|userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer|brand|price|age_level|final_gender_code|               time|
+----------+------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+-----+-----+---------+-----------------+-------------------+
|       107|753380|1494597575|430548_1007|     1|  0|        0|           3|        NULL|             3|         0|                    4|    619|     115758|    2461| NULL| 33.6|        3|                2|2017-05-12 06:59:35|
|       107|479850|1494159370|430539_1007|     1|  0|        0|           3|       

# features preparation

## userid features

In [8]:
def get_user_features(df):
    window = W.partitionBy('userid').orderBy('time').rowsBetween(W.unboundedPreceding,-1)
    
    return (
        df
        .withColumn('user_ad_count', F.count('clk').over(window))
        .withColumn('user_ad_clk_count', F.sum('clk').over(window))
    )
    
users_features_df = get_user_features(merged_df)
users_features_df.select('userid', 'time', 'user_ad_count', 'user_ad_clk_count').sort('userid', 'time').show(50)



+------+-------------------+-------------+-----------------+
|userid|               time|user_ad_count|user_ad_clk_count|
+------+-------------------+-------------+-----------------+
|     1|2017-05-10 21:50:36|            0|             NULL|
|     1|2017-05-11 23:45:25|            1|                0|
|     1|2017-05-11 23:45:25|            2|                0|
|     1|2017-05-11 23:45:25|            3|                0|
|     2|2017-05-08 06:33:12|            0|             NULL|
|     2|2017-05-08 06:33:12|            1|                1|
|     2|2017-05-08 06:33:12|            2|                1|
|     3|2017-05-07 16:07:35|            0|             NULL|
|     3|2017-05-07 16:07:35|            1|                0|
|     3|2017-05-07 16:41:33|            2|                1|
|     3|2017-05-07 16:41:33|            3|                1|
|     3|2017-05-07 16:41:33|            4|                1|
|     3|2017-05-10 17:44:04|            5|                1|
|     3|2017-05-10 17:44

                                                                                

## adgroup features

In [9]:
def get_ad_features(df):
    window = W.partitionBy('adgroup_id').orderBy('time').rowsBetween(W.unboundedPreceding,-1)
    
    return (
        df
        .withColumn('adgroup_count', F.count('clk').over(window))
        .withColumn('adgroup_clk_count', F.sum('clk').over(window))
    )
    
ad_features_df = get_ad_features(users_features_df)
(
    ad_features_df
    .select('adgroup_id', 'time', 'adgroup_count', 'adgroup_clk_count')
    .sort('adgroup_id', 'time')
    .show(30)
)



+----------+-------------------+-------------+-----------------+
|adgroup_id|               time|adgroup_count|adgroup_clk_count|
+----------+-------------------+-------------+-----------------+
|         1|2017-05-06 23:14:04|            0|             NULL|
|         2|2017-05-08 17:38:53|            0|             NULL|
|         2|2017-05-09 05:38:06|            1|                0|
|         2|2017-05-10 00:33:38|            2|                0|
|         2|2017-05-12 18:08:46|            3|                0|
|         2|2017-05-12 18:43:57|            4|                0|
|         2|2017-05-12 21:21:21|            5|                0|
|         4|2017-05-12 21:47:59|            0|             NULL|
|         5|2017-05-12 21:50:29|            0|             NULL|
|         6|2017-05-12 23:27:30|            0|             NULL|
|         7|2017-05-06 20:57:22|            0|             NULL|
|         7|2017-05-09 03:06:09|            1|                0|
|         7|2017-05-10 07

                                                                                

## campaign features

In [10]:
def get_campaign_features(df):
    window = W.partitionBy('campaign_id').orderBy('time').rowsBetween(W.unboundedPreceding,-1)
    
    return (
        df
        .withColumn('campaign_count', F.count('clk').over(window))
        .withColumn('campaign_clk_count', F.sum('clk').over(window))
    )
    
campaign_features_df = get_campaign_features(ad_features_df)
campaign_features_df.show()

24/03/28 09:10:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+-----+-----+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------+--------------+------------------+
|adgroup_id| userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer|brand|price|age_level|final_gender_code|               time|user_ad_count|user_ad_clk_count|adgroup_count|adgroup_clk_count|campaign_count|campaign_clk_count|
+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+-----+-----+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------+--------------+------------------+
|    698868| 803506|14

                                                                                

## category features

In [11]:
def get_category_features(df):
    window = W.partitionBy('cate_id').orderBy('time').rowsBetween(W.unboundedPreceding,-1)
    
    return (
        df
        .withColumn('cate_count', F.count('clk').over(window))
        .withColumn('cate_clk_count', F.sum('clk').over(window))
    )
    
category_features_df = get_category_features(campaign_features_df)
category_features_df.show()

                                                                                

+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------+--------------+------------------+----------+--------------+
|adgroup_id| userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand| price|age_level|final_gender_code|               time|user_ad_count|user_ad_clk_count|adgroup_count|adgroup_clk_count|campaign_count|campaign_clk_count|cate_count|cate_clk_count|
+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------

## category & gender features

In [12]:
def get_cat_gender_features(df):
    window = W.partitionBy('cate_id', 'final_gender_code').orderBy('time').rowsBetween(W.unboundedPreceding,-1)
    
    return (
        df
        .withColumn('cat_gender_ad_count', F.count('clk').over(window))
        .withColumn('cat_gender_ad_clk_count', F.sum('clk').over(window))
    )
    
cat_gender_features_df =  get_cat_gender_features(category_features_df)
cat_gender_features_df.show()

[Stage 51:>                                                         (0 + 1) / 1]

+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------+--------------+------------------+----------+--------------+-------------------+-----------------------+
|adgroup_id| userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand| price|age_level|final_gender_code|               time|user_ad_count|user_ad_clk_count|adgroup_count|adgroup_clk_count|campaign_count|campaign_clk_count|cate_count|cate_clk_count|cat_gender_ad_count|cat_gender_ad_clk_count|
+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+---------+-------------

                                                                                

## category & age features

In [13]:
def get_cat_age_features(df):
    window = W.partitionBy('cate_id', 'age_level').orderBy('time').rowsBetween(W.unboundedPreceding,-1)
    
    return (
        df
        .withColumn('cat_age_ad_count', F.count('clk').over(window))
        .withColumn('cat_age_ad_clk_count', F.sum('clk').over(window))
    )
    
cat_age_features_df =  get_cat_age_features(cat_gender_features_df)
cat_age_features_df.show()

[Stage 66:>                                                         (0 + 1) / 1]

+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------+--------------+------------------+----------+--------------+-------------------+-----------------------+----------------+--------------------+
|adgroup_id| userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand| price|age_level|final_gender_code|               time|user_ad_count|user_ad_clk_count|adgroup_count|adgroup_clk_count|campaign_count|campaign_clk_count|cate_count|cate_clk_count|cat_gender_ad_count|cat_gender_ad_clk_count|cat_age_ad_count|cat_age_ad_clk_count|
+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+------------

                                                                                

## gender, age & ad features

In [15]:
def get_gender_age_ad_features(df):
    window = (
        W.partitionBy('adgroup_id', 'age_level', 'final_gender_code')
        .orderBy('time')
        .rowsBetween(W.unboundedPreceding,-1)
    )
    return (
        df
        .withColumn('gender_age_ad_count', F.count('clk').over(window))
        .withColumn('gender_age_ad_clk_count', F.sum('clk').over(window))
    )
    
gender_age_ad_features_df =  get_gender_age_ad_features(cat_age_features_df)
gender_age_ad_features_df.show()



+----------+-------+----------+-----------+------+---+---------+------------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+---------+-----------------+-------------------+-------------+-----------------+-------------+-----------------+--------------+------------------+----------+--------------+-------------------+-----------------------+----------------+--------------------+-------------------+-----------------------+
|adgroup_id| userid|time_stamp|        pid|nonclk|clk|cms_segid|cms_group_id|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand| price|age_level|final_gender_code|               time|user_ad_count|user_ad_clk_count|adgroup_count|adgroup_clk_count|campaign_count|campaign_clk_count|cate_count|cate_clk_count|cat_gender_ad_count|cat_gender_ad_clk_count|cat_age_ad_count|cat_age_ad_clk_count|gender_age_ad_count|gender_age_ad_clk_count|
+----------+-------+----------+---------

                                                                                

# save dataset

In [16]:
gender_age_ad_features_df.write.parquet(DATASET_PATH)

                                                                                