In [1]:
import urllib.request
import json
import os
from time import gmtime, strftime
from IPython.display import clear_output
import time
from datetime import timedelta
import sklearn
import numpy as np
import pandas as pd
import xgboost as xgb
import datetime
from time import sleep
import matplotlib.pyplot as plt
# Point the Spark tooling at the Java runtime and Spark distribution to use.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64/",
    "SPARK_HOME": "/root/spark-2.3.3-bin-hadoop2.7/",
})

# Currency being tracked: full name, ticker symbol, and the quote currency.
CURRENCY = "bitcoin"
CURRENCY_SYMBOL = "BTC"
CURRENCY_CONVERT = "USD"

# Endpoint queried for the current price.
BASE_URL = 'https://min-api.cryptocompare.com/data/price'

# Input folders, keyed by ticker symbol.
CRYPTO_FOLDER = f"data/crypto/{CURRENCY_SYMBOL}"
TWITTER_FOLDER = f"data/twitter/{CURRENCY_SYMBOL}"

%matplotlib inline

In [2]:
def preprocess_data(dataframe):
    """Split a training frame into a feature matrix and a label vector.

    The 'time' column is discarded, rows with any missing value are removed,
    and the 'label' column is separated from the remaining (feature) columns.
    The frame passed in is not modified; all work happens on a derived copy.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must contain the columns 'time' and 'label'. Every other column is
        treated as a feature, in its existing column order.

    Returns
    -------
    X : numpy.ndarray
        Feature values, one row per retained sample.
    y : numpy.ndarray
        Label values, aligned with the rows of ``X``.

    Raises
    ------
    KeyError
        If the 'time' or 'label' column is missing.
    """
    # drop() returns a new frame, so the caller's DataFrame keeps its 'time'
    # column (a `del dataframe['time']` would remove it in place).
    cleaned = dataframe.drop(columns='time').dropna()

    # Select the label by name so the result does not depend on column order.
    y = cleaned['label'].values
    X = cleaned.drop(columns='label').values

    return X, y

def bootstrap_model(dataframe):
    """Train the initial XGBoost regressor and save it to 'btc_model.model'.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Training data in the layout expected by ``preprocess_data`` (a 'time'
        column, a 'label' column, and the feature columns).

    Returns
    -------
    None
        The fitted model is written to 'btc_model.model' in the current
        working directory, replacing any existing file of that name.
    """
    X_train, y_train = preprocess_data(dataframe)

    # `verbosity` (0 = silent) is the XGBRegressor constructor argument that
    # controls logging. `verbose` is not a constructor argument; it would be
    # forwarded to the booster as an unknown parameter.
    my_params = {
        'objective': 'reg:squarederror',
        'verbosity': 0,
        'n_estimators': 500,
        'max_depth': 4,
    }

    model = xgb.XGBRegressor(**my_params)
    model.fit(X_train, y_train)
    model.save_model('btc_model.model')


def predict_price(avg_score, prev_close, rolling_avg_val):
    """Predict the next price from one feature triple using the saved model.

    Parameters
    ----------
    avg_score : float
        Score feature for the sample.
    prev_close : float
        Previous closing price.
    rolling_avg_val : float
        Rolling average of previous closing prices.

    Returns
    -------
    float
        The model's prediction for the single sample.
    """
    # predict() expects a 2-D (n_samples, n_features) input, so build one row
    # of three features -- the same layout retrain_model() trains on --
    # instead of passing a flat 3-element list.
    x_test = np.array([avg_score, prev_close, rolling_avg_val], dtype=float).reshape(1, 3)

    model = xgb.XGBRegressor()
    model.load_model('btc_model.model')
    pred_price = model.predict(x_test)
    return pred_price.tolist()[0]


def retrain_model(avg_score, prev_close, rolling_avg_val, y_train_sample):
    """Fit on one new sample, starting from the saved model, and re-save it.

    Parameters
    ----------
    avg_score, prev_close, rolling_avg_val : float
        The three feature values of the new sample.
    y_train_sample : float
        The observed target value for that sample.

    Returns
    -------
    None
        'btc_model.model' is overwritten with the updated model.
    """
    # One row of three features, and a matching (1, 1) target.
    features = np.array([avg_score, prev_close, rolling_avg_val]).reshape(1, 3)
    target = np.array(y_train_sample).reshape(1, 1)

    # NOTE(review): a default-configured regressor is used here, not the
    # hyper-parameters from bootstrap_model -- confirm this is intended.
    booster = xgb.XGBRegressor()
    booster.fit(features, target, xgb_model='btc_model.model')
    booster.save_model('btc_model.model')

