In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession\
        .builder\
        .master("local")\
        .appName("DataProcess")\
        .config("spark.executor.memory","3g")\
        .config("spark.executor.instances","5")\
        .getOrCreate()

In [3]:
#导入user_log数据
user_log = spark.read.csv(r"hdfs://node1:9000/user/root/exp4/data_format1/user_log_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [4]:
user_log.orderBy("user_id").limit(10).show()

+-------+-------+------+---------+--------+----------+-----------+
|user_id|item_id|cat_id|seller_id|brand_id|time_stamp|action_type|
+-------+-------+------+---------+--------+----------+-----------+
|      1| 112203|  1252|     4026|    1469|      1018|          0|
|      1|1110495|   992|     1019|    6805|      1111|          0|
|      1|1110495|   992|     1019|    6805|      1111|          0|
|      1| 556107|  1252|      739|    6822|      1018|          0|
|      1| 411984|  1252|     4177|    1960|      1018|          0|
|      1| 112203|  1252|     4026|    1469|      1018|          0|
|      1| 112203|  1252|     4026|    1469|      1021|          2|
|      1| 112203|  1252|     4026|    1469|      1021|          0|
|      1|1110495|   992|     1019|    6805|      1111|          0|
|      1|1110495|   992|     1019|    6805|      1111|          0|
+-------+-------+------+---------+--------+----------+-----------+



In [5]:
#导入训练集
df_train = spark.read.csv(r"hdfs://node1:9000/user/root/exp4/data_format1/train_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [6]:
df_train.limit(10).show()

+-------+-----------+-----+
|user_id|merchant_id|label|
+-------+-----------+-----+
|  34176|       3906|    0|
|  34176|        121|    0|
|  34176|       4356|    1|
|  34176|       2217|    0|
| 230784|       4818|    0|
| 362112|       2618|    0|
|  34944|       2051|    0|
| 231552|       3828|    1|
| 231552|       2124|    0|
| 232320|       1168|    0|
+-------+-----------+-----+



In [7]:
#导入用户信息
user_info = spark.read.csv(r"hdfs://node1:9000/user/root/exp4/data_format1/user_info_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [8]:
user_info.limit(10).show()

+-------+---------+------+
|user_id|age_range|gender|
+-------+---------+------+
| 376517|        6|     1|
| 234512|        5|     0|
| 344532|        5|     0|
| 186135|        5|     0|
|  30230|        5|     0|
| 272389|        6|     1|
| 281071|        4|     0|
| 139859|        7|     0|
| 198411|        5|     1|
|  67037|        4|     1|
+-------+---------+------+



想要建立的特征
需要根据user_id，和merchant_id（seller_id）,从用户画像表以及用户日志表中提取特征，填写到df_train这个数据框中，从而训练评估模型 需要建立的特征如下：

用户的年龄(age_range)  
用户的性别(gender)  
某用户在该商家日志的总条数(total_logs)  
用户浏览的商品的数目，就是浏览了多少个商品(unique_item_ids)  
浏览的商品的种类的数目，就是浏览了多少种商品(categories)  
用户浏览的天数(browse_days)  
用户单击的次数(one_clicks)  
用户添加购物车的次数(shopping_carts)  
用户购买的次数(purchase_times)  
用户收藏的次数(favourite_times)  

In [9]:
#age_range,gender特征添加
df_train = df_train.join(user_info,["user_id"],"left")

In [10]:
#total_logs(某用户在该商家日志的总条数)特征添加
total_logs_temp = user_log.groupby(["user_id","seller_id"]).count()
total_logs_temp.orderBy("user_id").limit(20).show()

+-------+---------+-----+
|user_id|seller_id|count|
+-------+---------+-----+
|      1|      739|    1|
|      1|      925|    4|
|      1|     1019|   14|
|      1|     4335|    1|
|      1|     4026|    5|
|      1|     2245|    5|
|      1|      471|    1|
|      1|     1156|    1|
|      1|     4177|    1|
|      2|     1784|    2|
|      2|     1544|    1|
|      2|     1974|   20|
|      2|     2223|    2|
|      2|      420|   26|
|      2|     1679|    3|
|      2|     3716|    1|
|      2|     4924|    1|
|      2|     2412|    1|
|      2|     1816|    1|
|      2|     1179|    1|
+-------+---------+-----+



In [11]:
total_logs_temp = total_logs_temp.withColumnRenamed("seller_id","merchant_id").withColumnRenamed("count","total_logs")
total_logs_temp.limit(1).show()

+-------+-----------+----------+
|user_id|merchant_id|total_logs|
+-------+-----------+----------+
| 328862|       1200|         5|
+-------+-----------+----------+



In [12]:
df_train = df_train.join(total_logs_temp,["user_id","merchant_id"],"left")
df_train.limit(10).show()

+-------+-----------+-----+---------+------+----------+
|user_id|merchant_id|label|age_range|gender|total_logs|
+-------+-----------+-----+---------+------+----------+
|    464|       4718|    0|        6|     0|         4|
|    867|       3152|    0|        3|     0|        17|
|   1882|       4377|    0|        6|     1|         4|
|   2450|       2760|    0|        0|     0|         5|
|   2766|       3885|    0|        4|     1|         1|
|   2829|        467|    0|        4|     0|         3|
|   2861|       4973|    0|        6|     0|         1|
|   3359|       4090|    0|        2|     0|         8|
|   3487|       1861|    0|        6|     1|         4|
|   5460|       1485|    0|        4|     0|        58|
+-------+-----------+-----+---------+------+----------+



In [13]:
#unique_item_ids特征添加
unique_item_ids_temp = user_log.groupby(["user_id","seller_id","item_id"]).count()
unique_item_ids_temp = unique_item_ids_temp.selectExpr("user_id","seller_id","item_id")
unique_item_ids_temp.show()

+-------+---------+-------+
|user_id|seller_id|item_id|
+-------+---------+-------+
| 328862|     1253| 315621|
| 328862|     1138| 431931|
| 328862|      465|  92347|
| 328862|     3828| 671405|
| 356311|     4650| 238223|
|  92396|     4499| 680664|
|  37908|     2883| 150707|
|   2859|      296| 408609|
|   2859|     1703| 220138|
| 153790|      158| 209679|
|  26516|     3326| 154863|
| 366342|     1892|  44341|
| 171799|     1488| 614056|
| 300681|      606| 402611|
| 300681|     1461| 603371|
| 300681|     2594| 533712|
| 300681|      527| 117760|
| 300681|      606| 552140|
| 300681|     2900| 637306|
| 401805|     4432| 209498|
+-------+---------+-------+
only showing top 20 rows



In [14]:
unique_item_ids_temp = unique_item_ids_temp.groupBy(["user_id","seller_id"]).count()
unique_item_ids_temp = unique_item_ids_temp.withColumnRenamed("seller_id","merchant_id").withColumnRenamed("count","unique_item_ids")
unique_item_ids_temp.limit(10).show()

+-------+-----------+---------------+
|user_id|merchant_id|unique_item_ids|
+-------+-----------+---------------+
| 303135|        306|             51|
| 197844|       1550|              7|
| 380525|        913|              1|
|  70920|       2328|              1|
|  96653|       2114|             19|
|  24240|       3471|              1|
| 416856|        133|              7|
| 390478|        113|              2|
| 257715|       1617|              4|
|  29193|       1606|             14|
+-------+-----------+---------------+



In [15]:
df_train = df_train.join(unique_item_ids_temp,["user_id","merchant_id"],"left")
df_train.limit(10).show()

+-------+-----------+-----+---------+------+----------+---------------+
|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|
+-------+-----------+-----+---------+------+----------+---------------+
|    464|       4718|    0|        6|     0|         4|              2|
|    867|       3152|    0|        3|     0|        17|              3|
|   1882|       4377|    0|        6|     1|         4|              1|
|   2450|       2760|    0|        0|     0|         5|              1|
|   2766|       3885|    0|        4|     1|         1|              1|
|   2829|        467|    0|        4|     0|         3|              2|
|   2861|       4973|    0|        6|     0|         1|              1|
|   3359|       4090|    0|        2|     0|         8|              3|
|   3487|       1861|    0|        6|     1|         4|              1|
|   5460|       1485|    0|        4|     0|        58|             29|
+-------+-----------+-----+---------+------+----------+---------

In [16]:
#categories特征构建
categories_temp = user_log.groupby(["user_id", "seller_id", "cat_id"]).count()
#categories_temp.show()

In [17]:
categories_temp = categories_temp.selectExpr("user_id","seller_id","cat_id")
#categories_temp.show()

In [18]:
categories_temp = categories_temp.groupBy(["user_id","seller_id"]).count()
categories_temp = categories_temp.withColumnRenamed("seller_id","merchant_id").withColumnRenamed("count","categories")
#categories_temp.limit(10).show()

In [19]:
df_train = df_train.join(categories_temp,["user_id","merchant_id"],"left")
df_train.limit(10).show()

+-------+-----------+-----+---------+------+----------+---------------+----------+
|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|
+-------+-----------+-----+---------+------+----------+---------------+----------+
|    464|       4718|    0|        6|     0|         4|              2|         2|
|    867|       3152|    0|        3|     0|        17|              3|         1|
|   1882|       4377|    0|        6|     1|         4|              1|         1|
|   2450|       2760|    0|        0|     0|         5|              1|         1|
|   2766|       3885|    0|        4|     1|         1|              1|         1|
|   2829|        467|    0|        4|     0|         3|              2|         1|
|   2861|       4973|    0|        6|     0|         1|              1|         1|
|   3359|       4090|    0|        2|     0|         8|              3|         2|
|   3487|       1861|    0|        6|     1|         4|              1|         1|
|   

In [20]:
#browse_days特征构建
browse_days_temp = user_log.groupby(["user_id","seller_id","time_stamp"]).count()
#browse_days_temp.limit(10).show()

In [21]:
browse_days_temp = browse_days_temp.selectExpr("user_id","seller_id","time_stamp")
browse_days_temp = browse_days_temp.groupby(["user_id","seller_id"]).count()
browse_days_temp = browse_days_temp.withColumnRenamed("seller_id","merchant_id").withColumnRenamed("count","browse_days")
#browse_days_temp.limit(10).show()

In [22]:
df_train = df_train.join(browse_days_temp,["user_id","merchant_id"],"left")
df_train.limit(10).show()

+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+
|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|browse_days|
+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+
|    464|       4718|    0|        6|     0|         4|              2|         2|          1|
|    867|       3152|    0|        3|     0|        17|              3|         1|          4|
|   1882|       4377|    0|        6|     1|         4|              1|         1|          1|
|   2450|       2760|    0|        0|     0|         5|              1|         1|          1|
|   2766|       3885|    0|        4|     1|         1|              1|         1|          1|
|   2829|        467|    0|        4|     0|         3|              2|         1|          1|
|   2861|       4973|    0|        6|     0|         1|              1|         1|          1|
|   3359|       4090|    0|        2|     0|      

In [23]:
#先将处理好的暂时写到文件中
df_train.write.options(header="true").csv("hdfs://node1:9000/user/root/exp4/procd_train_temp.csv", mode = 'overwrite')

In [24]:
#为了避免jvm崩掉，只能另起一个session
spark.stop()
spark = SparkSession\
        .builder\
        .master("local")\
        .appName("DataProcess2")\
        .config("spark.executor.memory","3g")\
        .config("spark.executor.instances","5")\
        .getOrCreate()

In [25]:
#导入user_log数据
user_log = spark.read.csv(r"hdfs://node1:9000/user/root/exp4/data_format1/user_log_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [26]:
#one_clicks、shopping_carts、purchase_times、favourite_times特征构建
one_clicks_temp = user_log.groupby(["user_id","seller_id","action_type"]).count()
one_clicks_temp = one_clicks_temp.withColumnRenamed("seller_id","merchant_id").withColumnRenamed("count","times")
#one_clicks_temp.limit(10).show()

In [27]:
from pyspark.sql import functions
from pyspark.sql.types import *
def click_time(action,times):
    if action == 0:
        return 0
    else:
        return times
udf_click_time = functions.udf(click_time,IntegerType())
one_clicks_temp = one_clicks_temp.withColumn("one_clicks",udf_click_time("action_type","times"))
#one_clicks_temp.limit(10).show()

In [28]:
def shopping_click(action,times):
    if action == 1:
        return times
    else:
        return 0
udf_click_time = functions.udf(shopping_click,IntegerType())
one_clicks_temp = one_clicks_temp.withColumn("shopping_carts",udf_click_time("action_type","times"))
#one_clicks_temp.limit(10).show()

In [29]:
def purchase_click(action,times):
    if action == 2:
        return times
    else:
        return 0
udf_click_time = functions.udf(purchase_click,IntegerType())
one_clicks_temp = one_clicks_temp.withColumn("purchase_times",udf_click_time("action_type","times"))
#one_clicks_temp.limit(10).show()

In [30]:
def favor_click(action,times):
    if action == 3:
        return times
    else:
        return 0
udf_click_time = functions.udf(favor_click,IntegerType())
one_clicks_temp = one_clicks_temp.withColumn("favor_times",udf_click_time("action_type","times"))
#one_clicks_temp.limit(10).show()

In [31]:
four_features = one_clicks_temp.groupby(["user_id","merchant_id"]).sum()
#four_features.limit(10).show()

In [32]:
four_features = four_features.selectExpr("user_id","merchant_id","`sum(one_clicks)` as one_clicks",
"`sum(shopping_carts)` as shopping_carts","`sum(purchase_times)` as purchase_times","`sum(favor_times)` as favor_times")
four_features.limit(10).show()

+-------+-----------+----------+--------------+--------------+-----------+
|user_id|merchant_id|one_clicks|shopping_carts|purchase_times|favor_times|
+-------+-----------+----------+--------------+--------------+-----------+
|  26535|       3322|         0|             0|             0|          0|
| 324592|       1704|         0|             0|             0|          0|
| 109128|       3639|         0|             0|             0|          0|
| 135474|        476|         0|             0|             0|          0|
| 265449|       1636|         1|             0|             1|          0|
| 360493|        167|         1|             0|             1|          0|
| 233113|        941|         0|             0|             0|          0|
| 211495|       3450|         0|             0|             0|          0|
| 230439|       3682|         0|             0|             0|          0|
| 332399|        158|         0|             0|             0|          0|
+-------+-----------+----

In [34]:
#导入train_data数据
df_train = spark.read.csv(r"hdfs://node1:9000/user/root/exp4/procd_train_temp.csv", encoding='utf8', header=True, inferSchema=True)

In [35]:
df_train = df_train.join(four_features,["user_id","merchant_id"],"left")
df_train.limit(10).show()

+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+
|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|browse_days|one_clicks|shopping_carts|purchase_times|favor_times|
+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+
|    464|       4718|    0|        6|     0|         4|              2|         2|          1|         1|             0|             1|          0|
|    867|       3152|    0|        3|     0|        17|              3|         1|          4|         1|             0|             1|          0|
|   1882|       4377|    0|        6|     1|         4|              1|         1|          1|         1|             0|             1|          0|
|   2450|       2760|    0|        0|     0|         5|              1|         1|          1|         2|       

In [36]:
df_train.write.options(header="true").csv("hdfs://node1:9000/user/root/exp4/procd_train_real.csv")
df_train.write.parquet("hdfs://node1:9000/user/root/exp4/procd_train_real.parquet")

In [37]:
#填充缺失值
#第一种策略是将后8个特征所有null值填充为0
df_train_filled = df_train.fillna(0)
df_train_filled.show()

+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+
|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|browse_days|one_clicks|shopping_carts|purchase_times|favor_times|
+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+
|    464|       4718|    0|        6|     0|         4|              2|         2|          1|         1|             0|             1|          0|
|    867|       3152|    0|        3|     0|        17|              3|         1|          4|         1|             0|             1|          0|
|   1882|       4377|    0|        6|     1|         4|              1|         1|          1|         1|             0|             1|          0|
|   2450|       2760|    0|        0|     0|         5|              1|         1|          1|         2|       

In [38]:
#将数据转为合适的格式
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
#先转成RDD
df_train_rdd = df_train_filled.rdd
#改成(label,features)的格式
df_train_rdd = df_train_rdd.map(lambda line: LabeledPoint(line[2],Vectors.dense(line[3:])))

In [39]:
#保存为LibSVMFile格式，方便后面训练使用
from pyspark.mllib.util import MLUtils
MLUtils.saveAsLibSVMFile(df_train_rdd, "hdfs://node1:9000/user/root/exp4/procd_train_real")

In [40]:
#别忘了关掉session
spark.stop()