# Acquiring Data

In [1]:
#Import Spark and create the session that will be used to read the dataset
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# A single local Spark session, reused by every later cell.
inSpark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Fake News Analyzer')
    .getOrCreate()
)

# Read the CSV: the first row supplies the column names and Spark infers the column types.
dataset_inSpark = inSpark.read.csv(
    'Fake.csv',
    header=True,
    inferSchema=True,
)


# Data Preparation

In [2]:
#data preprocessing with filtering as rdd, handling stopwords and tokenizing
from nltk.corpus import stopwords
import re as regex

# Pull the 'title' value out of every row and drop the rows where it is missing.
# The mapping goes through .rdd because a DataFrame itself has no .map method.
fakeComments = (
    dataset_inSpark.rdd
    .map(lambda row: row['title'])
    .filter(lambda title: title is not None)
)

# English stop-word list supplied by NLTK.
stopWords = stopwords.words("english")


## Data Cleaning

In [3]:
# Runs of letters only (Unicode-aware): word characters that are neither digits nor "_".
wordPattern = regex.compile(r"[^\W\d_]+")

# Frozen set so the per-token stop-word check is a hash lookup instead of a list scan.
stopWordSet = frozenset(stopWords)


def tokenizeTitle(document):
    """Turn one headline into its list of meaningful lower-case words.

    The headline is stripped and lower-cased, then split into runs of letters, so
    punctuation attached to a word ("trump:", "(video)", "obama's") no longer causes
    the whole word to be discarded. Words of one or two characters and English
    stop words are removed.

    Parameters
    ----------
    document : str
        The raw headline text.

    Returns
    -------
    list of str
        The remaining words, in their original order (possibly empty).
    """
    candidates = wordPattern.findall(document.strip().lower())
    return [
        word for word in candidates
        # isalpha() keeps the original "letters only" rule for the few characters
        # (e.g. superscript digits) that the pattern admits but are not alphabetic.
        if word.isalpha() and len(word) > 2 and word not in stopWordSet
    ]


# Pair every token list with its position so documents keep a stable reference index.
tokens = fakeComments.map(tokenizeTitle).zipWithIndex()

# Data Vectorization

In [4]:
from pyspark.ml import feature

# Text mining with TF-IDF, part 1: term frequency.
sqlContext = SQLContext(sparkContext=inSpark.sparkContext, sparkSession=inSpark)

# Each (word list, index) pair from `tokens` becomes a row with the columns
# title_Words and refIndex.
dF1 = sqlContext.createDataFrame(tokens, ["title_Words", 'refIndex'])

# Count-based vectorizer over the word lists; the fitted model also holds the vocabulary.
vectorizingText = feature.CountVectorizer(
    inputCol="title_Words",
    outputCol="rawFeatures",
    vocabSize=10000,
    minDF=10.0,
)
vectorizedModel = vectorizingText.fit(dF1)
resultVectorized = vectorizedModel.transform(dF1)


In [5]:
# TF-IDF, part 2: inverse document frequency.
# Reads the "rawFeatures" column and writes the re-weighted vectors to "features".
idf = feature.IDF(
    inputCol="rawFeatures",
    outputCol="features",
)
idfModel = idf.fit(resultVectorized)
tfidf_result = idfModel.transform(resultVectorized)


# Modelling

In [6]:
# LDA hyper-parameters.
totalTopics = 3
maxIter = 10
# Fixed seed so that Restart & Run All reproduces the same topics; with no seed
# given, LDA.train derives one from the system time.
ldaSeed = 42
from pyspark.mllib.clustering import LDA as MLlibLDA
from pyspark.mllib.linalg import Vectors as MLlibVectors

# The RDD-based LDA takes [document id, mllib vector] records, whereas the TF-IDF
# frame holds ml vectors, hence the fromML conversion on each value.
ldaCorpus = (
    tfidf_result[['refIndex', 'features']]
    .rdd
    .mapValues(MLlibVectors.fromML)
    .map(list)
)

model_LDA = MLlibLDA.train(
    ldaCorpus,
    k=totalTopics,
    maxIterations=maxIter,
    seed=ldaSeed,
)

In [7]:
# Extract the top 3 terms of each topic together with the weights of those terms.
numberofWords = 3
wordVocab = vectorizedModel.vocabulary

# describeTopics returns term indices (into the vocabulary) and their weights per topic.
topicIndices = inSpark.createDataFrame(
    model_LDA.describeTopics(maxTermsPerTopic=numberofWords)
)

In [8]:
#Map each topic's term indices back to the vocabulary words they stand for
def yieldedTopics(topic):
    """Translate one topic description into its vocabulary words.

    Parameters
    ----------
    topic : sequence
        A record whose first element is the sequence of term indices for the
        topic. Any further elements are not read here.

    Returns
    -------
    list
        The ``wordVocab`` entries for at most ``numberofWords`` leading term
        indices, in the order given.
    """
    # Slice rather than index over range(numberofWords): a topic described by
    # fewer terms than numberofWords would otherwise raise IndexError.
    return [wordVocab[termIndex] for termIndex in topic[0][:numberofWords]]

# Resolve every topic row to its words and bring the (small) result back to the driver.
keyTopics = topicIndices.rdd.map(yieldedTopics).collect()


In [9]:
# Show each topic: its label, its terms one per line, then a blank separator.
for topicNumber, topicTerms in enumerate(keyTopics):
    print("Topic" + str(topicNumber))
    for topicTerm in topicTerms:
        print(topicTerm)
    print('\n')

Topic0
trump
obama
hillary


Topic1
trump
hillary
obama


Topic2
trump
obama
hillary




In [None]:
#Deploy wherever you choose - web frameworks, apps, cloud, etc.