Mount Google Drive

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Initialize Spark

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark.sql import SparkSession, functions
import random

spark = SparkSession.builder.appName("YourTest").master("local[2]").config('spark.ui.port', random.randrange(4000,5000)).getOrCreate()

In [3]:
# Install Libraries
!pip install autocorrect

Collecting autocorrect
  Downloading autocorrect-2.6.1.tar.gz (622 kB)
[?25l[K     |▌                               | 10 kB 12.9 MB/s eta 0:00:01[K     |█                               | 20 kB 17.3 MB/s eta 0:00:01[K     |█▋                              | 30 kB 21.2 MB/s eta 0:00:01[K     |██                              | 40 kB 14.4 MB/s eta 0:00:01[K     |██▋                             | 51 kB 5.7 MB/s eta 0:00:01[K     |███▏                            | 61 kB 6.6 MB/s eta 0:00:01[K     |███▊                            | 71 kB 7.5 MB/s eta 0:00:01[K     |████▏                           | 81 kB 7.5 MB/s eta 0:00:01[K     |████▊                           | 92 kB 8.3 MB/s eta 0:00:01[K     |█████▎                          | 102 kB 8.6 MB/s eta 0:00:01[K     |█████▉                          | 112 kB 8.6 MB/s eta 0:00:01[K     |██████▎                         | 122 kB 8.6 MB/s eta 0:00:01[K     |██████▉                         | 133 kB 8.6 MB/s eta 0:00:01[K 

In [5]:
import sys
import pandas as pd
import numpy as np
import os
import nltk
import string
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from datetime import datetime
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...




True

Import Datasets

In [6]:
btc_prices_raw = spark.read.csv("/content/drive/MyDrive/BTCUSD_daily.csv",sep=',',header=True,inferSchema=True).drop("unix").drop("symbol").drop("Volume USD").dropna(how="any")

In [7]:
btc_prices_raw.printSchema()
btc_prices_raw.show(10)

root
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- Volume BTC: double (nullable = true)

+-------------------+--------+--------+--------+--------+-------------+
|               date|    open|    high|     low|   close|   Volume BTC|
+-------------------+--------+--------+--------+--------+-------------+
|2022-03-31 00:00:00|47086.07|47341.83|46993.29|47173.36|  38.14053622|
|2022-03-30 00:00:00|47459.03|47721.41|46572.15|47068.08|1627.54321756|
|2022-03-29 00:00:00|47152.38|48128.87|46941.84|47459.03|1716.32392308|
|2022-03-28 00:00:00|46854.96| 48234.0|46672.25|47152.38|2691.93784761|
|2022-03-27 00:00:00|44553.24| 46950.0| 44456.9|46864.39|1548.88890527|
|2022-03-26 00:00:00|44340.49|44815.31| 44101.0|44535.65|  494.7242021|
|2022-03-25 00:00:00|44025.99|45137.12|43616.88| 44320.6| 1725.0715699|
|2022-03-24 00:00:00|42912.21| 44240.0|42636.54|4

In [8]:
tweets_raw = spark.read.csv("/content/drive/MyDrive/tweets.csv",sep=';',header=True,inferSchema=True,multiLine=True).drop("id").drop("user").drop("fullname").drop("url").drop("replies").drop("retweets").dropna(how="any")

In [9]:
tweets_raw.printSchema()
tweets_raw.show(10)

root
 |-- timestamp: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- text: string (nullable = true)

+--------------------+-----+-------------------------------------+
|
+--------------------+-----+-------------------------------------+
|2019-05-27 11:49:...|    0|                 È appena uscito u...|
|2019-05-27 11:49:...|    0|                 Cardano: Digitize...|
|2019-05-27 11:49:...|    2|                 Another Test twee...|
|2019-05-27 11:49:...|    0|                 Current Crypto Pr...|
|2019-05-27 11:49:...|    0|                 Spiv (Nosar Baz):...|
|2019-05-27 11:49:...|    0|                 #btc inceldiği ye...|
|2019-05-27 11:49:...|    0|                 @nwoodfine We hav...|
|2019-05-27 11:49:...|    0|                 @pedronauck como ...|
|2019-05-27 11:49:...|    0|ブラジルはまぁ置いといてもドイツは...|
|2019-05-27 11:49:...|    0|                 CHANGE IS COMING....|
+--------------------+-----+-------------------------------------+
only showing top 10 row

In [10]:
import string
from pyspark.sql.functions import *
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
nltk.download('omw')
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('words')
from nltk.corpus import wordnet
from nltk.corpus import stopwords
from nltk.corpus import words
import re
from autocorrect import Speller
from nltk.stem import WordNetLemmatizer

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package omw to /root/nltk_data...
[nltk_data]   Unzipping corpora/omw.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Unzipping corpora/words.zip.


In [11]:
tokenizer = RegexpTokenizer(r"[\w']+") # tokenizer using regular expressions
spell = Speller(lang='en')
lemmatizer = WordNetLemmatizer() 
words = set(nltk.corpus.words.words())
english_stopwords = set(stopwords.words("english"))

def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?",s.lower())

def get_wordnet_pos(word):
    '''
    get_wordnet_pos(word) Maps POS tag for word (i.e. noun, adjective etc.) to be used in 
    lemmatization function
    get_wordnet_pos: String -> String
    '''
    tag = nltk.pos_tag([word])[0][1][0].upper()
    tag_dict = {"J": wordnet.ADJ,
                "N": wordnet.NOUN,
                "V": wordnet.VERB,
                "R": wordnet.ADV}

    return tag_dict.get(tag, wordnet.NOUN)

def regex_clean(col):
    col = re.sub(r'http\S+', '', col)
    col = re.sub(r'[^a-zA-Z\s]', '', col, flags=re.UNICODE)
    col = re.sub(r'\b[a-zA-Z]\b', '', col)
    col = re.sub(r'\n', '', col)
    col = re.sub(r'\r', '', col)
    col = re.sub(r' +', ' ', col)
    col = re.sub(r'&amp', '', col)
    return col

def filter_minimum_like(x):
  try:
    likes = int(x[1])
  except:
    return False

  if likes < 100:
    return False
  else:
    return True

tweets_rdd = tweets_raw.rdd
tweets_rdd = tweets_rdd.filter(filter_minimum_like)
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], str(x[2]).lower()))
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], regex_clean(x[2])))
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], tokenizer.tokenize(x[2])))
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], [word for word in x[2] if word not in (english_stopwords)]))
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], [lemmatizer.lemmatize(y, get_wordnet_pos(y)) for y in x[2]]))
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], [w for w in x[2] if len(w) > 2]))
tweets_rdd = tweets_rdd.map(lambda x : (x[0], x[1], [w for w in x[2] if w.lower() in words or not w.isalpha()]))
tweets_rdd = tweets_rdd.filter(lambda x : len(x[2]) >= 3)
tweets_rdd = tweets_rdd.map(lambda x : (x[0], " ".join(x[2])))

