## 一、载入数据集data

In [1]:
import time
from pyspark.sql import SQLContext
from pyspark import SparkContext
# 利用spark的csv库直接载入csv格式的数据
sc = SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
                                                                  inferschema='true').load('train.csv')
# 选10000条数据集，减少运行时间
data = data.sample(False, 0.01, 100)
print(data.count())

8703


In [2]:
# 扩展项
# 关闭对象sc，避免重新运行时报错，但是关闭sc后，之后的操作无法运行。
# with SparkContext() as sc:
#     sqlContext = SQLContext(sc)
#     data = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
#                                                                   inferschema='true').load('train.csv')
#     # 选10000条数据集，减少运行时间
#     data = data.sample(False, 0.01, 100)
#     print(data.count())

### 1.1 除去与需求无关的列

In [3]:
# 除去一些不要的列，并展示前五行
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+---------------+--------------------+
|       Category|            Descript|
+---------------+--------------------+
|  LARCENY/THEFT|GRAND THEFT FROM ...|
|  VEHICLE THEFT|   STOLEN AUTOMOBILE|
|   NON-CRIMINAL|      FOUND PROPERTY|
|SECONDARY CODES|   JUVENILE INVOLVED|
| OTHER OFFENSES|DRIVERS LICENSE, ...|
+---------------+--------------------+
only showing top 5 rows



### 1.2 显示数据结构

In [4]:
# 利用printSchema()方法显示数据的结构
data.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)



### 1.3 查看犯罪类型最多的前20个

In [5]:
# 包含数量最多的20类犯罪
from pyspark.sql.functions import col
data.groupBy('Category').count().orderBy(col('count').desc()).show()

+--------------------+-----+
|            Category|count|
+--------------------+-----+
|       LARCENY/THEFT| 1725|
|      OTHER OFFENSES| 1230|
|        NON-CRIMINAL|  962|
|             ASSAULT|  763|
|       VEHICLE THEFT|  541|
|       DRUG/NARCOTIC|  494|
|           VANDALISM|  447|
|            WARRANTS|  406|
|            BURGLARY|  347|
|      SUSPICIOUS OCC|  295|
|      MISSING PERSON|  284|
|             ROBBERY|  225|
|               FRAUD|  159|
|     SECONDARY CODES|  124|
|FORGERY/COUNTERFE...|  109|
|         WEAPON LAWS|   86|
|            TRESPASS|   63|
|        PROSTITUTION|   59|
|  DISORDERLY CONDUCT|   54|
|         DRUNKENNESS|   52|
+--------------------+-----+
only showing top 20 rows



### 1.4 查看犯罪描述最多的前20个

In [6]:
# 包含犯罪数量最多的20个描述
data.groupBy('Descript').count().orderBy(col('count').desc()).show()

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|  569|
|       LOST PROPERTY|  323|
|             BATTERY|  301|
|   STOLEN AUTOMOBILE|  262|
|DRIVERS LICENSE, ...|  244|
|AIDED CASE, MENTA...|  223|
|      WARRANT ARREST|  222|
|PETTY THEFT FROM ...|  216|
|SUSPICIOUS OCCURR...|  211|
|MALICIOUS MISCHIE...|  184|
|   TRAFFIC VIOLATION|  168|
|THREATS AGAINST LIFE|  154|
|PETTY THEFT OF PR...|  152|
|      FOUND PROPERTY|  138|
|MALICIOUS MISCHIE...|  138|
|ENROUTE TO OUTSID...|  121|
|GRAND THEFT OF PR...|  115|
|MISCELLANEOUS INV...|  101|
|   DOMESTIC VIOLENCE|   99|
|        FOUND PERSON|   98|
+--------------------+-----+
only showing top 20 rows



## 二、对犯罪描述进行分词

### 2.1 对Descript分词，先切分单词，再删除停用词

流程和scikit-learn版本的很相似，包含3个步骤：   
1.regexTokenizer: 利用正则切分单词   
2.stopwordsRemover: 移除停用词   
3.countVectors: 构建词频向量   

RegexTokenizer：基于正则的方式进行文档切分成单词组
inputCol: 输入字段  
outputCol: 输出字段  
pattern： 匹配模式，根据匹配到的内容切分单词   

