# Overview:

This utility script provides a streamlined way to process and load large volumes of financial data into a ScyllaDB database. It primarily targets minute-resolution, adjusted-split financial bars but can be adapted for different data structures. In addition to its core functionalities, it offers the `fetch_minute_data` function to retrieve specific minute-level datasets from the database. The script uses efficient techniques such as multiprocessing, batching, and asynchronous fetching to enhance speed and consistency.

---

## Features:

- **Parallel Processing:** Utilizes Python's multiprocessing library to parallelize data processing, making full use of available CPU cores.
- **Automatic Retries:** Implements a retry mechanism for data insertions, ensuring data consistency even in the face of transient database errors.
- **Progress Monitoring:** Offers a real-time progress bar to keep track of the data loading process.
- **Error Logging:** Captures and logs errors encountered during data processing and loading for easier troubleshooting.
- **Data Preprocessing:** Preprocesses raw data files to extract relevant information and organize it into a more database-friendly format.
- **Custom Date Ranges:** Users can set specific start and end times using `fetch_minute_data`.
- **Trading Hours Filter:** Provides an option to retrieve data only from standard trading hours.
- **Month-based Bucketing System:** Organizes data for efficient retrieval.
- **Asynchronous Data Fetching:** Enables parallel data retrieval across different time intervals.

---

## Setup and Usage:

### Prerequisites:
- Python 3.11.4
- ScyllaDB
- Python packages: `os`, `shutil`, `subprocess`, `multiprocessing`, `cassandra-driver`, `tqdm`, `datetime`

### Configuration:
1. Adjust the constants at the beginning of the script, such as `KEYSPACE`, `TABLE`, and `CSV_PATH`, to match your setup.
2. Ensure the ScyllaDB cluster is up and accessible from the script execution environment.

### Execution:
- Simply run the cells sequentially. By default, the script processes a specific number of .txt files located in the directory specified by `CSV_PATH`. Adjust as needed for processing different file counts.

---

## Function Descriptions:

- `divide_list_into_chunks()`: Splits a list into approximately equal-sized chunks for parallel processing.
- `list_files()`: Lists all '.txt' files in the specified directory.
- `create_keyspace_and_table()`: Establishes a connection to ScyllaDB and creates a keyspace and table if not already present.
- `clear_temp_folder()`: Clears any existing temporary files before processing starts.
- `log_error()`: Logs errors encountered during data processing.
- `execute_with_retry()`: Retries data insertion in case of transient database errors.
- `get_bucket_from_timestamp()`: Extracts year-month info from timestamps, useful for bucketing data.
- `preprocess_and_load()`: Processes each financial data file and loads the data into ScyllaDB.
- `process_chunk()`: Handles a chunk of data files, invoking preprocessing and loading for each.
- `monitor_progress()`: A thread-safe function that updates the progress bar as data files are processed.
- `fetch_minute_data`: Retrieves minute-level financial data from the database based on specified criteria.

---

**Note:** Always ensure you have backups of your data and that you've set up appropriate monitoring and alerting for your ScyllaDB cluster. Regularly check the error log for any issues during data loading.

The notebook was created and tested in Visual Studio Code. It is not guaranteed that it will work in a different environment.  


In [10]:
import os
import sys
import shutil
import subprocess
import time
import multiprocessing as mp
from cassandra.cluster import Cluster
from tqdm.notebook import tqdm
from datetime import datetime
from threading import Thread


In [11]:
# Constants for database, paths, and retries
KEYSPACE = 'financial_data'
TABLE = 'test_data_bars_1m_adjsplit'
CSV_PATH = '/home/jj/anaconda3/envs/stocks/Database/1M/data'
CSV_PATH_TEMP = '/home/jj/anaconda3/envs/stocks/Database/1M/temp'
CSV_PATH_LOG = '/home/jj/anaconda3/envs/stocks/Database/1M/log'
MAX_RETRIES = 5
RETRY_PAUSE = 60  # Duration in seconds to pause between retries
SCYLLA_NODE_IP = '192.168.3.41' # Node IP address
SCYLLA_NODE_PORT = '9042' # Node port

def divide_list_into_chunks(lst, m):
    # Split a list into approximately equal-sized chunks
    n = len(lst)
    chunk_size = n // m
    for i in range(0, m - 1):
        yield lst[i * chunk_size : (i + 1) * chunk_size]
    yield lst[(m - 1) * chunk_size:]

