In [2]:
import os
import yaml
from zipfile import ZipFile
from datetime import datetime
import re

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, DoubleType
from pyspark.sql.functions import col, from_unixtime, date_format, collect_set, asc

In [3]:
# Single local Spark session used by every cell and function below.
# The session time zone is pinned to UTC so that timestamp formatting does not
# depend on the machine's local zone.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SetTimestampIndex')
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

24/04/01 15:40:57 WARN Utils: Your hostname, skynet resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp2s0)
24/04/01 15:40:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/01 15:40:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Parse the notebook's YAML settings file once; `config` and `my_vars` are two
# names for the same dictionary (later cells index `my_vars`).
with open('../../references/config_notebook.yaml', 'r') as f:
    my_vars = config = yaml.safe_load(f)

# Database Creation

### Zip to parquet

In [None]:
def zip_to_parquet(zip_folder_path, output_folder_path):
    """
    Convert every zipped trades CSV in a folder into one partitioned Parquet dataset.

    For each ``*.zip`` in ``zip_folder_path`` (processed in sorted path order)
    the first ``.csv`` member is extracted into the current working directory,
    loaded with the notebook-level ``spark`` session and appended to
    ``<output_folder_path>/BTCUSDT.parquet``. The extracted CSV is deleted
    afterwards, even when reading or writing fails.

    The ``timestamp`` column is divided by 1000 and converted with
    ``from_unixtime`` (i.e. it is treated as epoch milliseconds), then stored
    as a ``yyyy-MM-dd HH:mm:ss`` *string*. A ``zipname`` column (``yyyyMMdd``
    of that timestamp) is added and used as the partition key.

    The write mode is "append": running this twice on the same folder stores
    the same rows twice.

    Parameters:
    zip_folder_path (str): The path to the input zip folder.
    output_folder_path (str): The path to the output directory.

    Returns:
    None

    Raises:
    IndexError: If a zip archive contains no ``.csv`` member.
    """
    # Column layout of the header-less CSV files, as a DDL string (all nullable).
    schema = (
        "id STRING, price DOUBLE, qty DOUBLE, quoteQty DOUBLE, "
        "`timestamp` STRING, makerBuy BOOLEAN, bestPrice BOOLEAN"
    )

    # Every archive is appended to the same dataset, so compute the path once.
    output_path = os.path.join(output_folder_path, "BTCUSDT.parquet")

    zip_paths = sorted(
        os.path.join(zip_folder_path, filename)
        for filename in os.listdir(zip_folder_path)
        if filename.endswith(".zip")
    )

    for zip_path in zip_paths:
        # Extract the CSV (only the first .csv member is used) into the CWD.
        with ZipFile(zip_path, "r") as zip_ref:
            csv_members = [name for name in zip_ref.namelist() if name.endswith('.csv')]
            if not csv_members:
                raise IndexError(f"No CSV file found inside {zip_path}")
            csv_data = zip_ref.extract(csv_members[0])

        try:
            df = (
                spark.read
                .format("csv")
                .schema(schema)
                .option("header", "false")
                .option("sep", ",")
                .load(csv_data)
            )

            # Epoch milliseconds -> human readable string (second resolution).
            df = df.withColumn(
                "timestamp",
                date_format(from_unixtime(df['timestamp'] / 1000), "yyyy-MM-dd HH:mm:ss"),
            )

            # Day key used for partitioning.
            df = df.withColumn("zipname", date_format(col("timestamp"), "yyyyMMdd"))

            # The write is the action that actually reads the CSV, so the file
            # must still exist here; it is removed in the `finally` below.
            (
                df.write
                .partitionBy("zipname")
                .mode("append")
                .option("compression", "gzip")
                .option("blockSize", "256m")
                .parquet(output_path)
            )
        finally:
            # Always clean up the extracted CSV, even if Spark failed.
            os.remove(csv_data)
        

## Generate the database

