**Case description**

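This case study accompanies Chapter 6 (classification models) of the course *Spark Big Data Analytics Platform*. The data comes from the learning management system (LMS) of the mobile app "Kalboard 360", which aims to use cutting-edge technology to raise the quality of K-12 schooling. The dataset contains 480 student records and 16 features, which fall into three groups:

(1) Demographic features, such as gender and nationality.

(2) Academic background features, such as educational stage, grade level, and section.

(3) Behavioral features, such as raising hands in class, visiting resources, parents answering the survey, and parents' satisfaction with the school.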

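The data was collected over two semesters: 245 student records in the first semester and 235 in the second. Each student is assigned to one of three classes according to their total grade:
Low: 0-69, Middle: 70-89, High: 90-100.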
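
The grade bands above can be written as a small function. This is a minimal sketch: the function name `grade_class` is hypothetical, and the one-letter labels follow the `Class` values that appear in the data (`L`, `M`, `H`).

```python
def grade_class(total_grade):
    """Map a total grade (0-100) to the class label used in the dataset."""
    if not 0 <= total_grade <= 100:
        raise ValueError("total grade must be between 0 and 100")
    if total_grade <= 69:
        return "L"   # low: 0-69
    if total_grade <= 89:
        return "M"   # middle: 70-89
    return "H"       # high: 90-100


print([grade_class(g) for g in (0, 69, 70, 89, 90, 100)])
```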

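Our task is to predict each student's grade class from the collected data, using the classification algorithms in `pyspark.ml`. Because `pyspark.ml` operates on DataFrames, the data must first be converted to a DataFrame.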

**Data fields and descriptions:**

| Feature | Description |
|---|---|
| gender | Student's gender ('M' or 'F') |
| NationalITy | Student's nationality |
| PlaceofBirth | Student's place of birth |
| StageID | Educational stage ('lowerlevel', 'MiddleSchool', 'HighSchool') |
| GradeID | Grade level ('G-01', 'G-02', 'G-03', 'G-04', 'G-05', 'G-06', 'G-07', 'G-08', 'G-09', 'G-10', 'G-11', 'G-12') |
| SectionID | Classroom section the student belongs to ('A', 'B', 'C') |
| Topic | Course name |
| Semester | School semester ('F' for first, 'S' for second) |
| Relation | Parent responsible for the student ('Father', 'Mum') |
| raisedhands | Number of times the student raised a hand in class (0-100) |
| VisITedResources | Number of times the student visited the online course content (0-100) |
| AnnouncementsView | Number of times the student checked new announcements (0-100) |
| Discussion | Number of times the student took part in discussion groups (0-100) |
| ParentAnsweringSurvey | Whether the parent answered the survey provided by the school ('Yes', 'No') |
| ParentschoolSatisfaction | Parent's satisfaction with the school ('Good', 'Bad') |
| StudentAbsenceDays | Number of days the student was absent ('Above-7', 'Under-7') |
| Class | Grade class based on the student's total grade ('L': 0-69, 'M': 70-89, 'H': 90-100) |

# Contents
[1. Loading the data and exploratory analysis](#1)<br>
[2. Data preprocessing](#2)<br>
[3. LogisticRegression analysis](#3)<br>
[4. RandomForestClassifier analysis](#4)<br>

<div id=1></div>

# 1. Loading the data and exploratory analysis

When the dataset is small, it is convenient to read and inspect it with the pandas `read_csv` function.

In [1]:
path = 'file:/home/spark/Spark/xAPI-Edu-Data.csv'

In [2]:
import pandas as pd 
pandas_data = pd.read_csv(path)
print(pandas_data.shape)
pandas_data.head()


(480, 17)


| | gender | NationalITy | PlaceofBirth | StageID | GradeID | SectionID | Topic | Semester | Relation | raisedhands | VisITedResources | AnnouncementsView | Discussion | ParentAnsweringSurvey | ParentschoolSatisfaction | StudentAbsenceDays | Class |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | M | KW | KuwaIT | lowerlevel | G-04 | A | IT | F | Father | 15 | 16 | 2 | 20 | Yes | Good | Under-7 | M |
| 1 | M | KW | KuwaIT | lowerlevel | G-04 | A | IT | F | Father | 20 | 20 | 3 | 25 | Yes | Good | Under-7 | M |
| 2 | M | KW | KuwaIT | lowerlevel | G-04 | A | IT | F | Father | 10 | 7 | 0 | 30 | No | Bad | Above-7 | L |
| 3 | M | KW | KuwaIT | lowerlevel | G-04 | A | IT | F | Father | 30 | 25 | 5 | 35 | No | Bad | Above-7 | L |
| 4 | M | KW | KuwaIT | lowerlevel | G-04 | A | IT | F | Father | 40 | 50 | 12 | 50 | No | Bad | Above-7 | M |


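The output shows that the dataset has 480 rows and 17 columns (the 16 features plus the target `Class`).

Next, use the `dtypes` attribute to find the columns whose data is not numeric.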


In [3]:
str_columns = pandas_data.dtypes[pandas_data.dtypes == 'object'].index
str_columns

Index(['gender', 'NationalITy', 'PlaceofBirth', 'StageID', 'GradeID',
       'SectionID', 'Topic', 'Semester', 'Relation', 'ParentAnsweringSurvey',
       'ParentschoolSatisfaction', 'StudentAbsenceDays', 'Class'],
      dtype='object')

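Before converting the data to a DataFrame, we can load it as an RDD with `textFile` and inspect it. Note that `top(5)` returns the five largest lines in descending order rather than the first five lines of the file, which is why the rows shown below are not the first records; use `take(5)` to see the first five lines.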

In [4]:
data = sc.textFile(path)
data.top(5)

['gender,NationalITy,PlaceofBirth,StageID,GradeID,SectionID,Topic,Semester,Relation,raisedhands,VisITedResources,AnnouncementsView,Discussion,ParentAnsweringSurvey,ParentschoolSatisfaction,StudentAbsenceDays,Class',
 'M,venzuela,venzuela,HighSchool,G-10,A,IT,F,Mum,80,90,70,80,Yes,Good,Under-7,H',
 'M,lebanon,lebanon,lowerlevel,G-02,B,French,S,Mum,40,51,20,33,No,Bad,Under-7,M',
 'M,lebanon,lebanon,MiddleSchool,G-08,C,Spanish,S,Father,80,51,40,24,No,Good,Under-7,M',
 'M,lebanon,lebanon,MiddleSchool,G-08,C,Spanish,S,Father,77,69,41,13,Yes,Good,Under-7,M']

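**Load the data and convert it to a DataFrame**

Specify the dataset's `schema`: first list each column name together with its type from `pyspark.sql.types`, then build the `schema` with `typ.StructType`.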

In [5]:
import pyspark.sql.types as typ
labels = [
    ('gender', typ.StringType()),
    ('NationalITy', typ.StringType()),
    ('PlaceofBirth', typ.StringType()),
    ('StageID', typ.StringType()),
    ('GradeID', typ.StringType()),
    ('SectionID', typ.StringType()),
    ('Topic', typ.StringType()),
    ('Semester', typ.StringType()),
    ('Relation', typ.StringType()),
    ('raisedhands', typ.IntegerType()),
    ('VisITedResources', typ.IntegerType()),
    ('AnnouncementsView', typ.IntegerType()),
    ('Discussion', typ.IntegerType()),
    ('ParentAnsweringSurvey', typ.StringType()),
    ('ParentschoolSatisfaction', typ.StringType()),
    ('StudentAbsenceDays', typ.StringType()),
    ('Class', typ.StringType()),
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])


Pass `schema=schema` to `spark.read.csv` to read the file directly into a DataFrame.

In [6]:
data = spark.read.csv('file:/home/spark/Spark/xAPI-Edu-Data.csv', 
                        header=True, 
                        schema=schema)
data.show(3)

+------+-----------+------------+----------+-------+---------+-----+--------+--------+-----------+----------------+-----------------+----------+---------------------+------------------------+------------------+-----+
|gender|NationalITy|PlaceofBirth|   StageID|GradeID|SectionID|Topic|Semester|Relation|raisedhands|VisITedResources|AnnouncementsView|Discussion|ParentAnsweringSurvey|ParentschoolSatisfaction|StudentAbsenceDays|Class|
+------+-----------+------------+----------+-------+---------+-----+--------+--------+-----------+----------------+-----------------+----------+---------------------+------------------------+------------------+-----+
|     M|         KW|      KuwaIT|lowerlevel|   G-04|        A|   IT|       F|  Father|         15|              16|                2|        20|                  Yes|                    Good|           Under-7|    M|
|     M|         KW|      KuwaIT|lowerlevel|   G-04|        A|   IT|       F|  Father|         20|              20|                3

Use the `columns` attribute to list all columns of the DataFrame.

In [7]:
cols = data.columns
print(cols)
print(len(cols))

['gender', 'NationalITy', 'PlaceofBirth', 'StageID', 'GradeID', 'SectionID', 'Topic', 'Semester', 'Relation', 'raisedhands', 'VisITedResources', 'AnnouncementsView', 'Discussion', 'ParentAnsweringSurvey', 'ParentschoolSatisfaction', 'StudentAbsenceDays', 'Class']
17


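Use the `describe` method to compute descriptive statistics for `data`. The summary shows basic information about the dataset: the number of non-missing observations, the mean and standard deviation of each column, and the minimum and maximum values. Because `data` has 17 columns, which do not fit well on screen at once, the summary is displayed in two parts.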

In [8]:
data.describe(cols[:10]).show()

+-------+------+-----------+------------+----------+-------+---------+-------+--------+--------+----------------+
|summary|gender|NationalITy|PlaceofBirth|   StageID|GradeID|SectionID|  Topic|Semester|Relation|     raisedhands|
+-------+------+-----------+------------+----------+-------+---------+-------+--------+--------+----------------+
|  count|   480|        480|         480|       480|    480|      480|    480|     480|     480|             480|
|   mean|  null|       null|        null|      null|   null|     null|   null|    null|    null|          46.775|
| stddev|  null|       null|        null|      null|   null|     null|   null|    null|    null|30.7792225827342|
|    min|     F|      Egypt|       Egypt|HighSchool|   G-02|        A| Arabic|       F|  Father|               0|
|    max|     M|   venzuela|    venzuela|lowerlevel|   G-12|        C|Spanish|       S|     Mum|             100|
+-------+------+-----------+------------+----------+-------+---------+-------+--------+-

In [9]:
data.describe(cols[10:]).show()

+-------+------------------+------------------+------------------+---------------------+------------------------+------------------+-----+
|summary|  VisITedResources| AnnouncementsView|        Discussion|ParentAnsweringSurvey|ParentschoolSatisfaction|StudentAbsenceDays|Class|
+-------+------------------+------------------+------------------+---------------------+------------------------+------------------+-----+
|  count|               480|               480|               480|                  480|                     480|               480|  480|
|   mean|54.797916666666666|          37.91875| 43.28333333333333|                 null|                    null|              null| null|
| stddev| 33.08000669966416|26.611244081903397|27.637735038376366|                 null|                    null|              null| null|
|    min|                 0|                 0|                 1|                   No|                     Bad|           Above-7|    H|
|    max|                99

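The `null` values in the output above arise because those features are strings rather than numbers, so no mean or standard deviation can be computed for them.

To understand the columns better, use the `groupBy` method to count how often each value occurs, taking the `gender` and `Class` columns as examples.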

In [10]:
data.groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|  175|
|     M|  305|
+------+-----+



There are more male than female students, but the distribution is not extremely imbalanced.

In [11]:
data.groupBy('Class').count().show()

+-----+-----+
|Class|count|
+-----+-----+
|    M|  211|
|    L|  127|
|    H|  142|
+-----+-----+



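Although there are somewhat more students with middle grades (`M`) than in either of the other two classes, the class distribution is not extremely imbalanced.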
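
To put the class counts above in perspective, the sketch below computes each class's share and the accuracy of a trivial baseline that always predicts the most frequent class. The counts are copied from the output above; the variable names are illustrative.

```python
# Class counts copied from the groupBy('Class') output above
class_counts = {"M": 211, "L": 127, "H": 142}

total = sum(class_counts.values())
shares = {label: count / total for label, count in class_counts.items()}

# A classifier that always predicts the most frequent class is right
# exactly as often as that class occurs.
majority_label = max(class_counts, key=class_counts.get)
baseline_accuracy = shares[majority_label]

for label, share in shares.items():
    print(f"{label}: {share:.3f}")
print(f"majority class: {majority_label}, baseline accuracy: {baseline_accuracy:.3f}")
```

On the full dataset this baseline is about 0.44; the share on a particular test split may differ slightly.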

<div id=2></div>

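# 2. Data preprocessing

The basic information above shows that some features are strings, so they need preprocessing before modeling.

**Numerically encode the string-typed columns**

Statistical models operate only on numeric data, so the non-numeric columns must be encoded.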

In [12]:
from pyspark.ml.feature import StringIndexer

In [13]:
for col in str_columns:
    indexer = StringIndexer(inputCol=col, outputCol=col+'Index')
    data = indexer.fit(data).transform(data)
for col in str_columns:
    data = data.drop(col)
data.show(2)


+-----------+----------------+-----------------+----------+-----------+----------------+-----------------+------------+------------+--------------+----------+-------------+-------------+--------------------------+-----------------------------+-----------------------+----------+
|raisedhands|VisITedResources|AnnouncementsView|Discussion|genderIndex|NationalITyIndex|PlaceofBirthIndex|StageIDIndex|GradeIDIndex|SectionIDIndex|TopicIndex|SemesterIndex|RelationIndex|ParentAnsweringSurveyIndex|ParentschoolSatisfactionIndex|StudentAbsenceDaysIndex|ClassIndex|
+-----------+----------------+-----------------+----------+-----------+----------------+-----------------+------------+------------+--------------+----------+-------------+-------------+--------------------------+-----------------------------+-----------------------+----------+
|         15|              16|                2|        20|        0.0|             0.0|              0.0|         1.0|         3.0|           0.0|       0.0|     
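
By default `StringIndexer` orders labels by descending frequency, so the most frequent value receives index 0.0. The plain-Python sketch below mimics that rule for the `Class` column using the counts shown earlier; `frequency_index` is a hypothetical helper, not part of Spark, and tie-breaking between equally frequent labels is ignored here.

```python
from collections import Counter


def frequency_index(values):
    """Assign 0.0 to the most frequent label, 1.0 to the next, and so on."""
    counts = Counter(values)
    # most_common() sorts by descending count; ties are not handled specially
    ordered = [label for label, _ in counts.most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}


# Rebuild a column with the same class counts as the dataset
class_column = ["M"] * 211 + ["L"] * 127 + ["H"] * 142
mapping = frequency_index(class_column)
print(mapping)
```

Under this rule `ClassIndex` would be 0.0 for `M`, 1.0 for `H`, and 2.0 for `L`.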

Use the `columns` attribute to get all column names of `data` after numeric encoding.

In [14]:
columns = data.columns
print(columns)

['raisedhands', 'VisITedResources', 'AnnouncementsView', 'Discussion', 'genderIndex', 'NationalITyIndex', 'PlaceofBirthIndex', 'StageIDIndex', 'GradeIDIndex', 'SectionIDIndex', 'TopicIndex', 'SemesterIndex', 'RelationIndex', 'ParentAnsweringSurveyIndex', 'ParentschoolSatisfactionIndex', 'StudentAbsenceDaysIndex', 'ClassIndex']


**To prepare for modeling, one-hot encode the unordered categorical features (all except the target)**

In [15]:
# Note: in Spark 3.x this class is named OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator
# Indexed categorical columns: everything after the 4 numeric columns, excluding the target ClassIndex
selected_features = columns[4:-1]
encoder = OneHotEncoderEstimator(inputCols=selected_features,
                                 outputCols=[i + 'vec' for i in selected_features])
data_encodered = encoder.fit(data).transform(data)
# Drop the index columns now that their one-hot vectors exist
for col in selected_features:
    data_encodered = data_encodered.drop(col)

In [16]:
data_encodered.show(2)

+-----------+----------------+-----------------+----------+----------+---------------+-------------------+---------------+--------------------+--------------+----------------+-----------------------------+-----------------+--------------------------------+--------------------------+----------------+--------------+
|raisedhands|VisITedResources|AnnouncementsView|Discussion|ClassIndex|StageIDIndexvec|NationalITyIndexvec|GradeIDIndexvec|PlaceofBirthIndexvec| TopicIndexvec|RelationIndexvec|ParentAnsweringSurveyIndexvec|SectionIDIndexvec|ParentschoolSatisfactionIndexvec|StudentAbsenceDaysIndexvec|SemesterIndexvec|genderIndexvec|
+-----------+----------------+-----------------+----------+----------+---------------+-------------------+---------------+--------------------+--------------+----------------+-----------------------------+-----------------+--------------------------------+--------------------------+----------------+--------------+
|         15|              16|                2|    
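
`OneHotEncoderEstimator` drops the last category by default (`dropLast=True`), so a column with n distinct indices becomes a vector of length n-1 and the last index maps to the all-zero vector. A minimal plain-Python sketch of that encoding follows; the helper `one_hot_drop_last` is hypothetical and only illustrates the rule.

```python
def one_hot_drop_last(index, num_categories):
    """One-hot encode `index`, omitting the slot for the last category."""
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


# A three-valued column such as SectionIDIndex yields vectors of length 2
for idx in range(3):
    print(idx, one_hot_drop_last(idx, 3))
```

This is why two-valued columns such as `genderIndex` end up as vectors of size 1 in the later output.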

<div id=3></div>

# 3. LogisticRegression analysis

**Combine all features with `VectorAssembler`**

Create a single column that combines all the features.

In [17]:
encodered_columns = data_encodered.columns
encodered_columns.remove('ClassIndex')

import pyspark.ml.feature as ft
featuresCreator = ft.VectorAssembler(
    inputCols=encodered_columns, 
    outputCol='features')
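
`VectorAssembler` concatenates its input columns in order, so the length of `features` is the number of numeric columns plus the sizes of the one-hot vectors. The sketch below checks this using the vector sizes visible in the `test_model.take(2)` output later in this section, and computes the offset at which each input starts. The variable names are illustrative.

```python
# (column name, width) in the order of data_encodered.columns without ClassIndex;
# the widths are read off the SparseVector sizes printed by test_model.take(2)
widths = [
    ("raisedhands", 1), ("VisITedResources", 1),
    ("AnnouncementsView", 1), ("Discussion", 1),
    ("StageIDIndexvec", 2), ("NationalITyIndexvec", 13),
    ("GradeIDIndexvec", 9), ("PlaceofBirthIndexvec", 13),
    ("TopicIndexvec", 11), ("RelationIndexvec", 1),
    ("ParentAnsweringSurveyIndexvec", 1), ("SectionIDIndexvec", 2),
    ("ParentschoolSatisfactionIndexvec", 1), ("StudentAbsenceDaysIndexvec", 1),
    ("SemesterIndexvec", 1), ("genderIndexvec", 1),
]

# Offset of each input column inside the assembled vector
offsets = {}
position = 0
for name, width in widths:
    offsets[name] = position
    position += width

print("total length:", position)
print("TopicIndexvec starts at:", offsets["TopicIndexvec"])
```

The total agrees with the printed `features=SparseVector(60, ...)`, and the offsets line up with its active indices (for example, index 41 is the first `TopicIndexvec` slot).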

**Create the `LogisticRegression` estimator**

We first fit this dataset with logistic regression.

In [18]:
from pyspark.ml.classification import LogisticRegression
logistic = LogisticRegression(
    maxIter=10, 
    regParam=0.1, 
    labelCol='ClassIndex')


**Create the pipeline and fit the model**



In [19]:
from pyspark.ml import Pipeline

pipeline1 = Pipeline(stages=[
        featuresCreator, 
        logistic
    ])

In [20]:
train, test = data_encodered \
    .randomSplit([0.7, 0.3], seed=1)
model = pipeline1.fit(train)
test_model = model.transform(test)
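
`randomSplit([0.7, 0.3], seed=1)` assigns each row to a split at random, so the resulting sizes are only approximately 70% and 30% of the data. The plain-Python sketch below illustrates the idea with a hypothetical `random_split` helper; it is not Spark's implementation and will not reproduce Spark's exact partition for the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Randomly assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds in [0, 1], e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(list(range(480)), [0.7, 0.3], seed=1)
print(len(train_rows), len(test_rows))
```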

**Inspect the results**

In [21]:
test_model.take(2)

[Row(raisedhands=0, VisITedResources=4, AnnouncementsView=8, Discussion=30, ClassIndex=2.0, StageIDIndexvec=SparseVector(2, {1: 1.0}), NationalITyIndexvec=SparseVector(13, {0: 1.0}), GradeIDIndexvec=SparseVector(9, {0: 1.0}), PlaceofBirthIndexvec=SparseVector(13, {0: 1.0}), TopicIndexvec=SparseVector(11, {0: 1.0}), RelationIndexvec=SparseVector(1, {}), ParentAnsweringSurveyIndexvec=SparseVector(1, {}), SectionIDIndexvec=SparseVector(2, {}), ParentschoolSatisfactionIndexvec=SparseVector(1, {}), StudentAbsenceDaysIndexvec=SparseVector(1, {}), SemesterIndexvec=SparseVector(1, {0: 1.0}), genderIndexvec=SparseVector(1, {0: 1.0}), features=SparseVector(60, {1: 4.0, 2: 8.0, 3: 30.0, 5: 1.0, 6: 1.0, 19: 1.0, 28: 1.0, 41: 1.0, 58: 1.0, 59: 1.0}), rawPrediction=DenseVector([-0.2687, -1.2538, 1.5225]), probability=DenseVector([0.1357, 0.0507, 0.8137]), prediction=2.0),
 Row(raisedhands=1, VisITedResources=7, AnnouncementsView=6, Discussion=10, ClassIndex=2.0, StageIDIndexvec=SparseVector(2, {1: 1
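
For multinomial logistic regression, the `probability` column is the softmax of `rawPrediction`. The sketch below applies a softmax to the `rawPrediction` of the first row printed above; the result agrees with the printed `probability` up to the rounding of the displayed values. The helper name `softmax` is ours.

```python
import math


def softmax(scores):
    """Convert raw class scores into probabilities that sum to 1."""
    shift = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - shift) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


raw_prediction = [-0.2687, -1.2538, 1.5225]  # first row of test_model.take(2)
probability = softmax(raw_prediction)
print([round(p, 4) for p in probability])
```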

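**Model performance evaluation**

The target in this dataset has three classes, so this is a multiclass problem. Use the `MulticlassClassificationEvaluator` class to evaluate the model.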

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction', 
    labelCol='ClassIndex')
print('LR_Accuracy:{0:.3f}'\
      .format(evaluator.evaluate(test_model, {evaluator.metricName: "accuracy"})))

LR_Accuracy:0.647
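
With `metricName` set to `"accuracy"`, the evaluator reports the fraction of rows whose `prediction` equals the label. A minimal plain-Python equivalent is sketched below on made-up labels; the data is illustrative only and not taken from the dataset.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("inputs must be non-empty and of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Illustrative labels only
labels = [0.0, 1.0, 2.0, 2.0, 0.0]
predictions = [0.0, 2.0, 2.0, 2.0, 1.0]
print(f"{accuracy(labels, predictions):.3f}")
```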


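**Parameter tuning**

The accuracy of the logistic regression model is not particularly satisfactory, so we try tuning its parameters with a grid search.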

In [26]:
# Import the tuning package
import pyspark.ml.tuning as tune
# Specify the model
logistic = LogisticRegression(labelCol='ClassIndex')
# Specify the parameter values to iterate over
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter, [2, 10, 30, 50]) \
    .addGrid(logistic.regParam, [0.01, 0.05, 0.1, 0.15]) \
    .build()
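
`ParamGridBuilder` forms the Cartesian product of the value lists, so the grid above contains 4 × 4 = 16 parameter combinations. The same product can be sketched in plain Python with `itertools.product`; the dictionary keys are illustrative stand-ins for Spark's parameter objects.

```python
from itertools import product

max_iter_values = [2, 10, 30, 50]
reg_param_values = [0.01, 0.05, 0.1, 0.15]

# One dict per combination, analogous to one ParamMap in the Spark grid
param_grid = [
    {"maxIter": m, "regParam": r}
    for m, r in product(max_iter_values, reg_param_values)
]

print(len(param_grid))
print(param_grid[0], param_grid[-1])
```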

In [27]:
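# Specify how to evaluate the models
evaluator2 = MulticlassClassificationEvaluator(
    predictionCol='prediction', 
    labelCol='ClassIndex')
# CrossValidator needs an estimator, estimatorParamMaps, and an evaluator. It loops over the grid of values,
# fits a model for each combination, and compares them with the evaluator.
# Note: no metricName is set here, so model selection uses the evaluator's default metric (f1).
cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator2)
# Create a pipeline that only transforms the data
from pyspark.ml import Pipeline
pipeline2 = Pipeline(stages=[
        featuresCreator, 
    ])
data_transformer = pipeline2.fit(train)
# Search for the best parameter combination
cvModel2 = cv.fit(data_transformer.transform(train))
test_model2 = cvModel2.transform(data_transformer.transform(test))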

print('LR_gd_Accuracy:{0:.3f}'\
      .format(evaluator2.evaluate(test_model2, {evaluator2.metricName: "accuracy"})))

LR_gd_Accuracy:0.698
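
`CrossValidator` uses k-fold cross-validation (3 folds by default): for each parameter combination it trains on k-1 folds, scores the held-out fold, averages the k scores, and finally refits the best combination on the full training set. The plain-Python sketch below shows that loop. `fit_and_score` is a hypothetical stand-in for training and evaluating a model, and the fold assignment is simplified to round-robin rather than random.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_idx, valid_idx) pairs; rows are assigned to folds round-robin."""
    for fold in range(k):
        valid = [i for i in range(n_rows) if i % k == fold]
        train = [i for i in range(n_rows) if i % k != fold]
        yield train, valid


def cross_validate(param_grid, n_rows, k, fit_and_score):
    """Return (best_params, mean_scores), where a larger score is better."""
    mean_scores = []
    for params in param_grid:
        scores = [fit_and_score(params, train, valid)
                  for train, valid in k_fold_indices(n_rows, k)]
        mean_scores.append(sum(scores) / k)
    best = max(range(len(param_grid)), key=lambda i: mean_scores[i])
    return param_grid[best], mean_scores


# Hypothetical scorer: pretends that regParam closest to 0.05 is best
def toy_score(params, train_idx, valid_idx):
    return 1.0 - abs(params["regParam"] - 0.05)


toy_grid = [{"regParam": r} for r in (0.01, 0.05, 0.1, 0.15)]
best_params, scores = cross_validate(toy_grid, n_rows=12, k=3, fit_and_score=toy_score)
print(best_params)
```

With the 16-combination grid and 3 folds, the search amounts to 48 model fits plus one final refit.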


<div id=4></div>

# 4. RandomForestClassifier analysis

Logistic regression does not classify this dataset very well, so we try a random forest instead.

**Create the `RandomForestClassifier` estimator**

In [3]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="ClassIndex", featuresCol="features", numTrees=10)

**Create the pipeline**

In [1]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
        featuresCreator, 
        rf
    ])

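Note: `featuresCreator` is the `VectorAssembler` defined in section 3. If that cell has not been run in the current session, this cell fails with `NameError: name 'featuresCreator' is not defined`.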

**Fit the model**

In [30]:
train, test = data_encodered \
    .randomSplit([0.7, 0.3], seed=1)
model = pipeline.fit(train)
test_model = model.transform(test)

**Inspect the results**

In [31]:
test_model.take(2)

[Row(raisedhands=0, VisITedResources=4, AnnouncementsView=8, Discussion=30, ClassIndex=2.0, StageIDIndexvec=SparseVector(2, {1: 1.0}), NationalITyIndexvec=SparseVector(13, {0: 1.0}), GradeIDIndexvec=SparseVector(9, {0: 1.0}), PlaceofBirthIndexvec=SparseVector(13, {0: 1.0}), TopicIndexvec=SparseVector(11, {0: 1.0}), RelationIndexvec=SparseVector(1, {}), ParentAnsweringSurveyIndexvec=SparseVector(1, {}), SectionIDIndexvec=SparseVector(2, {}), ParentschoolSatisfactionIndexvec=SparseVector(1, {}), StudentAbsenceDaysIndexvec=SparseVector(1, {}), SemesterIndexvec=SparseVector(1, {0: 1.0}), genderIndexvec=SparseVector(1, {0: 1.0}), features=SparseVector(60, {1: 4.0, 2: 8.0, 3: 30.0, 5: 1.0, 6: 1.0, 19: 1.0, 28: 1.0, 41: 1.0, 58: 1.0, 59: 1.0}), rawPrediction=DenseVector([1.168, 0.0781, 8.7539]), probability=DenseVector([0.1168, 0.0078, 0.8754]), prediction=2.0),
 Row(raisedhands=1, VisITedResources=7, AnnouncementsView=6, Discussion=10, ClassIndex=2.0, StageIDIndexvec=SparseVector(2, {1: 1.0}
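
For a random forest, `rawPrediction` holds per-class scores accumulated over the trees, and `probability` is that vector normalized to sum to 1. Dividing the first row's `rawPrediction` by its sum gives the printed `probability`, up to the rounding of the displayed values. The helper `normalize` below is a hypothetical name for this step.

```python
def normalize(scores):
    """Scale non-negative scores so that they sum to 1."""
    total = sum(scores)
    if total <= 0:
        raise ValueError("scores must have a positive sum")
    return [s / total for s in scores]


raw_prediction = [1.168, 0.0781, 8.7539]  # first row of test_model.take(2)
probability = normalize(raw_prediction)
print([round(p, 4) for p in probability])
```

The raw scores sum to roughly 10, which matches `numTrees=10` in the estimator above.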

**Evaluate model performance**

In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction', 
    labelCol='ClassIndex')

print('RF_Accuracy:{0:.3f}'\
      .format(evaluator.evaluate(test_model, {evaluator.metricName: "accuracy"})))

RF_Accuracy:0.662


**Parameter tuning**

In [33]:
import pyspark.ml.tuning as tune

In [34]:
rf = RandomForestClassifier(labelCol="ClassIndex", featuresCol="features")
grid = tune.ParamGridBuilder().addGrid(rf.numTrees, [4,8,10,12,14,16]).build()

In [279]:
evaluator2 = MulticlassClassificationEvaluator(
    predictionCol='prediction', 
    labelCol='ClassIndex')
cv = tune.CrossValidator(estimator=rf,estimatorParamMaps=grid,evaluator=evaluator2)
from pyspark.ml import Pipeline

pipeline2 = Pipeline(stages=[
        featuresCreator, 
    ])
data_transformer = pipeline2.fit(train)
# Search for the best parameter combination
cvModel2 = cv.fit(data_transformer.transform(train))
test_model2 = cvModel2.transform(data_transformer.transform(test))
print('RF_gd_Accuracy:{0:.2f}'\
      .format(evaluator2.evaluate(test_model2, {evaluator2.metricName: "accuracy"})))

RF_gd_Accuracy:0.71
