# Spark Setup

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, desc
from pyspark.sql.types import ArrayType, StringType, MapType, FloatType

In [2]:
spark = SparkSession.builder\
                        .master("local[*]")\
                        .config('spark.executor.memory', '5g')\
                        .config('spark.driver.memory', '5g')\
                        .config("spark.sql.session.timeZone", "UTC")\
                        .config("spark.sql.execution.arrow.enabled","true")\
                    .appName("Sentiment Analysis")\
                    .getOrCreate()
spark

In [3]:
spark

# Rolling Average eth_ss

In [9]:
eth_ss.count()

2163314

In [30]:
x = eth_ss.select("datetime","pos_vader").withColumn("datetime",col("datetime").cast("int")).sort("datetime").toPandas()

In [31]:
x.head()

Unnamed: 0,datetime,pos_vader
0,1451599200,0.0
1,1451600160,0.0
2,1451601060,0.0
3,1451601900,0.0
4,1451603520,0.167


In [32]:
import pandas as pd


In [33]:
x["datetime"] = pd.to_datetime(x.datetime,unit='s')
x.head()
#x.set_index("datetime").head(20).rolling('5m').sum()

Unnamed: 0,datetime,pos_vader
0,2015-12-31 22:00:00,0.0
1,2015-12-31 22:16:00,0.0
2,2015-12-31 22:31:00,0.0
3,2015-12-31 22:45:00,0.0
4,2015-12-31 23:12:00,0.167


In [38]:
x.set_index("datetime").head(20).rolling('300s').sum()

Unnamed: 0_level_0,pos_vader
datetime,Unnamed: 1_level_1
2015-12-31 22:00:00,0.0
2015-12-31 22:16:00,0.0
2015-12-31 22:31:00,0.0
2015-12-31 22:45:00,0.0
2015-12-31 23:12:00,0.167
2016-01-01 00:02:00,0.258
2016-01-01 02:14:00,0.0
2016-01-01 04:05:00,0.0
2016-01-01 08:05:00,0.178
2016-01-01 08:21:00,0.162


# Twitter Sentiments

In [9]:
from pyspark.sql.functions import col, concat, lit, date_format, desc, asc


parquet_eth_path = "data/tweets/ethereum/parquet/sentiment/"
eth = spark.read.parquet(parquet_eth_path)

In [10]:
from pyspark.sql.functions import col, concat, lit, date_format, desc, asc, min, max


In [11]:
parquet_eth_path = "data/tweets/bitcoin/parquet/sentiment/"
btc = spark.read.parquet(parquet_eth_path)

In [12]:
eth_ss = eth\
            .select("pos_vader","neg_vader","neu_vader","compound_vader","polarity_textblob","subjectivity_textblob","datetime")\
            .withColumn("Cryptocurrency",lit("Ethereum"))\

In [13]:
btc_ss = btc\
            .select("pos_vader","neg_vader","neu_vader","compound_vader","polarity_textblob","subjectivity_textblob","datetime")\
            .withColumn("Cryptocurrency",lit("Bitcoin"))\

In [14]:
both_ss = eth_ss.union(btc_ss).na.drop()

In [15]:
eth_ss.printSchema()

