# Sentiment Analysis with PySpark

In [1]:
import re
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
import string
# %load_ext autotime

In [11]:
import pyspark as ps
# Import Spark SQL related packages
from pyspark.sql import functions as F
from pyspark.sql import SparkSession 
from pyspark.sql.types import FloatType, StringType

# Import TextBlob packages for Sentiment Analysis
from textblob import TextBlob


# Spark configuration targeting the "yarn-client" master.
# NOTE(review): `conf` is not handed to the SparkSession builder in the
# following cell, so these settings appear to have no effect as written --
# confirm whether `.config(conf=conf)` was intended.
conf = ps.SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("sparK-mer")
conf.set("spark.driver.memory", "15g")
# Last expression of the cell: returns the SparkConf, which the notebook echoes.
conf.set("spark.executor.heartbeatInterval","3600s")

<pyspark.conf.SparkConf at 0x11c329d90>

In [3]:
# Get the active SparkSession, creating one named 'final' if none exists yet.
spark = (
    SparkSession.builder
    .appName('final')
    .getOrCreate()
)

# Load the comments data

In [4]:
# The file has no header row, so Spark assigns the default column names
# (_c0, _c1, ...) that the next cell renames.
comments = spark.read.csv("comments.csv", header=False)

In [5]:
# Map Spark's positional default column names to readable ones.
column_names = {
    "_c0": "Comment ID",
    "_c1": "Post ID",
    "_c2": "Content",
    "_c3": "Author",
    "_c4": "Date",
    "_c5": "Vote",
}
for default_name, readable_name in column_names.items():
    comments = comments.withColumnRenamed(default_name, readable_name)

In [6]:
comments.show(5)

+----------+-------+--------------------+--------------+----------+----+
|Comment ID|Post ID|             Content|        Author|      Date|Vote|
+----------+-------+--------------------+--------------+----------+----+
|         1|      1|Seemed to work fine.|       BJ Cook|2010-04-01|   1|
|         2|      1|I tried this the ...|  Blitz Surfer|2010-04-01|   1|
|         3|      1|A lot of twitter ...|Nischal Shetty|2010-04-01|   1|
|         4|      1|Worked a minute a...|   Jonah Grant|2010-04-01|   1|
|         5|      1|Yep. At 10:33pm p...| Joshua Guffey|2010-04-01|   1|
+----------+-------+--------------------+--------------+----------+----+
only showing top 5 rows



In [7]:
# Discard every row that has a null in any column.
comments = comments.na.drop()

In [8]:
comments.count()

745866

# Text Preprocessing

### Fix abbreviations

In [12]:
def fix_abbreviation(data_str):
    """Lower-case ``data_str`` and expand a fixed set of chat abbreviations.

    The substitutions are applied in order to the lower-cased text, and the
    result is stripped of leading and trailing whitespace.

    Args:
        data_str: Raw comment text, or ``None``.

    Returns:
        The normalised string, or ``None`` when ``data_str`` is ``None``
        (so a null column value stays null instead of raising in the UDF).
    """
    if data_str is None:
        return None

    # (pattern, replacement) pairs, applied top to bottom.
    substitutions = (
        (r'\bthats\b', 'that is'),
        (r'\bive\b', 'i have'),
        (r'\bim\b', 'i am'),
        (r'\bya\b', 'yeah'),
        (r'\bcant\b', 'can not'),
        (r'\bdont\b', 'do not'),
        (r'\bwont\b', 'will not'),
        (r'\bid\b', 'i would'),
        # Unanchored: also matches "wtf" inside a longer token.
        (r'wtf', 'what the fuck'),
        (r'\bwth\b', 'what the hell'),
        (r'\br\b', 'are'),
        (r'\bu\b', 'you'),
        (r'\bk\b', 'OK'),
        (r'\bsux\b', 'sucks'),
        # Collapse elongated forms such as "nooo" / "coooo".
        (r'\bno+\b', 'no'),
        (r'\bcoo+\b', 'cool'),
        # Anchored on both sides so only the standalone token "rt" is removed;
        # without the leading \b the pattern also chops the ending off words
        # such as "smart" or "start".
        (r'\brt\b', ''),
    )

    data_str = data_str.lower()
    for pattern, replacement in substitutions:
        data_str = re.sub(pattern, replacement, data_str)
    return data_str.strip()

# Wrap the cleaner as a Spark UDF that returns a string column.
fix_abbreviation_udf = F.udf(f=fix_abbreviation, returnType=StringType())

In [13]:
# Add the abbreviation-expanded text as a new column and preview it.
comments = comments.withColumn(
    'fixed_abbrev',
    fix_abbreviation_udf(F.col('Content')),
)
comments.show(n=5)

+----------+-------+--------------------+--------------+----------+----+--------------------+
|Comment ID|Post ID|             Content|        Author|      Date|Vote|        fixed_abbrev|
+----------+-------+--------------------+--------------+----------+----+--------------------+
|         1|      1|Seemed to work fine.|       BJ Cook|2010-04-01|   1|seemed to work fine.|
|         2|      1|I tried this the ...|  Blitz Surfer|2010-04-01|   1|i tried this the ...|
|         3|      1|A lot of twitter ...|Nischal Shetty|2010-04-01|   1|a lot of twitter ...|
|         4|      1|Worked a minute a...|   Jonah Grant|2010-04-01|   1|worked a minute a...|
|         5|      1|Yep. At 10:33pm p...| Joshua Guffey|2010-04-01|   1|yep. at 10:33pm p...|
+----------+-------+--------------------+--------------+----------+----+--------------------+
only showing top 5 rows



### Remove irrelevant features

