# ECE 494 Intro To Cloud Computing Final Project

This notebook contains our code for the final project. It is nearly identical to the notebook used on Amazon EMR, which was slightly modified to fit that environment.


This notebook follows the same sequence presented in the final presentation.<br>

Gather Data -> PreProcess -> Feature Extraction -> ML Algorithm -> Results


Each of the above steps is defined as a method, and a single driver method calls them all. Each section has a title matching its function.




To run this code, the following must be done:
- Put the file named "english" (the stopword list) in the same directory as this code, or update the PATH_TO_ENGLISH constant with the path to the file.
- Know the path to the input files.
- Run all of the code up to and including the driver method definition.
- Call the driver method clusteringPipeline(inputPath) with the path to the input file. The input file is the text file that will be analyzed.


Notes:
  - Gather Data in this notebook refers to simply reading the input text file. The Twitter API code is in a separate file.
  - Results in this notebook refers to presenting the words that serve as topic cluster centers. The actual analysis is in the presentation.

In [None]:
#!cat /proc/cpuinfo #run to see the CPU info for Colab; needed for the comparison with AWS EMR; extraneous and not necessary for the rest of the code

# Imports

In [None]:
#pyspark:
!pip install pyspark
import pyspark as spark
from pyspark import SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,CountVectorizer
from pyspark.ml.clustering import KMeans
from pyspark.mllib.clustering import KMeans as KMeans_mllib


#nltk imports
import nltk
from nltk.tokenize.casual import TweetTokenizer
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

import re

import json

import random

import numpy as np

import time

## Downloads

To be Downloaded:
  - Twitter Dataset
  - Stopwords

This is a run-once step to get the labeled Twitter data and the stopwords.


After this, the JSON file is manually processed into .txt files that can be read and processed by PySpark.

We will include the downloaded files in the zip folder for the project.

In [None]:
#twitter_samples = nltk.download('twitter_samples',download_dir='./')

In [None]:
#nltk.download('stopwords',download_dir='/root/nltk_data')

## Spark Starter

In [None]:
#spark context starter:
sc = SparkContext(appName="inModuleTask")

# Declaration/Initializations

Below, all of the modules, constants, and variables used globally across the code are declared or initialized.

In [None]:
#dataset variables and constants:
# - path constants:
POSITIVE_TWEETS_PATH = "./positive_tweets_short.txt"
NEGATIVE_TWEETS_PATH = "./negative_tweets_short.txt"
PATH_TO_ENGLISH = "./english"


# modules that will be used by the pre-processor
tokenizer_tweets = TweetTokenizer(reduce_len=True,strip_handles=True)
#stopword list, one word per line; strip() removes the newline (even on a last line without one)
with open(PATH_TO_ENGLISH) as eng:
  stp_words = set(line.strip() for line in eng)
porter = PorterStemmer()


# for feature extraction:
NUM_FEATURES = 1<<16 #1<<N is the same as 2^N, so this gives 65536 features
NUM_WORDS_KEEP = 3

#clustering vars/constants:
NUM_CLUSTERS = 5
MAX_ITERATION = 30
NUM_TOPICS_PER_CLUSTER = 7

In [None]:
#print(stp_words)

# Gather Data

In [None]:
def getTextData(path):
  r"""
    Will get the (text) file at the given path and return
    an rdd.
  
  @param:
    - path : path to the file
  
  @return:
    - rdd : rdd of the textfile; separated by lines
  """
  dataRdd = sc.textFile(path)
  return dataRdd

# Pre-Processing

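Steps in pre-processing (for each line of text = one tweet):
  - Break the line into tokens with NLTK's TweetTokenizer (which also strips @handles)
  - Keep only tokens that start with a letter, and lowercase them
  - Get rid of links and other tokens containing '.' or '@'
  - Get rid of stop words
  - Convert each word to its root form, i.e. stemming (through NLTK's PorterStemmer)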

### Pre-Processing Method

Will be plugged into pyspark map function for both
positive and negative tweets.

In [None]:
#pre-processor method, will be the map function for pyspark:
def preProcessTweet(tweet):
  r"""
    Does pre-processing of each tweet and returns a list
    of processed words.

    NOTE: Each line in the input document has to be a tweet for this to work.

    Steps taken by this method:
      - Tokenizes the tweet with NLTK's TweetTokenizer.
      - Retains only tokens that start with a letter, lowercased.
      - Gets rid of links and other tokens containing '.' or '@'.
      - Gets rid of stop words.
      - Stems each word using NLTK's PorterStemmer.

  @param:
    - tweet : one line of the input text file

  @return:
    - list of processed (lowercase, stemmed) words
  """

  #strip leading and trailing whitespace:
  tweet = tweet.strip()

  #tokenize the words based on nltk twitter tokenization:
  tokenized_tweet = tokenizer_tweets.tokenize(tweet)

  #keep only the words (tokens starting with a letter), lowercased:
  wordRegex = r'^[a-zA-Z].*$'
  onlyWords = [s.lower() for s in tokenized_tweet if re.search(wordRegex,s)]

  #get rid of links (NOTE: this is an approximation and may not cover everything):
  linkRemovalRegex = r"^https?:.*$|^.*\..*$|^.*[@\.].*$"
  noLinks = [s for s in onlyWords if re.search(linkRemovalRegex,s) is None]

  #get rid of stopwords:
  noStopwords = [word for word in noLinks if word not in stp_words]

  #stem each word and make sure the result is lowercase:
  return [porter.stem(word).lower() for word in noStopwords]
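
The two regular expressions in preProcessTweet() decide which tokens survive. The sketch below applies the same two patterns, in the same order, to a hand-written token list (hypothetical, not actual TweetTokenizer output) so their effect is visible without NLTK or Spark. The helper name `filter_tokens` is illustrative only.

```python
import re

# Patterns copied from preProcessTweet()
WORD_REGEX = r'^[a-zA-Z].*$'
LINK_REMOVAL_REGEX = r"^https?:.*$|^.*\..*$|^.*[@\.].*$"


def filter_tokens(tokens):
    """Hypothetical helper: word filter + lowercasing, then the link filter."""
    # keep tokens whose first character is an ASCII letter, lowercased
    words = [t.lower() for t in tokens if re.search(WORD_REGEX, t)]
    # drop URLs and anything else containing '.' or '@'
    return [w for w in words if re.search(LINK_REMOVAL_REGEX, w) is None]


sample = ["Great", "#MAGA", "https://t.co/abc", "2020", "e.g.", "don't", ":)", "Hillary"]
print(filter_tokens(sample))  # ['great', "don't", 'hillary']
```

Note that hashtags such as "#MAGA" are dropped entirely because they start with '#', and abbreviations like "e.g." are removed by the link filter.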


## Pre-Processing Abstraction

A method that calls the pre-processor and returns
the pre-processed rdd. This is only for the sake of cleaner code.

In [None]:
def doDataPreProcessing(dataRdd,numPartitions=None):
  r"""
    Given the rdd of a textfile, this method will
    return an rdd with pre-processed data. The
    pre-processing takes place using the pyspark map function with
    the preProcessTweet() method.

  @param:
    - dataRdd: rdd that contains the entire text data. This is the
               rdd generated by getTextData().
    - numPartitions: optional number of partitions for the processed rdd.
                     If None, the partitioning of dataRdd is kept.
  
  @return:
    - processedRdd: rdd that contains processed tweets. The format for
                    each entry is a list of words relevant for each tweet.
  """

  processedRdd = dataRdd.map(preProcessTweet)

  if numPartitions is None:
    return processedRdd

  #partitionBy() needs (key, value) pairs; these entries are plain word lists,
  #so repartition() is used to redistribute them instead
  return processedRdd.repartition(numPartitions)\
                     .persist()
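
PySpark's `RDD.partitionBy()` places each record by hashing its key, so it expects (key, value) pairs, while `RDD.repartition()` redistributes records without looking at keys. The pure-Python sketch below imitates both behaviours on word lists like the ones preProcessTweet() returns. It is a simplified model, not Spark's implementation: the built-in `hash()` stands in for PySpark's `portable_hash()`, and the even spreading in the hypothetical `spread_evenly` helper only approximates what `repartition()` achieves.

```python
def partition_by_key(records, num_partitions):
    """Hash-partition (key, value) records, in the spirit of RDD.partitionBy()."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:  # every record must unpack into exactly (key, value)
        # built-in hash() is a stand-in for PySpark's portable_hash()
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def spread_evenly(records, num_partitions):
    """Key-free redistribution, a simplified stand-in for what repartition() achieves."""
    partitions = [[] for _ in range(num_partitions)]
    for position, record in enumerate(records):
        partitions[position % num_partitions].append(record)
    return partitions


word_lists = [["great", "america", "work"], ["fake", "news"], ["vote"]]
try:
    partition_by_key(word_lists, 2)
except ValueError as err:
    # a three-word list cannot be unpacked into (key, value)
    print("key-based partitioning failed:", err)
print(spread_evenly(word_lists, 2))
# [[['great', 'america', 'work'], ['vote']], [['fake', 'news']]]
```

A two-word list would unpack silently, using its first word as the key, which is why the failure can be intermittent on real data.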

# Feature Extraction

Generate Bag of Words

In [None]:

def generateWordDict(table):
  r"""
    Creates a dictionary whose keys are feature indices and whose values are
    the words that map to those indices. The indices come from PySpark's
    HashingTF, which hashes each word to an index in [0, NUM_FEATURES).

    This dictionary is created so we can interpret the results after KMeans
    runs. KMeans returns cluster centers as vectors of weights, and the
    indices of those weights have to be converted back to the words they
    correspond to.

  @param:
    - table: the dataframe that has been processed by genBagOfWords.
  
  @return:
    - dictionary {index:word(s)}; an index lists more than one word only
      when those words collide under the hash function.

  
  Note: HashingTF stores the indices of a tweet's vector in ascending order,
        not in the order of the words in the tweet, so indices cannot be paired
        with words by position. Instead, every distinct word is hashed on its
        own with the same HashingTF settings. The dataframe is converted to an
        RDD here so we can apply PySpark mapreduce functions on it.
  """

  # column names in the dataframe
  COL_TWEETS = 'tweet'
  COL_HASHINGTF_RESULTS = 'hashed'

  # every distinct word, wrapped in a one-word "document" so HashingTF can hash it
  vocabularyDF = table.rdd\
                      .flatMap(lambda row: row[COL_TWEETS])\
                      .distinct()\
                      .map(lambda word: ([word],))\
                      .toDF([COL_TWEETS])

  # same numFeatures as in genBagOfWords, so each word lands on the same index
  wordHasher = HashingTF(inputCol=COL_TWEETS,outputCol=COL_HASHINGTF_RESULTS,numFeatures=NUM_FEATURES)

  # (index, word) pairs, grouped so that colliding words share one entry
  wordIndices = wordHasher.transform(vocabularyDF).rdd\
                          .map(lambda row: (int(row[COL_HASHINGTF_RESULTS].indices[0]),row[COL_TWEETS][0]))\
                          .groupByKey()\
                          .mapValues(sorted)

  #create an actual dictionary on the driver:
  wordDictionary = wordIndices.collectAsMap()

  return wordDictionary
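
HashingTF keeps no vocabulary: each word is hashed to an index modulo numFeatures, so different words can share an index, which is why the dictionary maps an index to a list of words. The sketch below shows the idea with a deliberately tiny feature space. It is an assumption-laden stand-in: it uses `zlib.crc32` instead of Spark's MurmurHash3, so its indices will not match the ones Spark produces, and the helper names are hypothetical.

```python
import zlib
from collections import defaultdict

TOY_NUM_FEATURES = 16  # deliberately tiny (the notebook uses 1 << 16) so collisions show up


def hash_index(word, num_features=TOY_NUM_FEATURES):
    # CRC32 is a stand-in hash; Spark's HashingTF uses MurmurHash3
    return zlib.crc32(word.encode("utf-8")) % num_features


def build_index_word_dict(words, num_features=TOY_NUM_FEATURES):
    """Map each index to the sorted list of distinct words that hash to it."""
    index_words = defaultdict(list)
    for word in sorted(set(words)):
        index_words[hash_index(word, num_features)].append(word)
    return dict(index_words)


vocab = ["great", "america", "border", "vote", "fake", "news", "economi", "presid",
         "campaign", "tax", "worker", "student", "health", "gun", "senat", "veteran",
         "secur", "debat", "crime", "drug"]
index_word_dict = build_index_word_dict(vocab)
# 20 distinct words in 16 buckets: by the pigeonhole principle at least one index collides
print({i: ws for i, ws in index_word_dict.items() if len(ws) > 1})
```

With 2^16 features and a vocabulary of a few thousand stemmed words, collisions become rare but not impossible.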


def genBagOfWords(preprocessedRdd):
  r"""
    Generates a bag of words model and word-index dictionary from a given data rdd.
    The rdd has to be pre-processed already, so it must be the output from the
    doDataPreProcessing() function.

  @param: 
    - preprocessedRdd : an rdd that contains the pre-processed data. This rdd should be
                the one obtained from txtDataRDD.map(preProcessTweet)

  @return:
    - tuple:
        - [0] : tweets_bagOfWords_DF : dataframe
        - [1] : wordDic : dictionary of indices to words
            - format : {index(int):words(list)}
  
  @description:
    This method creates a Spark SQL dataframe from the data in the rdd, then
    passes that table through the HashingTF and IDF classes to build the bag of
    words model. HashingTF and IDF take dataframe objects and return them
    extended with a column holding their output. Finally, a dictionary of
    indices to their respective words is built; it is used to interpret the
    results later.

    Steps:
      - Create a DataFrame with the columns index and tweet.
      
      - Do the Term Frequency (TF) transformation with HashingTF, which hashes
        each word to an index (until now only the tweets had indices, not the
        words). HashingTF takes the DataFrame from the first step and returns
        tweet_tf_DF, which holds all of the previous data plus the hashed TF vectors.

      - Do the Inverse Document Frequency (IDF) transformation, which produces the
        feature vectors. This takes two steps (see the pyspark IDF documentation):
        IDF.fit() computes the document frequencies from tweet_tf_DF and returns a
        model whose transform() outputs tweets_bagOfWords_DF.
        Note: here, Bag of Words = TF-IDF, i.e. an IDF transformation applied
        after the TF transformation.

      - Create the dictionary of indices to their words using the generateWordDict()
        method.

      - Return the results.
  """

  # spark session:
  spark_session = spark.sql.SparkSession.builder.appName("dfexp_inmeth").getOrCreate()

  dataList = preprocessedRdd.collect()

  #give each tweet/document an index:
  dataListIndexed = list(enumerate(dataList))
  
  #create the dataframe with the spark session spark_session:
  tweet_DF = spark_session.createDataFrame(dataListIndexed,["index","tweet"])
  tweetHasher = HashingTF(inputCol="tweet",outputCol="hashed",numFeatures=NUM_FEATURES)
  tweet_tf_DF = tweetHasher.transform(tweet_DF)
  idfTransformer = IDF(inputCol="hashed",outputCol="features")
  idfFitter = idfTransformer.fit(tweet_tf_DF)
  tweets_bagOfWords_DF = idfFitter.transform(tweet_tf_DF) #note bagOfWords = tf-idf transformation

  #get the word dictionary:
  wordDic = generateWordDict(tweets_bagOfWords_DF)

  #return both the table and the word dictionary:
  return (tweets_bagOfWords_DF,wordDic)
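
HashingTF produces raw term counts, and Spark ML's IDF documents a smoothed formula, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing t. The sketch below computes the same TF-IDF weights in plain Python on a toy corpus, using a word-keyed dict per document in place of a hashed sparse vector (a simplification for readability). The `tf_idf` name and the sample documents are hypothetical.

```python
import math
from collections import Counter


def tf_idf(documents):
    """TF-IDF with raw term counts and Spark ML's smoothed IDF: log((m + 1) / (df + 1))."""
    m = len(documents)
    # document frequency: number of documents that contain each word
    doc_freq = Counter(word for doc in documents for word in set(doc))
    idf = {word: math.log((m + 1) / (df + 1)) for word, df in doc_freq.items()}
    # one {word: weight} dict per document (a dict instead of a hashed vector)
    return [{word: count * idf[word] for word, count in Counter(doc).items()}
            for doc in documents]


docs = [["great", "america"], ["great", "deal"], ["fake", "news", "fake", "great"]]
weights = tf_idf(docs)
print(weights[2])
```

"great" appears in every document, so its IDF is log(4/4) = 0 and it gets weight 0 everywhere; "fake" appears twice in one document and gets 2 * log 2. Words that occur in every tweet therefore cannot pull a cluster center toward themselves.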



# ML Algo Implementation

Call PySpark's KMeans algorithm, providing it the appropriate data. The results from the algorithm are cluster centers, i.e. vectors of floats, which will be converted to words that represent the topics. The conversion will be done through the index-to-word dictionary created during the Feature Extraction section.

In [None]:
def getTopicWords(clusterCenterList,indexWordDictionary):
  r"""
    Takes the cluster centers produced by the KMeans algorithm, finds the
    indices with the largest weights in each center, and matches them with
    the corresponding word(s). These words represent the topics and themes
    discovered by the KMeans algorithm.

  @param:
    - clusterCenterList : list of cluster centers
    - indexWordDictionary: dictionary that matches the indices used by the KMeans algo with words; created during the Feature Extraction phase.
  
  @return:
    - list of (cluster number, topic words) tuples, with words instead of numbers (indices)
  """
  topicWordsAllCluster = []
  for clusterIndex,clusterCenter in enumerate(clusterCenterList,start=1):
    print("Cluster " + str(clusterIndex) + " :")

    #(index, weight) pairs for the nonzero weights of this center, heaviest first
    featureIndexMap = [(i,weight) for i,weight in enumerate(clusterCenter) if weight != 0]
    descendingFeatureWeight = sorted(featureIndexMap,key=lambda p:p[1],reverse=True)
    topTerms = descendingFeatureWeight[0:NUM_TOPICS_PER_CLUSTER]

    topicWordsPerCluster = [indexWordDictionary[i] for i,_ in topTerms]
    
    print(topicWordsPerCluster)
    print()
    topicWordsAllCluster.append((clusterIndex,topicWordsPerCluster))
  return topicWordsAllCluster
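
The core of getTopicWords() is a "top-n nonzero weights, then look up the words" step. A compact stand-alone version of that step is sketched below with `heapq.nlargest`; the cluster center and dictionary are made-up values, and `top_terms` is a hypothetical helper name.

```python
import heapq


def top_terms(center, index_word_dict, n):
    """Return the word lists for the n largest nonzero weights of one cluster center."""
    nonzero = [(i, weight) for i, weight in enumerate(center) if weight != 0]
    best = heapq.nlargest(n, nonzero, key=lambda pair: pair[1])
    return [index_word_dict[i] for i, _ in best]


center = [0.0, 2.5, 0.0, 0.7, 1.9]                                     # hypothetical center
index_word_dict = {1: ["trump"], 3: ["vote"], 4: ["border", "wall"]}   # hypothetical dictionary
print(top_terms(center, index_word_dict, 2))  # [['trump'], ['border', 'wall']]
```

`heapq.nlargest` avoids sorting the whole list when only a handful of terms per cluster (NUM_TOPICS_PER_CLUSTER) are needed, although with tens of thousands of features either approach is fast.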


def doKMeans(dataFrame,indexWordDictionary):
  r"""
    Carries out PySpark's KMeans algorithm and calls getTopicWords to
    convert the resulting cluster centers into topic words.

  @param:
    - dataFrame: table containing the features that will be fed into the algorithm
    - indexWordDictionary: dictionary with index as key and word(s) as value

  @return:
    - the list of topic clusters produced by getTopicWords()
  """
  kmeansFitter = KMeans(featuresCol="features",k=NUM_CLUSTERS,maxIter=MAX_ITERATION)
  kmeansModel = kmeansFitter.fit(dataFrame)
  return getTopicWords(kmeansModel.clusterCenters(),indexWordDictionary)
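
For readers unfamiliar with what `KMeans.fit()` computes, the sketch below is a plain-Python version of Lloyd's algorithm: alternately assign each point to its nearest center (squared Euclidean distance) and move each center to the mean of its points. This is a simplified stand-in, not Spark's implementation: Spark ML's KMeans defaults to k-means|| initialization and runs distributed, whereas this sketch picks random initial centers. The function names and the toy points are hypothetical.

```python
import random


def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def kmeans(points, k, max_iter=30, seed=0):
    """Lloyd's algorithm with randomly chosen initial centers."""
    rng = random.Random(seed)
    centers = [list(p) for p in rng.sample(points, k)]
    for _ in range(max_iter):
        # assignment step: each point joins its nearest center
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda c: squared_distance(p, centers[c]))
            clusters[nearest].append(p)
        # update step: move each center to the mean of its points (empty clusters stay put)
        new_centers = [
            [sum(col) / len(members) for col in zip(*members)] if members else centers[c]
            for c, members in enumerate(clusters)
        ]
        if new_centers == centers:  # no center moved, so the assignment is stable
            break
        centers = new_centers
    return centers


points = [(0, 0), (0, 1), (1, 0), (10, 10), (10, 11), (11, 10)]
print(sorted(kmeans(points, 2)))  # roughly [[0.33, 0.33], [10.33, 10.33]]
```

Because the starting centers are random, runs can end in different local optima; this is one reason the two identical joe_biden.txt runs below produce different clusters.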

  

# Output Printer

Method to append the output to a specified output file

In [None]:
def writeOutput(path,topicClusters,durations):
  r"""
    Takes the topic clusters and the durations dictionary
    and appends them to the file defined by path.
  """
  with open(path,mode='a') as outFile:
    #write the topic clusters into the file, one "Cluster N: \n{word,...}" block each
    for clusterNumber,topicWords in topicClusters:
      words = [word for wordList in topicWords for word in wordList]
      outFile.write("Cluster " + str(clusterNumber) + ": \n{" + ",".join(words) + "}\n\n")
    
    #write the durations down to the file as "name : seconds, name : seconds"
    durationString = ", ".join(d + " : " + str(durations[d]) for d in durations)
    outFile.write(durationString)
    outFile.write("\n\t\t\t---------------------------------------------------------------\n\n")
  

# Driver

Runs the ml pipeline and prints outputs

In [None]:
def clusteringPipeline(inputPath,outputPath=None,numClusters=None,numMaxIterations=None,numPartitions=None):
  r"""
    Calls each of the methods related to the machine learning
    pipeline and returns the result. If outputPath is not None then
    it will also append the output to that file.

    numClusters and numMaxIterations, when given, override NUM_CLUSTERS
    and MAX_ITERATION for this call only. numPartitions is passed on to
    doDataPreProcessing().
  """
  # doKMeans() reads these module-level constants, so overrides must be
  # written to the globals (plain assignment would only create locals)
  global NUM_CLUSTERS, MAX_ITERATION

  dataRdd = getTextData(inputPath)

  # map() is lazy, so this mostly times building the rdd lineage; the tweets
  # are actually processed when genBagOfWords() collects them
  preProcessingStartTime = time.time()
  processedRdd = doDataPreProcessing(dataRdd,numPartitions)
  preProcessingEndTime = time.time()
  
  featureExtractionStartTime = time.time()
  df,wordDic = genBagOfWords(processedRdd)
  featureExtractionEndTime = time.time()

  defaultSettings = (NUM_CLUSTERS,MAX_ITERATION)
  if numClusters is not None:
    NUM_CLUSTERS = numClusters
  if numMaxIterations is not None:
    MAX_ITERATION = numMaxIterations

  mlAlgoStartTime = time.time()
  try:
    topics = doKMeans(df,wordDic)
  finally:
    # restore the defaults so that later calls are unaffected
    NUM_CLUSTERS,MAX_ITERATION = defaultSettings
  mlAlgoEndTime = time.time()


  preProcessingDuration = preProcessingEndTime - preProcessingStartTime
  featureExtractionDuration = featureExtractionEndTime - featureExtractionStartTime
  mlAlgoDuration = mlAlgoEndTime - mlAlgoStartTime
  totalDuration = preProcessingDuration + featureExtractionDuration + mlAlgoDuration
  
  durationsDic = {"PreProcessing":preProcessingDuration,"FeatureExtraction":featureExtractionDuration,"MLAlgo":mlAlgoDuration,"TotalDuration":totalDuration}
  print(durationsDic)

  if outputPath is not None:
    writeOutput(outputPath,topics,durationsDic)

  return (df,wordDic,topics,durationsDic)
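
The PreProcessing durations recorded below are only fractions of a millisecond because Spark transformations such as `rdd.map()` are lazy: they only record what to do, and the work runs when an action such as `collect()` is called (here, inside genBagOfWords()). Python 3's built-in `map` is lazy in the same way, so the effect can be sketched without Spark; the `preprocess` function and the call counter are hypothetical stand-ins for preProcessTweet().

```python
calls = 0


def preprocess(line):
    global calls
    calls += 1  # count how often the "expensive" step actually runs
    return line.lower().split()


lines = ["Make America Great", "Fake News"]
lazy = map(preprocess, lines)   # like rdd.map(): nothing has run yet
calls_before = calls            # 0
result = list(lazy)             # like rdd.collect(): forces evaluation
calls_after = calls             # 2
print(calls_before, calls_after, result)
```

In the Spark pipeline, the tokenizing and stemming cost is therefore counted under FeatureExtraction rather than PreProcessing.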

# Run Application

Here we call clusteringPipeline to run the entire application and output the result.

To run the application, the only required argument is the path to the input
text file. Please follow the example(s) below to run the code, provided that every cell above has been run.

In [None]:
x,y,t,dd = clusteringPipeline('./donald_trump.txt')


Cluster 1 :
[['support', 'geraldo'], ['great', 'america', 'work'], ['presid', 'blunt', 'economi', 'periscop'], ['thrive', 'america', 'hollywood'], ['make', 'great', 'announc'], ['tri', 'must', 'alway'], ['wonder', 'indiv', 'border', 'amp', 'great']]

Cluster 2 :
[['wonder', 'indiv', 'border', 'amp', 'great'], ['welcom', 'debat', 'presidenti'], ['tri', 'must', 'alway'], ['billion', 'countri', 'grow'], ['crime', 'pass', 'border', 'pour'], ['fake', 'berni', 'hillari'], ['neighbor', 'drug', 'fastbal', 'vote']]

Cluster 3 :
[['generos', 'win'], ['greatest', 'ago'], ['help'], ['washington'], ['thank'], ['adam'], ['unexpect']]

Cluster 4 :
[['like', 'great'], ['enjoy'], ['tonight'], ['p'], ['eastern'], ['realli', 'repres'], ['great', 'america', 'work']]

Cluster 5 :
[['get', 'florida'], ['christi', 'c'], ['appli', 'campaign', 'ivanka'], ['b', 'sent'], ['libra', 'use', 'need', 'busi'], ['realli', 'gonna'], ['c', 'murder']]

{'PreProcessing': 0.0003159046173095703, 'FeatureExtraction': 28.76021

In [None]:
_,_,_,_ = clusteringPipeline('./joe_biden.txt','./joe_biden_output.txt',numMaxIterations=50)

Cluster 1 :
[['presid', 'give'], ['trump', 'unit'], ['retir', 'biden', "vp'"], ['safe', 'vp'], ['love', 'tax'], ['mark', 'speak'], ['peopl', 'back', 'young']]

Cluster 2 :
[['well'], ['care', 'rescu'], ['led'], ['barack', 'charact'], ['retir', 'biden', "vp'"], ['key', 'show', 'keep'], ['rememb', 'choic']]

Cluster 3 :
[['background'], ['administr'], ['close'], ['protect'], ['incompet', 'profoundli'], ['everi'], ['secur']]

Cluster 4 :
[['donald', 'vote', 'campaign'], ['worker', 'make', 'rule'], ['backlog', 'everi', 'almost'], ['instead', 'vote', 'vp', 'support'], ['long', 'refurbish'], ['fight', 'stay'], ['vote', 'today']]

Cluster 5 :
[['restor', 'solv'], ['attempt', 'sunday'], ['urgent'], ['never'], ['treatment', 'veteran'], ['war', 'sacr'], ['retir', 'biden', "vp'"]]

{'PreProcessing': 0.0003025531768798828, 'FeatureExtraction': 11.328802108764648, 'MLAlgo': 8.685641050338745, 'TotalDuration': 20.014745712280273}


In [None]:
_,_,_,_ = clusteringPipeline('./joe_biden.txt','./joe_biden_output.txt',numMaxIterations=50)

Cluster 1 :
[['presid', 'give'], ['trump', 'unit'], ['retir', 'biden', "vp'"], ['safe', 'vp'], ['women', 'came', 'nh'], ['donald', 'vote', 'campaign'], ['worker', 'make', 'rule']]

Cluster 2 :
[['letter', 'notic'], ['ask'], ['vp'], ['then-sen'], ['senat'], ['team', 'week'], ['ice']]

Cluster 3 :
[["can't", 'econom'], ['need'], ['wh'], ['mccain'], ['info'], ['right'], ['balanc']]

Cluster 4 :
[['educ', 'white', 'student', 'health'], ['expand', 'student', 'cost'], ['capabl', 'affect', 'attend'], ['work', 'yesterday', 'would'], ['firsthand', 'get'], ["we'r", 'biden'], ['law', 'pay', 'rate']]

Cluster 5 :
[['gun', 'republican', 'live'], ['form', 'today'], ['epidem', 'forget'], ['quarterli', 'week'], ['issu', 'pass'], ['suppos'], ['louis', 'b', 'violenc']]

{'PreProcessing': 0.0003197193145751953, 'FeatureExtraction': 11.101372957229614, 'MLAlgo': 7.724670886993408, 'TotalDuration': 18.826363563537598}


# Spark Stopper

After finishing all tasks in this notebook, if the Spark context was started (sc = SparkContext()), make sure to run the following cell to stop it.

In [None]:
sc.stop()