# BitCoin Price Prediction using Sentiment analysis on social media
* Aaron Paul

In [1]:
import pyspark as spark
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import col,udf,monotonically_increasing_id,unix_timestamp,round,avg
import re
sc = spark.SparkContext()
sql = spark.SQLContext(sc)

## Loading tweets dataset

In [2]:
TwDF=pd.read_csv('/content/tweetsfinal1.csv',error_bad_lines=False,engine = 'python',header = None)

## Loading Bitcoin prices dataset

In [3]:
BtcDF=pd.read_csv('/content/BitCoinPrice.csv',error_bad_lines=False,engine = 'python',header = None)
FullDataTw=sql.createDataFrame(TwDF)
FullDataBtc=sql.createDataFrame(BtcDF) #creating pandas df and then changing it to pyspark df

In [4]:
FullDataTw = FullDataTw.dropna() #getting rid of full empty rows
#print(FullDataTw.count())
#print(FullDataBtc.count())

In [5]:
FullDataTw.select(monotonically_increasing_id().alias("rowId"),"*")
FullDataTw = FullDataTw.withColumnRenamed('0', 'DateTime') #setting column names of Twitter dataset
FullDataTw = FullDataTw.withColumnRenamed('1', 'Tweet')
FullDataBtc = FullDataBtc.withColumnRenamed('0', 'DateTime') #setting column names of Bitcoin price dataset
FullDataBtc = FullDataBtc.withColumnRenamed('1', 'Price')
FullDataBtc = FullDataBtc.filter(FullDataBtc.DateTime != 'Date') #to get rid of first row with the header

## Pre-Processing Twitter dataframe

In [6]:
Tw_samp = FullDataTw  #.limit(100) #taking sample of 100 rows and working on it otherwise remove the limit

