In [242]:
# Load the previously written rolling-std results CSV for inspection.
df = pd.read_csv("/workspaces/parameta-data-science-test/Parameta/stdev_test/results/rolling_std_results.csv")

In [243]:
# Keep only the id_44 rows from 2021-11-20 onwards, ordered by snap time.
results_df_checker = (
    df.loc[
        lambda frame: (frame['security_id'] == 'id_44')
        & (frame['snap_time'] >= '2021-11-20 00:00:00')
    ]
    .sort_values(by='snap_time')
)

In [245]:
# Show the first 60 rows of the filtered id_44 results.
results_df_checker.head(60)

Unnamed: 0,snap_time,security_id,bid_std,mid_std,ask_std
90683,2021-11-20 00:00:00,id_44,1.135613,1.135714,1.135816
90882,2021-11-20 01:00:00,id_44,1.076226,1.07634,1.076454
91081,2021-11-20 02:00:00,id_44,1.007814,1.007943,1.008073
91280,2021-11-20 03:00:00,id_44,0.9283843,0.9285327,0.9286823
91479,2021-11-20 04:00:00,id_44,0.8237706,0.8237002,0.8236308
91678,2021-11-20 05:00:00,id_44,0.8108593,0.8107387,0.8106192
91877,2021-11-20 06:00:00,id_44,0.815798,0.8156821,0.8155672
92076,2021-11-20 07:00:00,id_44,0.8177406,0.8176291,0.8175185
92275,2021-11-20 08:00:00,id_44,0.7455657,0.7453472,0.7451298
92474,2021-11-20 09:00:00,id_44,0.5505545,0.5507022,0.5508508


In [34]:
# Rebinds `df` to the id_44 rows only (sorted by snap_time); any cell executed
# after this one sees the filtered frame, not the full data set.
df = df[df['security_id'] == 'id_44'].sort_values(by='snap_time')

In [10]:
# Example: remove two rows for security_id 'id_44' to simulate missing snaps.
# The mask is False only for id_44 rows whose snap_time is one of the two
# listed timestamps; every other row is kept.
# NOTE(review): the isin() comparison uses Timestamps, so this assumes
# df['snap_time'] is a datetime column — if it holds strings (e.g. straight
# from read_csv) nothing would match and no row would be dropped; confirm.
mask = ~((df['security_id'] == 'id_44') & 
         (df['snap_time'].isin(pd.to_datetime(['2021-11-20 17:00:00', '2021-11-21 18:00:00']))))
df_missing = df[mask]

In [13]:
# Write the frame with the removed rows to a gzip-compressed parquet file.
df_missing.to_parquet("/workspaces/parameta-data-science-test/Parameta/stdev_test/data/stdev_price_data_missing.parq.gzip", compression="gzip")

In [40]:
# (rows, columns) of the frame after the masked rows were dropped.
df_missing.shape

(97284, 5)

In [39]:
# (rows, columns) of `df` as it is currently bound in the kernel.
df.shape

(468, 5)

In [14]:
# Read the gzip-compressed parquet file back to check that it round-trips.
dummy_df = pd.read_parquet(r"/workspaces/parameta-data-science-test/Parameta/stdev_test/data/stdev_price_data_missing.parq.gzip")

In [16]:
# (rows, columns) of the frame that was read back from parquet.
dummy_df.shape

(97284, 5)

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import time
import argparse

