In [162]:
import pandas as pd
import numpy as np
%matplotlib inline
import config 
import sql_con
from requests import Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import json
import csv

In [163]:
# get module variables
ROOT_DIR = config.ROOT_DIR
select_records = sql_con.select_records
insert_records = sql_con.insert_records
update_records = sql_con.update_records
conn_odbc = sql_con.conn_odbc
read_contents = sql_con.read_contents

In [164]:
# function to make calls to cryptocompare API
def get_data(url, parameters, headers):
    session = Session()
    session.headers.update(headers)
    try:
        response = session.get(url, params=parameters)
        data = json.loads(response.text)
        return data
    except (ConnectionError, Timeout, TooManyRedirects) as e:
        print(e)

In [165]:
# function to get api data for top 10 coins
def get_coin_data(coin_list, url, headers):
    coin_data = []
    parameters = { 
        "tsym":"USD",
        "allData":"true"
    }
    for coin in coin_list:
        parameters["fsym"] = coin
        res_json = get_data(url, parameters, headers)
        data = res_json["Data"]
        # iterate through the data and add the coin name to each row
        for row in data:
            row["symbol"] = coin
        coin_data.extend(data)
    return coin_data

In [166]:
# authorization header for making calls to crypto compare API
# read api key from config file using dotev module
headers = {
  "authorization": f"Apikey {config.API_KEY}"
}

In [167]:
# relevant urls for making calls to crypto compare API
top10_url = "https://min-api.cryptocompare.com/data/top/mktcapfull"
hist_url = "https://min-api.cryptocompare.com/data/histoday"

In [168]:
# get top 10 coins by market cap, capture json response
parameters = {
  "tsym":"USD",
  "limit": 10
}

res_json_top10 = get_data(top10_url, parameters, headers)
data_top10 = res_json_top10["Data"]

In [169]:
# capture top 10 coins in a list of dictionaries and write to json file (ingestion layer)
top10_coins = [{"Name": coin["CoinInfo"]["Name"], "FullName": coin["CoinInfo"]["FullName"], "Algorithm": coin["CoinInfo"]["Algorithm"], "ProofType": coin["CoinInfo"]["ProofType"]} for coin in data_top10]
with open(rf"{ROOT_DIR}/data/top10_coins.json", "w") as f:
    f.write(json.dumps(top10_coins))

In [170]:
# make request to cryptocompare api to get historical data for bitcoin quote prices in USD

# parameters = {
#   "fsym": "BTC",
#   "tsym":"USD",
#   "allData":"true"
# }

# res_json = get_data(hist_url, parameters, headers)
# data = res_json["Data"]

In [171]:
import findspark
findspark.init()

In [172]:
# spark session start to begin transforming data (processing layer)
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("crypto_analysis").getOrCreate()

In [173]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("FullName", StringType(), True),
    StructField("Algorithm", StringType(), True),
    StructField("ProofType", StringType(), True)
])
df_top10 = spark.read.option("schema", schema).json(rf"{ROOT_DIR}\data\top10_coins.json").select("Name", "FullName", "Algorithm", "ProofType")
df_top10.show(truncate=False)

+----+------------+---------+---------+
|Name|FullName    |Algorithm|ProofType|
+----+------------+---------+---------+
|BTC |Bitcoin     |SHA-256  |PoW      |
|ETH |Ethereum    |Ethash   |PoS      |
|USDT|Tether      |N/A      |N/A      |
|XRP |XRP         |N/A      |XRP LCP  |
|BNB |Binance Coin|BEP-2    |PoSA     |
|USDC|USD Coin    |N/A      |N/A      |
|ADA |Cardano     |Ouroboros|PoS      |
|DOGE|Dogecoin    |Scrypt   |PoW      |
|ARB |Arbitrum    |N/A      |N/A      |
|APT |Aptos       |N/A      |N/A      |
+----+------------+---------+---------+



In [174]:
coin_list = df_top10.rdd.map(lambda x: x[0]).collect()
coin_list

['BTC', 'ETH', 'USDT', 'XRP', 'BNB', 'USDC', 'ADA', 'DOGE', 'ARB', 'APT']

In [175]:
# read historical data for top 10 coins from cryptocompare API
all_coins_data = get_coin_data(coin_list, hist_url, headers)

In [176]:
# capture data from reponse and write to json file (ingestion layer)
with open(rf"{ROOT_DIR}\data\all_coins_data.json", "w") as f:
    f.write(json.dumps(all_coins_data))

In [177]:
# read ingested json file and print out first 10 records

schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("time", LongType(), True),
    StructField("close", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("open", DoubleType(), True),
    StructField("volumefrom", DoubleType(), True),
    StructField("volumeto", DoubleType(), True),
    StructField("conversionType", StringType(), True),
    StructField("conversionSymbol", StringType(), True)
])

df = spark.read.option("schema", schema).json(rf"{ROOT_DIR}/data/all_coins_data.json")
df.show(n=10)

+-------+----------------+--------------+-------+-------+-------+------+----------+----------+--------+
|  close|conversionSymbol|conversionType|   high|    low|   open|symbol|      time|volumefrom|volumeto|
+-------+----------------+--------------+-------+-------+-------+------+----------+----------+--------+
|0.04951|                |        direct|0.04951|0.04951|0.04951|   BTC|1279324800|      20.0|  0.9902|
|0.08584|                |        direct|0.08585|0.05941|0.04951|   BTC|1279411200|     75.01|   5.092|
| 0.0808|                |        direct|0.09307|0.07723|0.08584|   BTC|1279497600|     574.0|   49.66|
|0.07474|                |        direct|0.08181|0.07426| 0.0808|   BTC|1279584000|     262.0|   20.59|
|0.07921|                |        direct|0.07921|0.06634|0.07474|   BTC|1279670400|     575.0|   42.26|
| 0.0505|                |        direct|0.08181| 0.0505|0.07921|   BTC|1279756800|    2160.0|  129.78|
|0.06262|                |        direct|0.06767| 0.0505| 0.0505

In [178]:
df.printSchema()

