In [1]:
import bmll2 as b2
b2.get_file('modules/auxiliary_functions.py')
!pip install 'powerlaw'

Collecting powerlaw
  Downloading powerlaw-2.0.0-py3-none-any.whl.metadata (9.9 kB)
Collecting mpmath (from powerlaw)
  Downloading mpmath-1.3.0-py3-none-any.whl.metadata (8.6 kB)
Downloading powerlaw-2.0.0-py3-none-any.whl (191 kB)
Downloading mpmath-1.3.0-py3-none-any.whl (536 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m536.2/536.2 kB[0m [31m35.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mpmath, powerlaw
Successfully installed mpmath-1.3.0 powerlaw-2.0.0


In [2]:
# can do this when i have converted the notebooks to .py files
# import auxiliary_functions
import auxiliary_functions as af

import random
import math
import pandas as pd
import numpy as np
from pandas import StringDtype

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.ticker import LogFormatterSciNotation

from statsmodels.sandbox.stats.runs import runstest_1samp 
import powerlaw
import itertools
import pylab
import scipy.stats
from scipy.optimize import curve_fit

import shutil
from pathlib import Path
import warnings

In [3]:
# Ticker symbols analysed throughout this notebook; each one is expected to
# have a matching '<ticker>.csv' file (see the loading cell).
# NOTE(review): despite the name, this list holds 38 symbols, not 40.
top40_tickers = [
    "ABG",  # ABSA
    "AGL",  # Anglo American
    "ANG",  # Anglogold Ashanti
    "ANH",  # AB InBev
    "APN",  # Aspen Pharmacare
    "BHG",  # BHP Group
    "BID",  # Bidcorp
    "BVT",  # Bidvest
    "BTI",  # British American Tobacco
    "CPI",  # Capitec
    "CLS",  # Clicks
    "DSY",  # Discovery
    "EXX",  # Exxaro
    "FSR",  # FirstRand
    "GLN",  # Glencore
    "GFI",  # Gold Fields
    "GRT",  # Growthpoint
    "IMP",  # Impala Platinum
    "INL",  # Investec Ltd
    "INP",  # Investec PLC
    "MNP",  # Mondi
    "MRP",  # Mr Price
    "MTN",  # MTN Group
    "NPN",  # Naspers
    "NED",  # Nedbank
    "NRP",  # NEPI Rockcastle
    "OMU",  # Old Mutual
    "PRX",  # Prosus
    "RNI",  # Reinet Investments
    "REM",  # Remgro
    "RMH",  # RMB
    "SLM",  # Sanlam
    "SOL",  # Sasol
    "SHP",  # Shoprite
    "SBK",  # Standard Bank
    "VAL",  # Valterra Platinum
    "VOD",  # Vodacom
    "WHL"   # Woolworths
]

In [4]:
%%time
# List everything stored under 'top_40' and fetch each entry with b2.get_file.
# Presumably this copies the per-ticker CSVs into the working directory, since
# load_stock later reads '<ticker>.csv' by relative path -- TODO confirm.
files = b2.list_files(path = 'top_40')

for f in files:
    b2.get_file(f'top_40/{f}')
    
def load_stock(csv):
    """Read one trade CSV and return it with the 'Ticker' column renamed to 'RIC'.

    The 'DateTime' and 'Date' columns are parsed as datetimes while reading.
    `csv` is handed straight to pandas.read_csv (a path or file-like object).
    """
    raw_trades = pd.read_csv(csv, parse_dates = ['DateTime', 'Date'])
    return raw_trades.rename(columns = {'Ticker': 'RIC'})
    
# Load every ticker's CSV into memory: {ticker symbol -> DataFrame of its trades}.
stocks = {
    ticker: load_stock(f'{ticker}.csv')
    for ticker in top40_tickers
}

# takes about 6 minutes to load all the stocks in top40_tickers

CPU times: user 4min 17s, sys: 1min 8s, total: 5min 26s
Wall time: 5min 50s


In [5]:
def one_sided_runs_test(sample, runs_correction = 0):
    """
    One-sided (lower-tail) runs test on a two-valued sequence.

    The observed number of runs R is compared with its expectation under
    random ordering using a normal approximation. A small p-value means the
    sequence has fewer runs than expected, i.e. equal values cluster together.

    Parameters
    ----------
    sample : array-like
        Ordered sequence of labels (e.g. trade signs). Only the first two
        distinct values (in order of appearance) are counted towards the
        category sizes N_a and N_b, whereas runs are counted over the whole
        sequence -- callers should therefore pass strictly two-valued data.
    runs_correction : int, default 0
        Amount added to the observed run count before the z-score is
        computed. The returned R is the uncorrected count.

    Returns
    -------
    tuple (z, p_value, R)
        z-score, lower-tail p-value ``norm.cdf(z)`` and the observed number
        of runs. When the variance of the run count is zero or undefined
        (e.g. one observation of each value) the test cannot be evaluated
        and ``(nan, nan, R)`` is returned.
    str
        The message 'Only 1 unique value present' when the sample has fewer
        than two distinct values (kept for backward compatibility).
    """
    sample = pd.Series(sample)
    values = sample.unique()

    if len(values) < 2:
        return 'Only 1 unique value present'

    a, b = values[0], values[1]

    # Plain Python ints on purpose: the variance numerator grows like N**4 and
    # would silently overflow fixed-width numpy integers on long sequences.
    N_a = int((sample == a).sum())
    N_b = int((sample == b).sum())
    N   = N_a + N_b
    R   = sum(1 for _ in itertools.groupby(sample))

    variance_num = 2 * N_a * N_b * (2 * N_a * N_b - N)
    variance_den = N ** 2 * (N - 1)

    # Degenerate case: dividing by a zero standard deviation would emit a
    # RuntimeWarning (or raise ZeroDivisionError when N == 1) and yield a
    # meaningless z-score, so report the statistic as undefined instead.
    if variance_den == 0 or variance_num == 0:
        return (np.nan, np.nan, R)

    mu    = ((2 * N_a * N_b) / N) + 1
    sigma = np.sqrt(variance_num / variance_den)
    z     = ((R + runs_correction) - mu) / sigma

    p_value = scipy.stats.norm.cdf(z)

    return (z, p_value, R)

In [16]:
%%time
# Results of the yearly splitting-trader (ST) analysis. Every list below gets
# exactly one entry per (stock, year), appended in processing order, so the
# lists are aligned index-by-index.
stocks_trade_sequences    = [] # per stock-year: list of DataFrames, one per trader classified as ST
stocks_p_vals             = [] # per stock-year: runs-test p-value of each ST trader
stocks_run_lengths        = [] # per stock-year: run lengths pooled over that stock-year's ST traders
stocks_total_volumes      = [] # per stock-year: one-element list with the total volume traded that year
stocks_percentage_STs     = [] # per stock-year: % of the N synthetic traders classified as ST
stocks_percentage_STs_vol = [] # per stock-year: % of that year's volume traded by ST traders
stocks_percentage_STs_num = [] # per stock-year: % of that year's trades made by ST traders

N = 25  # number of synthetic traders the trades of each stock-year are split across

for ric, stock_data in stocks.items():

    stock_data['Date'] = pd.to_datetime(stock_data['Date'])
    stock_data['Year'] = stock_data['Date'].dt.year

    for year, df_year in stock_data.groupby('Year'):

        if df_year.empty:
            continue

        trader_trade_sequences = []
        trader_p_vals          = []
        trader_run_lengths     = []
        trader_total_volumes   = []

        # NOTE(review): f_max is the stock's row count over ALL years, while the
        # trades being assigned are only df_year (extract_ST_run_lengths uses the
        # row count of the frame it is given) -- confirm which one is intended.
        f = af.trader_participation(N = N, method = 'power', alpha = 2, f_min = 1, f_max = stock_data.shape[0], seed = 1)
        c = af.cumulative_probs(f)

        output = af.orders(N = N, trades = df_year, cumulative_probs = c)

        # Totals of the slice that is actually analysed; these are the
        # denominators of the ST shares below. Computed once per stock-year.
        year_volume = df_year['Volume'].sum()
        year_trades = df_year.shape[0]

        STs_volume = 0
        STs_num    = 0
        for n in range(N):

            # output[n] is used as positional row indices of trader n's trades
            trader_n_trades = df_year.iloc[output[n]]

            # Too few trades, or no sign variation: the runs test is not applicable
            if trader_n_trades.shape[0] <= 2:
                continue

            if trader_n_trades['Trade Sign'].nunique() < 2:
                continue

            # Count consecutive trade pairs that fall on different dates but
            # carry the same sign; this is passed to the test as runs_correction.
            dates = trader_n_trades['Date'].to_numpy()
            signs = trader_n_trades['Trade Sign'].to_numpy()
            day_breaks = int(np.sum((dates[1:] != dates[:-1]) & (signs[1:] == signs[:-1])))

            p_val = one_sided_runs_test(trader_n_trades['Trade Sign'], runs_correction = day_breaks)[1]

            # we need to decide on an appropriate p value here. 1% seems too strict
            if p_val <= 0.01:

                STs_volume = STs_volume + trader_n_trades['Volume'].sum()
                STs_num    = STs_num + trader_n_trades.shape[0]
                trader_trade_sequences.append(trader_n_trades)
                trader_p_vals.append(p_val)

                trader_run_lengths.extend(
                    len(list(group))
                    for _, group in itertools.groupby(trader_n_trades['Trade Sign'])
                )

        trader_total_volumes.append(year_volume)

        # Shares are relative to the same year's totals (not the whole history)
        STs_percentage_vol = round((STs_volume / year_volume) * 100, 3)
        STs_percentage_num = round((STs_num / year_trades) * 100, 3)

        stocks_trade_sequences.append(trader_trade_sequences)
        stocks_p_vals.append(trader_p_vals)
        stocks_run_lengths.append(trader_run_lengths)
        stocks_total_volumes.append(trader_total_volumes)
        stocks_percentage_STs.append(round((len(trader_trade_sequences) / N) * 100, 3))
        stocks_percentage_STs_vol.append(STs_percentage_vol)
        stocks_percentage_STs_num.append(STs_percentage_num)
        print('done with', ric, 'for', year)

# the original row-by-row version took about 2 hours to run

done with ABG for 2023
done with ABG for 2024
done with ABG for 2025
done with AGL for 2023
done with AGL for 2024
done with AGL for 2025
done with ANG for 2023
done with ANG for 2024
done with ANG for 2025
done with ANH for 2023
done with ANH for 2024
done with ANH for 2025
done with APN for 2023
done with APN for 2024
done with APN for 2025
done with BHG for 2023
done with BHG for 2024
done with BHG for 2025
done with BID for 2023
done with BID for 2024
done with BID for 2025
done with BVT for 2023
done with BVT for 2024
done with BVT for 2025
done with BTI for 2023
done with BTI for 2024
done with BTI for 2025
done with CPI for 2023
done with CPI for 2024
done with CPI for 2025
done with CLS for 2023
done with CLS for 2024
done with CLS for 2025
done with DSY for 2023
done with DSY for 2024
done with DSY for 2025
done with EXX for 2023
done with EXX for 2024
done with EXX for 2025
done with FSR for 2023
done with FSR for 2024
done with FSR for 2025
done with GLN for 2023
done with G

In [17]:
# Summarise the yearly ST analysis: the three lists hold one value per
# (stock, year) in the same order, so they are row-aligned.
stocks_percentage_STs     = np.array(stocks_percentage_STs)
stocks_percentage_STs_vol = np.array(stocks_percentage_STs_vol)
stocks_percentage_STs_num = np.array(stocks_percentage_STs_num)

# Keep only stock-years in which at least one trader was classified as ST.
# One shared mask is applied to all three arrays: filtering each array by its
# own "> 0" test could drop different rows (e.g. a share that rounds to 0.0),
# which would misalign the columns or make the DataFrame constructor fail.
has_STs = stocks_percentage_STs > 0

STs_percentage = stocks_percentage_STs[has_STs]
STs_vols       = stocks_percentage_STs_vol[has_STs]
STs_nums       = stocks_percentage_STs_num[has_STs]

ST_df          = pd.DataFrame({'% STs' : STs_percentage, 'STs volume' : STs_vols, 'STs number' : STs_nums})
ST_df.to_csv('ST_df_power_25.csv', index = False)
b2.put_file('ST_df_power_25.csv', 'test_data')

In [None]:
def extract_ST_run_lengths(stock_data, N = 100, p_threshold = 0.01):
    """
    Extracts metaorder run lengths L from statistically identified
    splitting traders (STs) for a given stock and time window.

    The trades are distributed over N synthetic traders, each trader's
    trade-sign sequence is tested with the one-sided runs test, and the run
    lengths of every trader whose p-value is at most `p_threshold` are pooled.

    Parameters
    ----------
    stock_data : pandas.DataFrame
        Trades of one stock for the window of interest. The columns 'Date'
        and 'Trade Sign' are read here; the frame is also handed to af.orders.
    N : int, default 100
        Number of synthetic traders.
    p_threshold : float, default 0.01
        Largest runs-test p-value for which a trader counts as an ST.

    Returns
    -------
    run_lengths : list of int
        Metaorder lengths pooled across ST traders (empty if there are no
        trades or no trader qualifies).
    """
    run_lengths = []

    # Nothing to assign: avoid calling the reconstruction with f_max = 0
    if stock_data.shape[0] == 0:
        return run_lengths

    # Synthetic trader reconstruction: power-law participation, fixed seed
    f = af.trader_participation(N = N, method = 'power', alpha =  2, f_min = 1 ,f_max = stock_data.shape[0], seed = 1)
    c = af.cumulative_probs(f)
    output = af.orders(N = N, trades = stock_data, cumulative_probs = c)

    for n in range(N):
        # output[n] is used as positional row indices of trader n's trades
        trader_trades = stock_data.iloc[output[n]]

        # Too few trades -> runs test is not meaningful
        if trader_trades.shape[0] <= 2:
            continue

        # No sign variation -> no runs test
        if trader_trades['Trade Sign'].nunique() < 2:
            continue

        # Count consecutive trade pairs that fall on different dates but carry
        # the same sign; passed to the test as runs_correction. Vectorised
        # equivalent of walking the rows one by one.
        dates = trader_trades['Date'].to_numpy()
        signs = trader_trades['Trade Sign'].to_numpy()
        day_breaks = int(np.sum((dates[1:] != dates[:-1]) & (signs[1:] == signs[:-1])))

        z, p_val, R = one_sided_runs_test(trader_trades['Trade Sign'], runs_correction = day_breaks)

        # Identify splitting traders and pool the lengths of their same-sign runs
        if p_val <= p_threshold:
            run_lengths.extend(
                len(list(group))
                for _, group in itertools.groupby(trader_trades['Trade Sign'])
            )

    return run_lengths

In [None]:
%%time
# For every stock and calendar year: extract the pooled ST run lengths, write
# them and the year's raw trade-sign sequence to CSV, and upload both files to
# 'test_data'.
# The loop variable is `ric` (as in the yearly analysis cell) rather than
# `ticker`, so the `matplotlib.ticker` alias imported at the top is not rebound.
for ric, stock_data in stocks.items():

    stock_data['Date'] = pd.to_datetime(stock_data['Date'])
    stock_data['Year'] = stock_data['Date'].dt.year

    for year, df_year in stock_data.groupby('Year'):

        if df_year.shape[0] < 1000:
            continue  # too small for power laws

        L = extract_ST_run_lengths(df_year)

        # No splitting traders found for this stock-year: nothing to export
        if len(L) == 0:
            continue

        out = pd.DataFrame({'L' : L})
        out.to_csv(f'{ric}_run_lengths_yearly_{year}.csv', index = False)
        b2.put_file(f'{ric}_run_lengths_yearly_{year}.csv', 'test_data')

        signs = df_year[['Trade Sign']]
        signs.to_csv(f'{ric}_trade_signs_{year}.csv', index = False)
        b2.put_file(f'{ric}_trade_signs_{year}.csv', 'test_data')

    print(f'saved yearly run lengths for {ric}')

# takes about 2 hours

In [None]:
%%time
def assign_half_year(date):
    """Label a date by half-year: 'H1' for months 1-6, otherwise 'H2'."""
    if date.month <= 6:
        return 'H1'
    return 'H2'

# For every stock and half-year (H1 = Jan-Jun, H2 = Jul-Dec): extract the
# pooled ST run lengths, write them and the period's raw trade-sign sequence
# to CSV, and upload both files to 'test_data'.
# The loop variable is `ric` rather than `ticker`, so the `matplotlib.ticker`
# alias imported at the top is not rebound.
for ric, stock_data in stocks.items():

    stock_data['Date'] = pd.to_datetime(stock_data['Date'])
    stock_data['Year'] = stock_data['Date'].dt.year
    stock_data['Half'] = stock_data['Date'].apply(assign_half_year)

    for (year, half), df_half in stock_data.groupby(['Year', 'Half']):

        if df_half.shape[0] < 1000:
            continue  # too small for power laws

        L = extract_ST_run_lengths(df_half)

        # No splitting traders found for this period: nothing to export
        if len(L) == 0:
            continue

        out = pd.DataFrame({'L': L})
        out.to_csv(f'{ric}_run_lengths_half_yearly_{year}_{half}.csv', index = False)
        b2.put_file(f'{ric}_run_lengths_half_yearly_{year}_{half}.csv', 'test_data')

        signs = df_half[['Trade Sign']]
        signs.to_csv(f'{ric}_trade_signs_{year}_{half}.csv', index = False)
        b2.put_file(f'{ric}_trade_signs_{year}_{half}.csv', 'test_data')

    print(f'saved half-year run lengths for {ric}')

# takes about 2 hours

In [None]:
# Fit a power law to the pooled ST run lengths exported for AGL in 2025 and
# compare the empirical CCDF with the fitted one.
run_lengths = pd.read_csv('AGL_run_lengths_yearly_2025.csv')
L = run_lengths['L'].to_numpy()

# No xmin is passed, so powerlaw selects the lower cut-off itself
fit = powerlaw.Fit(L)

alpha = fit.power_law.alpha
xmin  = fit.power_law.xmin

# plot_ccdf returns the matplotlib Axes it drew on; reuse it for the fitted curve.
ccdf_ax = fit.plot_ccdf(color = 'blue', linewidth = 2)
# alpha is the exponent of the fitted density p(L) ~ L^(-alpha); the CCDF of
# that law decays as L^(-(alpha - 1)), which is what the legend now states.
fit.power_law.plot_ccdf(color = 'red', linestyle = '--', ax = ccdf_ax, label = rf'$P_{{>}}(L) \sim L^{{-{alpha - 1:.3f}}}$')
plt.xlabel(r'$L$')
plt.ylabel(r'$P_{>}(L)$')
plt.legend()

plt.show()

In [None]:
#Some diagnostics from here on out to test for inaccuracies or bugs

In [None]:
# NOTE(review): data_retriever, jan_path and dec_path are not defined in any
# visible cell -- this cell relies on state created elsewhere.
# NOTE(review): the frame is named AGL but is requested for ticker 'CPI';
# confirm which stock is intended.
AGL = data_retriever('CPI', jan_path, dec_path)
# Display the rows whose 'Trade Sign' is +/- infinity.
AGL[np.isinf(AGL['Trade Sign'])]

In [None]:
# Diagnostic: for each trading day of the AGL frame, split the day's trades
# over N synthetic traders, run the one-sided runs test per trader and print
# any warning the test raises together with the offending sign sequence.
N = 10
for date, day_D in AGL.groupby('Date', sort = False):
    print(date)
    f = af.trader_participation(N = N, method = 'homogenous', alpha = 2, f_min = 1, f_max = 1000, seed = 1)
    c = af.cumulative_probs(f)
    trades = day_D
    if trades.empty:
        continue

    output = af.orders(N = N, trades = trades, cumulative_probs = c)
    
    # NOTE(review): these three lists are reset every day but never filled in this cell.
    days_trade_sequences = []
    days_p_vals = []
    daily_run_lengths = []

    for n in range(N):
        print(f"Trader {n}")
        # output[n] is used as positional row indices of trader n's trades
        trader_n_trades = trades.iloc[output[n], ]

        # Skip traders with at most two trades
        if (trader_n_trades.shape[0] <= 2):
            continue

        # Skip traders whose trades all share one sign
        if (trader_n_trades['Trade Sign'].nunique() < 2):
            #print(f"  Skipping trader {n} - only {trader_n_trades['Trade Sign'].nunique()} unique value(s)")
            continue
        
        # Record every warning emitted while the test runs so it can be reported below
        with warnings.catch_warnings(record = True) as w:
            warnings.simplefilter('always')
            runs_test = one_sided_runs_test(trader_n_trades['Trade Sign'])
            p_val = runs_test[1]
            
            if len(w) > 0:
                # Only the first recorded warning is printed
                print(f"  *** WARNING on trader {n}: {w[0].message}")
                print(f"  len={len(trader_n_trades)}, unique={trader_n_trades['Trade Sign'].nunique()}")
                print(f"  Values: {trader_n_trades['Trade Sign'].values}")
        
        print(f"  p_val={p_val}")

In [None]:
# Smallest two-valued input: one observation of each sign (N_a = N_b = 1),
# for which the variance term of the runs test evaluates to zero.
one_sided_runs_test([-1 , 1])

In [None]:
# Daily variant of the ST analysis: each trading day of each ticker is split
# over N synthetic traders and tested on its own.
# NOTE(review): `tickers`, `data_retriever`, `jan_path` and `dec_path` are not
# defined in any visible cell -- this cell relies on state created elsewhere.

stocks_trade_sequences    = [] # 3 deep: per ticker, per day, DataFrame of trades for each ST trader
stocks_p_vals             = [] # 3 deep: per ticker, per day, one p-value per ST trader
stocks_run_lengths        = [] # 3 deep: per ticker, per day, run lengths pooled over that day's ST traders
stocks_total_volumes      = [] # 2 deep: per ticker, per day, total volume traded that day
stocks_percentage_STs     = [] # 2 deep: per ticker, per day, % of the N traders classified as ST
stocks_percentage_STs_vol = [] # 2 deep: per ticker, per day, % of the day's volume traded by STs
stocks_percentage_STs_num = [] # 2 deep: per ticker, per day, % of the day's trades made by STs

for ticker in tickers:
    
    stock_data = data_retriever(ticker, jan_path, dec_path)

    daily_trade_sequences      = [] # 2 deep: per day, DataFrame of trades for each ST trader
    daily_p_vals               = [] # 2 deep: per day, one p-value per ST trader
    daily_sequence_run_lengths = [] # 2 deep: per day, run lengths pooled over that day's ST traders
    daily_total_volumes        = [] # 1 deep: per day, total volume traded
    daily_percentage_STs       = [] # 1 deep: per day, % of the N traders classified as ST
    daily_percentage_STs_vol   = [] # 1 deep: per day, % of the day's volume traded by STs
    daily_percentage_STs_num   = [] # 1 deep: per day, % of the day's trades made by STs
    
    N = 20
    for date, day_D in stock_data.groupby('Date', sort = False):
    
            # seed = None here, whereas the yearly cells pass seed = 1.
            # NOTE(review): f_max is the row count of the whole stock_data frame,
            # while only one day's trades are assigned -- confirm this is intended.
            f = af.trader_participation(N = N, method = 'homogenous', alpha = 2, f_min = 1, f_max = stock_data.shape[0], seed = None)
            c = af.cumulative_probs(f)
    
            trades = day_D
            if trades.empty:
                continue
    
            output = af.orders(N = N, trades = trades, cumulative_probs = c)
            
            days_trade_sequences = []
            days_p_vals = []
            days_run_lengths = []
            #days_percentage_STs_vol = []
            #days_percentage_STs_num = []

            # Running totals over the traders classified as ST on this day
            STs_volume = 0
            STs_num    = 0
            for n in range(N):
                
                # output[n] is used as positional row indices of trader n's trades
                trader_n_trades = trades.iloc[output[n], ]
    
                # Skip traders with at most two trades
                if (trader_n_trades.shape[0] <= 2):
                    continue
    
                # Skip traders whose trades all share one sign
                if (trader_n_trades['Trade Sign'].nunique() < 2):
                    continue

                #runs_test = runstest_1samp(trader_n_trades['Trade Sign'], correction = False)
                runs_test = one_sided_runs_test(trader_n_trades['Trade Sign'])
                p_val     = runs_test[1]
    
                # we need to decide on an appropriate p value here. 1% seems too strict
                # (this cell uses 5%; the yearly cells use 1%)
                if p_val <= 0.05:

                    STs_volume = STs_volume + sum(trader_n_trades['Volume'])
                    STs_num    = STs_num + trader_n_trades.shape[0]
                    days_trade_sequences.append(trader_n_trades)
                    days_p_vals.append(p_val)
                    
                    grouped_trade_signs = itertools.groupby(trader_n_trades['Trade Sign'])
                    
                    # Length of each maximal run of identical consecutive signs
                    for key, group in grouped_trade_signs:
                        days_run_lengths.append(len(list(group)))

            # ST shares relative to this day's own totals
            STs_percentage_vol = round((STs_volume / sum(trades['Volume'])) * 100, 3)
            STs_percentage_num = round((STs_num / trades.shape[0]) * 100, 3)
                                                  
            daily_trade_sequences.append(days_trade_sequences)
            daily_p_vals.append(days_p_vals)
            daily_sequence_run_lengths.append(days_run_lengths)
            daily_total_volumes.append(sum(trades['Volume']))
            daily_percentage_STs.append(round((len(days_trade_sequences) / N) * 100, 3))
            daily_percentage_STs_vol.append(STs_percentage_vol)
            daily_percentage_STs_num.append(STs_percentage_num)
    
    stocks_trade_sequences.append(daily_trade_sequences)
    stocks_p_vals.append(daily_p_vals)
    stocks_run_lengths.append(daily_sequence_run_lengths)
    stocks_total_volumes.append(daily_total_volumes)
    stocks_percentage_STs.append(daily_percentage_STs)
    stocks_percentage_STs_vol.append(daily_percentage_STs_vol)
    stocks_percentage_STs_num.append(daily_percentage_STs_num)
    print('done with', ticker)

# Flatten the per-ticker lists of daily values into one flat array each
STs_percentage = [item for sublist in stocks_percentage_STs for item in sublist]
STs_percentage = np.array(STs_percentage)

STs_vols = [item for sublist in stocks_percentage_STs_vol for item in sublist]
# reads as: for each sublist in the outer list, for each item in that sublist, keep the item
STs_vols = np.array(STs_vols)

STs_nums = [item for sublist in stocks_percentage_STs_num for item in sublist]
# reads as: for each sublist in the outer list, for each item in that sublist, keep the item
STs_nums = np.array(STs_nums)

# Drop zero entries.
# NOTE(review): each array is filtered by its own mask, so after this step the
# three arrays may differ in length and are no longer aligned by stock-day.
STs_percentage = STs_percentage[STs_percentage > 0]
STs_vols       = STs_vols[STs_vols > 0]
STs_nums       = STs_nums[STs_nums > 0]

In [None]:
# Three side-by-side histograms of the ST shares, followed by their mean and
# median. Each panel is described by one row of the table below:
# (data, number of bins, bar colour, x-axis label, panel title).
fig, axes = plt.subplots(1, 3, figsize = (18, 5))

hist_panels = [
    (STs_percentage, 10, 'blue',  '% of Traders Classified as ST', 'ST Traders (by count)'),
    (STs_vols,       30, 'green', '% of Volume from ST Traders',   'ST Volume Share'),
    (STs_nums,       30, 'red',   '% of Trades from ST Traders',   'ST Trade Count Share'),
]

for panel_ax, (panel_data, panel_bins, panel_color, panel_xlabel, panel_title) in zip(axes, hist_panels):
    panel_ax.hist(panel_data, bins = panel_bins, edgecolor = 'black', alpha = 0.7, color = panel_color)
    panel_ax.set_xlabel(panel_xlabel, fontsize = 11)
    panel_ax.set_ylabel('Frequency', fontsize = 11)
    panel_ax.set_title(panel_title, fontsize = 12)
    panel_ax.grid(True, alpha = 0.3)

plt.tight_layout()
plt.show()

# Summary statistics of the plotted values
print(f"Total stock-days analyzed: {len(STs_percentage)}")
print(f"\nST Traders: Mean = {np.mean(STs_percentage):.2f}%, Median = {np.median(STs_percentage):.2f}%")
print(f"ST Volume: Mean = {np.mean(STs_vols):.2f}%, Median = {np.median(STs_vols):.2f}%")
print(f"ST Trades: Mean = {np.mean(STs_nums):.2f}%, Median = {np.median(STs_nums):.2f}%")

In [None]:
# Pool the daily ST run lengths into one flat array and fit a power law to it.
# NOTE(review): daily_sequence_run_lengths is re-created for every ticker in the
# daily analysis cell, so here it holds the LAST ticker only; stocks_run_lengths
# holds all tickers -- confirm which one is intended.
L = [item for sublist in daily_sequence_run_lengths for item in sublist]
# reads as: for each sublist in the outer list, for each item in that sublist, keep the item
L = np.array(L)

# The lower cut-off is fixed at 1 instead of being selected by powerlaw
fit = powerlaw.Fit(L, xmin = 1)

alpha = fit.power_law.alpha
xmin  = fit.power_law.xmin

# plot_ccdf returns the matplotlib Axes it drew on; reuse it for the fitted curve.
ccdf_ax = fit.plot_ccdf(color = 'blue', linewidth = 2)
# alpha is the exponent of the fitted density p(L) ~ L^(-alpha); the CCDF of
# that law decays as L^(-(alpha - 1)), which is what the legend now states.
fit.power_law.plot_ccdf(color = 'red', linestyle = '--', ax = ccdf_ax, label = rf'$P_{{>}}(L) \sim L^{{-{alpha - 1:.3f}}}$')
plt.xlabel(r'$L$')
plt.ylabel(r'$P_{>}(L)$')
plt.legend()

plt.show()