## loading package

In [1]:
# 스파크 작업은, 코드를 입력하고 실행하면 바로 실행되는 것이 아니다
# 지연 연산(lazy evaluation)을 사용해서 기초 데이터에 적용될 변환 연산을 기억하고 있고 동작(Action)이 실행될 때 한 번에 실행된다
# 이런 과정을 통해 스파크가 자동으로 최적화 및 장애나 느리게 작업하는 일꾼을 깔끔하게 처리한다

evaluation = True

OUTPUT_BUCKET_FOLDER = "gs://capstone-01/output/"
DATA_BUCKET_FOLDER = "gs://capstone-01/data/"

In [2]:
from IPython.display import display

In [3]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [4]:
from pyspark.sql import DataFrameWriter
from pyspark.sql import SparkSession

In [5]:
import numpy as np

In [6]:
import math
import datetime
import time

In [7]:
import random
random.seed(42)

In [8]:
start_time = time.time()

## loading data

In [9]:
# 사용자 정의 함수 udf
# timestamp를 넣으면 일자(숫자) 반환
# 예를 들어, events.csv의 timestamp를 이 함수에 넣으면, ad를 클릭한 날이 기준일로부터 몇 일인지 반환 (0~12) 

truncate_day_from_timestamp_udf = F.udf(lambda ts: int(ts / 1000 / 60 / 60 / 24), IntegerType())

In [10]:
# strip 함수로 데이터의 양쪽 공백을 지우고, 앞에서 2자만 떼어내서 반환하는 함수 (데이터가 None이면 '' 반환)

extract_country_udf = F.udf(lambda geo: geo.strip()[:2] if geo != None else '', StringType())

In [11]:
# documents_meta.csv 가져오기
# dummyDocumentsMeta 컬럼 추가
# withColumn('컬럼명', F.lit(1)) -> 더미변수 만드는 함수, 하나의 컬럼으로 추가됨
# F.lit() : Creates a Column of literal value, lit 함수는 상수 컬럼을 만들 때 사용한다
# you have to use lit if you want to access any of the pyspark.sql.Column methods treating standard Python scalar as a constant column

documents_meta_schema = StructType(
                    [StructField("document_id_doc", IntegerType(), True),
                    StructField("source_id", IntegerType(), True),                    
                    StructField("publisher_id", IntegerType(), True),
                    StructField("publish_time", TimestampType(), True)]
                    )