In [7]:
import preprocessor as p #cleaning each tweet using tweet-preprocessor like removing hashtags,urls,emojis....
def function_udf(input_str):
    input_str = re.sub(r'RT', '', input_str)
    p.set_options(p.OPT.URL, p.OPT.EMOJI,p.OPT.MENTION)
    input_str = p.clean(input_str)
    return ' '.join(re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", input_str).split())
func_udf = udf(function_udf, StringType())
CleanDF = Tw_samp.withColumn('CleanedTweets', func_udf(Tw_samp['Tweet']))
CleanDF.show(3)

+--------------------+--------------------+--------------------+
|            DateTime|               Tweet|       CleanedTweets|
+--------------------+--------------------+--------------------+
|Thu Nov 09 17:43:...|RT @Forbes: The F...|The Failure of Se...|
|Thu Nov 09 17:43:...|RT @mindstatex: L...|Lots of love from...|
+--------------------+--------------------+--------------------+
only showing top 3 rows



## Sentiment analysis using Vader packages

In [8]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
analyser = SentimentIntensityAnalyzer()
def senti_score_udf(sentence):
    snt = analyser.polarity_scores(sentence)
    return ([snt['neg'], snt['neu'], snt['pos'], snt['compound']])
func_udf2 = udf(senti_score_udf, ArrayType(FloatType()))
CleanDF = CleanDF.withColumn('p_neg', func_udf2(CleanDF['CleanedTweets'])[0])
CleanDF = CleanDF.withColumn('p_neu', func_udf2(CleanDF['CleanedTweets'])[1])
CleanDF = CleanDF.withColumn('p_pos', func_udf2(CleanDF['CleanedTweets'])[2])
CleanDF = CleanDF.withColumn('p_comp', func_udf2(CleanDF['CleanedTweets'])[3])
CleanDF.show(3)

+--------------------+--------------------+--------------------+-----+-----+-----+-------+
|            DateTime|               Tweet|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|
+--------------------+--------------------+--------------------+-----+-----+-----+-------+
|Thu Nov 09 17:43:...|RT @Forbes: The F...|The Failure of Se...|0.342|0.658|  0.0|-0.6914|
|Thu Nov 09 17:43:...|RT @mindstatex: L...|Lots of love from...|  0.0|0.577|0.423|  0.875|
+--------------------+--------------------+--------------------+-----+-----+-----+-------+
only showing top 3 rows



In [9]:
def Tw_Time_format(stri):  #manipulating and casting the strings(DateTime) of tweets dataframe to timestamps
    dic = {'Nov':'11','Oct':'10'}
    ans = ''
    ans += stri[-4:]+'-'+ dic[stri[4:7]]+'-'+stri[8:19]
    return ans
func_udf3 = udf(Tw_Time_format,StringType())
CleanDF = CleanDF.withColumn('DateTime_c', func_udf3(CleanDF['DateTime']))
CleanDF = CleanDF.withColumn("DateTime_casted",CleanDF['DateTime_c'].cast(TimestampType()))
CleanDF.show(3)

+--------------------+--------------------+--------------------+-----+-----+-----+-------+-------------------+-------------------+
|            DateTime|               Tweet|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|         DateTime_c|    DateTime_casted|
+--------------------+--------------------+--------------------+-----+-----+-----+-------+-------------------+-------------------+
|Thu Nov 09 17:43:...|RT @Forbes: The F...|The Failure of Se...|0.342|0.658|  0.0|-0.6914|2017-11-09 17:43:41|2017-11-09 17:43:41|
|Thu Nov 09 17:43:...|RT @mindstatex: L...|Lots of love from...|  0.0|0.577|0.423|  0.875|2017-11-09 17:43:40|2017-11-09 17:43:40|
+--------------------+--------------------+--------------------+-----+-----+-----+-------+-------------------+-------------------+
only showing top 3 rows



In [10]:
FinalTw = CleanDF.selectExpr("DateTime_casted as Date_Time", "CleanedTweets as Cleaned_Tweets", "p_neg","p_neu","p_pos","p_comp")
FinalTw.show(5) #selecting necessary columns

+-------------------+--------------------+-----+-----+-----+-------+
|          Date_Time|      Cleaned_Tweets|p_neg|p_neu|p_pos| p_comp|
+-------------------+--------------------+-----+-----+-----+-------+
|2017-11-09 17:43:41|The Failure of Se...|0.342|0.658|  0.0|-0.6914|
|2017-11-09 17:43:40|Lots of love from...|  0.0|0.577|0.423|  0.875|
|2017-11-09 17:43:39|Join our telegram...|  0.0|0.845|0.155|  0.296|
|2017-11-09 17:43:39|DIGAF FLOAT 16M T...|  0.0|  1.0|  0.0|    0.0|
+-------------------+--------------------+-----+-----+-----+-------+
only showing top 5 rows



## Pre-Processing Bitcoin dataframe

In [11]:
from datetime import datetime
from dateutil import parser
def Btc_Time_format(input_str): #manipulating and casting the strings(DateTime) of BTC dataframe to timestamps
    input_str = re.sub(r'/17','', input_str)
    input_str = '2017-'+ input_str
    input_str = re.sub(r'/', '-', input_str)
    input_str += ':00'
    return input_str[:10]+""+input_str[10:]
func_udf = udf(Btc_Time_format, StringType())
FullDataBtc = FullDataBtc.withColumn('Cleaned_BTC_Time', func_udf(FullDataBtc['DateTime']))
FullDataBtc.show(5)

+-------------+-------+------------------+
|     DateTime|  Price|  Cleaned_BTC_Time|
+-------------+-------+------------------+
|10/30/17 0:00|6123.21|2017-10-30 0:00:00|
|10/30/17 1:00|6131.35|2017-10-30 1:00:00|
|10/30/17 2:00|6114.17|2017-10-30 2:00:00|
|10/30/17 3:00|6153.11|2017-10-30 3:00:00|
|10/30/17 4:00|6151.09|2017-10-30 4:00:00|
+-------------+-------+------------------+
only showing top 5 rows



In [12]:
CleandfBtc = FullDataBtc.withColumn("Cleaned_BTC_Time_New",FullDataBtc['Cleaned_BTC_Time'].cast(TimestampType()))
FinalBtc = CleandfBtc.selectExpr("Cleaned_BTC_Time_New as Date_Time", "Price")
FinalBtc = FinalBtc.withColumn("Price",FinalBtc['Price'].cast(DoubleType()))
FinalBtc.show(5)#In this cell, casting to timesstamp, changing col names and casting price type to double

+-------------------+-------+
|          Date_Time|  Price|
+-------------------+-------+
|2017-10-30 00:00:00|6123.21|
|2017-10-30 01:00:00|6131.35|
|2017-10-30 02:00:00|6114.17|
|2017-10-30 03:00:00|6153.11|
|2017-10-30 04:00:00|6151.09|
+-------------------+-------+
only showing top 5 rows



## Dataframes Look like this...

In [13]:
FinalTw.printSchema()

root
 |-- Date_Time: timestamp (nullable = true)
 |-- Cleaned_Tweets: string (nullable = true)
 |-- p_neg: float (nullable = true)
 |-- p_neu: float (nullable = true)
 |-- p_pos: float (nullable = true)
 |-- p_comp: float (nullable = true)



In [14]:
FinalBtc.printSchema()
FinalBtc.count()

root
 |-- Date_Time: timestamp (nullable = true)
 |-- Price: double (nullable = true)



672

## Truncating timestamps to hours and then grouping them by hour

In [15]:
dt_truncated = ((round(unix_timestamp(col('Date_Time')) / 3600) * 3600).cast('timestamp'))
FinalTw = FinalTw.withColumn('dt_truncated', dt_truncated)
FinalTw = FinalTw.selectExpr("dt_truncated as Date_Time","Cleaned_Tweets","p_neg","p_neu","p_pos","p_comp")
UTC = ((unix_timestamp(col('Date_Time'))+ 5*60*60).cast('timestamp'))
FinalTw = FinalTw.withColumn('UTC', UTC)
FinalTw = FinalTw.selectExpr("UTC as Date_Time","Cleaned_Tweets","p_neg","p_neu","p_pos","p_comp")
FinalTw.show(5)

+-------------------+--------------------+-----+-----+-----+-------+
|          Date_Time|      Cleaned_Tweets|p_neg|p_neu|p_pos| p_comp|
+-------------------+--------------------+-----+-----+-----+-------+
|2017-11-09 23:00:00|The Failure of Se...|0.342|0.658|  0.0|-0.6914|
|2017-11-09 23:00:00|Lots of love from...|  0.0|0.577|0.423|  0.875|
|2017-11-09 23:00:00|Join our telegram...|  0.0|0.845|0.155|  0.296|
|2017-11-09 23:00:00|DIGAF FLOAT 16M T...|  0.0|  1.0|  0.0|    0.0|
+-------------------+--------------------+-----+-----+-----+-------+
only showing top 5 rows



In [21]:
FinalTw.registerTempTable("temp")
#FinalTw_avg = sql.sql("SELECT Date_Time As DateTime,AVG(p_neg) as P_Neg,AVG(p_neu) as P_Neu,AVG(p_pos) as P_Pos,AVG(p_comp) as P_Comp FROM temp GROUP BY Date_Time")
FinalTw_avg = FinalTw.select("Date_Time","polarity","subj","p_pos","p_neg").groupBy("Date_Time").agg(avg(col("polarity","subj","p_pos","p_neg")))
FinalTw_avg.show(5)

AnalysisException: ignored

In [17]:
#This cell is just to collect all the corpus per hour(for the future work)
from pyspark.sql import functions as f
df_with_text = FinalTw.groupby("Date_Time").agg(f.concat_ws(" ", f.collect_list(FinalTw.Cleaned_Tweets)))
df_with_text.show(1)

+-------------------+------------------------------------------+
|          Date_Time|concat_ws( , collect_list(Cleaned_Tweets))|
+-------------------+------------------------------------------+
|2017-11-09 14:00:00|                      hello What it fee...|
+-------------------+------------------------------------------+
only showing top 1 row



In [18]:
FinalTw_avg.count()
from pyspark.sql.functions import *
df_sort = FinalTw_avg.sort(asc("Date_Time"))
df_sort.show(1)

+-------------------+--------------------+------------------+-------------------+-------------------+
|           DateTime|               P_Neg|             P_Neu|              P_Pos|             P_Comp|
+-------------------+--------------------+------------------+-------------------+-------------------+
|2017-10-31 05:00:00|0.028692847177866968|0.8934684422959118|0.07503646569846388|0.09825245369889292|
+-------------------+--------------------+------------------+-------------------+-------------------+
only showing top 1 row



## Joining twitter and bitcoin dataframes by DateTime

In [19]:
FinalTw_avg.registerTempTable("avgs")
FinalBtc.registerTempTable("prices")
results = sql.sql("SELECT DateTime, P_Neg, P_Neu, P_Pos, P_Comp, Price FROM avgs JOIN prices ON avgs.DateTime = prices.Date_Time order by avgs.DateTime")
#results = results.selectExpr("DateTime","avg(polarity)","avg(subj)","avg(p_pos)","avg(p_neg)","Price") Use this line if you are using text blob package
results.show(1)

+-------------------+--------------------+------------------+-------------------+-------------------+-------+
|           DateTime|               P_Neg|             P_Neu|              P_Pos|             P_Comp|  Price|
+-------------------+--------------------+------------------+-------------------+-------------------+-------+
|2017-10-31 05:00:00|0.028692847177866968|0.8934684422959118|0.07503646569846388|0.09825245369889292|6158.76|
+-------------------+--------------------+------------------+-------------------+-------------------+-------+
only showing top 1 row



In [20]:
results.repartition(1).write.csv("DataforModelExec.csv") #this will write df to single csv instead of writing diff csv acc to partitions

In [None]:
# Now refer to LSTM notebook for the timeseries analysis