## Requirements

In [226]:
import sys
from pyspark import SparkConf, SparkContext, SQLContext
from math import sqrt
import numpy as np
from pyspark.sql.functions import rand, randn
from pyspark.mllib.stat import Statistics

In [245]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import when
from pyspark.sql.functions import lit
from pyspark.sql.functions import concat

In [3]:
sc = SparkContext.getOrCreate()
sqlcont = SQLContext(sc)

## Getting Data

In [72]:
# Using the same schema for all three pairs.

schema = StructType() \
      .add("Open Time",DoubleType(),True) \
      .add("Open",DoubleType(),True) \
      .add("High",DoubleType(),True) \
      .add("Low",DoubleType(),True) \
      .add("Close",DoubleType(),True) \
      .add("Volume",DoubleType(),True) \
      .add("Close Time",DoubleType(),True) \
      .add("Quote asset volume",DoubleType(),True) \
      .add("Number of trades",IntegerType(),True) \
      .add("Taker buy base asset volume",DoubleType(),True) \
      .add("Taker buy quote asset volume",DoubleType(),True) \
      .add("Ignore",StringType(),True) \

      
df_btc = sqlcont.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load('csv/btc_2021_hourly.csv')

df_eth = sqlcont.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load('csv/eth_2021_hourly.csv')

df_doge = sqlcont.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load('csv/doge_2021_hourly.csv')

In [73]:
df_doge.printSchema()

root
 |-- Open Time: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Close Time: double (nullable = true)
 |-- Quote asset volume: double (nullable = true)
 |-- Number of trades: integer (nullable = true)
 |-- Taker buy base asset volume: double (nullable = true)
 |-- Taker buy quote asset volume: double (nullable = true)
 |-- Ignore: string (nullable = true)



In [65]:
df_btc.show(10, vertical=True)

-RECORD 0-------------------------------------------
 Open Time                    | 1.6094628E9         
 Open                         | 28995.13            
 High                         | 29470.0             
 Low                          | 28960.35            
 Close                        | 29409.99            
 Volume                       | 5403.068471         
 Close Time                   | 1.609466399999E12   
 Quote asset volume           | 1.583578168180572E8 
 Number of trades             | 103896              
 Taker buy base asset volume  | 3160.041701         
 Taker buy quote asset volume | 9.261399193555292E7 
 Ignore                       | 0                   
-RECORD 1-------------------------------------------
 Open Time                    | 1.6094664E9         
 Open                         | 29410.0             
 High                         | 29465.26            
 Low                          | 29120.03            
 Close                        | 29194.65      

# Categorizing price movement

In [37]:
# If the price moves;
#  + 5-∞ category A
#  + 0-5 category B
#  - 0-5 category C
#  - 5-100 category D

In [94]:
df_btc=df_btc.withColumn("Category", \
   when(((df_btc.Close-df_btc.Open)*100/df_btc.Open >= 5), lit("A")) \
     .when(((df_btc.Close-df_btc.Open)*100/df_btc.Open >= 0) & ((df_btc.Close-df_btc.Open)*100/df_btc.Open < 5), lit("B")) \
     .when(((df_btc.Close-df_btc.Open)*100/df_btc.Open < 0) & ((df_btc.Close-df_btc.Open)*100/df_btc.Open > -5), lit("C")) \
     .otherwise(lit("D"))            
  )

df_eth=df_eth.withColumn("Category", \
   when(((df_eth.Close-df_eth.Open)*100/df_eth.Open >= 5 ), lit("A")) \
     .when(((df_eth.Close-df_eth.Open)*100/df_eth.Open >= 0) & ((df_eth.Close-df_eth.Open)*100/df_eth.Open < 5), lit("B")) \
     .when(((df_eth.Close-df_eth.Open)*100/df_eth.Open < 0) & ((df_eth.Close-df_eth.Open)*100/df_eth.Open > -5), lit("C")) \
     .otherwise(lit("D"))            
  )

df_doge=df_doge.withColumn("Category", \
   when(((df_doge.Close-df_doge.Open)*100/df_doge.Open >= 5 ), lit("A")) \
     .when(((df_doge.Close-df_doge.Open)*100/df_doge.Open >= 0) & ((df_doge.Close-df_doge.Open)*100/df_doge.Open < 5), lit("B")) \
     .when(((df_doge.Close-df_doge.Open)*100/df_doge.Open < 0) & ((df_doge.Close-df_doge.Open)*100/df_doge.Open > -5), lit("C")) \
     .otherwise(lit("D"))            
  )

df_btc = df_btc.withColumn("Pair", lit("BTCUSDT"))
df_eth = df_eth.withColumn("Pair", lit("ETHUSDT"))
df_doge = df_doge.withColumn("Pair", lit("DOGEUSDT"))

