In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession\
        .builder\
        .appName("MLlib")\
        .getOrCreate()

In [3]:
user_log = spark.read.csv(r"file:///home/lzt/Documents/lab4_data/input/user_log_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [4]:
user_info = spark.read.csv(r"file:///home/lzt/Documents/lab4_data/input/user_info_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [5]:
df_train = spark.read.csv(r"file:///home/lzt/Documents/lab4_data/input/train_format1.csv", encoding='utf8', header=True, inferSchema=True)

In [6]:
# 添加年龄、性别
df_train = df_train.join(user_info, ["user_id"], "left")
df_train.limit(5).show()

+-------+-----------+-----+---------+------+
|user_id|merchant_id|label|age_range|gender|
+-------+-----------+-----+---------+------+
|  34176|       3906|    0|        6|     0|
|  34176|        121|    0|        6|     0|
|  34176|       4356|    1|        6|     0|
|  34176|       2217|    0|        6|     0|
| 230784|       4818|    0|        0|     0|
+-------+-----------+-----+---------+------+



In [7]:
# log_count为某用户在某店加入购物车、购买、收藏的总数
user_log_exceptClick = user_log[user_log["action_type"] != 0]
log_count = user_log_exceptClick.groupby(["user_id", "seller_id"]).count()
log_count = log_count.withColumnRenamed("seller_id", "merchant_id").withColumnRenamed("count", "log_count")
df_train = df_train.join(log_count, ["user_id", "merchant_id"],"left")
df_train.limit(5).show()

+-------+-----------+-----+---------+------+---------+
|user_id|merchant_id|label|age_range|gender|log_count|
+-------+-----------+-----+---------+------+---------+
|    464|       4718|    0|        6|     0|        1|
|    867|       3152|    0|        3|     0|        1|
|   1882|       4377|    0|        6|     1|        1|
|   2450|       2760|    0|        0|     0|        2|
|   2766|       3885|    0|        4|     1|        1|
+-------+-----------+-----+---------+------+---------+



In [8]:
# cart_count为某用户在某店加入购物车的总数
user_log_cart = user_log[user_log["action_type"] == 1]
cart_count = user_log_cart.groupby(["user_id", "seller_id"]).count()
cart_count = cart_count.withColumnRenamed("seller_id", "merchant_id").withColumnRenamed("count", "cart_count")
df_train = df_train.join(cart_count, ["user_id", "merchant_id"], "left")
df_train.limit(5).show()

+-------+-----------+-----+---------+------+---------+----------+
|user_id|merchant_id|label|age_range|gender|log_count|cart_count|
+-------+-----------+-----+---------+------+---------+----------+
|    464|       4718|    0|        6|     0|        1|      null|
|    867|       3152|    0|        3|     0|        1|      null|
|   1882|       4377|    0|        6|     1|        1|      null|
|   2450|       2760|    0|        0|     0|        2|      null|
|   2766|       3885|    0|        4|     1|        1|      null|
+-------+-----------+-----+---------+------+---------+----------+



In [9]:
# buy_count为某用户在某店购买的总数
user_log_buy = user_log[user_log["action_type"] == 2]
buy_count = user_log_buy.groupby(["user_id", "seller_id"]).count()
buy_count = buy_count.withColumnRenamed("seller_id", "merchant_id").withColumnRenamed("count", "buy_count")
df_train = df_train.join(buy_count, ["user_id", "merchant_id"], "left")
df_train.limit(5).show()

+-------+-----------+-----+---------+------+---------+----------+---------+
|user_id|merchant_id|label|age_range|gender|log_count|cart_count|buy_count|
+-------+-----------+-----+---------+------+---------+----------+---------+
|    464|       4718|    0|        6|     0|        1|      null|        1|
|    867|       3152|    0|        3|     0|        1|      null|        1|
|   1882|       4377|    0|        6|     1|        1|      null|        1|
|   2450|       2760|    0|        0|     0|        2|      null|        2|
|   2766|       3885|    0|        4|     1|        1|      null|        1|
+-------+-----------+-----+---------+------+---------+----------+---------+



In [10]:
# collect_count为某用户在某店收藏的总数
user_log_collect = user_log[user_log["action_type"] == 3]
collect_count = user_log_collect.groupby(["user_id", "seller_id"]).count()
collect_count = collect_count.withColumnRenamed("seller_id", "merchant_id").withColumnRenamed("count", "collect_count")
df_train = df_train.join(collect_count, ["user_id", "merchant_id"], "left")
df_train.limit(5).show()

+-------+-----------+-----+---------+------+---------+----------+---------+-------------+
|user_id|merchant_id|label|age_range|gender|log_count|cart_count|buy_count|collect_count|
+-------+-----------+-----+---------+------+---------+----------+---------+-------------+
|    464|       4718|    0|        6|     0|        1|      null|        1|         null|
|    867|       3152|    0|        3|     0|        1|      null|        1|         null|
|   1882|       4377|    0|        6|     1|        1|      null|        1|         null|
|   2450|       2760|    0|        0|     0|        2|      null|        2|         null|
|   2766|       3885|    0|        4|     1|        1|      null|        1|         null|
+-------+-----------+-----+---------+------+---------+----------+---------+-------------+



In [11]:
df_train = df_train.fillna(0)
df_train.limit(5).show()

+-------+-----------+-----+---------+------+---------+----------+---------+-------------+
|user_id|merchant_id|label|age_range|gender|log_count|cart_count|buy_count|collect_count|
+-------+-----------+-----+---------+------+---------+----------+---------+-------------+
|    464|       4718|    0|        6|     0|        1|         0|        1|            0|
|    867|       3152|    0|        3|     0|        1|         0|        1|            0|
|   1882|       4377|    0|        6|     1|        1|         0|        1|            0|
|   2450|       2760|    0|        0|     0|        2|         0|        2|            0|
|   2766|       3885|    0|        4|     1|        1|         0|        1|            0|
+-------+-----------+-----+---------+------+---------+----------+---------+-------------+



In [12]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

In [13]:
# 将数据转化为RDD
train_rdd = df_train.rdd
train_rdd = train_rdd.map(lambda x: LabeledPoint(x[2], Vectors.dense(x[3:])))
training, test = train_rdd.randomSplit([0.7, 0.3], seed=1)
training.cache()

PythonRDD[268] at RDD at PythonRDD.scala:53

In [14]:
# 逻辑回归，参考官方代码
# Run training algorithm to build the model
lr_model = LogisticRegressionWithLBFGS.train(training)

# Compute raw scores on the test set
predictionAndLabels1 = test.map(lambda lp: (float(lr_model.predict(lp.features)), lp.label))

# Instantiate metrics object
metrics = BinaryClassificationMetrics(predictionAndLabels1)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

# Evaluating the model on test data
test_error = predictionAndLabels1.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())
print("Test Error = " + str(test_error))

Area under PR = 0.09310795813683474
Area under ROC = 0.50004247074441
Test Error = 0.061263329029790085


In [15]:
# SVM
# Build the model
from pyspark.mllib.classification import SVMWithSGD, SVMModel
svm_model = SVMWithSGD.train(training, iterations=100)

# Evaluating the model on test data
predictionAndLabels2 = test.map(lambda lp: (lp.label, svm_model.predict(lp.features)))
test_error = predictionAndLabels2.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())
print("Test Error = " + str(test_error))