In [14]:
tweets_rdd.take(10)
# When you run this block, it will fail for the first 2 times with the following error:
#   PicklingError: args[0] from __newobj__ args has the wrong class
#   It should work on the third try. it has something to do with stopwords not being broadcasted through all the spark nodes
#   this is not super important to fix, running it twice and fail gives it enough time to initialize accross nodes and will
#   work perfectly, thus it will not be fixed right now. (fix another time maybe when time allows)

[('2019-05-27 08:13:06+00', 'price hit new high whats drive hypnotic rally'),
 ('2019-05-02 17:36:29+00', 'cab taxi ride ride'),
 ('2019-05-27 01:37:37+00', 'may pump bet season'),
 ('2019-05-26 20:57:45+00', 'hit high sudden parabolic swing'),
 ('2019-05-26 19:58:37+00',
  'really gaslighting entire thread question never settle know lot try debate criticize learn improve without jerk'),
 ('2019-05-27 01:57:40+00',
  'someone check seem come back vengeance ever since block twitter'),
 ('2019-05-27 00:02:39+00', 'keep eye prize bear get'),
 ('2019-05-27 11:33:39+00', 'new release hour ago development developer'),
 ('2019-05-27 03:29:34+00', 'thanks ruin target guy'),
 ('2019-05-27 11:18:11+00', 'bought dip breakout rally')]