root
 |-- pos_vader: float (nullable = true)
 |-- neg_vader: float (nullable = true)
 |-- neu_vader: float (nullable = true)
 |-- compound_vader: float (nullable = true)
 |-- polarity_textblob: float (nullable = true)
 |-- subjectivity_textblob: float (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- Cryptocurrency: string (nullable = false)



In [16]:
from pyspark.sql.functions import when, lit, avg, month, year, dayofmonth, minute, hour
from pyspark.sql.functions import col as c

In [17]:
l5 = c("minute") <= 5
l10 = (c("minute") > 5) & (c("minute") <= 10)
l15 = (c("minute") > 10) & (c("minute") <= 15)
l20 = (c("minute") > 15) & (c("minute") <= 20)
l25 = (c("minute") > 20) & (c("minute") <= 25)
l30 = (c("minute") > 25) & (c("minute") <= 30)
l35 = (c("minute") > 30) & (c("minute") <= 35)
l40 = (c("minute") > 35) & (c("minute") <= 40)
l45 = (c("minute") > 40) & (c("minute") <= 45)
l50 = (c("minute") > 45) & (c("minute") <= 50)
l55 = (c("minute") > 50) & (c("minute") <= 55)
l60 = (c("minute") > 55) & (c("minute") <= 60)

w_t = when(l5,5)\
        .when(l10,10)\
        .when(l15,15)\
        .when(l20,10)\
        .when(l25,25)\
        .when(l30,30)\
        .when(l35,35)\
        .when(l40,40)\
        .when(l45,45)\
        .when(l50,50)\
        .when(l55,55)\
        .when(l60,60)


eth_ssm = eth_ss.withColumn("year",year("datetime"))\
                .withColumn("month",month("datetime"))\
                .withColumn("day",dayofmonth("datetime"))\
                .withColumn("hour",hour("datetime"))\
                .withColumn("minute",minute("datetime"))\
                .withColumn("bucket",w_t)

btc_ssm = btc_ss.withColumn("year",year("datetime"))\
                .withColumn("month",month("datetime"))\
                .withColumn("day",dayofmonth("datetime"))\
                .withColumn("hour",hour("datetime"))\
                .withColumn("minute",minute("datetime"))\
                .withColumn("bucket",w_t)
            
both_ssm = both_ss.withColumn("year",year("datetime"))\
                .withColumn("month",month("datetime"))\
                .withColumn("day",dayofmonth("datetime"))\
                .withColumn("hour",hour("datetime"))\
                .withColumn("minute",minute("datetime"))\
                .withColumn("bucket",w_t)

In [18]:
cv = eth_ssm.filter("compound_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("compound_vader").alias("avg_eth_compound_vader"),
)

pv = eth_ssm.filter("pos_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("pos_vader").alias("avg_eth_pos_vader")
)

nv = eth_ssm.filter("neg_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("neg_vader").alias("avg_eth_neg_vader")
)

pt = eth_ssm.filter("polarity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("polarity_textblob").alias("avg_eth_polarity_textblob"),   
)

st = eth_ssm.filter("subjectivity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("subjectivity_textblob").alias("avg_eth_subjectivity_textblob")    
)

eth_agg = cv.join(pv,on=["year","month","day","hour","bucket"])\
    .join(nv,on=["year","month","day","hour","bucket"],how="full")\
    .join(pt,on=["year","month","day","hour","bucket"],how="full")\
    .join(st,on=["year","month","day","hour","bucket"],how="full")



In [19]:
eth_agg.count()

193178

In [20]:
cv = btc_ssm.filter("compound_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("compound_vader").alias("avg_twitter_btc_compound_vader"),
)

pv = btc_ssm.filter("pos_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("pos_vader").alias("avg_twitter_btc_pos_vader")
)

nv = btc_ssm.filter("neg_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("neg_vader").alias("avg_twitter_btc_neg_vader")
)

pt = btc_ssm.filter("polarity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("polarity_textblob").alias("avg_twitter_btc_polarity_textblob"),   
)

st = btc_ssm.filter("subjectivity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("subjectivity_textblob").alias("avg_twitter_btc_subjectivity_textblob")    
)

btc_agg = cv.join(pv,on=["year","month","day","hour","bucket"],how="full")\
    .join(nv,on=["year","month","day","hour","bucket"],how="full")\
    .join(pt,on=["year","month","day","hour","bucket"],how="full")\
    .join(st,on=["year","month","day","hour","bucket"],how="full")



In [21]:
btc_agg.count()

205438

In [22]:
cv = both_ssm.filter("compound_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("compound_vader").alias("avg_twitter_compound_vader"),
)

pv = both_ssm.filter("pos_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("pos_vader").alias("avg_twitter_pos_vader")
)

nv = both_ssm.filter("neg_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("neg_vader").alias("avg_twitter_neg_vader")
)

pt = both_ssm.filter("polarity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("polarity_textblob").alias("avg_twitter_polarity_textblob"),   
)

st = both_ssm.filter("subjectivity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("subjectivity_textblob").alias("avg_twitter_subjectivity_textblob")    
)

both_agg = cv.join(pv,on=["year","month","day","hour","bucket"],how="full")\
    .join(nv,on=["year","month","day","hour","bucket"],how="full")\
    .join(pt,on=["year","month","day","hour","bucket"],how="full")\
    .join(st,on=["year","month","day","hour","bucket"],how="full")



In [23]:
everything_t = eth_agg.join(btc_agg,on=["year","month","day","hour","bucket"])\
                    .join(both_agg,on=["year","month","day","hour","bucket"],how="full")
                    

In [24]:
everything_t.count()

227319

In [25]:
everything_t.write.mode("overwrite").parquet("data/temp/sentiment/twitter")

In [19]:
#everything_t.limit(5).toPandas()

# Reddit Sentiments

In [29]:
from pyspark.sql.functions import when, col
from pyspark.sql.functions import col as c

when_statment = when(col("subreddit") == "ethtrader","Ethereum")\
                .when(col("subreddit") == "ethereum","Ethereum")\
                .when(col("subreddit") == "Bitcoin","Bitcoin")\
                .when(col("subreddit") == "btc","Bitcoin")

In [30]:
parquet_reddit_path = "data/reddit-crypto/parquet/complete_sentiment/"
redd = spark.read.parquet(parquet_reddit_path)\
                    .withColumn("Cryptocurrency",when_statment)\
                    .withColumnRenamed("created_utc","datetime")

In [31]:
eth_r_ss  = redd.select("subreddit","pos_vader","neg_vader","neu_vader","compound_vader","polarity_textblob","subjectivity_textblob","datetime")\
                .filter("Cryptocurrency = 'Ethereum'")\
                .drop("Cryptocurrency")


btc_r_ss  = redd.select("subreddit","pos_vader","neg_vader","neu_vader","compound_vader","polarity_textblob","subjectivity_textblob","datetime")\
                .filter("Cryptocurrency = 'Bitcoin'")\
                .drop("Cryptocurrency")


both_r_ss = redd.select("subreddit","pos_vader","neg_vader","neu_vader","compound_vader","polarity_textblob","subjectivity_textblob","datetime")\
                .drop("Cryptocurrency")







In [32]:
l5 = c("minute") <= 5
l10 = (c("minute") > 5) & (c("minute") <= 10)
l15 = (c("minute") > 10) & (c("minute") <= 15)
l20 = (c("minute") > 15) & (c("minute") <= 20)
l25 = (c("minute") > 20) & (c("minute") <= 25)
l30 = (c("minute") > 25) & (c("minute") <= 30)
l35 = (c("minute") > 30) & (c("minute") <= 35)
l40 = (c("minute") > 35) & (c("minute") <= 40)
l45 = (c("minute") > 40) & (c("minute") <= 45)
l50 = (c("minute") > 45) & (c("minute") <= 50)
l55 = (c("minute") > 50) & (c("minute") <= 55)
l60 = (c("minute") > 55) & (c("minute") <= 60)

w_t = when(l5,5)\
        .when(l10,10)\
        .when(l15,15)\
        .when(l20,10)\
        .when(l25,25)\
        .when(l30,30)\
        .when(l35,35)\
        .when(l40,40)\
        .when(l45,45)\
        .when(l50,50)\
        .when(l55,55)\
        .when(l60,60)


eth_r_ssm = eth_r_ss.withColumn("year",year("datetime"))\
                .withColumn("month",month("datetime"))\
                .withColumn("day",dayofmonth("datetime"))\
                .withColumn("hour",hour("datetime"))\
                .withColumn("minute",minute("datetime"))\
                .withColumn("bucket",w_t)

btc_r_ssm = btc_r_ss.withColumn("year",year("datetime"))\
                .withColumn("month",month("datetime"))\
                .withColumn("day",dayofmonth("datetime"))\
                .withColumn("hour",hour("datetime"))\
                .withColumn("minute",minute("datetime"))\
                .withColumn("bucket",w_t)
            
both_r_ssm = both_r_ss.withColumn("year",year("datetime"))\
                .withColumn("month",month("datetime"))\
                .withColumn("day",dayofmonth("datetime"))\
                .withColumn("hour",hour("datetime"))\
                .withColumn("minute",minute("datetime"))\
                .withColumn("bucket",w_t)

In [33]:
cv = eth_r_ssm.filter("compound_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("compound_vader").alias("avg_reddit_eth_compound_vader"),
)

pv = eth_r_ssm.filter("pos_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("pos_vader").alias("avg_reddit_eth_pos_vader")
)

nv = eth_r_ssm.filter("neg_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("neg_vader").alias("avg_reddit_eth_neg_vader")
)

pt = eth_r_ssm.filter("polarity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("polarity_textblob").alias("avg_reddit_eth_polarity_textblob"),   
)

st = eth_r_ssm.filter("subjectivity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("subjectivity_textblob").alias("avg_reddit_eth_subjectivity_textblob")    
)

eth_r_agg = cv.join(pv,on=["year","month","day","hour","bucket"],how="full")\
    .join(nv,on=["year","month","day","hour","bucket"],how="full")\
    .join(pt,on=["year","month","day","hour","bucket"],how="full")\
    .join(st,on=["year","month","day","hour","bucket"],how="full")



In [34]:
cv = btc_r_ssm.filter("compound_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("compound_vader").alias("avg_reddit_btc_compound_vader"),
)

pv = btc_r_ssm.filter("pos_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("pos_vader").alias("avg_reddit_btc_pos_vader")
)

nv = btc_r_ssm.filter("neg_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("neg_vader").alias("avg_reddit_btc_neg_vader")
)

pt = btc_r_ssm.filter("polarity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("polarity_textblob").alias("avg_reddit_btc_polarity_textblob"),   
)

st = btc_r_ssm.filter("subjectivity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("subjectivity_textblob").alias("avg_reddit_btc_subjectivity_textblob")    
)

btc_r_agg = cv.join(pv,on=["year","month","day","hour","bucket"],how="full")\
    .join(nv,on=["year","month","day","hour","bucket"],how="full")\
    .join(pt,on=["year","month","day","hour","bucket"],how="full")\
    .join(st,on=["year","month","day","hour","bucket"],how="full")



In [35]:
cv = both_r_ssm.filter("compound_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("compound_vader").alias("avg_reddit_compound_vader"),
)

pv = both_r_ssm.filter("pos_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("pos_vader").alias("avg_reddit_pos_vader")
)

nv = both_r_ssm.filter("neg_vader != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("neg_vader").alias("avg_reddit_neg_vader")
)

pt = both_r_ssm.filter("polarity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("polarity_textblob").alias("avg_reddit_polarity_textblob"),   
)

st = both_r_ssm.filter("subjectivity_textblob != 0")\
.groupBy("year","month","day","hour","bucket").agg(
avg("subjectivity_textblob").alias("avg_reddit_subjectivity_textblob")    
)

both_r_agg = cv.join(pv,on=["year","month","day","hour","bucket"])\
    .join(nv,on=["year","month","day","hour","bucket"],how="full")\
    .join(pt,on=["year","month","day","hour","bucket"],how="full")\
    .join(st,on=["year","month","day","hour","bucket"],how="full")



In [36]:
everything_r = eth_r_agg.join(btc_r_agg,on=["year","month","day","hour","bucket"])\
                    .join(both_r_agg,on=["year","month","day","hour","bucket"],how="full")
                    

In [37]:
everything_r.filter("year > 2015").count()

240466

In [38]:
227319

227319

In [95]:
240466*5/60/24/365

2.2875380517503805

In [39]:
everything_r.write.mode("overwrite").parquet("data/temp/sentiment/reddit")

In [40]:
#everything_r.limit(5).toPandas()

In [41]:
everything_r.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- bucket: integer (nullable = true)
 |-- avg_reddit_eth_compound_vader: double (nullable = true)
 |-- avg_reddit_eth_pos_vader: double (nullable = true)
 |-- avg_reddit_eth_neg_vader: double (nullable = true)
 |-- avg_reddit_eth_polarity_textblob: double (nullable = true)
 |-- avg_reddit_eth_subjectivity_textblob: double (nullable = true)
 |-- avg_reddit_btc_compound_vader: double (nullable = true)
 |-- avg_reddit_btc_pos_vader: double (nullable = true)
 |-- avg_reddit_btc_neg_vader: double (nullable = true)
 |-- avg_reddit_btc_polarity_textblob: double (nullable = true)
 |-- avg_reddit_btc_subjectivity_textblob: double (nullable = true)
 |-- avg_reddit_compound_vader: double (nullable = true)
 |-- avg_reddit_pos_vader: double (nullable = true)
 |-- avg_reddit_neg_vader: double (nullable = true)
 |-- avg_reddit_polarity_textblob: do

# Join all social media information

In [51]:
r = spark.read.parquet("data/temp/sentiment/reddit").filter("year > 2015")
t = spark.read.parquet("data/temp/sentiment/twitter").filter("year > 2015")

In [52]:
t.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- bucket: integer (nullable = true)
 |-- avg_eth_compound_vader: double (nullable = true)
 |-- avg_eth_pos_vader: double (nullable = true)
 |-- avg_eth_neg_vader: double (nullable = true)
 |-- avg_eth_polarity_textblob: double (nullable = true)
 |-- avg_eth_subjectivity_textblob: double (nullable = true)
 |-- avg_twitter_btc_compound_vader: double (nullable = true)
 |-- avg_twitter_btc_pos_vader: double (nullable = true)
 |-- avg_twitter_btc_neg_vader: double (nullable = true)
 |-- avg_twitter_btc_polarity_textblob: double (nullable = true)
 |-- avg_twitter_btc_subjectivity_textblob: double (nullable = true)
 |-- avg_twitter_compound_vader: double (nullable = true)
 |-- avg_twitter_pos_vader: double (nullable = true)
 |-- avg_twitter_neg_vader: double (nullable = true)
 |-- avg_twitter_polarity_textblob: double (nullable = true)
 |-

In [53]:
r.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- bucket: integer (nullable = true)
 |-- avg_reddit_eth_compound_vader: double (nullable = true)
 |-- avg_reddit_eth_pos_vader: double (nullable = true)
 |-- avg_reddit_eth_neg_vader: double (nullable = true)
 |-- avg_reddit_eth_polarity_textblob: double (nullable = true)
 |-- avg_reddit_eth_subjectivity_textblob: double (nullable = true)
 |-- avg_reddit_btc_compound_vader: double (nullable = true)
 |-- avg_reddit_btc_pos_vader: double (nullable = true)
 |-- avg_reddit_btc_neg_vader: double (nullable = true)
 |-- avg_reddit_btc_polarity_textblob: double (nullable = true)
 |-- avg_reddit_btc_subjectivity_textblob: double (nullable = true)
 |-- avg_reddit_compound_vader: double (nullable = true)
 |-- avg_reddit_pos_vader: double (nullable = true)
 |-- avg_reddit_neg_vader: double (nullable = true)
 |-- avg_reddit_polarity_textblob: do

In [54]:
everything = r.join(t,on=["year","month","day","hour","bucket"],how="full")

In [55]:
everything.write.mode("overwrite").parquet("data/temp/sentiment/everything")

In [7]:
spark.read.parquet("data/temp/sentiment/everything").sort("year","month","day","hour","bucket").printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- bucket: integer (nullable = true)
 |-- avg_reddit_eth_compound_vader: double (nullable = true)
 |-- avg_reddit_eth_pos_vader: double (nullable = true)
 |-- avg_reddit_eth_neg_vader: double (nullable = true)
 |-- avg_reddit_eth_polarity_textblob: double (nullable = true)
 |-- avg_reddit_eth_subjectivity_textblob: double (nullable = true)
 |-- avg_reddit_btc_compound_vader: double (nullable = true)
 |-- avg_reddit_btc_pos_vader: double (nullable = true)
 |-- avg_reddit_btc_neg_vader: double (nullable = true)
 |-- avg_reddit_btc_polarity_textblob: double (nullable = true)
 |-- avg_reddit_btc_subjectivity_textblob: double (nullable = true)
 |-- avg_reddit_compound_vader: double (nullable = true)
 |-- avg_reddit_pos_vader: double (nullable = true)
 |-- avg_reddit_neg_vader: double (nullable = true)
 |-- avg_reddit_polarity_textblob: do

In [5]:
e = spark.read.parquet("data/temp/sentiment/everything").sort("year","month","day","hour","bucket").toPandas()

In [6]:
e.head()

Unnamed: 0,year,month,day,hour,bucket,avg_reddit_eth_compound_vader,avg_reddit_eth_pos_vader,avg_reddit_eth_neg_vader,avg_reddit_eth_polarity_textblob,avg_reddit_eth_subjectivity_textblob,...,avg_twitter_btc_compound_vader,avg_twitter_btc_pos_vader,avg_twitter_btc_neg_vader,avg_twitter_btc_polarity_textblob,avg_twitter_btc_subjectivity_textblob,avg_twitter_compound_vader,avg_twitter_pos_vader,avg_twitter_neg_vader,avg_twitter_polarity_textblob,avg_twitter_subjectivity_textblob
0,2016,1,1,0,5,0.2732,0.149,,,,...,0.660516,0.309296,0.160778,0.133164,0.21739,0.659378,0.308778,0.160778,0.157931,0.220476
1,2016,1,1,0,10,0.7658,0.099,0.044,0.269481,0.561364,...,,,,,,-0.002928,0.237611,0.146,0.255884,0.458497
2,2016,1,1,0,15,-0.1449,0.085,0.134,0.205556,0.7,...,,,,,,0.244489,0.311,0.172714,0.22923,0.558709
3,2016,1,1,0,25,,,,,,...,,,,,,-0.181192,0.226417,0.151,0.142119,0.665341
4,2016,1,1,0,30,0.4926,1.0,,0.25,0.2,...,,,,,,0.478296,0.230391,0.184333,0.402529,0.422893
