In [1]:
import seaborn as sns
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
ss = SparkSession.Builder() \
     .appName("articles") \
     .master("spark://spark-master:7077") \
     .getOrCreate()

In [3]:
df = ss.read.parquet("hdfs://namenode:9000/data/articles.parquet")

In [4]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+--------+
|                  id|               title|                sapo|                 url|        source|pega_cate_id|         title_token|          sapo_token|       content_token|           all_token|   label|
+--------------------+--------------------+--------------------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+--------+
|6.366651292038185E17|Anh phát hiện 39 ...|Ngày 23/10, cảnh ...|http://vnmedia.vn...|    vnmedia.vn|         102|Anh phát_hiện 39 ...|Ngày 23/10 , cảnh...|Theo cảnh_sát địa...|anh phát_hiện    ...|Thế giới|
|6.368640877043630...|Phát hiện két sắt...|Theo TASS ngày 23...|http://congan.com...| congan.com.vn|         102|Phát_hiện két sắt...|( CAO ) Theo TASS...|Theo điều_tra ban

In [5]:
df_new = df.drop('id', 
                'title', 
                'sapo',
                'url',
                'source',
                'title_token',
                'sapo_token',
                'content_token')

In [6]:
df_new = df_new.dropna()

In [7]:
df_new.show(5)

+------------+--------------------+--------+
|pega_cate_id|           all_token|   label|
+------------+--------------------+--------+
|         102|anh phát_hiện    ...|Thế giới|
|         102|phát_hiện két sắt...|Thế giới|
|         102|máy_bay rơi ở mex...|Thế giới|
|         102|hình_ảnh đệ nhất ...|Thế giới|
|         102|thủ_lĩnh cao nhất...|Thế giới|
+------------+--------------------+--------+
only showing top 5 rows



**Tokenize text in all_token columns**

In [8]:
from pyspark.ml.feature import Tokenizer, CountVectorizer
tkn = Tokenizer().setInputCol("all_token").setOutputCol("content_tokenized")
train_df = tkn.transform(df_new)
# train_df = tokenized.drop('title_token', 'sapo_token', 'content_token')

In [9]:
train_df.show(5)

+------------+--------------------+--------+--------------------+
|pega_cate_id|           all_token|   label|   content_tokenized|
+------------+--------------------+--------+--------------------+
|         102|anh phát_hiện    ...|Thế giới|[anh, phát_hiện, ...|
|         102|phát_hiện két sắt...|Thế giới|[phát_hiện, két, ...|
|         102|máy_bay rơi ở mex...|Thế giới|[máy_bay, rơi, ở,...|
|         102|hình_ảnh đệ nhất ...|Thế giới|[hình_ảnh, đệ, nh...|
|         102|thủ_lĩnh cao nhất...|Thế giới|[thủ_lĩnh, cao, n...|
+------------+--------------------+--------+--------------------+
only showing top 5 rows



**TF-IDF**

In [10]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml import Pipeline

In [11]:
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "label_id")
hashingTF = HashingTF(inputCol="content_tokenized", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(train_df)
dataset = pipelineFit.transform(train_df)

In [12]:
dataset = dataset.withColumnRenamed("label", "label_name")
dataset = dataset.withColumnRenamed("label_id", "label")


In [13]:
dataset.show(5)

+------------+--------------------+----------+--------------------+--------------------+--------------------+-----+
|pega_cate_id|           all_token|label_name|   content_tokenized|         rawFeatures|            features|label|
+------------+--------------------+----------+--------------------+--------------------+--------------------+-----+
|         102|anh phát_hiện    ...|  Thế giới|[anh, phát_hiện, ...|(10000,[44,277,57...|(10000,[44,277,57...|  1.0|
|         102|phát_hiện két sắt...|  Thế giới|[phát_hiện, két, ...|(10000,[54,63,250...|(10000,[54,63,250...|  1.0|
|         102|máy_bay rơi ở mex...|  Thế giới|[máy_bay, rơi, ở,...|(10000,[63,378,49...|(10000,[63,378,49...|  1.0|
|         102|hình_ảnh đệ nhất ...|  Thế giới|[hình_ảnh, đệ, nh...|(10000,[37,43,52,...|(10000,[37,43,52,...|  1.0|
|         102|thủ_lĩnh cao nhất...|  Thế giới|[thủ_lĩnh, cao, n...|(10000,[63,70,133...|(10000,[63,70,133...|  1.0|
+------------+--------------------+----------+--------------------+-----

In [14]:
df_train, df_test = dataset.randomSplit([0.8, 0.2])

In [15]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [16]:
evaluator = MulticlassClassificationEvaluator(labelCol='label', 
                                          metricName='accuracy')

In [17]:
lr = LogisticRegression(regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(df_train)
pred = lrModel.transform(df_test)


In [18]:
evaluator.evaluate(pred)

0.821733459805549