In [2]:
from __future__ import print_function

import pandas as pd
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.types import *
from pyspark.sql import HiveContext

In [2]:
# Load the train and test user/merchant pairs as provided.
train, test = (
    pd.read_csv('/Users/chenyuanshan/temp/data/data/train_format1.csv'),
    pd.read_csv('/Users/chenyuanshan/temp/data/data/test_format1.csv'),
)

In [3]:
# Load the per-user info table and the raw user activity log.
info, log = (
    pd.read_csv('/Users/chenyuanshan/temp/data/data/user_info_format1.csv'),
    pd.read_csv('/Users/chenyuanshan/temp/data/data/user_log_format1.csv'),
)

In [4]:
# Attach user info to every test pair; the left join keeps all test rows even
# when a user has no info record.
test_info = test.merge(info, how="left", on="user_id")
test_info.to_csv("/Users/chenyuanshan/temp/data/data/test_info.csv", index=False)

In [7]:
# Attach user info to every training pair; the left join keeps all train rows
# even when a user has no info record. Spark re-reads this file below.
train_info = train.merge(info, how="left", on="user_id")
train_info.to_csv("/Users/chenyuanshan/temp/data/data/train_info.csv", index=False)

Now we want to count, for each (user, store) pair, how many times the user clicked (viewed), added to cart, bought, and favourited. We use Spark SQL for this.

