In [None]:
# Local Spark installation that findspark should expose to this kernel.
SPARK_HOME = '/usr/local/spark'
import findspark
# findspark.init must run before `import pyspark` so the import can resolve.
findspark.init(SPARK_HOME)
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizerModel, CountVectorizer

In [None]:
from pyspark.ml.linalg import SparseVector, Vector

In [None]:
spark = SparkSession.builder.appName("PySpark Text Analysis Example").getOrCreate()

In [None]:
sc = spark.sparkContext

In [None]:
# read all the files
wikif = sc.wholeTextFiles("file:///home/hduser/Downloads/sharedfolder/mlexamples/TFIDF-CaseStudy-4-PySpark/wikidocs/*.txt")
# wikif = sc.wholeTextFiles("mllib/wikidocs/*.txt")

In [None]:
wikif.first()

In [None]:
# Remove path from the file name
# wikif.first()[0].split('/')[-1]
# (wikif.first()[0].split('/')[-1],wikif.first()[1])

wiki=wikif.map(lambda rec:((rec[0].split('/')[-1]),rec[1]))

In [None]:
wiki.first()

In [None]:
# convert to dataframe
filedata=spark.createDataFrame(wiki).toDF('label','doc')
filedata.printSchema()

In [None]:
filedata.show(2)

In [None]:
# convert document to list of words
tokenizer = RegexTokenizer(inputCol="doc", outputCol="allwords", pattern="\\W")
allWordsData = tokenizer.transform(filedata)
allWordsData.printSchema()

In [None]:
allWordsData.show(2)

In [None]:
# remove the stop words
remover = StopWordsRemover(inputCol="allwords", outputCol="words")
wordsData=remover.transform(allWordsData)
wordsData.printSchema()

In [None]:
wordsData.show(2)

In [None]:
wordsData.select('allwords').show(1,False)

In [None]:
wordsData.select('words').show(1,False)

In [None]:
# Get the required columns 
nwordsData=wordsData.select('label','words')
nwordsData.printSchema()

In [None]:
nwordsData.show(2)

In [None]:
# Build a term frequency matrix. Also check the vocabulary
# val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("rawFeatures").fit(nwordsData)
# cvModel.vocabulary.length
# val cvm = cvModel.transform(nwordsData)
cvModel=CountVectorizer(inputCol='words',outputCol='rawFeatures').fit(nwordsData)
len(cvModel.vocabulary)

In [None]:
type(cvModel.vocabulary)

In [None]:
cvModel.vocabulary

In [None]:
cvm = cvModel.transform(nwordsData)
type(cvm)

In [None]:
cvm.printSchema()

In [None]:
cvm.show(2)

In [None]:
# Build the Inverse document frequency
idf=IDF(inputCol="rawFeatures", outputCol="features").fit(cvm)
tfIDFData = idf.transform(cvm)

In [None]:
# Apply the function to extract the top words.
# Top 10 is specified below but you can change that.
tfIDFData.printSchema()

In [None]:
tfIDFData.show(2)

In [None]:
tfIDFData.count()

In [None]:
tfIDFRDD=tfIDFData.select("label","features").rdd

In [None]:
tfIDFRDD.count()

In [None]:
type(tfIDFRDD.first())

In [None]:
tfIDFRDD.first()

In [None]:
tfIDFRDD.first().features.indices

In [None]:
len(tfIDFRDD.first().features.indices)

In [None]:
tfIDFRDD.first().features.values

In [None]:
len(tfIDFRDD.first().features.values)

In [None]:
type(tfIDFRDD.first().features.values[0])

In [None]:
type(tfIDFRDD.first().features.indices[0])

In [None]:
tfIDFRDD.first().label

In [None]:
def findf2(vocab, words, idfs, top_n=10):
    """Return the ``top_n`` highest-scoring vocabulary terms of one document.

    Parameters
    ----------
    vocab : sequence of str
        Vocabulary; position ``i`` holds the term for feature index ``i``
        (in this notebook, ``cvModel.vocabulary``).
    words : sequence of int
        Feature indices present in the document (in this notebook,
        ``rec.features.indices``).
    idfs : sequence of float
        Score for each entry of ``words``, aligned by position (in this
        notebook, the TF-IDF weights ``rec.features.values``).
    top_n : int, optional
        Maximum number of terms to return. Defaults to 10; a value <= 0
        returns an empty list.

    Returns
    -------
    list of str
        Terms ordered by descending score. Terms with equal scores keep the
        order in which they appear in ``words``.
    """
    if top_n <= 0:
        return []
    # sorted() is stable even with reverse=True, so tied scores keep their
    # original relative order -- the same ordering the previous
    # group-by-score implementation produced.
    ranked = sorted(range(len(words)), key=lambda i: idfs[i], reverse=True)
    return [vocab[words[i]] for i in ranked[:top_n]]
 

In [None]:
# Print the top keywords for every document. collect() brings all records to
# the driver in one job (take(count()) ran an extra count job first); this
# assumes the corpus fits in driver memory, as the original loop also did.
# `vocabulary` is a JVM-backed property in PySpark, so read it once, not per record.
vocabulary = cvModel.vocabulary
for rec in tfIDFRDD.collect():
    kwords10 = (rec.label, findf2(vocabulary, rec.features.indices, rec.features.values))
    print(kwords10)
    