root
 |-- close: double (nullable = true)
 |-- conversionSymbol: string (nullable = true)
 |-- conversionType: string (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- open: double (nullable = true)
 |-- symbol: string (nullable = true)
 |-- time: long (nullable = true)
 |-- volumefrom: double (nullable = true)
 |-- volumeto: double (nullable = true)



In [179]:
# get ingestion date as current unix epoch time
# write data to csv file after adding ingestion date (csv ingestion point for data pipeline)
from pyspark.sql.functions import unix_timestamp, from_unixtime, col

df = df.withColumn("ingestion_date (unix epoch)", unix_timestamp()).withColumnRenamed("time", "time (unix epoch)")

In [180]:
# change column names for volumes to be more descriptive
df = df.withColumnRenamed("volumefrom", "volume (Crypto Units)").withColumnRenamed("volumeto", "volume (USD)")

In [181]:
df.write.mode("overwrite").option("quote", "\u0000").option("emptyValue", "").csv(rf"{ROOT_DIR}/data/all_coins_data.csv", header=True)

In [182]:
# read from ingested csv file and print out first 10 records
schema = StructType([
    StructField("close", DoubleType(), True),
    StructField("conversionSymbol", StringType(), True),
    StructField("conversionType", StringType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("open", DoubleType(), True),
    StructField("symbol", StringType(), True),
    StructField("time (unix epoch)", LongType(), True),
    StructField("volume (Crypto Units)", DoubleType(), True),
    StructField("volume (USD)", DoubleType(), True),
    StructField("ingestion_date (unix epoch)", LongType(), True)
])

df = spark.read.format("csv").option("header", "true").option("delimiter", ",").schema(schema).load(rf"{ROOT_DIR}/data/all_coins_data.csv")
#df = spark.read.csv(rf"{ROOT_DIR}/data/all_coins_data.csv", header=True)
df.show(n=10,truncate=False)

+-------+----------------+--------------+-------+-------+-------+------+-----------------+---------------------+------------+---------------------------+
|close  |conversionSymbol|conversionType|high   |low    |open   |symbol|time (unix epoch)|volume (Crypto Units)|volume (USD)|ingestion_date (unix epoch)|
+-------+----------------+--------------+-------+-------+-------+------+-----------------+---------------------+------------+---------------------------+
|0.04951|null            |direct        |0.04951|0.04951|0.04951|BTC   |1279324800       |20.0                 |0.9902      |1680642809                 |
|0.08584|null            |direct        |0.08585|0.05941|0.04951|BTC   |1279411200       |75.01                |5.092       |1680642809                 |
|0.0808 |null            |direct        |0.09307|0.07723|0.08584|BTC   |1279497600       |574.0                |49.66       |1680642809                 |
|0.07474|null            |direct        |0.08181|0.07426|0.0808 |BTC   |1279

In [185]:
df.printSchema()

root
 |-- close: double (nullable = true)
 |-- conversionSymbol: string (nullable = true)
 |-- conversionType: string (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- open: double (nullable = true)
 |-- symbol: string (nullable = true)
 |-- time (unix epoch): long (nullable = true)
 |-- volume (Crypto Units): double (nullable = true)
 |-- volume (USD): double (nullable = true)
 |-- ingestion_date (unix epoch): long (nullable = true)



In [186]:
# extract necessary columns
df = df.select(["symbol", "time (unix epoch)", "open", "close", "high", "low", "volume (Crypto Units)", "volume (USD)"])
df.dtypes

[('symbol', 'string'),
 ('time (unix epoch)', 'bigint'),
 ('open', 'double'),
 ('close', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('volume (Crypto Units)', 'double'),
 ('volume (USD)', 'double')]

In [194]:
# get descriptive statistics for numeric columns
df.describe(["open", "close", "high", "low", "volume (Crypto Units)", "volume (USD)"]).show()

+-------+-----------------+-----------------+------------------+-----------------+---------------------+--------------------+
|summary|             open|            close|              high|              low|volume (Crypto Units)|        volume (USD)|
+-------+-----------------+-----------------+------------------+-----------------+---------------------+--------------------+
|  count|            46450|            46450|             46450|            46450|                46450|               46450|
|   mean| 945.719361304651|946.3698190837255| 971.7991685467883|  916.80884632498| 2.4695360300692856E7| 8.732257568459764E7|
| stddev|5304.725793909023|5306.234746338349|5447.8031052766855|5142.940414625725| 1.5988972176306462E8|3.5171638534750915E8|
|    min|              0.0|              0.0|               0.0|              0.0|                  0.0|                 0.0|
|    max|         67549.14|         67549.14|          68978.64|         66312.42|    1.153458176332E10|   1.112022085

In [195]:
df.show(n=10)

+------+-----------------+-------+-------+-------+-------+---------------------+------------+-------------------+
|symbol|time (unix epoch)|   open|  close|   high|    low|volume (Crypto Units)|volume (USD)|   date_time (unix)|
+------+-----------------+-------+-------+-------+-------+---------------------+------------+-------------------+
|   BTC|       1279324800|0.04951|0.04951|0.04951|0.04951|                 20.0|      0.9902|2010-07-17 00:00:00|
|   BTC|       1279411200|0.04951|0.08584|0.08585|0.05941|                75.01|       5.092|2010-07-18 00:00:00|
|   BTC|       1279497600|0.08584| 0.0808|0.09307|0.07723|                574.0|       49.66|2010-07-19 00:00:00|
|   BTC|       1279584000| 0.0808|0.07474|0.08181|0.07426|                262.0|       20.59|2010-07-20 00:00:00|
|   BTC|       1279670400|0.07474|0.07921|0.07921|0.06634|                575.0|       42.26|2010-07-21 00:00:00|
|   BTC|       1279756800|0.07921| 0.0505|0.08181| 0.0505|               2160.0|      12

In [196]:
# set spark session timezone to UTC to have a uniform reference point for all date related fields
spark.conf.set("spark.sql.session.timeZone", "UTC")
df = df.withColumn("date_time (unix)", from_unixtime("time (unix epoch)", "yyyy-MM-dd HH:mm:ss"))
spark.conf.unset("spark.sql.session.timeZone")

In [207]:
# timezone will default to system timezone (Easter Standard Time) in absence of specific spark.sql.session.timeZone setting
df.withColumn("date_time", from_unixtime("time (unix epoch)", "yyyy-MM-dd HH:mm:ss")).show(n=10, truncate=False)

+------+-----------------+-------+-------+-------+-------+---------------------+------------+-------------------+-------------------+
|symbol|time (unix epoch)|open   |close  |high   |low    |volume (Crypto Units)|volume (USD)|date_time (unix)   |date_time          |
+------+-----------------+-------+-------+-------+-------+---------------------+------------+-------------------+-------------------+
|BTC   |1279324800       |0.04951|0.04951|0.04951|0.04951|20.0                 |0.9902      |2010-07-17 00:00:00|2010-07-16 20:00:00|
|BTC   |1279411200       |0.04951|0.08584|0.08585|0.05941|75.01                |5.092       |2010-07-18 00:00:00|2010-07-17 20:00:00|
|BTC   |1279497600       |0.08584|0.0808 |0.09307|0.07723|574.0                |49.66       |2010-07-19 00:00:00|2010-07-18 20:00:00|
|BTC   |1279584000       |0.0808 |0.07474|0.08181|0.07426|262.0                |20.59       |2010-07-20 00:00:00|2010-07-19 20:00:00|
|BTC   |1279670400       |0.07474|0.07921|0.07921|0.06634|575.

#### Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume(USD) of stock traded for a day

In [208]:
df2 = df.withColumn("HV Ratio", col("high")/col("volume (USD)"))

# display dataframe in descending order of HV Ratio
df2.sort("HV Ratio", ascending=False).show(truncate=False)

+------+-----------------+------+------+------+------+---------------------+------------+-------------------+------------------+
|symbol|time (unix epoch)|open  |close |high  |low   |volume (Crypto Units)|volume (USD)|date_time (unix)   |HV Ratio          |
+------+-----------------+------+------+------+------+---------------------+------------+-------------------+------------------+
|USDT  |1441152000       |0.96  |1.15  |1.15  |0.96  |1.252E-4             |1.44E-4     |2015-09-02 00:00:00|7986.11111111111  |
|USDT  |1424476800       |1.15  |1.5   |1.5   |1.15  |1.415E-4             |2.122E-4    |2015-02-21 00:00:00|7068.80301602262  |
|USDT  |1456185600       |1.1   |1.1   |1.1   |1.1   |1.518E-4             |1.67E-4     |2016-02-23 00:00:00|6586.82634730539  |
|USDT  |1420761600       |1.0   |1.0   |1.0   |1.0   |2.4E-4               |2.4E-4      |2015-01-09 00:00:00|4166.666666666667 |
|USDT  |1422662400       |0.9274|4.37  |4.37  |0.874 |0.005                |0.02185     |2015-01-

In [209]:
# sort by date_time (unix) in descending order to get HV Ratiio for most recent dates
df2.sort("date_time (unix)", ascending=False).show(truncate=False)

+------+-----------------+--------+--------+--------+--------+---------------------+--------------+-------------------+---------------------+
|symbol|time (unix epoch)|open    |close   |high    |low     |volume (Crypto Units)|volume (USD)  |date_time (unix)   |HV Ratio             |
+------+-----------------+--------+--------+--------+--------+---------------------+--------------+-------------------+---------------------+
|BTC   |1680566400       |27810.08|28238.05|28437.75|27674.33|31666.76             |8.902245158E8 |2023-04-04 00:00:00|3.194446962005357E-5 |
|XRP   |1680566400       |0.4964  |0.507   |0.5073  |0.4871  |7.787714001E7        |3.868020929E7 |2023-04-04 00:00:00|1.3115234103222713E-8|
|BNB   |1680566400       |308.66  |311.52  |312.45  |308.26  |13091.68             |4047818.66    |2023-04-04 00:00:00|7.718972272339888E-5 |
|USDT  |1680566400       |1.0     |1.0     |1.002   |0.9994  |2.3021928135E8       |2.3025598093E8|2023-04-04 00:00:00|4.351678492575693E-9 |
|ADA  

In [210]:
df2.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- time (unix epoch): long (nullable = true)
 |-- open: double (nullable = true)
 |-- close: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- volume (Crypto Units): double (nullable = true)
 |-- volume (USD): double (nullable = true)
 |-- date_time (unix): string (nullable = true)
 |-- HV Ratio: double (nullable = true)



In [211]:
df2.groupby(col("symbol").alias("sym")).max("high").show()

+----+---------+
| sym|max(high)|
+----+---------+
| ARB|    1.266|
| ETH|  4865.94|
|DOGE|   0.7398|
|USDC|    1.643|
| BTC| 68978.64|
|USDT|   207.09|
| XRP|     3.29|
| BNB|   693.73|
| APT|    20.33|
| ADA|    3.097|
+----+---------+



#### What day had the Peak High in Price for each cryptocurrency in dataset?

In [212]:
df_max_high = df2.groupby(col("symbol").alias("sym")).agg({"high": "max"})
df2.join(df_max_high, (df2["symbol"] == df_max_high["sym"]) & (df2["high"] == df_max_high["max(high)"])).select(["symbol", "date_time (unix)", "high"]).show()

+------+-------------------+--------+
|symbol|   date_time (unix)|    high|
+------+-------------------+--------+
|   BTC|2021-11-10 00:00:00|68978.64|
|   ETH|2021-11-10 00:00:00| 4865.94|
|  USDT|2015-11-06 00:00:00|  207.09|
|   XRP|2018-01-04 00:00:00|    3.29|
|   BNB|2021-11-08 00:00:00|  693.73|
|  USDC|2020-03-12 00:00:00|   1.643|
|   ADA|2021-09-02 00:00:00|   3.097|
|  DOGE|2021-05-08 00:00:00|  0.7398|
|   ARB|2023-04-04 00:00:00|   1.266|
|   APT|2023-01-30 00:00:00|   20.33|
+------+-------------------+--------+



#### What is the mean closing price for each cryptocurrency?

In [213]:
df2.groupby("symbol").mean("close").sort("avg(close)", ascending=False).show()

+------+--------------------+
|symbol|          avg(close)|
+------+--------------------+
|   BTC|   8894.909536402578|
|   ETH|  501.32571623250794|
|   BNB|   65.56586152852529|
|  USDT|   0.759260495156095|
|  USDC| 0.35313466092572676|
|   APT|  0.3236187298170076|
|   XRP| 0.22897907621097963|
|   ADA|  0.2054549364908503|
|  DOGE|0.026359883741657687|
|   ARB|2.688912809472551...|
+------+--------------------+



#### What is min and max volume (both unit assets and price) for each cryptocurrency?

In [221]:
from pyspark.sql.functions import max, min
df2.groupby("symbol").agg(
    max("volume (Crypto Units)").alias("max_volume (Crypto Units)"),
    min("volume (Crypto Units)").alias("min_volume (Crypto Units)"),
    max("volume (USD)").alias("max_volume (USD)"),
    min("volume (USD)").alias("min_volume (USD)")
).show()

+------+-------------------------+-------------------------+-----------------+----------------+
|symbol|max_volume (Crypto Units)|min_volume (Crypto Units)| max_volume (USD)|min_volume (USD)|
+------+-------------------------+-------------------------+-----------------+----------------+
|   ARB|               5023135.32|                      0.0|       6160146.76|             0.0|
|   ETH|            1.098033209E7|                      0.0|  9.65796855596E9|             0.0|
|  DOGE|        1.153458176332E10|                      0.0|  2.12237452234E9|             0.0|
|  USDC|           5.6439868532E8|                      0.0|    5.333564084E8|             0.0|
|   BTC|                572349.32|                      0.0|1.112022085477E10|             0.0|
|  USDT|          4.23247075921E9|                      0.0|  4.18769644556E9|             0.0|
|   XRP|          3.61211042715E9|                      0.0|  1.56522873526E9|             0.0|
|   BNB|            1.082578296E7|      

#### What is the Pearson's correlation coefficient between high and volume (USD) for each cryptocurrency?

In [224]:
from pyspark.sql.functions import corr
df2.groupby("symbol").agg(corr("high", "volume (USD)")).show()

+------+------------------------+
|symbol|corr(high, volume (USD))|
+------+------------------------+
|   ARB|      0.9999999999999999|
|   ETH|      0.7296692914683878|
|  DOGE|      0.7214350649378474|
|  USDC|     0.12918287030966932|
|   BTC|      0.7775434698600653|
|  USDT|    0.002493387757783...|
|   XRP|      0.6677518691371586|
|   BNB|     0.45926335184231143|
|   APT|      0.8610890548470935|
|   ADA|      0.7213457151699877|
+------+------------------------+



#### What is max metrics per year for each cryptocurrency?

In [238]:
max_metrics_df = df2.groupby(["symbol", "date_time (unix)"]).agg(max("high").alias("max_high"),
                                                max("close").alias("max_close"),
                                                max("open").alias("max_open"),
                                                max("low").alias("max_low"))
max_metrics_df.show()

+------+-------------------+--------+---------+--------+--------+
|symbol|   date_time (unix)|max_high|max_close|max_open| max_low|
+------+-------------------+--------+---------+--------+--------+
|   BTC|2011-03-16 00:00:00|    0.88|     0.86|    0.87|   0.836|
|   BTC|2011-06-22 00:00:00|   17.51|    17.51|   17.51|   17.51|
|   BTC|2011-08-04 00:00:00|   11.15|    10.75|    9.26|    9.27|
|   BTC|2012-06-15 00:00:00|   6.587|      6.5|   5.954|   5.883|
|   BTC|2013-11-28 00:00:00| 1224.48|  1101.38| 1079.89| 1032.06|
|   BTC|2014-03-26 00:00:00|  575.42|   562.45|  562.89|  546.26|
|   BTC|2014-04-07 00:00:00|  485.06|   462.38|  455.69|  447.98|
|   BTC|2015-02-15 00:00:00|  264.57|   233.27|  258.64|  226.56|
|   BTC|2015-04-18 00:00:00|  224.47|   223.35|  222.59|  220.57|
|   BTC|2016-11-05 00:00:00|  706.38|   702.11|  702.08|  694.03|
|   BTC|2016-12-08 00:00:00|  773.41|   768.49|  765.56|  761.17|
|   BTC|2016-12-09 00:00:00|  773.48|   770.48|  768.49|  765.03|
|   BTC|20