# Designed a new DF to store pairs and their categories together to use crosstab. 
## DOGE pair is more volatile than the others as we can see. Category A is desired since 5 percent up within an hour is not so frequent.

In [105]:
cols = ['Pair', 'Category']    

df1 = df_btc.select(cols)
df2 = df_eth.select(cols)
df3 = df_doge.select(cols)
df_categorical = df1.union(df2.union(df3))

df_categorical.stat.crosstab("Pair","Category").show()

+-------------+---+----+----+---+
|Pair_Category|  A|   B|   C|  D|
+-------------+---+----+----+---+
|      BTCUSDT|  3|1694|1609|  6|
|     DOGEUSDT|144|1537|1525|106|
|      ETHUSDT|  7|1765|1526| 14|
+-------------+---+----+----+---+



## Frequent Categories

In [106]:
freq = df_categorical.stat.freqItems(["Category"], 0.4)
freq.collect()[0]

Row(Category_freqItems=['C', 'B'])

## Calculated Mean Value, Covariance, Correlation, Skewness and Kurtosis

In [203]:
schema = StructType() \
      .add("index",IntegerType(),True) \
      .add("BTC",DoubleType(),True) \
      .add("ETH",DoubleType(),True) \

      
df_comparison = sqlcont.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load('csv/comparison.csv')

df_comparison.show()

+-----+--------+------+
|index|     BTC|   ETH|
+-----+--------+------+
|    0|28995.13|734.07|
|    1|29409.99|748.28|
|    2|29194.65|744.06|
|    3| 29278.4|744.82|
|    4|29220.31|742.29|
|    5|29187.01|740.65|
|    6|29174.35|739.97|
|    7|29092.83|737.38|
|    8|29000.01|730.07|
|    9|29202.21|733.68|
|   10|29223.82|736.81|
|   11|29313.49|738.85|
|   12|29233.49|733.19|
|   13|29464.79|740.08|
|   14|29327.84| 738.0|
|   15|29188.67|735.39|
|   16|29300.57|735.83|
|   17|29079.64|727.94|
|   18| 29072.7| 724.6|
|   19|29029.04|725.34|
+-----+--------+------+
only showing top 20 rows



In [244]:
# get the hourly closing prices of Bitcoin/USDT pair within a rdd.

rdd = df_comparison.rdd
rdd_btc = rdd.map(lambda x: x[1])
rdd_eth = rdd.map(lambda x: x[2])

sum_rdd = rdd_btc.sum()
count_rdd = rdd_btc.count()
mean = sum_rdd/count_rdd

print("Mean Value of BTC since the beginning of 2021 is: " + str(mean))
print("\nCovariance between BTC and ETH is " + str(df_comparison.cov("BTC","ETH")))
print("\nCorrelation between pairs: " + str(Statistics.corr(rdd_btc, rdd_eth, method="pearson")))

n= float(rdd_btc.count())
skewness = rdd_btc.map(lambda x: pow((x - mean),3) / pow(std,3)).sum()*1/n
print("\nSkewness of BTC is: " + str(skewness))
kurtosis=rdd_btc.map(lambda x: pow((x - mean),4) / pow(std,4)).sum()*1/n
print("\nKurtosis of BTC is: " + str(kurtosis))

Mean Value of BTC since the beginning of 2021 is: 48766.10135526716

Covariance between BTC and ETH is 4130680.824073262

Correlation between pairs: 0.5652529024784977

Skewness of BTC is: -0.5350558333775998

Kurtosis of BTC is: 1.918070966854103


# Used colStat for min, max Norm L1, L2 and non-zero values 

In [238]:
#ColStat

rdd_=sc.textFile('csv/btc_2021_hourly.csv') \
    .map(lambda x: x.split(",")) \
    .map(lambda x: (x[4])) \
    .map(lambda x: np.array(x)) 
summary = Statistics.colStats(rdd_)

print("{} is the ".format(summary.max()[0]) + "\033[4m" + "maximum" +"\033[0m" +" value of Bitcoin/USDT pair in 2021\n")
print("{} is the ".format(summary.min()[0]) + "\033[4m" + "minimum" +"\033[0m" +" value of Bitcoin/USDT pair in 2021\n")

print('\033[1m' + "Norm L1" +'\033[0m' +" and " +'\033[1m' +"Norm L2 " +'\033[0m' + "values are given: " + str(summary.normL1()[0]) + " , " + str(summary.normL2()[0]))

print("\nIn this case, all values are non-zero, as expected: {}".format(int(summary.numNonzeros()[0])))

64577.26 is the [4mmaximum[0m value of Bitcoin/USDT pair in 2021

28995.13 is the [4mminimum[0m value of Bitcoin/USDT pair in 2021

[1mNorm L1[0m and [1mNorm L2 [0mvalues are given: 161562093.79000002 , 2861012.8995879926

In this case, all values are non-zero, as expected: 3313
