In [None]:
#install tweepy for streaming
#! pip install tweepy

#install and set up google.cloud for translation
#!pip install google.cloud

## Stream tweets

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from time import gmtime, strftime
from pyspark.ml.feature import Tokenizer
import numpy as np 
import time
from time import gmtime, strftime
import pandas as pd
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import style
from pyspark.sql import Row
import argparse
from google.cloud import translate
import six
import warnings; warnings.simplefilter('ignore')
import pytz
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
OneHotEncoder,StringIndexer)
from pyspark.ml.regression import LinearRegression

In [None]:
# Sanity check: display the Spark version of the context bound to `sc`.
# `sc` is not created anywhere in this notebook - presumably it is injected by
# the PySpark kernel/shell; this cell raises NameError when it is missing.
sc.version

In [None]:
# Sanity check: display the `sqlContext` object used by the SQL cells below.
# Like `sc`, it is not created in this notebook - presumably provided by the kernel.
sqlContext

In [None]:
# Lazily instantiated SparkSession shared through the module globals
def getSqlContextInstance(sparkContext):
    """Return the shared SparkSession, building it on the first call.

    The session is cached in ``globals()`` under the key
    ``'sqlContextSingletonInstance'`` so that every later call returns the
    same object.

    Args:
        sparkContext: Kept for call-site compatibility; this function does
            not read it.

    Returns:
        The cached session object.
    """
    if 'sqlContextSingletonInstance' not in globals():
        # SparkSession is not imported by name in the notebook's import cell,
        # so the original body raised NameError on its first call.
        from pyspark.sql import SparkSession

        globals()['sqlContextSingletonInstance'] = (
            SparkSession.builder
            .master("local[*]")
            .appName("appName")
            .config("spark.sql.warehouse.dir", "./spark-warehouse")
            .getOrCreate()
        )
    return globals()['sqlContextSingletonInstance']

In [None]:
# Build the streaming context on the existing SparkContext; micro-batches are cut every 10 seconds
ssc = StreamingContext(sc, batchDuration=10)

# Read newline-delimited text from localhost:5555, the port the tweet-reading script is expected to publish on
socket_stream = ssc.socketTextStream(hostname="127.0.0.1", port=5555)

In [None]:
# Group the stream into non-overlapping (tumbling) 20-second windows.
# window(20) on its own slides by the batch interval, so with a shorter batch
# interval each tweet falls into several windows and would be appended to the
# table more than once. Setting the slide equal to the window length avoids that.
lines = socket_stream.window(20, 20)

In [None]:
#helper that returns the current US/Eastern time, formatted to the minute
def get_time_zone():
    """Return the current wall-clock time in US/Eastern as 'YYYY-MM-DD HH:MM'.

    This is the time at which the function runs (``datetime.now``), not a
    timestamp taken from the tweet itself.
    """
    eastern = pytz.timezone('US/Eastern')
    return datetime.now(eastern).strftime('%Y-%m-%d %H:%M')

In [None]:
#lower-case each tweet, stamp it with the current Eastern time, and append every window to the `tweets` table
def _append_tweets(rdd):
    """Append one window of tweet Rows to the `tweets` table.

    Empty windows are skipped: ``rdd.toDF()`` infers the schema from the first
    record and raises on an empty RDD, which would otherwise abort the
    streaming job as soon as a window contains no tweets.
    """
    if rdd.isEmpty():
        return
    rdd.toDF().write.mode("append").saveAsTable("tweets")


( lines.map( lambda text: Row(tweet=text.lower(), timeTweet=get_time_zone()) )
  .foreachRDD(_append_tweets)
)

In [None]:
# Start the streaming computation: from this point the socket stream is consumed
# and each window is appended to the `tweets` table by the pipeline defined above.
# NOTE(review): no ssc.stop()/awaitTermination() call is visible in this notebook.
ssc.start()

## Stock Market Data

In [None]:
from urllib2 import Request, urlopen
import json
import pandas as pd
from pandas.io.json import json_normalize
import time
from time import sleep

import os

# NOTE(review): credentials should not live in a notebook. The key is taken from
# the ALPHAVANTAGE_API_KEY environment variable when set; the literal fallback is
# kept only so existing runs behave as before and should be rotated/removed.
ALPHAVANTAGE_API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY", "GE5WPJIU1LE7WVC8")

# Build the URL once, on a single logical line. The original loop used a
# triple-quoted string that embedded a newline and indentation inside the query
# string, producing a malformed request URL.
INTRADAY_URL = ("https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY"
                "&symbol=AAPL&interval=1min&apikey=" + ALPHAVANTAGE_API_KEY)

POLL_COUNT = 121            # number of polls after the initial request
POLL_INTERVAL_SECONDS = 60  # one poll per minute