In [14]:
def remove_features(data_str):
    """Strip URLs, @mentions, punctuation, digits and noise tokens from text.

    The text is lower-cased, then hyperlinks, @mentions, punctuation and digit
    runs are each replaced by a space. Of the remaining whitespace-separated
    tokens, only those longer than one character that consist solely of the
    characters ``a-z``, ``0-9``, ``_`` and ``.`` are kept.

    Args:
        data_str: Raw comment text, or ``None``.

    Returns:
        The kept tokens joined by single spaces (possibly an empty string),
        or ``None`` when ``data_str`` is ``None``.
    """
    if data_str is None:
        return None

    # Raw strings keep the regex escapes from being read as (invalid) string
    # escape sequences.
    # A URL runs up to the next whitespace, so hyphenated hosts, ports and
    # query strings are removed as a whole instead of leaving fragments.
    url_re = re.compile(r'https?://\S+')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'\d+')
    mention_re = re.compile(r'@\w+')
    alpha_num_re = re.compile(r'^[a-z0-9_.]+$')

    data_str = data_str.lower()
    # Order matters: URLs and mentions must go before punctuation is stripped,
    # otherwise their '://' and '@' markers are already gone.
    data_str = url_re.sub(' ', data_str)
    data_str = mention_re.sub(' ', data_str)
    data_str = punc_re.sub(' ', data_str)
    data_str = num_re.sub(' ', data_str)

    # Drop single-character tokens and tokens with characters outside the
    # allowed set; split()/join() also collapses repeated whitespace.
    kept_words = [
        word for word in data_str.split()
        if len(word) > 1 and alpha_num_re.match(word)
    ]
    return " ".join(kept_words)

# Wrap the feature stripper as a Spark UDF that returns a string column.
remove_features_udf = F.udf(f=remove_features, returnType=StringType())

In [16]:
# Add the stripped-down text (computed from the raw 'Content' column) and preview it.
comments = comments.withColumn(
    'removed',
    remove_features_udf(F.col('Content')),
)
comments.show(n=5, truncate=True)

+----------+-------+--------------------+--------------+----------+----+--------------------+--------------------+
|Comment ID|Post ID|             Content|        Author|      Date|Vote|        fixed_abbrev|             removed|
+----------+-------+--------------------+--------------+----------+----+--------------------+--------------------+
|         1|      1|Seemed to work fine.|       BJ Cook|2010-04-01|   1|seemed to work fine.| seemed to work fine|
|         2|      1|I tried this the ...|  Blitz Surfer|2010-04-01|   1|i tried this the ...|tried this the fi...|
|         3|      1|A lot of twitter ...|Nischal Shetty|2010-04-01|   1|a lot of twitter ...|lot of twitter ap...|
|         4|      1|Worked a minute a...|   Jonah Grant|2010-04-01|   1|worked a minute a...|worked minute ago...|
|         5|      1|Yep. At 10:33pm p...| Joshua Guffey|2010-04-01|   1|yep. at 10:33pm p...|yep at pm pst it ...|
+----------+-------+--------------------+--------------+----------+----+--------

### Sentiment Analysis main function

In [18]:
def sentiment_analysis(text):
    """Return the TextBlob sentiment polarity of ``text``.

    Args:
        text: Comment text to score, or ``None``.

    Returns:
        The value of ``TextBlob(text).sentiment.polarity``, or ``None`` when
        ``text`` is ``None``.
    """
    # TextBlob rejects non-string input; returning None lets a null column
    # value flow through the UDF as null instead of raising.
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity

# Wrap the scorer as a Spark UDF that returns a float column.
sentiment_analysis_udf = F.udf(f=sentiment_analysis, returnType=FloatType())

In [19]:
# Score the abbreviation-expanded text, then drop both helper columns.
# NOTE(review): 'removed' is dropped here without having been used for scoring.
comments = (
    comments
    .withColumn("sentiment_score", sentiment_analysis_udf(F.col('fixed_abbrev')))
    .drop('fixed_abbrev', 'removed')
)
comments.show(n=5, truncate=True)

+----------+-------+--------------------+--------------+----------+----+---------------+
|Comment ID|Post ID|             Content|        Author|      Date|Vote|sentiment_score|
+----------+-------+--------------------+--------------+----------+----+---------------+
|         1|      1|Seemed to work fine.|       BJ Cook|2010-04-01|   1|     0.41666666|
|         2|      1|I tried this the ...|  Blitz Surfer|2010-04-01|   1|          0.125|
|         3|      1|A lot of twitter ...|Nischal Shetty|2010-04-01|   1|            0.5|
|         4|      1|Worked a minute a...|   Jonah Grant|2010-04-01|   1|            0.0|
|         5|      1|Yep. At 10:33pm p...| Joshua Guffey|2010-04-01|   1|     0.20888889|
+----------+-------+--------------------+--------------+----------+----+---------------+
only showing top 5 rows



### Get the Average Sentiment Score for each Post

In [23]:
# One row per post: the mean sentiment score across that post's comments.
avg_score = F.avg('sentiment_score').alias('avg_senti_score')
post_sentiment = comments.groupBy('Post ID').agg(avg_score)

In [24]:
post_sentiment.show(5)

+-------+-------------------+
|Post ID|    avg_senti_score|
+-------+-------------------+
|    675| 0.0335309744157173|
|   5925|0.11772260677231394|
|    691|0.11745779495686293|
|   6194| 0.1919174811582228|
|   6240|0.13226061725081542|
+-------+-------------------+
only showing top 5 rows



In [25]:
# Collect the per-post averages to the driver as a pandas DataFrame and write them to CSV.
post_sentiment_pdf = post_sentiment.toPandas()
post_sentiment_pdf.to_csv('post_sentiment.csv')