`written by Elijah DengDeng`

## From Python list to Spark DataFrame

In [1]:
# Initialize the Spark environment
from pyspark.sql import SparkSession, Row
from pyspark import SparkConf, SparkContext

# Local mode using every available core; "lab3" is the application name.
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
# getOrCreate() reuses an already-running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [53]:
# Toy corpus: each training record is [document text, category],
# where "c" and "o" are the two class labels.
trainingData = [
    ["Chinese Beijing Chinese", "c"],
    ["Chinese Chinese Nanjing", "c"],
    ["Chinese Macao", "c"],
    ["Australia Sydney Chinese", "o"],
]
# A single unlabeled document to classify.
testData = ["Chinese Chinese Chinese Australia Sydney"]

In [54]:
# Distribute the Python lists and wrap every record in a Row so that the
# column names ("descript", "category") travel with the data.
trainRDD = sc.parallelize(trainingData).map(
    lambda rec: Row(descript=rec[0], category=rec[1])
)
testRDD = sc.parallelize(testData).map(lambda text: Row(descript=text))
trainRDD.collect()

[Row(category='c', descript='Chinese Beijing Chinese'),
 Row(category='c', descript='Chinese Chinese Nanjing'),
 Row(category='c', descript='Chinese Macao'),
 Row(category='o', descript='Australia Sydney Chinese')]

In [55]:
# Bring the test RDD back to the driver to inspect its Row objects.
testRDD.collect()

[Row(descript='Chinese Chinese Chinese Australia Sydney')]

In [56]:
# Convert both RDDs of Row objects to DataFrames; the schema is inferred from the Rows.
trainDF = spark.createDataFrame(trainRDD)
testDF = spark.createDataFrame(testRDD)
# To query with SQL instead, register a view: trainDF.createOrReplaceTempView("doc")
trainDF.show()

+--------+--------------------+
|category|            descript|
+--------+--------------------+
|       c|Chinese Beijing C...|
|       c|Chinese Chinese N...|
|       c|       Chinese Macao|
|       o|Australia Sydney ...|
+--------+--------------------+



In [57]:
# Display the test DataFrame (show() truncates long strings by default).
testDF.show()

+--------------------+
|            descript|
+--------------------+
|Chinese Chinese C...|
+--------------------+



In [58]:
# Ordinary DataFrame operation in DSL (method-chaining) style:
# count the number of training rows in each category.
trainDF.groupby("category").count().show()  

+--------+-----+
|category|count|
+--------+-----+
|       o|    1|
|       c|    3|
+--------+-----+



## 使用 Tokenizer 把 descript split 成 words

In [59]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf

# One Tokenizer serves both splits: it reads the "descript" string column
# and writes the resulting tokens to a new "words" column.
tokenizer = Tokenizer(inputCol="descript", outputCol="words")

# Apply it to the training and test DataFrames.
tokenizedDF = tokenizer.transform(trainDF)
testTokenizedDF = tokenizer.transform(testDF)

# Inspect the tokenized training data.
tokenizedDF.select("category", "descript", "words").show()



+--------+--------------------+--------------------+
|category|            descript|               words|
+--------+--------------------+--------------------+
|       c|Chinese Beijing C...|[chinese, beijing...|
|       c|Chinese Chinese N...|[chinese, chinese...|
|       c|       Chinese Macao|    [chinese, macao]|
|       o|Australia Sydney ...|[australia, sydne...|
+--------+--------------------+--------------------+



In [60]:
# Display the tokenized test DataFrame.
testTokenizedDF.show()

+--------------------+--------------------+
|            descript|               words|
+--------------------+--------------------+
|Chinese Chinese C...|[chinese, chinese...|
+--------------------+--------------------+



In [50]:
# User-defined function: count the tokens in each "words" array.
# The return type is declared explicitly because udf() defaults to StringType,
# which would make "tokens" a string column instead of an integer count.
from pyspark.sql.types import IntegerType

countTokens = udf(lambda e: len(e), IntegerType())
# Selecting the three base columns first keeps this cell safe to re-run:
# any previously added "tokens" column is dropped before it is re-created.
tokenizedDF = tokenizedDF.select("category", "descript", "words")\
    .withColumn("tokens", countTokens(col("words")))
tokenizedDF.show(truncate=False)

+--------+------------------------+----------------------------+------+
|category|descript                |words                       |tokens|
+--------+------------------------+----------------------------+------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |3     |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |3     |
|c       |Chinese Macao           |[chinese, macao]            |2     |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|3     |
+--------+------------------------+----------------------------+------+



## 使用 CountVectorizer 把 words 转换成 features

In [61]:
from pyspark.ml.feature import CountVectorizer
# CountVectorizer turns the "words" arrays into term-count vectors in "features"
# (the constructor also accepts optional tuning parameters).
cv = CountVectorizer(inputCol="words", outputCol="features")

# Fit on the training tokens only to obtain the model.
cvModel = cv.fit(tokenizedDF)

# Use the same fitted model for the training and the test DataFrame.
featuredDF = cvModel.transform(tokenizedDF)
testFeaturedDF = cvModel.transform(testTokenizedDF)
featuredDF.show(truncate=False)

+--------+------------------------+----------------------------+-------------------------+
|category|descript                |words                       |features                 |
+--------+------------------------+----------------------------+-------------------------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |(6,[0,3],[2.0,1.0])      |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |(6,[0,2],[2.0,1.0])      |
|c       |Chinese Macao           |[chinese, macao]            |(6,[0,1],[1.0,1.0])      |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|(6,[0,4,5],[1.0,1.0,1.0])|
+--------+------------------------+----------------------------+-------------------------+



In [62]:
# Display the test DataFrame, which now carries the "features" column.
testFeaturedDF.show()

+--------------------+--------------------+--------------------+
|            descript|               words|            features|
+--------------------+--------------------+--------------------+
|Chinese Chinese C...|[chinese, chinese...|(6,[0,4,5],[3.0,1...|
+--------------------+--------------------+--------------------+



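下面的例子假设 vocabulary（即 `cvModel.vocabulary`）为 `[chinese, australia, sydney, macao, nanjing, beijing]`。注意：出现次数相同的词在 vocabulary 中的先后顺序并不固定，所以每次运行得到的 index 可能与上面输出中的不同。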

* `(6,[0,5],[2.0,1.0])` 是 `sparse Vector` 的形式  
* 6 = `vocabulary size`
* `[0,5]` 是 index
* `[2.0,1.0]` 是 value
* 等价于 dense Vector `[2.0, 0.0, 0.0, 0.0, 0.0, 1.0]`

## 使用 StringIndexer 把 category 转换成 label

In [25]:
from pyspark.ml.feature import StringIndexer

# Map the string "category" column to a numeric "label" column.
indexer = StringIndexer(inputCol="category", outputCol="label")
indexerModel = indexer.fit(featuredDF)
indexedDF = indexerModel.transform(featuredDF)
indexedDF.show(truncate=False)

+--------+------------------------+----------------------------+------+-------------------------+-----+
|category|descript                |words                       |tokens|features                 |label|
+--------+------------------------+----------------------------+------+-------------------------+-----+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |3     |(6,[0,1],[2.0,1.0])      |0.0  |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |3     |(6,[0,2],[2.0,1.0])      |0.0  |
|c       |Chinese Macao           |[chinese, macao]            |2     |(6,[0,5],[1.0,1.0])      |0.0  |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|3     |(6,[0,3,4],[1.0,1.0,1.0])|1.0  |
+--------+------------------------+----------------------------+------+-------------------------+-----+



In [26]:
# Keep only the two columns the classifier is configured to read.
modelColumns = ["features", "label"]
finalDF = indexedDF.select(*modelColumns)
finalDF.show(truncate=False)

+-------------------------+-----+
|features                 |label|
+-------------------------+-----+
|(6,[0,1],[2.0,1.0])      |0.0  |
|(6,[0,2],[2.0,1.0])      |0.0  |
|(6,[0,5],[1.0,1.0])      |0.0  |
|(6,[0,3,4],[1.0,1.0,1.0])|1.0  |
+-------------------------+-----+



## 使用 Naive Bayes

In [71]:
# NOTE: use the NaiveBayes from pyspark.ml, NOT the one from pyspark.mllib!
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(
    featuresCol='features',
    labelCol='label',
    predictionCol='nb_prediction',
    smoothing=1.0,
    modelType='multinomial',
)

# Train on the (features, label) pairs, then classify the single test document.
nb_model = nb.fit(finalDF)
testPrediction = nb_model.transform(testFeaturedDF).head()
testPrediction.nb_prediction  # 0.0 is the label index of category "c"

0.0

## 使用 Pipeline

In [73]:
# Recap: the raw training DataFrame (category, descript).
trainDF.show()

+--------+--------------------+
|category|            descript|
+--------+--------------------+
|       c|Chinese Beijing C...|
|       c|Chinese Chinese N...|
|       c|       Chinese Macao|
|       o|Australia Sydney ...|
+--------+--------------------+



In [74]:
# Recap: the raw test DataFrame (descript only).
testDF.show()

+--------------------+
|            descript|
+--------------------+
|Chinese Chinese C...|
+--------------------+



In [80]:
from pyspark.ml import Pipeline
# Chain the four stages defined earlier so that raw text goes in and
# predictions come out with a single fit / transform pair.
pipelineStages = [tokenizer, cv, indexer, nb]
nb_pipeline = Pipeline(stages=pipelineStages)

pipeModel = nb_pipeline.fit(trainDF)
predictionDF = pipeModel.transform(testDF)
predictionDF.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|            descript|               words|            features|       rawPrediction|         probability|nb_prediction|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|Chinese Chinese C...|[chinese, chinese...|(6,[0,2,3],[3.0,1...|[-8.2254733485002...|[0.59713120479585...|          0.0|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+

