## 创建SparkSession对象

In [1]:
from pyspark.sql import SparkSession
saprk = SparkSession.builder.appName('ModelBuilding').getOrCreate()

## 读取数据集

In [2]:
df = saprk.read.csv("hdfs://master:9000/home/hadoop/data/edu/GeneralData.csv", inferSchema=True, header = True)

## 数据描述性分析

In [3]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 年份: integer (nullable = true)
 |-- 招生数: double (nullable = true)
 |-- 在校学生数: double (nullable = true)
 |-- 毕业生数: double (nullable = true)
 |-- 获得职业资格证书毕业生数: double (nullable = true)
 |-- 预计毕业生数: double (nullable = true)
 |-- 教职工总数: double (nullable = true)
 |-- 专任教师数: double (nullable = true)
 |-- 普通中等专业学校数: double (nullable = true)
 |-- 成人中等专业学校数: double (nullable = true)



In [4]:
df.describe().show(3, False)###False 是否全部显示 False 不隐藏

+-------+------------------+------------------+------------------+------------------+-----------------+------------------------+-----------------+-----------------+------------------+------------------+------------------+
|summary|_c0               |年份              |招生数            |在校学生数        |毕业生数         |获得职业资格证书毕业生数|预计毕业生数     |教职工总数       |专任教师数        |普通中等专业学校数|成人中等专业学校数|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------------+-----------------+-----------------+------------------+------------------+------------------+
|count  |13                |13                |13                |13                |13               |13                      |13               |13               |13                |13                |13                |
|mean   |6.0               |2014.0            |548.085323076923  |1481.9263923076921|476.1026615384616|335.42013076923075      |483.6312846153846|87.65639999999999|66.19698

## 特征关联性分析

In [5]:
# 使用corr函数检查输入和输出变量之间的相互关系
from pyspark.sql.functions import corr

df.select(corr('专任教师数','招生数')).show()
df.select(corr('教职工总数','招生数')).show()
df.select(corr('普通中等专业学校数','招生数')).show()
df.select(corr('成人中等专业学校数','招生数')).show()
df.select(corr('在校学生数','招生数')).show()
df.select(corr('毕业生数','招生数')).show()
df.select(corr('预计毕业生数','招生数')).show()
df.select(corr('获得职业资格证书毕业生数','招生数')).show()

+------------------------+
|corr(专任教师数, 招生数)|
+------------------------+
|      0.9103651820014628|
+------------------------+

+------------------------+
|corr(教职工总数, 招生数)|
+------------------------+
|      0.9635610737014085|
+------------------------+

+--------------------------------+
|corr(普通中等专业学校数, 招生数)|
+--------------------------------+
|              0.9514088239921596|
+--------------------------------+

+--------------------------------+
|corr(成人中等专业学校数, 招生数)|
+--------------------------------+
|              0.8904020920770594|
+--------------------------------+

+------------------------+
|corr(在校学生数, 招生数)|
+------------------------+
|      0.9722661752850992|
+------------------------+

+----------------------+
|corr(毕业生数, 招生数)|
+----------------------+
|    0.6948556482451111|
+----------------------+

+--------------------------+
|corr(预计毕业生数, 招生数)|
+--------------------------+
|        0.8741069809057641|
+--------------------------+

+-------------------------------

## 特征工程
- VectorAssembler合并所有输入特征，输出单个特征向量化。

In [6]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

vec = VectorAssembler(inputCols=['教职工总数', '普通中等专业学校数'], outputCol='features')
features_df = vec.transform(df)
features_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 年份: integer (nullable = true)
 |-- 招生数: double (nullable = true)
 |-- 在校学生数: double (nullable = true)
 |-- 毕业生数: double (nullable = true)
 |-- 获得职业资格证书毕业生数: double (nullable = true)
 |-- 预计毕业生数: double (nullable = true)
 |-- 教职工总数: double (nullable = true)
 |-- 专任教师数: double (nullable = true)
 |-- 普通中等专业学校数: double (nullable = true)
 |-- 成人中等专业学校数: double (nullable = true)
 |-- features: vector (nullable = true)



In [7]:
model_df = features_df.select('features','招生数')
model_df.show(13, False)

+----------------+--------+
|features        |招生数  |
+----------------+--------+
|[97.3431,3846.0]|650.2739|
|[96.9477,3789.0]|711.777 |
|[95.6631,3938.0]|711.3957|
|[94.5081,3753.0]|649.9626|
|[92.1332,3681.0]|597.0785|
|[88.3959,3577.0]|541.2624|
|[86.6905,3536.0]|495.3553|
|[84.1495,3456.0]|479.8174|
|[82.1047,3398.0]|466.1428|
|[81.1147,3346.0]|451.5235|
|[79.9593,3322.0]|428.5024|
|[80.1482,3339.0]|457.4121|
|[80.3752,3266.0]|484.6056|
+----------------+--------+



## 划分数据集