In [None]:
# Full build of the Parquet dataset from the configured zip folder.
# zip_to_parquet writes in "append" mode, so run this cell only once.
zip_folder_path = my_vars['SOURCES']['binance']['BTCUSDT']
output_folder_path = my_vars['DATA']['external']
zip_to_parquet(
    zip_folder_path=zip_folder_path,
    output_folder_path=output_folder_path,
)

# Update the database

## Update parquet database

In [7]:
def update_parquet(zip_folder_path, output_folder_path, existing_parquet_path):
    """
    Append zip archives that are not yet stored to the partitioned Parquet dataset.

    The days already stored are read from the ``zipname`` partition column of
    ``existing_parquet_path`` (none if that path does not exist yet). Each
    ``*.zip`` in ``zip_folder_path`` whose file name contains a valid
    ``YYYY-MM-DD`` date that is not stored yet is processed in sorted order:
    its first ``.csv`` member is extracted into the current working directory,
    loaded with the notebook-level ``spark`` session and appended to
    ``existing_parquet_path``. The extracted CSV is deleted afterwards, even
    when reading or writing fails.

    Zip files whose name carries no valid ``YYYY-MM-DD`` date are skipped,
    because they cannot be compared against the stored ``zipname`` values.

    The ``timestamp`` column is divided by 1000 and converted with
    ``from_unixtime`` (i.e. it is treated as epoch milliseconds), then stored
    as a ``yyyy-MM-dd HH:mm:ss`` *string*; ``zipname`` is ``yyyyMMdd`` of it.

    Parameters:
    zip_folder_path (str): The path to the input zip folder.
    output_folder_path (str): The path to the output directory. Not used by
        this function; kept so existing callers keep working.
    existing_parquet_path (str): The path to the existing Parquet dataset.

    Returns:
    None

    Raises:
    ValueError: If the input folder holds no archive for a day that is missing
        from the dataset.
    IndexError: If a selected zip archive contains no ``.csv`` member.
    """
    # Column layout of the header-less CSV files, as a DDL string (all nullable).
    schema = (
        "id STRING, price DOUBLE, qty DOUBLE, quoteQty DOUBLE, "
        "`timestamp` STRING, makerBuy BOOLEAN, bestPrice BOOLEAN"
    )

    # Days already present in the dataset. Starts empty so that a first run
    # (dataset not created yet) treats every archive as new.
    zip_strings = set()
    if os.path.exists(existing_parquet_path):
        existing_df = spark.read.parquet(existing_parquet_path)
        existing_zips = existing_df.select(collect_set("zipname").alias("zips")).collect()[0]["zips"]
        # str() because the stored values may not come back as strings.
        zip_strings = {str(zipname) for zipname in existing_zips}

    # Select archives by the date in their *file name* (not in the directory
    # path) and keep the real file name instead of rebuilding it.
    date_pattern = re.compile(r"([0-9]{4}-[0-9]{2}-[0-9]{2})")
    new_zip_files = []
    for filename in sorted(os.listdir(zip_folder_path)):
        if not filename.endswith('.zip'):
            continue
        match = date_pattern.search(filename)
        if match is None:
            continue  # no date in the name: cannot be matched to a partition
        try:
            zip_name = datetime.strptime(match.group(1), '%Y-%m-%d').strftime('%Y%m%d')
        except ValueError:
            continue  # looks like a date but is not a valid calendar day
        if zip_name not in zip_strings:
            new_zip_files.append(os.path.join(zip_folder_path, filename))

    # Check if there are any new zip files
    if not new_zip_files:
        raise ValueError("No new data found in input folder.")

    for zip_path in new_zip_files:
        # Extract the CSV (only the first .csv member is used) into the CWD.
        with ZipFile(zip_path, "r") as zip_ref:
            csv_members = [name for name in zip_ref.namelist() if name.endswith('.csv')]
            if not csv_members:
                raise IndexError(f"No CSV file found inside {zip_path}")
            csv_data = zip_ref.extract(csv_members[0])

        try:
            df = (
                spark.read
                .format("csv")
                .schema(schema)
                .option("header", "false")
                .option("sep", ",")
                .load(csv_data)
            )

            # Epoch milliseconds -> human readable string (second resolution).
            df = df.withColumn(
                "timestamp",
                date_format(from_unixtime(df['timestamp'] / 1000), "yyyy-MM-dd HH:mm:ss"),
            )

            # Day key used for partitioning.
            df = df.withColumn("zipname", date_format(col("timestamp"), "yyyyMMdd"))

            # The write is the action that actually reads the CSV, so the file
            # must still exist here; it is removed in the `finally` below.
            (
                df.write
                .partitionBy("zipname")
                .mode("append")
                .option("compression", "gzip")
                .option("blockSize", "256m")
                .parquet(existing_parquet_path)
            )
        finally:
            # Always clean up the extracted CSV, even if Spark failed.
            os.remove(csv_data)


