# MAS 649 Midterm
# Group 5
### Austin Gravely, Elias Eskind, Arlet Rodriguez, Cesar Diez, Juliano Torii
### 4/6/21

# Step 1: Loading & Splitting Data

## Loading in Data

In [None]:
# Read each coin's price history from DBFS. Every CSV has a header row and
# Spark infers the column types from the data.
btc, eth, theta = [
    spark.read.csv(csv_path, inferSchema=True, header=True)
    for csv_path in (
        '/FileStore/tables/BTC_USD.csv',
        '/FileStore/tables/ETH_USD.csv',
        '/FileStore/tables/THETA_USD.csv',
    )
]

## Viewing Data

In [None]:
# Render each raw DataFrame as a notebook table.
for raw_frame in (btc, eth, theta):
    display(raw_frame)

In [None]:
# Print the schema Spark inferred for each raw DataFrame.
for raw_frame in (btc, eth, theta):
    raw_frame.printSchema()

## Dropping N/As

In [None]:
# Drop every row that has a null in any column.
btc2, eth2, theta2 = (frame.na.drop() for frame in (btc, eth, theta))

## Changing 'Date' Column from String to Date Datatype

In [None]:
from pyspark.sql.functions import to_date


def _parse_date_column(frame):
    """Return *frame* with 'Date' parsed from M/d/yy text into a date; print the new schema."""
    converted = frame.withColumn("Date", to_date(frame['Date'], "M/d/yy"))
    converted.printSchema()
    return converted


# Bitcoin, then Ethereum, then Theta.
btc2 = _parse_date_column(btc2)
eth2 = _parse_date_column(eth2)
theta2 = _parse_date_column(theta2)

## Splitting data into Train & Test

### Bitcoin

In [None]:
# Train: the 316 earliest rows in Date order. `limit` alone does not promise which
# rows come back, so sort first to make the chronological split explicit.
# NOTE(review): assumes btc2 has at least 316 + 30 rows so the train and test
# windows below do not overlap; confirm against the data.
train_databtc2 = btc2.orderBy("Date").limit(316)
display(train_databtc2)

# Test: the 30 most recent rows, put back into chronological order.
test_databtc2 = btc2.orderBy(btc2["Date"].desc()).limit(30).orderBy("Date")
display(test_databtc2)

### Ethereum

In [None]:
# Train: the 316 earliest rows in Date order. `limit` alone does not promise which
# rows come back, so sort first to make the chronological split explicit.
# NOTE(review): assumes eth2 has at least 316 + 30 rows so the train and test
# windows below do not overlap; confirm against the data.
train_dataeth2 = eth2.orderBy("Date").limit(316)
display(train_dataeth2)

# Test: the 30 most recent rows, put back into chronological order.
test_dataeth2 = eth2.orderBy(eth2["Date"].desc()).limit(30).orderBy("Date")
display(test_dataeth2)

### Theta

In [None]:
# Train: the 316 earliest rows in Date order. `limit` alone does not promise which
# rows come back, so sort first to make the chronological split explicit.
# NOTE(review): assumes theta2 has at least 316 + 30 rows so the train and test
# windows below do not overlap; confirm against the data.
train_datatheta2 = theta2.orderBy("Date").limit(316)
display(train_datatheta2)

# Test: the 30 most recent rows, put back into chronological order.
test_datatheta2 = theta2.orderBy(theta2["Date"].desc()).limit(30).orderBy("Date")
display(test_datatheta2)

## Transforming Data

In [None]:
from pyspark.ml.feature import VectorAssembler

