### SVM

In [2]:
"""
    构建机器学习模型步骤；
        1、如何搜集数据 ？
            历史数据
        2、如何进行数据准备 ？
            提取特征字段和标签字段  -   特征工程    （花费时间最多的）
        3、如何训练模型 ？
            使用什么算法进行训练模型
        4、如何使用模型预测 ？
            使用训练的模型如决策树模型，进行预测
        5、如何评估模型的准确率?
            使用某一个标准来评估模型的准确率，二元分类中使用 AUC 作为评估标准
        6、模型训练参数如何影响准确率？
            训练模型时，针对算法传递不同的参数将会影响准确率和训练时间。
            如使用决策树算法，其中参数impurity、maxDepth、maxBins的值设置
        7、如何找出准确率最高的参数组合？
            不同的参数，不同的组合得到的模型不一样，准确率也不痒。
        8、如何确认是否Overfiiting（过度训练，过拟合）：
            Overfiting（过度训练）是指机器学习所学到的模型过度贴近trainData，从而导致误差变得很大。
            我们使用另一组数据testData再次测试，以避免overfitting的问题。
            - 如果训练评估阶段是AUC很高，但是测试阶段AUC很低，代表可能有overfitting的问题。
            - 如果测试与训练评估阶段的结果中AUC差异不大，就代表无overfitting的问题。
"""
print

<function print>

In [3]:
# 导入模块 pyspark
from pyspark import SparkConf, SparkContext
# 导入系统模块
import os
import time

In [4]:
# 设置环境变量
os.environ['JAVA_HOME'] = 'C:\Java\jdk1.8.0_91'
# HADOOP在Windows的兼容性问题  主要需要$HADOOP_HOME/lib中winutils.exe等文件
os.environ['HADOOP_HOME'] = 'C:\Java\hadoop-2.6.0-cdh5.7.6'
# 设置SPARK_HOME环境变量, 非常重要, 如果没有设置的话,SparkApplication运行不了
os.environ['SPARK_HOME'] = 'C:\Java\spark-2.2.0-bin-2.6.0-cdh5.7.6'

# Create SparkConf
sparkConf = SparkConf()\
    .setAppName('Python_Spark_Classifier')\
    .setMaster('local[2]')
# Create SparkContext
sc = SparkContext(conf=sparkConf)

In [5]:
print(sc)

<SparkContext master=local[2] appName=Python_Spark_Classifier>


In [6]:
# 读取数据集： 预测 网页是短暂的还是长青的
raw_rdd = sc.textFile('./datas/train.tsv')

raw_rdd.take(1)

['"url"\t"urlid"\t"boilerplate"\t"alchemy_category"\t"alchemy_category_score"\t"avglinksize"\t"commonlinkratio_1"\t"commonlinkratio_2"\t"commonlinkratio_3"\t"commonlinkratio_4"\t"compression_ratio"\t"embed_ratio"\t"framebased"\t"frameTagRatio"\t"hasDomainLink"\t"html_ratio"\t"image_ratio"\t"is_news"\t"lengthyLinkDomain"\t"linkwordscore"\t"news_front_page"\t"non_markup_alphanum_characters"\t"numberOfLinks"\t"numwords_in_url"\t"parametrizedLinkRatio"\t"spelling_errors_ratio"\t"label"']

### 数据集


In [7]:
# 读取数据集： 预测 网页是短暂的还是长青的
raw_rdd = sc.textFile('./datas/train.tsv')

raw_rdd.take(1)

['"url"\t"urlid"\t"boilerplate"\t"alchemy_category"\t"alchemy_category_score"\t"avglinksize"\t"commonlinkratio_1"\t"commonlinkratio_2"\t"commonlinkratio_3"\t"commonlinkratio_4"\t"compression_ratio"\t"embed_ratio"\t"framebased"\t"frameTagRatio"\t"hasDomainLink"\t"html_ratio"\t"image_ratio"\t"is_news"\t"lengthyLinkDomain"\t"linkwordscore"\t"news_front_page"\t"non_markup_alphanum_characters"\t"numberOfLinks"\t"numwords_in_url"\t"parametrizedLinkRatio"\t"spelling_errors_ratio"\t"label"']

