In [1]:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List

import pandas as pd
from tqdm.notebook import tqdm

The tick trade data from Binance, provided in the [link](https://data.binance.vision/?prefix=data/spot/monthly/trades/), is distributed as monthly CSV files, one per trading pair, without a header row (hence `header=None` and explicit column names when reading them). Each row in the data represents a single trade event on the Binance platform for the specified trading pair. The typical columns found in this tick trade data are:

1. **Trade ID**: A unique identifier for each trade.
2. **Price**: The price at which the trade was executed.
3. **Quantity**: The quantity of the asset traded.
4. **Quote Quantity**: The total value of the trade in quote currency (price * quantity).
5. **Timestamp**: The time at which the trade occurred, usually in Unix timestamp format (milliseconds since epoch).
6. **Buyer is Maker**: A boolean indicating whether the buyer is the maker (true) or the taker (false).
7. **Best Match**: A boolean indicating whether the trade was the best match (true) or not (false).

In [3]:
# Column names of the header-less trade CSVs, in file order, mapped to the dtype
# each one is parsed as. Pinning the dtypes keeps the schema identical across
# every monthly file and every currency pair.
dtypes = dict(
    TradeID='int64',
    Price='float64',
    Quantity='float64',
    QuoteQuantity='float64',
    Timestamp='int64',
    BuyerIsMaker='bool',
    BestMatch='bool',
)

In [4]:
# Directory holding the downloaded monthly trade CSVs (the Parquet outputs are written here too).
# Set the BINANCE_TRADES_DIR environment variable to point elsewhere instead of editing the notebook.
# os.path.join(..., '') guarantees a trailing separator, so `path + filename` also works.
path = os.path.join(os.environ.get('BINANCE_TRADES_DIR', 'D://studying//masters//big_data//data//'), '')

In [5]:
# Preview one raw monthly CSV to check the column layout and dtypes.
# os.listdir() order is arbitrary and this folder also receives the .parquet.gzip
# outputs, so take the first *CSV* by name rather than the first directory entry.
sample_csvs = sorted(f for f in os.listdir(path) if f.endswith('.csv'))
assert sample_csvs, f"No CSV files found in {path}"
pd.read_csv(os.path.join(path, sample_csvs[0]), header=None, dtype=dtypes, names=list(dtypes))

Unnamed: 0,TradeID,Price,Quantity,QuoteQuantity,Timestamp,BuyerIsMaker,BestMatch
0,96005099,33.60,1.106,37.16160,1722470401898,False,True
1,96005100,33.61,1.889,63.48929,1722470401898,False,True
2,96005101,33.62,0.224,7.53088,1722470401898,False,True
3,96005102,33.62,2.759,92.75758,1722470401898,False,True
4,96005103,33.63,2.748,92.41524,1722470401898,False,True
...,...,...,...,...,...,...,...
1648273,97653372,32.20,2.446,78.76120,1725148774829,True,True
1648274,97653373,32.20,3.108,100.07760,1725148782875,True,True
1648275,97653374,32.20,1.089,35.06580,1725148782875,True,True
1648276,97653375,32.19,3.105,99.94995,1725148782911,False,True


Within each CSV file the trades are already in chronological order. To avoid re-sorting 10+ GB of data, we only need to concatenate the monthly files in the right order. Every file name ends with its `YYYY-MM` month, so sorting the names in ascending (lexicographic) order is the same as sorting them by date. We therefore store the sorted file names in the variable `csv_files`. To compute the features without running out of memory (and crashing the machine with a blue screen), we store the data for each currency pair in a separate file.

In [7]:
# Function to process a single pair of CSV files and save as Parquet
def process_pair(pair: str, pair_files: List[str], path: str, dtypes: Dict[str, str]) -> None:
    try:
        # Extract date range from file names
        date_start: str = pair_files[0][-11:-4]  # Extract YYYY-MM from the first file
        date_end: str = pair_files[-1][-11:-4]   # Extract YYYY-MM from the last file

        # List to store DataFrames for the current pair
        df_list: List[pd.DataFrame] = []

        # Process each file for the current pair
        for file in pair_files:
            file_path: str = os.path.join(path, file)
            df_i: pd.DataFrame = pd.read_csv(file_path, header=None, dtype=dtypes, names=dtypes.keys())
            df_i['Currency'] = pair
            df_list.append(df_i)

        # Concatenate all DataFrames
        df: pd.DataFrame = pd.concat(df_list, ignore_index=True)

        # Construct the Parquet file name
        parquet_filename: str = f"{pair}-trades-{date_start}-{date_end}.parquet.gzip"
        parquet_path: str = os.path.join(path, parquet_filename)

        # Save to Parquet with gzip compression
        df.to_parquet(parquet_path, compression='gzip')

        print(f"Saved {parquet_filename}")
    except Exception as e:
        print(f"Error processing {pair}: {e}")

# Function to process all currency pairs in parallel
def process_all_pairs(path: str, dtypes: Dict[str, str], max_workers: int = 5) -> List[str]:
    """Convert every currency pair's CSVs in ``path`` into one Parquet file per pair.

    Files are grouped by the symbol before the first '-' in their name
    (``BTCUSDT-trades-2024-03.csv`` -> ``BTCUSDT``). Each group is handed to
    ``process_pair`` on a thread pool.

    Args:
        path: Directory containing the raw ``*.csv`` trade files.
        dtypes: Column-name -> dtype mapping passed through to ``process_pair``.
        max_workers: Number of worker threads. ``process_pair`` builds one
            DataFrame per pair, so lower this if memory is tight.

    Returns:
        Sorted list of pairs whose conversion failed (empty if all succeeded).
        Check it before deleting the source CSVs.
    """
    # Get list of all CSV files in the directory (name order == chronological order per pair)
    csv_files: List[str] = sorted(f for f in os.listdir(path) if f.endswith('.csv'))

    # Group by the exact pair symbol in one pass. A startswith(pair) test could
    # attach another pair's files whose symbol merely begins with this one.
    files_by_pair: Dict[str, List[str]] = {}
    for f in csv_files:
        files_by_pair.setdefault(f.split('-')[0], []).append(f)

    failed: List[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_pair, pair, pair_files, path, dtypes): pair
            for pair, pair_files in files_by_pair.items()
        }
        # as_completed advances the bar as pairs actually finish,
        # instead of blocking on them in submission order.
        for future in tqdm(as_completed(futures), total=len(futures), desc="Saving Parquet files"):
            pair = futures[future]
            try:
                ok = future.result()
            except Exception as e:
                print(f"Error processing {pair}: {e}")
                ok = False
            # Only an explicit False counts as failure (a None result carries no status).
            if ok is False:
                failed.append(pair)
    return sorted(failed)