# The ten model inputs, packed into one vector column named "features"
# for the Spark ML regressors.
feature_columns = [
    'Open', 'High', 'Low', 'Close', 'Volume',
    'OCDiff',
    'Lag1', 'Lag2', 'Lag3', 'Lag4',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [None]:
def _assemble_and_show(frame):
    """Add the assembled "features" column to *frame*, display it, and return it."""
    assembled = assembler.transform(frame)
    display(assembled)
    return assembled


# Bitcoin
outputTrainbtc2 = _assemble_and_show(train_databtc2)
outputTestbtc2 = _assemble_and_show(test_databtc2)

# Ethereum
outputTraineth2 = _assemble_and_show(train_dataeth2)
outputTesteth2 = _assemble_and_show(test_dataeth2)

# Theta
outputTraintheta2 = _assemble_and_show(train_datatheta2)
outputTesttheta2 = _assemble_and_show(test_datatheta2)

## Final Datasets

### Bitcoin

In [None]:
# Keep only what the models need: the feature vector, the target, and the date.
final_columns = ("features", "ReturnY", "Date")

trainBtc = outputTrainbtc2.select(*final_columns)
display(trainBtc)

testBtc = outputTestbtc2.select(*final_columns)
display(testBtc)

# Summary statistics for the train and test sets.
for split_frame in (trainBtc, testBtc):
    display(split_frame.describe())

### Ethereum

In [None]:
# Keep only what the models need: the feature vector, the target, and the date.
final_columns = ("features", "ReturnY", "Date")

trainEth = outputTraineth2.select(*final_columns)
display(trainEth)

testEth = outputTesteth2.select(*final_columns)
display(testEth)

# Summary statistics for the train and test sets.
for split_frame in (trainEth, testEth):
    display(split_frame.describe())

### Theta

In [None]:
# Keep only what the models need: the feature vector, the target, and the date.
final_columns = ("features", "ReturnY", "Date")

trainTheta = outputTraintheta2.select(*final_columns)
display(trainTheta)

testTheta = outputTesttheta2.select(*final_columns)
display(testTheta)

# Summary statistics for the train and test sets.
for split_frame in (trainTheta, testTheta):
    display(split_frame.describe())

# Step 2: ML Algorithm

## Importing Required Packages

In [None]:
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor, DecisionTreeRegressor, LinearRegression

In [None]:
# All three tree regressors predict ReturnY from the assembled feature vector.
column_args = {"labelCol": "ReturnY", "featuresCol": "features"}
dt = DecisionTreeRegressor(**column_args)
rf = RandomForestRegressor(**column_args)
gb = GBTRegressor(**column_args)

## Training Models

In [None]:
# Fit decision tree, random forest and GBT (in that order) on each coin's training set.
estimators = (dt, rf, gb)
dt_modelBtc, rf_modelBtc, gb_modelBtc = [est.fit(trainBtc) for est in estimators]
dt_modelEth, rf_modelEth, gb_modelEth = [est.fit(trainEth) for est in estimators]
dt_modelTheta, rf_modelTheta, gb_modelTheta = [est.fit(trainTheta) for est in estimators]

## Testing Models

In [None]:
# Add a "prediction" column to each coin's test set with every fitted model.
dt_predictionsBtc, rf_predictionsBtc, gb_predictionsBtc = [
    fitted.transform(testBtc) for fitted in (dt_modelBtc, rf_modelBtc, gb_modelBtc)
]
dt_predictionsEth, rf_predictionsEth, gb_predictionsEth = [
    fitted.transform(testEth) for fitted in (dt_modelEth, rf_modelEth, gb_modelEth)
]
dt_predictionsTheta, rf_predictionsTheta, gb_predictionsTheta = [
    fitted.transform(testTheta) for fitted in (dt_modelTheta, rf_modelTheta, gb_modelTheta)
]

## Evaluator

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the true ReturnY and each model's "prediction" column.
evaluator = RegressionEvaluator(labelCol="ReturnY", predictionCol="prediction", metricName="rmse")

# One heading per coin, followed by the decision-tree, random-forest and GBT RMSE.
rmse_report = [
    ('Bitcoin:', [dt_predictionsBtc, rf_predictionsBtc, gb_predictionsBtc]),
    ('Ethereum:', [dt_predictionsEth, rf_predictionsEth, gb_predictionsEth]),
    ('Theta:', [dt_predictionsTheta, rf_predictionsTheta, gb_predictionsTheta]),
]
for coin_label, prediction_frames in rmse_report:
    print(coin_label)
    for scored in prediction_frames:
        print(evaluator.evaluate(scored))

## Hyperparameter RF

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 3 depths x 2 bin counts = 6 random-forest candidates, each scored by the RMSE
# evaluator under 10-fold cross-validation.
# NOTE(review): CrossValidator assigns rows to folds randomly, so validation folds can
# contain days earlier than training days; a time-ordered validation may be more honest.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [2, 4, 6])
    .addGrid(rf.maxBins, [20, 60])
    .build()
)
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=10,
)

