In [1]:
import os
import atexit
import sys
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob
import pandas
from pyspark.sql import SparkSession

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except:
        pass

findspark.init()

#Parameters for the Spark cluster
nodes=3
tasks_per_node=8 
memory_per_task=1024 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="3:00" #60 min 
os.environ['SBATCH_PARTITION']='lattice' #Set the appropriate ARC partition

sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

try:
    print('Cleaning up Spark Job')
    sj.stop()
except:
    pass

sj.wait_to_start()

try:
    print('Cleaning up Spark Context')
    sc.stop()
except:
    pass

sc = sj.start_spark()

#Register the exit handler                                                                                                     
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)
print("RUNNING")

INFO:sparkhpc.sparkjob:Submitted batch job 636255



Cleaning up Spark Job


INFO:sparkhpc.sparkjob:Submitted cluster 0


Cleaning up Spark Context
RUNNING


In [67]:
import json
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
from collections import defaultdict
from datetime import datetime

from nltk.tokenize import RegexpTokenizer
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
import re

In [68]:
##RELEVANCE PREPROCESSING
#remove stop words
stop_words = {'elonmusk', 'tesla', 'get', 'at_tesla', 'at_elonmusk'}
tokenizer = RegexpTokenizer(r'\w+') # remove punctuation and keep only words

def remove_stop(tweet: str):
    words = []
    tokens = tokenizer.tokenize(tweet)
    for token in tokens:
        if token not in stop_words:
            words.append(token)
    return ' '.join(words)

def process_tweet(tweet: str):
    tweet = tweet.lower() # convert to lowercase
    tweet = re.sub('\s+', ' ', tweet) # remove multiple whitespace
    tweet = remove_stop(tweet)
    tweet.strip() # remove excess leading and trailing whitespace
    return tweet

def parse_then_process(tweetObject: str):
    item = json.loads(tweetObject) # read the tweet object
    item['text'] = process_tweet(item['text']) # process the tweet text
    return json.dumps(item)

In [69]:
##SENTIMENT PREPROCESSING
stop_words_sent = {'elonmusk', 'tesla', 'get', 'at_tesla', 'at_elonmusk'}
tokenizer_sent = RegexpTokenizer(r'\w+') # remove punctuation and keep only words

def remove_stop_sent(tweet: str):
    words = []
    tokens = tokenizer_sent.tokenize(tweet)
    for token in tokens:
        if token not in stop_words_sent:
            words.append(token)
    return ' '.join(words)

def process_tweet_sent(tweet: str):
    tweet = tweet.lower() # convert to lowercase
    tweet = re.sub('\s+', ' ', tweet) # remove multiple whitespace
    tweet = remove_stop_sent(tweet)
    tweet.strip() # remove excess leading and trailing whitespace
    return tweet

def parse_then_process_sent(tweetObject: str):
    item = json.loads(tweetObject) # read the tweet object
    item['text'] = process_tweet_sent(item['text']) # process the tweet text
    return json.dumps(item)

In [70]:
#Load in labeled tweets
rddLabeled = sc.textFile("../Data/CombinedLabeled.json")

In [71]:
#Extract text and relevance label
def getRelLabel(line):
    jline = json.loads(line)
    return (jline['text'], int(jline['isRelevant']))
##Preprocess text for relevance classifier
rddRelProcessed = rddLabeled.map(parse_then_process)
relLabels = rddRelProcessed.map(getRelLabel)
firstLabel = relLabels.first()
print("Text: ", firstLabel[0], "\nRelevance Label:", firstLabel[1])

Text:  lileeny those are destination chargers and charge most normal cars at 7kw the super charger network c https t co dponggcdnn 
Relevance Label: 1


In [72]:
##Fit relevant classification model
pipeRelevant = Pipeline([('vect', CountVectorizer()),
                         ('tfidf', TfidfTransformer()),
                         ('mnb', MultinomialNB(fit_prior=False)),])
relData = relLabels.collect()
trainText = [i[0] for i in relData]
trainLabel = [i[1] for i in relData]
modelRelevant = pipeRelevant.fit(trainText, trainLabel)

In [73]:
#Extract text and sentiment label
#0 = negative, 1 = neutral, 2 = positive
def isRelevant(line):
    jline = json.loads(line)
    if int(jline['isRelevant']):
        return True
    return False

def getSentLabel(line):
    jline = json.loads(line)
    return (jline['text'], int(jline['isRelevant']))

##Preprocess text for relevance classifier
rddRelevant = rddRelProcessed.filter(isRelevant)
rddSentProcessed = rddRelevant.map(parse_then_process_sent)

sentLabels = rddLabeled.map(getSentLabel)
firstLabel = sentLabels.first()
print("Text: ", firstLabel[0], "\nSentiment Label:", firstLabel[1])

Text:  @lileeny @elonmusk Those are destination chargers and charge most “normal cars” at 7KW. The super charger network c… https://t.co/DPoNgGCDnN 
Sentiment Label: 1


In [74]:
##Fit sentiment classification model
pipeSentiment = Pipeline([('vect', CountVectorizer()),
                          ('tfidf', TfidfTransformer()),
                          ('mnb', MultinomialNB(fit_prior=False)),])