def list_files(path):
    # Lists all '.txt' files in the provided directory path
    all_files = os.listdir(path)
    files = list(filter(lambda f: f.endswith('.txt'), all_files))
    return files
    

In [12]:
import pandas as pd

symbol = 'A'
sample_data = pd.read_csv(f"{CSV_PATH}/{symbol}_full_1min_adjsplit.txt", header=None)
sample_data

Unnamed: 0,0,1,2,3,4,5
0,2005-01-03 09:30:00,17.2389,17.2389,17.2318,17.2389,173492.0
1,2005-01-03 09:31:00,17.2389,17.2389,17.2246,17.2246,9646.0
2,2005-01-03 09:32:00,17.2246,17.2389,17.2246,17.2318,34810.0
3,2005-01-03 09:33:00,17.2318,17.2389,17.2318,17.2389,11464.0
4,2005-01-03 09:34:00,17.2389,17.2389,17.2318,17.2318,12302.0
...,...,...,...,...,...,...
1835693,2023-10-05 15:56:00,110.3200,110.3400,110.2800,110.3200,8780.0
1835694,2023-10-05 15:57:00,110.3100,110.4350,110.3100,110.3800,13658.0
1835695,2023-10-05 15:58:00,110.3750,110.4200,110.3650,110.4200,12518.0
1835696,2023-10-05 15:59:00,110.4200,110.4200,110.3100,110.3200,30497.0


In [4]:
def clear_temp_folder():
    """Purges the contents of the designated temporary directory."""
    
    for filename in os.listdir(CSV_PATH_TEMP):
        file_path = os.path.join(CSV_PATH_TEMP, filename)
        
        try:
            # Identify and remove files or symbolic links
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            # Recognize and delete directories along with their inner contents
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            # Capture and report any encountered deletion issues
            print(f'Failed to delete {file_path}. Reason: {e}')

def log_error(symbol, error_message):
    """
    Appends error messages related to specific financial symbols to a designated log file.

    Parameters:
        symbol (str): The identifier of the financial instrument or stock.
        error_message (str): A concise description of the encountered issue.
    """
    
    log_file = os.path.join(CSV_PATH_LOG, "error_log.txt")
    
    with open(log_file, "a") as file:
        file.write(f"Symbol: {symbol} - Error: {error_message}\n")

def execute_with_retry(command, symbol):
    """
    Tries executing a given command and retries in case of specific exceptions.
    
    Parameters:
        command (list): The command to be executed as a list of strings.
        symbol (str): The financial symbol for which the command is being executed.
        
    Returns:
        subprocess.CompletedProcess: The result of the executed command.
    """
    
    # Initialize the retry count
    retries = 0
    
    # Keep trying until reaching the maximum allowed retries
    while retries < MAX_RETRIES:
        try:
            # Run the command and capture its output
            result = subprocess.run(command, capture_output=True, text=True)
            
            # If the error stream does not indicate a "WriteTimeout", return the result
            if "WriteTimeout" not in result.stderr:
                return result
            else:
                # Otherwise, raise a custom exception to trigger a retry
                raise Exception("WriteTimeout encountered")
        except Exception as e:
            # If any exception occurs, increment the retry count
            retries += 1
            
            # Log the error for the specific symbol and the exception encountered
            log_error(symbol, str(e))
            
            # Pause the execution for a specified duration before the next retry
            time.sleep(RETRY_PAUSE)
    
    # If the function reaches here, it means max retries were attempted and all failed
    print(f"Max retries reached for symbol {symbol}. Moving on to the next file...")

