## Modern Data Architectures project: Binance data ingestion
This project is part of the Modern Data Architectures course. The goal of this project is to ingest data from the Binance cryptocurrency exchange and visualize it using Apache Spark and Jupyter notebooks.
It grabs raw data from a local data warehouse, then does some light ETLing before sending the data in parquet files to a standardized location in the server.

In [1]:
import findspark
findspark.init()

In [2]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = ' pyspark-shell'

In [4]:
#import json

In [5]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
.appName("candlesticks_visualizations.ipynb")
.config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
.getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [33]:
import pyspark.sql.functions as F
#from pyspark.sql.functions import explode

In [7]:
#test_file = open('/datalake/raw/binance/bitcoin_klines/2022/06/25/20220625183800.json', 'r')
#test = json.load('hdfs://localhost:9000/datalake/raw/binance/bitcoin_klines/2022/06/25/20220625183800.json')

In [27]:
spark.conf.set("spark.sql.caseSensitive", "true")
df = (spark.read
        #.option("header","false")
        .option("inferSchema", "true") 
        #.option("caseSensitive", 'true')
        .option("recursiveFileLookup", "true")
        .option("multiLine", "true")
        .json("hdfs://localhost:9000/datalake/raw/binance/bitcoin_klines/"))
#df.select(explode("k"))
#df.show()

                                                                                

In [12]:
df.printSchema()

root
 |-- E: long (nullable = true)
 |-- e: string (nullable = true)
 |-- k: struct (nullable = true)
 |    |-- B: string (nullable = true)
 |    |-- L: long (nullable = true)
 |    |-- Q: string (nullable = true)
 |    |-- T: long (nullable = true)
 |    |-- V: string (nullable = true)
 |    |-- c: string (nullable = true)
 |    |-- f: long (nullable = true)
 |    |-- h: string (nullable = true)
 |    |-- i: string (nullable = true)
 |    |-- l: string (nullable = true)
 |    |-- n: long (nullable = true)
 |    |-- o: string (nullable = true)
 |    |-- q: string (nullable = true)
 |    |-- s: string (nullable = true)
 |    |-- t: long (nullable = true)
 |    |-- v: string (nullable = true)
 |    |-- x: boolean (nullable = true)
 |-- s: string (nullable = true)



In [50]:
# Removed e, k.s, k.f, k.L, k.B as they are not needed in our analysis.
df2 = df.select(df["E"], df["s"], df["k.t"], df["k.T"], df["k.i"]\
               , df["k.o"], df["k.c"], df["k.h"], df["k.l"], df["k.v"]\
               , df["k.n"], df["k.x"], df["k.q"], df["k.V"], df["k.Q"])
df2.printSchema()

root
 |-- E: long (nullable = true)
 |-- s: string (nullable = true)
 |-- t: long (nullable = true)
 |-- T: long (nullable = true)
 |-- i: string (nullable = true)
 |-- o: string (nullable = true)
 |-- c: string (nullable = true)
 |-- h: string (nullable = true)
 |-- l: string (nullable = true)
 |-- v: string (nullable = true)
 |-- n: long (nullable = true)
 |-- x: boolean (nullable = true)
 |-- q: string (nullable = true)
 |-- V: string (nullable = true)
 |-- Q: string (nullable = true)



In [52]:
# Converted E, t, T, o, c, h, l, v, n, q, V and Q to their respective datatypes.

df2 = (df2.withColumn("E", F.timestamp_seconds(F.expr("E/1000")))
       .withColumn("t", F.timestamp_seconds(F.expr("t/1000")))
       .withColumn("T", F.timestamp_seconds(F.expr("T/1000")))
       .withColumn("o", F.col("o").cast("double"))
       .withColumn("c", F.col("c").cast("double"))
       .withColumn("h", F.col("h").cast("double"))
       .withColumn("l", F.col("l").cast("double"))
       .withColumn("v", F.col("v").cast("integer"))
       .withColumn("n", F.col("n").cast("integer"))
       .withColumn("q", F.col("q").cast("double"))
       .withColumn("V", F.col("V").cast("integer"))
       .withColumn("Q", F.col("Q").cast("double")))

#df2.limit(10).toPandas()