In [8]:
len(raw_rdd.take(1)[0].split("\t"))###单个字符串长度

27

In [9]:
"""
    数据集中字段：
        -a. 每行数据的各个字段使用制表符隔开 \t
            文件的第一行为字段名称
        -b. 字段 0 - 2 
            表示的是 url网址、urlid网址ID、boilerplate连接的样本文字,此三个字段与判断网页是否长青性关系不大，忽略
        -c. 字段 3 - 25 
            总共23个字段属于特征字段值，基本上都是数值类型特征
        -d. 字段26
            属于 标签label，具有两个值
            - 0: 代表长青性（evergreen）
            - 1： 代表的是短暂性
"""

# 获取原始数据集中前2条数据
raw_rdd.take(2)

['"url"\t"urlid"\t"boilerplate"\t"alchemy_category"\t"alchemy_category_score"\t"avglinksize"\t"commonlinkratio_1"\t"commonlinkratio_2"\t"commonlinkratio_3"\t"commonlinkratio_4"\t"compression_ratio"\t"embed_ratio"\t"framebased"\t"frameTagRatio"\t"hasDomainLink"\t"html_ratio"\t"image_ratio"\t"is_news"\t"lengthyLinkDomain"\t"linkwordscore"\t"news_front_page"\t"non_markup_alphanum_characters"\t"numberOfLinks"\t"numwords_in_url"\t"parametrizedLinkRatio"\t"spelling_errors_ratio"\t"label"',
 '"http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html"\t"4042"\t"{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the S

In [10]:
# 获取第一条数据
header_data = raw_rdd.first()

# 采用过滤的方式，删除第一条数据
filter_rdd = raw_rdd.filter(lambda line: line != header_data)

In [11]:
datas = filter_rdd \
    .map(lambda line: line.replace("\"", ""))\
    .map(lambda line: line.split("\t"))
    
"总共：" + str(datas.count())

'总共：7395'

In [12]:
# 查看第一条数据


"""
    数据分为三个部分：
        - 第一个部分：
            第一个字段：特征地址为 类别特征数据
        - 第二个部分：
            最后一个字段：label 字段
        - 第三个部分：
            其他字段：数值特征字段
"""
datas.first()[3:]

['business',
 '0.789131',
 '2.055555556',
 '0.676470588',
 '0.205882353',
 '0.047058824',
 '0.023529412',
 '0.443783175',
 '0',
 '0',
 '0.09077381',
 '0',
 '0.245831182',
 '0.003883495',
 '1',
 '1',
 '24',
 '0',
 '5424',
 '170',
 '8',
 '0.152941176',
 '0.079129575',
 '0']

### 提取特征feature字段

In [13]:
# 类别特征数据转换：采用 1-of-K，其中K表示的就是类别的个数
# 构建 网页类别 字典
catetory_dic = datas \
    .map(lambda fields: fields[3]) \
    .distinct() \
    .zipWithIndex() \
    .collectAsMap()

catetory_dic

{'?': 2,
 'arts_entertainment': 3,
 'business': 0,
 'computer_internet': 6,
 'culture_politics': 5,
 'gaming': 4,
 'health': 12,
 'law_crime': 7,
 'recreation': 11,
 'religion': 8,
 'science_technology': 13,
 'sports': 1,
 'unknown': 10,
 'weather': 9}

In [14]:
#查看自带你的额项数
print(len(catetory_dic))

14


In [15]:
#查看类型
type(catetory_dic)

dict

In [16]:
#导入numpy
import numpy as np

In [17]:
# 转换 字段的值为 ? 转换为 0，转换为数值类型
def conver_float(x):
    return (0 if x == "?" else float(x))


# 特征字段提取
def extract_features(fields, catetory_dic, end_index):
    # 类别字段
    category_index = catetory_dic[fields[3]]
    category_features = np.zeros(len(catetory_dic))
    category_features[category_index] = 1.0
    
    # 数值字段
    numeric_features = [ conver_float(column)  for column in fields[4: end_index]]
    # print numeric_features
    
    # 返回  类别 特征 +  数值特征 
    return np.concatenate((category_features, numeric_features))

In [18]:
# 提取 标签字段
def extract_label(fields):
    label = fields[-1]
    return float(label)

In [19]:
# 测试 提取特征函数
sample_data = datas.first()
extract_features(sample_data, catetory_dic, len(sample_data)-1)

array([1.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
       0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
       0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
       0.00000000e+00, 0.00000000e+00, 7.89131000e-01, 2.05555556e+00,
       6.76470588e-01, 2.05882353e-01, 4.70588240e-02, 2.35294120e-02,
       4.43783175e-01, 0.00000000e+00, 0.00000000e+00, 9.07738100e-02,
       0.00000000e+00, 2.45831182e-01, 3.88349500e-03, 1.00000000e+00,
       1.00000000e+00, 2.40000000e+01, 0.00000000e+00, 5.42400000e+03,
       1.70000000e+02, 8.00000000e+00, 1.52941176e-01, 7.91295750e-02])

In [20]:
from pyspark.mllib.regression import LabeledPoint

In [21]:
# 特征工程：构建分类算法特征RDD： LabeledPoint
from pyspark.mllib.regression import LabeledPoint

labelpoint_rdd = datas.map(lambda r: 
          LabeledPoint(extract_label(r), extract_features(r, catetory_dic, len(r)-1)))

In [22]:
# 获取数据对比
datas.first()[3:],labelpoint_rdd.take(1)

(['business',
  '0.789131',
  '2.055555556',
  '0.676470588',
  '0.205882353',
  '0.047058824',
  '0.023529412',
  '0.443783175',
  '0',
  '0',
  '0.09077381',
  '0',
  '0.245831182',
  '0.003883495',
  '1',
  '1',
  '24',
  '0',
  '5424',
  '170',
  '8',
  '0.152941176',
  '0.079129575',
  '0'],
 [LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])])