In [3]:
import findspark
# Keep this call ahead of the pyspark imports below.
# NOTE(review): presumably it uses SPARK_HOME (set in the first cell) to make
# the pyspark package importable -- confirm against the findspark docs.
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext, HiveContext
from pyspark.streaming import StreamingContext
import pyspark

In [4]:
# Despite its name, `sc` is a SparkSession (not a SparkContext). The rest of
# the notebook uses it to read files, create DataFrames and run SQL.
sc = (
    SparkSession.builder
    .appName("model")
    .master('local')
    .enableHiveSupport()
    .getOrCreate()
)

from pyspark.sql import SQLContext

# SQLContext's first argument is a SparkContext, so hand it the session's
# underlying context and bind it explicitly to this session, rather than
# passing the SparkSession object in the SparkContext slot.
sqlContext = SQLContext(sc.sparkContext, sparkSession=sc)

In [5]:
# Load the grouped price CSV; the first row is a header and column types are inferred.
crypto_data = (
    sc.read
    .options(header=True, inferSchema=True)
    .csv(f"{CRYPTO_FOLDER}/{CURRENCY}_currency_grouped.csv")
)

In [6]:
# Load the grouped tweet CSV (header row, inferred types) and rename its
# 'CreatedAt' column to 'time' so it lines up with the price data.
twitter_data = (
    sc.read
    .options(header=True, inferSchema=True)
    .csv(f"{TWITTER_FOLDER}/{CURRENCY}_tweets_grouped.csv")
    .withColumnRenamed('CreatedAt', 'time')
)

In [7]:
# Join condition: rows match when both frames carry the same 'time' value.
cond = [crypto_data["time"] == twitter_data["time"]]

In [8]:
# Left join keeps every tweet row; 'close' is null where no price row matches.
data_train = (
    twitter_data
    .join(crypto_data, on=cond, how='left')
    .select(twitter_data["time"], twitter_data["score"], crypto_data["close"])
)

In [9]:
from pyspark.sql.functions import lag,col,avg
from pyspark.sql.window import Window

# One unpartitioned window ordered by time: lag() then returns the 'close'
# of the row immediately before the current one (null for the first row).
w = Window.orderBy(col("time"))
data_train = data_train.select(
    "*",
    lag(col("close"), 1).over(w).alias("previous_close"),
)

In [10]:
def hour(i):
    """Return the number of seconds in `i` hours."""
    return i * (60 * 60)


# Range window over 'time' cast to long: each row averages 'previous_close'
# over the rows whose time value lies within the preceding hour, inclusive of
# the current row.
w = Window.orderBy(col("time").cast('long')).rangeBetween(-hour(1), 0)
data_train = data_train.withColumn(
    'rolling_average',
    avg(col('previous_close')).over(w),
)

In [11]:
# Drop rows missing either price column, then expose 'close' as the 'label'
# column and fix the column order used by the rest of the notebook.
data_train = (
    data_train
    .na.drop(subset=["previous_close", "close"])
    .select(
        "time",
        "score",
        col("close").alias("label"),
        "previous_close",
        "rolling_average",
    )
)

In [12]:
# Convert the Spark training frame to a pandas DataFrame, the input type
# bootstrap_model() works with.
data_train_pd = data_train.toPandas()

In [13]:
# Train the initial model on the historical data; bootstrap_model() saves it
# to 'btc_model.model', which predict_price() and retrain_model() load later.
bootstrap_model(data_train_pd)

In [14]:
columns = ['time', 'score', 'label', 'previous_close', 'rolling_average']

time_stamp = time.strftime("%Y-%m-%d %H:%M:00", time.gmtime())
data_train.createOrReplaceTempView("table_df")

# Most recent training row: its label is the "previous close" for the next
# prediction, and its time anchors the one-hour rolling window.
query_latest_rec = """SELECT time, label FROM table_df ORDER BY time DESC limit 1"""
latest_rec = sqlContext.sql(query_latest_rec)
# Collect once and read both fields from the same row, instead of running the
# query a second time for the second field.
latest_row = latest_rec.collect()[0]
prev_close = latest_row["label"]
latest_time = latest_row["time"]
delta = timedelta(hours=1)
window_time = latest_time - delta

# previous_close values in the window (window_time, latest_time].
query_rolling_avg = "SELECT previous_close FROM table_df WHERE TIME > \'%s\' AND TIME <= \'%s\' ORDER BY time DESC" % (window_time, latest_time)
avg_rec = sqlContext.sql(query_rolling_avg)

# Append the latest close as one extra observation before averaging.
new_close_row = sc.createDataFrame([(prev_close,)], ['previous_close'])
avg_rec = avg_rec.union(new_close_row)

rolling_avg_val = avg_rec.agg(avg(col('previous_close'))).collect()[0]['avg(previous_close)']