In [None]:
# Run the random-forest grid search per coin. This cell is slow (about 5 minutes);
# if it seems to hang, installing mlflow may help.
cv_modelTheta = crossval.fit(trainTheta)
cv_modelEth = crossval.fit(trainEth)
cv_modelBtc = crossval.fit(trainBtc)


def _report_rmse(label, model, test_frame):
    """Print *label*, then *model*'s RMSE on *test_frame*; return the predictions."""
    print(label)
    predictions = model.transform(test_frame)
    print(evaluator.evaluate(predictions))
    return predictions


# Test-set RMSE of each tuned model.
cv_predictionsBtc = _report_rmse('Bitcoin:', cv_modelBtc, testBtc)
cv_predictionsEth = _report_rmse('Ethereum:', cv_modelEth, testEth)
cv_predictionsTheta = _report_rmse('Theta:', cv_modelTheta, testTheta)

## Elastic Net Hyperparameter Tuning

In [None]:
# Elastic-net linear regression, tuned by 10-fold cross-validation for each coin.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="ReturnY", featuresCol="features")
# Untuned fit on Theta, kept as `lrModel` (no later cell reads it).
lrModel = lr.fit(trainTheta)

# 2 regParam values x 3 elasticNetParam values (0 = pure L2, 1 = pure L1).
# NOTE: this rebinds paramGrid, crossval and cv_model{Theta,Eth,Btc}, replacing the
# random-forest versions; later cells use cv_modelBtc expecting this elastic-net model.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0, 0.5, 1])
    .build()
)
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
cv_modelTheta = crossval.fit(trainTheta)
cv_modelEth = crossval.fit(trainEth)
cv_modelBtc = crossval.fit(trainBtc)


def _score(label, model, test_frame):
    """Print *label*, then *model*'s RMSE on *test_frame*; return the predictions."""
    print(label)
    predictions = model.transform(test_frame)
    print(evaluator.evaluate(predictions))
    return predictions


# Test-set RMSE of each tuned elastic-net model.
cv2_predictionsBtc = _score('Bitcoin:', cv_modelBtc, testBtc)
cv2_predictionsEth = _score('Ethereum:', cv_modelEth, testEth)
cv2_predictionsTheta = _score('Theta:', cv_modelTheta, testTheta)

In [None]:
# NOTE(review): disabled snippet for inspecting the best elastic-net parameters.
# `cv_model` is not defined anywhere in this notebook; the fitted CrossValidatorModels
# are cv_modelBtc / cv_modelEth / cv_modelTheta. The evaluator uses RMSE, where lower
# is better, so the best entry of avgMetrics would be np.argmin, not np.argmax.
# Confirm both points before re-enabling.
# import numpy as np
# print(cv_model.getEstimatorParamMaps()[np.argmax(cv_model.avgMetrics)])
# print(cv_model.bestModel._java_obj.getRegParam())
# print(cv_model.bestModel._java_obj.getElasticNetParam())

## Basic Linear Regression (for Ethereum, since this model had the best RMSE)

In [None]:
from pyspark.ml.regression import LinearRegression

# Hand-set elastic-net regression for Ethereum (L-BFGS, regParam 0.2, elasticNetParam 0.6).
ethlr = LinearRegression(
    labelCol="ReturnY",
    featuresCol="features",
    solver='l-bfgs',
    regParam=0.2,
    elasticNetParam=0.6,
)
lrModelEth = ethlr.fit(trainEth)

# Fitted weights.
print("Coefficients: %s" % str(lrModelEth.coefficients))
print("Intercept: %s" % str(lrModelEth.intercept))

