In [1]:
import requests

url = "https://api.coingecko.com/api/v3/coins/bitcoin/market_chart?vs_currency=usd&days=30"

headers = {"accept": "application/json"}

response = requests.get(url, headers=headers)


#print(response.text)

In [2]:
prices_data = response.json()["prices"]
market_caps_data = response.json()["market_caps"]
total_volumes_data = response.json()["total_volumes"]

In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Spark Project")
    .config("spark.driver.memory", "8g") 
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "2")
    .master("local[*]") 
    .getOrCreate()
)



In [None]:
prices_schema = "timestamp long , price double"
market_caps_schema = "timestamp long , market_cap double"
total_volumes_schema = "timestamp long , total_volume double"


prices_df = spark.createDataFrame(data = prices_data , schema = prices_schema)
market_caps_df = spark.createDataFrame(data = market_caps_data , schema = market_caps_schema)
total_volumes_df = spark.createDataFrame(data = total_volumes_data , schema = total_volumes_schema)

In [None]:

spark.conf.set("spark.sql.adaptive.Enabled" , False)
spark.conf.set("spark.sql.coalescePartitions.enabled" , False)
spark.conf.set("spark.sql.autoBroacastJoinThreshold" , False)

In [None]:
from pyspark.sql.functions import broadcast

bitcoin_df1 = broadcast(prices_df).join(broadcast(market_caps_df) ,["timestamp"] , "inner").join(broadcast(total_volumes_df) ,["timestamp"] , "inner")