### 数据集划分

In [23]:
# 划分数据集，按照 8: 1: 1划分
train_rdd, validation_rdd, test_rdd = labelpoint_rdd.randomSplit([7,2,1])

In [24]:
# 统计各个数据集条目数，并且将数据缓存起来
print('训练数据集: ', train_rdd.cache().count())
print('验证数据集: ', validation_rdd.cache().count())
print('测试数据集: ', test_rdd.cache().count())

训练数据集:  5262
验证数据集:  1439
测试数据集:  694


### 训练模块

In [25]:
# train(cls, data, iterations=100, step=1.0, regParam=0.01,  #正则
#       miniBatchFraction=1.0, initialWeights=None, regType='l2', 
#       intercept=False, validateData=True, convergenceTol=0.001) 
#导入模块
from pyspark.mllib.classification import SVMWithSGD
dt_model = SVMWithSGD.train(train_rdd,iterations=100
                            , regParam=0.01,miniBatchFraction=1.0)
dt_model

(weights=[0.2894095173433443,-0.354390200786637,-0.4348474817262035,-0.47500543853611166,-0.02750700301277201,-0.10524479088553164,-0.24296874524121473,-0.012078421530786561,-0.025760502431698428,-0.005935452214869844,-0.005776617901852765,0.3490500042258589,-0.016731391794543832,-0.09554506263649504,-0.5279341610976515,-1.2429059778300375,-0.39640447645250904,-0.13740791706152342,-0.0184958101522274,-0.007949207838425225,-4.87073936137362,0.19667558842266505,0.0,-0.12974330577782434,-0.04045271237820033,-0.2947402446439461,-0.48298558072519365,-0.6959652488804384,-0.6122215433808469,-60.08412241718933,-0.06812212331220191,-209.63317776478695,-71.72060119463222,-6.404246596097144,-0.2013498080715562,-0.1503641003361016], intercept=0.0)

In [26]:
# 基于训练的模型和验证书籍及 评估
score = dt_model.predict(validation_rdd.map(lambda lp: lp.features))

# 组合预测的值和实际真实的值
score_and_label = score.zip(validation_rdd.map(lambda lp: lp.label))

# 获取前十个数据打印
score_and_label.take(5)


[(0, 0.0), (0, 1.0), (0, 1.0), (0, 1.0), (0, 0.0)]