# In-sample diagnostics taken from the training summary.
trainingSummary = lrModelEth.summary
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [None]:
# Score the Ethereum linear model on its 30-day test window; the trailing
# expression makes the notebook echo the RMSE.
testResultsEth = lrModelEth.evaluate(testEth)
rmseEth = testResultsEth.rootMeanSquaredError
rmseEth

# Step 3: Testing Best Models on Dataset

## Bitcoin

In [None]:
# Bitcoin: lowest RMSE came from the elastic-net hyperparameter tuning (cv_modelBtc).
cv2_predictionsBtc = cv_modelBtc.transform(testBtc)
rmseBtc = evaluator.evaluate(cv2_predictionsBtc)
rmseBtc

## Ethereum

In [None]:
# Ethereum: lowest RMSE came from the hand-set linear regression (lrModelEth).
testResultsEth = lrModelEth.evaluate(testEth)
rmseEth = testResultsEth.rootMeanSquaredError
rmseEth

## Theta

In [None]:
# Theta: lowest RMSE came from the default random forest (rf_modelTheta).
rf_predictionsTheta = rf_modelTheta.transform(testTheta)
rmseTheta = evaluator.evaluate(rf_predictionsTheta)
rmseTheta

# Step 4: Average the RMSE

In [None]:
import numpy as np

# Average the best test RMSE of the three coins.
rmseList = [rmseBtc, rmseEth, rmseTheta]
rmseMean = np.asarray(rmseList).mean()
print(rmseMean)

## The Average RMSE for BTC, ETH, & THETA is: *0.04327*

# Step 5: Visualizations

## Ethereum Graphs

In [None]:
# For each coin, count the rows dated after 2020-11-09.
cutoff = "2020-11-09"
for dated_frame in (btc2, eth2, theta2):
    print(dated_frame.filter(dated_frame["Date"] > cutoff).count())

In [None]:
display(eth2.select(["ReturnY", "Date"]))  # Ethereum ReturnY against Date

In [None]:
display(eth2.select(["Volume", "Date"]))  # Ethereum Volume against Date

In [None]:
display(eth2.select(["Close", "Low", "High", "Date"]))  # Ethereum Close/Low/High against Date

## Candlestick Graphs (Extra)

In [None]:
import plotly.graph_objects as go

### Bitcoin

In [None]:
# Convert the raw Spark DataFrame to pandas so Plotly can draw it.
# NOTE(review): `btc` still holds the unparsed 'Date' text (the parsed copy is btc2), so
# Plotly may lay the x-axis out as categories rather than dates; confirm.
df_btc = btc.toPandas()

# Plotly Candlestick keyword -> pandas column that supplies it.
price_columns = {'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'}
candles = go.Candlestick(
    x=df_btc['Date'],
    increasing_line_color='magenta',
    decreasing_line_color='cyan',
    **{keyword: df_btc[column] for keyword, column in price_columns.items()},
)
fig = go.Figure(data=[candles])
fig.update_layout(title='Bitcoin', yaxis_title='Price')
fig.show()

### Ethereum

In [None]:
# Convert the raw Spark DataFrame to pandas so Plotly can draw it.
# NOTE(review): `eth` still holds the unparsed 'Date' text (the parsed copy is eth2), so
# Plotly may lay the x-axis out as categories rather than dates; confirm.
df_eth = eth.toPandas()

fig = go.Figure(data=[go.Candlestick(x=df_eth['Date'],
              open=df_eth['Open'],
              high=df_eth['High'],
              low=df_eth['Low'],
              close=df_eth['Close'],
              increasing_line_color='magenta', decreasing_line_color='cyan')])

# Chart title (correctly spelled) and y-axis label.
fig.update_layout(title='Ethereum', yaxis_title='Price')

fig.show()

### Theta

In [None]:
# Convert the raw Spark DataFrame to pandas so Plotly can draw it.
# NOTE(review): `theta` still holds the unparsed 'Date' text (the parsed copy is theta2),
# so Plotly may lay the x-axis out as categories rather than dates; confirm.
df_theta = theta.toPandas()

