### Subject: MA5851 - Data Science Master Class I
### Author: Hendrik A. Dreyer
### Due Date: 3 December 2019

#### This file contains the steps for processing the Hacker News comments associated with the daily top 30 stories posted during August 2019

In [None]:
import os

# Resolve the input CSV relative to the current working directory.
# NOTE: os.getcwd() is the directory the notebook/script was launched from;
# it is the script's own directory only when started from there.
script_dir = os.getcwd()
# Build the relative path from its components rather than a "..\data\..."
# literal: the backslashes formed invalid escape sequences and only worked
# as path separators on Windows.
rel_path = os.path.join("..", "data", "HN_BigQ_Results_Top30_Story_Comments_Aug_2019.csv")
abs_file_path = os.path.join(script_dir, rel_path)

# Load the post-processed data from Assessment 3 into a pandas DataFrame.
import pandas as pd

# read_csv is called with its defaults here (comma delimiter, first row used
# as the column header).
# NOTE: `data` is rebound to a Spark DataFrame in a later cell; this pandas
# frame only serves the quick preview and shape check.
data = pd.read_csv(abs_file_path) 

# Preview the first 5 rows of the loaded data.
data.head()

In [None]:
#Shape of the loaded post-processed data
data.shape

In [None]:
#Initiate spark
import findspark
findspark.init()

In [None]:
#Import the necessary libs
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [None]:
#Create (or reuse) a Spark session that runs in local mode with 2 worker
#threads under the application name 'HN_Story_Comments'
spark = SparkSession.builder.master('local[2]').appName('HN_Story_Comments').getOrCreate()

In [None]:
#Explicit schema applied when the csv is read: two nullable string columns,
#"comment_date" and "comment"
schema = StructType([
    StructField("comment_date", StringType(), True),
    StructField("comment",      StringType(), True)])

In [None]:
# Reuse the path of the post-processed csv file that was resolved earlier
spark_file_path = abs_file_path

# Read the csv file into a Spark DataFrame using the explicit schema.
# - .csv() selects the CSV data source itself, so the former
#   .format("org.apache.spark.csv") call (not a real source name) was
#   overridden and is dropped.
# - "inferSchema" is dropped because the user-supplied schema is what is used.
# - PERMISSIVE mode keeps malformed rows and nulls the fields it cannot parse.
# - No "header" option is set, so the header line arrives as a data row;
#   a later cell filters it out of the comment RDD.
data = (
    spark.read
    .schema(schema)
    .option("delimiter", ",")
    .option("mode", "PERMISSIVE")
    .csv(spark_file_path)
)

In [None]:
#Have a quick peek
data.show()

In [None]:
#Select the "comment" column and flatten each single-field Row into its
#plain value, giving an RDD of comment strings (None for null comments)
link_comment_rdd = data.select("comment").rdd.flatMap(lambda x: x)

In [None]:
link_comment_rdd.collect()

In [None]:
#Remove the header from the rdd.
#The csv is read without a "header" option, so the first element is expected
#to be the column title (TODO confirm the file always starts with a header).
#Note that the filter drops every element equal to that first value, not
#just the first one.
hn_header = link_comment_rdd.first()
link_comments = link_comment_rdd.filter(lambda row: row != hn_header)

link_comments.collect()

In [None]:
#Convert the comments to lower case; None values (null comments) are passed
#through unchanged so that .lower() is never called on them
link_comments_lower = link_comments.map(lambda x : x.lower() if x is not None else x)

link_comments_lower.collect()

The following section prepares the corpus for the application of NLP techniques.  
Each comment in the corpus will be tokenized into sentences, followed by the tokenization of each sentence into words.

In [None]:
# Load the necessary nltk libs before we start applying some NLP techniques
import nltk
from   nltk.corpus import stopwords
from   nltk.stem   import WordNetLemmatizer

In [None]:
#Drop the null values and the empty strings from the RDD in a single pass
link_comments_lower_full = link_comments_lower.filter(
    lambda comment: comment is not None and comment != ""
)

In [None]:
#Tokenize the comment sentences and display
def SententceTokenizer(x):
    """Split one comment string into a list of sentence strings.

    Delegates to nltk.sent_tokenize with its default settings.
    """
    sentences = nltk.sent_tokenize(x)
    return sentences

#Apply the sentence tokenizer to every non-empty comment
link_comments_tokenize = link_comments_lower_full.map(SententceTokenizer)

#Have a peek (collect() pulls the whole RDD back to the driver)
link_comments_tokenize.collect()

In [None]:
#Tokenize each word in each comment sentence
def WordTokenizer(x):
    """Flatten a list of sentence strings into one list of words.

    Each sentence is split on runs of whitespace (str.split with no
    arguments); the words keep their original order.
    """
    words = []
    for sentence in x:
        words.extend(sentence.split())
    return words

