In [None]:
import os
import sys
from pyspark.sql import SparkSession

'''
Purpose: Use the prebuilt Vader Sentiment Model from the nltk library to assign sentiment labels to tweets or 
Reddit posts as they are streamed through Spark. This is the implementation we ultimately used.
'''

# Start a Spark session for this demo, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("spark-nltk")
    .getOrCreate()
)

# One RDD element per line of the input file; the path is resolved relative
# to the working directory of the Spark driver.
data = spark.sparkContext.textFile('test.txt')

# A single analyzer instance is created on the driver and captured by the
# scoring function defined below.
# NOTE(review): presumably the VADER lexicon resource must already be
# available to NLTK for this constructor to succeed - confirm on a fresh env.
from nltk.sentiment.vader import SentimentIntensityAnalyzer
sent_analyzer = SentimentIntensityAnalyzer()
        
def assign_sentiment(x):
    """Score one piece of text with the module-level VADER analyzer.

    Args:
        x: The text to score (here, one line of the input RDD).

    Returns:
        A new ``dict`` built from whatever ``sent_analyzer.polarity_scores``
        returns for ``x``.
    """
    scores = sent_analyzer.polarity_scores(x)
    return dict(scores)
 
# Apply assign_sentiment to every element of the `data` RDD (one element per
# input line). map() is lazy; nothing is computed until collect() runs.
score = data.map(assign_sentiment)
# collect() brings every result back to the driver before printing, so this
# is only suitable for small inputs such as the test file used here.
print(score.collect())

In [None]:
import re
import nltk
import contractions
from nltk.corpus import stopwords
import pandas as pd

'''
Purpose: Build a custom machine learning model with PySpark for analyzing sentiment from the Twitter 140 training set.
This idea was ultimately abandoned for the one above. 
'''

# Load the raw training CSV. The file has no header row, so column names are
# supplied explicitly, and it is decoded as ISO-8859-1.
# NOTE(review): absolute, user-specific path - this cell only runs on a
# machine that has the file at exactly this location.
df = pd.read_csv(
    '/Users/mchifala/atls-5214-project/Big_Sentiment/utility/twitter_sentiment_training_set.csv',
    header=None,
    encoding='ISO-8859-1',
    names=['label', 'id', 'date', '?', 'author', 'text'],
)

# Remove URLs, @mentions and punctuation from every tweet.
# Inside [...] every character is taken literally, so no `|` separators are
# needed between the members; `-` and `\` have to be escaped to be matched as
# themselves (an unescaped `-` between two characters forms a range, and a
# lone `\` escapes the character after it). `|` is listed on purpose: pipe
# characters have always been stripped by this step.
# Apostrophes are intentionally left alone so contractions such as "i'm"
# survive as single tokens.
df['words'] = df['text'].apply(
    lambda x: re.sub(r'http\S+|@\S+|[?$.!,;:_\-*~/\\+=()#|]', '', x)
)
# Lower-case the cleaned text and split it on whitespace into a token list.
df['words'] = df['words'].apply(lambda x: x.lower().split())

# Hand-maintained stop-word list used to filter tweet tokens.
# Contractions are spelled with their apostrophes, and negations such as
# "not" / "no" are not part of the list, so they are kept in the text.
stop = [
    # first person
    'i', "i'm", "i've", 'me', 'my', 'myself',
    'we', 'our', 'ours', 'ourselves',
    # second person
    'you', "you're", "you've", "you'll", "you'd",
    'your', 'yours', 'yourself', 'yourselves',
    # third person
    'he', 'him', 'his', 'himself',
    'she', "she's", 'her', 'hers', 'herself',
    'it', "it's", 'its', 'itself',
    'they', 'them', 'their', 'theirs', 'themselves',
    # interrogatives and demonstratives
    'what', 'which', 'who', 'whom',
    'this', "that's", 'that', "that'll", 'these', 'those',
    # forms of be / have / do
    'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
    'have', 'has', 'had', 'having',
    'do', 'does', 'did', 'doing',
    # articles and conjunctions
    'a', 'an', 'the',
    'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while',
    # prepositions
    'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between',
    'into', 'through', 'during', 'before', 'after', 'above', 'below',
    'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under',
    # adverbs of sequence, place and manner
    'again', 'further', 'then', 'once', 'here', 'there',
    'when', 'where', 'why', 'how',
]

