# Data Processing Pipeline

## Overview

This pipeline processes the cleaned tick data in four stages:

1. **Volume Bars**: Ticker-specific bars targeting ~600 bars/day
2. **Feature Engineering**: 9 features avoiding look-ahead bias
3. **Triple Barrier Labels**: Volatility-scaled barriers (k=1.0, 20-min time horizon)
4. **Purged K-Fold CV**: Day-level leave-one-day-out (LODO) splits with a 20-min embargo


In [1]:
import os
import sys

# Set working directory to project root
os.chdir('..')
print(f"Working directory: {os.getcwd()}")


Working directory: c:\Users\doqui\OneDrive\Documents\cs4641-131-project


## 1. Generate Configuration

Computes ticker-specific volume thresholds and locks all pipeline parameters.

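**Formula**: `median_daily_volume / 600`, clamped to [2,000, 100,000] shares/bar. The logged thresholds are all multiples of 100, so the result appears to be rounded to the nearest 100 shares.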
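
The formula can be sketched as a small function. This is only an illustration, not the code in `config_processing.py`: the function name, its parameters, and in particular the rounding to the nearest 100 shares are assumptions inferred from the logged thresholds.

```python
def volume_threshold(median_daily_volume, target_bars=600,
                     lo=2_000, hi=100_000, step=100):
    """Shares per bar: median daily volume / target bars, rounded, then clamped.

    The rounding step (100 shares) is an assumption inferred from the log
    output; the notebook only states the division and the clamp.
    """
    raw = median_daily_volume / target_bars
    rounded = round(raw / step) * step
    return int(min(max(rounded, lo), hi))


# Median daily volumes taken from the log lines in this notebook.
print(volume_threshold(2_740_152))   # CCL
print(volume_threshold(830_936))     # DAL (falls below the lower clamp)
print(volume_threshold(12_311_613))  # INTC
```

With these inputs the sketch reproduces the logged values for CCL, DAL, and INTC, which is consistent with the rounding assumption but does not prove it.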

**Output**: `config/processing.yaml` with MD5 hash for reproducibility
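
A configuration hash of this kind is usually computed over a canonical serialization, so that the same parameters always yield the same digest. The sketch below shows the idea with the standard library only. How `config_processing.py` actually serializes the config before hashing is not shown in this notebook, so the JSON canonicalization and the `config_hash` helper are assumptions.

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """MD5 digest of a canonical (key-sorted) serialization of the config.

    Hypothetical helper: JSON is used here to stay within the standard
    library; the real pipeline writes YAML and may hash it differently.
    """
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()


# Illustrative parameters only; the key names are not taken from the real config.
cfg = {"target_bars_per_day": 600, "k": 1.0, "time_barrier_min": 20}
print(config_hash(cfg))
```

Sorting the keys makes the digest independent of dictionary order, so a changed hash signals a changed parameter rather than a reordered file.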


In [2]:
!python src/dataProcessing/config_processing.py


Processing Configuration Generation

Computing volume thresholds...

Volume thresholds computed for 20 tickers:
  CCL   :  4,600 shares/bar
  CNC   :  2,200 shares/bar
  DAL   :  2,000 shares/bar
  DECK  :  2,000 shares/bar
  DLTR  :  2,000 shares/bar
  DOW   :  2,000 shares/bar
  DXCM  :  2,000 shares/bar
  EQT   :  2,000 shares/bar
  FCX   :  4,600 shares/bar
  INTC  : 20,500 shares/bar
  KVUE  :  7,500 shares/bar
  MCHP  :  2,000 shares/bar
  NCLH  :  2,300 shares/bar
  ON    :  2,000 shares/bar
  PCG   :  8,700 shares/bar
  SLB   :  2,200 shares/bar
  SMCI  :  6,800 shares/bar
  TTD   :  2,400 shares/bar
  UBER  :  2,000 shares/bar
  XYZ   :  2,000 shares/bar

Generating configuration...

Configuration saved to config/processing.yaml
Config hash: cfa7ade774a88fd3bac9fb4552b92c55


2025-10-23 00:03:17,274 [INFO] Computing volume thresholds from cleaned data...
2025-10-23 00:03:17,316 [INFO] CCL: median_daily_vol=2,740,152, threshold=4,600
2025-10-23 00:03:17,346 [INFO] CNC: median_daily_vol=1,346,947, threshold=2,200
2025-10-23 00:03:17,378 [INFO] DAL: median_daily_vol=830,936, threshold=2,000
2025-10-23 00:03:17,409 [INFO] DECK: median_daily_vol=198,671, threshold=2,000
2025-10-23 00:03:17,436 [INFO] DLTR: median_daily_vol=421,256, threshold=2,000
2025-10-23 00:03:17,461 [INFO] DXCM: median_daily_vol=360,965, threshold=2,000
2025-10-23 00:03:17,496 [INFO] DOW: median_daily_vol=1,032,204, threshold=2,000
2025-10-23 00:03:17,526 [INFO] EQT: median_daily_vol=1,037,487, threshold=2,000
2025-10-23 00:03:17,567 [INFO] FCX: median_daily_vol=2,782,226, threshold=4,600
2025-10-23 00:03:17,610 [INFO] KVUE: median_daily_vol=4,509,025, threshold=7,500
2025-10-23 00:03:17,667 [INFO] INTC: median_daily_vol=12,311,613, threshold=20,500
2025-10-23 00:03:17,699 [INFO] MCHP: medi

## 2. Volume Bars (Test)

Constructs volume bars from tick data: trades are accumulated until the ticker's volume threshold is reached, and each completed bar records OHLCV, VWAP, and trade count.
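
The accumulation rule can be sketched in plain Python. This is a minimal illustration rather than the code in `volume_bars.py`: the `(timestamp, price, size)` tuple layout, the field names, and the choice to drop a trailing partial bar are all assumptions.

```python
def build_volume_bars(trades, threshold):
    """Group time-ordered (timestamp, price, size) trades into volume bars.

    A bar closes on the trade that brings its volume to or past the
    threshold, so bar volume can overshoot the threshold.
    """
    bars, cur = [], None
    for t, price, size in trades:
        if cur is None:  # first trade of a new bar
            cur = {"t": t, "open": price, "high": price, "low": price,
                   "volume": 0, "dollar": 0.0, "trade_count": 0}
        cur["high"] = max(cur["high"], price)
        cur["low"] = min(cur["low"], price)
        cur["close"] = price
        cur["t_end"] = t
        cur["volume"] += size
        cur["dollar"] += price * size
        cur["trade_count"] += 1
        if cur["volume"] >= threshold:
            cur["vwap"] = cur.pop("dollar") / cur["volume"]
            bars.append(cur)
            cur = None
    return bars  # assumption: an unfinished final bar is discarded


trades = [(0, 10.0, 3), (1, 11.0, 2), (2, 9.0, 5), (3, 10.0, 1)]
for bar in build_volume_bars(trades, threshold=5):
    print(bar)
```

Because a bar closes on the trade that crosses the threshold, the average bar volume ends up above the threshold, which matches the statistics reported below (average 5,153 against a 4,600 threshold).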

**Tests on**: CCL 10-2-25 (13,143 trades → 537 bars)

**Validates**: OHLC consistency, bar statistics, duration distribution
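
The consistency checks might look like the sketch below. The report keys mirror the validation output printed by the test, but the exact rules in `volume_bars.py` are not shown here, so the conditions are assumptions. The `time_gaps` check is omitted because its definition is not given.

```python
def validate_bars(bars):
    """Count basic consistency violations in a list of bar dicts."""
    report = {"total_bars": len(bars), "invalid_ohlc": 0,
              "negative_volume": 0, "zero_trades": 0}
    for b in bars:
        body_lo = min(b["open"], b["close"])
        body_hi = max(b["open"], b["close"])
        # OHLC consistency: low <= open/close <= high
        if not (b["low"] <= body_lo and body_hi <= b["high"]):
            report["invalid_ohlc"] += 1
        if b["volume"] < 0:
            report["negative_volume"] += 1
        if b["trade_count"] == 0:
            report["zero_trades"] += 1
    checks = ("invalid_ohlc", "negative_volume", "zero_trades")
    report["passed"] = all(report[k] == 0 for k in checks)
    return report


good = {"open": 10.0, "high": 11.0, "low": 9.5, "close": 10.5,
        "volume": 4600, "trade_count": 20}
bad = dict(good, high=10.2)  # close above high: inconsistent
print(validate_bars([good, bad]))
```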


In [3]:
!python src/dataProcessing/volume_bars.py


Volume Bar Construction Test

Loaded config (hash: cfa7ade774a88fd3bac9fb4552b92c55)

Testing on: data/clean/CCL/CCL_10-2-25_clean.parquet
Volume threshold for CCL: 4,600 shares/bar
Loaded 13,143 trades

Built 537 volume bars

Validation: PASSED
  total_bars: 537
  invalid_ohlc: 0
  negative_volume: 0
  zero_trades: 0
  time_gaps: 0

Bar Statistics:
  total_bars: 537
  avg_volume: 5153.13
  median_volume: 4714.00
  avg_trade_count: 24.47
  avg_bar_duration_seconds: 38.82
  median_bar_duration_seconds: 29.00

First 5 bars:
                          t                     t_end  ...  ticker  duration_seconds
0 2025-10-02 13:35:03+00:00 2025-10-02 13:35:04+00:00  ...     CCL               1.0
1 2025-10-02 13:35:04+00:00 2025-10-02 13:35:24+00:00  ...     CCL              20.0
2 2025-10-02 13:35:24+00:00 2025-10-02 13:35:31+00:00  ...     CCL               7.0
3 2025-10-02 13:35:31+00:00 2025-10-02 13:35:42+00:00  ...     CCL              11.0
4 2025-10-02 13:35:42+00:00 2025-10-02 13:36:04

## 3. Feature Engineering (Test)

Engineers 9 features from volume bars while avoiding look-ahead bias:
- VWAP distance (z-score)
- Bollinger position (20-bar, 2σ)
- Momentum (3-bar, 5-bar returns)
- Relative volume
- Time of day
- 5-minute context (bar count, avg volume, price range)

**All rolling stats use `.shift(1)` to prevent look-ahead**
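
To illustrate why the shift matters, the sketch below computes a relative-volume feature against the mean of the preceding bars only. The 3-bar window and the toy data are hypothetical; the column name follows the feature list printed by the test, but the real window length is not stated in this notebook.

```python
import pandas as pd

bars = pd.DataFrame({"volume": [100, 120, 80, 100, 400]})
window = 3  # hypothetical window, chosen small for readability

# Rolling mean over the window, then shifted by one bar so that the
# statistic for bar i uses only bars i-window .. i-1, never bar i itself.
prior_mean = bars["volume"].rolling(window).mean().shift(1)
bars["feat_relative_volume"] = bars["volume"] / prior_mean

print(bars)
```

Without the shift, the 400-share bar would be included in its own baseline and its relative volume would be understated. The shift also makes the first `window` rows NaN, since they lack a full window of prior bars.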


In [4]:
!python src/dataProcessing/feature_engineering.py


Feature Engineering Test

Loading trades from: data/clean/CCL/CCL_10-2-25_clean.parquet
Building volume bars...
Built 537 bars

Engineering features...

Validating features...

Feature Summary:
  Total features: 9
  Features with NaN: 2
  Features with inf: 0

Feature Ranges:
  feat_vwap_zscore:
    Range: [-5.9193, 6.7689]
    Mean: 0.0485, Std: 1.0780
  feat_bollinger_position:
    Range: [-3.0000, 2.1663]
    Mean: 0.0764, Std: 0.7144
  feat_momentum_3bar:
    Range: [-0.0047, 0.0056]
    Mean: 0.0001, Std: 0.0013
  feat_momentum_5bar:
    Range: [-0.0062, 0.0066]
    Mean: 0.0001, Std: 0.0016
  feat_relative_volume:
    Range: [0.3298, 11.5952]
    Mean: 1.0105, Std: 0.6284
  feat_time_of_day:
    Range: [0.0001, 0.9994]
    Mean: 0.4480, Std: 0.3333
  feat_context_bar_count:
    Range: [0.0000, 25.0000]
    Mean: 8.9572, Std: 5.5882
  feat_context_avg_volume:
    Range: [4628.5000, 13962.5000]
    Mean: 5172.7549, Std: 1196.2134
  feat_context_price_range:
    Range: [0.0000, 0.01

2025-10-23 00:03:32,413 [INFO] Engineering features for 537 bars...
  df.loc[i, 'feat_context_avg_volume'] = context_bars['volume'].mean()
2025-10-23 00:03:34,024 [INFO] Engineered 9 features


## 4. Triple Barrier Labeling (Test)

**Barriers**:
- Upper: `p0 × (1 + k × ewm_std)` where k=1.0
- Lower: `p0 × (1 - k × ewm_std)`
- Time: 20 minutes

**Volatility**: `ewm_std = returns.ewm(halflife=50).std().shift(1)`

**Labels**: +1 (upper barrier hit first), -1 (lower barrier hit first), 0 (time barrier reached with neither hit)
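
The barrier rules above can be sketched as follows. This is an illustration under stated assumptions, not the code in `labeling.py`: it takes `p0` to be the bar's close, checks later bar closes against the barriers, and leaves bars without a volatility estimate unlabeled (the real script backfills `ewm_std`, as its log shows). The function and column names are hypothetical.

```python
import pandas as pd


def triple_barrier_labels(bars, k=1.0, horizon=pd.Timedelta(minutes=20),
                          halflife=50):
    """Label each bar +1 / -1 / 0 by the first barrier hit by later closes."""
    close = bars["close"]
    # Volatility as defined above; shift(1) keeps bar i's own return out.
    ewm_std = close.pct_change().ewm(halflife=halflife).std().shift(1)
    labels = []
    for i in range(len(bars)):
        sigma = ewm_std.iloc[i]
        if pd.isna(sigma):  # assumption: no volatility estimate, no label
            labels.append(float("nan"))
            continue
        p0 = close.iloc[i]
        upper, lower = p0 * (1 + k * sigma), p0 * (1 - k * sigma)
        deadline = bars["t_end"].iloc[i] + horizon
        label = 0  # default: time barrier
        for j in range(i + 1, len(bars)):
            if bars["t_end"].iloc[j] > deadline:
                break
            if close.iloc[j] >= upper:
                label = 1
                break
            if close.iloc[j] <= lower:
                label = -1
                break
        labels.append(label)
    return pd.Series(labels, index=bars.index, name="label")


demo = pd.DataFrame({
    "t_end": pd.date_range("2025-10-02 13:35", periods=7, freq="min", tz="UTC"),
    "close": [100.0, 101.0, 100.0, 101.0, 100.0, 110.0, 90.0],
})
print(triple_barrier_labels(demo))
```

Because the barriers scale with recent volatility, quiet periods get tight barriers and active periods get wide ones, which keeps the label mix from being dominated by time-outs.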


In [6]:
!python src/dataProcessing/labeling.py


Triple Barrier Labeling Test

Loading trades from: data/clean/CCL/CCL_10-2-25_clean.parquet
Building volume bars...
Built 537 bars

Applying triple barrier labeling...
  k = 1.0
  time_barrier = 20 minutes
  ewm_halflife = 50 bars

Validating labels...

Validation Summary:
  Total bars: 537
  Labeled bars: 535
  Unlabeled bars: 2

  Label distribution:
    -1:  233 ( 43.4%)
    +0:    2 (  0.4%)
    +1:  302 ( 56.2%)

  Holding period statistics:
    mean: 74.79 seconds
    median: 33.00 seconds
    min: 0.00 seconds
    max: 709.00 seconds
    std: 106.15 seconds

Sample labeled bars:
                          t  ...  label_holding_period_seconds
0 2025-10-02 13:35:03+00:00  ...                          20.0
1 2025-10-02 13:35:04+00:00  ...                           0.0
2 2025-10-02 13:35:24+00:00  ...                           0.0
3 2025-10-02 13:35:31+00:00  ...                           0.0
4 2025-10-02 13:35:42+00:00  ...                          22.0
5 2025-10-02 13:36:05+00:00  

2025-10-23 00:05:44,277 [INFO] Applying triple barrier labeling (k=1.0, time_barrier=20min)...
  ewm_std = ewm_std.fillna(method='bfill')
  df.loc[i, 'label_t_end'] = bar_time
2025-10-23 00:05:44,741 [INFO] Labeling complete:
2025-10-23 00:05:44,741 [INFO]   Total bars: 537
2025-10-23 00:05:44,741 [INFO]   Labeled: 535
2025-10-23 00:05:44,741 [INFO]   Label distribution:
2025-10-23 00:05:44,741 [INFO]     upper: 302 (56.2%)
2025-10-23 00:05:44,741 [INFO]     lower: 233 (43.4%)
2025-10-23 00:05:44,741 [INFO]     : 2 (0.4%)
2025-10-23 00:05:44,741 [INFO]   Avg holding period (label=-1): 73.5s
2025-10-23 00:05:44,743 [INFO]   Avg holding period (label=+0): 0.0s
2025-10-23 00:05:44,743 [INFO]   Avg holding period (label=+1): 75.8s


## 5. Cross-Validation Setup (Test)

Creates day-level LODO (Leave-One-Day-Out) CV splits with purging and embargo:

**Strategy**: 5 folds (train on 4 days, validate on 1 day)

**Purge rule**: Remove training samples where `[t, t_end]` overlaps validation period

**Embargo**: Remove training samples within 20 minutes after validation period

**Validates**: No data leakage between folds

**Tests on**: 3 days of CCL data, so this test produces 3 folds; the full pipeline run uses all 5 days
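
The purge and embargo rules can be sketched with the standard library. This is a minimal illustration, not the code in `cross_validation.py`: the sample layout (dicts with `t` and `t_end` datetimes for the label span), the function name, and the definition of the validation period as the earliest `t` to the latest `t_end` of the held-out day are assumptions.

```python
from datetime import datetime, timedelta


def lodo_splits(samples, embargo=timedelta(minutes=20)):
    """Yield one leave-one-day-out fold per calendar day in `samples`."""
    days = sorted({s["t"].date() for s in samples})
    for day in days:
        val = [i for i, s in enumerate(samples) if s["t"].date() == day]
        val_start = min(samples[i]["t"] for i in val)
        val_end = max(samples[i]["t_end"] for i in val)
        cutoff = val_end + embargo
        train, n_purged, n_embargoed = [], 0, 0
        for i, s in enumerate(samples):
            if s["t"].date() == day:
                continue
            if s["t"] <= val_end and s["t_end"] >= val_start:
                n_purged += 1      # label span overlaps the validation period
            elif val_end < s["t"] <= cutoff:
                n_embargoed += 1   # starts within the embargo window
            else:
                train.append(i)
        yield {"test_day": day, "train": train, "val": val,
               "n_purged": n_purged, "n_embargoed": n_embargoed}


def span(start, minutes):
    return {"t": start, "t_end": start + timedelta(minutes=minutes)}


samples = [span(datetime(2025, 10, 2, 14, 0), 5),
           span(datetime(2025, 10, 3, 14, 0), 5),
           span(datetime(2025, 10, 6, 14, 0), 5)]
for fold in lodo_splits(samples):
    print(fold)
```

With intraday labels and overnight gaps between sessions, no training sample overlaps or closely follows the validation day, so the purge and embargo counts are zero, as in the fold metadata reported below.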


In [7]:
!python src/dataProcessing/cross_validation.py


Cross-Validation Split Test

Loading: data/clean/CCL/CCL_10-2-25_clean.parquet

Loading: data/clean/CCL/CCL_10-3-25_clean.parquet

Loading: data/clean/CCL/CCL_10-6-25_clean.parquet

Total bars: 1680
Days: 3

--------------------------------------------------------------------------------
CV Fold Metadata:
   fold   test_day  n_train  n_val  n_purged  n_embargoed                 val_start                   val_end            embargo_cutoff
0     0 2025-10-02     1143    537         0            0 2025-10-02 13:35:03+00:00 2025-10-02 19:33:41+00:00 2025-10-02 19:53:41+00:00
1     1 2025-10-03     1129    551         0            0 2025-10-03 13:35:01+00:00 2025-10-03 19:34:50+00:00 2025-10-03 19:54:50+00:00
2     2 2025-10-06     1088    592         0            0 2025-10-06 13:35:05+00:00 2025-10-06 19:34:26+00:00 2025-10-06 

2025-10-23 00:06:04,799 [INFO] Built 537 bars across 1 days
2025-10-23 00:06:04,809 [INFO] Applying triple barrier labeling (k=1.0, time_barrier=20min)...
  ewm_std = ewm_std.fillna(method='bfill')
  df.loc[i, 'label_t_end'] = bar_time
2025-10-23 00:06:05,110 [INFO] Labeling complete:
2025-10-23 00:06:05,110 [INFO]   Total bars: 537
2025-10-23 00:06:05,110 [INFO]   Labeled: 535
2025-10-23 00:06:05,110 [INFO]   Label distribution:
2025-10-23 00:06:05,110 [INFO]     upper: 302 (56.2%)
2025-10-23 00:06:05,110 [INFO]     lower: 233 (43.4%)
2025-10-23 00:06:05,110 [INFO]     : 2 (0.4%)
2025-10-23 00:06:05,110 [INFO]   Avg holding period (label=-1): 73.5s
2025-10-23 00:06:05,110 [INFO]   Avg holding period (label=+0): 0.0s
2025-10-23 00:06:05,110 [INFO]   Avg holding period (label=+1): 75.8s
2025-10-23 00:06:05,830 [INFO] Built 551 bars across 1 days
2025-10-23 00:06:05,831 [INFO] Applying triple barrier labeling (k=1.0, time_barrier=20min)...
  ewm_std = ewm_std.fillna(method='bfill')
  df.

## 6. Pipeline Execution

Runs every pipeline stage over all cleaned files in parallel, then builds the day-level CV split metadata.


In [9]:
!python src/dataProcessing/process_pipeline.py


2025-10-23 00:11:06,183 [INFO] Data Processing Pipeline
2025-10-23 00:11:06,187 [INFO] Found 100 files to process
2025-10-23 00:11:06,187 [INFO] Processing with 7 parallel workers...
2025-10-23 00:12:32,839 [INFO] 
Creating CV split metadata...
2025-10-23 00:12:34,860 [INFO] Loaded 46855 total bars from 100 files
2025-10-23 00:12:34,860 [INFO] Creating purged k-fold CV splits (day-level LODO)...
2025-10-23 00:12:34,874 [INFO] Found 5 unique days: ['2025-10-02', '2025-10-03', '2025-10-06', '2025-10-07', '2025-10-08']
2025-10-23 00:12:34,874 [INFO] 
Fold 1/5: Test day = 2025-10-02
2025-10-23 00:12:34,874 [INFO]   Initial: 36709 train, 10146 val
2025-10-23 00:12:34,874 [INFO]   Validation period: 2025-10-02 13:35:00+00:00 to 2025-10-02 19:34:59+00:00
2025-10-23 00:12:35,636 [INFO]   Purged: 0 samples
2025-10-23 00:12:36,131 [INFO]   Embargoed: 0 samples
2025-10-23 00:12:36,131 [INFO]   Final: 36709 train, 10146 val
2025-10-23 00:12:36,131 [INFO] 
Fold 2/5: Test day = 2025-10-03
2025-10-23

  df.loc[i, 'feat_context_avg_volume'] = context_bars['volume'].mean()
Features with NaN values: {'feat_momentum_3bar': np.int64(3), 'feat_momentum_5bar': np.int64(5)}
  ewm_std = ewm_std.fillna(method='bfill')
  df.loc[i, 'label_t_end'] = bar_time