In [16]:
#sample = sc.textFile("file:///home/hadoop/Jiaoyan/ds504-project/output1/expanded-data.csv")
#sample = textFile.sample(False, 0.5, 12345)
#sample.saveAsTextFile("file:///home/hadoop/Jiaoyan/ds504-project/sample")

In [4]:
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Build the SparkSession used by the DataFrame cells below; getOrCreate()
# returns an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [5]:
# Location of the merged CSV input; its first line is treated as a header row.
filename = "file:///home/hadoop/Jiaoyan/ds504-project/merge.csv"

# Use the SparkContext owned by the `spark` session instead of a kernel-injected
# `sc`, so this cell does not depend on a name the notebook never defines.
textFile = spark.sparkContext.textFile(filename)

# first() fails with an explicit "RDD is empty" ValueError on an empty input,
# where take(1)[0] would raise a bare IndexError.
header = textFile.first()

# Drop every line equal to the header, then split each remaining line on commas.
# NOTE(review): a plain split(',') does not understand quoted fields that contain
# commas -- confirm the input never has them.
parsedData = textFile.filter(lambda line: line != header).map(lambda line: line.split(','))

In [6]:
# Convert the RDD of split lines into a Spark DataFrame.
# Field names are listed in the positional order of the split CSV fields.
_FIELD_NAMES = (
    "user_id", "merchant_id", "item_id", "cat_id", "brand_id",
    "month", "day", "action", "age_range", "gender", "rating", "label",
)


def _to_row(fields):
    """Build a Row from one split line, mapping fields[0..11] onto _FIELD_NAMES.

    Indexing (rather than zip) is deliberate: a line with fewer than 12 fields
    raises IndexError instead of silently producing a shorter Row.
    """
    return Row(**{name: fields[idx] for idx, name in enumerate(_FIELD_NAMES)})


data = parsedData.map(_to_row)

# Let Spark infer the schema from the Rows, then expose the frame to SQL as "data".
schemaData = spark.createDataFrame(data)
schemaData.createOrReplaceTempView("data")

#schemaData.show()

In [7]:
# generate new features
# 1 Monthly click/add/favorite/purchase count for certain brand/ item/ category/ merchant
import pyspark.sql.functions as func
from pyspark.sql.functions import *  # NOTE(review): wildcard import kept as-is; other cells may use the bare names it provides


def _monthly_action_count(entity_col, alias):
    """Count non-null `action` values of schemaData per (entity_col, month, action).

    The count is returned in a column named `alias`.
    """
    return (
        schemaData
        .groupBy(entity_col, "month", "action")
        .agg(func.count("action").alias(alias))
    )


for_merchant = _monthly_action_count("merchant_id", "monthly_action_count_for_merchant")
for_brand = _monthly_action_count("brand_id", "monthly_action_count_for_brand")
for_category = _monthly_action_count("cat_id", "monthly_action_count_for_cat")
for_item = _monthly_action_count("item_id", "monthly_action_count_for_item")

# Attach each count to the base rows; a left outer join keeps every original row.
addFeature1 = schemaData
for _entity_col, _counts in (
    ("merchant_id", for_merchant),
    ("brand_id", for_brand),
    ("cat_id", for_category),
    ("item_id", for_item),
):
    addFeature1 = addFeature1.join(_counts, [_entity_col, "month", "action"], "left_outer")
#addFeature1.show()

In [8]:
# 2. Per-user monthly count of each action type (click/add/favorite/purchase)
#    across all merchants: counts non-null merchant_id per (user, month, action).
for_user = (
    schemaData
    .groupBy("user_id", "month", "action")
    .agg(func.count("merchant_id").alias("user_monthly_action"))
)
addFeature2 = addFeature1.join(
    for_user,
    on=["user_id", "month", "action"],
    how="left_outer",
)
#addFeature2.show()

In [9]:
# 3. Number of distinct days on which a user clicked/added/favorited/purchased
#    items within each month.
# The grouping includes `month` and the aggregate is a DISTINCT count of `day`:
# a plain count('day') grouped only by (user_id, action) yields the user's total
# number of rows for that action, not a per-month number of active days.
action_days = (
    schemaData
    .groupBy("user_id", "month", "action")
    .agg(func.countDistinct("day").alias("user_action_days"))
)
# Join on the full grouping key so each row receives the value for its own month.
addFeature3 = addFeature2.join(action_days, ["user_id", "month", "action"], "left_outer")
#addFeature3.show()