Test Error = 0.061205948453115284


In [16]:
# 下面在测试数据上做预测，先对测试数据计算特征
df_test = spark.read.csv(r"file:///home/lzt/Documents/lab4_data/input/test_format1.csv", encoding='utf8', header=True, inferSchema=True) 

# 添加年龄、性别
df_test = df_test.join(user_info, ["user_id"], "left")

# log_count为某用户在某店加入购物车、购买、收藏的总数
df_test = df_test.join(log_count, ["user_id", "merchant_id"],"left")

# cart_count为某用户在某店加入购物车的总数
df_test = df_test.join(cart_count, ["user_id", "merchant_id"], "left")

# buy_count为某用户在某店购买的总数
df_test = df_test.join(buy_count, ["user_id", "merchant_id"], "left")

# collect_count为某用户在某店收藏的总数
df_test = df_test.join(collect_count, ["user_id", "merchant_id"], "left")

df_test = df_test.fillna(0)
df_test.limit(5).show()

+-------+-----------+----+---------+------+---------+----------+---------+-------------+
|user_id|merchant_id|prob|age_range|gender|log_count|cart_count|buy_count|collect_count|
+-------+-----------+----+---------+------+---------+----------+---------+-------------+
|    565|       1970|null|        5|     0|        1|         0|        1|            0|
|    596|       3936|null|        2|     0|        2|         0|        1|            1|
|    919|       4044|null|        3|     0|        3|         0|        2|            1|
|   2026|       1557|null|        7|     1|        2|         0|        1|            1|
|   2491|       2991|null|        0|     0|        1|         0|        1|            0|
+-------+-----------+----+---------+------+---------+----------+---------+-------------+



In [97]:
# 将数据转化为RDD
test_rdd = df_test.rdd
# test_rdd = test_rdd.map(lambda x: LabeledPoint(0.5, Vectors.dense(x[3:])))
test_rdd = test_rdd.map(lambda x: (x[0], x[1], Vectors.dense(x[3:])))
test_rdd.take(10)

[(565, 1970, DenseVector([5.0, 0.0, 1.0, 0.0, 1.0, 0.0])),
 (596, 3936, DenseVector([2.0, 0.0, 2.0, 0.0, 1.0, 1.0])),
 (919, 4044, DenseVector([3.0, 0.0, 3.0, 0.0, 2.0, 1.0])),
 (2026, 1557, DenseVector([7.0, 1.0, 2.0, 0.0, 1.0, 1.0])),
 (2491, 2991, DenseVector([0.0, 0.0, 1.0, 0.0, 1.0, 0.0])),
 (2544, 191, DenseVector([2.0, 0.0, 3.0, 0.0, 2.0, 1.0])),
 (2604, 4618, DenseVector([3.0, 0.0, 1.0, 0.0, 1.0, 0.0])),
 (3002, 1359, DenseVector([0.0, 0.0, 2.0, 0.0, 2.0, 0.0])),
 (3776, 2482, DenseVector([2.0, 0.0, 3.0, 0.0, 1.0, 2.0])),
 (3802, 10, DenseVector([5.0, 0.0, 1.0, 0.0, 1.0, 0.0]))]

In [100]:
lr_res = test_rdd.map(lambda x: (x[0], x[1], float(lr_model.predict(x[2]))))
lr_res_df = lr_res.toDF()
lr_res_df.coalesce(1).write.options(header="true").csv("file:///home/lzt/Documents/mllib/lr_res_df")

In [101]:
svm_res = test_rdd.map(lambda x: (x[0], x[1], float(svm_model.predict(x[2]))))
svm_res_df = svm_res.toDF()
svm_res_df.coalesce(1).write.options(header="true").csv("file:///home/lzt/Documents/mllib/svm_res_df")