#Apply the word tokenizer to every comment's list of sentences
link_comments_tokenize_word = link_comments_tokenize.map(WordTokenizer)

#Have another peek
link_comments_tokenize_word.collect()

In [None]:
import string
def removePunctuationsFunct(x):
    """Strip ASCII punctuation from every token and drop emptied tokens.

    Args:
        x: iterable of token strings.

    Returns:
        A list of the tokens with every character of string.punctuation
        removed. Tokens that consisted solely of punctuation (and are
        therefore empty after stripping) are omitted.
    """
    # One translation table gives a single pass per token.
    strip_table = str.maketrans('', '', string.punctuation)
    stripped = (token.translate(strip_table) for token in x)
    # Return the list with empty strings removed; previously that list was
    # built but the unfiltered one was returned, leaving '' tokens behind.
    return [token for token in stripped if token]
#Strip the punctuation from every comment's token list and display the result
link_comments_punct = link_comments_tokenize_word.map(removePunctuationsFunct)
link_comments_punct.collect()

In [None]:
#Now we apply lemmatization 
def lemmatizationFunct(x):
    """Lemmatize every token in *x* with NLTK's WordNetLemmatizer.

    Args:
        x: iterable of token strings.

    Returns:
        A list holding the lemma of each token, in the same order.
    """
    # This function runs once per RDD element, so only invoke the downloader
    # when the WordNet corpus is actually missing instead of on every record.
    try:
        nltk.data.find('corpora/wordnet')
    except LookupError:
        nltk.download('wordnet')
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(token) for token in x]

#Lemmatize every comment's token list and display the result
link_comments_lem = link_comments_punct.map(lemmatizationFunct)
link_comments_lem.collect()

In [None]:
#Join the tokens together
def JoinTokensTogether(val):
    """Return the tokens in *val* joined into one space-separated string."""
    return " ".join(val)

#Re-assemble every comment's tokens into a single string and display the result
link_comments_join = link_comments_lem.map(JoinTokensTogether)
link_comments_join.collect()

In [None]:
#Extract all the noun phrases
def ExtractAllPhrases(x):
    """Extract the noun phrases from a single text string.

    The text is tokenized with a regular expression, part-of-speech tagged
    and chunked with a small noun-phrase grammar. Stop words are removed
    from every NP chunk and the remaining words are joined with single
    spaces.

    Args:
        x: the text to analyse.

    Returns:
        A list of non-empty noun-phrase strings, in the order in which the
        NP subtrees are found in the chunk tree.
    """
    # NOTE(review): the stopwords corpus and the POS tagger model must
    # already be available to NLTK; no download for them is visible here.
    from nltk.corpus import stopwords
    stop_words = set(stopwords.words('english'))

    def leaves(tree):
        """Finds NP (nounphrase) leaf nodes of a chunk tree."""
        for subtree in tree.subtrees(filter=lambda t: t.label() == 'NP'):
            yield subtree.leaves()

    def get_terms(tree):
        """Yield each NP chunk as the list of its non-stop-word tokens."""
        for leaf in leaves(tree):
            yield [w for w, t in leaf if w not in stop_words]

    # Tokenizer pattern, one alternative per token type. The dots are escaped
    # so they match a literal '.', the ellipsis is its own alternative, and
    # '-' sits last in the punctuation class so that ':-_' is no longer read
    # as a character range (which also matched A-Z and other symbols).
    sentence_regexp = (
        r'(?:[A-Z](?:\.[A-Z])+\.?)'     # abbreviations, e.g. U.S.A.
        r'|(?:\w+(?:-\w+)*)'            # words with optional internal hyphens
        r'|(?:\$?\d+(?:\.\d+)?%?)'      # currency / percentages, e.g. $12.40, 82%
        r'|(?:\.\.\.)'                  # ellipsis
        r'|(?:[\]\[.,;"\'?():_`-])'     # punctuation marks as separate tokens
    )

    #Specify grammar rules
    grammar_rule = r"""
    NBAR:
        {<NN.*|JJ>*<NN.*>}  # Nouns and Adjectives, terminated with Nouns
        
    NP:
        {<NBAR>}
        {<NBAR><IN><NBAR>}  # Above, connected with in/of/etc...
    """

    # Shallow parsing (chunking) gives more context to the sentence
    chunker = nltk.RegexpParser(grammar_rule)
    tokens = nltk.regexp_tokenize(x, sentence_regexp)

    # Part-of-speech tagging: nouns, verbs, adjectives, etc.
    postoks = nltk.tag.pos_tag(tokens)

    # Chunk the tagged tokens, then join each chunk's remaining words
    tree = chunker.parse(postoks)
    phrases = (' '.join(term) for term in get_terms(tree))

    # Chunks made up entirely of stop words join to '' and are dropped
    return [phrase for phrase in phrases if phrase]