In [15]:
tweets = tweets_rdd.toDF(["date", "text"])
tweets.printSchema()

root
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)



Convert BTC prices to percent changes

In [16]:
def compute_percent_change(row):
  date, open, high, low, close, volume = row

  range = high - low
  percent_change = (close - open) * 100 / open
  
  return (date.date(), "%.2f" % percent_change, range, volume)

btc_daily_changes = btc_prices_raw.rdd \
                                  .map(compute_percent_change) \
                                  .toDF(["date","btc_percent_change","btc_range","btc_volume"])

In [17]:
btc_daily_changes.createOrReplaceTempView("btc_daily_changes")
btc_daily_changes.printSchema()
btc_daily_changes.show(10)

root
 |-- date: date (nullable = true)
 |-- btc_percent_change: string (nullable = true)
 |-- btc_range: double (nullable = true)
 |-- btc_volume: double (nullable = true)

+----------+------------------+------------------+-------------+
|      date|btc_percent_change|         btc_range|   btc_volume|
+----------+------------------+------------------+-------------+
|2022-03-31|              0.19| 348.5400000000009|  38.14053622|
|2022-03-30|             -0.82| 1149.260000000002|1627.54321756|
|2022-03-29|              0.65| 1187.030000000006|1716.32392308|
|2022-03-28|              0.63|           1561.75|2691.93784761|
|2022-03-27|              5.19|2493.0999999999985|1548.88890527|
|2022-03-26|              0.44| 714.3099999999977|  494.7242021|
|2022-03-25|              0.67|1520.2400000000052| 1725.0715699|
|2022-03-24|              2.60|1603.4599999999991|2173.30497451|
|2022-03-23|              1.25|1242.6200000000026|1906.72354645|
|2022-03-22|              3.35| 2436.2800000000

Convert tweets to scores

In [18]:
def compute_score(row):  
  # row[1] is the text of the tweet. Please refer to the regex column above.
  score = SentimentIntensityAnalyzer().polarity_scores(row[1])["compound"]
  if score == 0.0:
    return []
  
  date = datetime.strptime("{}00".format(row[0]), "%Y-%m-%d %H:%M:%S%z") \
              .date()

  return [(date, score)]

tweet_daily_scores = tweets.rdd \
                        .flatMap(compute_score) \
                        .toDF(["date","score"]) \
                        .cache()

In [19]:
tweet_daily_scores.createOrReplaceTempView("tweet_daily_scores")
tweet_daily_scores.printSchema()
tweet_daily_scores.show(10)

root
 |-- date: date (nullable = true)
 |-- score: double (nullable = true)

+----------+-------+
|      date|  score|
+----------+-------+
|2019-05-26| 0.3261|
|2019-05-27|-0.4404|
|2019-05-27| 0.5106|
|2019-05-27|-0.2263|
|2019-05-27| 0.4215|
|2019-05-25| 0.7964|
|2019-05-25|   0.91|
|2019-05-03| 0.8442|
|2019-05-11|-0.5216|
|2019-05-15| 0.7964|
+----------+-------+
only showing top 10 rows



In [20]:
lstm_movements = spark.read.csv("/content/drive/MyDrive/btc_lstm_forecast.csv",sep=',',header=True,inferSchema=True)

In [21]:
lstm_movements.printSchema()
lstm_movements.show(10)

root
 |-- Date: timestamp (nullable = true)
 |-- Forecast: double (nullable = true)