class RollingStandardDeviationCalculator:
    """Rolling standard deviation of hourly prices, computed per security.

    A standard deviation is produced only from ``window_size`` consecutive
    hourly snaps whose price columns are all present. Snaps that do not end
    such a window re-use the most recent value computed for the same security.
    """

    def __init__(self, window_size: int = 20):
        """
        Args:
            window_size: number of consecutive hourly snaps in each window.
        """
        self.window_size = window_size
        self.price_columns = ['bid', 'mid', 'ask']

    def load_data(self, file_path: Path) -> pd.DataFrame:
        """Read the parquet file, parse ``snap_time`` and sort by security and time."""
        df = pd.read_parquet(file_path, engine="pyarrow")
        df['snap_time'] = pd.to_datetime(df['snap_time'])
        return df.sort_values(['security_id', 'snap_time'])

    def _find_contiguous_windows(self, values, valid_mask):
        """
        Compute the rolling std for every complete window inside each run of valid rows.

        Args:
            values: 2-D array, one row per grid position and one column per
                entry of ``self.price_columns``.
            valid_mask: boolean sequence, True where the matching row of
                ``values`` contains no NaN.

        Returns:
            dict mapping the grid position at which a window ends to the array
            of per-column sample standard deviations (ddof=1) of that window.
            Runs shorter than ``window_size`` contribute no entries.
        """
        windows = {}
        n = len(valid_mask)

        i = 0
        while i < n:
            if not valid_mask[i]:
                i += 1
                continue

            # Walk to the end of the current run of valid rows.
            start = i
            while i < n and valid_mask[i]:
                i += 1
            end = i - 1

            seq_length = end - start + 1
            if seq_length >= self.window_size:
                # Let pandas compute all rolling windows of this run at once.
                seq_df = pd.DataFrame(values[start:end + 1], columns=self.price_columns)
                rolling_std = seq_df.rolling(self.window_size).std(ddof=1).values

                # The first window_size - 1 rows of the run have no full window yet.
                for j in range(self.window_size - 1, seq_length):
                    windows[start + j] = rolling_std[j]

        return windows

    def _get_most_recent_std(self, windows, current_idx, sorted_keys=None):
        """
        Return the std of the latest window ending at or before ``current_idx``.

        Args:
            windows: mapping produced by ``_find_contiguous_windows``.
            current_idx: grid position being looked up.
            sorted_keys: ascending list of the keys of ``windows``; computed
                here when omitted.

        Returns:
            The stored std array, or an all-NaN array when no window ends at
            or before ``current_idx``.
        """
        if not windows:
            return np.full(len(self.price_columns), np.nan)

        if sorted_keys is None:
            sorted_keys = sorted(windows.keys())

        # Rightmost key <= current_idx.
        import bisect
        pos = bisect.bisect_right(sorted_keys, current_idx)
        if pos == 0:
            return np.full(len(self.price_columns), np.nan)

        return windows[sorted_keys[pos - 1]]

    def calculate_rolling_std(self, df: pd.DataFrame, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """
        Calculate the rolling std for every security on an hourly output grid.

        Args:
            df: rows with ``security_id``, ``snap_time`` and the price columns.
            start_time: first output snap; defaults to the earliest ``snap_time``.
            end_time: last output snap; defaults to the latest ``snap_time``.

        Returns:
            DataFrame with ``snap_time``, ``security_id`` and one ``<price>_std``
            column per price column, sorted by snap time then security. Rows
            without any earlier complete window hold NaN.
        """
        std_columns = [f"{c}_std" for c in self.price_columns]
        if df.empty:
            # Nothing to compute; min()/max() of an empty column cannot define a grid.
            return pd.DataFrame(columns=['snap_time', 'security_id'] + std_columns)

        start_dt = pd.to_datetime(start_time) if start_time else df['snap_time'].min()
        end_dt = pd.to_datetime(end_time) if end_time else df['snap_time'].max()

        # Hourly grid of the snaps that are reported.
        all_snaps = pd.date_range(start=start_dt, end=end_dt, freq='h')
        one_hour = pd.Timedelta(hours=1)

        result_data = []
        for sec_id, sec_df in df.groupby('security_id'):
            sec_df = sec_df.set_index('snap_time').sort_index()

            # The calculation grid must reach back to the security's first row,
            # otherwise history before start_dt is discarded and the first
            # window_size - 1 output snaps can never see a complete window.
            # It is extended in whole hours so it stays aligned with all_snaps.
            earliest = sec_df.index.min()
            if earliest < start_dt:
                hours_back = int(np.ceil((start_dt - earliest) / one_hour))
                grid_start = start_dt - hours_back * one_hour
            else:
                grid_start = start_dt
            calc_grid = pd.date_range(start=grid_start, end=end_dt, freq='h')

            # Reindexing inserts NaN rows for snaps that are missing from the data.
            values = sec_df.reindex(calc_grid)[self.price_columns].values
            valid_mask = ~np.isnan(values).any(axis=1)

            windows = self._find_contiguous_windows(values, valid_mask)
            sorted_keys = sorted(windows)

            grid_stds = np.full((len(calc_grid), len(self.price_columns)), np.nan)
            for i in range(len(calc_grid)):
                grid_stds[i] = self._get_most_recent_std(windows, i, sorted_keys)

            # Both grids end at end_dt, so the output snaps are the tail of calc_grid.
            first_output_row = len(calc_grid) - len(all_snaps)
            output_stds = grid_stds[first_output_row:]

            sec_result_data = {'snap_time': all_snaps, 'security_id': sec_id}
            for j, std_col in enumerate(std_columns):
                sec_result_data[std_col] = output_stds[:, j]
            result_data.append(pd.DataFrame(sec_result_data))

        if not result_data:
            return pd.DataFrame(columns=['snap_time', 'security_id'] + std_columns)

        results = pd.concat(result_data, ignore_index=True)
        return results.sort_values(['snap_time', 'security_id']).reset_index(drop=True)

    def save_results(self, results_df: pd.DataFrame, output_path: Path):
        """Write the results to CSV, creating the parent directory when needed."""
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        results_df_output = results_df.copy()
        # to_datetime keeps the .dt accessor usable for an empty result frame,
        # whose snap_time column has object dtype.
        results_df_output['snap_time'] = pd.to_datetime(results_df_output['snap_time']).dt.strftime('%Y-%m-%d %H:%M:%S')
        results_df_output.to_csv(output_path, index=False)
        print(f"Results saved to: {output_path}")

    def process_file(self, input_file: Path, output_file: Path, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """Load the input file, calculate the rolling std, save it and return it."""
        start_processing = time.time()

        print(f"Loading data from: {input_file}")
        df = self.load_data(input_file)
        print(f"Loaded {len(df):,} rows")

        print("Calculating rolling standard deviations...")
        results = self.calculate_rolling_std(df, start_time, end_time)
        print(f"Generated {len(results):,} result rows")

        self.save_results(results, output_file)

        processing_time = time.time() - start_processing
        print(f"Total processing time: {processing_time:.2f} seconds")
        return results


if __name__ == '__main__':
    import sys

    parser = argparse.ArgumentParser(description="Calculate rolling standard deviation with lookback")

    # `__file__` exists only when this code runs as a script; a notebook kernel
    # does not define it, so referencing it there raises NameError.
    try:
        project_dir = Path(__file__).parent.parent
    except NameError:
        # NOTE(review): assumes the notebook sits in the same folder as the
        # script would; pass --input/--output explicitly if it does not.
        project_dir = Path.cwd().parent

    default_input = project_dir / "data" / "stdev_price_data.parq"
    default_output = project_dir / "results" / "rolling_std_results.csv"

    parser.add_argument("--input", type=str, default=default_input, 
                       help="Path to input parquet file")
    parser.add_argument("--output", type=str, default=default_output, 
                       help="Path to output CSV file")
    parser.add_argument("--start_time", type=str, 
                       help="Start time (YYYY-MM-DD HH:MM:SS)")
    parser.add_argument("--end_time", type=str, 
                       help="End time (YYYY-MM-DD HH:MM:SS)")

    # Inside a Jupyter kernel sys.argv holds the kernel's own launch arguments,
    # which argparse would reject; parse an empty list there so the defaults
    # apply. As a script, None makes argparse read sys.argv as before.
    cli_args = [] if 'ipykernel' in sys.modules else None
    args = parser.parse_args(cli_args)

    calculator = RollingStandardDeviationCalculator(window_size=20)
    calculator.process_file(
        input_file=Path(args.input),
        output_file=Path(args.output),
        start_time=args.start_time,
        end_time=args.end_time
    )

In [55]:
# Working code

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import time
import argparse

class RollingStandardDeviationCalculator:
    """Rolling standard deviation over the last ``window_size`` valid hourly rows.

    For each security the window holds the most recent ``window_size`` rows
    whose price columns are all present. Rows with missing prices are skipped
    and do not reset the window, so a window may span a gap in the hourly grid.
    """

    def __init__(self, window_size: int = 20):
        """
        Args:
            window_size: number of valid rows in each window.
        """
        self.window_size = window_size
        self.price_columns = ['bid', 'mid', 'ask']

    def load_data(self, file_path: Path) -> pd.DataFrame:
        """Read the parquet file, parse ``snap_time`` and sort by security and time."""
        df = pd.read_parquet(file_path, engine="pyarrow")
        df['snap_time'] = pd.to_datetime(df['snap_time'])
        return df.sort_values(['security_id', 'snap_time'])

    def calculate_rolling_std(self, df: pd.DataFrame, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """
        Calculate the rolling std for every security on an hourly output grid.

        Args:
            df: rows with ``security_id``, ``snap_time`` and the price columns.
            start_time: first output snap; defaults to the earliest ``snap_time``.
            end_time: last output snap; defaults to the latest ``snap_time``.

        Returns:
            DataFrame with ``snap_time``, ``security_id`` and one ``<price>_std``
            column per price column, sorted by snap time then security. Snaps
            for which a security has no full window yet, or that lie outside
            its hourly calculation grid, hold NaN.
        """
        from collections import deque

        std_columns = [f"{col}_std" for col in self.price_columns]
        empty_result = pd.DataFrame(columns=['snap_time', 'security_id'] + std_columns)
        if df.empty:
            # min()/max() of an empty column cannot define a grid.
            return empty_result

        start_dt = pd.to_datetime(start_time) if start_time else df['snap_time'].min()
        end_dt = pd.to_datetime(end_time) if end_time else df['snap_time'].max()

        # Hourly grid of the snaps that are reported.
        output_snaps = pd.date_range(start=start_dt, end=end_dt, freq='h')
        result_data = []

        for sec_id, sec_df in df.groupby('security_id'):
            sec_df = sec_df.set_index('snap_time').sort_index()

            # Calculation grid: from the security's first row up to end_dt.
            full_range = pd.date_range(start=sec_df.index.min(), end=end_dt, freq='h')
            values = sec_df.reindex(full_range)[self.price_columns].values
            valid_mask = ~np.isnan(values).any(axis=1)

            rolling_std = np.full_like(values, np.nan, dtype=float)
            window = deque(maxlen=self.window_size)
            current_std = None

            for i in range(values.shape[0]):
                if valid_mask[i]:
                    window.append(values[i])
                    # The window only changes on a valid row, so the std only
                    # needs to be recomputed here.
                    if len(window) == self.window_size:
                        current_std = np.std(np.array(window), axis=0, ddof=1)
                if current_std is not None:
                    rolling_std[i] = current_std

            # get_indexer marks snaps that are not on the calculation grid with
            # -1. Used directly as an index, -1 would select the LAST row, so
            # those snaps are left as NaN instead.
            idx_map = full_range.get_indexer(output_snaps)
            found = idx_map >= 0
            output_std = np.full((len(output_snaps), len(self.price_columns)), np.nan)
            output_std[found] = rolling_std[idx_map[found]]

            sec_result_data = {'snap_time': output_snaps, 'security_id': sec_id}
            for j, std_col in enumerate(std_columns):
                sec_result_data[std_col] = output_std[:, j]
            result_data.append(pd.DataFrame(sec_result_data))

        if not result_data:
            return empty_result

        results = pd.concat(result_data, ignore_index=True)
        return results.sort_values(['snap_time', 'security_id']).reset_index(drop=True)

    def save_results(self, results_df: pd.DataFrame, output_path: Path):
        """Write the results to CSV, creating the parent directory when needed."""
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        results_df_output = results_df.copy()
        # to_datetime keeps the .dt accessor usable for an empty result frame,
        # whose snap_time column has object dtype.
        results_df_output['snap_time'] = pd.to_datetime(results_df_output['snap_time']).dt.strftime('%Y-%m-%d %H:%M:%S')
        results_df_output.to_csv(output_path, index=False)
        print(f"Results saved to: {output_path}")

    def process_file(self, input_file: Path, output_file: Path, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """Load the input file, calculate the rolling std, save it and return it."""
        start_processing = time.time()
        df = self.load_data(input_file)
        results = self.calculate_rolling_std(df, start_time, end_time)
        self.save_results(results, output_file)
        print(f"Total processing time: {time.time() - start_processing:.2f} seconds")
        return results


if __name__ == '__main__':
    import sys

    parser = argparse.ArgumentParser(description="Calculate rolling standard deviation with lookback")

    # `__file__` exists only when this code runs as a script; a notebook kernel
    # does not define it, so referencing it there raises NameError.
    try:
        project_dir = Path(__file__).parent.parent
    except NameError:
        # NOTE(review): assumes the notebook sits in the same folder as the
        # script would; pass --input/--output explicitly if it does not.
        project_dir = Path.cwd().parent

    default_input = project_dir / "data" / "stdev_price_data.parq"
    default_output = project_dir / "results" / "rolling_std_results.csv"

    parser.add_argument("--input", type=str, default=default_input)
    parser.add_argument("--output", type=str, default=default_output)
    parser.add_argument("--start_time", type=str)
    parser.add_argument("--end_time", type=str)

    # Inside a Jupyter kernel sys.argv holds the kernel's own launch arguments,
    # which argparse would reject; parse an empty list there so the defaults
    # apply. As a script, None makes argparse read sys.argv as before.
    cli_args = [] if 'ipykernel' in sys.modules else None
    args = parser.parse_args(cli_args)

    calculator = RollingStandardDeviationCalculator(window_size=20)
    calculator.process_file(
        input_file=Path(args.input),
        output_file=Path(args.output),
        start_time=args.start_time,
        end_time=args.end_time
    )


In [100]:
# Best version so far, but it still has trouble at the start

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import time
import argparse

class RollingStandardDeviationCalculator:
    """Rolling standard deviation over contiguous hourly snaps, per security.

    A new value is computed whenever the last ``window_size`` hourly rows of a
    security are all present. A row with a missing price resets the window;
    until a new full window exists the last computed value is carried forward.
    """

    def __init__(self, window_size: int = 20):
        """
        Args:
            window_size: number of consecutive hourly snaps in each window.
        """
        self.window_size = window_size
        self.price_columns = ['bid', 'mid', 'ask']

    def load_data(self, file_path: Path) -> pd.DataFrame:
        """Read the parquet file, parse ``snap_time`` and sort by security and time."""
        df = pd.read_parquet(file_path, engine="pyarrow")
        df['snap_time'] = pd.to_datetime(df['snap_time'])
        return df.sort_values(['security_id', 'snap_time'])

    def calculate_rolling_std(self, df: pd.DataFrame, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """
        Calculate the rolling std for every security on the requested hourly grid.

        Args:
            df: rows with ``security_id``, ``snap_time`` and the price columns.
            start_time: first output snap; defaults to the earliest ``snap_time``.
            end_time: last output snap; defaults to the latest ``snap_time``.

        Returns:
            DataFrame with ``snap_time``, ``security_id`` and one ``<price>_std``
            column per price column, sorted by snap time then security. Snaps
            without any earlier full window, or that do not lie on the
            security's hourly calculation grid, hold NaN.
        """
        from collections import deque

        n_prices = len(self.price_columns)
        std_columns = [f"{col}_std" for col in self.price_columns]
        if df.empty:
            # No securities to report; min()/max() of an empty column cannot define a grid.
            return pd.DataFrame(columns=['snap_time', 'security_id'] + std_columns)

        start_dt = pd.to_datetime(start_time) if start_time else df['snap_time'].min()
        end_dt = pd.to_datetime(end_time) if end_time else df['snap_time'].max()

        # Hourly grid of the REQUESTED period.
        output_snaps = pd.date_range(start=start_dt, end=end_dt, freq='h')

        result_data = []
        for sec_id in df['security_id'].unique():
            sec_df = df[df['security_id'] == sec_id]
            output_std = np.full((len(output_snaps), n_prices), np.nan)

            # An id that never equals itself (NaN) selects no rows; it is
            # reported with NaN for every snap.
            if len(sec_df) > 0:
                sec_df = sec_df.set_index('snap_time').sort_index()

                # Start early enough to include all of the security's history
                # and at least window_size hours before the requested start.
                lookback_start = min(sec_df.index.min(), start_dt - pd.Timedelta(hours=self.window_size))
                full_range = pd.date_range(start=lookback_start, end=end_dt, freq='h')
                values = sec_df.reindex(full_range)[self.price_columns].values

                rolling_std = np.full((values.shape[0], n_prices), np.nan, dtype=float)
                # maxlen discards the oldest row automatically once the window is full.
                window = deque(maxlen=self.window_size)
                last_valid_std = None

                for i in range(values.shape[0]):
                    row = values[i]
                    if np.isnan(row).any():
                        # A gap breaks contiguity: start collecting a new window.
                        window.clear()
                    else:
                        window.append(row)
                        if len(window) == self.window_size:
                            last_valid_std = np.std(np.array(window), axis=0, ddof=1)
                    # Carry the most recent full-window value forward.
                    if last_valid_std is not None:
                        rolling_std[i] = last_valid_std

                # get_indexer marks snaps that are not on the calculation grid
                # with -1. Used directly as an index, -1 would select the LAST
                # row, so those snaps are left as NaN instead.
                idx_map = full_range.get_indexer(output_snaps)
                found = idx_map >= 0
                output_std[found] = rolling_std[idx_map[found]]

            sec_result_data = {'snap_time': output_snaps, 'security_id': sec_id}
            for j, std_col in enumerate(std_columns):
                sec_result_data[std_col] = output_std[:, j]
            result_data.append(pd.DataFrame(sec_result_data))

        results = pd.concat(result_data, ignore_index=True)
        results = results.sort_values(['snap_time', 'security_id']).reset_index(drop=True)
        return results

    def save_results(self, results_df: pd.DataFrame, output_path: Path):
        """Write the results to CSV, creating the parent directory when needed."""
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        results_df_output = results_df.copy()
        # to_datetime keeps the .dt accessor usable for an empty result frame,
        # whose snap_time column has object dtype.
        results_df_output['snap_time'] = pd.to_datetime(results_df_output['snap_time']).dt.strftime('%Y-%m-%d %H:%M:%S')
        results_df_output.to_csv(output_path, index=False)
        print(f"Results saved to: {output_path}")

    def process_file(self, input_file: Path, output_file: Path, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """Load the input file, calculate the rolling std, save it, print a summary and return it."""
        start_processing = time.time()
        df = self.load_data(input_file)
        results = self.calculate_rolling_std(df, start_time, end_time)
        self.save_results(results, output_file)
        print(f"Total processing time: {time.time() - start_processing:.2f} seconds")
        print(f"Output shape: {results.shape}")
        print(f"Unique security IDs: {results['security_id'].nunique()}")
        print(f"Time range: {results['snap_time'].min()} to {results['snap_time'].max()}")
        return results


if __name__ == '__main__':
    import sys

    parser = argparse.ArgumentParser(description="Calculate rolling standard deviation with lookback")

    # `__file__` exists only when this code runs as a script; a notebook kernel
    # does not define it, so referencing it there raises NameError.
    try:
        project_dir = Path(__file__).parent.parent
    except NameError:
        # NOTE(review): assumes the notebook sits in the same folder as the
        # script would; pass --input/--output explicitly if it does not.
        project_dir = Path.cwd().parent

    default_input = project_dir / "data" / "stdev_price_data.parq"
    default_output = project_dir / "results" / "rolling_std_results.csv"

    parser.add_argument("--input", type=str, default=default_input)
    parser.add_argument("--output", type=str, default=default_output)
    parser.add_argument("--start_time", type=str, default="2021-11-20 00:00:00")
    parser.add_argument("--end_time", type=str, default="2021-11-23 09:00:00")

    # Inside a Jupyter kernel sys.argv holds the kernel's own launch arguments,
    # which argparse would reject; parse an empty list there so the defaults
    # apply. As a script, None makes argparse read sys.argv as before.
    cli_args = [] if 'ipykernel' in sys.modules else None
    args = parser.parse_args(cli_args)

    calculator = RollingStandardDeviationCalculator(window_size=20)
    calculator.process_file(
        input_file=Path(args.input),
        output_file=Path(args.output),
        start_time=args.start_time,
        end_time=args.end_time
    )

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import time
import argparse
from dataclasses import dataclass
from typing import List

@dataclass
class CalculationConfig:
    """Settings for the rolling standard deviation calculation."""

    # Number of rows in each rolling window.
    window_size: int = 20
    # Price columns to process; None is replaced by the bid/mid/ask default.
    price_columns: List[str] = None

    def __post_init__(self):
        """Substitute the default column list when none was supplied."""
        if self.price_columns is not None:
            return
        self.price_columns = ['bid', 'mid', 'ask']

class RollingStandardDeviationCalculator:
    """Vectorized rolling standard deviation of hourly prices, per security.

    Each price column gets a pandas rolling std that requires ``window_size``
    consecutive hourly values; where no full window is available the last
    computed value is carried forward.
    """

    def __init__(self, window_size: int = 20):
        """
        Args:
            window_size: number of consecutive hourly snaps in each window.
        """
        self.config = CalculationConfig(window_size=window_size)

    def load_data(self, file_path: Path) -> pd.DataFrame:
        """Read the parquet file, parse ``snap_time`` and sort by security and time."""
        df = pd.read_parquet(file_path, engine="pyarrow")
        df['snap_time'] = pd.to_datetime(df['snap_time'])
        return df.sort_values(['security_id', 'snap_time'])

    def _vectorized_rolling_std_single_security(self, sec_df: pd.DataFrame, output_snaps: pd.DatetimeIndex) -> pd.DataFrame:
        """
        Rolling std of one security, forward filled and aligned to ``output_snaps``.

        Args:
            sec_df: rows of a single security with ``snap_time`` and the price columns.
            output_snaps: snap times to report.

        Returns:
            DataFrame indexed by ``output_snaps`` with one column per price
            column. Snaps before the first full window hold NaN.
        """
        # Put the data on a regular hourly grid; missing snaps become NaN rows.
        # ('h' is the alias used for every other hourly grid in this file.)
        hourly = sec_df.set_index('snap_time').sort_index().asfreq('h')

        # min_periods == window means a window touching a NaN row yields NaN,
        # so only runs of consecutive snaps produce a value.
        rolling_std = hourly[self.config.price_columns].rolling(
            window=self.config.window_size, min_periods=self.config.window_size
        ).std(ddof=1)

        # Add the output snaps BEFORE forward filling. Filling first and
        # reindexing afterwards would leave every snap later than the
        # security's last row as NaN instead of carrying the last value.
        extended_index = rolling_std.index.union(output_snaps)
        return rolling_std.reindex(extended_index).ffill().reindex(output_snaps)

    def calculate_rolling_std(self, df: pd.DataFrame, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """
        Calculate the rolling std for every security on the requested hourly grid.

        Args:
            df: rows with ``security_id``, ``snap_time`` and the price columns.
            start_time: first output snap; defaults to the earliest ``snap_time``.
            end_time: last output snap; defaults to the latest ``snap_time``.

        Returns:
            DataFrame with ``snap_time``, ``security_id`` and one ``<price>_std``
            column per configured price column, sorted by snap time then security.
        """
        std_names = {col: f"{col}_std" for col in self.config.price_columns}
        out_columns = ['snap_time', 'security_id'] + list(std_names.values())
        if df.empty:
            # min()/max() of an empty column cannot define a grid.
            return pd.DataFrame(columns=out_columns)

        start_dt = pd.to_datetime(start_time) if start_time else df['snap_time'].min()
        end_dt = pd.to_datetime(end_time) if end_time else df['snap_time'].max()
        output_snaps = pd.date_range(start=start_dt, end=end_dt, freq='h')

        result_list = []
        for sec_id, sec_df in df.groupby('security_id', sort=False):
            sec_std = self._vectorized_rolling_std_single_security(sec_df, output_snaps)
            sec_std = sec_std.rename(columns=std_names)
            sec_std['security_id'] = sec_id
            sec_std['snap_time'] = sec_std.index
            result_list.append(sec_std.reset_index(drop=True))

        if not result_list:
            return pd.DataFrame(columns=out_columns)

        results = pd.concat(result_list, ignore_index=True)[out_columns]
        return results.sort_values(['snap_time', 'security_id']).reset_index(drop=True)

    def save_results(self, results_df: pd.DataFrame, output_path: Path):
        """Write the results to CSV without modifying the caller's frame."""
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        # Format the timestamps on a copy: the caller keeps real datetimes.
        results_df_output = results_df.copy()
        results_df_output['snap_time'] = pd.to_datetime(results_df_output['snap_time']).dt.strftime('%Y-%m-%d %H:%M:%S')
        results_df_output.to_csv(output_path, index=False)
        print(f"Results saved to: {output_path}")

    def process_file(self, input_file: Path, output_file: Path, start_time: str = None, end_time: str = None) -> pd.DataFrame:
        """Load the input file, calculate the rolling std, save it, print a summary and return it."""
        start_processing = time.time()

        df = self.load_data(input_file)
        results = self.calculate_rolling_std(df, start_time, end_time)
        self.save_results(results, output_file)

        processing_time = time.time() - start_processing
        print(f"Total processing time: {processing_time:.2f} seconds")
        print(f"Output shape: {results.shape}")
        print(f"Unique security IDs: {results['security_id'].nunique()}")
        print(f"Time range: {results['snap_time'].min()} to {results['snap_time'].max()}")

        return results


if __name__ == '__main__':
    import sys

    parser = argparse.ArgumentParser(description="Calculate rolling standard deviation with lookback")

    # `__file__` exists only when this code runs as a script; a notebook kernel
    # does not define it, so referencing it there raises NameError.
    try:
        project_dir = Path(__file__).parent.parent
    except NameError:
        # NOTE(review): assumes the notebook sits in the same folder as the
        # script would; pass --input/--output explicitly if it does not.
        project_dir = Path.cwd().parent

    default_input = project_dir / "data" / "stdev_price_data.parq"
    default_output = project_dir / "results" / "rolling_std_results.csv"

    parser.add_argument("--input", type=str, default=default_input)
    parser.add_argument("--output", type=str, default=default_output)
    parser.add_argument("--start_time", type=str, default="2021-11-20 00:00:00")
    parser.add_argument("--end_time", type=str, default="2021-11-23 09:00:00")

    # Inside a Jupyter kernel sys.argv holds the kernel's own launch arguments,
    # which argparse would reject; parse an empty list there so the defaults
    # apply. As a script, None makes argparse read sys.argv as before.
    cli_args = [] if 'ipykernel' in sys.modules else None
    args = parser.parse_args(cli_args)

    calculator = RollingStandardDeviationCalculator(window_size=20)
    calculator.process_file(
        input_file=Path(args.input),
        output_file=Path(args.output),
        start_time=args.start_time,
        end_time=args.end_time
    )