## Update the database

In [8]:
# Inputs for the incremental update, all taken from the YAML settings:
# the folder holding the zip archives, the output directory, and the
# Parquet dataset inside that directory.
zip_folder_path = my_vars['SOURCES']['binance']['BTCUSDT']
output_folder_path = my_vars['DATA']['external']
existing_parquet_path = os.path.join(output_folder_path,'BTCUSDT.parquet')

In [9]:
# Append the archives that are not stored yet.
# update_parquet raises ValueError when the folder holds nothing new.
update_parquet(
    zip_folder_path=zip_folder_path,
    output_folder_path=output_folder_path,
    existing_parquet_path=existing_parquet_path,
)

                                                                                

# Controls

In [11]:
# Control: load the whole Parquet dataset and preview the first rows.
df = spark.read.format("parquet").load(f'{output_folder_path}/BTCUSDT.parquet')
df.show()

[Stage 368:>                                                        (0 + 1) / 1]

+----------+--------+-------+------------+-------------------+--------+---------+--------+
|        id|   price|    qty|    quoteQty|          timestamp|makerBuy|bestPrice| zipname|
+----------+--------+-------+------------+-------------------+--------+---------+--------+
|2960351314| 24764.4|  0.001|     24.7644|2023-03-14 11:17:20|   false|     true|20230314|
|2961749415|25140.59| 0.0017|   42.739003|2023-03-14 12:43:21|    true|     true|20230314|
|2960351315|24764.41|0.01953| 483.6489273|2023-03-14 11:17:20|   false|     true|20230314|
|2961749416|25140.44|0.35386|8896.1960984|2023-03-14 12:43:21|    true|     true|20230314|
|2960351316|24764.76|0.00634| 157.0085784|2023-03-14 11:17:20|   false|     true|20230314|
|2961749417|25140.45| 5.0E-4|   12.570225|2023-03-14 12:43:21|    true|     true|20230314|
|2960351317| 24765.0| 5.0E-4|     12.3825|2023-03-14 11:17:20|   false|     true|20230314|
|2961749418|25143.21|0.00131|  32.9376051|2023-03-14 12:43:21|   false|     true|20230314|

                                                                                

