In [40]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.feature import Word2Vec
import time

## TF-IDF

In [48]:
sc = SparkContext()
documents = sc.textFile("spark_context.txt").map(lambda line: line.split(" "))

In [49]:
hashingTF = HashingTF()
tf = hashingTF.transform(documents)

In [50]:
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

In [51]:
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)

In [52]:
text=documents.flatMap(lambda w:w)
wordCounts = text.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b).\
sortBy(lambda x: x[1],ascending=False)
wordCounts.take(10)

[('房東', 16370),
 ('說', 10908),
 ('月', 4731),
 ('房客', 4676),
 ('想', 3920),
 ('房子', 3515),
 ('押金', 3179),
 ('問題', 3170),
 ('住', 2909),
 ('人', 2888)]

In [55]:
start = time.time()
print("tfidf:")
for each in tfidf.collect():
    print(each)


print("tfidfIgnore:")
for each in tfidfIgnore.collect():
    print(each)
    
sc.stop()
end = time.time()
print("Total time is {} secs".format(end - start))

tfidf:
(1048576,[9123,21479,39917,48948,62888,64966,86543,91101,110180,120734,140244,140615,156771,159431,161366,164668,175332,189817,204013,211867,214352,237000,253201,261501,263464,267680,278920,283593,287234,297523,299773,317669,324763,326220,351912,356639,369828,370468,393260,411013,446542,451368,461225,462176,466257,480969,483604,487249,492673,494226,504840,516116,536026,540319,540761,548524,549146,564388,565122,567190,584018,586949,594075,633480,639487,647528,665076,669424,673607,686058,693377,698753,705559,712688,719155,720288,749537,754764,785714,791271,807125,813316,842968,852436,866777,869421,870324,899465,899717,917065,930996,942264,962040,982571,987278,992429,1005113,1025524,1031734,1036962,1048070],[9.73416371873,1.94213736599,4.13076250694,2.1688839547,13.5048662576,2.07447427023,4.20378764195,2.55863164692,5.82849302652,7.70029520342,3.90480601425,2.97290738471,5.71273196557,4.4421986654,5.13534584596,3.68391218267,3.48816760554,4.80992344552,1.67612087948,4.83809432249,

(1048576,[14444,47789,93771,139278,150530,154084,155682,182079,187619,204013,249485,258119,270845,282557,326220,350587,369803,369828,373759,399390,407126,407345,435431,439942,471862,473623,480969,483604,538730,548524,568421,586949,647053,669424,698614,772881,805806,832254,845229,846854,866777,903604,975306,987278,1005113,1006385,1025524,1044583],[10.4307771073,2.87598948752,3.54925529752,5.39771011043,4.15933587938,6.49189581433,5.0781338233,3.49560258403,7.38592403638,0.83806043974,3.43059775372,3.1730865589,3.90480601425,1.91955168763,3.51499165659,3.77832186714,2.96409675503,5.07674092356,4.10298294283,3.25764394693,4.06270904369,3.48816760554,7.29483009531,3.11532772475,1.84522328122,7.29483009531,1.23717787294,3.41143543287,2.26657319987,2.42729564486,4.75585622425,2.44203561371,0.930653226568,1.032066955,5.21538855363,4.20378764195,2.0443033926,4.63224226829,1.04807754356,7.02128092279,1.63534787955,6.78400447155,2.05484830578,2.28419480122,0.927127727695,2.47186396434,1.65885072

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.


Total time is 3.772944688796997 secs


In [None]:
spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

## Word2Vector

In [8]:
try:
    sc = SparkContext()
    inp = sc.textFile("spark_w2v.txt").map(lambda row: row.split(" "))

    word2vec = Word2Vec()
    model = word2vec.fit(inp)

    synonyms = model.findSynonyms('水電', 40)
    print("水電 相似詞有:")
    for word, cosine_distance in synonyms:
        print("{}: {}".format(word, cosine_distance))
except Exception as e:
    print(e)
finally :
    sc.stop()

水電 相似詞有:
網路: 0.7625455856323242
第四台: 0.7436988949775696
有線: 0.7152613401412964
免費: 0.6998873353004456
太慢: 0.693181574344635
網路費: 0.6776204705238342
水電瓦斯: 0.6734753251075745
包: 0.6477099657058716
撤: 0.6446213126182556
來裝: 0.6398170590400696
牽: 0.6378631591796875
送到: 0.6364670395851135
分擔: 0.6322566270828247
無線: 0.6305233836174011
費用: 0.6271809935569763
使用者: 0.6256622672080994
接線: 0.6239302754402161
他收: 0.6044703722000122
比照辦理: 0.6016911268234253
徵收: 0.5981587171554565
斷網: 0.5964264869689941
管理費: 0.5939884781837463
不包: 0.5923348665237427
安裝: 0.5890639424324036
申辦: 0.5881028175354004
中華電信: 0.5819717049598694
業者: 0.578784167766571
跟本: 0.5787201523780823
盒: 0.5784633159637451
光纖: 0.5757163763046265
佔便宜: 0.5743582844734192
月費: 0.5733641982078552
包水包: 0.5697975754737854
線路: 0.5668936371803284
辦: 0.5663245320320129
頻寬: 0.5656952857971191
濾心: 0.5613507032394409
含: 0.5611875057220459
水費: 0.559997022151947
包水: 0.5591640472412109
