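## Business Scenario

Using each user's consumption data from the past three months (data traffic, online duration, call duration, recharge amount, and so on), predict the user's data traffic for the next month, so that suitable data-traffic plans can be recommended precisely.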


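## Field Information

| Name | Description | Type |
|------|-------------|------|
| USER_ID | User identifier | INT |
| SERVICESET_FLUX | Data traffic included in the plan (MB) | DOUBLE |
| ONLINE_FLUX_THISMONTH | Data traffic this month | |
| ONLINE_FLUX_LASTMONTH | Data traffic last month | |
| ONLINE_FLUX_TWOMONTHSAGO | Data traffic two months ago | |
| ONLINE_DURATION_THISMONTH | Online duration this month (min) | |
| ONLINE_DURATION_LASTMONTH | Online duration last month (min) | |
| ONLINE_DURATION_TWOMONTHSAGO | Online duration two months ago (min) | |
| CALL_DURATION_THISMONTH | Call duration this month (min) | |
| CALL_DURATION_LASTMONTH | Call duration last month (min) | |
| CALL_DURATION_TWOMONTHSAGO | Call duration two months ago (min) | |
| NET_DURATION | Time since joining the network (days) | LONG |
| LAST_RECHARGE_VALUE | Amount of the most recent recharge (CNY) | |
| TOTAL_RECHARGE_VALUE | Total recharge amount over the last three months (CNY) | |
| TOTAL_RECHARGE_COUNT | Number of recharges over the last three months | |
| TOTAL_OWE_COUNT | Number of overdue payments over the last three months | INT |
| BALANCED | Account balance | DOUBLE |
| ONLINE_FLUX_NEXTMONTH | Data traffic next month (prediction target) | |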




In [1]:
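# Import commonly used libraries
# numpy: a widely used library for multidimensional numerical computing, conventionally aliased as np
# pandas: a data analysis library, conventionally aliased as pd
# matplotlib: a plotting library; its pyplot module is conventionally aliased as plt
# %matplotlib inline: an IPython magic function that renders plots inline, so plt.show() can be omitted

# About 4 lines of code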
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

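### Read the Data

In [2]:
# Use the pandas CSV reader, pd.read_csv(), to load the data into a DataFrame
# Data file name: carrier_linearRegeression.csv
# Encoding: utf8
# Delimiter: comma
# About 1 line of code
data_pd = pd.read_csv('carrier_linearRegeression.csv', encoding='utf8')

# Show the first 5 rows
# About 1 line of code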
data_pd.head()

Unnamed: 0,USER_ID,SERVICE,SERVICESET_FLUX,ONLINE_FLUX_THISMONTH,ONLINE_FLUX_LASTMONTH,ONLINE_FLUX_TWOMONTHSAGO,ONLINE_DURATION_THISMONTH,ONLINE_DURATION_LASTMONTH,ONLINE_DURATION_TWOMONTHSAGO,CALL_DURATION_THISMONTH,CALL_DURATION_LASTMONTH,CALL_DURATION_TWOMONTHSAGO,NET_DURATION,LAST_RECHARGE_VALUE,TOTAL_RECHARGE_VALUE,TOTAL_RECHARGE_COUNT,TOTAL_OWE_COUNT,BALANCED,ONLINE_FLUX_NEXTMONTH
0,78002,2G,700,298.0,441.0,485.0,151.0,74.0,151.0,102.0,100.0,127.0,206.0,58.0,383.0,4.0,1.240987,55.0,1254.0
1,67208,2G,500,521.0,382.0,386.0,128.0,122.0,122.0,68.0,91.0,91.0,189.0,1.0,488.0,12.0,-0.826767,71.0,1121.0
2,54739,3G,400,504.0,721.0,427.0,64.0,147.0,104.0,88.0,75.0,115.0,195.0,67.0,474.0,6.0,0.086672,55.0,1132.0
3,60265,4G,400,368.0,404.0,354.0,106.0,129.0,118.0,81.0,129.0,70.0,166.0,83.0,575.0,12.0,1.64344,41.0,971.0
4,33085,3G,500,471.0,645.0,521.0,143.0,124.0,94.0,105.0,71.0,113.0,191.0,69.0,585.0,10.0,-2.131418,0.0,1262.0


In [3]:
# Import the pyspark modules
import pyspark
from pyspark import SparkContext, SparkConf

# Import SparkSession, the entry point to the Spark API
from pyspark.sql import SparkSession

In [4]:
conf = SparkConf().setAppName("Lab2_Spark_DecisionTreeRegression_Carrier").setMaster("local[*]")

sc = SparkContext.getOrCreate(conf)

# Create the SparkSession (it reuses the SparkContext created above)
spark = SparkSession.builder.getOrCreate()

In [5]:
data_df = spark.createDataFrame(data_pd)
print(data_df)
data_df.show(2)

DataFrame[USER_ID: bigint, SERVICE: string, SERVICESET_FLUX: bigint, ONLINE_FLUX_THISMONTH: double, ONLINE_FLUX_LASTMONTH: double, ONLINE_FLUX_TWOMONTHSAGO: double, ONLINE_DURATION_THISMONTH: double, ONLINE_DURATION_LASTMONTH: double, ONLINE_DURATION_TWOMONTHSAGO: double, CALL_DURATION_THISMONTH: double, CALL_DURATION_LASTMONTH: double, CALL_DURATION_TWOMONTHSAGO: double, NET_DURATION: double, LAST_RECHARGE_VALUE: double, TOTAL_RECHARGE_VALUE: double, TOTAL_RECHARGE_COUNT: double, TOTAL_OWE_COUNT: double, BALANCED: double, ONLINE_FLUX_NEXTMONTH: double]
+-------+-------+---------------+---------------------+---------------------+------------------------+-------------------------+-------------------------+----------------------------+-----------------------+-----------------------+--------------------------+------------+-------------------+--------------------+--------------------+-------------------+--------+---------------------+
|USER_ID|SERVICE|SERVICESET_FLUX|ONLINE_FLUX_THISMONTH|

### Data Exploration

In [6]:
print(data_df.count())
print(data_df.columns)

10000
['USER_ID', 'SERVICE', 'SERVICESET_FLUX', 'ONLINE_FLUX_THISMONTH', 'ONLINE_FLUX_LASTMONTH', 'ONLINE_FLUX_TWOMONTHSAGO', 'ONLINE_DURATION_THISMONTH', 'ONLINE_DURATION_LASTMONTH', 'ONLINE_DURATION_TWOMONTHSAGO', 'CALL_DURATION_THISMONTH', 'CALL_DURATION_LASTMONTH', 'CALL_DURATION_TWOMONTHSAGO', 'NET_DURATION', 'LAST_RECHARGE_VALUE', 'TOTAL_RECHARGE_VALUE', 'TOTAL_RECHARGE_COUNT', 'TOTAL_OWE_COUNT', 'BALANCED', 'ONLINE_FLUX_NEXTMONTH']


In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

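### Categorical Feature Encoding
Convert the categorical SERVICE variable into a one-hot (0/1) encoding so that it can be used in numerical computation.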

In [34]:
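from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Map each SERVICE string (e.g. 2G, 3G, 4G) to a numeric category index
stringIndexer = StringIndexer(inputCol="SERVICE", outputCol="categoryIndex")
model = stringIndexer.fit(data_df)
indexed = model.transform(data_df)

# One-hot encode the category index into a vector column.
# Note: calling transform() directly matches Spark 2.x, where OneHotEncoder is a transformer;
# in Spark 3.x it is an estimator, so use encoder.fit(indexed).transform(indexed) instead.
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="SERVICE_categoryVec")
encoded_df = encoder.transform(indexed)
encoded_df.show(2)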

+-------+-------+---------------+---------------------+---------------------+------------------------+-------------------------+-------------------------+----------------------------+-----------------------+-----------------------+--------------------------+------------+-------------------+--------------------+--------------------+-------------------+--------+---------------------+-------------+-------------------+
|USER_ID|SERVICE|SERVICESET_FLUX|ONLINE_FLUX_THISMONTH|ONLINE_FLUX_LASTMONTH|ONLINE_FLUX_TWOMONTHSAGO|ONLINE_DURATION_THISMONTH|ONLINE_DURATION_LASTMONTH|ONLINE_DURATION_TWOMONTHSAGO|CALL_DURATION_THISMONTH|CALL_DURATION_LASTMONTH|CALL_DURATION_TWOMONTHSAGO|NET_DURATION|LAST_RECHARGE_VALUE|TOTAL_RECHARGE_VALUE|TOTAL_RECHARGE_COUNT|    TOTAL_OWE_COUNT|BALANCED|ONLINE_FLUX_NEXTMONTH|categoryIndex|SERVICE_categoryVec|
+-------+-------+---------------+---------------------+---------------------+------------------------+-------------------------+-------------------------+--------
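
The two-step encoding above can be illustrated without Spark. The following is a plain-Python sketch, not the Spark implementation. It assumes Spark's defaults: `StringIndexer` gives index 0 to the most frequent label, and `OneHotEncoder` drops the last category (`dropLast=True`), so three SERVICE values yield a vector of length 2. The helper names `string_index` and `one_hot`, the alphabetical tie-breaking, and the sample values are hypothetical.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically here (an assumption for this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a dense 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

services = ["2G", "2G", "2G", "3G", "3G", "4G"]  # illustrative sample, not the real data
mapping = string_index(services)
encoded = {label: one_hot(idx, len(mapping)) for label, idx in mapping.items()}
print(mapping)
print(encoded)
```

Dropping the last category keeps the encoded columns from being linearly dependent; the dropped category is still identifiable because it is the only one encoded as all zeros.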

### Select the Feature Columns Needed for Training

In [44]:
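columns = encoded_df.columns
print(columns)
# Skip USER_ID and SERVICE at the front, and categoryIndex and SERVICE_categoryVec at the end
train_columns = columns[2:-2]
# Add back the one-hot encoded SERVICE vector
train_columns.append(columns[-1])
print(train_columns)
encoded_selected_df = encoded_df.select(train_columns)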

['USER_ID', 'SERVICE', 'SERVICESET_FLUX', 'ONLINE_FLUX_THISMONTH', 'ONLINE_FLUX_LASTMONTH', 'ONLINE_FLUX_TWOMONTHSAGO', 'ONLINE_DURATION_THISMONTH', 'ONLINE_DURATION_LASTMONTH', 'ONLINE_DURATION_TWOMONTHSAGO', 'CALL_DURATION_THISMONTH', 'CALL_DURATION_LASTMONTH', 'CALL_DURATION_TWOMONTHSAGO', 'NET_DURATION', 'LAST_RECHARGE_VALUE', 'TOTAL_RECHARGE_VALUE', 'TOTAL_RECHARGE_COUNT', 'TOTAL_OWE_COUNT', 'BALANCED', 'ONLINE_FLUX_NEXTMONTH', 'categoryIndex', 'SERVICE_categoryVec']
['SERVICESET_FLUX', 'ONLINE_FLUX_THISMONTH', 'ONLINE_FLUX_LASTMONTH', 'ONLINE_FLUX_TWOMONTHSAGO', 'ONLINE_DURATION_THISMONTH', 'ONLINE_DURATION_LASTMONTH', 'ONLINE_DURATION_TWOMONTHSAGO', 'CALL_DURATION_THISMONTH', 'CALL_DURATION_LASTMONTH', 'CALL_DURATION_TWOMONTHSAGO', 'NET_DURATION', 'LAST_RECHARGE_VALUE', 'TOTAL_RECHARGE_VALUE', 'TOTAL_RECHARGE_COUNT', 'TOTAL_OWE_COUNT', 'BALANCED', 'ONLINE_FLUX_NEXTMONTH', 'SERVICE_categoryVec']
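
The slice-and-append selection depends on column positions. As a small sketch on a shortened, illustrative column list, the snippet below repeats that positional selection and compares it with a name-based filter, which would keep working if the column order changed. The `excluded` set and the `by_name` variable are hypothetical names introduced for this example.

```python
# Shortened, illustrative column list (the real DataFrame has more columns)
columns = ["USER_ID", "SERVICE", "SERVICESET_FLUX", "BALANCED",
           "ONLINE_FLUX_NEXTMONTH", "categoryIndex", "SERVICE_categoryVec"]

# Positional selection: skip the first two and the last two entries,
# then add the final entry (the one-hot vector column) back
train_columns = columns[2:-2]
train_columns.append(columns[-1])

# Name-based selection: drop the columns that should not be used for training
excluded = {"USER_ID", "SERVICE", "categoryIndex"}
by_name = [name for name in columns if name not in excluded]

print(train_columns)
print(by_name)
```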


### Build the Training Matrix

In [49]:
from pyspark.ml.feature import RFormula
rf = RFormula(formula="ONLINE_FLUX_NEXTMONTH ~.")
model = rf.fit(encoded_selected_df)
transformed_df = model.transform(encoded_selected_df)
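
The formula `ONLINE_FLUX_NEXTMONTH ~ .` treats `ONLINE_FLUX_NEXTMONTH` as the label and every other column as a feature; the transformed DataFrame gains a `features` vector column and a `label` column. The plain-Python sketch below only illustrates that idea on a single row and is not how `RFormula` is implemented. The `assemble` helper and the one-hot value in the sample row are hypothetical.

```python
def assemble(row, label_col):
    """Split a row into (features, label), flattening any list-valued column."""
    features = []
    for name, value in row.items():
        if name == label_col:
            continue  # the label is not part of the feature vector
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features, float(row[label_col])

row = {
    "SERVICESET_FLUX": 700,
    "ONLINE_FLUX_THISMONTH": 298.0,
    "ONLINE_FLUX_NEXTMONTH": 1254.0,
    "SERVICE_categoryVec": [1.0, 0.0],  # hypothetical one-hot value
}
features, label = assemble(row, "ONLINE_FLUX_NEXTMONTH")
print(features, label)
```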

### Train the Model

In [50]:
# Split the data into training and test sets (20% held out for testing)
(training_Data, test_Data) = transformed_df.randomSplit([0.8, 0.2])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol='features',labelCol= 'ONLINE_FLUX_NEXTMONTH')
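
Because `randomSplit` assigns rows at random, the two parts are only approximately 80% and 20% of the data, and they change between runs unless a seed is given. The plain-Python sketch below illustrates this, assuming each row is assigned by an independent uniform draw; the `random_split` helper is hypothetical and not part of Spark.

```python
import random

def random_split(rows, train_weight, seed=None):
    """Assign each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(10000))  # stand-in for the 10000 rows of the dataset
train, test = random_split(rows, 0.8, seed=42)
print(len(train), len(test))

# The same seed reproduces the same split
assert random_split(rows, 0.8, seed=42) == (train, test)
```

In PySpark, a reproducible split can be requested the same way by passing a seed, for example `transformed_df.randomSplit([0.8, 0.2], seed=42)`.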

In [51]:
# # Chain indexer and tree in a Pipeline
# pipeline = Pipeline(stages=[featureIndexer, dt])

# Train the model on the training data (no Pipeline is used here).
model = dt.fit(training_Data)

# Make predictions.
predictions = model.transform(test_Data)


In [52]:
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)


+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
|1159.7012987012988|1139.0|[200.0,550.0,305....|
|1062.0442477876106| 795.0|[300.0,166.0,585....|
|1062.0442477876106| 847.0|[300.0,290.0,555....|
| 981.1578947368421| 970.0|[300.0,384.0,298....|
|1000.1137931034483|1058.0|[300.0,398.0,533....|
+------------------+------+--------------------+
only showing top 5 rows



## Model Evaluation

In [53]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 108.003
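
RMSE is the square root of the mean squared difference between predictions and labels, so it is expressed in the same unit as the label. As a minimal sketch of what the metric measures, the snippet below computes it in plain Python for the five prediction rows displayed earlier (predictions rounded to two decimals). The `rmse` helper is a hypothetical name, and five rows are far too few to reproduce the test-set value.

```python
import math

def rmse(predictions, labels):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# The five rows shown by predictions.select(...).show(5)
predictions = [1159.70, 1062.04, 1062.04, 981.16, 1000.11]
labels = [1139.0, 795.0, 847.0, 970.0, 1058.0]
print(rmse(predictions, labels))
```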


In [56]:
model

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_41f08d732d69ec0c9113) of depth 5 with 63 nodes
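
The summary "depth 5 with 63 nodes" is consistent with a complete binary tree: a binary tree of depth d (root at depth 0) has at most 2^(d+1) - 1 nodes, and 5 is the default `maxDepth` of `DecisionTreeRegressor`. This suggests the tree grew to the depth limit on every branch. The short sketch below checks the arithmetic; `max_nodes` is a hypothetical helper.

```python
def max_nodes(depth):
    """Maximum number of nodes in a binary tree of the given depth (root at depth 0)."""
    return 2 ** (depth + 1) - 1

for depth in range(6):
    print(depth, max_nodes(depth))
```

If the depth limit appears to be the constraint, one option is to pass a larger `maxDepth` to `DecisionTreeRegressor` and compare the test RMSE, keeping in mind that deeper trees can overfit.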