#### Option 1: Running locally with connector 2.11:2.4.0

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, array, explode, sum, count, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from datetime import datetime

SYMBOLS_LIST = ['BTC.X', 'BSV.X', 'BCH.X', 'LTC.X', 'ETH.X', 'DOGE.X']

# NOTE: The environment needs to have scala installed for this to work
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/cryptoracle") \
.config("spark.mongodb.input.collection", "messages") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.0') \
.getOrCreate()

messages_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

#### Option 2: Running with remote mongo URI with different connector

In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
import sys

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, array, explode, sum, count, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from datetime import datetime

MONGODB_INPUT_URI = "mongodb://heroku_kvptfcm8:vbekldoic9poi92kkp810rvk7@ds141185.mlab.com:41185/heroku_kvptfcm8.runs"
spark = SparkSession \
        .builder \
        .config("spark.mongodb.input.uri", MONGODB_INPUT_URI) \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.0') \
        .getOrCreate()

messages_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [2]:
SYMBOLS_LIST = ['BTC.X', 'BSV.X', 'BCH.X', 'LTC.X', 'ETH.X', 'DOGE.X']

In [3]:
messages_df.count()

48911

### Create new dataframe with only required columns and filtered rows

In [4]:
messages_df = messages_df.select(messages_df['_id']['oid'].alias('_id'),
                   messages_df['body'],
                   messages_df['created_at'],
                   messages_df['entities']['sentiment']['basic'].alias('sentiment'),
                   messages_df['symbols']['symbol'].alias('symbols'),
                   messages_df['likes']['total'].alias('likes'),
                   messages_df['reshares']['reshared_count'].alias('reshares'))

def sum_interactions(likes, reshares):
    _sum = 1
    if likes:
        _sum += likes
    if reshares:
        _sum += reshares
    return _sum

def convert_sentiment(sentiment):
    if sentiment == "Bullish":
        return 2
    if sentiment == "Bearish":
        return 1
    return 0