+-------------------+--------+
|               Date|Forecast|
+-------------------+--------+
|2014-11-28 00:00:00|     0.0|
|2014-12-01 00:00:00|     1.0|
|2014-12-02 00:00:00|     1.0|
|2014-12-03 00:00:00|     1.0|
|2014-12-04 00:00:00|     1.0|
|2014-12-05 00:00:00|     1.0|
|2014-12-08 00:00:00|     1.0|
|2014-12-09 00:00:00|     1.0|
|2014-12-10 00:00:00|     1.0|
|2014-12-11 00:00:00|     1.0|
+-------------------+--------+
only showing top 10 rows



In [22]:
def clean_date(row):
  
  date = row[0].date()

  return [(date, row[1])]

lstm_daily = lstm_movements.rdd \
                        .flatMap(clean_date) \
                        .toDF(["date","forecast"]) \
                        .cache()

In [23]:
lstm_daily.show(10)

+----------+--------+
|      date|forecast|
+----------+--------+
|2014-11-28|     0.0|
|2014-12-01|     1.0|
|2014-12-02|     1.0|
|2014-12-03|     1.0|
|2014-12-04|     1.0|
|2014-12-05|     1.0|
|2014-12-08|     1.0|
|2014-12-09|     1.0|
|2014-12-10|     1.0|
|2014-12-11|     1.0|
+----------+--------+
only showing top 10 rows



Combine BTC daily changes with tweet scores

In [24]:
results = btc_daily_changes.join(
      tweet_daily_scores.groupBy("date").agg({"score": "mean"}),
      on="date"
    ).withColumnRenamed("avg(score)","twitter_sentiment") \
    .withColumnRenamed("percent_change","btc_percent_change") \
    .cache()

results = results.join(lstm_daily.groupBy("date").agg({"Forecast": "mean"}), on="date").withColumnRenamed("avg(Forecast)","lstm_forecast")

results.printSchema()
results.show(10)

root
 |-- date: date (nullable = true)
 |-- btc_percent_change: string (nullable = true)
 |-- btc_range: double (nullable = true)
 |-- btc_volume: double (nullable = true)
 |-- twitter_sentiment: double (nullable = true)
 |-- lstm_forecast: double (nullable = true)

+----------+------------------+------------------+--------------+-------------------+-------------+
|      date|btc_percent_change|         btc_range|    btc_volume|  twitter_sentiment|lstm_forecast|
+----------+------------------+------------------+--------------+-------------------+-------------+
|2019-11-22|             -4.29| 929.6999999999998|20120.14646602|0.14146783625731005|         -1.0|
|2019-11-21|             -5.86| 722.8299999999999| 9374.99957046| 0.2574916201117318|         -1.0|
|2019-11-20|             -0.44|203.76000000000113| 2996.00619203|0.37080331125827803|         -1.0|
|2019-11-19|             -0.68| 208.6200000000008| 4412.39464174| 0.2568555555555556|         -1.0|
|2019-11-18|             -3.87| 4

In [25]:
results_count = results.count()
print("Number of results:", results_count)
true_positive_count = results.filter(
      "(btc_percent_change < 0 and twitter_sentiment < 0) or (btc_percent_change >= 0 and twitter_sentiment >= 0)"
    ).count()
print("Number of true positives:", true_positive_count)
print("Number of true negatives:", results_count - true_positive_count)
print("Accuracy: {}%".format("%.2f" % (true_positive_count * 100 / results_count)))

Number of results: 721
Number of true positives: 455
Number of true negatives: 266
Accuracy: 63.11%


Incorporate BTC volume and volatility into sentiment analysis score

In [26]:
avg_btc_range = btc_daily_changes.select(functions.avg("btc_range")).collect()[0][0]
max_btc_range = btc_daily_changes.select(functions.max("btc_range")).collect()[0][0]
avg_btc_volume = btc_daily_changes.select(functions.avg("btc_volume")).collect()[0][0]
max_btc_volume = btc_daily_changes.select(functions.max("btc_volume")).collect()[0][0]