In [10]:
# 4. Total number of unique items / categories / brands seen for each merchant.
# One grouped view of the data is shared by the three distinct-count aggregates.
_by_merchant = schemaData.groupBy("merchant_id")

unique_items_for_merchant = _by_merchant.agg(
    func.countDistinct("item_id").alias("unique_items_for_merchant")
)
unique_cats_for_merchant = _by_merchant.agg(
    func.countDistinct("cat_id").alias("unique_cats_for_merchant")
)
unique_brands_for_merchant = _by_merchant.agg(
    func.countDistinct("brand_id").alias("unique_brands_for_merchant")
)

# Left outer joins keep every row of addFeature3 and add one column per aggregate.
addFeature4 = (
    addFeature3
    .join(unique_items_for_merchant, ["merchant_id"], "left_outer")
    .join(unique_cats_for_merchant, ["merchant_id"], "left_outer")
    .join(unique_brands_for_merchant, ["merchant_id"], "left_outer")
)
#addFeature4.show()

In [11]:
# 5. Number of distinct merchants a user clicked/purchased/added/favourited,
#    computed per (user, month, action).
unique_merchants_in_each_month = (
    schemaData
    .groupBy("user_id", "month", "action")
    .agg(func.countDistinct("merchant_id").alias("unique_merchants_in_each_month"))
)

addFeature5 = addFeature4.join(
    unique_merchants_in_each_month,
    on=["user_id", "month", "action"],
    how="left_outer",
)
addFeature5.show()

+-------+-----+------+-----------+-------+------+--------+---------+---+------+-----+-------------------+---------------------------------+------------------------------+----------------------------+-----------------------------+-------------------+----------------+-------------------------+------------------------+--------------------------+------------------------------+
|user_id|month|action|merchant_id|item_id|cat_id|brand_id|age_range|day|gender|label|             rating|monthly_action_count_for_merchant|monthly_action_count_for_brand|monthly_action_count_for_cat|monthly_action_count_for_item|user_monthly_action|user_action_days|unique_items_for_merchant|unique_cats_for_merchant|unique_brands_for_merchant|unique_merchants_in_each_month|
+-------+-----+------+-----------+-------+------+--------+---------+---+------+-----+-------------------+---------------------------------+------------------------------+----------------------------+-----------------------------+-------------------

In [12]:
# 6. Number of distinct users who clicked/purchased/added/favourited a given
#    item / category / brand / merchant, computed per action type.
def _unique_users(entity_col, alias):
    """Distinct user_id count of schemaData per (entity_col, action), named `alias`."""
    return (
        schemaData
        .groupBy(entity_col, "action")
        .agg(func.countDistinct("user_id").alias(alias))
    )


unique_users_for_item = _unique_users("item_id", "unique_users_for_item")
unique_users_for_cat = _unique_users("cat_id", "unique_users_for_cat")
unique_users_for_brand = _unique_users("brand_id", "unique_users_for_brand")
unique_users_for_merchant = _unique_users("merchant_id", "unique_users_for_merchant")

# Attach the four columns one after another, always keeping every existing row.
addFeature6 = addFeature5
for _entity_col, _user_counts in (
    ("item_id", unique_users_for_item),
    ("cat_id", unique_users_for_cat),
    ("brand_id", unique_users_for_brand),
    ("merchant_id", unique_users_for_merchant),
):
    addFeature6 = addFeature6.join(_user_counts, [_entity_col, "action"], "left_outer")
#addFeature6.show()

In [13]:
# 7. How many times a user clicked/purchased/added/favourited items on
#    Double 11 (rows where month == 11 and day == 11).
filtered = schemaData.filter((schemaData['month'] == 11) & (schemaData['day'] == 11))

# Use count(), not countDistinct(): inside a group keyed by `action` the number
# of distinct `action` values is always 1, which would make the feature constant.
action_on_double11 = (
    filtered
    .groupBy("user_id", "action")
    .agg(func.count("action").alias("action_on_double11"))
)