# Plotly Candlestick keyword -> pandas column that supplies it.
price_columns = {'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'}
candles = go.Candlestick(
    x=df_theta['Date'],
    increasing_line_color='magenta',
    decreasing_line_color='cyan',
    **{keyword: df_theta[column] for keyword, column in price_columns.items()},
)
fig = go.Figure(data=[candles])
fig.update_layout(title='Theta', yaxis_title='Price')
fig.show()

# Step 6: Options
### We chose options 1, 3, & 6 (poster is in the file)

## Option 1
##### Download (from finance.yahoo.com) and test your data on the next month of data for all three currencies.

In [None]:
# Read the following month of prices for each coin (header row, inferred types).
btcNM, ethNM, thetaNM = [
    spark.read.csv(csv_path, inferSchema=True, header=True)
    for csv_path in (
        '/FileStore/tables/BTC_USDNM.csv',
        '/FileStore/tables/ETH_USDNM.csv',
        '/FileStore/tables/THETA_USDNM.csv',
    )
]

### Repeating the Step 1 preprocessing (dropping N/As, assembling features) on the new data

In [None]:
# Inspect the raw next-month data and the schema Spark inferred for it.
display(btcNM)
btcNM.printSchema()
display(ethNM)
ethNM.printSchema()
display(thetaNM)
thetaNM.printSchema()

# Dropping NAs
btcNM = btcNM.na.drop()
ethNM = ethNM.na.drop()
thetaNM = thetaNM.na.drop()

# Assembler: the same ten feature columns used for the original training data.
# NOTE(review): unlike Step 1, 'Date' is not parsed with to_date here; it is only
# carried through the select below and is not a model input.
assembler = VectorAssembler(inputCols = ['Open',
 'High',
 'Low',
 'Close',
 'Volume',
 'OCDiff',
 'Lag1',
 'Lag2',
 'Lag3',
 'Lag4'], outputCol = "features")

testbtcNM = assembler.transform(btcNM)
testethNM = assembler.transform(ethNM)
testthetaNM = assembler.transform(thetaNM)

# Final Test Data Bitcoin
testbtcNM2 = testbtcNM.select("features", "ReturnY", "Date")
display(testbtcNM2)

# Final Test Data Ethereum
testethNM2 = testethNM.select("features", "ReturnY", "Date")
display(testethNM2)

# Final Test Data Theta: show the selected frame, matching Bitcoin and Ethereum above.
testthetaNM2 = testthetaNM.select("features", "ReturnY", "Date")
display(testthetaNM2)

### Results

#### Bitcoin

In [None]:
# Next-month Bitcoin RMSE from the cross-validated Bitcoin model; echoed as the cell result.
test_resultsbtcNM = cv_modelBtc.transform(testbtcNM2)
rmse_btcNM = evaluator.evaluate(test_resultsbtcNM)
rmse_btcNM

#### Ethereum

In [None]:
# Next-month Ethereum RMSE from the hand-set linear model.
test_resultsethNM = lrModelEth.evaluate(testethNM2)
rmse_ethNM = test_resultsethNM.rootMeanSquaredError
print(rmse_ethNM)

# Per-row residuals on the next-month data.
display(test_resultsethNM.residuals)

#### Theta

In [None]:
# Next-month Theta RMSE from the default random forest; echoed as the cell result.
test_resultsthetaNM = rf_modelTheta.transform(testthetaNM2)
rmse_thetaNM = evaluator.evaluate(test_resultsthetaNM)
rmse_thetaNM

## Option 3

##### Find a single algorithm that works well on all of the 3 crypto-currencies and see how it works on other crypto-currencies (download another crypto currency and predict it for 30 days). You may choose your timeframes for both train and test.

### Loading in and Transforming Dogecoin Dataset

In [None]:
# Load the Dogecoin price history (header row, inferred types) and print its schema.
# Call display(doge) here to inspect the raw rows.
doge = spark.read.csv(
    '/FileStore/tables/DOGE_USD.csv',
    inferSchema=True,
    header=True,
)
doge.printSchema()

In [None]:
# Dropping na
doge2 = doge.na.drop()