sentData = sentLabels.collect()
trainText = [i[0] for i in sentData]
trainLabel = [i[1] for i in sentData]
modelSentiment = pipeSentiment.fit(trainText, trainLabel)

In [75]:
#Load in full filtered tweet set
rddFull = sc.textFile("../Data/allFiltered4.json")

In [76]:
##Process and extract text from set
def getText(line):
    jline = json.loads(line)
    return jline['text']
rddRelProcessed = rddFull.map(parse_then_process)

In [77]:
#Make predictions on relevant tweets
def classifyRelevance(line):
    jline = json.loads(line)
    label = int(modelRelevant.predict((jline['text'],))[0])
    jline['isRelevant'] = label
    return json.dumps(jline)

rddRelClassified = rddRelProcessed.map(classifyRelevance)

In [78]:
#Filter out irrelevant tweets
rddRelevant = rddRelClassified.filter(isRelevant)
print("Number of relevant tweets: ", rddRelevant.count())
#Pre process tweets for the classifier
rddSentProcessed = rddRelevant.map(parse_then_process_sent)

Number of relevant tweets:  10206


In [79]:
#Make sentiment predictions on relevant tweets
def classifySentiment(line):
    jline = json.loads(line)
    label = int(modelSentiment.predict((jline['text'],))[0])
    jline['sentiment'] = label
    return json.dumps(jline)

rddSentClassified = rddSentProcessed.map(classifySentiment)

In [80]:
##Take the timestamp and sentiment labels from the rdd
def extractSentimentAndTimestamp(line):
    jline = json.loads(line)
    sentiment = int(jline['sentiment'])
    timestamp = int(jline['timestamp_ms'])
    return (timestamp, sentiment)

timeSent = rddSentClassified.map(extractSentimentAndTimestamp).collect()

In [81]:
##Check that output is correct
for i in range(10):
    print(timeSent[i])

(1546372267107, 1)
(1546372485007, 1)
(1546372687470, 1)
(1546373494329, 1)
(1546373635157, 1)
(1546374029442, 1)
(1546374526952, 1)
(1546374602109, 1)
(1546375521745, 1)
(1546375739216, 1)


In [82]:
##convert timestamp to date
def convertMsToDate(timestamp):
    return datetime.utcfromtimestamp(int(timestamp)/1000).strftime('%Y-%m-%d')
dfSent = pd.DataFrame(timeSent, columns=['timestamp_ms', 'sentiment'])
dfSent['date'] = dfSent['timestamp_ms'].apply(convertMsToDate)
dfSent = dfSent.drop(['timestamp_ms'], axis = 1)
print(dfSent.head())
print(set(dfSent['sentiment']))

   sentiment        date
0          1  2019-01-01
1          1  2019-01-01
2          1  2019-01-01
3          1  2019-01-01
4          1  2019-01-01
{0, 1}


In [88]:
sentSums = defaultdict(int)
for index, row in dfSent.iterrows():
    sentSums[row['date']] += row['sentiment']

dfDateSent = pd.DataFrame(sentSums.items())#, columns=['Date', 'Sentiment Sum'])
#dfDateSent = dfDateSent.sort_values(by=['Date'])

print(dfDateSent.head())

ValueError: DataFrame constructor not properly called!

In [87]:
sentSums.items()

dict_items([('2019-01-01', 234), ('2019-01-02', 296), ('2019-01-03', 322), ('2019-01-04', 57), ('2018-12-29', 555), ('2018-12-30', 298), ('2018-12-23', 337), ('2018-12-24', 356), ('2019-01-05', 254), ('2019-01-06', 263), ('2019-01-07', 378), ('2019-01-08', 301), ('2019-01-12', 204), ('2019-01-13', 211), ('2019-01-14', 274), ('2019-01-15', 344), ('2019-01-16', 273), ('2019-01-17', 356), ('2019-01-09', 346), ('2019-01-10', 340), ('2018-12-31', 258), ('2019-01-30', 105), ('2019-01-31', 316), ('2019-02-01', 281), ('2019-02-02', 274), ('2019-02-03', 237), ('2018-12-25', 271), ('2018-12-26', 378), ('2018-12-27', 369), ('2018-12-28', 394), ('2019-01-18', 345), ('2019-01-19', 306), ('2019-01-20', 191), ('2019-01-21', 183), ('2019-01-22', 236), ('2019-01-23', 11)])

In [None]:
#Read in stock prices
dfStock = pd.read_csv('../Data/tsla_stock.csv')
dfStock = dfStock.drop(['Open', 'High', 'Low', 'Adj Close', 'Volume'], axis = 1)
dfStock = dfStock.rename(index=str, columns={"Close": "Stock Price"})
print(dfStock.head())

In [None]:
#Merge sentiment dataframe with stock price dataframe
dfMerged = dfDateSent.merge(dfStock, how='left', on='Date')
dfMerged = dfMerged.set_index('Date')
print(dfMerged.head())

In [None]:
%matplotlib inline
dfMerged.plot(rot=-90)