CountVectorizer：构建词频向量   
covabSize: 限制的词频数   
minDF：如果是float，则表示出现的百分比小于minDF,不会被当做关键词  
       如果是int，则表示出现是次数小于minDF，不会被当做关键词  

In [7]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# 正则切分单词
# inputCol:输入字段名
# outputCol:输出字段名
regexTokenizer = RegexTokenizer(inputCol='Descript', outputCol='words', pattern='\\W')
# 停用词
add_stopwords = ['http', 'https', 'amp', 'rt', 't', 'c', 'the']
stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered').setStopWords(add_stopwords)
# 构建词频向量
count_vectors = CountVectorizer(inputCol='filtered', outputCol='features', vocabSize=10000, minDF=5)

### 2.2 对分词后的词频率排序，最频繁出现的设置为0 

### StringIndexer
StringIndexer将一列字符串label编码为一列索引号，根据label出现的频率排序，最频繁出现的label的index为0  
该例子中，label会被编码成从0-32的整数，最频繁的label被编码成0 
    
Pipeline是基于DataFrame的高层API，可以方便用户构建和调试机器学习流水线，可以使得多个机器学习算法顺序执行，达到高效的数据处理的目的。  
   
fit():将DataFrame转换成一个Transformer的算法，将label列转化为特征向量   
transform(): 将特征向量作为新列添加到DataFrame    

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol='Category', outputCol='label')
pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, count_vectors, label_stringIdx])
# fit the pipeline to training documents
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
dataset.show(5)

