In [0]:
import os
from datetime import datetime, timezone
from urllib.parse import quote_plus

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *

### Cluster Configuration <br>
Driver Memory: 30.5GB <br>
Worker Memory: 30.5GB <br>
Worker cores: 4 <br>
Minimum number of workers: 2 <br>
Maximum number of workers: 8 <br>

In [0]:
# Static Spark settings (Maven packages, RPC/heartbeat timeouts) are only honoured when
# the SparkContext is first created, so they are configured on the builder here.
# The heartbeat interval must stay well below the network timeout.
# NOTE(review): if a session already exists (e.g. on a managed notebook cluster),
# getOrCreate() returns it and these static settings are not re-applied — confirm they
# are also set in the cluster configuration.
spark = SparkSession.builder \
            .appName("project") \
            .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
            .config("spark.network.timeout", "7200s") \
            .config("spark.executor.heartbeatInterval", "1200s") \
            .getOrCreate()

In [0]:
# Read AWS credentials from the environment instead of hard-coding them in the notebook.
# NOTE(review): when the variables are absent nothing is set, and S3 access relies on
# whatever credentials the cluster already provides (e.g. an instance profile) — confirm.
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

hadoop_conf = spark._jsc.hadoopConfiguration()
# The hadoop-aws jar (org.apache.hadoop:hadoop-aws) must be on the classpath for s3a;
# `spark.jars.packages` only works when set before the session starts (on the builder
# or cluster), so setting it on the Hadoop configuration here would have no effect.
# Hadoop reads the bare `fs.s3a.*` keys; the `spark.hadoop.` prefix only applies in SparkConf.
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
if aws_access_key and aws_secret_key:
    hadoop_conf.set("fs.s3a.access.key", aws_access_key)
    hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)

In [0]:
# NOTE(review): `spark.network.timeout` and `spark.executor.heartbeatInterval` are Spark
# properties, not Hadoop ones. Setting them on the Hadoop configuration of an already
# running session most likely does not change RPC timeouts or executor heartbeats;
# they probably need to be set on the SparkSession builder / cluster instead — confirm.
spark._jsc.hadoopConfiguration().set('spark.network.timeout','7200s')
spark._jsc.hadoopConfiguration().set('spark.executor.heartbeatInterval','1200s')

#### DataType cast functions

In [0]:
def IntegerSafe(value):
    """Convert ``value`` to ``int``, returning ``None`` when it cannot be converted.

    Used to parse the integer fields of the raw tab-separated files so that a single
    malformed token yields a null instead of failing the whole Spark job.

    Args:
        value: usually a ``str`` token such as ``"123"``; anything ``int()`` accepts works.

    Returns:
        The integer value, or ``None`` when ``int(value)`` raises ``ValueError``
        (non-numeric text), ``TypeError`` (e.g. ``None``) or ``OverflowError``
        (e.g. ``float('inf')``). Any other exception propagates instead of being
        silently swallowed.
    """
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        return None

In [0]:
def TimeStampSafe(value):
    """Parse a Unix epoch-seconds token into a naive UTC ``datetime``; ``None`` on failure.

    Surrounding whitespace and double quotation marks are removed before parsing, so
    tokens such as ``"1318348785"`` (with literal quotes) or ``1318348785\\r`` still parse.

    Args:
        value: the raw timestamp token (normally a ``str``); ``None`` is accepted.

    Returns:
        A timezone-naive ``datetime`` expressing the instant in UTC (same as the former
        ``datetime.utcfromtimestamp`` result), or ``None`` if the token is missing,
        not an integer, or outside the range the platform can represent.

    NOTE(review): PySpark's TimestampType interprets naive datetimes in the session's
    local time zone, so stored values shift unless the cluster runs in UTC — confirm.
    """
    if value is None:
        return None
    try:
        seconds = int(str(value).strip().strip('"'))
        # fromtimestamp(..., tz=utc) replaces the deprecated utcfromtimestamp; dropping
        # tzinfo keeps the previous naive-UTC return value.
        return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None)
    except (ValueError, TypeError, OverflowError, OSError):
        return None


