In [1]:
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .master("yarn")\
        .config('spark.executor.instances','99')\
        .config('spark.executor.memory','4G')\
        .appName("mldemo")\
        .getOrCreate()

In [2]:
# read csv
sales = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/user/spark/share/words_all.csv")\
  .where("Words IS NOT NULL")

In [4]:
sales.show(5)

+--------------------+----------+-------------------------------------+--------------------+---------------+--------------------------------+
|                 URL|      Date|                                Title|              Author|        Website|                           Words|
+--------------------+----------+-------------------------------------+--------------------+---------------+--------------------------------+
|https://wealth.bu...|2020/07/20|       國安基金不退場，股市會過熱嗎？|              陳唯泰|bussinessweekly| 意外 國安 基金 不 退場 話說 ...|
|https://wealth.bu...|2020/07/20|台積電去美國不夠，還要去日本？讀賣...|黃嘉洵編譯／經濟日報|bussinessweekly|  讀賣 新聞 19日 報導 日本 政...|
|https://wealth.bu...|2020/07/20|  華為衝擊已過＋5G需求加持，外資看...|            Atkinson|bussinessweekly| 亞系 外資 在 最 新 研究 報告...|
|https://wealth.bu...|2020/07/17| 台股萬點以上就是貴？從「1指標」看...|     Charlotte夏綠蒂|bussinessweekly|    2020年 3月 股災 全球 股市...|
|https://wealth.bu...|2020/07/17|金融海嘯大賠出場，老婆抱著他一起哭...|                孫太|bussinessweekly|夫妻 因為 投資 理念 不同 產生...|
+-------------

In [5]:
# 計算詞頻
from pyspark.ml.feature import Tokenizer, CountVectorizer
tkn = Tokenizer().setInputCol("Words").setOutputCol("TokenOut")
tokenized = tkn.transform(sales.select("Words"))
cv = CountVectorizer()\
  .setInputCol("TokenOut")\
  .setOutputCol("features")\
  .setVocabSize(500)\
  .setMinTF(0)\
  .setMinDF(0)\
  .setBinary(True)

In [6]:
cvFitted = cv.fit(tokenized)
prepped = cvFitted.transform(tokenized)

In [7]:
prepped.show(5)

+--------------------------------+----------------------------+--------------------+
|                           Words|                    TokenOut|            features|
+--------------------------------+----------------------------+--------------------+
| 意外 國安 基金 不 退場 話說 ...|[意外, 國安, 基金, 不, 退...|(500,[0,1,2,3,5,6...|
|  讀賣 新聞 19日 報導 日本 政...| [讀賣, 新聞, 19日, 報導,...|(500,[0,2,5,6,18,...|
| 亞系 外資 在 最 新 研究 報告...| [亞系, 外資, 在, 最, 新,...|(500,[0,1,2,3,4,5...|
|    2020年 3月 股災 全球 股市...|   [2020年, 3月, 股災, 全...|(500,[0,1,2,3,6,7...|
|夫妻 因為 投資 理念 不同 產生...|[夫妻, 因為, 投資, 理念, ...|(500,[0,1,2,3,4,5...|
+--------------------------------+----------------------------+--------------------+
only showing top 5 rows



In [8]:
%time cvFitted = cv.fit(tokenized)
prepped = cvFitted.transform(tokenized)

CPU times: user 3.99 ms, sys: 14 ms, total: 17.9 ms
Wall time: 36.7 s


In [9]:
# 將詞頻用LDA進行文件分類
# setK() 分幾類
from pyspark.ml.clustering import LDA
lda = LDA().setK(5).setMaxIter(5)

In [10]:
model = lda.fit(prepped)

In [11]:
%time model = lda.fit(prepped)

CPU times: user 5.25 ms, sys: 9.68 ms, total: 14.9 ms
Wall time: 25.7 s


In [12]:
model.describeTopics(3).show()
#cvFitted.vocabulary

+-----+-----------+--------------------+
|topic|termIndices|         termWeights|
+-----+-----------+--------------------+
|    0| [0, 2, 10]|[0.01191710462391...|
|    1|  [0, 1, 3]|[0.01995731890938...|
|    2|[0, 4, 119]|[0.01587687698025...|
|    3| [0, 14, 2]|[0.01194581290519...|
|    4|  [0, 3, 2]|[0.00963768413344...|
+-----+-----------+--------------------+



In [None]:
# 計算tf-idf
from pyspark.ml.feature import HashingTF, IDF
tf = HashingTF()\
  .setInputCol("DescOut")\
  .setOutputCol("TFOut")\
  .setNumFeatures(10000)
idf = IDF()\
  .setInputCol("TFOut")\
  .setOutputCol("IDFOut")\
  .setMinDocFreq(2)

In [None]:
idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(10, False)