+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|       Category|            Descript|               words|            filtered|            features|label|
+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|  LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(309,[0,2,3,4,6],...|  0.0|
|  VEHICLE THEFT|   STOLEN AUTOMOBILE|[stolen, automobile]|[stolen, automobile]|(309,[9,27],[1.0,...|  4.0|
|   NON-CRIMINAL|      FOUND PROPERTY|   [found, property]|   [found, property]|(309,[5,32],[1.0,...|  2.0|
|SECONDARY CODES|   JUVENILE INVOLVED|[juvenile, involved]|[juvenile, involved]|(309,[67,218],[1....| 13.0|
| OTHER OFFENSES|DRIVERS LICENSE, ...|[drivers, license...|[drivers, license...|(309,[14,23,28,30...|  1.0|
+---------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



### 三、训练/测试集划分

In [9]:
# set seed for reproducibility
# 数据集划分训练集和测试集，比例7:3， 设置随机种子100
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print('Training Dataset Count:{}'.format(trainingData.count()))
print('Test Dataset Count:{}'.format(testData.count()))

Training Dataset Count:6117
Test Dataset Count:2586


### 四、模型训练和评价

#### 1.以词频作为特征，利用逻辑回归进行分类

模型在测试集上预测和打分，查看10个预测概率值最高的结果：

LogisticRegression：逻辑回归模型   
maxIter：最大迭代次数   
regParam：正则化参数    
elasticNetParam：正则化。0：l1;1:l2   

In [12]:
start_time = time.time()
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
# 过滤prediction类别为0数据集
predictions.filter(predictions['prediction'] == 0).select('Descript', 'Category', 'probability', 'label', 'prediction').orderBy('probability', accending=False).show(n=10, truncate=30)

+--------------------------+--------+------------------------------+-----+----------+
|                  Descript|Category|                   probability|label|prediction|
+--------------------------+--------+------------------------------+-----+----------+
|        ARSON OF A VEHICLE|   ARSON|[0.1194196587417514,0.10724...| 26.0|       0.0|
|        ARSON OF A VEHICLE|   ARSON|[0.1194196587417514,0.10724...| 26.0|       0.0|
|        ARSON OF A VEHICLE|   ARSON|[0.1194196587417514,0.10724...| 26.0|       0.0|
|           ATTEMPTED ARSON|   ARSON|[0.12978385966276762,0.1084...| 26.0|       0.0|
|     CREDIT CARD, THEFT OF|   FRAUD|[0.21637136655265077,0.0836...| 12.0|       0.0|
|     CREDIT CARD, THEFT OF|   FRAUD|[0.21637136655265077,0.0836...| 12.0|       0.0|
|     CREDIT CARD, THEFT OF|   FRAUD|[0.21637136655265077,0.0836...| 12.0|       0.0|
|     CREDIT CARD, THEFT OF|   FRAUD|[0.21637136655265077,0.0836...| 12.0|       0.0|
|     CREDIT CARD, THEFT OF|   FRAUD|[0.21637136655265

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# predictionCol: 预测列的名称
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
# 预测准确率
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

0.9641817609126011
8.245999813079834


#### 2.以TF-ID作为特征，利用逻辑回归进行分类

In [14]:
from pyspark.ml.feature import HashingTF, IDF
start_time = time.time()
# numFeatures: 最大特征数
hashingTF = HashingTF(inputCol='filtered', outputCol='rawFeatures', numFeatures=10000)
# minDocFreq：过滤的最少文档数量
idf = IDF(inputCol='rawFeatures', outputCol='features', minDocFreq=5)
pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, hashingTF, idf, label_stringIdx])
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lr_model = lr.fit(trainingData)
predictions = lr_model.transform(testData)
predictions.filter(predictions['prediction'] == 0).select('Descript', 'Category', 'probability', 'label', 'prediction').\
orderBy('probability', ascending=False).show(n=10, truncate=30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.01889

In [15]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

0.9653361434618551
12.998999834060669


#### 3.交叉验证

用交叉验证来优化参数，这里针对基于词频特征的逻辑回归模型进行优化

In [16]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
start_time = time.time()
pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, count_vectors, label_stringIdx])
pipeline_fit = pipeline.fit(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
# 为交叉验证创建参数
# ParamGridBuilder：用于基于网格搜索的模型选择的参数网格的生成器
# addGrid：将网格中给定参数设置为固定值
# parameter：正则化参数
# maxIter：迭代次数
# numFeatures：特征值
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .addGrid(lr.maxIter, [10, 20, 50])
#              .addGrid(idf.numFeatures, [10, 100, 1000])
             .build())

# 创建五折交叉验证
# estimator：要交叉验证的估计器
# estimatorParamMaps：网格搜索的最优参数
# evaluator：评估器
# numFolds：交叉次数
cv = CrossValidator(estimator=lr,\
                   estimatorParamMaps=paramGrid,\
                   evaluator=evaluator,\
                   numFolds=5)
cv_model = cv.fit(trainingData)
predictions = cv_model.transform(testData)

# 模型评估
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

0.9807684755923513
368.97300004959106


#### 4.朴素贝叶斯

In [17]:
from pyspark.ml.classification import NaiveBayes
start_time = time.time()
# smoothing：平滑参数
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select('Descript', 'Category', 'probability', 'label', 'prediction') \
    .orderBy('probability', ascending=False) \
    .show(n=10, truncate=30)

+----------------------+-------------+------------------------------+-----+----------+
|              Descript|     Category|                   probability|label|prediction|
+----------------------+-------------+------------------------------+-----+----------+
|   PETTY THEFT BICYCLE|LARCENY/THEFT|[1.0,1.236977662838925E-20,...|  0.0|       0.0|
|   PETTY THEFT BICYCLE|LARCENY/THEFT|[1.0,1.236977662838925E-20,...|  0.0|       0.0|
|   PETTY THEFT BICYCLE|LARCENY/THEFT|[1.0,1.236977662838925E-20,...|  0.0|       0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...|  0.0|       0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...|  0.0|       0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...|  0.0|       0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...|  0.0|       0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...|  0.0|       0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,

In [18]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

0.977432832447723
5.371000051498413


#### 5.随机森林

In [19]:
from pyspark.ml.classification import RandomForestClassifier
start_time = time.time()
# numTree：训练树的个数
# maxDepth：最大深度
# maxBins：连续特征离散化的最大分类数
rf = RandomForestClassifier(labelCol='label', \
                            featuresCol='features', \
                            numTrees=100, \
                            maxDepth=4, \
                            maxBins=32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select('Descript','Category','probability','label','prediction') \
    .orderBy('probability', ascending=False) \
    .show(n = 10, truncate = 30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.116

In [20]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

0.27929770811242954
36.63699984550476


上面的结果可以看出：随机森林是优秀的、鲁棒的通用模型，但对于高维稀疏数据来说，并不是一个很好的选择。  
明显，选择使用交叉验证的逻辑回归