Written by Jackson

## From Python list to Spark DataFrame

In [1]:
# Initialise the environment.
# To learn Spark, consult the official documentation directly.
from pyspark.sql import SparkSession, Row
from pyspark import SparkConf, SparkContext

# Configuration for a local run under the application name "lab3".
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
# Entry point for the RDD API.
# getOrCreate() reuses an already-running context, so executing this cell a
# second time does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# Entry point for the DataFrame API.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
# Raw Python data from which the DataFrames are created; the route taken
# here is the second method, going through an RDD.
# Each training sample is a [document text, category] pair.
trainingData = [
    ["Chinese Beijing Chinese", "c"],
    ["Chinese Chinese Nanjing", "c"],
    ["Chinese Macao", "c"],
    ["Australia Sydney Chinese", "o"],
]
# The test set is a single document without a category.
testData = ["Chinese Chinese Chinese Australia Sydney"]
type(trainingData)

list

In [4]:
# Turn the two Python lists into RDDs through the SparkContext.
trainRDD = sc.parallelize(trainingData)
testRDD = sc.parallelize(testData)
# Collect the training RDD to inspect its elements.
trainRDD.collect()

[['Chinese Beijing Chinese', 'c'],
 ['Chinese Chinese Nanjing', 'c'],
 ['Chinese Macao', 'c'],
 ['Australia Sydney Chinese', 'o']]

In [5]:
# Build the RDDs and wrap every element in a Row, in preparation for the
# conversion to DataFrames.
# A training element is a [text, category] pair; a test element is the text.
trainRDD = sc.parallelize(trainingData).map(
    lambda pair: Row(descript=pair[0], category=pair[1])
)
testRDD = sc.parallelize(testData).map(lambda text: Row(descript=text))
trainRDD.collect()

[Row(category='c', descript='Chinese Beijing Chinese'),
 Row(category='c', descript='Chinese Chinese Nanjing'),
 Row(category='c', descript='Chinese Macao'),
 Row(category='o', descript='Australia Sydney Chinese')]

In [6]:
# Collect the test RDD to inspect its elements.
testRDD.collect()

[Row(descript='Chinese Chinese Chinese Australia Sydney')]

In [9]:
# Convert the RDDs of Row objects into DataFrames
# using the createDataFrame function.
trainDF = spark.createDataFrame(trainRDD)
# Alternative spelling, left disabled:
#testDF = testRDD.toDF()
testDF = spark.createDataFrame(testRDD)
# Optional: trainDF.createOrReplaceTempView("doc") would register the DataFrame for SQL queries.

# truncate=False prints the full cell contents instead of shortening them.
trainDF.show(truncate=False)

+--------+------------------------+
|category|descript                |
+--------+------------------------+
|c       |Chinese Beijing Chinese |
|c       |Chinese Chinese Nanjing |
|c       |Chinese Macao           |
|o       |Australia Sydney Chinese|
+--------+------------------------+



In [10]:
# Print the test DataFrame with untruncated cell contents.
testDF.show(truncate=False)

+----------------------------------------+
|descript                                |
+----------------------------------------+
|Chinese Chinese Chinese Australia Sydney|
+----------------------------------------+



In [11]:
# Once the data is in a DataFrame, SQL-style operations can be applied to it.
# This is an ordinary DataFrame operation written in DSL syntax:
# count the rows in each category.
categoryCounts = trainDF.groupby("category").count()
categoryCounts.show()

+--------+-----+
|category|count|
+--------+-----+
|       o|    1|
|       c|    3|
+--------+-----+



## 使用 Tokenizer 把 descript split 成 words

In [13]:
# Split every sentence into individual words, stored in a "words" column.
# The Tokenizer class plays the role of a string split.
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf

# Define the tokenizer (build the stage).
# These stages are all written the same way: name the column to read
# and the column to write.
tokenizer = Tokenizer(inputCol="descript", outputCol="words")

# Apply the tokenizer to the training DataFrame.
# In the printed result the tokens appear lower-cased.
tokenizedDF = tokenizer.transform(trainDF)
tokenizedDF.show(truncate=False)


+--------+------------------------+----------------------------+
|category|descript                |words                       |
+--------+------------------------+----------------------------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |
|c       |Chinese Macao           |[chinese, macao]            |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|
+--------+------------------------+----------------------------+



In [15]:
# Tokenize the test DataFrame with the same tokenizer.
testTokenizedDF = tokenizer.transform(testDF)
# See the result.
# NOTE(review): this displays the tokenized *training* data (tokenizedDF),
# not testTokenizedDF computed on the line above — confirm this is intended.
tokenizedDF.select("category", "descript", "words").show(truncate=False)

+--------+------------------------+----------------------------+
|category|descript                |words                       |
+--------+------------------------+----------------------------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |
|c       |Chinese Macao           |[chinese, macao]            |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|
+--------+------------------------+----------------------------+



In [50]:
# User-defined function that counts the entries of a "words" array.
# udf() produces a string column unless a return type is given, so the
# integer type is declared explicitly to make "tokens" a numeric column.
from pyspark.sql.types import IntegerType

countTokens = udf(lambda words: len(words), IntegerType())
# Keep the three existing columns and append the per-row token count.
tokenizedDF = tokenizedDF.select("category", "descript", "words")\
    .withColumn("tokens", countTokens(col("words")))
tokenizedDF.show(truncate=False)

+--------+------------------------+----------------------------+------+
|category|descript                |words                       |tokens|
+--------+------------------------+----------------------------+------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |3     |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |3     |
|c       |Chinese Macao           |[chinese, macao]            |2     |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|3     |
+--------+------------------------+----------------------------+------+



## 使用 CountVectorizer 把 words 转换成 features

In [16]:
from pyspark.ml.feature import CountVectorizer
# Converting the words requires defining a CountVectorizer.
cv = CountVectorizer(inputCol="words", outputCol="features")  # further parameters may be passed here

# It is an Estimator: fitting it on a DataFrame yields a model, so the
# model has to be trained first.
# Calling fit() is the training step.
cvModel = cv.fit(tokenizedDF)

# The words are now converted into features.
# transform() produces a new DataFrame.
featuredDF = cvModel.transform(tokenizedDF)
featuredDF.show(truncate=False)



+--------+------------------------+----------------------------+-------------------------+
|category|descript                |words                       |features                 |
+--------+------------------------+----------------------------+-------------------------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |(6,[0,4],[2.0,1.0])      |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |(6,[0,2],[2.0,1.0])      |
|c       |Chinese Macao           |[chinese, macao]            |(6,[0,5],[1.0,1.0])      |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|(6,[0,1,3],[1.0,1.0,1.0])|
+--------+------------------------+----------------------------+-------------------------+



In [17]:
# Vectorize the tokenized test data with the model fitted on the training data.
testFeaturedDF =cvModel.transform(testTokenizedDF)
testFeaturedDF.show()

+--------------------+--------------------+--------------------+
|            descript|               words|            features|
+--------------------+--------------------+--------------------+
|Chinese Chinese C...|[chinese, chinese...|(6,[0,1,3],[3.0,1...|
+--------------------+--------------------+--------------------+



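CountVectorizer 学到的词表（`cvModel.vocabulary`）形如 `[chinese, australia, sydney, macao, nanjing, beijing]`：下标 0 是出现次数最多的 chinese；其余各词出现次数相同，它们的先后顺序在不同运行中可能不同，所以上面输出里的下标不一定与这里列出的顺序一致。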

* `(6,[0,5],[2.0,1.0])` 是 `sparse Vector` 的形式  
* 6 = `vocabulary size`
* `[0,5]` 是 index
* `[2.0,1.0]` 是 value
* 等价于 dense Vector `[2.0, 0.0, 0.0, 0.0, 0.0, 1.0]`

## 使用 StringIndexer 把 category 转换成 label

In [18]:
# Import the class.
from pyspark.ml.feature import StringIndexer

# Map the string column "category" to a numeric "label" column.
indexer = StringIndexer(inputCol="category", outputCol="label")
# Fit the indexer on the featured training data and apply it to the same data.
indexedDF = indexer.fit(featuredDF).transform(featuredDF)
indexedDF.show(truncate=False)
# c becomes 0
# o becomes 1

+--------+------------------------+----------------------------+-------------------------+-----+
|category|descript                |words                       |features                 |label|
+--------+------------------------+----------------------------+-------------------------+-----+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |(6,[0,4],[2.0,1.0])      |0.0  |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |(6,[0,2],[2.0,1.0])      |0.0  |
|c       |Chinese Macao           |[chinese, macao]            |(6,[0,5],[1.0,1.0])      |0.0  |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|(6,[0,1,3],[1.0,1.0,1.0])|1.0  |
+--------+------------------------+----------------------------+-------------------------+-----+



In [19]:
# Keep only the two columns the classifier is configured to read.
finalDF = indexedDF.select("features", "label")
finalDF.show(truncate=False)

+-------------------------+-----+
|features                 |label|
+-------------------------+-----+
|(6,[0,4],[2.0,1.0])      |0.0  |
|(6,[0,2],[2.0,1.0])      |0.0  |
|(6,[0,5],[1.0,1.0])      |0.0  |
|(6,[0,1,3],[1.0,1.0,1.0])|1.0  |
+-------------------------+-----+



## 使用 Naive Bayes

In [22]:
# Note: use NaiveBayes from pyspark.ml, not the one from pyspark.mllib!
# Use the library implementation of Naive Bayes.
from pyspark.ml.classification import NaiveBayes
# smoothing=1.0 and modelType='multinomial' correspond to the defaults mentioned earlier.
nb = NaiveBayes(featuresCol='features', labelCol='label', predictionCol='nb_prediction', smoothing=1.0, modelType='multinomial',)

# Train the classifier on the features/label DataFrame.
nb_model = nb.fit(finalDF)
# head() returns a single row.
#nb_model.transform(testFeaturedDF).show()
nb_model.transform(testFeaturedDF).head().nb_prediction  # 0.0, i.e. category c

0.0

## 使用 Pipeline

In [20]:
# Recap of the raw training DataFrame that the pipeline is fitted on.
trainDF.show()

+--------+--------------------+
|category|            descript|
+--------+--------------------+
|       c|Chinese Beijing C...|
|       c|Chinese Chinese N...|
|       c|       Chinese Macao|
|       o|Australia Sydney ...|
+--------+--------------------+



In [74]:
# Recap of the raw test DataFrame that the pipeline is applied to.
testDF.show()

+--------------------+
|            descript|
+--------------------+
|Chinese Chinese C...|
+--------------------+



In [23]:
# Step one: import the class.
from pyspark.ml import Pipeline
# Put all of the stages above together, chained into a single sequence.
nb_pipeline = Pipeline(stages=[tokenizer, cv, indexer, nb])
# Train the pipeline to obtain a model.
pipeModel = nb_pipeline.fit(trainDF)
# After fit() the result is a transformer.
# Use the model: apply it to the test data.
# NOTE(review): testDF has no "category" column and the printed result has
# no "label" column; presumably the fitted indexer stage is skipped when its
# input column is absent — confirm against the Spark documentation.
predictionDF = pipeModel.transform(testDF)
predictionDF.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|            descript|               words|            features|       rawPrediction|         probability|nb_prediction|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|Chinese Chinese C...|[chinese, chinese...|(6,[0,1,3],[3.0,1...|[-8.2254733485002...|[0.59713120479585...|          0.0|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+



In [24]:
# Read the predicted label from the first row of the prediction DataFrame.
predictionDF.head().nb_prediction

0.0