# Preprocess

# 目标：

将数据转换为 label,content 形式的JSON文件

# 问题：
- 1.数据是XML格式，转换为JSON
- 2.字符编码是GBK。一开始测试时按UTF-8读取，Java NIO报MalformedInputException
- 3.数据没有根元素，影响转换，每个文件都要在头和尾，加<docs></docs>
- 4.url字段，含有特殊符号“&”，影响JSON转换，需要清除该符号
- 5.有的数据，content字段，是空的，对分词造成影响

# Spark Processing

# 目标：

按Spark ML API要求，组织数据形式

# 步骤：

装载数据，生成dataFrame，相当于数据库表，原始数据格式为(label \t words),已经分好词，但没有去除停用词

生成dataFrame，对content列做特征提取

转换dataFrame，转换为NaiveBayes需要的格式（LabeledPoint）

将数据划分为训练集（training set）和测试集（test set）

训练贝叶斯分类器

预测

# 问题：

特征提取/选择花费时间较长。word2vec得到的向量含有负数，不符合朴素贝叶斯要求特征非负的前提；最后用了最简单的TF-IDF，Spark的TF-IDF平滑与归一化都很自动化。顺便测试了sklearn的TF-IDF，得出的向量维度不一致——原因是对每条记录单独调用fit_transform，每条记录都有各自的词表；应先在整个语料上fit一次，再对每条记录transform。

不是太熟悉Transformation操作，需要花点时间测试

数据倾斜问题

互信息（mutual information）可能是更好的特征选择方法

In [2]:
# Input corpus. Despite the .json extension, each line is a
# "label<TAB>space-separated words" record (the format importJsonFile parses).
dataPath = "/FileStore/tables/kbhw49a91475214246176/data_processed3.json"

In [3]:
from pyspark.sql import Row
def parseNewsLine(line):
    """Parse one ``label<TAB>words`` line into a ``(label, words)`` tuple.

    Only the first tab separates the label from the content, so a tab inside
    the content no longer truncates it. The content is split on any run of
    whitespace. This drops the empty tokens that repeated or trailing spaces,
    stray tabs or a trailing ``\\r`` would otherwise produce; empty content
    yields an empty word list. Returns ``None`` when the line contains no tab
    at all, i.e. the record is malformed.
    """
    parts = line.split("\t", 1)
    if len(parts) < 2:
        # No label/content separator: unusable record.
        return None
    label, content = parts
    return label, content.split()


def importJsonFile(file):
    """Load the pre-segmented news corpus as an RDD of ``Row(content, label)``.

    Despite its name this does not parse JSON. Every line of ``file`` is read
    as ``label<TAB>space-separated words`` (see ``parseNewsLine``).
    ``content`` is the list of words and ``label`` is the raw label string.
    Lines without a tab are skipped instead of failing the whole job with an
    IndexError on the executors.
    """
    data = sc.textFile(file)
    parsed = data.map(parseNewsLine).filter(lambda rec: rec is not None)
    return parsed.map(lambda rec: Row(content=rec[1], label=rec[0]))

In [4]:
# Build the news DataFrame (schema inferred from the Row fields) and expose
# it to SQL under the table name "news".
def newsDataFrame(dataPath):
    """Create a DataFrame from ``dataPath`` and register it as temp table "news".

    Returns the DataFrame. Registering the table is a side effect on
    ``sqlContext``.
    """
    df = sqlContext.createDataFrame(importJsonFile(dataPath))
    df.registerTempTable("news")
    return df

In [5]:
# Build the "news" DataFrame and register it for the SQL query that follows.
newsDataFrame(dataPath=dataPath)

In [6]:
# Read the registered "news" table back through SQL and print its schema.
newsQuery = "SELECT * FROM news"
result = sqlContext.sql(newsQuery)
result.printSchema()

In [7]:
# Class distribution: number of documents per label.
result.groupBy(result["label"]).count().show()

In [8]:
# Peek at the tokenised content of one document.
result.select("content").head(1)

In [9]:
'''
  sklearn TF-IDF feature
  (fitted once on the full corpus so every vector shares one vocabulary)
'''
# Why the vector sizes used to differ: fit_transform was called once PER ROW
# on that row's word list. Every row therefore got its own vocabulary, and
# each word was treated as a separate document. Vectors are only comparable
# when one vectorizer is fitted on the whole corpus and then used to
# transform every document.
from sklearn.feature_extraction.text import TfidfVectorizer
# Documents are already segmented: re-join the words with spaces and tokenise
# on spaces. The default token pattern requires 2+ word characters, which
# would drop single-character Chinese words.
tfidf_transformer = TfidfVectorizer(encoding=u'utf-8', ngram_range=(1, 3), sublinear_tf=True,
                                    token_pattern=u"(?u)[^ ]+")