def convert_date(dt):
    return datetime.strptime(dt, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d-%H')

interactionUdf = udf(sum_interactions, IntegerType())
sentimentUdf = udf(convert_sentiment, IntegerType())
dateHourUdf = udf(convert_date, StringType())

# remove null timestamps, id and symbols (else timestamp conversion will have problem)
messages_df = messages_df \
                .na \
                .drop(subset=['_id','created_at','symbols'])

messages_df = messages_df \
                .withColumn('interaction_count', interactionUdf('likes', 'reshares')) \
                .withColumn('sentiment', sentimentUdf('sentiment')) \
                .withColumn('created_at', dateHourUdf('created_at')) \

In [5]:
messages_df.first()

Row(_id='5e7ad06b80f71592bb059da0', body='$BTC.X When Stalin and the Red Army were closing in on Hitler, He committed suicide, Well the walls are closing in on Trump and I hope for mankind he does the same and takes others with him, I don’t normally wish death as it’s bad karma, But we are dealing with Satan so all bets are off!', created_at='2020-03-25-03', sentiment=0, symbols=['BTC.X'], likes=None, reshares=None, interaction_count=1)

#### Distribution of sentiments

In [6]:
messages_df.groupBy('sentiment').count().collect()

[Row(sentiment=1, count=4688),
 Row(sentiment=2, count=19340),
 Row(sentiment=0, count=24878)]

### Windowing


#### Unwrap array of symbols to new rows

In [7]:
# Since tweets can be attributed to more than one symbol, we unwrap the list into more rows
# this is okay as our final calculation will be grouped by symbol among other things

def weight_sentiment(sentiment, count):
    if sentiment == 0:
        return 0
    return sentiment * count

def handle_neutral_sentiment(sentiment, count):
    if sentiment == 0:
        return 0
    return count

weightedSentimentUdf = udf(weight_sentiment, IntegerType())
neutralSentimentUdf = udf(handle_neutral_sentiment, IntegerType())


messages_df = messages_df \
                .withColumn('weighted_sentiment', weightedSentimentUdf('sentiment', 'interaction_count')) \
                .withColumn('symbol', explode(messages_df['symbols']))

# remove duplicates after exploding
messages_df = messages_df.dropDuplicates()

# do not consider interaction count for neutral sentiment
messages_df = messages_df.withColumn('interaction_count', neutralSentimentUdf('sentiment', 'interaction_count'))
# filter to only those symbols that we care about
messages_df = messages_df.where(messages_df['symbol'].isin(SYMBOLS_LIST))

#### Include counts of sentiment for later aggr in groupby

In [8]:
def positive_sentiment_count(sentiment):
    return 1 if (sentiment == 2) else 0
    
def negative_sentiment_count(sentiment):
    return 1 if (sentiment == 1) else 0
    
def null_sentiment_count(sentiment):
    return 1 if (sentiment == 0) else 0


positiveSentimentCountUdf = udf(positive_sentiment_count, IntegerType())
negativeSentimentCountUdf = udf(negative_sentiment_count, IntegerType())
nullSentimentCountUdf = udf(null_sentiment_count, IntegerType())

messages_df = messages_df \
                .withColumn('positive', positiveSentimentCountUdf('sentiment')) \
                .withColumn('negative', negativeSentimentCountUdf('sentiment')) \
                .withColumn('null', nullSentimentCountUdf('sentiment')) 

In [9]:
messages_df.groupby('symbol').count().collect()

[Row(symbol='BCH.X', count=9683),
 Row(symbol='BTC.X', count=21249),
 Row(symbol='ETH.X', count=13774),
 Row(symbol='DOGE.X', count=8312),
 Row(symbol='BSV.X', count=8993),
 Row(symbol='LTC.X', count=11144)]

In [10]:
grouped_df = messages_df.groupby(['created_at', 'symbol']).agg(
    sum('interaction_count').alias('sum_interaction_count'),
    sum('weighted_sentiment').alias('sum_weighted_sentiment'),
    sum('positive').alias('positive_count'),
    sum('negative').alias('negative_count'),
    sum('null').alias('null_count'),
    count('_id').alias('volume_tweets'))

grouped_df = grouped_df.withColumn('overall_sentiment', grouped_df['sum_weighted_sentiment'] / grouped_df['sum_interaction_count'])

In [11]:
gdf = grouped_df.toPandas()

In [12]:
gdf[gdf['created_at'] == '2020-03-24-23']

Unnamed: 0,created_at,symbol,sum_interaction_count,sum_weighted_sentiment,positive_count,negative_count,null_count,volume_tweets,overall_sentiment
487,2020-03-24-23,BSV.X,70,140,14,0,0,14,2.0
1078,2020-03-24-23,BCH.X,239,240,1,34,0,35,1.004184
1936,2020-03-24-23,ETH.X,249,260,4,34,4,42,1.044177
2011,2020-03-24-23,LTC.X,238,238,0,34,10,44,1.0
2275,2020-03-24-23,BTC.X,249,260,4,34,11,49,1.044177


In [13]:
grouped_df.groupby('symbol').count().collect()

[Row(symbol='BCH.X', count=256),
 Row(symbol='BTC.X', count=682),
 Row(symbol='ETH.X', count=583),
 Row(symbol='DOGE.X', count=192),
 Row(symbol='BSV.X', count=319),
 Row(symbol='LTC.X', count=405)]

## Historical Price Data

In [21]:
import pandas as pd 
from datetime import datetime
read_func = lambda x: spark.read.format('csv').load(x, header=True, inferSchema=True)

def format_date(date, sym):
    if sym in ['BCH.X', 'DOGE.X']:
        return datetime.strptime(date, '%Y-%m-%d %I-%p').strftime("%Y-%m-%d-%H")
    return datetime.strptime(date, '%m/%d/%y %H:%M').strftime("%Y-%m-%d-%H")

# needed for join with crypto data
def only_date(date):
    return datetime.strptime(date, "%Y-%m-%d-%H").strftime("%Y-%m-%d")
    
formatDateUdf = udf(format_date, StringType())
dateUdf = udf(only_date, StringType())

BTC_prices = read_func("data/gemini_BTCUSD_1hr.csv") 
ETH_prices = read_func("data/gemini_ETHUSD_1hr.csv") 
LTC_prices = read_func("data/gemini_LTCUSD_1hr.csv") 
BCH_prices = read_func("data/Bitbay_BCHUSD_1h.csv") 
DOGE_prices = read_func("data/Yobit_DOGERUR_1h.csv") 

price_df_lists = [
    ('BTC.X', BTC_prices),
    ('ETH.X', ETH_prices),
    ('LTC.X', LTC_prices),
    ('BCH.X', BCH_prices),
    ('DOGE.X', DOGE_prices)
]

#### 1. Datetime and Symbols

In [22]:
new_dfs = []
for sym, price_df in price_df_lists:
    new_dfs.append(price_df \
                .withColumn('Symbol', lit(sym)) \
                .withColumn('Date', formatDateUdf('Date', 'Symbol')) \
                .withColumn('Only_date', dateUdf('Date'))
                .drop('Unix Timestamp'))
price_df_lists = new_dfs

#### 2. Doge (RUR to USD)

In [23]:
RUR_USD = 0.013

def convert_doge(price):
    return price * RUR_USD

convertPriceUdf = udf(convert_doge, DoubleType())

for col in ['Open', 'High', 'Low', 'Close']:
    price_df_lists[-1] = price_df_lists[-1].withColumn(col, convertPriceUdf(col))
    
#DOGE
price_df_lists[-1] = price_df_lists[-1].drop('Volume ERUR').withColumnRenamed('Volume DOG', 'Volume')
#BCH
price_df_lists[-2] = price_df_lists[-2].drop('Volume USD').withColumnRenamed('Volume BCH', 'Volume')

In [24]:
for df in price_df_lists:
    print(df.first())

Row(Date='2020-04-18-00', Symbol='BTC.X', Open=7036.26, High=7064.99, Low=7028.23, Close=7064.99, Volume=4.22790759, Only_date='2020-04-18')
Row(Date='2020-04-18-00', Symbol='ETH.X', Open=170.8, High=170.82, Low=170.8, Close=170.82, Volume=0.003956, Only_date='2020-04-18')
Row(Date='2020-04-18-00', Symbol='LTC.X', Open=42.23, High=42.36, Low=42.16, Close=42.36, Volume=119.65145, Only_date='2020-04-18')
Row(Date='2020-04-01-11', Symbol='BCH.X', Open=210.85, High=210.85, Low=210.85, Close=210.85, Volume=0.0, Only_date='2020-04-01')
Row(Date='2018-08-22-23', Symbol='DOGE.X', Open=0.0021333, High=0.0021333, Low=0.0021294, Close=0.0021294, Volume=0.0, Only_date='2018-08-22')


In [25]:
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

prices_df = unionAll(*price_df_lists)

## Joining tweets with price data (tweets are k-hour earlier)

In [40]:
k_list = [2, 5, 12, 24, 48]
tweetsprice_df_list = []

In [41]:
from pyspark.sql.functions import isnan

for k in k_list:
    merged_df = grouped_df.join(prices_df, \
                               (grouped_df.symbol == prices_df.Symbol)&(grouped_df.created_at == prices_df.Date), \
                                how='left').drop(prices_df.Symbol) 
    unmatched_count = merged_df.filter((merged_df["Date"] == "") | 
                                       merged_df["Date"].isNull() | 
                                       isnan(merged_df["Date"])).count()
    print("Number of unmatched tweet group for k = " + str(k) + ": "+ str(unmatched_count))
    print("Number of matched tweet groups for k = " + str(k) + ": "+ str(merged_df.count() - unmatched_count) + "\n")
    tweetsprice_df_list.append(merged_df)

Number of unmatched tweet group for k = 2: 954
Number of matched tweet groups for k = 2: 1513

Number of unmatched tweet group for k = 5: 954
Number of matched tweet groups for k = 5: 1513

Number of unmatched tweet group for k = 12: 954
Number of matched tweet groups for k = 12: 1513

Number of unmatched tweet group for k = 24: 954
Number of matched tweet groups for k = 24: 1513

Number of unmatched tweet group for k = 48: 954
Number of matched tweet groups for k = 48: 1513



## Cryptocurrency-specific data

In [42]:
BTC_data = read_func("data/btc.csv") 
ETH_data = read_func("data/eth.csv") 
LTC_data = read_func("data/ltc.csv") 
BCH_data = read_func("data/bch.csv") 
DOGE_data = read_func("data/doge.csv") 

cryptospec_df_lists = [
    ('BTC.X', BTC_data),
    ('ETH.X', ETH_data),
    ('LTC.X', LTC_data),
    ('BCH.X', BCH_data),
    ('DOGE.X', DOGE_data)
]

#### Get common columns

In [43]:
# Columns that we are interested in and commonly refered in cryptocurrency data 
# Transaction count, volume, fees
# Network value, Realized cap 
# Active addresses: ADA.AdrActCnt
# Payment count, Average difficulty: DiffMean, Hash rate
# Block size, Block count, Current supply

common_col = ['date', 'AdrActCnt', 'TxCnt', 'TxTfrValAdjUSD', 'ROI30d', 'HashRate', 
              'BlkCnt', 'VtyDayRet180d', 'CapMrktCurUSD', 'SplyCur', 'ROI1yr',
              'BlkSizeMeanByte','VtyDayRet60d','VtyDayRet30d', 'FeeTotUSD',
              'DiffMean']

# make sure all the crypto_df has the columns before union
for sym, crypto_df in cryptospec_df_lists:
    common_col = list(set(common_col) & set(crypto_df.columns))

#### Concat data

In [44]:
crypto_dfs = []
for sym, crypto_df in cryptospec_df_lists:
    crypto_dfs.append(crypto_df \
                .select([col for col in crypto_df.columns if col in common_col])  
                .where(crypto_df['date'] > lit("2020-03-10")) 
                .withColumnRenamed('coin', 'symbol') \
                .withColumn('symbol', lit(sym)))
    
cryptos_df = unionAll(*crypto_dfs)        

## Joining tweets-price with crypto-specific data

In [46]:
tweetspricecrypto_df_list = []

for tweetsprice in tweetsprice_df_list:
    merged_df = tweetsprice.join(cryptos_df, \
                               (tweetsprice.symbol == cryptos_df.symbol)&(tweetsprice.Only_date == cryptos_df.date), \
                                how='left') 
    tweetspricecrypto_df_list.append(merged_df)

In [47]:
tweetspricecrypto_df_list[0].show()

+-------------+------+---------------------+----------------------+--------------+--------------+----------+-------------+------------------+-------------+-------+-------+-------+-------+-----------+----------+----------+---------+------+----------------+--------------------+--------------------+------------------+--------------------+--------------------+-------------------+-------------------+------+--------------------+--------------------+--------------------+-------------------+------+
|   created_at|symbol|sum_interaction_count|sum_weighted_sentiment|positive_count|negative_count|null_count|volume_tweets| overall_sentiment|         Date|   Open|   High|    Low|  Close|     Volume| Only_date|      date|AdrActCnt|BlkCnt| BlkSizeMeanByte|       CapMrktCurUSD|            DiffMean|         FeeTotUSD|            HashRate|              ROI1yr|             ROI30d|            SplyCur| TxCnt|      TxTfrValAdjUSD|       VtyDayRet180d|        VtyDayRet30d|       VtyDayRet60d|symbol|
+-------

In [None]:
spark.stop()