def _fetch_intraday_frame(url):
    """Download one intraday JSON payload and return it flattened as a one-column frame.

    ``json_normalize`` flattens the nested JSON into a single row whose column
    names are the dotted key paths; transposing turns those paths into the index.
    """
    response = urlopen(Request(url))
    try:
        payload = json.loads(response.read())
    finally:
        response.close()
    return pd.DataFrame(json_normalize(payload).T)


#retrieve stock data through url request: one initial snapshot plus POLL_COUNT polls
frames = [_fetch_intraday_frame(INTRADAY_URL)]
for i in range(POLL_COUNT):
    frames.append(_fetch_intraday_frame(INTRADAY_URL))
    print(str(i))
    #set timer to get one price every minute
    time.sleep(POLL_INTERVAL_SECONDS)

# Concatenate once instead of re-copying the accumulated frame on every iteration
final_data = pd.concat(frames)

#format dataframe
# Drop the first six rows (presumably the "Meta Data" entries of the first response - verify)
stock_data = final_data.iloc[6:].copy()
stock_data.columns = ['close']
stock_data.index.name = 'timeTweet'
stock_data = stock_data.reset_index()
# Keep only the rows whose flattened key mentions "close"; .copy() so the
# assignments below write to an independent frame rather than a filtered view
stock_data = stock_data[stock_data['timeTweet'].str.contains("close")].copy()
# Characters 19..34 of the key are kept as the minute-level timestamp
# (assumes keys shaped like "Time Series (1min).<YYYY-MM-DD HH:MM:SS>.4. close" - TODO confirm)
stock_data['timeTweet'] = stock_data['timeTweet'].str[19:35]
stock_data['timeTweet'] = pd.to_datetime(stock_data['timeTweet'], infer_datetime_format=True)


## Check Tables

In [None]:
# List the tables known to the SQL context, as a pandas frame for rich display
sqlContext.sql("show tables").toPandas()

In [None]:
# Preview the first 10 rows of `tweets` to check its columns (the stream writes `tweet` and `timeTweet`)
sqlContext.sql("select * from tweets limit 10").toPandas()

In [None]:
# Count how many tweet rows have been appended to the table so far
sqlContext.sql("select count(*) from tweets").show()

In [None]:
#sqlContext.sql("drop table tweets")

## Translate text

In [None]:
def translate_text(text):
    """Translate ``text`` into English using the Google Cloud Translation client.

    The target language is fixed to ``"en"``.
    See https://g.co/cloud/translate/v2/translate-reference#supported_languages

    Args:
        text: The tweet text. Byte strings are decoded as UTF-8 before being sent.

    Returns:
        The ``'translatedText'`` field of the API response, or the literal
        string ``"error"`` when decoding or the translation request fails.
    """
    # Client construction stays outside the try block so that missing
    # credentials/configuration still fail loudly instead of yielding "error".
    translate_client = translate.Client()

    try:
        if isinstance(text, six.binary_type):
            text = text.decode("utf-8")
        result = translate_client.translate(text, target_language="en")
        output = result['translatedText']
    except Exception:
        # Same best-effort fallback for both byte and text input, so a single
        # failed request no longer aborts a whole DataFrame.apply run.
        # `Exception` (not a bare except) lets KeyboardInterrupt through.
        output = "error"
    # One-second pause after every call (presumably to stay under API rate limits)
    time.sleep(1)
    return output

## Calculate Sentiment

In [None]:
#read in the positive and negative word lists
# `np.str` was only an alias for the builtin `str` and has been removed from
# recent NumPy releases; passing `str` directly is equivalent on every version.
positive_words = Series(np.loadtxt('positive_words.txt', dtype=str))
negative_words = Series(np.loadtxt('negative_words.txt', dtype=str))

#pass a dataframe to the sentiment function to return the dataframe with the sentiment columns added
def sentiment(sent):
    """Translate every tweet and score it against the positive/negative lexicons.

    The frame is modified in place and also returned. Columns added:
    ``positive``, ``negative`` (share of the tweet's word count that matches
    lexicon entries, rounded to 3 decimals), ``translation`` (output of
    ``translate_text``) and ``words`` (the translation split on single spaces).

    Args:
        sent: pandas DataFrame with a ``tweet`` column.

    Returns:
        The same DataFrame object with the four columns above.
    """
    # Create the score columns first so the resulting column order is unchanged
    sent['positive'] = 0.0
    sent['negative'] = 0.0

    #send each tweet to the translator
    sent["translation"] = sent.tweet.apply(translate_text)
    sent['words'] = sent['translation'].str.split(' ')

    # Lower-casing the lexicons is loop-invariant, so do it once
    positive_lexicon = positive_words.str.lower()
    negative_lexicon = negative_words.str.lower()

    positive_scores = []
    negative_scores = []
    # Iterate over every row. The original loop started at index 1 and never
    # scored the first tweet, and wrote results through chained indexing.
    for words in sent['words']:
        # Translations can come back capitalised; compare case-insensitively
        # since the lexicon side is lower-cased as well.
        lowered = [word.lower() for word in words]
        word_count = float(len(lowered))

        # Number of distinct lexicon entries that occur in the tweet
        positive_count = positive_lexicon.isin(lowered).sum()
        negative_count = negative_lexicon.isin(lowered).sum()

        positive_scores.append(round(positive_count / word_count, 3))
        negative_scores.append(round(negative_count / word_count, 3))

    sent['positive'] = positive_scores
    sent['negative'] = negative_scores

    return sent