In [12]:
# Control: print the column names and types of the dataset loaded above.
# NOTE(review): the output shows `zipname` as integer although it is written
# from a formatted date string — presumably Spark infers the type of the
# partition column from the directory names; confirm.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- qty: double (nullable = true)
 |-- quoteQty: double (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- makerBuy: boolean (nullable = true)
 |-- bestPrice: boolean (nullable = true)
 |-- zipname: integer (nullable = true)



In [10]:
# Control: read one day directly from its partition directory. Reading the
# directory itself means the `zipname` column is absent from the result
# (see the output below).
pdf = spark.read.format("parquet").load(f'{output_folder_path}/BTCUSDT.parquet/zipname=20210301')
pdf.show(truncate=False)

+---------+--------+--------+--------------+-------------------+--------+---------+
|id       |price   |qty     |quoteQty      |timestamp          |makerBuy|bestPrice|
+---------+--------+--------+--------------+-------------------+--------+---------+
|676193574|45134.11|0.060288|2721.04522368 |2021-03-01 00:00:00|true    |true     |
|676193575|45134.12|0.001   |45.13412      |2021-03-01 00:00:00|false   |true     |
|676193576|45134.11|0.004975|224.54219725  |2021-03-01 00:00:00|true    |true     |
|676193577|45134.11|0.278364|12563.71139604|2021-03-01 00:00:00|true    |true     |
|676193578|45134.12|8.3E-4  |37.4613196    |2021-03-01 00:00:00|false   |true     |
|676193579|45134.12|0.005721|258.21230052  |2021-03-01 00:00:00|false   |true     |
|676193580|45134.12|0.001899|85.70969388   |2021-03-01 00:00:00|false   |true     |
|676193581|45134.12|0.001242|56.05657704   |2021-03-01 00:00:00|false   |true     |
|676193582|45134.12|0.001076|48.56431312   |2021-03-01 00:00:00|false   |tru

In [13]:
# Control: summary statistics for the whole dataset (the count row in the
# output below reports about 2.85 billion rows, so expect this cell to be slow).
# `id` is a string column, so its min/max are lexicographic — which is why the
# output shows min 1000000000 and max 999999999.
df.describe().show()



+-------+--------------------+------------------+-------------------+------------------+-------------------+--------------------+
|summary|                  id|             price|                qty|          quoteQty|          timestamp|             zipname|
+-------+--------------------+------------------+-------------------+------------------+-------------------+--------------------+
|  count|          2847787820|        2847787820|         2847787820|        2847787820|         2847787820|          2847787820|
|   mean|2.1000916412026892E9| 30619.24829765197|0.04000853582058906|1150.6497008637173|               null|2.0223325979372453E7|
| stddev| 8.220859925967758E8|13994.768901813319| 0.1787653443882412| 5471.992021023345|               null|   8172.348094418442|
|    min|          1000000000|           15476.0|             1.0E-6|             0.016|2021-03-01 00:00:00|            20210301|
|    max|           999999999|           73777.0|          808.03151|   3.38029281194E7|20

                                                                                

In [14]:
# Control: number of rows stored per day partition, oldest day first.
result = (
    df.groupBy("zipname")
    .count()
    .orderBy(col("zipname").asc())
)
result.show(1000)



+--------+--------+
| zipname|   count|
+--------+--------+
|20210301| 2147223|
|20210302| 1855583|
|20210303| 2242131|
|20210304| 2291936|
|20210305| 2054216|
|20210306| 1476474|
|20210307| 1758101|
|20210308| 1999401|
|20210309| 2194398|
|20210310| 2647284|
|20210311| 2509970|
|20210312| 2489931|
|20210313| 2559002|
|20210314| 1829769|
|20210315| 2683815|
|20210316| 2241208|
|20210317| 2204296|
|20210318| 2040947|
|20210319| 1885830|
|20210320| 1724131|
|20210321| 1773546|
|20210322| 1927345|
|20210323| 1740349|
|20210324| 2401988|
|20210325| 2099425|
|20210326| 1773150|
|20210327| 1528776|
|20210328| 1551283|
|20210329| 1983437|
|20210330| 1612744|
|20210331| 1928224|
|20210401| 1594431|
|20210402| 1666356|
|20210403| 1606805|
|20210404| 1499113|
|20210405| 2035877|
|20210406| 1960513|
|20210407| 2206433|
|20210408| 1661080|
|20210409| 1615962|
|20210410| 2470398|
|20210411| 1746102|
|20210412| 1806298|
|20210413| 2703735|
|20210414| 2645625|
|20210415| 1938028|
|20210416| 2639940|


                                                                                

In [15]:
# Shut down the Spark session created at the top of the notebook; cells that
# use `spark` need the session to be created again before they can be re-run.
spark.stop()