In [8]:
OUTPUT_BUCKET_FOLDER = "gs://kaggle-ocp-data/output/"
DATA_BUCKET_FOLDER = "gs://kaggle-ocp-data/data/"

In [9]:
from IPython.display import display
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrameWriter
import numpy as np
import math
import datetime
import time
import pandas as pd

In [10]:
import random
random.seed(42)

# Loading data

# documents

In [11]:
documents_meta_schema = StructType(
                    [StructField("document_id_doc", IntegerType(), True),
                    StructField("source_id", IntegerType(), True),                    
                    StructField("publisher_id", IntegerType(), True),
                    StructField("publish_time", TimestampType(), True)]
                    )

documents_meta_df = spark.read.schema(documents_meta_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_meta.csv") \
                .withColumn('dummyDocumentsMeta', F.lit(1)).alias('documents_meta')

In [12]:
print((documents_meta_df.count(), len(documents_meta_df.columns)))

(2999334, 5)


In [13]:
documents_meta_df.show(10)

+---------------+---------+------------+--------------------+------------------+
|document_id_doc|source_id|publisher_id|        publish_time|dummyDocumentsMeta|
+---------------+---------+------------+--------------------+------------------+
|        1595802|        1|         603|2016-06-05 00:00:...|                 1|
|        1524246|        1|         603|2016-05-26 11:00:...|                 1|
|        1617787|        1|         603|2016-05-27 00:00:...|                 1|
|        1615583|        1|         603|2016-06-07 00:00:...|                 1|
|        1615460|        1|         603|2016-06-20 00:00:...|                 1|
|        1615354|        1|         603|2016-06-10 00:00:...|                 1|
|        1614611|        1|         603|2016-06-05 13:00:...|                 1|
|        1614235|        1|         603|2016-06-09 00:00:...|                 1|
|        1614225|        1|         603|2016-06-09 00:00:...|                 1|
|        1488264|        1| 

In [14]:
documents_meta_df.groupBy('dummyDocumentsMeta').count().show()

+------------------+-------+
|dummyDocumentsMeta|  count|
+------------------+-------+
|                 1|2999334|
+------------------+-------+



In [15]:
documents_categories_schema = StructType(
                    [StructField("document_id_cat", IntegerType(), True),
                    StructField("category_id", IntegerType(), True),                    
                    StructField("confidence_level_cat", FloatType(), True)]
                    )

documents_categories_df = spark.read.schema(documents_categories_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_categories.csv") \
                .alias('documents_categories')

In [16]:
print((documents_categories_df.count(), len(documents_categories_df.columns)))

(5481475, 3)


In [17]:
documents_categories_df.show(10)

+---------------+-----------+--------------------+
|document_id_cat|category_id|confidence_level_cat|
+---------------+-----------+--------------------+
|        1595802|       1611|                0.92|
|        1595802|       1610|                0.07|
|        1524246|       1807|                0.92|
|        1524246|       1608|                0.07|
|        1617787|       1807|                0.92|
|        1617787|       1608|                0.07|
|        1615583|       1305|                0.92|
|        1615583|       1806|                0.07|
|        1615460|       1613|           0.5406464|
|        1615460|       1603|          0.04113614|
+---------------+-----------+--------------------+
only showing top 10 rows



In [18]:
documents_categories_grouped_df = documents_categories_df.groupBy('document_id_cat') \
                                            .agg(F.collect_list('category_id').alias('category_id_list'),
                                                 F.collect_list('confidence_level_cat').alias('cat_confidence_level_list')) \
                                            .withColumn('dummyDocumentsCategory', F.lit(1)) \
                                            .alias('documents_categories_grouped')

In [19]:
print((documents_categories_grouped_df.count(), len(documents_categories_grouped_df.columns)))

(2828649, 4)


In [20]:
documents_categories_grouped_df.show(10)

+---------------+----------------+-------------------------+----------------------+
|document_id_cat|category_id_list|cat_confidence_level_list|dummyDocumentsCategory|
+---------------+----------------+-------------------------+----------------------+
|            148|    [1403, 1702]|             [0.92, 0.07]|                     1|
|            463|    [1513, 1808]|     [0.8932095, 0.067...|                     1|
|            471|    [1504, 1609]|             [0.92, 0.07]|                     1|
|            496|    [1210, 1203]|             [0.92, 0.07]|                     1|
|            833|    [1305, 2004]|             [0.92, 0.07]|                     1|
|           1088|    [2006, 1210]|     [0.8364613, 0.063...|                     1|
|           1238|    [1100, 1407]|     [0.34836665, 0.02...|                     1|
|           1342|    [1408, 2004]|     [0.42835742, 0.03...|                     1|
|           1580|    [1403, 1402]|     [0.65625566, 0.04...|                

In [21]:
documents_categories_grouped_df.groupBy('dummyDocumentsCategory').count().show(20, False)

+----------------------+-------+
|dummyDocumentsCategory|count  |
+----------------------+-------+
|1                     |2828649|
+----------------------+-------+



In [22]:
documents_topics_schema = StructType(
                    [StructField("document_id_top", IntegerType(), True),
                    StructField("topic_id", IntegerType(), True),                    
                    StructField("confidence_level_top", FloatType(), True)]
                    )

documents_topics_df = spark.read.schema(documents_topics_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_topics.csv")  \
                .alias('documents_topics')
    
documents_topics_grouped_df = documents_topics_df.groupBy('document_id_top') \
                                            .agg(F.collect_list('topic_id').alias('topic_id_list'),
                                                 F.collect_list('confidence_level_top').alias('top_confidence_level_list')) \
                                            .withColumn('dummyDocumentsTopics', F.lit(1)) \
                                            .alias('documents_topics_grouped')

In [23]:
print((documents_topics_df.count(), len(documents_topics_df.columns)))

(11325960, 3)


In [24]:
documents_topics_df.show(10)

+---------------+--------+--------------------+
|document_id_top|topic_id|confidence_level_top|
+---------------+--------+--------------------+
|        1595802|     140|          0.07311316|
|        1595802|      16|         0.059416488|
|        1595802|     143|         0.045420755|
|        1595802|     170|          0.03886743|
|        1524246|     113|           0.1964504|
|        1524246|     260|          0.14287816|
|        1524246|      92|          0.03315913|
|        1524246|     168|        0.0140903415|
|        1524246|      54|          0.00878222|
|        1524246|     207|         0.008282372|
+---------------+--------+--------------------+
only showing top 10 rows



In [25]:
documents_topics_grouped_df = documents_topics_df.groupBy('document_id_top') \
                                            .agg(F.collect_list('topic_id').alias('topic_id_list'),
                                                 F.collect_list('confidence_level_top').alias('top_confidence_level_list')) \
                                            .withColumn('dummyDocumentsTopics', F.lit(1)) \
                                            .alias('documents_topics_grouped')

In [26]:
print((documents_topics_grouped_df.count(), len(documents_topics_grouped_df.columns)))

(2495423, 4)


In [27]:
documents_topics_grouped_df.show(10)

+---------------+--------------------+-------------------------+--------------------+
|document_id_top|       topic_id_list|top_confidence_level_list|dummyDocumentsTopics|
+---------------+--------------------+-------------------------+--------------------+
|            148|[153, 140, 8, 172...|     [0.07523697, 0.07...|                   1|
|            463|[181, 292, 24, 25...|     [0.11870128, 0.05...|                   1|
|            471|[285, 238, 153, 193]|     [0.15588789, 0.04...|                   1|
|            496|[244, 294, 196, 1...|     [0.18284231, 0.11...|                   1|
|            833|[294, 89, 174, 86...|     [0.11430275, 0.04...|                   1|
|           1088|[107, 75, 153, 64...|     [0.10822894, 0.06...|                   1|
|           1238| [89, 221, 192, 236]|     [0.023348164, 0.0...|                   1|
|           1342|[271, 283, 181, 2...|     [0.0457309, 0.025...|                   1|
|           1580|[8, 37, 136, 12, ...|     [0.08965496

In [28]:
documents_topics_grouped_df.groupBy('dummyDocumentsTopics').count().show(20, False)

+--------------------+-------+
|dummyDocumentsTopics|count  |
+--------------------+-------+
|1                   |2495423|
+--------------------+-------+



In [29]:
documents_entities_schema = StructType(
                    [StructField("document_id_ent", IntegerType(), True),
                    StructField("entity_id", StringType(), True),                    
                    StructField("confidence_level_ent", FloatType(), True)]
                    )

documents_entities_df = spark.read.schema(documents_entities_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_entities.csv")  \
                .alias('documents_entities')

In [30]:
print((documents_entities_df.count(), len(documents_entities_df.columns)))

(5537552, 3)


In [31]:
documents_entities_df.show(10)

+---------------+--------------------+--------------------+
|document_id_ent|           entity_id|confidence_level_ent|
+---------------+--------------------+--------------------+
|        1524246|f9eec25663db4cd83...|          0.67286533|
|        1524246|55ebcfbdaff1d6f60...|           0.3991137|
|        1524246|839907a972930b17b...|          0.39209574|
|        1524246|04d8f9a1ad48f126d...|          0.21399638|
|        1617787|612a1d17685a498af...|          0.38619283|
|        1617787|fb8c6cb0879e0de87...|          0.36411646|
|        1617787|793c6a6cf386edb82...|          0.34916824|
|        1617787|b525b84d5ed52a345...|          0.28700453|
|        1617787|758cb9cb3014607cb...|          0.23795699|
|        1617787|d523aaba6d3916f8b...|          0.23579852|
+---------------+--------------------+--------------------+
only showing top 10 rows



In [32]:
documents_entities_grouped_df = documents_entities_df.groupBy('document_id_ent') \
                                            .agg(F.collect_list('entity_id').alias('entity_id_list'),
                                                 F.collect_list('confidence_level_ent').alias('ent_confidence_level_list')) \
                                            .withColumn('dummyDocumentsEntities', F.lit(1)) \
                                            .alias('documents_entities_grouped')

In [33]:
print((documents_entities_grouped_df.count(), len(documents_entities_grouped_df.columns)))

(1791420, 4)


In [34]:
documents_entities_grouped_df.show(10)

+---------------+--------------------+-------------------------+----------------------+
|document_id_ent|      entity_id_list|ent_confidence_level_list|dummyDocumentsEntities|
+---------------+--------------------+-------------------------+----------------------+
|            148|[e1c74838563ef5d2...|     [0.6320258, 0.404...|                     1|
|            463|[aaa0246895d43735...|              [0.6939791]|                     1|
|            496|[0ffa5e294bd46905...|              [0.3608937]|                     1|
|            833|[430da13f06eed7d5...|     [0.5932388, 0.240...|                     1|
|           1088|[94101adfc2f6bccb...|              [0.9564353]|                     1|
|           1580|[86b630e436676e43...|     [0.92001617, 0.44...|                     1|
|           1645|[976e5e062b216f23...|     [0.66670954, 0.61...|                     1|
|           1959|[806f6ef8cca7644d...|             [0.31478134]|                     1|
|           2122|[bad3651e69ae38

In [35]:
documents_entities_grouped_df.groupBy('dummyDocumentsEntities').count().show(20, False)

+----------------------+-------+
|dummyDocumentsEntities|count  |
+----------------------+-------+
|1                     |1791420|
+----------------------+-------+



In [36]:
documents_df = documents_meta_df.join(documents_categories_grouped_df, on=F.col("document_id_doc") == F.col("documents_categories_grouped.document_id_cat"), how='left') \
                         .join(documents_topics_grouped_df, on=F.col("document_id_doc") == F.col("documents_topics_grouped.document_id_top"), how='left') \
                         .join(documents_entities_grouped_df, on=F.col("document_id_doc") == F.col("documents_entities_grouped.document_id_ent"), how='left') \
                         .cache()

In [37]:
print((documents_df.count(), len(documents_df.columns)))

(2999334, 17)


In [86]:
documents_df.take(10)

[Row(document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e0259aa820f', u'6eb92e281e46d463ce80317efd785d68', u'c323569535ca4c3d2ce474f4d825cc80', u'daf2f4c9cd8dbf10482f06200e613939'], ent_confidence_l

# Processing document frequencies

In [39]:
import pickle

In [40]:
documents_total = documents_meta_df.count()
documents_total

2999334

In [41]:
categories_docs_counts = documents_categories_df.groupBy('category_id').count().rdd.collectAsMap()
len(categories_docs_counts)

97

In [42]:
categories_docs_counts

{1000: 5074,
 1100: 212249,
 1200: 7,
 1202: 3259,
 1203: 30511,
 1204: 8258,
 1205: 103539,
 1206: 7523,
 1207: 28540,
 1208: 3575,
 1209: 34390,
 1210: 51624,
 1211: 4871,
 1302: 29105,
 1303: 14198,
 1304: 6660,
 1305: 5426,
 1306: 13248,
 1307: 1506,
 1308: 473,
 1400: 1,
 1402: 54763,
 1403: 572107,
 1404: 30667,
 1405: 64063,
 1406: 54394,
 1407: 124783,
 1408: 155883,
 1500: 2,
 1502: 16262,
 1503: 57335,
 1504: 15084,
 1505: 51339,
 1506: 14846,
 1507: 2569,
 1509: 2941,
 1510: 83877,
 1511: 25254,
 1512: 18502,
 1513: 276203,
 1514: 60131,
 1515: 31992,
 1516: 2454,
 1600: 3,
 1602: 74315,
 1603: 77881,
 1604: 67342,
 1605: 2094,
 1606: 32408,
 1607: 25349,
 1608: 57479,
 1609: 63995,
 1610: 49192,
 1611: 50112,
 1612: 32503,
 1613: 44567,
 1614: 9141,
 1700: 1,
 1702: 408499,
 1703: 93883,
 1704: 2861,
 1705: 6553,
 1706: 105170,
 1707: 136830,
 1708: 142908,
 1709: 18922,
 1710: 13242,
 1711: 46729,
 1800: 8,
 1802: 7586,
 1804: 3632,
 1805: 40300,
 1806: 68248,
 1807: 43922

In [130]:
with open('categories_docs_counts.pickle', 'wb') as output:
    pickle.dump(categories_docs_counts, output)

In [43]:
topics_docs_counts = documents_topics_df.groupBy('topic_id').count().rdd.collectAsMap()
len(topics_docs_counts)

300

In [44]:
topics_docs_counts

{0: 17042,
 1: 35067,
 2: 32800,
 3: 3957,
 4: 7905,
 5: 54970,
 6: 24878,
 7: 3378,
 8: 139734,
 9: 19063,
 10: 83993,
 11: 16735,
 12: 6454,
 13: 70234,
 14: 2584,
 15: 31150,
 16: 268216,
 17: 13493,
 18: 5078,
 19: 37158,
 20: 226877,
 21: 15124,
 22: 6836,
 23: 28440,
 24: 106559,
 25: 105219,
 26: 91760,
 27: 51659,
 28: 3107,
 29: 22201,
 30: 3772,
 31: 3017,
 32: 86861,
 33: 6893,
 34: 2008,
 35: 85100,
 36: 80320,
 37: 48967,
 38: 8234,
 39: 14574,
 40: 2094,
 41: 53463,
 42: 24060,
 43: 111586,
 44: 35108,
 45: 22712,
 46: 32730,
 47: 39577,
 48: 7114,
 49: 154783,
 50: 3238,
 51: 13000,
 52: 35135,
 53: 15796,
 54: 18639,
 55: 53509,
 56: 20214,
 57: 23935,
 58: 4185,
 59: 2959,
 60: 13122,
 61: 43365,
 62: 62091,
 63: 6731,
 64: 44482,
 65: 41630,
 66: 81719,
 67: 26683,
 68: 39252,
 69: 8072,
 70: 4082,
 71: 89300,
 72: 24910,
 73: 4936,
 74: 92482,
 75: 69990,
 76: 3091,
 77: 6872,
 78: 9281,
 79: 20152,
 80: 10416,
 81: 20203,
 82: 50106,
 83: 7748,
 84: 84964,
 85: 7354

In [133]:
with open('topics_docs_counts.pickle', 'wb') as output:
    pickle.dump(topics_docs_counts, output)

In [45]:
entities_docs_counts = documents_entities_df.groupBy('entity_id').count().rdd.collectAsMap()
len(entities_docs_counts)

1326009

In [46]:
entities_docs_counts

{u'72f364c8af13913d19bd803a3584228c': 3,
 u'87c8054997888824c9a2fd446b8568c9': 1,
 u'c5b15375624f83017b24f53a09883a3b': 32,
 u'cd9a81360ff3e85545806c1f4b61932a': 1,
 u'61e6c101e169f084ff8badb2846856ee': 1,
 u'1e35fc7b6ee074f40fd49367744b00d6': 1,
 u'9720e325b33b74c4845588d9e6c3f3ff': 1,
 u'68f1f97c2601c7ff8cc8926976d0a2d7': 1,
 u'e758b2e0d801c21374fa51fff05f71f2': 1,
 u'e16386fe525e35a42948435561305e15': 1,
 u'aa3c449e5a76852d3d2563874a291e9d': 1,
 u'4cf05b5925fd0cdad4cf2a8a70529cca': 1,
 u'188655ad1013803e0c02e9a66f39f4b5': 2,
 u'e554756d7b63d2a6a85f743c0c3f0ba4': 13,
 u'97947be1d6c1e353b413ed3c8a73aada': 4,
 u'f31427427c327fc0d6546dd437416ede': 1,
 u'6abcda7ba336150e016bc95f2124cd9d': 1,
 u'cc15f8705ca1d4e3eb4236ff1765cf73': 1,
 u'e2f5f5452114ec01e8c6fef1df39c933': 1,
 u'463497856abaa7ed163e82c84fee970e': 1,
 u'393841ef0412ebe937d2318a91ceb012': 1,
 u'da4fce03b68cc140a0dff1c3418ff2ca': 1,
 u'5d58f18fb0ba8b0267d23693da746cff': 1,
 u'442f2ef42b4c621c161aae89cce0a23a': 1,
 u'383dbeb4f51

In [137]:
with open('entities_docs_counts.pickle', 'wb') as output:
    pickle.dump(entities_docs_counts, output)

# event

In [47]:
truncate_day_from_timestamp_udf = F.udf(lambda ts: int(ts / 1000 / 60 / 60 / 24), IntegerType())

In [48]:
extract_country_udf = F.udf(lambda geo: geo.strip()[:2] if geo != None else '', StringType())

In [49]:
events_schema = StructType(
                    [StructField("display_id", IntegerType(), True),
                    StructField("uuid_event", StringType(), True),                    
                    StructField("document_id_event", IntegerType(), True),
                    StructField("timestamp_event", IntegerType(), True),
                    StructField("platform_event", IntegerType(), True),
                    StructField("geo_location_event", StringType(), True)]
                    )

events_df = spark.read.schema(events_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"events.csv") \
                    .withColumn('dummyEvents', F.lit(1)) \
                    .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \
                    .withColumn('event_country', extract_country_udf('geo_location_event')) \
                    .alias('events')

In [50]:
print((events_df.count(), len(events_df.columns)))

(23120126, 9)


In [51]:
events_df.show(10)

+----------+--------------+-----------------+---------------+--------------+------------------+-----------+---------+-------------+
|display_id|    uuid_event|document_id_event|timestamp_event|platform_event|geo_location_event|dummyEvents|day_event|event_country|
+----------+--------------+-----------------+---------------+--------------+------------------+-----------+---------+-------------+
|         1|cb8c55702adb93|           379743|             61|             3|         US>SC>519|          1|        0|           US|
|         2|79a85fa78311b9|          1794259|             81|             2|         US>CA>807|          1|        0|           US|
|         3|822932ce3d8757|          1179111|            182|             2|         US>MI>505|          1|        0|           US|
|         4|85281d0a49f7ac|          1777797|            234|             2|         US>WV>564|          1|        0|           US|
|         5|8d0daef4bf5b56|           252458|            338|             2|

In [52]:
events_df.groupBy('dummyEvents').count().show(20, False)

+-----------+--------+
|dummyEvents|count   |
+-----------+--------+
|1          |23120126|
+-----------+--------+



In [53]:
events_df.groupBy('day_event').count().show(20, False)

+---------+-------+
|day_event|count  |
+---------+-------+
|12       |1477939|
|1        |1516879|
|13       |1672275|
|6        |1699343|
|3        |1486649|
|5        |1319657|
|9        |1614693|
|4        |1310388|
|8        |1616280|
|7        |1596045|
|10       |1583387|
|11       |1356708|
|14       |1595929|
|2        |1573079|
|0        |1700875|
+---------+-------+



In [54]:
events_df.groupBy('event_country').count().show(20, False)

+-------------+-----+
|event_country|count|
+-------------+-----+
|LT           |1836 |
|DZ           |1410 |
|CI           |456  |
|TC           |297  |
|FI           |13560|
|AZ           |474  |
|SC           |395  |
|PM           |5    |
|UA           |2052 |
|RO           |10697|
|ZM           |4030 |
|KI           |18   |
|SL           |720  |
|NL           |51209|
|LA           |270  |
|SB           |300  |
|BS           |2537 |
|MN           |611  |
|BW           |2325 |
|PS           |530  |
+-------------+-----+
only showing top 20 rows



In [55]:
events_df.createOrReplaceTempView('events')
# createOrReplaceTempView는 현재 메모리가 존재하지 않는 테이블의 임시 뷰를 생성했지만, 그 위에 SQL 쿼리를 실행 할 수 있음

# promoted_content

In [56]:
promoted_content_schema = StructType(
                        [StructField("ad_id", IntegerType(), True),
                        StructField("document_id_promo", IntegerType(), True),                    
                        StructField("campaign_id", IntegerType(), True),
                        StructField("advertiser_id", IntegerType(), True)]
                        )

promoted_content_df = spark.read.schema(promoted_content_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"promoted_content.csv") \
                    .withColumn('dummyPromotedContent', F.lit(1)).alias('promoted_content')

In [57]:
print((promoted_content_df.count(), len(promoted_content_df.columns)))

(559583, 5)


In [58]:
promoted_content_df.show(10)

+-----+-----------------+-----------+-------------+--------------------+
|ad_id|document_id_promo|campaign_id|advertiser_id|dummyPromotedContent|
+-----+-----------------+-----------+-------------+--------------------+
|    1|             6614|          1|            7|                   1|
|    2|           471467|          2|            7|                   1|
|    3|             7692|          3|            7|                   1|
|    4|           471471|          2|            7|                   1|
|    5|           471472|          2|            7|                   1|
|    6|            12736|          1|            7|                   1|
|    7|            12808|          1|            7|                   1|
|    8|           471477|          2|            7|                   1|
|    9|            13379|          1|            7|                   1|
|   10|            13885|          1|            7|                   1|
+-----+-----------------+-----------+-------------+

In [59]:
promoted_content_df.groupBy('dummyPromotedContent').count().show(20, False)

+--------------------+------+
|dummyPromotedContent|count |
+--------------------+------+
|1                   |559583|
+--------------------+------+



# clicks_test

In [60]:
clicks_test_schema = StructType(
                        [StructField("display_id", IntegerType(), True),
                        StructField("ad_id", IntegerType(), True)]
                        )

clicks_test_df = spark.read.schema(clicks_test_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"clicks_test.csv") \
                    .withColumn('dummyClicksTest', F.lit(1)).alias('clicks_test')

In [61]:
print((clicks_test_df.count(), len(clicks_test_df.columns)))

(32225162, 3)


In [62]:
clicks_test_df.show(10)

+----------+------+---------------+
|display_id| ad_id|dummyClicksTest|
+----------+------+---------------+
|  16874594| 66758|              1|
|  16874594|150083|              1|
|  16874594|162754|              1|
|  16874594|170392|              1|
|  16874594|172888|              1|
|  16874594|180797|              1|
|  16874595|  8846|              1|
|  16874595| 30609|              1|
|  16874595|143982|              1|
|  16874596| 11430|              1|
+----------+------+---------------+
only showing top 10 rows



In [63]:
clicks_test_df.groupBy('dummyClicksTest').count().show(20, False)

+---------------+--------+
|dummyClicksTest|count   |
+---------------+--------+
|1              |32225162|
+---------------+--------+



In [64]:
test_set_df = clicks_test_df.join(promoted_content_df, on='ad_id', how='left') \
                                .join(events_df, on='display_id', how='left')
        
test_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile') 
# test_set_df에 uuid_event 중복제거하여, createOrReplaceTempView는 현재 메모리가 존재하지 않는 테이블의 임시 뷰를 생성했지만, 그 위에 SQL 쿼리를 실행

test_set_df.select('uuid_event','document_id_promo', 'timestamp_event').distinct().createOrReplaceTempView('test_users_docs_timestamp_to_ignore')
# test_set_df에 uuid_event, document_id_promo, timestamp_event 중복제거하여, 
# createOrReplaceTempView는 현재 메모리가 존재하지 않는 테이블의 임시 뷰를 생성했지만, 그 위에 SQL 쿼리를 실행

In [65]:
print((test_set_df.count(), len(test_set_df.columns)))

(32225162, 15)


In [88]:
test_set_df.take(10)

[Row(display_id=16874807, ad_id=192855, dummyClicksTest=1, document_id_promo=1460834, campaign_id=22748, advertiser_id=622, dummyPromotedContent=1, uuid_event=u'a296494aa7a041', document_id_event=399863, timestamp_event=87414, platform_event=2, geo_location_event=u'AU>02', dummyEvents=1, day_event=0, event_country=u'AU'),
 Row(display_id=16874807, ad_id=213769, dummyClicksTest=1, document_id_promo=903092, campaign_id=23770, advertiser_id=712, dummyPromotedContent=1, uuid_event=u'a296494aa7a041', document_id_event=399863, timestamp_event=87414, platform_event=2, geo_location_event=u'AU>02', dummyEvents=1, day_event=0, event_country=u'AU'),
 Row(display_id=16874807, ad_id=192759, dummyClicksTest=1, document_id_promo=1469601, campaign_id=22742, advertiser_id=1975, dummyPromotedContent=1, uuid_event=u'a296494aa7a041', document_id_event=399863, timestamp_event=87414, platform_event=2, geo_location_event=u'AU>02', dummyEvents=1, day_event=0, event_country=u'AU'),
 Row(display_id=16874807, ad

# page_views

In [67]:
page_views_schema = StructType(
                    [StructField("uuid_pv", StringType(), True),
                    StructField("document_id_pv", IntegerType(), True),
                    StructField("timestamp_pv", IntegerType(), True),
                    StructField("platform_pv", IntegerType(), True),
                    StructField("geo_location_pv", StringType(), True),
                    StructField("traffic_source_pv", IntegerType(), True)]
                    )
page_views_df = spark.read.schema(page_views_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"page_views.csv") \
                .alias('page_views')        
            
page_views_df.createOrReplaceTempView('page_views')

In [68]:
print((page_views_df.count(), len(page_views_df.columns)))

(2034275448, 6)


In [69]:
page_views_df.show(10)

+--------------+--------------+------------+-----------+---------------+-----------------+
|       uuid_pv|document_id_pv|timestamp_pv|platform_pv|geo_location_pv|traffic_source_pv|
+--------------+--------------+------------+-----------+---------------+-----------------+
|1fd5f051fba643|           120|    31905835|          1|             RS|                2|
|8557aa9004be3b|           120|    32053104|          1|          VN>44|                2|
|c351b277a358f0|           120|    54013023|          1|          KR>12|                1|
|8205775c5387f9|           120|    44196592|          1|          IN>16|                2|
|9cb0ccd8458371|           120|    65817371|          1|      US>CA>807|                2|
|2aa611f32875c7|           120|    71495491|          1|          CA>ON|                2|
|f55a6eaf2b34ab|           120|    73309199|          1|          BR>27|                2|
|cc01b582c8cbff|           120|    50033577|          1|          CA>BC|                2|

In [70]:
additional_filter = ''

# test_set_df에 uuid_event, document_id_promo, timestamp_event 중복제거 가상 테이블 설정
additional_filter = '''
                        AND NOT EXISTS (SELECT uuid_event FROM test_users_docs_timestamp_to_ignore 
                                                      WHERE uuid_event = p.uuid_pv
                                                     AND document_id_promo = p.document_id_pv
                                                     AND p.timestamp_pv >= timestamp_event)
                    '''

# users_to_profile : test_set_df에 uuid_event 중복제거 가상 테이블 설정
page_views_train_df = spark.sql('''SELECT * FROM page_views p 
                                    WHERE EXISTS (SELECT uuid_event FROM users_to_profile
                                                 WHERE uuid_event = p.uuid_pv)                                     
                                '''+ additional_filter
                               ).alias('views') \
                         .join(documents_df, on=F.col("document_id_pv") == F.col("document_id_doc"), how='left') \
                         .filter('dummyDocumentsEntities is not null OR dummyDocumentsTopics is not null OR dummyDocumentsCategory is not null')

In [117]:
print((page_views_train_df.count(), len(page_views_train_df.columns)))

(62604426, 23)


In [None]:
page_views_train_df.take(10)

[Row(uuid_pv=u'4d22935665a23c', document_id_pv=148, timestamp_pv=1183603952, platform_pv=1, geo_location_pv=u'US>PA>577', traffic_source_pv=1, document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e025

[Row(uuid_pv=u'4d22935665a23c', document_id_pv=148, timestamp_pv=1183603952, platform_pv=1, geo_location_pv=u'US>PA>577', traffic_source_pv=1, document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e025

[Row(uuid_pv=u'4d22935665a23c', document_id_pv=148, timestamp_pv=1183603952, platform_pv=1, geo_location_pv=u'US>PA>577', traffic_source_pv=1, document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e025

[Row(uuid_pv=u'4d22935665a23c', document_id_pv=148, timestamp_pv=1183603952, platform_pv=1, geo_location_pv=u'US>PA>577', traffic_source_pv=1, document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e025

[Row(uuid_pv=u'4d22935665a23c', document_id_pv=148, timestamp_pv=1183603952, platform_pv=1, geo_location_pv=u'US>PA>577', traffic_source_pv=1, document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e025

[Row(uuid_pv=u'4d22935665a23c', document_id_pv=148, timestamp_pv=1183603952, platform_pv=1, geo_location_pv=u'US>PA>577', traffic_source_pv=1, document_id_doc=148, source_id=1787, publisher_id=118, publish_time=datetime.datetime(2008, 7, 1, 0, 0), dummyDocumentsMeta=1, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=[u'e1c74838563ef5d205063b0d95afa414', u'6fd68f102042c6554cb2592fae942264', u'ae3de5466bfa10459eebcbe02ac9b3ee', u'9da9595caa381755c9353ae7179f2117', u'b973c2e55831fb4025003e025

# Processing user profiles

In [71]:
int_null_to_minus_one_udf = F.udf(lambda x: x if x != None else -1, IntegerType())
int_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(IntegerType()))
float_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(FloatType()))
str_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(StringType()))

In [74]:
page_views_by_user_df = page_views_train_df.select(
                           'uuid_pv', 
                           'document_id_pv', 
                           int_null_to_minus_one_udf('timestamp_pv').alias('timestamp_pv'), # None는 -1, 아니면 그대로 사용
                           int_list_null_to_empty_list_udf('category_id_list').alias('category_id_list'), # None는 [], 아니면 그대로 사용
                           float_list_null_to_empty_list_udf('cat_confidence_level_list').alias('cat_confidence_level_list'), # None는 [], 아니면 그대로 사용
                           int_list_null_to_empty_list_udf('topic_id_list').alias('topic_id_list'), # None는 [], 아니면 그대로 사용
                           float_list_null_to_empty_list_udf('top_confidence_level_list').alias('top_confidence_level_list'), 
                           str_list_null_to_empty_list_udf('entity_id_list').alias('entity_id_list'), # None는 [], 아니면 그대로 사용
                           float_list_null_to_empty_list_udf('ent_confidence_level_list').alias('ent_confidence_level_list')) \
                    .groupBy('uuid_pv') \
                    .agg(F.collect_list('document_id_pv').alias('document_id_pv_list'), # uuid_pv 기준으로, document_id_pv를 document_id_pv_list로 row를 합쳐서 생성 : [document_id_pv1, document_id_pv2]
                         F.collect_list('timestamp_pv').alias('timestamp_pv_list'), # uuid_pv 기준으로, timestamp_pv를 timestamp_pv_list로 row를 합쳐서 생성 : [timestamp_pv1, timestamp_pv2]
                         F.collect_list('category_id_list').alias('category_id_lists'), # uuid_pv 기준으로, category_id_list를 category_id_lists로 row를 합쳐서 생성 : [category_id_list1, category_id_list2]
                         F.collect_list('cat_confidence_level_list').alias('cat_confidence_level_lists'), # uuid_pv 기준으로, cat_confidence_level_list를 cat_confidence_level_lists로 row를 합쳐서 생성 : [cat_confidence_level_list1, cat_confidence_level_list2]
                         F.collect_list('topic_id_list').alias('topic_id_lists'), # uuid_pv 기준으로, topic_id_list를 topic_id_lists로 row를 합쳐서 생성 : [topic_id_list1, topic_id_list2]
                         F.collect_list('top_confidence_level_list').alias('top_confidence_level_lists'), # uuid_pv 기준으로, top_confidence_level_list를 top_confidence_level_lists로 row를 합쳐서 생성 : [top_confidence_level_list1, top_confidence_level_list2]
                         F.collect_list('entity_id_list').alias('entity_id_lists'), # uuid_pv 기준으로, entity_id_list를 entity_id_lists로 row를 합쳐서 생성 : [entity_id_list1, entity_id_list2]
                         F.collect_list('ent_confidence_level_list').alias('ent_confidence_level_lists') # uuid_pv 기준으로, ent_confidence_level_list를 ent_confidence_level_lists로 row를 합쳐서 생성 : [ent_confidence_level_list1, ent_confidence_level_list2]
                        )

# .groupBy().agg(F.collect_list()) 참조 - https://stackoverflow.com/questions/37580782/pyspark-collect-set-or-collect-list-with-groupby

In [75]:
print((page_views_by_user_df.count(), len(page_views_by_user_df.columns)))

(5799892, 9)


In [None]:
page_views_by_user_df.take(3)

[Row(uuid_pv=u'1000615e760786', document_id_pv_list=[2959725, 2730005, 2730005], timestamp_pv_list=[1288970662, 1032421362, 1088915797], category_id_lists=[[1907, 1914], [1903, 1403], [1903, 1403]], cat_confidence_level_lists=[[0.8844558000564575, 0.06729555130004883], [0.44797781109809875, 0.034085266292095184], [0.44797781109809875, 0.034085266292095184]], topic_id_lists=[[77], [266], [266]], top_confidence_level_lists=[[0.2766975164413452], [0.48858878016471863], [0.48858878016471863]], entity_id_lists=[[u'753fa42329661c4eb3b1e99e63a7e46d', u'3b4a364141e7c25731a15cd4ef643d9e', u'ea7f7e8b98b3212620cea38b90d89321', u'246f2c584db092a36a14533b067ccb1b', u'87f8f9a6d35ec4f07d4fd425db84c998', u'8a74cb33e81530f941bcb99a067a6baa', u'504a2a4f3b97b8511737fc0b7d55dc46'], [], []], ent_confidence_level_lists=[[0.8671315312385559, 0.2762681841850281, 0.26690584421157837, 0.2412770390510559, 0.23743495345115662, 0.2357538640499115, 0.2116575986146927], [], []]),
 Row(uuid_pv=u'10042103b7ff2b', docu

In [78]:
from collections import defaultdict

def get_user_aspects(docs_aspects, aspect_docs_counts):
    docs_aspects_merged_lists = defaultdict(list)
    
    for doc_aspects in docs_aspects:
        for key in doc_aspects.keys():
            docs_aspects_merged_lists[key].append(doc_aspects[key])
        
    docs_aspects_stats = {}
    for key in docs_aspects_merged_lists.keys():
        aspect_list = docs_aspects_merged_lists[key] # aspect_list : category, topic, entities id 의 confidence_level , aspect_docs_counts[key] : category, topic, entities id 의 개수
        tf = len(aspect_list) # category, topic, entities id 의 confidence_level 개수
        idf = math.log(documents_total / float(aspect_docs_counts[key])) # log(documents_meta row 개수 / aspect_docs_counts[key] : category, topic, entities id 의 개수)
        
        confid_mean = sum(aspect_list) / float(len(aspect_list)) # category, topic, entities id 의 confidence_level 의 평균
        docs_aspects_stats[key] = [tf*idf, confid_mean] 

        
    return docs_aspects_stats # category, topic, entities id : tf*idf, confid_mean


def generate_user_profile(docs_aspects_list, docs_aspects_confidence_list, aspect_docs_counts):    
    docs_aspects = []
    for doc_aspects_list, doc_aspects_confidence_list in zip(docs_aspects_list, docs_aspects_confidence_list):
        doc_aspects = dict(zip(doc_aspects_list, doc_aspects_confidence_list))
        docs_aspects.append(doc_aspects)
        
    user_aspects = get_user_aspects(docs_aspects, aspect_docs_counts)
    return user_aspects

In [79]:
get_list_len_udf = F.udf(lambda docs_list: len(docs_list), IntegerType())

In [80]:
generate_categories_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            categories_docs_counts), 
                                          MapType(IntegerType(), 
                                                  ArrayType(FloatType()),
                                                  False))


generate_topics_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            topics_docs_counts), 
                                          MapType(IntegerType(), 
                                                  ArrayType(FloatType()),
                                                  False))


generate_entities_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            entities_docs_counts), 
                                          MapType(StringType(),
                                                  ArrayType(FloatType()),
                                                  False))

In [81]:
users_profile_df = page_views_by_user_df \
                                 .withColumn('views', get_list_len_udf('document_id_pv_list')) \
                                 .withColumn('categories', 
                                             generate_categories_user_profile_map_udf('category_id_lists', 
                                                                   'cat_confidence_level_lists')) \
                                 .withColumn('topics', 
                                             generate_topics_user_profile_map_udf('topic_id_lists', 
                                                                               'top_confidence_level_lists')) \
                                 .withColumn('entities', 
                                             generate_entities_user_profile_map_udf('entity_id_lists', 
                                                                               'ent_confidence_level_lists')) \
                                 .select(F.col('uuid_pv').alias('uuid'),
                                         F.col('document_id_pv_list').alias('doc_ids'),
                                         'views',
                                         'categories', 'topics', 'entities')

In [None]:
print((users_profile_df.count(), len(users_profile_df.columns)))

(5799892, 6)


In [None]:
users_profile_df.take(3)

[Row(uuid=u'1000615e760786', doc_ids=[2959725, 2730005, 2730005], views=3, categories={1403: [3.313638925552368, 0.034085266292095184], 1914: [4.851147174835205, 0.06729555130004883], 1907: [2.805744171142578, 0.8844558000564575], 1903: [6.904221534729004, 0.44797781109809875]}, topics={266: [11.43613052368164, 0.48858878016471863], 77: [6.078690528869629, 0.2766975164413452]}, entities={u'87f8f9a6d35ec4f07d4fd425db84c998': [7.4499640464782715, 0.23743495345115662], u'8a74cb33e81530f941bcb99a067a6baa': [9.864045143127441, 0.2357538640499115], u'3b4a364141e7c25731a15cd4ef643d9e': [13.815288543701172, 0.2762681841850281], u'753fa42329661c4eb3b1e99e63a7e46d': [12.080687522888184, 0.8671315312385559], u'504a2a4f3b97b8511737fc0b7d55dc46': [14.913900375366211, 0.2116575986146927], u'246f2c584db092a36a14533b067ccb1b': [8.648599624633789, 0.2412770390510559], u'ea7f7e8b98b3212620cea38b90d89321': [12.080687522888184, 0.26690584421157837]}),
 Row(uuid=u'10042103b7ff2b', doc_ids=[1415309, 1314190

In [None]:
table_name = 'user_profiles'

users_profile_df.write.parquet(OUTPUT_BUCKET_FOLDER+table_name, mode='overwrite')