In [1]:
sc

In [2]:
spark

In [1]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id
import jieba
import matplotlib.pyplot as plt
from pyspark.ml.feature import CountVectorizer, Tokenizer
from pyspark.ml.clustering import LDA, LDAModel

%matplotlib inline

In [2]:
dataset_location = 'project/two_news.csv'

df = spark.read.format("csv").option("header", "true").load(dataset_location)

In [3]:
df2 = df.withColumnRenamed("生技醫療類指數","tourism") \
    .withColumnRenamed("觀光類指數","bio")\
    .withColumnRenamed("網站","website")\
    .withColumnRenamed("關鍵字","keyword")\
    .withColumnRenamed("網址","url")\
    .withColumnRenamed("新聞標題","title")\
    .withColumnRenamed("內容","content")\
    .withColumnRenamed("預計影響日期","effect_time")

In [4]:
df2 = df2.withColumn("effect_time", F.to_timestamp(F.col("effect_time"), 'yyyy-MM-dd HH:mm:ss'))

In [5]:
df2 = df2.sort("effect_time")

In [6]:
df2.createOrReplaceTempView("news")

In [7]:
stop_words=open('./stopwords.txt', 'r', encoding='utf-8').readlines()
stop_words = [line.strip() for line in stop_words]

In [8]:
spark.sparkContext.addFile('userdict.txt')
spark.sparkContext.addFile('dict.txt.big')

In [9]:
def seg_list(x):
    if not jieba.dt.initialized:
        jieba.set_dictionary('dict.txt.big')
        jieba.load_userdict('userdict.txt')
    s=jieba.lcut(x, cut_all=False)
    s=[x for x in s if len(x)>1 and x not in stop_words]
    
    return s 
            
seg_udf = udf(seg_list, ArrayType(StringType()))

In [10]:
df2 = spark.sql("select * from news ").cache()

In [11]:
df2 = df2.withColumn('tokens',seg_udf(df2['content']))

In [12]:
df_try = df2.select("tokens","effect_time","bio","tourism","content")

In [13]:
df_try = df2.withColumn("id", monotonically_increasing_id())

In [None]:
df_try.write.parquet("ml_test/news_jieba.parquet")

In [None]:
df_try = spark.read.parquet("ml_test/news_jieba.parquet")

In [14]:
cv = CountVectorizer(inputCol="tokens", outputCol="features")
cvmodel = cv.fit(df_try)
df_vect = cvmodel.transform(df_try)

In [17]:
df_A = df_vect.select("id","tokens","features")

In [18]:
df_final = sqlContext.createDataFrame(df_A.head(100), df_A.schema)

In [19]:
num_topics = 5
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(df_final)
ll = model.logLikelihood(df_final)
lp = model.logPerplexity(df_final)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -820259.5690545622
The upper bound on perplexity: 42.75748379141797


In [None]:
vocab = cvmodel.vocabulary
topics = model.describeTopics()   
topics_rdd = topics.rdd
topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
        print(word)
    print("*"*25)