# positive difference in range == high and low is larger than usual == more volatile == more fear == more negative sentiment
# negative difference in range == high and low is smaller than usual == less volatile == more greed == more positive sentiment
# positive difference in volume == more is being traded than usual == more greed == more positive sentiment
# negative difference in volume == less is being traded than usual == more fear == more negative sentiment

def combine_volume_volatility_with_sentiment(row):
  date, btc_percent_change, btc_range, btc_volume, twitter_sentiment, lstm_forecast = row
  return (date,
          btc_percent_change,
          twitter_sentiment,
          avg_btc_range - btc_range,
          btc_volume - avg_btc_volume, lstm_forecast)

intermediate_results = results.rdd \
    .map(combine_volume_volatility_with_sentiment) \
    .toDF(["date","btc_percent_change","twitter_sentiment","range_diff","volume_diff", "lstm_forecast"])

avg_twitter_sentiment = intermediate_results.select(functions.avg("twitter_sentiment")).collect()[0][0]
max_twitter_sentiment = intermediate_results.select(functions.max("twitter_sentiment")).collect()[0][0]
min_twitter_sentiment = intermediate_results.select(functions.min("twitter_sentiment")).collect()[0][0]
avg_range_diff = intermediate_results.select(functions.avg("range_diff")).collect()[0][0]
max_range_diff = intermediate_results.select(functions.max("range_diff")).collect()[0][0]
min_range_diff = intermediate_results.select(functions.min("range_diff")).collect()[0][0]
avg_volume_diff = intermediate_results.select(functions.avg("volume_diff")).collect()[0][0]
max_volume_diff = intermediate_results.select(functions.max("volume_diff")).collect()[0][0]
min_volume_diff = intermediate_results.select(functions.min("volume_diff")).collect()[0][0]

random_probability = 0.5

def compute_agg_score(row):
  date, btc_percent_change, twitter_sentiment, range_diff, volume_diff, lstm_forecast = row
  score = (0.15 * twitter_sentiment) + \
          (0.15 * lstm_forecast) + \
          ((0.35 * (range_diff - min_range_diff)) / (max_range_diff - min_range_diff)) + \
          ((0.35 * (volume_diff - min_volume_diff)) / (max_volume_diff - min_volume_diff))
  # if score < 0.35 and score > -0.35 and random.random() < random_probability:
  #   score *= -1
  return (date,
          btc_percent_change,
          score)

volume_volatility_results = intermediate_results.rdd.map(compute_agg_score).toDF(["date","btc_percent_change","score"])
volume_volatility_results.show(10)

+----------+------------------+-------------------+
|      date|btc_percent_change|              score|
+----------+------------------+-------------------+
|2019-11-22|             -4.29|0.15123706499452289|
|2019-11-21|             -5.86|0.18426601831725828|
|2019-11-20|             -0.44|0.24047869763276628|
|2019-11-19|             -0.68|0.22301997433175647|
|2019-11-18|             -3.87| 0.2106305497973267|
|2019-11-15|             -1.95|0.21558152187899748|
|2019-11-14|             -1.47|0.23632162826568293|
|2019-11-13|             -0.70|0.23438928324559938|
|2019-11-12|              1.07|0.22924683657734332|
|2019-11-11|             -3.43|0.21459712715571538|
+----------+------------------+-------------------+
only showing top 10 rows



In [27]:
volume_volatility_results_count = volume_volatility_results.count()
print("Number of results:", volume_volatility_results_count)
true_positive_count = volume_volatility_results.filter(
      "(btc_percent_change < 0 and score < 0) or (btc_percent_change >= 0 and score >= 0)"
    ).count()
print("Number of true positives:", true_positive_count)
print("Number of true negatives:", volume_volatility_results_count - true_positive_count)
print("Accuracy: {}%".format("%.2f" % (true_positive_count * 100 / volume_volatility_results_count)))

Number of results: 721
Number of true positives: 484
Number of true negatives: 237
Accuracy: 67.13%