In [0]:
def YearSafe(value):
    """Parse a four-digit year token into a ``datetime`` on January 1st of that year.

    Surrounding whitespace and double quotation marks are removed before parsing,
    consistent with ``TimeStampSafe``.

    Args:
        value: the raw year token (normally a ``str`` such as ``"1990"``); ``None`` is accepted.

    Returns:
        ``datetime(year, 1, 1)``, or ``None`` if the token is missing or does not
        match the ``"%Y"`` format.
    """
    if value is None:
        return None
    try:
        return datetime.strptime(str(value).strip().strip('"'), "%Y")
    except ValueError:
        return None

#### Load the recommendation log history file

In [0]:
# Schema of rec_log_train.txt rows: UserId, ItemId, Result, Unix timestamp.
# IntegerSafe / TimeStampSafe return None for malformed tokens, so every column is
# nullable: createDataFrame() verifies rows against the schema by default and would
# otherwise abort the whole job on the first unparsable line.
rec_log_schema = StructType([
    StructField("UserId", IntegerType(), True),
    StructField("ItemId", IntegerType(), True),
    StructField("Result", IntegerType(), True),
    StructField("Unix-timestamp", TimestampType(), True),
])

In [0]:
%%time
# Parse each tab-separated line into (UserId, ItemId, Result, timestamp).
# `sc` is not created anywhere in this notebook (only `spark` is), so take the
# SparkContext from the session; the cell then also works on a fresh kernel.
rec_log_rdd = (
    spark.sparkContext
    .textFile('s3://msds697-group18-2022/track1/rec_log_train.txt')
    .map(lambda line: line.split('\t'))
    .map(lambda f: (IntegerSafe(f[0]), IntegerSafe(f[1]), IntegerSafe(f[2]), TimeStampSafe(f[3])))
)

In [0]:
# Peek at the first parsed record to sanity-check the field conversions.
rec_log_rdd.first()

In [0]:
%%time
# Build the DataFrame from the parsed RDD using the explicit schema, then show it.
rec_log_df = spark.createDataFrame(data=rec_log_rdd, schema=rec_log_schema)
rec_log_df.printSchema()

In [0]:
%%time
# Display the first 20 rows, truncating long cell values.
rec_log_df.show(n=20, truncate=True)

#### Load user profile data

In [0]:
# Schema of user_profile.txt rows. Columns parsed with IntegerSafe may be None on
# malformed input, so they are nullable: createDataFrame() verifies rows against the
# schema by default and would otherwise abort on the first bad line.
user_profile_schema = StructType([
    StructField("UserId", IntegerType(), True),
    StructField("Year_of_birth", IntegerType(), True),
    StructField("Gender", IntegerType(), True),
    StructField("Number_of_tweets", IntegerType(), True),
    StructField("TagIds", StringType(), True),
])

In [0]:
%%time
# Parse each tab-separated line into (UserId, Year_of_birth, Gender, Number_of_tweets, TagIds).
# `sc` is not created anywhere in this notebook (only `spark` is), so take the
# SparkContext from the session; the cell then also works on a fresh kernel.
user_profile_rdd = (
    spark.sparkContext
    .textFile('s3://msds697-group18-2022/track1/user_profile.txt')
    .map(lambda line: line.split('\t'))
    .map(lambda f: (IntegerSafe(f[0]), IntegerSafe(f[1]), IntegerSafe(f[2]), IntegerSafe(f[3]), f[4]))
)

In [0]:
%%time
# Peek at the first parsed user-profile record to sanity-check the field conversions.
user_profile_rdd.first()

In [0]:
%%time
# Build the user-profile DataFrame with the explicit schema, then show its structure.
user_profile_df = spark.createDataFrame(data=user_profile_rdd, schema=user_profile_schema)
user_profile_df.printSchema()

In [0]:
%%time
# Display the first 20 rows, truncating long cell values.
user_profile_df.show(n=20, truncate=True)

#### Convert the string-typed TagIds column into a list of integer tag IDs

In [0]:
def split_Ids(tagids):
    '''
    Convert a ';'-separated string of IDs into a list of integers.

    Example: "831;55;198;8;450" --> [831, 55, 198, 8, 450]

    Args:
        tagids: the raw string field (used for the TagIds and Keywords columns).
            "0", an empty string and None all mean "no IDs" and yield [].

    Returns:
        A list of ints. Empty segments (e.g. from "1;;2" or a trailing ';') are
        skipped; a segment that is not an integer becomes None (via IntegerSafe).
    '''
    if tagids is None:
        return []
    # Strip first: this is the final parsed field of its line, so it may carry
    # trailing whitespace such as '\r'; "0\r" must still match the "no IDs" marker
    # instead of being parsed as [0].
    tagids = tagids.strip()
    if tagids in ('', '0'):
        return []
    return [IntegerSafe(part) for part in tagids.split(';') if part.strip()]