# Membership tests against the `stop` list are a linear scan per token; build
# a hashed copy once so each lookup is constant-time. The filtered output is
# identical to filtering against the list itself.
_stop_lookup = frozenset(stop)

# Drop stop words from each token list, keeping the original token order.
df['words'] = df['words'].apply(
    lambda x: [word for word in x if word not in _stop_lookup]
)
# Re-join the surviving tokens into a single space-separated string.
df['clean_text'] = df['words'].apply(lambda x: ' '.join(x))

# Drop the raw and intermediate columns before exporting; everything not
# listed here is kept.
# Note: `df` is rebound here, so re-running this cell without re-running the
# load step fails because the listed columns are already gone.
df = df.drop(columns=['id', 'date', '?', 'author', 'text', 'words'])

# Write the cleaned frame as UTF-8 CSV, without the index column, into the
# current working directory.
df.to_csv(
    'clean_twitter_sentiment_training_set.csv',
    encoding='utf-8',
    index=False,
)



In [None]:
# Pull the 'compound' entry out of each row's score mapping into its own column.
# NOTE(review): no cell visible in this notebook creates df['full_sentiment'],
# and the export step above drops every column it lists from `df` - so on a
# fresh kernel this lookup presumably raises KeyError. Confirm which (removed?)
# cell produced that column.
df['full_compound_sentiment'] = df['full_sentiment'].apply(
    lambda scores: scores['compound']
)
# Preview of (up to) the first million rows; the notebook's rich display
# truncates what is actually rendered.
df.head(1000000)

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Obtain the Spark session for the modelling experiment.
# NOTE(review): getOrCreate() presumably hands back the session created
# earlier in the notebook if one is still alive - confirm if the app name matters.
spark = (
    SparkSession.builder
    .appName('Sentiment')
    .getOrCreate()
)

# Read the cleaned training CSV with a header row and inferred column types.
# NOTE(review): absolute, user-specific path; the cleaning cell writes its
# output with a relative file name, so verify both point at the same file.
tweetDF = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(
        '/Users/mchifala/atls-5214-project/Big_Sentiment/utility/clean_twitter_sentiment_training_set.csv',
        header=True,
        inferSchema=True,
    )
)

# Spark DataFrames are immutable: dropna() returns a NEW DataFrame and leaves
# the receiver untouched, so the result has to be bound back to the name for
# the null rows to actually be removed.
tweetDF = tweetDF.dropna()

# Show the first 20 rows without truncating long cell values.
tweetDF.show(20, False)

# 90/10 random split; the fixed seed keeps the split repeatable.
train_set, test_set = tweetDF.randomSplit([0.9, 0.1], seed=5)

# Keep only training rows that actually have cleaned text. The test split is
# not filtered here.
train_set = train_set.filter(train_set['clean_text'].isNotNull())

# Print up to 5000 training rows, then the number of held-out rows.
train_set.show(5000)
print(test_set.count())
# Feature stages: clean_text -> words -> tf -> features.
# All three are constructed, but only `tokenizer` is wired into the pipeline
# below; `hashtf` and `idf` are currently unused.
tokenizer = Tokenizer(inputCol='clean_text', outputCol='words')
hashtf = HashingTF(inputCol='words', outputCol='tf', numFeatures=2**16)
idf = IDF(inputCol='tf', outputCol='features', minDocFreq=3)

# Training rows before the pipeline is applied.
train_set.show()

# Tokenizer-only pipeline (the hashing-TF and IDF stages are left out for now).
pipeline = Pipeline(stages=[tokenizer])
pipelineFit = pipeline.fit(train_set)

# Note: `train_set` is rebound to the transformed frame, so re-running this
# cell feeds a frame that already has the output column back into the pipeline.
train_set = pipelineFit.transform(train_set)

# Training rows after tokenization, now including the `words` column.
train_set.show()