#Extract the noun phrases of every re-joined comment and display the result
link_comments_phrases = link_comments_join.map(ExtractAllPhrases)
link_comments_phrases.collect()

In [None]:
#To determine phrase sentiment values we'll use the “Valence Aware Dictionary and sEntiment Reasoner” - a.k.a VADER.
import nltk
nltk.download('vader_lexicon')

def DetermineWordsSentiment(x):
    """Label every phrase in *x* by the sign of its VADER compound score.

    Args:
        x: iterable of phrases. Each phrase is passed through ''.join(), so
           a plain string is used as-is and an iterable of strings is
           concatenated without a separator.

    Returns:
        A list of (phrase, label) tuples, where label is "Negative" for a
        compound score below 0, "Neutral" for exactly 0 and "Positive"
        otherwise.
    """
    from nltk.sentiment.vader import SentimentIntensityAnalyzer
    analyzer = SentimentIntensityAnalyzer()

    def label_for(compound):
        """Map a compound score to its sentiment label."""
        if compound < 0.0:
            return "Negative"
        if compound == 0.0:
            return "Neutral"
        return "Positive"

    sentiment_list = []
    for item in x:
        phrase = ''.join(item)
        # Read the compound score directly instead of scanning every score
        # entry; the accumulated list is also no longer re-filtered on each
        # iteration (that was quadratic and never removed anything).
        compound = analyzer.polarity_scores(phrase)['compound']
        sentiment_list.append((phrase, label_for(compound)))
    return sentiment_list

#Label the phrases of every comment; each RDD element becomes a list of
#(phrase, sentiment label) tuples
sentimentRDD = link_comments_phrases.map(DetermineWordsSentiment)

sentimentRDD.collect()

In [None]:
#Separate out negative and positive sentiments
def ExtractSentimentType(x,sentiment_type):
    """Collect the phrases in *x* whose trailing label equals *sentiment_type*.

    Every non-empty entry of *x* is a sequence whose last element is the
    sentiment label; the elements before it are joined with single spaces
    to form the returned phrase. Empty entries are skipped.
    """
    wanted = str(sentiment_type)
    return [
        " ".join(entry[:-1])
        for entry in x
        if len(entry) > 0 and entry[-1] == wanted
    ]

def neg_sentiments(x):
    """Keep only the phrases labelled "Negative" within one record."""
    return ExtractSentimentType(x, "Negative")


def pos_sentiments(x):
    """Keep only the phrases labelled "Positive" within one record."""
    return ExtractSentimentType(x, "Positive")


#The *_link_titles_* names are kept because later cells use them; the data
#are the phrases taken from the comments
neg_link_titles_sentiment = sentimentRDD.map(neg_sentiments)
pos_link_titles_sentiment = sentimentRDD.map(pos_sentiments)

In [None]:
# Now let's extract the top 100 positive keywords from the extracted key phrases.
# Count the phrases within each record, sum the counts per phrase across all
# records, then order by total frequency (highest first).
freq_pos_link_titles = (
    pos_link_titles_sentiment
    .flatMap(lambda phrases: nltk.FreqDist(phrases).most_common())
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)
freq_pos_link_titles.take(100)

In [None]:
#converting RDD to spark dataframe (the tuple fields become columns _1 and _2)
df_fDist = freq_pos_link_titles.toDF()

df_fDist.createOrReplaceTempView("myTable") 

#renaming columns and keeping 20 rows
#NOTE(review): the query has no ORDER BY, so it presumably relies on the RDD
#having been sorted by frequency beforehand - confirm the row order is kept
df2 = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20") 

#converting spark dataframes to pandas dataframes for plotting
pandD = df2.toPandas()
pandD.plot.barh(x='Keywords', y='Frequency', rot=1, figsize=(10,8))

In [None]:
# Now let's extract the top 100 negative keywords from the extracted key phrases.
# Count the phrases within each record, sum the counts per phrase across all
# records, then order by total frequency (highest first).
freq_neg_link_titles = (
    neg_link_titles_sentiment
    .flatMap(lambda phrases: nltk.FreqDist(phrases).most_common())
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)
freq_neg_link_titles.take(100)

In [None]:
#converting RDD to spark dataframe (the tuple fields become columns _1 and _2)
df_fDist = freq_neg_link_titles.toDF()

df_fDist.createOrReplaceTempView("myTable") 

#renaming columns and keeping 20 rows
#NOTE(review): the query has no ORDER BY, so it presumably relies on the RDD
#having been sorted by frequency beforehand - confirm the row order is kept
df2 = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20") 

#converting spark dataframes to pandas dataframes for plotting
pandD = df2.toPandas()
pandD.plot.barh(x='Keywords', y='Frequency', rot=1, figsize=(10,8))