## Start the Twitter stream now (it must be writing scores to `streamdata.txt` before the next cell runs)

In [None]:
# Loop-invariant setup, hoisted out of the per-minute loop.
columns = ['time', 'score', 'label', 'previous_close', 'rolling_average']
query_latest_rec = """SELECT time, label FROM table_df ORDER BY time DESC limit 1"""
delta = timedelta(hours=1)
plt.style.use('ggplot')
plt.ion()

# Once a minute: read the scores accumulated in streamdata.txt, predict the
# next price, fetch the actual price, retrain on it, record both, and plot.
# Runs until the kernel is interrupted.
while True:
    sleep(60)

    # Read everything written during the last minute and empty the file for
    # the next round. The `with` block closes the handle on every path,
    # including the "no data" skip below.
    with open("streamdata.txt", "r+") as stream:
        past_minute_data = stream.read().split(",")
        stream.seek(0)
        stream.truncate()

    # The last split item is dropped -- presumably the writer terminates every
    # score with a comma; verify against the stream producer.
    past_minute_data = list(map(float, past_minute_data[:-1]))
    if not past_minute_data:
        print("No data for last minute. Skipping..")
        continue

    last_minute_score = sum(past_minute_data)

    time_stamp = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
    data_train.createOrReplaceTempView("table_df")

    # Collect the latest row once and read both fields from it.
    latest_row = sqlContext.sql(query_latest_rec).collect()[0]
    prev_close = latest_row["label"]
    latest_time = latest_row["time"]
    window_time = latest_time - delta

    # Average of previous_close over (window_time, latest_time], with the
    # latest close appended as one extra observation.
    query_rolling_avg = "SELECT previous_close FROM table_df WHERE TIME > \'%s\' AND TIME <= \'%s\' ORDER BY time DESC" % (window_time, latest_time)
    avg_rec = sqlContext.sql(query_rolling_avg)
    new_close_row = sc.createDataFrame([(prev_close,)], ['previous_close'])
    avg_rec = avg_rec.union(new_close_row)
    rolling_avg_val = avg_rec.agg(avg(col('previous_close'))).collect()[0]['avg(previous_close)']

    predicted_price = predict_price(last_minute_score, prev_close, rolling_avg_val)
    clear_output()
    print("Oracle predicts that the next minute " + CURRENCY +" value will be $"+repr(predicted_price))

    # Fetch the actual current price; the `with` block closes the HTTP response.
    price_url = "%s?fsym=%s&tsyms=%s"%(BASE_URL,CURRENCY_SYMBOL,CURRENCY_CONVERT)
    with urllib.request.urlopen(price_url) as response:
        contents = response.read()
    json_string = contents.decode("utf-8")
    current_price = json.loads(json_string)[CURRENCY_CONVERT]

    # Fold the observed price back into the model and the training frame.
    retrain_model(last_minute_score, prev_close, rolling_avg_val,current_price)
    newRow = sc.createDataFrame([(time_stamp, last_minute_score, current_price, prev_close, rolling_avg_val)], columns)
    data_train = data_train.union(newRow)

    # Insert new row into Hive
    sc.sql('INSERT INTO TABLE training_data values (\'%s\', %s, %s, %s, %s)' % (time_stamp, last_minute_score, current_price, prev_close, rolling_avg_val))

    # Record (predicted, actual) in the text file and in Hive.
    with open("oracle_db.txt","a+") as oracle_db:
        oracle_db.write(repr(predicted_price)+","+repr(current_price)+"\n")
    sc.sql("INSERT INTO TABLE oracle_db values (%s,%s)" % (predicted_price, current_price))

    # genfromtxt returns a 1-D array when the file holds a single line;
    # atleast_2d keeps the [:, 0] / [:, 1] column indexing valid in that case.
    plot_data = np.atleast_2d(np.genfromtxt("oracle_db.txt",delimiter= ","))

    fig = plt.figure(figsize=(13,6))
    ax = fig.add_subplot(111)
    ax.plot(plot_data[:,0],'-o', color="red", alpha=0.8,label ='Predicted Price')
    ax.plot(plot_data[:,1],'-o',color="blue", alpha=0.8,label ='Actual Price')
    ax.legend()
    ax.set_ylabel('USD ($)')
    ax.set_xlabel('Time elapsed (mins)')

    plt.show()
    # A new figure is created every minute; close it explicitly so figures
    # cannot pile up in memory over a long run.
    plt.close(fig)
    

In [14]:
# Write the pandas training frame to CSV without an index column or header row.
# data_train_pd was built before the streaming loop and is not updated by it,
# so this file contains only the bootstrapped data.
data_train_pd.to_csv('final30daydata.csv',index = False, header=False)