## 📘 VZC notebook sample project

**Author**: Vijay Chidambaram ( chidamv@gmail.com) 

**Date**: April 28, 2025

# Problem statement

1. You are asked to create an ETL process to inject a big stock pricing data into a database with limited a      resource(in terms of memory and CPU).
 The data is in 10 csv files of total size  100GB in random order,  and your ETL process will be run in a       server with memory 32G and 4CPUs, and free disk space 100GB.
 The data structure of the data is in the following format: date:: datetime(%Y-%m-%d), id::int, price::float,   trade_volume::int. 
  The price values will be saved into a price table which has the following structure: date, stk_001,stk_002,    stk_003,…,stk_200. 

    Assume there are 200 stocks in the data and the ids for the stocks are from 1 to 200. And save the             trade_volume to a volume table with the following column structure: date, stk_001, stk_002, …, stk_200. 

    Find a most efficient solution in terms of run time and resources.
   
2. Calculate stock returns using above data.

 

***Additional notes:* **

* We are evaluating problem solving skills and python + database skills(must have). Please demonstrate your skills.
* The stock data contains daily closing price and trade volume, i.e., on each date, there is only one closing price and trade volume for a given stock. Any file of 10 csv files can contain a particular stock on a particular date, and the stock and date can be in random order in the file. There will be no duplicate data based on date and stock id across the 10 files.
* Yes, you can create temp tables, but there is a limit of storage in temp space. Here we assume 20G temp storage. You can’t create permanent tables.
* We are looking for a most efficient solution(a subjective view). Please explain why your solution is most efficient. 



## 📚 Table of Contents

1. Overview
2. Objective
3. Design Overview
4. Setup and imports
5. Read and Process Functions
6. Validate Folders and Files
7. Merge Parquets
8. Calculate Returns
9. Cleanup
10. Create mock CSV
11. Main etl
12. Section by Section explations
13. Why this is Efficient
14. Conclusion
15. Sample dataset and output




In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Overview

This notebook demonstrates an efficient ETL process  


    for handling 100GB of stock price and volume data 

    across 10 CSV files 
    
    using limited resources (32GB RAM, 4 CPUs, and 100GB disk).

Requirements:
- Use only temporary storage (limit 20GB)
- No permanent tables allowed
- Must calculate daily stock returns
- Target schema: pivot data into `price_table` and `volume_table`



# Objective:

We are given 100GB of CSV files (10 in total), each containing stock price and volume data in the format:

date:: datetime(%Y-%m-%d)
id::int
price::float
trade_volume::int


Each day has one entry per stock (IDs 1–200). The task is to:

1. Efficiently load and transform this data on a resource-constrained server (32GB RAM, 4 CPUs, 100GB disk, 20GB temp).
2. Output two wide tables (one for prices, one for volumes) with the structure:
   
    *date, stk_001, stk_002, ..., stk_200*
4. Calculate daily stock returns. 
5. Work within constraints: no permanent tables, limited temp space.
6. *Error handling, retries, logging, validation included*

 # Design Overview:

💡 Transform into Wide tables with 1 row per date, and 200 columns ( stk_001 to stk_200)

Save
 * price.parquet
 * volume.parquet
 * returns.parquet ( % change of price )

 * Constraints
 * 1. Limited memory ( 32GB)
   2. CPUs (4)
   3. Temp Storage ( 20GB)
   4. Total disk space (100GB)
   5. No permanent Database tables



 # Why Dask ?


*  Out-of-core processing: Dask reads and processes data in chunks that fit into memory.
*  Parallel computing: Utilizes all 4 CPUs efficiently.
*  Disk-efficient formats: Writes intermediate and final results as compressed Parquet.
*  Familiar syntax: Dask mimics Pandas, making the code readable and easy to maintain.

 

# 📦 Setup and Imports

Dask for distributed and memory-efficient processing

Pandas used only for light operations (e.g., reindexing)

NumPy for numerical efficiency

In [None]:
#Install Dask if needed
!pip install dask[complete] --quiet

In [None]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import logging
import sys
import os
import time
import glob
from dask.diagnostics import ProgressBar


# Setup Logging and directories

