## Loading Data

In [52]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

def load_and_clean_data(file_path):
    """Load and perform initial cleaning of insider trading data"""
    df = pd.read_csv(file_path)
    
    # Clean column names
    df.columns = df.columns.str.strip().str.replace('\n', '')
    
    # Convert date columns to datetime
    date_cols = ['DATE OF ALLOTMENT/ACQUISITION FROM', 'DATE OF ALLOTMENT/ACQUISITION TO']
    for col in date_cols:
        df[col] = pd.to_datetime(df[col], format='%d-%b-%Y', errors='coerce')
    
    # Clean numeric columns
    df['NO. OF SECURITIES (ACQUIRED/DISPLOSED)'] = pd.to_numeric(df['NO. OF SECURITIES (ACQUIRED/DISPLOSED)'], errors='coerce')
    
    # Clean VALUE column
    df['VALUE OF SECURITY (ACQUIRED/DISPLOSED)'] = df['VALUE OF SECURITY (ACQUIRED/DISPLOSED)'].replace('-', np.nan)
    df['VALUE OF SECURITY (ACQUIRED/DISPLOSED)'] = pd.to_numeric(df['VALUE OF SECURITY (ACQUIRED/DISPLOSED)'], errors='coerce')
    
    return df


In [53]:
df = load_and_clean_data("CF-Insider-Trading-equities-14-07-2022-to-14-07-2025.csv")

In [54]:
df.to_csv("Cleaned_Data.csv")

In [37]:
df.columns

Index(['SYMBOL', 'COMPANY', 'REGULATION', 'NAME OF THE ACQUIRER/DISPOSER',
       'CATEGORY OF PERSON', 'TYPE OF SECURITY (PRIOR)',
       'NO. OF SECURITY (PRIOR)', '% SHAREHOLDING (PRIOR)',
       'TYPE OF SECURITY (ACQUIRED/DISPLOSED)',
       'NO. OF SECURITIES (ACQUIRED/DISPLOSED)',
       'VALUE OF SECURITY (ACQUIRED/DISPLOSED)',
       'ACQUISITION/DISPOSAL TRANSACTION TYPE', 'TYPE OF SECURITY (POST)',
       'NO. OF SECURITY (POST)', '% POST',
       'DATE OF ALLOTMENT/ACQUISITION FROM',
       'DATE OF ALLOTMENT/ACQUISITION TO', 'DATE OF INITMATION TO COMPANY',
       'MODE OF ACQUISITION', 'DERIVATIVE TYPE SECURITY',
       'DERIVATIVE CONTRACT SPECIFICATION', 'NOTIONAL VALUE(BUY)',
       'NUMBER OF UNITS/CONTRACT LOT SIZE (BUY)', 'NOTIONAL VALUE(SELL)',
       'NUMBER OF UNITS/CONTRACT LOT SIZE  (SELL)', 'EXCHANGE', 'REMARK',
       'BROADCASTE DATE AND TIME', 'XBRL'],
      dtype='object')

## Data Preprocessing

In [38]:
def filter_relevant_data(df):
    """Filter for relevant insider trading data"""
    # Filter out rows with missing critical information
    df_filtered = df.dropna(subset=[
        'SYMBOL', 'DATE OF ALLOTMENT/ACQUISITION FROM', 
        'NO. OF SECURITIES (ACQUIRED/DISPLOSED)',
        'ACQUISITION/DISPOSAL TRANSACTION TYPE'
    ])
    
    # Filter for only Buy and Sell transactions
    df_filtered = df_filtered[df_filtered['ACQUISITION/DISPOSAL TRANSACTION TYPE'].isin(['Buy', 'Sell'])]
    
    # Filter for market transactions only
    market_modes = ['Market Sale', 'Market Purchase', 'Off Market']
    df_filtered = df_filtered[df_filtered['MODE OF ACQUISITION'].isin(market_modes)]
    
    # Filter for significant insider categories
    significant_categories = ['Promoters', 'Promoter Group', 'Director', 'Key Managerial Personnel', 'Employees']
    df_filtered = df_filtered[df_filtered['CATEGORY OF PERSON'].isin(significant_categories)]
    
    # Create transaction_type column (1 for Buy, -1 for Sell)
    df_filtered['transaction_type'] = df_filtered['ACQUISITION/DISPOSAL TRANSACTION TYPE'].map({
        'Buy': 1, 'Sell': -1
    })
    
    # Filter for reasonable date range (2022-2025)
    start_date = pd.to_datetime('2022-01-01')
    end_date = pd.to_datetime('2025-12-31')
    df_filtered = df_filtered[
        (df_filtered['DATE OF ALLOTMENT/ACQUISITION FROM'] >= start_date) & 
        (df_filtered['DATE OF ALLOTMENT/ACQUISITION FROM'] <= end_date)
    ]
    
    return df_filtered

