# Sparkify

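## Overview
Sparkify is a digital music service company. Large numbers of users, including guests, free users, and paid users, listen to their favorite songs on the service every day. This notebook explores features that correlate with user churn, builds a model that predicts churn from user behavior data, and finally lists the features that matter most for whether a user churns.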

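## Data
Every interaction with the Sparkify service produces behavior data, for example playing a song, visiting a page, adding a friend, or giving a thumbs up. The full dataset is 12 GB; this notebook uses a 128 MB subset and runs in a non-cluster environment.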

In [1]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, expr, col, isnan, max, avg, min, count, countDistinct, sqrt, lit, when
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import sum as fsum
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression, GBTClassifier, LinearSVC, RandomForestClassifier
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix
import pandas as pd

from pyspark.sql.types import IntegerType
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sb

import datetime
from time import time

base_color = sb.color_palette()[0]
sb.set_style("whitegrid")

%matplotlib inline

In [2]:
# create a Spark session
spark = SparkSession.builder.appName("Sparkify").getOrCreate()

## Load and Clean the Data

In [3]:
df = spark.read.json("mini_sparkify_event_data.json")
df.printSchema()
print('The dataset has {} rows and {} columns.'.format(df.count(), len(df.columns)))

AnalysisException: 'Path does not exist: file:/Users/hanyang/Downloads/spark_proj/Sparkify/mini_sparkify_event_data.json;'

In [None]:
# Helper that shows the fraction of null values in each column
def show_null_percentage(df):
    '''
    INPUT
    df - spark DataFrame
    
    OUTPUT
    None - prints the null fraction of each column
    '''
    count = df.count()
    statistics = df.describe().take(1)
    statistics_df = spark.createDataFrame(statistics)
    columns = statistics_df.columns
    kv = zip(columns[1:], (('/{},'.format(count).join(columns[1:]))+'/{}'.format(count)).split(','))
    statistics_df.select([(1-expr(v)).alias(k) for k,v in kv]).show(vertical=True)

In [None]:
# Use the helper function defined above to inspect the missing-value ratios
show_null_percentage(df)

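The output above shows that artist / length / song share one missing ratio, and firstName / gender / lastName / location / registration / userAgent share another. Adding an index to the rows and comparing the indices shows that, within each group, the values are missing on the same rows.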

In [None]:
# Helper that checks whether two column conditions hold on the same rows
def column_consistent(df, kvs, how='inner'):
    '''
    INPUT
    df - spark DataFrame
    kvs - pair of column and value, compare two column's consistency. type:dict
          available dict key: columns in df.columns
          available dict value: 'null' / 'notnull' / 'nan' / 'value|value' / string / !string / number
          eg. {'animal':'human','like':'money'}: all humans like money, and all who like money are human
    how - inner / outer / left / right
    
    OUTPUT
    return the consistency of two columns.
    
    '''
    ks = list(kvs.keys())
    vs = list(kvs.values())
    df_index = df.select("*").withColumn("index", monotonically_increasing_id())
    switch = {
        "null": lambda *args: df_index[args[0]].isNull(),
        "nan": lambda *args: isnan(df_index[args[0]]),
        'notnull': lambda *args: df_index[args[0]].isNotNull(),
        'isin': lambda *args: df_index[args[0]].isin(args[1].split('|')),
    }
    id_pair = {
                k : df_index\
               .select(col('index').alias('{}={}_index'.format(k,v)))\
               .where((switch.get('isin' if '|' in v else v, lambda *args:df_index[args[0]]!=args[1].strip('!') if '!' in args[1] else df_index[args[0]]==args[1]))(k,v)) \
               for k,v in kvs.items()
              }
    result = id_pair[ks[0]].join(id_pair[ks[1]], id_pair[ks[0]]['{}={}_index'.format(ks[0],vs[0])]==id_pair[ks[1]]['{}={}_index'.format(ks[1],vs[1])], 'outer')
    if result.where(result[result.columns[0]].isNull() | result[result.columns[1]].isNull()).count() == 0:
        print('if "{}={}", then "{}={}". if "{}={}", then "{}={}".'.format(ks[0], vs[0], ks[1], vs[1], ks[1], vs[1], ks[0],vs[0]))
    if how == 'inner':
        return result.where(result[result.columns[0]].isNotNull() & result[result.columns[1]].isNotNull())
    if how == 'outer':
        return result
    if how == 'left':
        return result.where(result[result.columns[0]].isNotNull())
    if how == 'right':
        return result.where(result[result.columns[1]].isNotNull())

In [None]:
# The rows where artist, length, and song are missing are the same rows
column_consistent(df, {'artist':'null', 'length':'null'})
column_consistent(df, {'length':'null', 'song':'null'})

In [None]:
# The rows where firstName, gender, lastName, location, registration, and userAgent are missing are the same rows
column_consistent(df, {'firstname':'null', 'gender':'null'})
column_consistent(df, {'gender':'null', 'lastname':'null'})
column_consistent(df, {'lastname':'null', 'location':'null'})
column_consistent(df, {'location':'null', 'registration':'null'})
column_consistent(df, {'registration':'null', 'userAgent':'null'})
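
The row-level comparison that `column_consistent` performs can be pictured without Spark. The following is a minimal sketch in plain Python, assuming a small hypothetical list of event records (the `events` rows and the helper `null_row_ids` are illustrative and not part of the notebook): two columns are missing on the same rows exactly when their sets of null row indices are equal.

```python
# Hypothetical event rows; None marks a missing value.
events = [
    {"artist": "A", "length": 210.0, "song": "S1", "gender": "F"},
    {"artist": None, "length": None, "song": None, "gender": "F"},
    {"artist": None, "length": None, "song": None, "gender": None},
    {"artist": "B", "length": 180.5, "song": "S2", "gender": "M"},
]

def null_row_ids(rows, column):
    """Return the set of row indices where `column` is missing."""
    return {i for i, row in enumerate(rows) if row[column] is None}

artist_nulls = null_row_ids(events, "artist")
length_nulls = null_row_ids(events, "length")
gender_nulls = null_row_ids(events, "gender")

# Equal index sets mean the two columns are missing on exactly the same rows.
same_rows = artist_nulls == length_nulls
# A non-empty symmetric difference lists rows where only one of the columns is missing.
mismatch = artist_nulls ^ gender_nulls
print(same_rows, sorted(mismatch))
```

The outer join in `column_consistent` plays the role of the symmetric difference here: any index that appears on only one side shows up with a null partner.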

The rows with page=Cancellation Confirmation and the rows with auth=Cancelled are the same rows.

In [None]:
column_consistent(df, {'page':'Cancellation Confirmation', 'auth':'Cancelled'})

A closer look at the rows where artist, length, and song are missing shows that the user was not listening to any song in those events (song=None).

In [None]:
df.where((df['artist'].isNull())).take(5)

A closer look at the rows where firstName, gender, lastName, location, registration, and userAgent are missing shows that they belong to unregistered guests (registration=None).

In [None]:
df.where((df['firstName'].isNull())).take(5)

Some rows have an empty string as userId.

In [None]:
df.select('userId').dropDuplicates().sort('userId').show(5)

In [None]:
column_consistent(df, {'userId':'', 'registration':'null'})

The number of rows with an empty-string userId equals the number of guest rows, and they are the same rows.

In [None]:
print('Number of rows whose userId is an empty string: {}'.format(df.where(df.userId == '').count()))
print('Number of guest rows:', df.where(df.registration.isNull()).count())
column_consistent(df, {'userId':'','registration':'null'})

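Business insight 1: sample a few paid and free users who visited the Cancellation Confirmation page (guests never visit it) and look at what they do afterwards. There is no activity at all after the visit to Cancellation Confirmation, so the visit can be interpreted as the user closing the account, that is, churning.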

In [None]:
# Select paid and free users who visited Cancellation Confirmation
df.select('userId','level').where(df.level=='paid').where(df.page=='Cancellation Confirmation').show(5)
df.select('userId','level').where(df.level=='free').where(df.page=='Cancellation Confirmation').show(5)

In [None]:
# Inspect the last events of paid user userId=18 (ordered by timestamp so "last" is well defined)
df.where(df.userId=='18').orderBy('ts').select('level','page').collect()[-5:]

In [None]:
# Inspect the last events of paid user userId=32 (ordered by timestamp)
df.where(df.userId=='32').orderBy('ts').select('level','page').collect()[-5:]

In [None]:
# Inspect the last events of free user userId=125 (ordered by timestamp)
df.where(df.userId=='125').orderBy('ts').select('level','page').collect()[-5:]

In [None]:
# Inspect the last events of free user userId=143 (ordered by timestamp)
df.where(df.userId=='143').orderBy('ts').select('level','page').collect()[-5:]
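
Checking users one by one does not scale, so the same observation can be phrased as a rule: for every user who reaches Cancellation Confirmation, that page should be the user's latest event. Below is a minimal plain-Python sketch of that rule, assuming hypothetical `(userId, ts, page)` tuples; the sample rows and the helper names `last_page_per_user` and `cancelled_users` are illustrative only.

```python
# Hypothetical (userId, ts, page) events; ts is a timestamp in milliseconds.
events = [
    ("u1", 1000, "NextSong"),
    ("u1", 2000, "Cancel"),
    ("u1", 3000, "Cancellation Confirmation"),
    ("u2", 1500, "NextSong"),
    ("u2", 2500, "Thumbs Up"),
]

def last_page_per_user(rows):
    """Return {userId: page of the event with the largest ts}."""
    latest = {}
    for user_id, ts, page in rows:
        if user_id not in latest or ts > latest[user_id][0]:
            latest[user_id] = (ts, page)
    return {user_id: page for user_id, (_, page) in latest.items()}

def cancelled_users(rows):
    """Return the users who visited Cancellation Confirmation at least once."""
    return {user_id for user_id, _, page in rows if page == "Cancellation Confirmation"}

last_pages = last_page_per_user(events)
# True when every cancelling user's final event is the cancellation page.
ends_with_cancel = all(
    last_pages[user_id] == "Cancellation Confirmation"
    for user_id in cancelled_users(events)
)
print(last_pages, ends_with_cancel)
```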

Business insight 2: a guest's level can also be paid. Possibly the guest bought a single track and was therefore marked as a paid user.

In [None]:
# Inspect records whose level is paid and whose userId is an empty string
df.select('userId','level','registration').where(df.userId=='').where(df.level=='paid').show(5)

Guests never visit Cancellation Confirmation, so their events carry no information for predicting churn. The guest data is removed.

In [None]:
# Drop rows whose registration is null, i.e. drop the guest records
df_valid = df.dropna(subset='registration')
df_valid.select('userId').dropDuplicates().sort('userId').show(5)

In [None]:
print('After cleaning, the dataset has {} rows and {} columns'.format(df_valid.count(), len(df_valid.columns)))

## Exploratory Data Analysis

### Defining churn

In [None]:
# Add a new column churn_page: 1 if page == 'Cancellation Confirmation', else 0
transform_churn = udf(lambda x:1 if x=='Cancellation Confirmation' else 0, IntegerType())
df_valid = df_valid.withColumn('churn_page', transform_churn('page'))
# Partition by user and derive the per-user churn flag (churn column) from churn_page
windowval = Window.partitionBy('userId')
df_valid = df_valid.withColumn('churn', max('churn_page').over(windowval))
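
The window expression `max('churn_page').over(Window.partitionBy('userId'))` copies a user-level flag onto every event of that user. A minimal sketch of the same idea in plain Python, assuming hypothetical `(userId, page)` rows (the sample data is illustrative, not taken from the dataset):

```python
from collections import defaultdict

# Hypothetical (userId, page) events.
events = [
    ("u1", "NextSong"),
    ("u1", "Cancellation Confirmation"),
    ("u2", "NextSong"),
    ("u2", "Home"),
]

# Per-user maximum of the 0/1 event flag: 1 as soon as one cancellation event exists.
churn_by_user = defaultdict(int)
for user_id, page in events:
    churn_page = 1 if page == "Cancellation Confirmation" else 0
    churn_by_user[user_id] = max(churn_by_user[user_id], churn_page)

# Attach the user-level flag to every event, like the window function does.
labelled = [(user_id, page, churn_by_user[user_id]) for user_id, page in events]
print(labelled)
```

Every event of `u1` ends up with churn 1, including the events that happened before the cancellation, which is why later cells can group by `userId` and `churn` together.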

#### Distribution of churn

In [None]:
label_df = df_valid.select('userId',col('churn').alias('label'))\
    .dropDuplicates(subset=['userId']).toPandas()

In [None]:
sb.countplot(x='label',data=label_df);
plt.title('Most users do not churn');

Users who did not churn far outnumber users who churned. The label is imbalanced, which may affect the prediction results.

### Exploring features related to churn
Extract per-user features from the data and look at how each one relates to churn.

#### churn and level

In [None]:
churn_per_level_df = df_valid.where(df_valid['page']=='Cancellation Confirmation').groupBy('level').count().toPandas()
plt.bar(churn_per_level_df['level'], churn_per_level_df['count']);
plt.ylabel('churn count');
plt.title('More churn events come from paid users');

#### churn and gender

In [None]:
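# Compute the churn ratio per gender and store it in churn_ratio.
# Deduplicate to one row per user first, so users (not event rows) are counted.
w = Window.partitionBy('gender')
gender_churn_ratio_df = df_valid.select('userId','gender','churn').dropDuplicates(['userId'])\
    .groupBy('churn','gender').count()\
    .withColumn('total',fsum('count').over(w))\
    .where(col('churn')==1)\
    .select('gender',(col('count')/col('total')).alias('churn_ratio'))\
    .toPandas()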

In [None]:
bar = plt.bar(gender_churn_ratio_df['gender'],gender_churn_ratio_df['churn_ratio'])
for p in bar.patches:
    plt.annotate(format(p.get_height(), '.2f'), (p.get_x() + p.get_width() / 2., p.get_height()),
                 ha = 'center', va = 'top', xytext = (0, 10), textcoords = 'offset points')
plt.ylabel('churn_ratio');
plt.title('Male users are more likely to churn');

#### churn and time since registration

In [None]:
hours_since_reg_df = df_valid.select(((col('ts') - col('registration'))/1000/60/60).alias('hours_since_reg'),'churn','userId')\
    .groupBy('churn','userId').agg(max('hours_since_reg'))\
    .withColumnRenamed('max(hours_since_reg)','hours_since_reg')\
    .toPandas()

In [None]:
sb.boxplot(x='hours_since_reg', y='churn', data=hours_since_reg_df, orient="h");
plt.title('users with a shorter hours_since_reg are more likely to churn');

In [None]:
sb.distplot(hours_since_reg_df['hours_since_reg']);
plt.title('hours_since_reg is approximately normally distributed');

#### churn and the average number of songs per session

In [None]:
avg_song_count_df = df_valid.where(df_valid['song'] != '')\
    .groupBy('userId','sessionId','churn')\
    .agg(count('song').alias('song_count'))\
    .groupBy('userId','churn')\
    .agg(avg('song_count').alias('avg_song_count'))\
    .select('churn','avg_song_count').toPandas()

In [None]:
sb.boxplot(x='avg_song_count', y='churn', data=avg_song_count_df, orient="h");
plt.title('users with a lower avg_song_count are more likely to churn');

In [None]:
sb.distplot(avg_song_count_df['avg_song_count']);
plt.title('avg_song_count is approximately normally distributed');

#### churn and the number of sessions per user

In [None]:
session_per_user_df = df_valid.groupBy('userId','churn')\
    .agg(countDistinct('sessionId').alias('unique_session'))\
    .withColumn('unique_session_sqrt',sqrt('unique_session'))\
    .toPandas()

In [None]:
sb.boxplot(x='unique_session', y='churn', data=session_per_user_df, orient="h");
plt.title('users with fewer unique_session are more likely to churn');

In [None]:
session_per_user_df.info()

In [None]:
sb.distplot(session_per_user_df['unique_session']);
plt.title('unique_session is skewed');

The number of sessions is skewed. A square-root transform is applied here, after which the distribution is approximately normal.
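
The effect of the square-root transform can also be checked numerically rather than only by eye. Below is a minimal plain-Python sketch that compares the sample skewness before and after the transform. The `session_counts` values are hypothetical right-skewed counts, and `skewness` is an illustrative helper using the moment-based definition (third central moment divided by the standard deviation cubed).

```python
import math

def skewness(values):
    """Moment-based sample skewness: m3 / m2**1.5 (0 for symmetric data)."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5

# Hypothetical per-user session counts with a long right tail.
session_counts = [1, 1, 2, 2, 3, 3, 4, 5, 8, 13, 21, 40]
sqrt_counts = [math.sqrt(v) for v in session_counts]

raw_skew = skewness(session_counts)
sqrt_skew = skewness(sqrt_counts)
print(raw_skew, sqrt_skew)
```

On these sample values the transformed data is still right-skewed, only less so, so "approximately normal" is best read as "closer to symmetric" and confirmed on the real column.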

In [None]:
sb.distplot(session_per_user_df['unique_session_sqrt']);
plt.title('after sqrt, unique_session is approximately normally distributed');

#### churn and the number of songs added to playlists

In [None]:
play_list_df = df_valid.where(df_valid.page=='Add to Playlist')\
    .groupBy('userId','churn').agg(count('userId').alias('num_playlist'))\
    .withColumn('num_playlist_sqrt',sqrt('num_playlist'))\
    .toPandas()

In [None]:
sb.boxplot(x='num_playlist', y='churn', data=play_list_df, orient="h");
plt.title('users with a lower num_playlist are more likely to churn');

In [None]:
sb.distplot(play_list_df['num_playlist']);
plt.title('num_playlist is skewed');

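The number of playlist additions is skewed. A square-root transform is applied here, after which the distribution is approximately normal.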

In [None]:
sb.distplot(play_list_df['num_playlist_sqrt']);

#### churn and the number of friends added

In [None]:
add_friend_df = df_valid.where(df_valid.page=='Add Friend')\
    .groupBy('userId','churn').agg(count('userId').alias('num_friend'))\
     .withColumn('num_friend_sqrt',sqrt('num_friend'))\
    .toPandas()

In [None]:
sb.boxplot(x='num_friend', y='churn', data=add_friend_df, orient="h");
plt.title('users with a lower num_friend are more likely to churn');

In [None]:
sb.distplot(add_friend_df['num_friend']);
plt.title('num_friend is skewed');

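The number of friends added is skewed. A square-root transform is applied here, after which the distribution is approximately normal.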

In [None]:
sb.distplot(add_friend_df['num_friend_sqrt']);

#### churn and the share of events that are not song plays

In [None]:
row_count = df_valid.groupBy('userId','churn').agg(count('length').alias('row_count'))
length_count = df_valid.groupBy('userId').count()
null_ratio_df = row_count.join(length_count,row_count.userId==length_count.userId,'inner')\
    .withColumn('null_ratio',(1-(col('row_count')/col('count'))))\
    .select('churn','null_ratio').toPandas()

In [None]:
sb.boxplot(x='null_ratio', y='churn', data=null_ratio_df, orient="h");
plt.title('users with a higher null_ratio are more likely to churn');

In [None]:
sb.distplot(null_ratio_df['null_ratio']);

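#### churn and the maximum itemInSession

itemInSession is closely tied to sessionId: it records how many items a session contains. Looking at the rows of a single sessionId shows that itemInSession increases over time, so it appears to be the item count of the session at the moment the log entry was written. The column is not meaningful on its own. Here the actual item count of each session is taken as the maximum itemInSession of that session, then the largest such value per user is computed and compared against churn.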
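
The two-level aggregation described above (maximum per session, then maximum per user) can be sketched in plain Python. The `rows` below are hypothetical `(userId, sessionId, itemInSession)` log entries used only for illustration.

```python
from collections import defaultdict

# Hypothetical (userId, sessionId, itemInSession) log rows.
rows = [
    ("u1", 10, 0), ("u1", 10, 1), ("u1", 10, 2),
    ("u1", 11, 0), ("u1", 11, 1),
    ("u2", 20, 0),
]

# Step 1: the running counter's maximum stands in for each session's item count.
session_max = defaultdict(int)
for user_id, session_id, item in rows:
    key = (user_id, session_id)
    session_max[key] = max(session_max[key], item)

# Step 2: keep the largest session per user.
user_max = defaultdict(int)
for (user_id, _), item in session_max.items():
    user_max[user_id] = max(user_max[user_id], item)

print(dict(session_max), dict(user_max))
```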

In [None]:
max_itemInSession_df = df_valid\
    .groupBy('churn','userId','sessionId')\
    .agg(max('itemInSession').alias('totalItemInSession'))\
    .groupBy('churn','userId')\
    .agg(max('totalItemInSession').alias('maxItemInSession'))\
    .withColumn('maxItemInSession_sqrt',sqrt('maxItemInSession'))\
    .toPandas()

In [None]:
sb.boxplot(x='maxItemInSession', y='churn', data=max_itemInSession_df, orient="h");
plt.title('users with a lower maxItemInSession are more likely to churn');

In [None]:
sb.distplot(max_itemInSession_df['maxItemInSession']);
plt.title('maxItemInSession is skewed');

This feature is skewed. A square-root transform is applied here, after which the distribution is approximately normal.

In [None]:
sb.distplot(max_itemInSession_df['maxItemInSession_sqrt']);

## Feature Engineering
The nine features explored above are used as model inputs. The features and the label are stored in df_features.

In [None]:
# User's level
f1 = df_valid.select('userId','level').dropDuplicates(subset=['userId'])

In [None]:
# User's gender
f2 = df_valid.select('userId','gender').dropDuplicates(subset=['userId'])

In [None]:
# Time since registration, in hours
f3 = df_valid.select(((col('ts') - col('registration'))/1000/60/60).alias('hours_since_reg'),'userId')\
    .groupBy('userId').agg(max('hours_since_reg'))\
    .withColumnRenamed('max(hours_since_reg)','hours_since_reg')

In [None]:
# Average number of songs played per session, per user
f4 = df_valid.where(df_valid['song'] != '')\
    .groupBy('userId','sessionId')\
    .agg(count('song').alias('song_count'))\
    .groupBy('userId')\
    .agg(avg('song_count').alias('avg_song_count'))

In [None]:
# Number of sessions per user
f5 = df_valid.groupBy('userId')\
    .agg(countDistinct('sessionId').alias('unique_session_num'))\
    .withColumn('unique_session_num_sqrt',sqrt('unique_session_num'))

In [None]:
# Number of songs the user added to playlists
f6 = df_valid.where(df_valid.page=='Add to Playlist')\
    .groupBy('userId').agg(count('page').alias('add_playlist_num'))\
    .withColumn('num_playlist_sqrt',sqrt('add_playlist_num'))

In [None]:
# Number of friends the user added
f7 = df_valid.where(df_valid.page=='Add Friend')\
    .groupBy('userId').agg(count('page').alias('add_friend_num'))\
    .withColumn('add_friend_num_sqrt',sqrt('add_friend_num'))

In [None]:
# Share of the user's events that are not song plays
row_count = df_valid.groupBy('userId').agg(count('length').alias('row_count'))
length_count = df_valid.groupBy('userId').count()
f8 = row_count.join(length_count, 'userId','inner')\
    .withColumn('null_ratio',(1-(col('row_count')/col('count'))))\
    .select('userId','null_ratio')

In [None]:
# Maximum itemInSession
f9 = df_valid\
    .groupBy('userId','sessionId')\
    .agg(max('itemInSession').alias('totalItemInSession'))\
    .groupBy('userId').agg(max('totalItemInSession').alias('maxItemInSession'))\
    .withColumn('maxItemInSession_sqrt',sqrt('maxItemInSession'))

In [None]:
label = df_valid.select('userId',col('churn').alias('label')).dropDuplicates(subset=['userId'])

In [None]:
df_features = f1.join(f2,'userId','outer')\
    .join(f3,'userId','outer')\
    .join(f4,'userId','outer')\
    .join(f5,'userId','outer')\
    .join(f6,'userId','outer')\
    .join(f7,'userId','outer')\
    .join(f8,'userId','outer')\
    .join(f9,'userId','outer')\
    .join(label,'userId','outer')\
    .fillna(0).drop('userId').persist()
df_features.show(n=1,vertical=True)
print('Number of samples: {}'.format(df_features.count()))

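## Modeling
This section splits the data into training, validation, and test sets and uses the F1 score to compare four algorithms: logistic regression, GBT, support vector machine, and random forest. The algorithm that scores best on the validation set is then tuned with grid search and 3-fold cross-validation on the training set, and the tuned model is finally evaluated on the test set.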

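### Evaluation metric
The F1 score is used as the evaluation metric. We do not want to predict churn for users who do not actually churn (that may waste resources), and we do not want to predict no churn for users who actually churn (that may lose users), so the F1 score is a natural choice. Because non-churn labels far outnumber churn labels, a model tends to produce many False Negative errors (predicted 0, actually 1), and the F1 score lets the metric capture them. Accuracy is reported alongside for reference.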
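
For reference, these are the standard definitions behind the metric, where $TP$, $FP$, and $FN$ are the counts of true positives, false positives, and false negatives for the churn class:

```latex
\[
\text{precision} = \frac{TP}{TP + FP}, \qquad
\text{recall} = \frac{TP}{TP + FN}, \qquad
F_1 = \frac{2 \cdot \text{precision} \cdot \text{recall}}{\text{precision} + \text{recall}}
\]
```

A false positive lowers precision and a false negative lowers recall, so both kinds of error described above pull $F_1$ down.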

### Splitting the dataset
60% is used as the training set, 20% as the validation set, and 20% as the test set.

In [None]:
train, rest = df_features.randomSplit([0.6, 0.4], seed=42)
test, validation = rest.randomSplit([0.5, 0.5], seed=42)

### Baseline models
Predicting 0 for every sample in the test set, or 1 for every sample, serves as the baseline.

In [None]:
evaluator = MulticlassClassificationEvaluator()

In [None]:
results_1 = test.withColumn('prediction', lit(1.0))
print('Accuracy: {}'.format(evaluator.evaluate(results_1, {evaluator.metricName: "accuracy"})))
print('F1 Score:{}'.format(evaluator.evaluate(results_1, {evaluator.metricName: "f1"})))

In [None]:
results_0 = test.withColumn('prediction', lit(0.0))
print('Accuracy: {}'.format(evaluator.evaluate(results_0, {evaluator.metricName: "accuracy"})))
print('F1 Score:{}'.format(evaluator.evaluate(results_0, {evaluator.metricName: "f1"})))

The baseline that predicts 0 for every sample already performs fairly well on the test set.
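
One reason the all-zero baseline scores well is that Spark's `MulticlassClassificationEvaluator` with `metricName` set to `"f1"` reports a weighted F-measure: the F1 of each class, weighted by that class's share of the true labels. The sketch below reproduces that calculation in plain Python. The label vector is hypothetical (8 non-churn, 2 churn) and the helper names are illustrative; it is not a reimplementation of Spark's code.

```python
def f1_for_class(labels, preds, cls):
    """One-vs-rest F1 for a single class; 0.0 when the class is never predicted correctly."""
    tp = sum(1 for y, p in zip(labels, preds) if y == cls and p == cls)
    fp = sum(1 for y, p in zip(labels, preds) if y != cls and p == cls)
    fn = sum(1 for y, p in zip(labels, preds) if y == cls and p != cls)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

def weighted_f1(labels, preds):
    """Average the per-class F1 scores, weighted by each class's share of the true labels."""
    n = len(labels)
    return sum(f1_for_class(labels, preds, c) * labels.count(c) / n for c in set(labels))

# Hypothetical imbalanced labels and an all-zero baseline prediction.
labels = [0] * 8 + [1] * 2
preds = [0] * 10

accuracy = sum(1 for y, p in zip(labels, preds) if y == p) / len(labels)
churn_f1 = f1_for_class(labels, preds, 1)
baseline_weighted_f1 = weighted_f1(labels, preds)
print(accuracy, churn_f1, baseline_weighted_f1)
```

The baseline never finds a single churner (F1 of 0 for class 1), yet the weighted score stays high because the majority class dominates the weights. Reporting the churn-class F1 next to the weighted value makes that visible.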

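### Model selection
Each model is wrapped in a pipeline that includes feature preprocessing. The four models are fit on the training set and compared on the validation set, and the one that performs best there is selected.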

In [None]:
# Define the lists of features to use
features = ['avg_song_count','unique_session_num','add_playlist_num','add_friend_num','maxItemInSession',
            'null_ratio','hours_since_reg','gender_index','level_index']
features_sqrt = ['avg_song_count','unique_session_num_sqrt','num_playlist_sqrt','add_friend_num_sqrt',
            'maxItemInSession_sqrt','null_ratio','hours_since_reg','gender_index','level_index']

# Encode the categorical variables with StringIndexer
indexer_gender = StringIndexer(inputCol="gender", outputCol="gender_index")
indexer_level = StringIndexer(inputCol="level", outputCol="level_index")
# Assemble the feature vectors
assembler = VectorAssembler(inputCols=features, outputCol="NumFeatures")
assembler_sqrt = VectorAssembler(inputCols=features_sqrt, outputCol="NumFeatures_sqrt")
# Scale the features with StandardScaler
scaler_ss = StandardScaler(inputCol="NumFeatures", outputCol="ScaledNumFeatures_ss", withStd=True, withMean=False)
scaler_ss_sqrt = StandardScaler(inputCol="NumFeatures_sqrt", outputCol="ScaledNumFeatures_ss_sqrt", withStd=True, withMean=False)
# Define the evaluator
evaluator = MulticlassClassificationEvaluator()
# Define the grid-search parameters (empty grid)
paramGrid = ParamGridBuilder().build()

#### LogisticRegression

In [None]:
# Initialize logistic regression
lr = LogisticRegression(maxIter=10,featuresCol='ScaledNumFeatures_ss', labelCol='label')
# Define the logistic regression pipeline
pipeline_lr = Pipeline(stages=[indexer_gender, indexer_level, assembler, scaler_ss, lr])
s = time()
# Fit the logistic regression pipeline on the training data
Model_lr = pipeline_lr.fit(train)
e = time()
print('time used:',e-s)

In [None]:
lr_result = Model_lr.transform(validation)
print('f1 score:',evaluator.evaluate(lr_result, {evaluator.metricName: "f1"}))
print('accuracy:',evaluator.evaluate(lr_result, {evaluator.metricName: "accuracy"}))

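Some of the features are skewed, which can hurt a linear model such as logistic regression, so logistic regression was fit a second time on the square-root-transformed features. Neither accuracy nor the F1 score improved, so the remaining algorithms all use the original features (not the square-root-transformed ones).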

In [None]:
# Initialize logistic regression on the square-root-transformed features
lr = LogisticRegression(maxIter=10,featuresCol='ScaledNumFeatures_ss_sqrt', labelCol='label')
# Define the logistic regression pipeline
pipeline_lr = Pipeline(stages=[indexer_gender, indexer_level, assembler_sqrt, scaler_ss_sqrt, lr])
s = time()
# Fit the logistic regression pipeline on the training data
Model_lr_sqrt = pipeline_lr.fit(train)
e = time()
print('time used:',e-s)

In [None]:
lr_result_sqrt = Model_lr_sqrt.transform(validation)
print('f1 score:',evaluator.evaluate(lr_result_sqrt, {evaluator.metricName: "f1"}))
print('accuracy:',evaluator.evaluate(lr_result_sqrt, {evaluator.metricName: "accuracy"}))

#### GBT

In [None]:
# Initialize GBT
gbt = GBTClassifier(maxIter=10,featuresCol='ScaledNumFeatures_ss',labelCol='label', seed=42)
# Define the GBT pipeline
pipeline_gbt = Pipeline(stages=[indexer_gender, indexer_level, assembler, scaler_ss, gbt])
s = time()
# Fit the GBT pipeline on the training data
Model_gbt = pipeline_gbt.fit(train)
e = time()
print('time used:',e-s)

In [None]:
gbt_result = Model_gbt.transform(validation)
print('f1 score:',evaluator.evaluate(gbt_result, {evaluator.metricName: "f1"}))
print('accuracy:',evaluator.evaluate(gbt_result, {evaluator.metricName: "accuracy"}))

#### Support Vector Machine

In [None]:
# Initialize the support vector machine
svm = LinearSVC(maxIter=10,featuresCol='ScaledNumFeatures_ss',labelCol='label')
# Define the SVM pipeline
pipeline_svm = Pipeline(stages=[indexer_gender, indexer_level, assembler, scaler_ss, svm])
s = time()
# Fit the SVM pipeline on the training data
Model_svm = pipeline_svm.fit(train)
e = time()
print('time used:',e-s)

In [None]:
svm_result = Model_svm.transform(validation)
print('f1 score:',evaluator.evaluate(svm_result, {evaluator.metricName: "f1"}))
print('accuracy:',evaluator.evaluate(svm_result, {evaluator.metricName: "accuracy"}))

#### Random Forest

In [None]:
# Initialize random forest
rf = RandomForestClassifier(featuresCol='ScaledNumFeatures_ss',labelCol='label', seed=42)
# Define the random forest pipeline
pipeline_rf = Pipeline(stages=[indexer_gender, indexer_level, assembler, scaler_ss, rf])
s = time()
# Fit the random forest pipeline on the training data
Model_rf = pipeline_rf.fit(train)
e = time()
print('time used:',e-s)

In [None]:
rf_result = Model_rf.transform(validation)
print('f1:',evaluator.evaluate(rf_result, {evaluator.metricName: "f1"}))
print('accuracy:',evaluator.evaluate(rf_result, {evaluator.metricName: "accuracy"}))

Validation-set results for the untuned models:

Logistic regression:
- f1 score: 0.7678967239069541
- accuracy: 0.8088235294117647

GBT:
- f1 score: 0.7116493656286045
- accuracy: 0.7058823529411765

Support vector machine:
- f1 score: 0.6627450980392157
- accuracy: 0.7647058823529411

Random forest:
- f1 score: 0.8141923436041085
- accuracy: 0.8235294117647058

**Random forest has a higher F1 score than the other three models, so it is the one tuned in the next step.**

### Tuning the model parameters
Grid search is used to select hyperparameters for the random forest algorithm.

In [None]:
rf = RandomForestClassifier(featuresCol='ScaledNumFeatures_ss',labelCol='label', seed=42)
paramGrid_tune = ParamGridBuilder() \
    .addGrid(rf.numTrees, [4,6,8]) \
    .addGrid(rf.maxDepth, [4,6,8]) \
    .build()
evaluator = MulticlassClassificationEvaluator()
pipeline_rf = Pipeline(stages=[indexer_gender, indexer_level, assembler, scaler_ss, rf])
crossval = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid_tune,
                          evaluator=evaluator,
                          numFolds=3)
s = time()
cvModel_rf_tune = crossval.fit(train)
e = time()
print('time used:',e-s)
print('cv average f1_score',cvModel_rf_tune.avgMetrics)
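
To get a feel for the cost of this search, the grid can be enumerated by hand. The plain-Python sketch below mirrors the values passed to `ParamGridBuilder` and `numFolds` above; the dictionary layout and variable names are illustrative. `CrossValidator` fits each parameter combination once per fold and then refits the best combination on the whole training set.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call above.
num_trees_options = [4, 6, 8]
max_depth_options = [4, 6, 8]
num_folds = 3

# The grid is the Cartesian product of the candidate lists.
param_grid = [
    {"numTrees": trees, "maxDepth": depth}
    for trees, depth in product(num_trees_options, max_depth_options)
]

fits_during_cv = len(param_grid) * num_folds  # one fit per combination per fold
total_fits = fits_during_cv + 1               # plus the final refit of the best combination
print(len(param_grid), fits_during_cv, total_fits)
```

`avgMetrics` holds one averaged score per combination, in the same order as the grid, so it has as many entries as `param_grid` here.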

In [None]:
best_rf_result_tune = cvModel_rf_tune.bestModel.transform(test)
print('f1:',evaluator.evaluate(best_rf_result_tune, {evaluator.metricName: "f1"}))
print('accuracy:',evaluator.evaluate(best_rf_result_tune, {evaluator.metricName: "accuracy"}))

In [None]:
best_rf_result_tune_df = best_rf_result_tune.toPandas()
confusion_matrix(best_rf_result_tune_df['label'], best_rf_result_tune_df['prediction'], labels=[1,0])

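On the test set the tuned model reaches an F1 score of 0.784 and an accuracy of 0.8. Of the 15 test samples, 2 that are actually 1 were predicted as 0, and 1 that is actually 0 was predicted as 1. The final F1 score is higher than the baseline (F1 score: 0.71), but not by much.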
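
Reading the matrix correctly depends on the `labels=[1,0]` argument: rows are true classes, columns are predicted classes, and class 1 comes first, so the layout is `[[TP, FN], [FP, TN]]`. The following plain-Python sketch builds a matrix with the same layout from hypothetical labels and predictions; the `confusion` helper and the sample vectors are illustrative and are not the notebook's actual test data.

```python
def confusion(labels, preds, order):
    """Count matrix with rows = true class and columns = predicted class, both in `order`."""
    index = {cls: i for i, cls in enumerate(order)}
    matrix = [[0] * len(order) for _ in order]
    for y, p in zip(labels, preds):
        matrix[index[y]][index[p]] += 1
    return matrix

# Hypothetical true labels and predictions.
labels = [1, 1, 1, 0, 0, 0, 0, 0]
preds = [1, 0, 0, 0, 0, 0, 0, 1]

matrix = confusion(labels, preds, order=[1, 0])
(tp, fn), (fp, tn) = matrix
print(matrix)
```

With this ordering, "actually 1 but predicted 0" is the top-right cell and "actually 0 but predicted 1" is the bottom-left cell.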

## Feature Importance

In [None]:
best_model = cvModel_rf_tune.bestModel.stages[-1]
# toArray() gives one importance per input feature, whether the vector is sparse or dense
importance = best_model.featureImportances.toArray()
# Feature names in the same order as the VectorAssembler inputCols
feature_names = ['avg_song_count','unique_session_num','add_playlist_num','add_friend_num','maxItemInSession',
            'null_ratio','hours_since_reg','gender_index','level_index']
plt.barh(feature_names, importance);
plt.title('RandomForest feature importance')
plt.xlabel('importance');

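The random forest feature importances show that hours_since_reg has a large influence on whether a user churns. This is plausible: a long time since registration means the user has gone a long time without cancelling the account. Such users may be frequent users who are less likely to cancel, but they may also be users who registered long ago, never used the service, and never cancelled.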
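
When pairing importances with feature names, the alignment has to go through the vector index, because an importance vector may be stored sparsely and omit features with zero importance. Below is a minimal plain-Python sketch of that alignment. The `sparse_indices` and `sparse_values` numbers are hypothetical and purely illustrative; they are not the notebook's results.

```python
# Feature names in the order they were assembled into the feature vector.
feature_names = [
    "avg_song_count", "unique_session_num", "add_playlist_num",
    "add_friend_num", "maxItemInSession", "null_ratio",
    "hours_since_reg", "gender_index", "level_index",
]

# Hypothetical sparse representation: only non-zero entries are stored.
sparse_indices = [0, 1, 4, 6]
sparse_values = [0.2, 0.1, 0.3, 0.4]

# Expand to one value per feature; features missing from the sparse form get 0.0.
dense = [0.0] * len(feature_names)
for i, value in zip(sparse_indices, sparse_values):
    dense[i] = value

# Rank features from most to least important.
ranked = sorted(zip(feature_names, dense), key=lambda pair: pair[1], reverse=True)
for name, value in ranked:
    print(f"{name}: {value:.2f}")
```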

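## Conclusion

This notebook trained a model that predicts user churn on 128 MB of data. Four models were evaluated: logistic regression, GBT, support vector machine, and random forest. Random forest was selected by comparing F1 scores and then tuned. The best model reaches an F1 score of 0.784 and an accuracy of 0.8 on the test set, but the test set has only 15 samples, so these scores are unreliable. The most important feature reported by the random forest for whether a user churns is how long the user has been registered (hours_since_reg). The notebook uses a subset of the data so that it can run on a single machine. The full dataset (12 GB) is already big data for the available hardware and would need a cluster. The data contains only 225 user samples, so the model has little practical value as it stands. With more data, the notebook could be extended to a Spark cluster, which would likely improve model performance considerably.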