# Parse the 'Date' text (M/d/yy) into a date column.
from pyspark.sql.functions import to_date
doge2 = doge2.withColumn("Date", to_date(doge2['Date'], "M/d/yy"))
doge2.printSchema()

# Split into training and testing datasets. Train is the 315 earliest rows in Date
# order (`limit` alone does not promise which rows come back). Test is the 30 most
# recent rows, re-sorted chronologically.
# NOTE(review): the other coins use 316 training rows; confirm 315 is intended and
# that doge2 has at least 315 + 30 rows so the windows do not overlap.
train_datadoge2 = doge2.orderBy("Date").limit(315)
test_datadoge2 = doge2.orderBy(doge2["Date"].desc()).limit(30).orderBy("Date")

In [None]:
from pyspark.ml.feature import VectorAssembler

# The ten Dogecoin model inputs, packed into one "features" vector column.
# NOTE(review): 'OCDIff' is cased differently from 'OCDiff' used for the other coins.
# VectorAssembler looks columns up by exact name, so this only works if DOGE_USD.csv
# spells the header this way; confirm.
doge_feature_columns = [
    'Open', 'High', 'Low', 'Close', 'Volume',
    'OCDIff',
    'Lag1', 'Lag2', 'Lag3', 'Lag4',
]
assembler = VectorAssembler(inputCols=doge_feature_columns, outputCol="features")

outputTraindoge2 = assembler.transform(train_datadoge2)
outputTestdoge2 = assembler.transform(test_datadoge2)

# Final Dogecoin train/test sets: feature vector, target, and date.
trainDoge, testDoge = [
    assembled.select("features", "ReturnY", "Date")
    for assembled in (outputTraindoge2, outputTestdoge2)
]

# Example
display(trainDoge)

In [None]:
from pyspark.ml.regression import LinearRegression

# Regularized linear regression for Dogecoin (L-BFGS, regParam 0.7;
# elasticNetParam is left at Spark's default).
dogeLR = LinearRegression(
    labelCol="ReturnY",
    featuresCol="features",
    solver='l-bfgs',
    regParam=0.7,
)
lrModelDoge = dogeLR.fit(trainDoge)

# Fitted weights.
print("Coefficients: %s" % str(lrModelDoge.coefficients))
print("Intercept: %s" % str(lrModelDoge.intercept))

# In-sample diagnostics taken from the training summary.
trainingSummary = lrModelDoge.summary
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [None]:
# Score the Dogecoin model on its 30-day test window; the trailing expression
# makes the notebook echo the RMSE.
testResultsDoge = lrModelDoge.evaluate(testDoge)
rmseDoge = testResultsDoge.rootMeanSquaredError
rmseDoge

### RMSE = .05880
##### Our best guess is that this model's RMSE is higher than the other cryptocurrencies' because of Dogecoin's extreme volatility.

### Doge Candlestick Graph

In [None]:
# Convert the raw Spark DataFrame to pandas so Plotly can draw it.
# NOTE(review): `doge` still holds the unparsed 'Date' text (the parsed copy is doge2),
# so Plotly may lay the x-axis out as categories rather than dates; confirm.
df_doge = doge.toPandas()

# Plotly Candlestick keyword -> pandas column that supplies it.
price_columns = {'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'}
candles = go.Candlestick(
    x=df_doge['Date'],
    increasing_line_color='magenta',
    decreasing_line_color='cyan',
    **{keyword: df_doge[column] for keyword, column in price_columns.items()},
)
fig = go.Figure(data=[candles])
fig.update_layout(title='Dogecoin to the Moon', yaxis_title='Price')
fig.show()

### Predictions for the 30-Day Test Window

In [None]:
# Predict ReturnY over the Dogecoin test window and show up to 30 rows.
lr_predictions = lrModelDoge.transform(testDoge)
lr_predictions.select("prediction", "ReturnY", "features").show(30)

from pyspark.ml.evaluation import RegressionEvaluator

# Coefficient of determination (R^2) of those predictions.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="ReturnY",
    metricName="r2",
)
print("R^2 = %g" % lr_evaluator.evaluate(lr_predictions))