## Feature Engineering

In [39]:
def create_insider_weights():
    """Create weights for different insider categories"""
    return {
        'Promoters': 4,
        'Promoter Group': 3,
        'Director': 2,
        'Key Managerial Personnel': 1
    }

def add_insider_weights(df):
    """Add insider weights to dataframe"""
    weights = create_insider_weights()
    df['insider_weight'] = df['CATEGORY OF PERSON'].map(weights)
    return df

def calculate_transaction_metrics(df):
    """Calculate additional metrics for each transaction"""
    # Calculate transaction value (if available)
    df['transaction_value'] = df['VALUE OF SECURITY (ACQUIRED/DISPLOSED)'].fillna(0)
    
    # Calculate weighted transaction score
    df['weighted_score'] = df['transaction_type'] * df['insider_weight'] * np.log1p(df['NO. OF SECURITIES (ACQUIRED/DISPLOSED)'])
    
    # Add transaction date as reference
    df['transaction_date'] = df['DATE OF ALLOTMENT/ACQUISITION FROM']
    
    return df


## Consensus Signal Generation

In [40]:
class ConsensusParameters:
    def __init__(self, lookback_window=3, min_insiders=1, min_net_score=0.02, 
        signal_hold_period=1, min_transaction_value=0):
        self.lookback_window = lookback_window  # days
        self.min_insiders = min_insiders        # minimum number of insiders needed
        self.min_net_score = min_net_score      # minimum weighted net score
        self.signal_hold_period = signal_hold_period  # days to hold signal
        self.min_transaction_value = min_transaction_value  # minimum transaction value

In [41]:
def generate_consensus_signals(df, params):
    """Generate consensus insider trading signals"""
    signals = []
    
    # Group by company symbol
    for symbol in df['SYMBOL'].unique():
        company_data = df[df['SYMBOL'] == symbol].copy()
        company_data = company_data.sort_values('transaction_date')
        
        # Create rolling window analysis
        signals.extend(analyze_company_consensus(company_data, params))
    
    return pd.DataFrame(signals)

def analyze_company_consensus(company_data, params):
    """Analyze consensus for a single company"""
    signals = []
    
    # Get unique transaction dates
    transaction_dates = company_data['transaction_date'].unique()
    
    for date in transaction_dates:
        # Define lookback window
        window_start = date - timedelta(days=params.lookback_window)
        window_end = date
        
        # Filter data within window
        window_data = company_data[
            (company_data['transaction_date'] >= window_start) & 
            (company_data['transaction_date'] <= window_end)
        ]
        
        # Calculate consensus metrics
        signal = calculate_consensus_signal(window_data, date, params)
        if signal:
            signals.append(signal)
    
    return signals

def calculate_consensus_signal(window_data, signal_date, params):
    """Calculate consensus signal for a time window"""
    if len(window_data) < params.min_insiders:
        return None
    
    # Count unique insiders
    unique_insiders = window_data['NAME OF THE ACQUIRER/DISPOSER'].nunique()
    if unique_insiders < params.min_insiders:
        return None
    
    # Calculate net weighted score
    net_score = window_data['weighted_score'].sum()
    
    # Calculate transaction direction consensus
    buy_score = window_data[window_data['transaction_type'] == 1]['weighted_score'].sum()
    sell_score = abs(window_data[window_data['transaction_type'] == -1]['weighted_score'].sum())
    
    # Determine signal direction
    if buy_score > sell_score and net_score > params.min_net_score:
        signal_direction = 'BUY'
    elif sell_score > buy_score and abs(net_score) > params.min_net_score:
        signal_direction = 'SELL'
    else:
        return None
    
    # Calculate signal strength
    signal_strength = abs(net_score) / params.min_net_score
    
    return {
        'symbol': window_data['SYMBOL'].iloc[0],
        'company': window_data['COMPANY'].iloc[0],
        'signal_date': signal_date,
        'signal_direction': signal_direction,
        'signal_strength': signal_strength,
        'net_score': net_score,
        'unique_insiders': unique_insiders,
        'total_transactions': len(window_data),
        'buy_score': buy_score,
        'sell_score': sell_score
    }


## Signal Validation and Filtering

In [42]:
def filter_overlapping_signals(signals_df, params):
    """Remove overlapping signals for same company"""
    filtered_signals = []
    
    for symbol in signals_df['symbol'].unique():
        company_signals = signals_df[signals_df['symbol'] == symbol].copy()
        company_signals = company_signals.sort_values('signal_date')
        
        last_signal_date = None
        for _, signal in company_signals.iterrows():
            if (last_signal_date is None or 
                (signal['signal_date'] - last_signal_date).days > params.signal_hold_period):
                filtered_signals.append(signal)
                last_signal_date = signal['signal_date']
    
    return pd.DataFrame(filtered_signals)