In [27]:
"""
    使用AUC（Area under the Curve of ROC）评估二分类模型，
    -a. AUC = 1
        最完美的情况，预测准确率到100%，但是不可能存在
    -b. 0.5  < AUC < 1
        优于随机猜测，具有预测的价值
    -c. AUC = 0.5
        余随机猜测一样，没有任何预测价值
    -d. AUC < 0.5
        适合于反向预测
        
    ROC曲线  PR曲线： 精确度precision 和 召回率 之间的关系
"""
print

<function print>

In [28]:
# 基于训练的模型和验证书籍及 评估
score = dt_model.predict(validation_rdd.map(lambda lp: lp.features))

# 组合预测的值和实际真实的值
score_and_label = score.zip(validation_rdd.map(lambda lp: lp.label))

# 获取前十个数据打印
score_and_label.take(10)

[(0, 0.0),
 (0, 1.0),
 (0, 1.0),
 (0, 1.0),
 (0, 0.0),
 (0, 1.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0)]

In [39]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics


# 方便对模型评估，定义函数
def evaluate_mode(model, validation_data):
    # 基于训练的模型和验证书籍及 评估
    score = model.predict(validation_data.map(lambda lp: lp.features)).map(lambda x: float(x))

    # 组合预测的值和实际真实的值
    score_and_label = score.zip(validation_data.map(lambda lp: lp.label))
    
    # 使用BinaryClassificationMetrics 计算AUC面积
    metrics = BinaryClassificationMetrics(score_and_label)

    return metrics.areaUnderROC

### 参数调优

In [52]:
"""
    构建一个函数，实现模型的 训练和评估功能，并且计算评估所需的时间
"""
from time import time

def train_evaluate_model(train_data, iterations, 
                         regParam,miniBatchFraction,validation_data):
    # 模型训练开始时间
    start_time = time()
    
    # 训练模型
    model = SVMWithSGD.train(train_rdd,iterations=iterations
                            ,regParam=regParam,miniBatchFraction=miniBatchFraction)
    
    # 模型验证评估
    auc = evaluate_mode(model,validation_data)
    
    # 计算花费时间
    duration = time() - start_time
    print("训练评估使用参数：miniBatchFraction = " + str(miniBatchFraction) + \
        ", iterations = " + str(iterations) + \
        ", regParam = " + str(regParam) + \
        " => 所需时间 = " + str(duration) + ", 评估AUC = " + str(auc))
    
    # 返回
    return (auc, iterations,regParam,miniBatchFraction,duration, model)
    

In [53]:
train_evaluate_model(train_rdd, 150, 0.01, 0.8,validation_rdd)

训练评估使用参数：miniBatchFraction = 0.8, iterations = 150, regParam = 0.01 => 所需时间 = 8.185468196868896, 评估AUC = 0.5


(0.5,
 150,
 0.01,
 0.8,
 8.185468196868896,
 (weights=[0.3552973670455374,-0.42400921430400235,-0.5176383656935559,-0.5628599471548509,-0.03272235916056206,-0.12496403838368975,-0.28743785140769784,-0.015120050314365726,-0.031562111383751756,-0.0067742856696276,-0.006952917754707379,0.42379659214000115,-0.022387077986309817,-0.11378913902105792,-0.6206653712867168,-1.2143923670201922,-0.4643404053410707,-0.16051797243339327,-0.01997105028603678,-0.007625207497432221,-5.719815620480423,0.22757396530661186,0.0,-0.15301350499805674,-0.04711827082571155,-0.34552202970129553,-0.5984844610035224,-0.8205473443804788,-0.7249098039310973,-71.02271835487804,-0.07831703182120767,-76.61167281678375,-84.70578171010413,-7.532263047027577,-0.23958979048890905,-0.17775478327203537], intercept=0.0))

### 最佳模型