documents_meta_df = spark.read.schema(documents_meta_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_meta.csv") \
                .withColumn('dummyDocumentsMeta', F.lit(1)).alias('documents_meta')

In [12]:
documents_meta_df.show(10)

+---------------+---------+------------+-------------------+------------------+
|document_id_doc|source_id|publisher_id|       publish_time|dummyDocumentsMeta|
+---------------+---------+------------+-------------------+------------------+
|        1595802|        1|         603|2016-06-05 00:00:00|                 1|
|        1524246|        1|         603|2016-05-26 11:00:00|                 1|
|        1617787|        1|         603|2016-05-27 00:00:00|                 1|
|        1615583|        1|         603|2016-06-07 00:00:00|                 1|
|        1615460|        1|         603|2016-06-20 00:00:00|                 1|
|        1615354|        1|         603|2016-06-10 00:00:00|                 1|
|        1614611|        1|         603|2016-06-05 13:00:00|                 1|
|        1614235|        1|         603|2016-06-09 00:00:00|                 1|
|        1614225|        1|         603|2016-06-09 00:00:00|                 1|
|        1488264|        1|         603|

In [12]:
# documents_categories.csv 가져오기 

documents_categories_schema = StructType(
                    [StructField("document_id_cat", IntegerType(), True),
                    StructField("category_id", IntegerType(), True),                    
                    StructField("confidence_level_cat", FloatType(), True)]
                    )

documents_categories_df = spark.read.schema(documents_categories_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_categories.csv") \
                .alias('documents_categories')

# documents_categories_grouped_df 생성하기
# agg 함수에는 원하는 기능과 컬럼을 전달한다
# 데이터들을 document_id로 그룹화하고, 한 document_id에 해당하는 category_id와 confidence_level을 각각 리스트로 담는다  
# F.collect_list() : 주어진 컬럼의 모든 값을 수집하여 하나의 리스트로 만듦
# dummyDocumentsCategory 컬럼 추가 (category 더미변수 생성)

documents_categories_grouped_df = documents_categories_df.groupBy('document_id_cat') \
                                            .agg(F.collect_list('category_id').alias('category_id_list'),
                                                 F.collect_list('confidence_level_cat').alias('cat_confidence_level_list')) \
                                            .withColumn('dummyDocumentsCategory', F.lit(1)) \
                                            .alias('documents_categories_grouped')

In [14]:
documents_categories_df.show(10)

+---------------+-----------+--------------------+
|document_id_cat|category_id|confidence_level_cat|
+---------------+-----------+--------------------+
|        1595802|       1611|                0.92|
|        1595802|       1610|                0.07|
|        1524246|       1807|                0.92|
|        1524246|       1608|                0.07|
|        1617787|       1807|                0.92|
|        1617787|       1608|                0.07|
|        1615583|       1305|                0.92|
|        1615583|       1806|                0.07|
|        1615460|       1613|           0.5406464|
|        1615460|       1603|          0.04113614|
+---------------+-----------+--------------------+
only showing top 10 rows



In [13]:
# documents_topics.csv 가져오기 

documents_topics_schema = StructType(
                    [StructField("document_id_top", IntegerType(), True),
                    StructField("topic_id", IntegerType(), True),                    
                    StructField("confidence_level_top", FloatType(), True)]
                    )

documents_topics_df = spark.read.schema(documents_topics_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_topics.csv")  \
                .alias('documents_topics')
    
# documents_topics_grouped_df 생성하기  
# 데이터들을 document_id로 그룹화하고, 한 document_id에 해당하는 topic_id와 confidence_level을 각각 리스트로 담는다 
# dummyDocumentsTopics 컬럼 추가 (topics 더미변수 생성)

documents_topics_grouped_df = documents_topics_df.groupBy('document_id_top') \
                                            .agg(F.collect_list('topic_id').alias('topic_id_list'),
                                                 F.collect_list('confidence_level_top').alias('top_confidence_level_list')) \
                                            .withColumn('dummyDocumentsTopics', F.lit(1)) \
                                            .alias('documents_topics_grouped')

In [16]:
documents_topics_df.show(10)

+---------------+--------+--------------------+
|document_id_top|topic_id|confidence_level_top|
+---------------+--------+--------------------+
|        1595802|     140|          0.07311316|
|        1595802|      16|         0.059416488|
|        1595802|     143|         0.045420755|
|        1595802|     170|          0.03886743|
|        1524246|     113|           0.1964504|
|        1524246|     260|          0.14287816|
|        1524246|      92|          0.03315913|
|        1524246|     168|        0.0140903415|
|        1524246|      54|          0.00878222|
|        1524246|     207|         0.008282372|
+---------------+--------+--------------------+
only showing top 10 rows



In [14]:
# documents_entities.csv 가져오기

documents_entities_schema = StructType(
                    [StructField("document_id_ent", IntegerType(), True),
                    StructField("entity_id", StringType(), True),                    
                    StructField("confidence_level_ent", FloatType(), True)]
                    )

documents_entities_df = spark.read.schema(documents_entities_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_entities.csv")  \
                .alias('documents_entities')

# documents_entities_grouped_df 생성하기  
# 데이터들을 document_id로 그룹화하고, 한 document_id에 해당하는 entity_id와 confidence_level을 각각 리스트로 담는다 
# dummyDocumentsEntities 컬럼 추가 (entities 더미변수 생성)
    
documents_entities_grouped_df = documents_entities_df.groupBy('document_id_ent') \
                                            .agg(F.collect_list('entity_id').alias('entity_id_list'),
                                                 F.collect_list('confidence_level_ent').alias('ent_confidence_level_list')) \
                                            .withColumn('dummyDocumentsEntities', F.lit(1)) \
                                            .alias('documents_entities_grouped')

In [18]:
documents_entities_df.show(10)

+---------------+--------------------+--------------------+
|document_id_ent|           entity_id|confidence_level_ent|
+---------------+--------------------+--------------------+
|        1524246|f9eec25663db4cd83...|          0.67286533|
|        1524246|55ebcfbdaff1d6f60...|           0.3991137|
|        1524246|839907a972930b17b...|          0.39209574|
|        1524246|04d8f9a1ad48f126d...|          0.21399638|
|        1617787|612a1d17685a498af...|          0.38619283|
|        1617787|fb8c6cb0879e0de87...|          0.36411646|
|        1617787|793c6a6cf386edb82...|          0.34916824|
|        1617787|b525b84d5ed52a345...|          0.28700453|
|        1617787|758cb9cb3014607cb...|          0.23795699|
|        1617787|d523aaba6d3916f8b...|          0.23579852|
+---------------+--------------------+--------------------+
only showing top 10 rows



In [19]:
documents_entities_grouped_df.show(10)

+---------------+--------------------+-------------------------+----------------------+
|document_id_ent|      entity_id_list|ent_confidence_level_list|dummyDocumentsEntities|
+---------------+--------------------+-------------------------+----------------------+
|            148|[e1c74838563ef5d2...|     [0.6320258, 0.404...|                     1|
|            463|[aaa0246895d43735...|              [0.6939791]|                     1|
|            496|[0ffa5e294bd46905...|              [0.3608937]|                     1|
|            833|[430da13f06eed7d5...|     [0.5932388, 0.240...|                     1|
|           1088|[94101adfc2f6bccb...|              [0.9564353]|                     1|
|           1580|[86b630e436676e43...|     [0.92001617, 0.44...|                     1|
|           1645|[976e5e062b216f23...|     [0.66670954, 0.61...|                     1|
|           1959|[806f6ef8cca7644d...|             [0.31478134]|                     1|
|           2122|[bad3651e69ae38

In [15]:
# documents_meta, documents_categories_grouped, documents_topics_grouped, documents_entities_grouped를 조인한 documents_df 생성
# ① documents_meta와 documents_categories_grouped를 조인 (key='document_id')
# ② 위 테이블과 documents_topics_grouped를 조인 (key='document_id')
# ③ 위 테이블과 documents_entities_grouped를 조인 (key='document_id')
# cache() : 동일한 rdd를 재사용하고 싶을 때 사용하는 함수, persist 함수에서 저장 옵션을 memory_only로 한 것과 동일

documents_df = documents_meta_df.join(documents_categories_grouped_df, on=F.col("document_id_doc") == F.col("documents_categories_grouped.document_id_cat"), how='left') \
                         .join(documents_topics_grouped_df, on=F.col("document_id_doc") == F.col("documents_topics_grouped.document_id_top"), how='left') \
                         .join(documents_entities_grouped_df, on=F.col("document_id_doc") == F.col("documents_entities_grouped.document_id_ent"), how='left') \
                         .cache()

In [21]:
documents_df.show(10)

+---------------+---------+------------+-------------------+------------------+---------------+----------------+-------------------------+----------------------+---------------+--------------------+-------------------------+--------------------+---------------+--------------------+-------------------------+----------------------+
|document_id_doc|source_id|publisher_id|       publish_time|dummyDocumentsMeta|document_id_cat|category_id_list|cat_confidence_level_list|dummyDocumentsCategory|document_id_top|       topic_id_list|top_confidence_level_list|dummyDocumentsTopics|document_id_ent|      entity_id_list|ent_confidence_level_list|dummyDocumentsEntities|
+---------------+---------+------------+-------------------+------------------+---------------+----------------+-------------------------+----------------------+---------------+--------------------+-------------------------+--------------------+---------------+--------------------+-------------------------+----------------------+
|   

In [None]:
documents_df.count()

In [16]:
# 첫째 행에서 evaluation = True 줬음
# bucket에 저장한 validation set 가져오기
# validation_set_df의 uuid_event의 unique 값을 users_to_profile로 저장
# createOrReplaceTempView() : Spark는 Lazy evaluation이기 때문에 아직 실행 되기 전이다
# uuid_event, document_id_promo 두 컬럼의 unique 값을 추출해서 validation_users_docs_to_ignore로 저장
# document_id_promo는 document_id와 다른 광고 랜딩페이지이기 때문에 docs_to_ignore로 따로 저장된 것 같다
# evaluation이 false면 test set을 생성하기 위한 스키마랑 테이블을 가져와서 test set을 생성

if evaluation:
    validation_set_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+"validation_set.parquet") \
                    .alias('validation_set')        
    
    validation_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile')    
    validation_set_df.select('uuid_event','document_id_promo').distinct().createOrReplaceTempView('validation_users_docs_to_ignore')
    
else:
    events_schema = StructType(
                    [StructField("display_id", IntegerType(), True),
                    StructField("uuid_event", StringType(), True),                    
                    StructField("document_id_event", IntegerType(), True),
                    StructField("timestamp_event", IntegerType(), True),
                    StructField("platform_event", IntegerType(), True),
                    StructField("geo_location_event", StringType(), True)]
                    )

    events_df = spark.read.schema(events_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"events.csv") \
                    .withColumn('dummyEvents', F.lit(1)) \
                    .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \
                    .withColumn('event_country', extract_country_udf('geo_location_event')) \
                    .alias('events')

    events_df.createOrReplaceTempView('events')


    promoted_content_schema = StructType(
                        [StructField("ad_id", IntegerType(), True),
                        StructField("document_id_promo", IntegerType(), True),                    
                        StructField("campaign_id", IntegerType(), True),
                        StructField("advertiser_id", IntegerType(), True)]
                        )

    promoted_content_df = spark.read.schema(promoted_content_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"promoted_content.csv") \
                    .withColumn('dummyPromotedContent', F.lit(1)).alias('promoted_content')
    
    # clicks_test.csv 가져오기
    clicks_test_schema = StructType(
                        [StructField("display_id", IntegerType(), True),
                        StructField("ad_id", IntegerType(), True)]
                        )
    
    # dummyClicksTest 컬럼 추가 (display_id - ad_id 더미변수 생성)
    clicks_test_df = spark.read.schema(clicks_test_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"clicks_test.csv") \
                    .withColumn('dummyClicksTest', F.lit(1)).alias('clicks_test')
    
    # clicks_test와 promoted_content를 조인 (key='ad_id')
    # 위의 테이블과 events를 조인 (key='display_id')
    # test_set_df를 생성
    test_set_df = clicks_test_df.join(promoted_content_df, on='ad_id', how='left') \
                                .join(events_df, on='display_id', how='left')
    
    # test_set_df에서 uuid_event의 unique값을 추출하여 users_to_profile에 저장
    test_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile')
    
    # test_set_df에서 uuid_event, document_id_promo, timestamp_event의 unique 값을 추출해서 test_users_docs_timestamp_to_ignore로 저장
    test_set_df.select('uuid_event', 'document_id_promo', 'timestamp_event').distinct().createOrReplaceTempView('test_users_docs_timestamp_to_ignore')

In [None]:
# events_df.show(10)

In [None]:
# promoted_content_df.show(10)

In [None]:
# clicks_test_df.show(10)

In [None]:
# test_set_df.show(10)

In [17]:
# page_views.csv 가져오기
# day_pv 컬럼 추가 : view라는 액션이 일어난 timestamp를, 첫날을 기준으로 경과한 일자(숫자)로 반환한 값을 갖는 컬럼 

page_views_schema = StructType(
                    [StructField("uuid_pv", StringType(), True),
                    StructField("document_id_pv", IntegerType(), True),
                    StructField("timestamp_pv", IntegerType(), True),
                    StructField("platform_pv", IntegerType(), True),
                    StructField("geo_location_pv", StringType(), True),
                    StructField("traffic_source_pv", IntegerType(), True)]
                    )

# Google Storage에 저장된 page_views 파일 로드
page_views_df = spark.read.schema(page_views_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv("gs://upload-bigquery180927/page_views.csv") \
                .withColumn('day_pv', truncate_day_from_timestamp_udf('timestamp_pv')) \
                .alias('page_views')             
            
page_views_df.createOrReplaceTempView('page_views')

In [18]:
# document_id가 document_id_promo랑 일치하면 제외 (document_id_promo는 document_id와 달리 랜딩페이지이기 때문에 page view가 아님)
# (evaluation이 false면) 다음 조건 추가 : page_views의 timestamp가 event의 timestamp보다 나중인 것 제외 (click이 view보다 앞선 데이터는 신뢰도 낮음)

additional_filter = ''
if evaluation:
    additional_filter = '''
                             AND NOT EXISTS (SELECT uuid_event FROM validation_users_docs_to_ignore 
                                                      WHERE uuid_event = p.uuid_pv
                                                     AND document_id_promo = p.document_id_pv)
                        '''
else:
    additional_filter = '''
                             AND NOT EXISTS (SELECT uuid_event FROM test_users_docs_timestamp_to_ignore 
                                                      WHERE uuid_event = p.uuid_pv
                                                     AND document_id_promo = p.document_id_pv
                                                     AND p.timestamp_pv >= timestamp_event)
                        '''

# users_to_profile : validation_set_df의 uuid_event의 unique 값
# page_views_train_df 생성
# ① page_views에서 uuid가 users_to_profile의 uuid와 일치하는 모든 데이터 가져옴
# ② documents_df(meta, cate, topics, entities 조인한 것)와 조인 (key='document_id')
# ③ filter 부분 해석 불가

page_views_train_df = spark.sql('''SELECT * FROM page_views p 
                                    WHERE EXISTS (SELECT uuid_event FROM users_to_profile
                                                 WHERE uuid_event = p.uuid_pv)                                     
                                '''+ additional_filter
                               ).alias('views') \
                         .join(documents_df, on=F.col("document_id_pv") == F.col("document_id_doc"), how='left') \
                         .filter('dummyDocumentsEntities is not null OR dummyDocumentsTopics is not null OR dummyDocumentsCategory is not null')

In [36]:
page_views_train_df.show(10)

+--------------+--------------+------------+-----------+---------------+-----------------+------+---------------+---------+------------+-------------------+------------------+---------------+----------------+-------------------------+----------------------+---------------+--------------------+-------------------------+--------------------+---------------+--------------------+-------------------------+----------------------+
|       uuid_pv|document_id_pv|timestamp_pv|platform_pv|geo_location_pv|traffic_source_pv|day_pv|document_id_doc|source_id|publisher_id|       publish_time|dummyDocumentsMeta|document_id_cat|category_id_list|cat_confidence_level_list|dummyDocumentsCategory|document_id_top|       topic_id_list|top_confidence_level_list|dummyDocumentsTopics|document_id_ent|      entity_id_list|ent_confidence_level_list|dummyDocumentsEntities|
+--------------+--------------+------------+-----------+---------------+-----------------+------+---------------+---------+------------+--------

## Processing document frequencies

In [19]:
import pickle

In [20]:
# document_meta의 데이터 개수 카운트
documents_total = documents_meta_df.count()
documents_total

2999334

In [21]:
categories_docs_counts = documents_categories_df.groupBy('category_id').count().rdd.collectAsMap()
len(categories_docs_counts)

# collectAsMap() : return the results for paired RDD as Map collection
# categories_docs_counts는 category_id로 묶고, category_id와 그 개수를 pair 형태로 갖는 테이블

97

In [22]:
df_filenames_suffix = ''
if evaluation:
    df_filenames_suffix = '_eval'
    
# evaluation이 true면, df_filenames_suffix의 값을 _eval로 준다

In [23]:
with open('categories_docs_counts'+df_filenames_suffix+'.pickle', 'wb') as output:
    pickle.dump(categories_docs_counts, output)

# pickle : 파이썬에서 리스트, 클래스 같은 텍스트 이외의 자료형을 파일로 저장하기 위하여 사용하는 모듈    
# pickle.load(파일)을 통해서 파일 내용을 읽어오려면 pickle.dump를 사용해서 데이터를 입력한 파일이어야 한다
# pickle.dump(데이터, 파일) : 데이터를 파일에 저장한다
# 'wb' = write and binary
# categories_docs_counts를 output 폴더에 저장

In [None]:
# pickle을 사용한 데이터 save와 load 예시

# save
with open('data.pickle', 'wb') as f:
    pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)

# load
with open('data.pickle', 'rb') as f:
    data = pickle.load(f)

In [24]:
topics_docs_counts = documents_topics_df.groupBy('topic_id').count().rdd.collectAsMap()
len(topics_docs_counts)

# topics_docs_counts는 topic_id로 묶고, topic_id와 그 개수를 pair 형태로 갖는 테이블

300

In [25]:
with open('topics_docs_counts'+df_filenames_suffix+'.pickle', 'wb') as output:
    pickle.dump(topics_docs_counts, output)
    
# topics_docs_counts를 output 폴더에 저장

In [26]:
entities_docs_counts = documents_entities_df.groupBy('entity_id').count().rdd.collectAsMap()
len(entities_docs_counts)

# entities_docs_counts는 entity_id로 묶고, entity_id와 그 개수를 pair 형태로 갖는 테이블

1326009

In [27]:
with open('entities_docs_counts'+df_filenames_suffix+'.pickle', 'wb') as output:
    pickle.dump(entities_docs_counts, output)
    
# entities_docs_counts를 output 폴더에 저장

## Processing user profiles

In [28]:
# int_null_to_minus_one_udf -> int가 null값이면 -1을 반환하는 함수
# 아래 3개 함수 -> 각각 int, float, str 타입인 리스트 형태 데이터가 null 값이면 빈 리스트 반환하는 함수

int_null_to_minus_one_udf = F.udf(lambda x: x if x != None else -1, IntegerType())
int_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(IntegerType()))
float_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(FloatType()))
str_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(StringType()))

In [29]:
# page_views_by_user_df 생성
# ① page_views에서 uuid_pv, document_id_pv는 그대로 가져오고, 나머지는 위 네 함수 적용해서 가져옴
# ② uuid_pv로 그룹핑
# ③ uuid_pv에 해당하는 각 컬럼들의 정보를 하나의 리스트 안에 넣는다 -> F.collect_list()

page_views_by_user_df = page_views_train_df.select(
                           'uuid_pv', 
                           'document_id_pv', 
                           int_null_to_minus_one_udf('timestamp_pv').alias('timestamp_pv'), 
                           int_list_null_to_empty_list_udf('category_id_list').alias('category_id_list'), 
                           float_list_null_to_empty_list_udf('cat_confidence_level_list').alias('cat_confidence_level_list'), 
                           int_list_null_to_empty_list_udf('topic_id_list').alias('topic_id_list'), 
                           float_list_null_to_empty_list_udf('top_confidence_level_list').alias('top_confidence_level_list'), 
                           str_list_null_to_empty_list_udf('entity_id_list').alias('entity_id_list'), 
                           float_list_null_to_empty_list_udf('ent_confidence_level_list').alias('ent_confidence_level_list')) \
                    .groupBy('uuid_pv') \
                    .agg(F.collect_list('document_id_pv').alias('document_id_pv_list'),
                         F.collect_list('timestamp_pv').alias('timestamp_pv_list'),
                         F.collect_list('category_id_list').alias('category_id_lists'),
                         F.collect_list('cat_confidence_level_list').alias('cat_confidence_level_lists'),
                         F.collect_list('topic_id_list').alias('topic_id_lists'),
                         F.collect_list('top_confidence_level_list').alias('top_confidence_level_lists'),
                         F.collect_list('entity_id_list').alias('entity_id_lists'),
                         F.collect_list('ent_confidence_level_list').alias('ent_confidence_level_lists')
                        )

In [30]:
from collections import defaultdict

# get_user_aspects() 함수 생성

def get_user_aspects(docs_aspects, aspect_docs_counts):
    docs_aspects_merged_lists = defaultdict(list)
    
    for doc_aspects in docs_aspects:
        for key in doc_aspects.keys():
            docs_aspects_merged_lists[key].append(doc_aspects[key])
        
    docs_aspects_stats = {}
    for key in docs_aspects_merged_lists.keys():
        aspect_list = docs_aspects_merged_lists[key]
        tf = len(aspect_list)
        idf = math.log(documents_total / float(aspect_docs_counts[key]))
        
        confid_mean = sum(aspect_list) / float(len(aspect_list))
        docs_aspects_stats[key] = [tf*idf, confid_mean]
        
    return docs_aspects_stats

# generate_user_profile() 함수 생성

def generate_user_profile(docs_aspects_list, docs_aspects_confidence_list, aspect_docs_counts):    
    docs_aspects = []
    for doc_aspects_list, doc_aspects_confidence_list in zip(docs_aspects_list, docs_aspects_confidence_list):
        doc_aspects = dict(zip(doc_aspects_list, doc_aspects_confidence_list))
        docs_aspects.append(doc_aspects)
        
    user_aspects = get_user_aspects(docs_aspects, aspect_docs_counts)
    return user_aspects

In [31]:
# get_list_len_udf() : input값의 길이를 반환하는 함수

get_list_len_udf = F.udf(lambda docs_list: len(docs_list), IntegerType())

In [32]:
generate_categories_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            categories_docs_counts), 
                                          MapType(IntegerType(), 
                                                  ArrayType(FloatType()),
                                                  False))


generate_topics_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            topics_docs_counts), 
                                          MapType(IntegerType(), 
                                                  ArrayType(FloatType()),
                                                  False))


generate_entities_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            entities_docs_counts), 
                                          MapType(StringType(),
                                                  ArrayType(FloatType()),
                                                  False))

In [33]:
users_profile_df = page_views_by_user_df \
                                 .withColumn('views', get_list_len_udf('document_id_pv_list')) \
                                 .withColumn('categories', 
                                             generate_categories_user_profile_map_udf('category_id_lists', 
                                                                   'cat_confidence_level_lists')) \
                                 .withColumn('topics', 
                                             generate_topics_user_profile_map_udf('topic_id_lists', 
                                                                               'top_confidence_level_lists')) \
                                 .withColumn('entities', 
                                             generate_entities_user_profile_map_udf('entity_id_lists', 
                                                                               'ent_confidence_level_lists')) \
                                 .select(F.col('uuid_pv').alias('uuid'),
                                         F.col('document_id_pv_list').alias('doc_ids'),
                                         'views',
                                         'categories', 'topics', 'entities')

In [37]:
users_profile_df.show(10)

+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|          uuid|             doc_ids|views|          categories|              topics|            entities|
+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|100013af048bbf|[2442888, 2434217...|   46|Map(1205 -> Wrapp...|Map(5 -> WrappedA...|Map(14a7d4c4ebcc6...|
|100163b35102c4|[2516821, 2356657...|   12|Map(1907 -> Wrapp...|Map(174 -> Wrappe...|Map(6904a5638b5cf...|
|1003370a1c2d0f|[1521640, 1685708...|    4|Map(1808 -> Wrapp...|Map(69 -> Wrapped...|Map(531cadf46e145...|
|100659017f177b|            [429642]|    1|Map(1510 -> Wrapp...|Map(296 -> Wrappe...|               Map()|
|100aa12f880396|[2504276, 1792136...|    3|Map(1408 -> Wrapp...|Map(265 -> Wrappe...|Map(b366917165b76...|
|101324634e39b0|  [2672785, 2690250]|    2|Map(1210 -> Wrapp...|Map(20 -> Wrapped...|Map(14a7d4c4ebcc6...|
|101487b48a7780|[1113514, 1184645...|

In [34]:
if evaluation:
    table_name = 'user_profiles_eval'
else:
    table_name = 'user_profiles'

users_profile_df.write.parquet(OUTPUT_BUCKET_FOLDER+table_name, mode='overwrite')

In [35]:
finish_time = time.time()
print("Elapsed min: ", (finish_time-start_time)/60/60)

('Elapsed min: ', 0.5069252494308684)