#### Quality Scoring

In [43]:
def add_quality_score(signals_df):
    """Add quality score to signals"""
    # Normalize signal strength
    max_strength = signals_df['signal_strength'].max()
    signals_df['normalized_strength'] = signals_df['signal_strength'] / max_strength
    
    # Calculate quality score based on multiple factors
    signals_df['quality_score'] = (
        0.4 * signals_df['normalized_strength'] +
        0.3 * (signals_df['unique_insiders'] / signals_df['unique_insiders'].max()) +
        0.3 * (signals_df['total_transactions'] / signals_df['total_transactions'].max())
    )
    
    return signals_df


## Portfolio Construction

In [44]:
def calculate_position_sizes(signals_df, total_capital=1000000):
    """Calculate position sizes based on signal quality"""
    # Filter for high quality signals
    high_quality_signals = signals_df[signals_df['quality_score'] >= 0.6]
    
    # Calculate position sizes
    total_quality_score = high_quality_signals['quality_score'].sum()
    high_quality_signals['position_size'] = (
        high_quality_signals['quality_score'] / total_quality_score * total_capital
    )
    
    return high_quality_signals


In [45]:
def apply_diversification_rules(portfolio_df, max_position_pct=0.15):
    """Apply diversification constraints"""
    total_capital = portfolio_df['position_size'].sum()
    max_position_size = total_capital * max_position_pct
    
    # Cap individual positions
    portfolio_df['position_size'] = portfolio_df['position_size'].clip(upper=max_position_size)
    
    # Rebalance to maintain total capital
    scale_factor = total_capital / portfolio_df['position_size'].sum()
    portfolio_df['position_size'] *= scale_factor
    
    return portfolio_df


## Backtesting 

In [46]:
def load_price_data(symbols):
    """Load historical price data for backtesting"""
    # This would typically connect to a financial data provider
    # For demonstration, we'll create a placeholder
    price_data = {}
    for symbol in symbols:
        # Placeholder for actual price data loading
        price_data[symbol] = generate_mock_price_data(symbol)
    return price_data

def generate_mock_price_data(symbol):
    """Generate mock price data for demonstration"""
    dates = pd.date_range('2022-01-01', '2025-07-15', freq='D')
    prices = 100 * np.cumprod(1 + np.random.normal(0, 0.02, len(dates)))
    return pd.DataFrame({'date': dates, 'price': prices})


In [47]:
def run_backtest(portfolio_df, price_data, params):
    """Run backtesting on the consensus strategy"""
    results = []
    
    for _, position in portfolio_df.iterrows():
        symbol = position['symbol']
        entry_date = position['signal_date']
        exit_date = entry_date + timedelta(days=params.signal_hold_period)
        
        # Get price data for this symbol
        symbol_prices = price_data.get(symbol)
        if symbol_prices is None:
            continue
        
        # Calculate returns
        entry_price = get_price_on_date(symbol_prices, entry_date)
        exit_price = get_price_on_date(symbol_prices, exit_date)
        
        if entry_price and exit_price:
            if position['signal_direction'] == 'BUY':
                return_pct = (exit_price - entry_price) / entry_price
            else:  # SELL
                return_pct = (entry_price - exit_price) / entry_price
            
            results.append({
                'symbol': symbol,
                'entry_date': entry_date,
                'exit_date': exit_date,
                'entry_price': entry_price,
                'exit_price': exit_price,
                'return_pct': return_pct,
                'position_size': position['position_size'],
                'pnl': return_pct * position['position_size']
            })
    
    return pd.DataFrame(results)

def get_price_on_date(price_data, target_date):
    """Get price on a specific date"""
    # ensure your date column is real datetime
    price_data['date'] = pd.to_datetime(price_data['date'])
    # set it as the index, sorted
    price_data = price_data.set_index('date').sort_index()

    # .asof will now look up the last index â‰¤ target_date
    price = price_data['price'].asof(target_date)
    return None if pd.isna(price) else price


## Performance Analysis

In [48]:
def calculate_performance_metrics(backtest_results):
    """Calculate comprehensive performance metrics"""
    if len(backtest_results) == 0:
        return {}
    
    returns = backtest_results['return_pct']
    pnl = backtest_results['pnl']
    
    metrics = {
        'total_trades': len(backtest_results),
        'total_return': pnl.sum(),
        'win_rate': (returns > 0).mean(),
        'avg_return': returns.mean(),
        'volatility': returns.std(),
        'sharpe_ratio': returns.mean() / returns.std() if returns.std() > 0 else 0,
        'max_drawdown': calculate_max_drawdown(pnl),
        'profit_factor': calculate_profit_factor(pnl),
        'avg_win': returns[returns > 0].mean() if (returns > 0).any() else 0,
        'avg_loss': returns[returns < 0].mean() if (returns < 0).any() else 0
    }
    
    return metrics