In [None]:
#pull every stored tweet into pandas, then translate and score it (sentiment() calls translate_text for each row)
tweets_pdf = sqlContext.sql("select * from tweets").toPandas()
sent = sentiment(tweets_pdf)

#parse the minute-level timestamp strings into datetimes
sent['timeTweet'] = pd.to_datetime(sent['timeTweet'], infer_datetime_format=True)



## Plot

In [None]:
#pivot the scores to one row per minute, with each tweet of that minute in its own numbered column
groupedpositive = sent.groupby('timeTweet')['positive'].apply(lambda df: df.reset_index(drop=True)).unstack()
groupednegative = sent.groupby('timeTweet')['negative'].apply(lambda df: df.reset_index(drop=True)).unstack()

# Keep column labels 0 through 15, i.e. at most the first 16 tweets of each minute.
# `.ix` is removed in later pandas releases; on these integer column labels
# `.loc` performs the same label-based, end-inclusive slice.
groupednegative = groupednegative.loc[:, 0:15]
groupedpositive = groupedpositive.loc[:, 0:15]


In [None]:
#use matplotlib animation to display a frame by frame minute analysis of the tweet sentiment
%matplotlib notebook

matrixNegative = np.matrix(groupedpositive.transpose())
matrixPositive = np.matrix(groupednegative.transpose())


fig = plt.figure()
ax = fig.add_subplot(111)
plt.ion()
plt.xlabel("Time")
plt.ylabel("Sentiment")
plt.title("Sentiment by Minute")

fig.show()
fig.canvas.draw()

for i in range(0,len(groupedpositive)):
    time.sleep(.5)
    ax.clear()
    ax.plot(matrixPositive[:,i])
    ax.plot(matrixNegative[:,i])
    ax.text(.8,1.1, "Blue = Positive", transform=ax.transAxes)
    ax.text(.8,1.05, "Orange = Negative", transform=ax.transAxes)
    fig.suptitle("Sentiment by Minute")
    ax.text(.45,-.10, "Tweet", transform=ax.transAxes)
    ax.text(-.16,0, "Sentiment", transform=ax.transAxes)
    fig.canvas.draw()


## Regression

In [None]:
# Restrict the frame to the two predictors and the response used by the regression.
# NOTE(review): `stock_sentiment` is not defined in the visible cells - presumably a
# join of the tweet sentiment and stock frames on `timeTweet`; confirm it exists on a fresh kernel.
regression_columns = ['positive', 'negative', 'close']
stock_sentiment_lm = stock_sentiment[regression_columns]

# Replace missing values with 0
stock_sentiment_lm = stock_sentiment_lm.fillna(0)

# Cast each column to float64 so Spark reads it as double
for column_name in ('close', 'positive', 'negative'):
    stock_sentiment_lm[column_name] = stock_sentiment_lm[column_name].astype('float64', raise_on_error = False)


# Hand the pandas frame over to Spark
stock_sent_lm_df = sqlContext.createDataFrame(stock_sentiment_lm)
# Inspect the inferred column types
stock_sent_lm_df.printSchema()

In [None]:
# Combine the two sentiment scores into a single vector column named "features",
# the input format expected by the Spark ML estimator used below.
assemblerInputs = ["positive", "negative"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# transform() returns a new DataFrame with the "features" column added
output = assembler.transform(stock_sent_lm_df)

In [None]:
# Keep only the feature vector and the response, renaming `close` to `label`
df = output.selectExpr( "features","close as label")
# Print the first rows of the modelling frame as a quick check
df.show()

In [None]:
# Linear regression with all-default parameters; it reads the "features" and
# "label" columns prepared in the previous cell.
lr = LinearRegression()

# Fit on the full frame (no train/test split is made in this notebook)
lm_model = lr.fit(df)

In [None]:
# Report the inference statistics exposed by the fitted model's training summary

summary = lm_model.summary
for heading, values in (
    ("Coefficient Standard Errors: ", summary.coefficientStandardErrors),
    ("T Values: ", summary.tValues),
    ("P Values: ", summary.pValues),
):
    print(heading + str(values))