In [56]:
# 定义函数
# 编写函数，可以对决策树 任意的参数调整进行训练和评估
def train_evaluate_params(train_data, iterations_list, 
                         regParam_list,miniBatchFraction_list,validation_data):
    # 训练及评估返回值
    metrics_list = [ train_evaluate_model(train_rdd,ite,reg,miniBatch,validation_rdd)
                        for ite in iterations_list
                        for reg in regParam_list
                        for miniBatch in miniBatchFraction_list
                    ]
    
    # 针对 auc值降序排序，找出最佳 模型
    sorted_metrics_list = sorted(metrics_list, key=lambda k: k[0], reverse=True)
    
    # 获取最佳模型
    best_params = sorted_metrics_list[0]
    
    # 打印显示，最佳参数组合
    print("最佳参数组合: miniBatchFraction -> " + str(best_params[3]) + \
             ", iterations -> " + str(best_params[1]) + \
              ", regParam -> " + str(best_params[2]) + \
              "\n AUC -> " + str(best_params[0])
         )
    
    # 返回模型
    return best_params[5]

In [59]:
print ('-------------------- 设置不同超参数的不同值进行训练评估 -------------------')
best_model = train_evaluate_params(train_rdd,
                                   [50,100], 
                                   [0.5,0.8], 
                                   [0.8,1],
                                  validation_rdd)

-------------------- 设置不同超参数的不同值进行训练评估 -------------------
训练评估使用参数：miniBatchFraction = 0.8, iterations = 50, regParam = 0.5 => 所需时间 = 7.275416374206543, 评估AUC = 0.5
训练评估使用参数：miniBatchFraction = 1, iterations = 50, regParam = 0.5 => 所需时间 = 7.2434141635894775, 评估AUC = 0.5
训练评估使用参数：miniBatchFraction = 0.8, iterations = 50, regParam = 0.8 => 所需时间 = 6.776387691497803, 评估AUC = 0.49931521138622853
训练评估使用参数：miniBatchFraction = 1, iterations = 50, regParam = 0.8 => 所需时间 = 6.645380020141602, 评估AUC = 0.49931521138622853
训练评估使用参数：miniBatchFraction = 0.8, iterations = 100, regParam = 0.5 => 所需时间 = 7.271415948867798, 评估AUC = 0.5000086926622341
训练评估使用参数：miniBatchFraction = 1, iterations = 100, regParam = 0.5 => 所需时间 = 7.390422582626343, 评估AUC = 0.5
训练评估使用参数：miniBatchFraction = 0.8, iterations = 100, regParam = 0.8 => 所需时间 = 7.302417755126953, 评估AUC = 0.5028173884151924
训练评估使用参数：miniBatchFraction = 1, iterations = 100, regParam = 0.8 => 所需时间 = 7.005400657653809, 评估AUC = 0.49931521138622853
最佳参数组合: mi

In [62]:
#### 使用 测试数据集 ，最佳模型是否过拟合
auc_metric = evaluate_mode(best_model, validation_rdd)

'基于测试数据集评估AUC = ', auc_metric

('基于测试数据集评估AUC = ', 0.5028173884151924)

### 保存模型

In [63]:
# 将最佳模型进行保存
best_model.save(sc, path='./datas/SVM-best-model')

### 加载模型使用

In [66]:
from pyspark.mllib.classification import SVMModel

load_dtc_model = SVMModel.load(sc, path='./datas/SVM-best-model')

load_dtc_model

(weights=[0.020367062806673396,-0.02465948807867913,-0.03108365031794814,-0.03400477832732595,-0.002209534482196023,-0.007394204336504548,-0.017147999490052152,-0.000932086446305012,-0.0019004579803960256,-0.0003565692046640863,-0.00045086043942416814,0.02450394528720281,-0.0015635121250946469,-0.006092671260585654,-0.03700419390321135,-0.13164463165550225,-0.027996169932592314,-0.009687431556840289,-0.00121175734342234,-0.0004314424361854842,-0.3535499635896977,0.014590274112449036,0.0,-0.009368448350714433,-0.0028794643893677173,-0.02108979225834374,-0.03532035921410548,-0.049235775532917254,-0.04221815148624418,-4.442364449747699,-0.0047673991101905774,4.5178017858959265,-5.101871120077326,-0.448044163979423,-0.014333776425147306,-0.010898578108425042], intercept=0.0)

In [68]:
help(load_dtc_model.predict)

Help on method predict in module pyspark.mllib.classification:

predict(x) method of pyspark.mllib.classification.SVMModel instance
    Predict values for a single data point or an RDD of points
    using the model trained.
    
    .. versionadded:: 0.9.0