In [5]:
def create_keyspace_and_table():
    """
    Establishes a connection to Scylla DB, then initializes a keyspace and a corresponding table if they aren't present.

    This function serves as the initial setup phase in the data processing pipeline, ensuring the database is ready
    for incoming data. The table is designed to store financial data like stock prices, and its schema optimizes query 
    performance for time-based lookups and data compactness.

    Steps:
    1. Establish a connection to the Scylla cluster.
    2. Create the desired keyspace, if it's absent. The keyspace is equipped with a simple replication strategy with 
       a factor of 1, which implies data is only stored on one node. This strategy might not be optimal for 
       production systems, where redundancy is crucial.
    3. Set the active keyspace for the session.
    4. Design and initialize the table. The schema is crafted with:
       - A composite primary key consisting of the symbol, bucket (year-month), and timestamp. This design allows 
         efficient time-based queries within a specific stock symbol and date range.
       - The 'TimeWindowCompactionStrategy' is employed, which is optimal for time series data. It groups data by 
         windows of time, compacting and purging obsolete data. The strategy used here compacts data in 30-day windows.
    5. Once the database operations are complete, the session and the connection to the cluster are terminated.
    """
    
    # Connect to the Scylla cluster
    cluster = Cluster([SCYLLA_NODE_IP], port=SCYLLA_NODE_PORT)
    session = cluster.connect()
    
    # Create the keyspace if it doesn't exist
    session.execute(f"CREATE KEYSPACE IF NOT EXISTS {KEYSPACE} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor' : 1}};")
    session.set_keyspace(KEYSPACE)
    
    # Define and create the table if it's not already there
    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {KEYSPACE}.{TABLE} (
        symbol text,
        bucket text,
        timestamp text,
        open float,
        high float,
        low float,
        close float,
        volume float,
        PRIMARY KEY ((symbol, bucket), timestamp)
    ) WITH compaction = {{
        'class': 'TimeWindowCompactionStrategy',
        'compaction_window_unit': 'DAYS',
        'compaction_window_size': 30
    }};
    """)
    
    # Disconnect from the cluster and end the session
    session.shutdown()
    cluster.shutdown()

def get_bucket_from_timestamp(timestamp):
    # Extracts year-month information from a timestamp
    dt_obj = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return f"{dt_obj.year}-{dt_obj.month:02}"

def preprocess_and_load(symbol):
    """
    Preprocesses and then loads data for a specific financial symbol into the database.
    
    Parameters:
        symbol (str): The financial symbol to be processed.
        
    Returns:
        int: The number of lines (data entries) processed for the given symbol.
    """
    # Notify the user about the symbol currently being processed
    # print(f"Processing data for symbol: {symbol}")

    # Construct the paths for the input and temporary files
    input_file = os.path.join(CSV_PATH, f"{symbol}_full_1min_adjsplit.txt")
    temp_file = os.path.join(CSV_PATH_TEMP, f"{symbol}_temp.txt")
    
    line_count = 0  # Counter to track the number of lines processed
    
    # Open both the source file (input_file) and a temporary file (temp_file)
    with open(input_file, 'r') as source, open(temp_file, 'w') as target:
        # Loop through each line in the source file
        for line in source:
            line_count += 1  # Increment the line counter
            
            # Split the data and extract the timestamp
            data = line.strip().split(',')
            timestamp = data[0]
            
            # Get the year-month bucket based on the timestamp
            bucket = get_bucket_from_timestamp(timestamp)
            
            # Write the preprocessed data into the temporary file
            target.write(f"{symbol},{bucket},{timestamp},{','.join(data[1:])}\n")
    
    # Construct the command to copy data from the temporary file to ScyllaDB
    copy_cmd = ['./bin/cqlsh', '-e', f"COPY {KEYSPACE}.{TABLE} (symbol, bucket, timestamp, open, high, low, close, volume) FROM '{temp_file}' WITH DELIMITER=',' AND HEADER=FALSE"]
    
    # Execute the copy command with retries in case of WriteTimeout errors
    result = execute_with_retry(copy_cmd, symbol)
    
    # If there is any output or errors from the command, print them
    # if result:
    #     print(result.stdout)
    #     print(result.stderr)
    
    # Remove the temporary file after successfully copying data to the database
    os.remove(temp_file)
    
    return line_count


def process_chunk(chunk):
    """
    Process a subset (chunk) of financial symbol data files.
    The function will preprocess and load data for each symbol in the chunk.
    """
    total_rows = 0
    for symbol in chunk:
        # Preprocess and load data for the current symbol and count the number of rows.
        total_rows += preprocess_and_load(symbol)
        # Indicate that processing for one symbol has finished.
        progress_queue.put(1)
    return total_rows

def monitor_progress():
    """
    Monitors and updates the progress of the data loading process.
    It counts processed symbols and updates the progress bar.
    """
    processed = 0
    while processed < len(files_list):
        # Wait until a symbol has been processed.
        _ = progress_queue.get()
        # Update the progress bar.
        pbar.update(1)
        processed += 1

In [6]:
if __name__ == "__main__":
    # Main script execution starts.
    
    # Clear any existing temporary files.
    clear_temp_folder()
    # Create the required keyspace and table in ScyllaDB.
    create_keyspace_and_table()
    # List all the financial data files.
    files = list_files(CSV_PATH)
    # Extract symbol names from file names.
    files_list = [element.replace('_full_1min_adjsplit.txt', '') for element in files]
    # Sort the file list for consistent processing order.
    files_list.sort()
    # Initialize a multiprocessing queue to track progress.
    progress_queue = mp.Queue()
    processes = 5
    # (Optional) Limit to the first 10 files for processing.
    files_list = files_list[0:20]
    # Divide the file list into chunks for parallel processing.
    chunks = divide_list_into_chunks(files_list, processes)
    # Initialize a progress bar using tqdm.
    pbar = tqdm(total=len(files_list))
    # Start a separate thread to monitor processing progress.
    t = Thread(target=monitor_progress)
    t.start()
    with mp.Pool(processes=processes) as pool:
        results = []
        start_time = time.time()
        # Process each chunk in parallel using multiprocessing.
        for chunk in chunks:
            results.append(pool.apply_async(process_chunk, (chunk,)))
        # Wait for all processes to finish and get their results.
        results = [res.get() for res in results]
        end_time = time.time()
        # Calculate total processing time.
        total_time = end_time - start_time
        # Aggregate the number of rows processed across all chunks.
        total_rows_aggregated = sum(results)
        print(f"\nTotal rows inserted across processes: {total_rows_aggregated:,.0f} rows")
        print(f"Average insertion rate: {total_rows_aggregated / total_time:,.0f} rows/sec")
    # Ensure the progress monitoring thread completes.
    t.join()

  0%|          | 0/20 [00:00<?, ?it/s]


Total rows inserted across processes: 15,261,664 rows
Average insertion rate: 194,470 rows/sec


## Data Loader from the Database

### `fetch_minute_data` Function Overview:

The `fetch_minute_data` function is designed to pull minute-level financial data from a ScyllaDB (or Cassandra) database. With the assistance of pandas, it provides several functionalities:

- **Custom Date Ranges:** Users can set a specific start and end times for fetching the dataset.
- **Trading Hours Filter:** Provides an option to retrieve data only from standard trading hours.
- **Month-based Bucketing System:** Organizes data for efficient retrieval.
- **Asynchronous Data Fetching:** Enables parallel data retrieval across different time intervals.
- **Data Rounding:** Users have the option to round numerical values in the dataset to a precision of four decimal places.

For a smooth operation, ensure there's an operational ScyllaDB or Cassandra cluster, an appropriate Python environment, and the necessary keyspace and table references.


In [7]:
from datetime import datetime, timedelta
import pandas as pd
from cassandra.cluster import Cluster

def fetch_minute_data(symbol, start_time, end_time, KEYSPACE, TABLE, trading_hours_only, rounding):
    """
    Fetch minute-level financial data for a specified symbol and timeframe from a ScyllaDB (or Cassandra) database.
    
    Parameters:
    - symbol (str): The ticker symbol to retrieve data for.
    - start_time (str): The start of the desired timeframe in the format 'YYYY-MM-DD HH:MM:SS'.
    - end_time (str): The end of the desired timeframe in the format 'YYYY-MM-DD HH:MM:SS'.
    - KEYSPACE (str): The ScyllaDB/Cassandra keyspace to connect to.
    - TABLE (str): The table within the keyspace to query from.
    
    Returns:
    - dataframe (pd.DataFrame): A pandas DataFrame containing the queried financial data.
    """
    
    # Helper function to generate monthly bucket strings (YYYY-MM) for data partitioning
    def generate_monthly_buckets(start_time, end_time):
        start_date = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
        end_date = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
        current_date = start_date
        buckets = []
        
        # Loop through each month in the timeframe to generate bucket strings
        while current_date <= end_date:
            buckets.append(f"{current_date.year}-{current_date.month:02}")
            
            # Advance to the next month
            current_date = current_date + timedelta(days=31)
            current_date = datetime(current_date.year, current_date.month, 1)
        return buckets

    # Generate the necessary bucket strings for the desired timeframe
    buckets = generate_monthly_buckets(start_time, end_time)
    
    # List to store dataframes from each bucket
    dfs = []
    
    # Establish a connection to the ScyllaDB/Cassandra cluster and the specific keyspace
    cluster = Cluster([SCYLLA_NODE_IP], port=SCYLLA_NODE_PORT)
    session = cluster.connect(KEYSPACE)
    
    # Prepare the query to fetch data
    query = f"SELECT timestamp, open, high, low, close, volume FROM {TABLE} WHERE symbol = ? AND bucket = ? AND \"timestamp\" >= ? AND \"timestamp\" <= ?"
    prepared = session.prepare(query)
    futures = []

    # For each bucket, asynchronously fetch data for the symbol within the given timeframe
    for bucket in buckets:
        future = session.execute_async(prepared, (symbol, bucket, start_time, end_time))
        futures.append(future)

    # As queries complete, convert results to pandas dataframes
    for future in futures:
        rows = future.result()
        dfs.append(pd.DataFrame(list(rows)))

    # Shutdown the session and cluster connection after fetching data
    session.shutdown()
    cluster.shutdown()
    
    # Combine all the dataframes into one dataframe
    dataframe = pd.concat(dfs, ignore_index=True)
    
    # Set timestamp as the dataframe index and remove the timestamp column
    dataframe.index = dataframe['timestamp'].astype('datetime64[ns]') # type: ignore
    dataframe.drop('timestamp', inplace=True, axis=1)

    if trading_hours_only:
        dataframe = dataframe.between_time('09:30:00', '16:00:00')
    if rounding:
        dataframe = dataframe.round(4)

    return dataframe


In [8]:
symbol = 'A'
KEYSPACE = 'financial_data'
TABLE = 'test_data_bars_1m_adjsplit'
ohlc_df = fetch_minute_data(symbol, '2005-01-01 00:00:00', '2023-10-05 23:59:00', KEYSPACE, TABLE, trading_hours_only=False, rounding=True)
ohlc_df, print(ohlc_df.info())

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1835698 entries, 2005-01-03 09:30:00 to 2023-10-05 16:01:00
Data columns (total 5 columns):
 #   Column  Dtype  
---  ------  -----  
 0   open    float64
 1   high    float64
 2   low     float64
 3   close   float64
 4   volume  float64
dtypes: float64(5)
memory usage: 84.0 MB
None


(                         open      high       low     close    volume
 timestamp                                                            
 2005-01-03 09:30:00   17.2389   17.2389   17.2318   17.2389  173492.0
 2005-01-03 09:31:00   17.2389   17.2389   17.2246   17.2246    9646.0
 2005-01-03 09:32:00   17.2246   17.2389   17.2246   17.2318   34810.0
 2005-01-03 09:33:00   17.2318   17.2389   17.2318   17.2389   11464.0
 2005-01-03 09:34:00   17.2389   17.2389   17.2318   17.2318   12302.0
 ...                       ...       ...       ...       ...       ...
 2023-10-05 15:56:00  110.3200  110.3400  110.2800  110.3200    8780.0
 2023-10-05 15:57:00  110.3100  110.4350  110.3100  110.3800   13658.0
 2023-10-05 15:58:00  110.3750  110.4200  110.3650  110.4200   12518.0
 2023-10-05 15:59:00  110.4200  110.4200  110.3100  110.3200   30497.0
 2023-10-05 16:01:00  110.3500  110.3500  110.3500  110.3500  195339.0
 
 [1835698 rows x 5 columns],
 None)

In [9]:
print(sample_data) # To visually compare with original data

                           0         1         2         3         4         5
0        2005-01-03 09:30:00   17.2389   17.2389   17.2318   17.2389  173492.0
1        2005-01-03 09:31:00   17.2389   17.2389   17.2246   17.2246    9646.0
2        2005-01-03 09:32:00   17.2246   17.2389   17.2246   17.2318   34810.0
3        2005-01-03 09:33:00   17.2318   17.2389   17.2318   17.2389   11464.0
4        2005-01-03 09:34:00   17.2389   17.2389   17.2318   17.2318   12302.0
...                      ...       ...       ...       ...       ...       ...
1835693  2023-10-05 15:56:00  110.3200  110.3400  110.2800  110.3200    8780.0
1835694  2023-10-05 15:57:00  110.3100  110.4350  110.3100  110.3800   13658.0
1835695  2023-10-05 15:58:00  110.3750  110.4200  110.3650  110.4200   12518.0
1835696  2023-10-05 15:59:00  110.4200  110.4200  110.3100  110.3200   30497.0
1835697  2023-10-05 16:01:00  110.3500  110.3500  110.3500  110.3500  195339.0

[1835698 rows x 6 columns]