In [2]:
# A single SparkSession with Hive support. The CREATE TABLE ... AS cells below
# rely on it to persist their results as tables. getOrCreate() hands back an
# already-running session instead of starting a second one.
# NOTE(review): if this runs in local mode, executors live inside the driver
# JVM, so spark.executor.memory probably has no effect; spark.driver.memory
# (set before the JVM starts) is the usual knob. TODO confirm.
myspark = (
    SparkSession.builder
    .appName('spark_project')
    .config('spark.executor.memory', '2g')
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:
# Raw activity log as a Spark DataFrame.
# The schema is spelled out explicitly. It is the same all-integer, nullable
# schema that inferSchema produced (see the printSchema output below), so Spark
# no longer needs an extra full pass over this file just to infer column types.
# The built-in CSV reader replaces the legacy 'com.databricks.spark.csv' alias.
log_schema = StructType([
    StructField(column, IntegerType(), True)
    for column in ('user_id', 'item_id', 'cat_id', 'seller_id',
                   'brand_id', 'time_stamp', 'action_type')
])
df_log = myspark.read.csv(
    '/Users/chenyuanshan/temp/data/data/user_log_format1.csv',
    header=True,
    schema=log_schema,
)

In [7]:
# Training pairs with user info attached (the CSV written by the pandas merge
# above). The built-in CSV reader replaces the legacy external-package alias
# 'com.databricks.spark.csv', which newer Spark only maps back to this same
# reader for backward compatibility.
df_train = myspark.read.csv(
    '/Users/chenyuanshan/temp/data/data/train_info.csv',
    header=True,
    inferSchema=True,
)

In [8]:
df_log.printSchema()
# Make the log visible to the SQL cells below under the name `log`.
df_log.createOrReplaceTempView("log")

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- cat_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- brand_id: integer (nullable = true)
 |-- time_stamp: integer (nullable = true)
 |-- action_type: integer (nullable = true)



In [10]:
# Per (user_id, seller_id) action counts, persisted as the table
# ViewUserSellerCount. Per the column aliases: action_type 0 = click,
# 1 = add_chart (presumably add-to-cart), 2 = buy, 3 = favourite.
# CREATE TABLE ... AS fails once the table exists, and Hive tables persist
# beyond the kernel. Dropping any previous copy first keeps the cell re-runnable.
# log_count_user only holds the (empty) result of the DDL statement.
myspark.sql("DROP TABLE IF EXISTS ViewUserSellerCount")
log_count_user = myspark.sql("CREATE TABLE ViewUserSellerCount AS (SELECT user_id, seller_id, \
SUM(CASE WHEN action_type = 0 THEN 1 ELSE 0 END) AS click,\
SUM(CASE WHEN action_type = 1 THEN 1 ELSE 0 END) as add_chart,\
SUM(CASE WHEN action_type = 2 THEN 1 ELSE 0 END) AS buy, \
SUM(CASE WHEN action_type = 3 THEN 1 ELSE 0 END) AS favourite \
FROM log \
GROUP BY user_id, seller_id)")

In [12]:
# Sanity check: look at a single row of the freshly built aggregate table.
peek_query = "SELECT * FROM ViewUserSellerCount LIMIT 1"
t = myspark.sql(peek_query)
t.head()

Row(user_id=186568, seller_id=1337, click=10, add_chart=0, buy=0, favourite=1)

In [14]:
# Total number of log rows per user, persisted as the table ViewUserCount.
# Drop any previous copy so CREATE TABLE ... AS does not fail on a re-run.
# log_user_only only holds the (empty) result of the DDL statement.
myspark.sql("DROP TABLE IF EXISTS ViewUserCount")
log_user_only = myspark.sql("CREATE TABLE ViewUserCount AS (SELECT user_id, \
COUNT(user_id) AS total_log \
FROM log \
GROUP BY user_id)")

In [16]:
# Seller-wide click (action_type 0) and buy (action_type 2) counts, persisted
# as the table ViewSellerCount.
# Drop any previous copy so CREATE TABLE ... AS does not fail on a re-run.
# log_seller_count only holds the (empty) result of the DDL statement.
myspark.sql("DROP TABLE IF EXISTS ViewSellerCount")
log_seller_count = myspark.sql("CREATE TABLE ViewSellerCount AS (SELECT seller_id,\
SUM(CASE WHEN action_type = 0 THEN 1 ELSE 0 END) AS seller_t_click,\
SUM(CASE WHEN action_type = 2 THEN 1 ELSE 0 END) AS seller_t_buy \
FROM log \
GROUP BY seller_id)")

In [11]:
# Disabled. The CREATE TABLE ... AS cells above already persist
# ViewUserSellerCount, ViewSellerCount and ViewUserCount as tables, so no
# temp-view registration is needed. The variables below hold only the result
# of those DDL statements, not the aggregated rows. registerTempTable is also
# deprecated in favour of createOrReplaceTempView.
#log_count_user.registerTempTable("ViewUserSellerCount")
#log_seller_count.registerTempTable("ViewSellerCount")
#log_user_only.registerTempTable("ViewUserCount")

In [18]:
# Join the per (user, seller) counts with each user's total log count and
# derive store_buy_rate = buy / click. Persisted as the table ViewJoinOne.
# Spark's `/` yields NULL on a zero divisor unless ANSI mode is enabled, so
# store_buy_rate is presumably NULL wherever click = 0.
# Drop any previous copy so CREATE TABLE ... AS does not fail on a re-run.
myspark.sql("DROP TABLE IF EXISTS ViewJoinOne")
train_User_Seller = myspark.sql("CREATE TABLE ViewJoinOne AS (SELECT t1.user_id, t1.seller_id, t1.click, t1.add_chart, t1.buy, t1. favourite, t1.store_buy_rate, t2.total_log \
FROM ((SELECT user_id, seller_id, \
click, add_chart, buy, favourite, buy/click AS store_buy_rate \
from ViewUserSellerCount)t1 \
LEFT JOIN \
(SELECT user_id, total_log FROM ViewUserCount)t2 \
ON t1.user_id = t2.user_id))")

In [16]:
# Disabled. ViewJoinOne is already a persistent table created by the
# CREATE TABLE ... AS statement above.
#train_User_Seller.registerTempTable("ViewJoinOne")

In [4]:
# Add the seller-wide conversion rate store_rate = seller_t_buy / seller_t_click
# to every (user, seller) row of ViewJoinOne. The LEFT JOIN on seller_id keeps
# all ViewJoinOne rows. The result is a lazy DataFrame; nothing is written here.
seller_rate_query = "SELECT t1.user_id, t1.seller_id, t1.total_log, t1.click, t1.add_chart, t1.buy, t1. favourite, t1.store_buy_rate, t2.store_rate \
FROM (SELECT user_id, seller_id, click, add_chart, buy, favourite, store_buy_rate, total_log \
FROM ViewJoinOne)t1 \
LEFT JOIN \
(SELECT seller_id, seller_t_buy/seller_t_click AS store_rate \
FROM ViewSellerCount)t2 \
ON t1.seller_id = t2.seller_id"
train_seller = myspark.sql(seller_rate_query)
# Author's marker, kept: "Watch out! here to stop!"
# NOTE(review): presumably marks where an earlier run was halted; TODO confirm.


In [5]:
# Export the enriched rows as CSV with a header row.
# - Spark writes a *directory* named tempsave.csv that contains the part-*.csv
#   file(s).
# - repartition(1) funnels everything into one partition, which yields a
#   single part file.
# - mode("overwrite") keeps the cell re-runnable. The default mode
#   ("errorifexists") fails once the output directory exists.
# - The old `inferschema` option was dropped: it only applies to reading.
(
    train_seller
    .repartition(1)
    .write
    .mode("overwrite")
    .csv('/Users/chenyuanshan/temp/data/data/tempsave.csv', header=True)
)

In [20]:
# Persist the same seller-enriched rows as the table `last`.
# CREATE TABLE ... AS returns an empty DataFrame. Its result is therefore
# deliberately NOT assigned to train_seller, so the query DataFrame built
# earlier under that name is not clobbered.
# Drop any previous copy so CREATE TABLE ... AS does not fail on a re-run.
myspark.sql("DROP TABLE IF EXISTS last")
myspark.sql("CREATE TABLE last AS (SELECT t1.user_id, t1.seller_id, t1.total_log, t1.click, t1.add_chart, t1.buy, t1. favourite, t1.store_buy_rate, t2.store_rate \
FROM (SELECT user_id, seller_id, click, add_chart, buy, favourite, store_buy_rate, total_log \
FROM ViewJoinOne)t1 \
LEFT JOIN \
(SELECT seller_id, seller_t_buy/seller_t_click AS store_rate \
FROM ViewSellerCount)t2 \
ON t1.seller_id = t2.seller_id)");

In [21]:
# Expose the training pairs (with user info) to Spark SQL as `train`.
# registerTempTable is deprecated since Spark 2.0. createOrReplaceTempView is
# its replacement, and the `log` view above already uses it.
df_train.createOrReplaceTempView("train")
df_train.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- age_range: double (nullable = true)
 |-- gender: double (nullable = true)



It finally exported successfully!

In [20]:
# Disabled. registerTempTable is deprecated; createOrReplaceTempView replaces it.
# NOTE(review): no cell shown in this notebook defines trainMatrix. TODO confirm
# where it comes from.
#trainMatrix.registerTempTable("outcome")

Note: don't use a temp view, and don't use a Hive table and export it via SQL.

print("phase one, merge needed later")

This is because the file is too large.

In [20]:
# NOTE(review): trainMatrix is not created by any cell in this notebook as
# shown, so on a fresh kernel this cell raises NameError. Judging by the schema
# printed below, it combined `train` (user_id, merchant_id, label) with the
# columns of `last`, but the defining cell appears to be missing. TODO restore it.
trainMatrix.printSchema()
trainMatrix.createOrReplaceTempView("smallMatrix")

root
 |-- user_id: integer (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- total_log: long (nullable = true)
 |-- click: long (nullable = true)
 |-- add_chart: long (nullable = true)
 |-- buy: long (nullable = true)
 |-- favourite: long (nullable = true)
 |-- store_buy_rate: double (nullable = true)
 |-- store_rate: double (nullable = true)



In [14]:
# Header-less train/test feature matrices; pandas labels the columns by
# position (0..11).
train_null, test_null = (
    pd.read_csv('/Users/chenyuanshan/temp/data/data/featureMatrix/train.csv', header=None),
    pd.read_csv('/Users/chenyuanshan/temp/data/data/testMatrix/test.csv', header=None),
)

In [15]:
# Missing-value count per column of the raw training matrix.
train_null.isna().sum(axis="index")

0         0
1         0
2      1253
3      3711
4         0
5         0
6         0
7         0
8         0
9     29873
10        0
11        0
dtype: int64

In [16]:
# Fill each gap with the value from the previous row of the same column.
# DataFrame.fillna(method=...) is deprecated since pandas 2.1; .ffill() is the
# equivalent, supported spelling.
# NOTE(review): forward-fill copies values from whichever row happens to come
# before a gap, i.e. from an unrelated user/merchant pair. For column 9
# (presumably store_buy_rate, which is NULL when click = 0) a constant such as
# 0 is likely more meaningful. TODO revisit the imputation strategy.
train_fill = train_null.ffill()

In [17]:
# Verify the forward fill left no gaps.
train_fill.isna().sum(axis="index")

0     0
1     0
2     0
3     0
4     0
5     0
6     0
7     0
8     0
9     0
10    0
11    0
dtype: int64

In [18]:
# Missing-value count per column of the raw test matrix.
test_null.isna().sum(axis="index")

0          0
1          0
2       1325
3       3834
4          0
5          0
6          0
7          0
8          0
9      29535
10         0
11    261477
dtype: int64

In [19]:
# Fill gaps from the next row first, then from the previous row. The ffill
# pass catches trailing gaps that bfill cannot reach.
# DataFrame.fillna(method=...) is deprecated since pandas 2.1; .bfill() and
# .ffill() are the equivalent, supported spellings.
# Column 11 is entirely NaN (see the null counts above), so neither pass can
# fill it.
# NOTE(review): values are borrowed from neighbouring, unrelated rows. The
# training matrix is filled with a forward fill only, so identical gaps may be
# imputed differently in train and test. TODO align the two strategies.
test_fill = test_null.bfill().ffill()

In [20]:
# Re-check the test matrix after both fill passes.
test_fill.isna().sum(axis="index")

0          0
1          0
2          0
3          0
4          0
5          0
6          0
7          0
8          0
9          0
10         0
11    261477
dtype: int64

In [21]:
# Persist the filled training matrix, still without header or index.
train_fill.to_csv(
    '/Users/chenyuanshan/temp/data/data/featureMatrix/trainfill.csv',
    index=False,
    header=False,
)

In [22]:
# Persist the filled test matrix, still without header or index.
test_fill.to_csv(
    '/Users/chenyuanshan/temp/data/data/testMatrix/testfill.csv',
    index=False,
    header=False,
)

In [3]:
# Reload the training feature matrix from the export that carries a header
# row, so columns are addressed by name instead of position.
add_header = pd.read_csv(
    '/Users/chenyuanshan/temp/data/data/featureMatrix/featureMatrix.csv'
)

In [5]:
# Forward-fill gaps in the named training matrix.
# DataFrame.fillna(method=...) is deprecated since pandas 2.1; .ffill() is the
# equivalent, supported spelling.
# NOTE(review): each gap receives the previous row's value, i.e. data from an
# unrelated user/merchant pair. TODO revisit the imputation strategy.
add_header = add_header.ffill()

In [6]:
# Confirm no named column still has missing values.
add_header.isna().sum(axis="index")

user_id           0
merchant_id       0
age_range         0
gender            0
total_log         0
click             0
add_chart         0
buy               0
favourite         0
store_buy_rate    0
store_rate        0
label             0
dtype: int64

In [7]:
# Write the filled training matrix with its header row and no index column.
add_header.to_csv(
    '/Users/chenyuanshan/temp/data/data/featureMatrix/withHeader.csv',
    index=False,
)

In [8]:
# Reuse the same name for the test feature matrix (exported with a header row).
add_header = pd.read_csv(
    '/Users/chenyuanshan/temp/data/data/testMatrix/testMatrix.csv'
)

In [10]:
# Forward-fill gaps in the named test matrix.
# DataFrame.fillna(method=...) is deprecated since pandas 2.1; .ffill() is the
# equivalent, supported spelling.
# NOTE(review): values are copied from the preceding, unrelated row, and a
# column that is entirely NaN stays NaN. TODO revisit the imputation strategy.
add_header = add_header.ffill()

In [12]:
# Write the filled test matrix with its header row and no index column.
add_header.to_csv(
    '/Users/chenyuanshan/temp/data/data/testMatrix/withHeader.csv',
    index=False,
)