# NOTE(review): fitting collects the entire corpus on the driver, and a
# 1-3-gram vocabulary can be very large. Confirm it fits in driver memory
# (or fit on a sample) before running on the full data set.
tfidf_transformer.fit(result.rdd.map(lambda row: u" ".join(row.content)).collect())
# Ship the fitted vectorizer to the executors once instead of once per task.
tfidfBroadcast = sc.broadcast(tfidf_transformer)
res = result.rdd.map(lambda row: [row.label, tfidfBroadcast.value.transform([u" ".join(row.content)])])
res.take(2)

In [10]:
# Spark Word2Vec feature.
# The resulting vectors contain negative numbers, so they cannot be used as
# NaiveBayes input (which requires non-negative features).
from pyspark.ml.feature import Word2Vec

# Learn 1000-dimensional vectors from the tokenised "content" column into a
# new "vector" column, ignoring words that occur fewer than 5 times.
word2Vec = Word2Vec(inputCol="content", outputCol="vector",
                    vectorSize=1000, minCount=5)
# NOTE(review): ``model`` is rebound to the NaiveBayes model further down;
# consider a distinct name if cells are ever run out of order.
model = word2Vec.fit(result)
word2vecResult = model.transform(result)

In [11]:
# Inspect the Word2Vec output: schema, SQL registration, one sample row.
word2vecResult.printSchema()
word2vecResult.registerTempTable("word2vecResult")
word2vecResult.head(1)

In [12]:
# RDD transform test: (label, embedding) pairs, addressed by field name.
word2vecResult.rdd.map(lambda row: [row.label, row.vector]).take(1)

In [13]:
'''
  Spark TF-IDF feature
'''
from pyspark.ml.feature import HashingTF, IDF

# Size of the hashed term-frequency space. HashingTF maps each word to a
# column via hash(word) modulo numFeatures. With only 100 columns, thousands
# of distinct words collide in every column and the features lose most of
# their discriminative power. A power of two also spreads terms evenly across
# columns; 2**18 is Spark's own default for this parameter.
numHashFeatures = 1 << 18
hashingTF = HashingTF(inputCol="content", outputCol="rawFeatures", numFeatures=numHashFeatures)
featurizedData = hashingTF.transform(result)
# Learn document frequencies from the raw term counts, then rescale them into
# the "features" column.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [14]:
# Show the TF-IDF schema and the first two feature vectors.
rescaledData.printSchema()
rescaledData.select(rescaledData["features"]).take(2)

In [15]:
# RDD transform test: (label, TF-IDF features) pairs, addressed by field name.
rescaledData.rdd.map(lambda row: [row.label, row.features]).take(1)

In [16]:
'''
  Prepare for NaiveBayes
'''
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.linalg import Vectors

def parseLine(line):
    """Turn a ``[label, features]`` pair into an MLlib ``LabeledPoint``.

    The label is converted to ``float`` because LabeledPoint expects a
    numeric label. The features vector is passed through unchanged.
    """
    return LabeledPoint(float(line[0]), line[1])

In [17]:
# Word2Vec features were tried as an alternative but contain negative values,
# which the NaiveBayes classifier does not accept:
#   norData = word2vecResult.map(lambda row : [row[1],row[2]]).map(parseLine)
norData = (rescaledData.rdd
           .map(lambda row: [row.label, row.features])
           .map(parseLine))

In [18]:
# Split the labelled points 60/40 into training and test sets; the fixed
# seed makes the split reproducible.
# Cache first: training and test are both derived from norData, and the
# accuracy computation scans the test set twice (filter+count and count).
# Without caching, every pass re-reads the text file and recomputes the
# TF-IDF features.
norData.cache()
training, test = norData.randomSplit([0.6, 0.4], seed=0)

In [19]:
# Train multinomial NaiveBayes with additive smoothing 1.0.
model = NaiveBayes.train(training, lambda_=1.0)

In [20]:
# Make prediction and test accuracy.
# Pair each test point's predicted label with its true label.
predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))
# Fraction of test points predicted correctly. The filter indexes into the
# (prediction, label) pair rather than using tuple-parameter unpacking
# (``lambda (x, v): ...``), which is Python 2-only syntax removed by PEP 3113.
accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()

In [21]:
# Function-call form works on both Python 2 and Python 3.
print(accuracy)