In [0]:
# Register function for unwrapping of Tagids into Array as UDF
# Wrap split_Ids as a Spark UDF returning array<int> so it can be applied to DataFrame columns.
clean_ids = udf(f=split_Ids, returnType=ArrayType(IntegerType()))

In [0]:
%%time
# Replace the raw TagIds string with the parsed Tag_Ids_list array.
# Guarded so that re-running this cell is a no-op instead of failing on the
# already-dropped TagIds column.
if 'TagIds' in user_profile_df.columns:
    user_profile_df = (
        user_profile_df
        .withColumn('Tag_Ids_list', clean_ids('TagIds'))
        .drop('TagIds')
    )

In [0]:
%%time
# Attach each user's profile to every log row; rows whose user has no profile keep nulls.
user_details = rec_log_df.join(other=user_profile_df, on='UserId', how='left_outer')

In [0]:
%%time
# Display the first 20 joined rows, truncating long cell values.
user_details.show(n=20, truncate=True)

#### Load item features

In [0]:
# Schema of item.txt rows. ItemId is parsed with IntegerSafe and may be None on
# malformed input, so it is nullable: createDataFrame() verifies rows against the
# schema by default and would otherwise abort on the first bad line.
item_schema = StructType([
    StructField("ItemId", IntegerType(), True),
    StructField("Category", StringType(), True),
    StructField("Keywords", StringType(), False),
])

In [0]:
%%time
# Parse each tab-separated line into (ItemId, Category, Keywords).
# `sc` is not created anywhere in this notebook (only `spark` is), so take the
# SparkContext from the session; the cell then also works on a fresh kernel.
item_rdd = (
    spark.sparkContext
    .textFile('s3://msds697-group18-2022/track1/item.txt')
    .map(lambda line: line.split('\t'))
    .map(lambda f: (IntegerSafe(f[0]), f[1], f[2]))
)

In [0]:
# Peek at the first parsed item record to sanity-check the field conversions.
item_rdd.first()

In [0]:
%%time
# Build the item DataFrame with the explicit schema, then show its structure.
item_df = spark.createDataFrame(data=item_rdd, schema=item_schema)
item_df.printSchema()

In [0]:
%%time
# Display the first 20 rows, truncating long cell values.
item_df.show(n=20, truncate=True)

#### Convert the string-typed Keywords column into a list of integer keyword IDs

In [0]:
%%time
# Replace the raw Keywords string with the parsed Keywords_list array.
# Guarded so that re-running this cell is a no-op instead of failing on the
# already-dropped Keywords column.
if 'Keywords' in item_df.columns:
    item_df = (
        item_df
        .withColumn('Keywords_list', clean_ids('Keywords'))
        .drop('Keywords')
    )

In [0]:
%%time
# Attach item features to every log row; rows whose item is missing from item.txt keep nulls.
log_details = user_details.join(other=item_df, on='ItemId', how='left_outer')

In [0]:
%%time
# Display the first 20 fully joined rows, truncating long cell values.
log_details.show(n=20, truncate=True)

#### Write the joined DataFrame to MongoDB

In [0]:
# MongoDB target collection for the joined log details.
database = 'tencent'
collection = 'recologdetail'
# Credentials and host come from the environment instead of being hard-coded in the
# notebook; a missing variable fails fast with a KeyError naming it.
user_name = os.environ['MONGO_USER']
password = os.environ['MONGO_PASSWORD']
address = os.environ['MONGO_ADDRESS']
# Username and password must be percent-encoded inside a MongoDB URI; otherwise
# characters such as '@', ':' or '/' in them break the connection string.
connection_string = (
    f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
    f"@{address}/{database}.{collection}"
)

In [0]:
%%time
# Persist the joined DataFrame to MongoDB, replacing existing data in the target (mode "overwrite").
(
    log_details.write
    .mode("overwrite")
    .format("mongo")
    .option("uri", connection_string)
    .save()
)

In [0]:
# Stop the Spark session (and its SparkContext) now that the export is finished.
spark.stop()