# `path` and `dtypes` are defined once in the configuration cells above; they are
# deliberately not redefined here, so the two copies cannot drift apart.
assert os.path.isdir(path), f"Data directory not found: {path}"

# Convert all pairs in parallel. Check the output for "Error processing ..."
# lines before deleting the raw CSVs in the next cell.
process_all_pairs(path, dtypes)

Saving Parquet files:   0%|          | 0/76 [00:00<?, ?it/s]

Saved LOOMUSDT-trades-2024-03-2024-08.parquet.gzip
Saved ZECUSDT-trades-2024-03-2024-08.parquet.gzip
Saved ARKUSDT-trades-2024-03-2024-08.parquet.gzip
Saved ARDRUSDT-trades-2024-03-2024-08.parquet.gzip
Saved DCRUSDT-trades-2024-03-2024-08.parquet.gzip
Saved IOTXUSDT-trades-2024-03-2024-08.parquet.gzip
Saved CYBERUSDT-trades-2024-03-2024-08.parquet.gzip
Saved VITEUSDT-trades-2024-03-2024-08.parquet.gzip
Saved SCUSDT-trades-2024-03-2024-08.parquet.gzip
Saved DYMUSDT-trades-2024-03-2024-08.parquet.gzip
Saved FIOUSDT-trades-2024-03-2024-08.parquet.gzip
Saved CKBUSDT-trades-2024-03-2024-08.parquet.gzip
Saved QKCUSDT-trades-2024-03-2024-08.parquet.gzip
Saved WAVESUSDT-trades-2024-03-2024-06.parquet.gzip
Saved HOTUSDT-trades-2024-03-2024-08.parquet.gzip
Saved FRONTUSDT-trades-2024-03-2024-08.parquet.gzip
Saved FIROUSDT-trades-2024-03-2024-08.parquet.gzip
Saved FISUSDT-trades-2024-03-2024-08.parquet.gzip
Saved NULSUSDT-trades-2024-03-2024-08.parquet.gzip
Saved QTUMUSDT-trades-2024-03-2024-08.p

In [8]:
# Function to delete CSV files after processing (optional)
def delete_files_in_directory(directory_path: str, only_if_converted: bool = True) -> None:
    """Delete the raw ``*.csv`` files directly inside ``directory_path`` (non-recursive).

    By default a CSV is removed only when the same directory contains a Parquet
    file for its pair (``{pair}-trades-*.parquet.gzip``, the name produced by
    ``process_pair``). A pair whose conversion failed therefore keeps its source
    data; the deletion is irreversible.

    Args:
        directory_path: Folder holding the CSV and Parquet files.
        only_if_converted: Pass False to delete every CSV unconditionally.

    OSError raised while listing or removing files is caught and printed;
    files removed before the error stay removed.
    """
    try:
        files: List[str] = os.listdir(directory_path)
        # Pair symbols that already have a Parquet output, e.g. "BTCUSDT"
        converted_pairs = {
            f.split('-')[0]
            for f in files
            if f.endswith('.parquet.gzip') and '-trades-' in f
        }
        kept: List[str] = []
        for file in files:
            file_path: str = os.path.join(directory_path, file)
            if not (os.path.isfile(file_path) and file.endswith('.csv')):
                continue
            if only_if_converted and file.split('-')[0] not in converted_pairs:
                kept.append(file)
                continue
            os.remove(file_path)
        if kept:
            print(f"Kept {len(kept)} CSV files with no matching Parquet output: {sorted(kept)}")
        else:
            print("All CSV files deleted successfully.")
    except OSError as e:
        print(f"Error occurred while deleting files: {e}")

# Remove the raw CSVs once the Parquet files exist (optional and irreversible).
# Set DELETE_RAW_CSV = False to keep them, e.g. while debugging the conversion.
DELETE_RAW_CSV: bool = True
if DELETE_RAW_CSV:
    delete_files_in_directory(path)

All CSV files deleted successfully.
