# CS624 Final Project

#### Problem Statement: Predict the closing price of Bitcoin using historical data and the Coinbase API

This is a batch problem with a large dataset of close to 4.9M historical records (4,857,377 rows). The dataset is extracted from https://www.cryptodatadownload.com/data/bitstamp/

Platform/Packages:
* Databricks
* PySpark
* Pandas
* MLflow
* PySpark MLlib
* Matplotlib
* Seaborn
* FB Prophet
* DBFS
* Parquet/Snappy Files


Goals:
* Import the Bitcoin historical dataset into Spark.
* Understand the dataset.
* Visualize the data.
* Perform analysis using Spark ML.
* Present and visualize the results.

* Cryptocurrency -> A digital currency secured by cryptographic (mathematical) operations, with a strong appeal for decentralizing financial resources and offering greater agility, privacy, and security than current alternatives.

* BTC -> Bitcoin, the first cryptocurrency developed and launched commercially for trading, considered stronger and more stable than other existing cryptocurrencies because of its "resilience" to crises and devaluations.

* USD -> United States Dollar, the currency used as a reference by most exchanges because conversion between BTC and USD is easy.

* Bitstamp -> Online crypto exchange that facilitates trades between traders; it records information about prices and how trades were made, and serves as the data source for this study.

* Trader -> A participant who operates in the market (in this case, crypto), aiming to profit through consecutive buy and sell operations.

* Long -> A position that profits if the asset's value rises, normally by buying at a lower price and selling at a higher one.

* Short -> A position that profits if the asset's value falls: the trader borrows ("rents") the asset, sells it at the higher price, buys it back at a lower price, and returns the same quantity of the asset, keeping the difference between the sale price and the repurchase price.
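
The long and short definitions above reduce to simple arithmetic. The sketch below is illustrative only: the function names `long_profit` and `short_profit` are hypothetical, fees and borrowing costs are ignored, and the two prices are Close values borrowed from the sample output shown later in this notebook rather than an actual trade.

```python
def long_profit(buy_price, sell_price, quantity):
    """Profit of a long position: buy first, sell later (fees ignored)."""
    return (sell_price - buy_price) * quantity


def short_profit(sell_price, buyback_price, quantity):
    """Profit of a short position: sell borrowed units first, buy them back later."""
    return (sell_price - buyback_price) * quantity


# Two Close prices in USD, used here purely as example inputs; position size 1 BTC
lower_close, higher_close = 2430.13, 2439.12

# Long: buy at the lower price, sell at the higher price
print(round(long_profit(lower_close, higher_close, 1.0), 2))

# Short: sell at the higher price, buy back at the lower price
print(round(short_profit(higher_close, lower_close, 1.0), 2))
```

Both positions gain the same price difference per unit; they differ only in the direction of the price move they need.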

Attributes (dataset columns):
* Timestamp -> Time of data collection as a Unix epoch (seconds); it is later converted to a human-readable date. Records are spaced approximately one minute apart, with the time zone set to UTC.

* Open -> Price of the asset at the start of the measurement interval, in USD.

* High -> Highest price reached by the asset during the measurement interval, in USD.

* Low -> Lowest price reached by the asset during the measurement interval, in USD.

* Close -> Price of the asset at the end of the measurement interval, in USD.

* Volume_(BTC) -> Volume, in BTC, traded on Bitstamp during the measurement interval.

* Volume_(Currency) -> Volume, in USD, traded on Bitstamp during the measurement interval.

* Weighted_Price -> Average asset price over the interval, in USD, weighted by traded volume; it is used as the average price in the analysis.
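
As a quick check of two of the attributes above, the sketch below converts an epoch `Timestamp` to a UTC date and recomputes `Weighted_Price` from the two volume columns. It uses the first row of the sample output shown later in this notebook. The relation "USD volume divided by BTC volume" is an assumption about how the weighted price is derived; this row is consistent with it, but the dataset documentation should be treated as the authority.

```python
from datetime import datetime, timezone

# First row of the sample output shown later in this notebook
timestamp = 1496427420          # Unix epoch seconds
volume_btc = 1.33349235         # Volume_(BTC)
volume_usd = 3240.5760806       # Volume_(Currency)
weighted_price = 2430.1422356   # Weighted_Price as stored in the dataset

# Epoch seconds -> timezone-aware UTC datetime
readable = datetime.fromtimestamp(timestamp, tz=timezone.utc)
print(readable.strftime("%Y-%m-%d %H:%M:%S"))  # 2017-06-02 18:17:00

# Assumed relation: volume-weighted price = USD volume / BTC volume
recomputed = volume_usd / volume_btc
print(recomputed, abs(recomputed - weighted_price) < 1e-3)
```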


Historical Dataset Details => https://www.kaggle.com/datasets/mczielinski/bitcoin-historical-data (Zielak)

Incremental Dataset Details => https://api.pro.coinbase.com/products/BTC-USD/candles?granularity=86400

GitHub => https://github.com/ashishodu2023/CS624FinalProject.git
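
The incremental endpoint above requests candles with `granularity=86400`, i.e. one candle per day (86,400 seconds). A minimal sketch of turning such a response into named fields follows. It assumes each candle is an array in the order `[time, low, high, open, close, volume]`; check that order against the current Coinbase API documentation before relying on it. The helper `parse_candles` is hypothetical, and the payload is not a real API response: it reuses values from the first historical sample row, rearranged into the assumed candle layout, so no network call is needed.

```python
from datetime import datetime, timezone

# Assumed field order of a single candle array
CANDLE_FIELDS = ("time", "low", "high", "open", "close", "volume")


def parse_candles(payload):
    """Convert a list of candle arrays into dicts and add a UTC date string."""
    rows = []
    for candle in payload:
        row = dict(zip(CANDLE_FIELDS, candle))
        row["date"] = datetime.fromtimestamp(
            row["time"], tz=timezone.utc
        ).strftime("%Y-%m-%d")
        rows.append(row)
    return rows


# Illustrative payload only (NOT an actual API response)
sample_payload = [[1496427420, 2424.58, 2430.19, 2429.09, 2430.13, 1.33349235]]

parsed = parse_candles(sample_payload)
print(parsed[0]["date"], parsed[0]["close"])
```

Keeping the field order in one tuple makes it easy to adjust if the API layout differs from the assumption.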

### Challenges With Databricks Community Edition

* Limited Resources
* Cluster Termination
* No Job Scheduling
* Limited Integration
* Data Transfer Limits
* Spark Optimization
* Frequent Caching and Persisting of Datasets
* Increased Garbage Collection Heap Size
* Limited Notebook Memory
* Limited Use of display() and show()
* Spark ML Jobs Take Longer to Spin Up

<img src ='/files/tables/BitCoinSparkDesign.png'>

<img src ='/files/tables/SparkMLPrediction.png'>



<img src ='/files/tables/ProphetPredictions.png'>

In [0]:
#Pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,DoubleType,DateType,TimestampType
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import sum, col, to_date
from pyspark import SparkConf

In [0]:
spark

In [0]:
class CSVtoParquetConverter:
    def __init__(self, input_csv, output_parquet):
        self.input_csv = input_csv
        self.output_parquet = output_parquet
        self.spark = SparkSession.builder \
            .appName("CSV to Parquet with Snappy Compression") \
            .getOrCreate()

    def convert(self):
        # Read CSV file into a DataFrame
        df = self.spark.read \
            .format("csv") \
            .option("header", "true") \
            .load(self.input_csv)

        # Write DataFrame to Parquet file with Snappy compression
        df.write \
            .format("parquet") \
            .option("compression", "snappy") \
            .mode("overwrite") \
            .save(self.output_parquet)

        print("Conversion complete.")

    def close(self):
        # Stop the SparkSession
        self.spark.stop()



In [0]:
input_csv = "/FileStore/tables/btc_hist.csv"
output_parquet = "/FileStore/tables/btc_hist.parquet"
converter = CSVtoParquetConverter(input_csv, output_parquet)
converter.convert()

Conversion complete.


In [0]:
class DataAcquisition:
    def __init__(self, app_name="DataAcquisitionApp"):
        """
        Initializes a Spark session.

        Args:
        - app_name: Name of the Spark application.
        """
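        # Collect tuning options. Memory, core, and JVM settings are static:
        # on an already running cluster they only take effect when set in the
        # cluster configuration before start-up.
        conf = SparkConf()
        conf.set("spark.executor.memory", "20g")
        conf.set("spark.driver.memory", "10g")
        conf.set("spark.executor.cores", "4")
        conf.set("spark.default.parallelism", "200")
        conf.set("spark.sql.shuffle.partitions", "200")
        # Use the G1 garbage collector on executors
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
        conf.set("spark.dynamicAllocation.enabled", "true")
        conf.set("spark.shuffle.service.enabled", "true")
        conf.set("spark.memory.fraction", "0.7")
        conf.set("spark.memory.storageFraction", "0.5")
        # Pass the configuration to the builder so the settings are used
        # Optional: .config("spark.sql.execution.arrow.enabled", "true")
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config(conf=conf) \
            .getOrCreate()
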


    def read_parquet(self, file_path, header=True, infer_schema=True):
        """
        Reads a Parquet file into a DataFrame and casts its columns.

        Args:
        - file_path: Path to the Parquet file.
        - header: Unused; kept for compatibility (Parquet has no header row).
        - infer_schema: Unused; Parquet files store their own schema.

        Returns:
        - DataFrame: The DataFrame containing the Parquet data.
        """
        # The CSV was loaded without schema inference, so every column in the
        # Parquet file is a string. Cast each one to its intended type.
        column_types = {
            "Timestamp": "long",  # Unix epoch seconds
            "Open": "double",
            "High": "double",
            "Low": "double",
            "Close": "double",
            "Volume_(BTC)": "double",
            "Volume_(Currency)": "double",
            "Weighted_Price": "double",
        }
        df = self.spark.read.parquet(file_path)
        return df.select(
            [df[name].cast(dtype).alias(name) for name, dtype in column_types.items()])
    
    def ChangeDataType(self,clean_data):
        schema = StructType([
        StructField("Timestamp", StringType(), True),
        StructField("Open", DoubleType(), True),
        StructField("High", DoubleType(), True),
        StructField("Low", DoubleType(), True),
        StructField("Close", DoubleType(), True),
        StructField("Volume", IntegerType(), True)
        ])
        new_data = clean_data.select(
        [clean_data[col_name].cast(schema[col_name].dataType).alias(col_name) for col_name in clean_data.columns])
        return new_data



    def stop(self):
        """
        Stops the Spark session.
        """
        self.spark.stop()

In [0]:
data_acquisition = DataAcquisition(app_name="DataAcquisitionApp")
URL="/FileStore/tables/btc_hist.parquet"
raw_data_hist = data_acquisition.read_parquet(URL)
raw_data_hist = raw_data_hist.withColumn("Timestamp", raw_data_hist["Timestamp"].cast("long"))
converted_data_hist = raw_data_hist.withColumn("Timestamp_new", from_unixtime("Timestamp", "yyyy-MM-dd HH:mm:ss"))
converted_data_hist.persist()
converted_data_hist.display()

Timestamp,Open,High,Low,Close,Volume_(BTC),Volume_(Currency),Weighted_Price,Timestamp_new
1496427420,2429.09,2430.19,2424.58,2430.13,1.33349235,3240.5760806,2430.1422356,2017-06-02 18:17:00
1496427480,2430.1,2432.23,2430.1,2432.22,1.51923944,3694.3070891,2431.6819271,2017-06-02 18:18:00
1496427540,2432.2,2432.8,2431.89,2431.9,2.35693827,5732.0210778,2431.9776002,2017-06-02 18:19:00
1496427600,2432.72,2439.14,2432.6,2439.12,19.54745984,47652.528158,2437.7862161,2017-06-02 18:20:00
1496427660,2439.13,2439.14,2435.88,2435.88,1.8502236,4509.0938063,2437.0534492,2017-06-02 18:21:00
1496427720,2435.04,2438.94,2432.01,2432.01,16.6826187,40600.780207,2433.7174479,2017-06-02 18:22:00
1496427780,2435.02,2435.02,2434.87,2434.87,4.59029053,11177.372206,2435.0032167,2017-06-02 18:23:00
1496427840,2434.53,2434.53,2434.21,2434.21,5.25696119,12796.986861,2434.2935773,2017-06-02 18:24:00
1496427900,2434.18,2434.18,2429.24,2433.65,3.31298281,8054.0706597,2431.0632206,2017-06-02 18:25:00
1496427960,2428.9,2436.27,2428.9,2436.27,12.2571,29824.345537,2433.2301716,2017-06-02 18:26:00


In [0]:
print(f"The total rows in the historical data is={converted_data_hist.count()}")

The total rows in the historical data is=4857377


In [0]:
%fs ls /FileStore/tables/btc_hist.parquet


path,name,size,modificationTime
dbfs:/FileStore/tables/btc_hist.parquet/_committed_5759613211440336147,_committed_5759613211440336147,1607,1713596908000
dbfs:/FileStore/tables/btc_hist.parquet/_committed_9123405493718051227,_committed_9123405493718051227,1623,1713416481000
dbfs:/FileStore/tables/btc_hist.parquet/_committed_9177663678628362956,_committed_9177663678628362956,1618,1713416386000
dbfs:/FileStore/tables/btc_hist.parquet/_committed_vacuum7816266291426440608,_committed_vacuum7816266291426440608,129,1713596910000
dbfs:/FileStore/tables/btc_hist.parquet/_started_5759613211440336147,_started_5759613211440336147,0,1713596847000
dbfs:/FileStore/tables/btc_hist.parquet/part-00000-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-1-1-c000.snappy.parquet,part-00000-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-1-1-c000.snappy.parquet,12614476,1713596906000
dbfs:/FileStore/tables/btc_hist.parquet/part-00001-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-2-1-c000.snappy.parquet,part-00001-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-2-1-c000.snappy.parquet,23539794,1713596907000
dbfs:/FileStore/tables/btc_hist.parquet/part-00002-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-3-1-c000.snappy.parquet,part-00002-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-3-1-c000.snappy.parquet,22071087,1713596904000
dbfs:/FileStore/tables/btc_hist.parquet/part-00003-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-4-1-c000.snappy.parquet,part-00003-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-4-1-c000.snappy.parquet,23040181,1713596907000
dbfs:/FileStore/tables/btc_hist.parquet/part-00004-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-5-1-c000.snappy.parquet,part-00004-tid-5759613211440336147-cb0f554f-2591-4c78-a590-25c2fbfa3772-5-1-c000.snappy.parquet,28069206,1713596900000


In [0]:
converted_data_hist.write.mode("overwrite").parquet("/FileStore/tables/parquet_data/converted_data_hist.parquet", compression="snappy")

In [0]:
%fs ls /FileStore/tables/parquet_data/converted_data_hist.parquet


path,name,size,modificationTime
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/_committed_3646424222917117867,_committed_3646424222917117867,1618,1712983031000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/_committed_5591784131558051154,_committed_5591784131558051154,1607,1713132423000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/_committed_7796620584464095062,_committed_7796620584464095062,1607,1713597042000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/_committed_vacuum5648875412127594794,_committed_vacuum5648875412127594794,129,1713597043000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/_started_7796620584464095062,_started_7796620584464095062,0,1713596996000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/part-00000-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-28-1-c000.snappy.parquet,part-00000-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-28-1-c000.snappy.parquet,29966457,1713597034000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/part-00001-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-29-1-c000.snappy.parquet,part-00001-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-29-1-c000.snappy.parquet,29626398,1713597032000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/part-00002-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-30-1-c000.snappy.parquet,part-00002-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-30-1-c000.snappy.parquet,28637558,1713597036000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/part-00003-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-31-1-c000.snappy.parquet,part-00003-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-31-1-c000.snappy.parquet,26480587,1713597028000
dbfs:/FileStore/tables/parquet_data/converted_data_hist.parquet/part-00004-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-32-1-c000.snappy.parquet,part-00004-tid-7796620584464095062-05ea438a-7254-4e87-9a6c-f829251ad879-32-1-c000.snappy.parquet,25900456,1713597041000