# (user, action) pairs with no Double 11 rows get no match from the left outer
# join; na.fill(0) then replaces nulls in the numeric columns with 0.
addFeature7 = addFeature6.join(action_on_double11, ['user_id', 'action'], 'left_outer').na.fill(0)
#addFeature7.show()

In [14]:
# 8. For each item / brand / category / merchant: the number of users who bought
#    from it on at least two different days.
def _repeat_buyer_count(purchases, entity_col, alias):
    """Count, per `entity_col` value, the users with purchases on 2+ distinct dates.

    purchases  -- DataFrame of purchase rows with user_id, month, day and entity_col.
    entity_col -- column identifying the entity (e.g. 'item_id').
    alias      -- name of the resulting count column.

    A date is the (month, day) pair. Counting distinct `day` alone would merge
    purchases made on the same day-of-month in different months into one day.
    Entities with no repeat buyer are absent from the result.
    """
    days_per_user = (
        purchases
        .groupBy(entity_col, "user_id")
        .agg(func.countDistinct("month", "day").alias("purchase_days"))
    )
    return (
        days_per_user
        .filter(days_per_user["purchase_days"] > 1)
        .groupBy(entity_col)
        .agg(func.countDistinct("user_id").alias(alias))
    )


# Rows with action code 2 are the ones this cell treats as purchases.
filtered_purchase = schemaData.filter(schemaData['action'] == 2)

# After each left outer join, na.fill(0) turns the nulls of entities without any
# repeat buyer (and any other numeric nulls) into 0.

#8.1 The number of users who bought on at least two different days from items
user_purchase_item_two_times = _repeat_buyer_count(
    filtered_purchase, "item_id", "user_purchase_item_two_times")
join_item_purchase = addFeature7.join(
    user_purchase_item_two_times, ['item_id'], 'left_outer').na.fill(0)

#8.2 The number of users who bought on at least two different days from brands
user_purchase_brand_two_times = _repeat_buyer_count(
    filtered_purchase, "brand_id", "user_purchase_brand_two_times")
join_brand_purchase = join_item_purchase.join(
    user_purchase_brand_two_times, ['brand_id'], 'left_outer').na.fill(0)

#8.3 The number of users who bought on at least two different days from categories
user_purchase_cat_two_times = _repeat_buyer_count(
    filtered_purchase, "cat_id", "user_purchase_cat_two_times")
join_cat_purchase = join_brand_purchase.join(
    user_purchase_cat_two_times, ['cat_id'], 'left_outer').na.fill(0)

#8.4 The number of users who bought on at least two different days from merchants
user_purchase_merchant_two_times = _repeat_buyer_count(
    filtered_purchase, "merchant_id", "user_purchase_merchant_two_times")
join_merchant_purchase = join_cat_purchase.join(
    user_purchase_merchant_two_times, ['merchant_id'], 'left_outer').na.fill(0)

join_merchant_purchase.show()

+-----------+------+--------+-------+-------+------+-----+---------+---+------+-----+--------------------+---------------------------------+------------------------------+----------------------------+-----------------------------+-------------------+----------------+-------------------------+------------------------+--------------------------+------------------------------+---------------------+--------------------+----------------------+-------------------------+------------------+----------------------------+-----------------------------+---------------------------+--------------------------------+
|merchant_id|cat_id|brand_id|item_id|user_id|action|month|age_range|day|gender|label|              rating|monthly_action_count_for_merchant|monthly_action_count_for_brand|monthly_action_count_for_cat|monthly_action_count_for_item|user_monthly_action|user_action_days|unique_items_for_merchant|unique_cats_for_merchant|unique_brands_for_merchant|unique_merchants_in_each_month|unique_users_for

In [15]:
# Collapse the final feature table to a single partition and save it as CSV
# with a header row.
_output_dir = 'file:///home/hadoop/Jiaoyan/ds504-project/output1'
_single_partition = join_merchant_purchase.coalesce(1)
_single_partition.write.format('com.databricks.spark.csv').save(_output_dir, header = 'true')