In [None]:
#bitcoin_df1.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [timestamp#2L, price#3, market_cap#7, total_volume#11]
   +- BroadcastHashJoin [timestamp#2L], [timestamp#10L], Inner, BuildRight, false, true
      :- Project [timestamp#2L, price#3, market_cap#7]
      :  +- BroadcastHashJoin [timestamp#2L], [timestamp#6L], Inner, BuildRight, false, true
      :     :- Filter isnotnull(timestamp#2L)
      :     :  +- Scan ExistingRDD[timestamp#2L,price#3]
      :     +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=45]
      :        +- Filter isnotnull(timestamp#6L)
      :           +- Scan ExistingRDD[timestamp#6L,market_cap#7]
      +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=49]
         +- Filter isnotnull(timestamp#10L)
            +- Scan ExistingRDD[timestamp#10L,total_volume#11]




In [None]:
from pyspark.sql.functions import from_unixtime , col , round , to_date

bitcoin_df2 = ( bitcoin_df1.withColumn("Date" , to_date(from_unixtime(col("timestamp") / 1000)))
                        .withColumn("price",round(col("price") , 2))
                        .withColumn("market_cap",round(col("market_cap") / 1e12 , 2))
                        .withColumn("total_volume",round(col("total_volume") / 1e9 , 2))
                        .drop("timestamp")
                        .select("Date" , "price" , "market_cap" , "total_volume")
)

In [None]:
from pyspark.sql.functions import avg,sum,asc

bitcoin_df2 = (bitcoin_df2.groupBy(col("Date"))
              .agg(round(avg("price"),2).alias("Average_Daily_Price"),
                   round(avg("market_cap"),2).alias("Average_Market_Cap"),
                   round(sum("total_volume"),2).alias("Daily_Volume")
                )
              .orderBy(col("Date").asc())
)


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag , when , col, round

Window_Specs = Window.orderBy(col("Date").asc())

def percentage_change(df , colName , alias):
    lag_name = "Lag_" + colName
    lag_func = lag(colName).over(Window_Specs)

    return (
            df.withColumn(lag_name , lag_func)
              .withColumn("Lag_Date" , lag("Date").over(Window_Specs))
              .withColumn(alias , when(col(lag_name).isNull() , 0).when(col(lag_name) == 0 , 0).otherwise(round(((col(colName) - col(lag_name)) / col(lag_name)) * 100 ,2)))
              .select("Date" , "Lag_Date" , alias)
    ) 

bitcoin_price = percentage_change(bitcoin_df2 , "Average_Daily_Price" , "Daily_Price_Change_Pct")
bitcoin_market_cap = percentage_change(bitcoin_df2 , "Average_Market_Cap" , "Daily_Market_Cap_Change_Pct")
bitcoin_volume = percentage_change(bitcoin_df2 , "Daily_Volume" , "Daily_Volume_Change_Pct")

In [None]:
from pyspark.sql.functions import asc , desc , col 

def sharpest_values(changefunc , pctname,colName , mode):

    if mode == "increase":
        col_ord = col(pctname).desc()
    else:
        col_ord = col(pctname).asc()

    sharpest_row = changefunc.orderBy(col_ord).limit(1).select("Date","Lag_Date").alias("s")
    
    return (
            bitcoin_df2.alias("b").join(sharpest_row,
                (col("b.Date") == col("s.Date")) | (col("b.Date") == col("s.Lag_Date")), "inner").select(col("b.Date") , col(colName))
    )

In [None]:
bitcoin_price_increase_values = sharpest_values(bitcoin_price ,"Daily_Price_Change_Pct","Average_Daily_Price" , "increase" )
bitcoin_market_cap_increase_values = sharpest_values(bitcoin_market_cap ,"Daily_Market_Cap_Change_Pct",  "Average_Market_Cap" , "increase")
bitcoin_volume_increase_values = sharpest_values(bitcoin_volume ,"Daily_Volume_Change_Pct","Daily_Volume" , "increase")

bitcoin_price_decrease_values = sharpest_values(bitcoin_price ,  "Daily_Price_Change_Pct","Average_Daily_Price" , "decrease")
bitcoin_market_cap_decrease_values = sharpest_values(bitcoin_market_cap , "Daily_Market_Cap_Change_Pct","Average_Market_Cap" , "decrease")
bitcoin_volume_decrease_values = sharpest_values(bitcoin_volume , "Daily_Volume_Change_Pct","Daily_Volume" , "decrease")

In [None]:
def df_to_lists(df , col1 , col2):
    l1 = df.select(col1 , col2).toLocalIterator()
    x = []
    y = []
    for i in l1:
        x.append(i[col1].strftime('%Y - %m - %d'))
        y.append(i[col2])
        
    return x , y

In [None]:
price_increase_x , price_increase_y = df_to_lists(bitcoin_price_increase_values, "Date" , "Average_Daily_Price")
#price_decrease_x , price_decrease_y = df_to_lists(bitcoin_price_decrease_values,  "Date" , "Average_Daily_Price")

#market_cap_increase_x , market_cap_increase_y = df_to_lists(bitcoin_market_cap_increase_values, "Date", "Average_Market_Cap")
#market_cap_decrease_x , market_cap_decrease_y = df_to_lists(bitcoin_market_cap_decrease_values, "Date", "Average_Market_Cap")

#volume_increase_x,volume_increase_y = df_to_lists(bitcoin_volume_increase_values, "Date" , "Daily_Volume")
#volume_decrease_x,volume_decrease_y = df_to_lists(bitcoin_volume_decrease_values, "Date" , "Daily_Volume")


In [None]:
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

def graphs(mode , xcol , ycol ,name):
    plt.figure(figsize=(10, 6))
    plt.plot(xcol, ycol , marker = "o")
    plt.title(f"Sharpest {name} {mode}")
    plt.xlabel("Date")
    plt.ylabel(f"{name}")
    plt.legend()
    plt.grid(True)


with PdfPages("/opt/airflow/outputs/Bitcoin_Analysis_Report.pdf") as pdf:
    graphs("Increase" , price_increase_x , price_increase_y , "Price")
    pdf.savefig()
    plt.close()
    
    #graphs("Decrease" , price_decrease_x , price_increase_y , "Price")
    #pdf.savefig()
    #plt.close()

    #graphs("Increase" , market_cap_increase_x , market_cap_increase_y , "Market Cap")
    #pdf.savefig()
    #plt.close()
    
    #graphs("Decrease" , market_cap_decrease_x , market_cap_decrease_y , "Market Cap")
    #pdf.savefig()
    #plt.close()

    #graphs("Increase" , volume_increase_x , volume_increase_y , "Volume")
    #pdf.savefig()
    #plt.close()
    
    #graphs("Decrease" , volume_decrease_x , volume_decrease_y , "Volume")
    #pdf.savefig()
    #plt.close()