In [8]:
train_df, test_df = model_df.randomSplit([0.7, 0.3])
print((train_df.count(), len(train_df.columns)))

(10, 2)


In [9]:
train_df.head(5)

[Row(features=DenseVector([80.1482, 3339.0]), 招生数=457.4121),
 Row(features=DenseVector([80.3752, 3266.0]), 招生数=484.6056),
 Row(features=DenseVector([81.1147, 3346.0]), 招生数=451.5235),
 Row(features=DenseVector([82.1047, 3398.0]), 招生数=466.1428),
 Row(features=DenseVector([84.1495, 3456.0]), 招生数=479.8174)]

## 构建和训练线性回归模型
- 使用r^2来评估线性回归模型在训练数据上的性能。

In [10]:
from pyspark.ml.regression import LinearRegression
lin_reg = LinearRegression(featuresCol='features',labelCol='招生数')
lr_model = lin_reg.fit(train_df)
lr_model.coefficients

DenseVector([36.0715, -0.7574])

In [11]:
print('{}{}'.format('方程截距:',lr_model.intercept))  #查看训练后线性方程的截距，模型中的intercept
print('{}{}'.format('方程参数系数:',lr_model.coefficients))  # 回归方程中的中5个自变量的系数

方程截距:68.42549825856315
方程参数系数:[36.07146182582057,-0.7574467456407157]


## 预测评估
- 以`'教职工总数', '普通中等专业学校数'`为输入变量

In [12]:
#通过R2判定系数，评估模型的拟合程度，其值越接近1说明模型有较高的价值
train_p = lr_model.evaluate(train_df)
print('{}{}'.format('训练集R2判定系数：',train_p.r2 ))
test_p = lr_model.evaluate(test_df)
print('{}{}'.format('预测集R2判定系数：',test_p.r2 ))
#通过均方误差meanSquaredError评估估值的准确性
print('{}{}'.format('均方误差:',train_p.meanSquaredError))

训练集R2判定系数：0.9721166799817282
预测集R2判定系数：0.24054479703726783
均方误差:230.1331043510782


In [13]:
#预测结果与真实结果对比
predictions = lr_model.transform(test_df)
print (predictions.collect())
print (predictions.show())

[Row(features=DenseVector([79.9593, 3322.0]), 招生数=428.5024, prediction=436.43624680944015), Row(features=DenseVector([92.1332, 3681.0]), 招生数=597.0785, prediction=603.6432342457803), Row(features=DenseVector([95.6631, 3938.0]), 招生数=711.3957, prediction=536.3080737150805)]
+----------------+--------+------------------+
|        features|  招生数|        prediction|
+----------------+--------+------------------+
|[79.9593,3322.0]|428.5024|436.43624680944015|
|[92.1332,3681.0]|597.0785| 603.6432342457803|
|[95.6631,3938.0]|711.3957| 536.3080737150805|
+----------------+--------+------------------+

None


## 模型封装

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import pandas as pd

def Get_Session():
    return SparkSession.builder.appName('analysis').getOrCreate()
def LR_Go(saprk,inputcols,labelcol):
    # 数据导入
    df = saprk.read.csv("hdfs://master:9000/home/hadoop/data/edu/GeneralData.csv", inferSchema=True, header = True)
    # 特征工程：输入特征合并为单变量
    vec = VectorAssembler(inputCols=inputcols, outputCol='features')
    features_df = vec.transform(df)
    model_df = features_df.select('features',labelcol)
    # 数据集划分
    train_df, test_df = model_df.randomSplit([0.7, 0.3],seed=123)
    # 模型训练
    lin_reg = LinearRegression(featuresCol='features',labelCol=labelcol)
    lr_model = lin_reg.fit(train_df)
    # 模型预测
    predictions = lr_model.transform(test_df)
    # 模型评估
    train_p = lr_model.evaluate(train_df)
    test_p = lr_model.evaluate(test_df)
    print('{}{}'.format('训练集R2判定系数：',train_p.r2 ))
    print('{}{}'.format('均方误差:',train_p.meanSquaredError))
    print('{}{}'.format('预测集R2判定系数：',test_p.r2 ))
    # 结果保存
#     GO = predictions.toPandas()
#     GO.to_csv('Predictions.csv')
    print (predictions.show())
sess = Get_Session()

In [4]:
LR_Go(sess,['在校学生数', '毕业生数'],'招生数')

训练集R2判定系数：0.9815928677261894
均方误差:174.14846474051734
预测集R2判定系数：0.9717173396117535
+--------------------+--------+-----------------+
|            features|  招生数|       prediction|
+--------------------+--------+-----------------+
|[1254.2893,406.3981]|451.5235|464.1651953832783|
|[1416.3127,516.1519]|495.3553|497.0250703650671|
|  [1689.882,554.384]|597.0785|620.5222275601685|
|[1816.4447,543.6524]|711.3957|690.7980593769042|
+--------------------+--------+-----------------+

None