def calculate_max_drawdown(pnl_series):
    """Calculate maximum drawdown"""
    cumulative_pnl = pnl_series.cumsum()
    rolling_max = cumulative_pnl.expanding().max()
    drawdown = cumulative_pnl - rolling_max
    return drawdown.min()

def calculate_profit_factor(pnl_series):
    """Calculate profit factor"""
    gross_profit = pnl_series[pnl_series > 0].sum()
    gross_loss = abs(pnl_series[pnl_series < 0].sum())
    return gross_profit / gross_loss if gross_loss > 0 else np.inf


## Compile Implementation

In [49]:
class ConsensusInsiderStrategy:
    def __init__(self, params=None):
        self.params = params or ConsensusParameters()
        self.raw_data = None
        self.cleaned_data = None
        self.signals = None
        self.portfolio = None
        self.backtest_results = None
        self.performance_metrics = None
    
    def load_data(self, file_path):
        """Load and clean data"""
        self.raw_data = load_and_clean_data(file_path)
        self.cleaned_data = self.preprocess_data()
        return self
    
    def preprocess_data(self):
        """Preprocess the data"""
        df = filter_relevant_data(self.raw_data)
        df = add_insider_weights(df)
        df = calculate_transaction_metrics(df)
        return df
    
    def generate_signals(self):
        """Generate consensus signals"""
        self.signals = generate_consensus_signals(self.cleaned_data, self.params)
        self.signals = filter_overlapping_signals(self.signals, self.params)
        self.signals = add_quality_score(self.signals)
        return self
    
    def build_portfolio(self, total_capital=1000000):
        """Build portfolio"""
        self.portfolio = calculate_position_sizes(self.signals, total_capital)
        self.portfolio = apply_diversification_rules(self.portfolio)
        return self
    
    def run_backtest(self, price_data=None):
        """Run backtesting"""
        if price_data is None:
            symbols = self.portfolio['symbol'].unique()
            price_data = load_price_data(symbols)
        
        self.backtest_results = run_backtest(self.portfolio, price_data, self.params)
        self.performance_metrics = calculate_performance_metrics(self.backtest_results)
        return self
    
    def get_results(self):
        """Get comprehensive results"""
        return {
            'signals': self.signals,
            'portfolio': self.portfolio,
            'backtest_results': self.backtest_results,
            'performance_metrics': self.performance_metrics
        }


In [50]:
def run_parameter_sweep(param_sets, file_path='CF-Insider-Trading-equities-14-07-2022-to-14-07-2025.csv', 
                        total_capital=1000000):
    """
    Run the full strategy for multiple parameter sets and collect performance metrics.
    
    :param param_sets: List of dictionaries, each with keys like 'lookback_window', 'min_insiders', etc.
    :param file_path: Path to the data file.
    :param total_capital: Starting capital for portfolio building.
    :return: Dictionary where keys are parameter set indices, values are performance metrics dicts.
    """
    sweep_results = {}
    
    for i, params_dict in enumerate(param_sets):
        # Create parameters object with custom values
        params = ConsensusParameters(**params_dict)
        
        # Initialize and run strategy
        strategy = ConsensusInsiderStrategy(params=params)
        strategy.load_data(file_path)
        strategy.generate_signals()
        strategy.build_portfolio(total_capital=total_capital)
        strategy.run_backtest()
        
        # Get and store results
        results = strategy.get_results()
        sweep_results[f'Set_{i+1}'] = {
            'parameters': params_dict,
            'performance_metrics': results['performance_metrics']
        }
        
        # Print summary for this set
        print(f"\nResults for Parameter Set {i+1}: {params_dict}")
        for metric, value in results['performance_metrics'].items():
            print(f"{metric}: {value:.4f}")
    
    return sweep_results


In [51]:
# Initialize strategy
strategy = ConsensusInsiderStrategy()

# Load and process data
strategy.load_data('CF-Insider-Trading-equities-14-07-2022-to-14-07-2025.csv')

# Generate signals
strategy.generate_signals()

# Build portfolio
strategy.build_portfolio(total_capital=1000000)

# Run backtest
strategy.run_backtest()

# Get results
results = strategy.get_results()

# Display performance metrics
print("Performance Metrics:")
for metric, value in results['performance_metrics'].items():
    print(f"{metric}: {value:.4f}")


Performance Metrics:
total_trades: 9.0000
total_return: 9811.4293
win_rate: 0.7778
avg_return: 0.0100
volatility: 0.0151
sharpe_ratio: 0.6649
max_drawdown: -2849.5178
profit_factor: 4.4432
avg_win: 0.0162
avg_loss: -0.0117