Unnamed: 0,E,s,t,T,i,o,c,h,l,v,n,x,q,V,Q
0,2022-06-25 19:28:00.000,BTCUSDT,2022-06-25 19:27:00,2022-06-25 19:27:59.999,1m,21113.09,21118.15,21149.0,21113.09,141,2061,True,3000321.0,57,1214874.0
1,2022-06-25 19:31:00.001,BTCUSDT,2022-06-25 19:30:00,2022-06-25 19:30:59.999,1m,21118.63,21141.52,21155.28,21117.6,91,1140,True,1924663.0,55,1170408.0
2,2022-06-25 19:32:00.001,BTCUSDT,2022-06-25 19:31:00,2022-06-25 19:31:59.999,1m,21141.52,21151.44,21164.21,21136.9,67,1136,True,1432233.0,44,950134.3
3,2022-06-25 18:46:00.001,BTCUSDT,2022-06-25 18:45:00,2022-06-25 18:45:59.999,1m,21047.18,21045.55,21052.79,21029.99,53,781,True,1129604.0,22,473716.7
4,2022-06-25 19:01:03.303,BTCUSDT,2022-06-25 19:00:00,2022-06-25 19:00:59.999,1m,21053.63,21022.48,21057.01,21022.47,53,845,True,1122768.0,13,279359.1
5,2022-06-25 19:03:00.001,BTCUSDT,2022-06-25 19:02:00,2022-06-25 19:02:59.999,1m,21029.84,21027.14,21033.98,21021.28,53,842,True,1122814.0,27,571003.8
6,2022-06-25 19:10:00.001,BTCUSDT,2022-06-25 19:09:00,2022-06-25 19:09:59.999,1m,21053.5,21071.6,21081.63,21053.49,47,942,True,1009310.0,25,545585.3
7,2022-06-25 19:27:00.001,BTCUSDT,2022-06-25 19:26:00,2022-06-25 19:26:59.999,1m,21080.93,21113.09,21119.46,21076.69,47,843,True,1009271.0,29,620048.1
8,2022-06-25 19:29:00.001,BTCUSDT,2022-06-25 19:28:00,2022-06-25 19:28:59.999,1m,21118.16,21120.93,21141.28,21107.29,60,959,True,1273649.0,35,747174.2
9,2022-06-25 18:38:00.000,BTCUSDT,2022-06-25 18:37:00,2022-06-25 18:37:59.999,1m,21052.51,21060.52,21060.52,21051.45,25,461,True,547392.4,14,302391.8


In [55]:
# Renamed all fields for clarity

df2_std = (df2.withColumnRenamed("t","kline_start_time")
              .withColumnRenamed("E","event_time")
              .withColumnRenamed("s","symbol")
              .withColumnRenamed("c","close_price")
              .withColumnRenamed("o","open_price")
              .withColumnRenamed("h","high_price")
              .withColumnRenamed("l","low_price")
              .withColumnRenamed("v","base_volume") 
              .withColumnRenamed("q","quote_volume")
              .withColumnRenamed("T","kline_close_time")
              .withColumnRenamed("i","interval")
              .withColumnRenamed("n","num_of_trades")
              .withColumnRenamed("x","kline_closed")
              .withColumnRenamed("V","taker_buy_base_vol")
              .withColumnRenamed("Q","taker_buy_quote_vol")
          )

df2_std.toPandas()

Unnamed: 0,event_time,symbol,kline_start_time,kline_close_time,interval,open_price,close_price,high_price,low_price,base_volume,num_of_trades,kline_closed,quote_volume,taker_buy_base_vol,taker_buy_quote_vol
0,2022-06-25 19:28:00.000,BTCUSDT,2022-06-25 19:27:00,2022-06-25 19:27:59.999,1m,21113.09,21118.15,21149.00,21113.09,141,2061,True,3.000321e+06,57,1.214874e+06
1,2022-06-25 19:31:00.001,BTCUSDT,2022-06-25 19:30:00,2022-06-25 19:30:59.999,1m,21118.63,21141.52,21155.28,21117.60,91,1140,True,1.924663e+06,55,1.170408e+06
2,2022-06-25 19:32:00.001,BTCUSDT,2022-06-25 19:31:00,2022-06-25 19:31:59.999,1m,21141.52,21151.44,21164.21,21136.90,67,1136,True,1.432233e+06,44,9.501343e+05
3,2022-06-25 18:46:00.001,BTCUSDT,2022-06-25 18:45:00,2022-06-25 18:45:59.999,1m,21047.18,21045.55,21052.79,21029.99,53,781,True,1.129604e+06,22,4.737167e+05
4,2022-06-25 19:01:03.303,BTCUSDT,2022-06-25 19:00:00,2022-06-25 19:00:59.999,1m,21053.63,21022.48,21057.01,21022.47,53,845,True,1.122768e+06,13,2.793591e+05
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
79,2022-06-25 19:17:00.001,BTCUSDT,2022-06-25 19:16:00,2022-06-25 19:16:59.999,1m,21079.28,21077.48,21079.68,21074.43,8,320,True,1.709247e+05,3,8.245450e+04
80,2022-06-25 19:19:00.000,BTCUSDT,2022-06-25 19:18:00,2022-06-25 19:18:59.999,1m,21075.71,21073.40,21075.71,21072.55,5,265,True,1.092697e+05,3,6.367763e+04
81,2022-06-25 19:23:00.001,BTCUSDT,2022-06-25 19:22:00,2022-06-25 19:22:59.999,1m,21074.21,21078.27,21078.28,21070.92,7,241,True,1.518003e+05,4,9.608626e+04
82,2022-06-25 19:24:00.001,BTCUSDT,2022-06-25 19:23:00,2022-06-25 19:23:59.999,1m,21078.28,21071.22,21078.28,21071.21,8,214,True,1.881606e+05,0,1.576783e+04


In [56]:
# Adding date field for partitioning
df2_std = (df2_std.withColumn("date",F.to_date("event_time"))
                  .where("event_time is not null"))

In [58]:
# Upload standardized data into HDFS
(df2_std.coalesce(1)
          .write
          .partitionBy("symbol","date")
          .mode("overwrite")
          .parquet("hdfs://localhost:9000/datalake/std/binance/"))

                                                                                