In [None]:
logging.basicConfig( level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s")

In [None]:
PRICE_TMP_DIR = "tmp_price"
VOLUME_TMP_DIR = "tmp_volume"
FINAL_PRICE_PATH = "final_price.parquet"
FINAL_VOLUME_PATH = "final_volume.parquet"
RETURNS_PATH = "returns.parquet"
STOCK_COLS = [f"stk_{i:03d}" for i in range(1, 201)]
MAX_RETRIES = 3

In [None]:
os.makedirs(PRICE_TMP_DIR, exist_ok=True)
os.makedirs(VOLUME_TMP_DIR, exist_ok=True)

# Read and Process Functions

In [None]:
def read_csv_with_retry(filepath):
    """Retry logic is here for reading large CSVs."""
    for attempt in range(1, MAX_RETRIES+1):
        try:
            df = dd.read_csv(filepath, assume_missing=True, dtype={
                "id": "int32",
                "price": "float32",
                "trade_volume": "int64"
            })
            logging.info(f"Successfully read {filepath}")
            return df
        except Exception as e:
            logging.error(f" Error reading {filepath}: {e}, Attempt {attempt}")
            time.sleep(2)
    raise Exception(f"Failed to read {filepath} after {MAX_RETRIES} retries")

In [None]:
def process_csv(df, chunk_id, file):
    """Process a CSV file: pivot and save price/volume separately."""
    try:
        # Convert 'stk_id' to category dtype
        df.columns = df.columns.str.strip()

        if 'stk_id' not in df.columns:
            logging.warning(f"Skipping file due to missing 'stk_id' column: {file}")
            return # Skip this file   if 'stk_id' is missing
            
        df['stk_id'] = df['stk_id'].astype('category')
        
        # Use map_partitions to convert 'date' to datetime in each partition
        df['date'] = df.map_partitions(lambda df: pd.to_datetime(df['date'], format='%Y-%m-%d'))
     
        # Pivot
        price_df = df.pivot_table(index='date', columns='stk_id', values='price', aggfunc='last')
        volume_df = df.pivot_table(index='date', columns='stk_id', values='trade_volume', aggfunc='last')

        # Making sure column names follow the format 'stk_001', 'stk_002', etc
        price_df.columns = [f"stk_{str(col).zfill(3)}" for col in price_df.columns]
        volume_df.columns = [f"stk_{str(col).zfill(3)}" for col in volume_df.columns]
        
        # Ensure all stocks present
        price_df = price_df.reindex(columns=STOCK_COLS)
        volume_df = volume_df.reindex(columns=STOCK_COLS)

         # Ensure output folders exist
        os.makedirs(PRICE_TMP_DIR, exist_ok=True)
        os.makedirs(VOLUME_TMP_DIR, exist_ok=True)

        # Save to parquet
        price_output_file = os.path.join(PRICE_TMP_DIR, f"price_{chunk_id:03d}.parquet")
        volume_output_file = os.path.join(VOLUME_TMP_DIR, f"volume_{chunk_id:03d}.parquet")

        price_df.to_parquet(price_output_file)
        volume_df.to_parquet(volume_output_file)

        logging.info(f" Processed and saved chunk {chunk_id}")
    except Exception as e:
        logging.error(f" Error processing chunk {chunk_id}: {e}")
        raise 

# Validate Parquet Folder

In [None]:
def validate_parquet_folder(folder):
    """Ensure temp folder has parquet files."""
    
    if not os.path.exists(folder):
        raise RuntimeError(f" Folder {folder} does not exist.")
        
    files = glob.glob(f"{folder}/*.parquet")

    if not files:
        logging.warning(f" No parquet files found in {folder}. Files in folder: {os.listdir(folder)}. Check if CSVs were loaded properly.")
    else:
        logging.info(f" Validation passed: {len(files)} files found in {folder}")
   

# Merge parquets into single file

In [None]:
def combine_and_save(tmp_dir, output_path):
    """Combine all parquets into a single file."""
    try:
        if not glob.glob(f"{tmp_dir}/*.parquet"):
            logging.warning(f" No parquet files found in {tmp_dir}. Skipping combine.")
            return

        df = dd.read_parquet(tmp_dir)
        df = df.groupby(df.index).last().compute()
        df = df.sort_index()
        df.to_parquet(output_path, index=True)
        logging.info(f" Combined and saved to {output_path}")
    except Exception as e:
        logging.error(f" Failed to combine parquet files: {e}")
        raise e


# Calculate Returns

In [None]:
def calculate_returns(price_file, output_file):
    """Calculate stock returns."""
    try:
        if not os.path.exists(price_file):
            logging.warning(f" Skipping returns calculation: {price_file} does not exist.")
            return

        prices = dd.read_parquet(price_file)
        returns = prices.pct_change().dropna()
        returns.compute().to_parquet(output_file, compression='snappy')
        logging.info(f"  Returns calculated and saved to {output_file}")
    except Exception as e:
        logging.error(f"  Failed to calculate returns: {e}")
        raise e

# Cleanup

In [None]:
def cleanup():
    """Remove temp files."""
    import shutil
    shutil.rmtree(PRICE_TMP_DIR, ignore_errors=True)
    shutil.rmtree(VOLUME_TMP_DIR, ignore_errors=True)
    logging.info("  Cleaned up temp folders")


# Create Mock CSV

In [None]:
def create_mock_csv():
    """Create a sample CSV if no data exists."""
    np.random.seed(42)
    dates = pd.date_range(start="2022-01-01", periods=5)
    ids = np.arange(1, 6)  # 5 stocks
    rows = []
    for date in dates:
        for stock_id in ids:
            rows.append({
                "date": date.strftime('%Y-%m-%d'),
                "id": stock_id,
                "price": np.round(np.random.uniform(100, 200), 2),
                "trade_volume": np.random.randint(1000, 10000)
            })
    df_sample = pd.DataFrame(rows)
    df_sample.to_csv("mock_data.csv", index=False)
    print(" Mock CSV data generated.")

# Generate mock CSV if none exists
if not glob.glob("*.csv"):
    create_mock_csv()


# Main ETL Driver

In [None]:
# Main ETL
def run_etl():
    """Main ETL runner."""
    try:
        logging.info(" ETL Process Started")
        csv_files = glob.glob("*.csv")
        if not glob.glob("*.csv"):
           create_mock_csv()
        if not csv_files:
           print(" No CSV files found to process. Please upload CSV files first.")
           return  # <-- Don't crash. Just return cleanly.
        chunk_id = 0
        for file in sorted(glob.glob("*.csv")):
            logging.info(f" Processing file: {file}")
            df = read_csv_with_retry(file)
            process_csv(df, chunk_id, file)
            chunk_id += 1

        validate_parquet_folder(PRICE_TMP_DIR)
        validate_parquet_folder(VOLUME_TMP_DIR)

        combine_and_save(PRICE_TMP_DIR, FINAL_PRICE_PATH)
        combine_and_save(VOLUME_TMP_DIR, FINAL_VOLUME_PATH)

        if os.path.exists(FINAL_PRICE_PATH):
           calculate_returns(FINAL_PRICE_PATH, RETURNS_PATH)
        else:
           logging.warning(f" Skipping returns: {FINAL_PRICE_PATH} not found.")
      

        cleanup()

        logging.info(" ETL Process Completed Successfully")
    except Exception as e:
        logging.error(f" ETL Process Failed: {e}")
        raise e

In [None]:
if __name__ == "__main__":
    run_etl()

# 📖Section-by-Section Explanation:


|Ref No| Section         | Purpose                                 | Why Efficient                          |
|------|-----------|-----------------------------------------|---------------------------------------------|   
|1 |Imports and Setup| Load required modules, create folders   | Organized imports, modular folders          |
|2 |Logging:         | Track every operation , exception       | Crucial for oncall support                  |
|3 |CSV Read with Retry | Auto-retry if initial read fails     |  Prevents simple failures from breaking ETL |
|4 |Process CSV         | Pivot to wide format, save each chunk | Memory Sace with Dask                      |
|5 |Validation          | Ensure temp output exist before combining | Early error detection                  |
|6 |Combine Parquet     | Merge Chunks efficiently, save final tables | Optimized compute step               |
|7 |Calculate Returns   | Compute pct_change, drop NaNs               | Fast returns computation             |
|8 | Cleanup             | Delete temp folders post-processing         | Save 100GB Storage space             |
|9 |Run ETL             | Full Pipelin wrapped in try-except          | Crash safe and clear failures        |

# ⚙️ Why this is Efficient

| Ref No | Config Item                           | How it is handled                    |
|--------|---------------------------------------|--------------------------------------|
|1.      | Limits Memory Usage                   | Data is processed chunked reading    |
|2.      | Saves Disk and temp space             | Parque compression is used           |
|3.      |Utilizes all Cpus with Dask scheduler  | Parallel Processing                  |
|4.      |Data Consistency without overhead      | Pivot and Reindex                    |
|5.      |Loads Data only when needed            | Lazy evaluation                      |
|6.      |Reduces temp memory usage ( 20Gb limit)| Disk Writes                          |
|7.      |All saved as files                     | No permanent tables                  |      


# Conclusion

**This solution demonstrates:**

  
1. Strong Python + SQL Skills
2. Scalable ETL Design
3. Efficient Use of Limited Resources
4. Clarity, Maintainability, and Extensibility



# Sample Input Dataset ( CSV)
| date       | id | price | trade_volume |
|------------|----|-------|--------------|
| 2024-01-01 | 1  | 150.5 | 1000         |
| 2024-01-01 | 2  | 250.7 | 2000         |
| 2024-01-01 | 3  | 300.2 | 1500         |
| 2024-01-02 | 1  | 151.2 | 1100         |
| 2024-01-02 | 2  | 249.5 | 2100         |
| 2024-01-02 | 3  | 305.0 | 1550         |



# Transfromed Price Table ( pivoted output)

|date       | stk_001   | stk_002    | stk_003 |
|-----------|-----------|------------|---------|
|2024-01-01 | 150.5     | 250.7      | 300.2   |
|2024-01-02 | 151.2     | 249.5      | 305.0   |


# Transformed Volume Table ( pivoted output)

|date       | stk_001   | stk_002   | stk_003 |
|-----------|-----------|-----------|---------|
|2024-01-01 | 1000      | 2000      | 1500    |
|2024-01-02 | 1100      | 2100      | 1550    |

# 📈 Calculated Returns Table 

***( Return  = ( Today's price / Yesterdays's Price) -1 )***



|date         | stk_001      | stk_002     | stk_003  |
|-------------|--------------|-------------|----------|
|2024-01-02   | 0.004650     | -0.004782   | 0.015976 |