# Deep Learning at the Frontier Market Comparing CNN and Classical Machine Learning for Stock Prediction on the Nairobi Securities Exchange

In [None]:
# Importing all relevant libraries
from collections import deque

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report, confusion_matrix

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

import os
import time
import psutil

from PIL import Image, ImageDraw
import gc
import psutil



** Viewing the raw file first few and last few entries for validity check**

In [None]:
# Quick peek into raw file to understand header rows
with open("NSE_merged_cleaned_2007_2020.csv") as f:
    for _ in range(10):
        print(f.readline())

In [None]:
# Quick peek into raw file to understand tai rows
with open("NSE_merged_cleaned_2007_2020.csv") as f:
    last_lines = deque(f, maxlen=10)

for line in last_lines:
    print(line, end="")

### Section 3.1: Data Collection and Cleaning


In [None]:

df = pd.read_csv("NSE_merged_cleaned_2007_2020.csv")
df = df.drop(index=0).reset_index(drop=True)
df.columns = df.columns.str.strip()


df['DATE'] = pd.to_datetime(df['DATE'], errors='coerce', dayfirst=False)
numeric_cols = ['12m Low', '12m High', 'Day Low', 'Day High',
                'Day Price', 'Previous', 'Change', 'Change%', 'Volume', 'Adjust']


df[numeric_cols] = df[numeric_cols].replace({',': '', '-': np.nan, '–': np.nan}, regex=True)
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
df = df.dropna(subset=['DATE', 'CODE', 'Day Price'])
df = df.sort_values(['CODE', 'DATE']).reset_index(drop=True)


print(df['DATE'].min(), df['DATE'].max())
print(df.shape)


In [None]:
df_check = pd.read_csv("NSE_merged_cleaned_2007_2020.csv")
df_check = df_check.drop(index=0).reset_index(drop=True)
df_check.columns = df_check.columns.str.strip()


bad_dates = df_check[~pd.to_datetime(df_check['DATE'], errors='coerce', dayfirst=False).notna()]
print("Number of rows with bad DATEs:", bad_dates.shape[0])
print("Sample bad DATE values:")
print(bad_dates['DATE'].unique()[:10])


In [None]:
df = pd.read_csv("NSE_merged_cleaned_2007_2020.csv")
df = df.drop(index=0).reset_index(drop=True)
df.columns = df.columns.str.strip()
df['DATE'] = df['DATE'].astype(str).str.strip()
df = df[~df['DATE'].str.lower().isin(['date'])]


def try_parse(date_str):
    for fmt in ("%m/%d/%Y", "%d-%b-%y"):
        try:
            return pd.to_datetime(date_str, format=fmt)
        except:
            continue
    return pd.NaT

df['DATE'] = df['DATE'].apply(try_parse)


numeric_cols = ['12m Low', '12m High', 'Day Low', 'Day High',
                'Day Price', 'Previous', 'Change', 'Change%', 'Volume', 'Adjust']
df[numeric_cols] = df[numeric_cols].replace({',': '', '-': np.nan, '–': np.nan}, regex=True)
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')


df = df.dropna(subset=['DATE', 'CODE', 'Day Price'])
df = df.sort_values(['CODE', 'DATE']).reset_index(drop=True)


print("Date range:", df['DATE'].min(), df['DATE'].max())
print("Total rows:", df.shape[0])



In [None]:
df

### Section 3.2: Exploratory Data Analysis (Optional Plots)

In [None]:
df['return_1d'] = df.groupby('CODE')['Day Price'].pct_change()
df['return_1d_clipped'] = df['return_1d'].clip(-0.1, 0.1)
df['spread_pct'] = (df['Day High'] - df['Day Low']) / df['Day Price']
df['price_up'] = df.groupby('CODE')['Day Price'].diff() > 0
df['vol_5d'] = df.groupby('CODE')['return_1d'].rolling(5).std().reset_index(level=0, drop=True)
df['year'] = df['DATE'].dt.year


print("Skewness:", df['return_1d'].skew())
print("Kurtosis:", df['return_1d'].kurt())
print("Date Range:", df['DATE'].min(), df['DATE'].max())


df['return_1d'].hist(bins=100, range=(-0.1, 0.1), density=True, alpha=0.6)
plt.axvline(df['return_1d'].mean(), color='red', linestyle='--', label='Mean')
plt.title("Return Distribution")
plt.xlabel("1-Day Return")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()




In [None]:
most_traded = df.groupby('CODE')['Volume'].mean().sort_values(ascending=False).head(10)
most_traded.plot(kind='bar', title='Top 10 Most Traded Stocks by Volume')
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
scom = df[df['CODE'] == 'SCOM'].copy()
scom['30D MA'] = scom['Day Price'].rolling(30).mean()
plt.plot(scom['DATE'], scom['Day Price'], alpha=0.5, label='Price')
plt.plot(scom['DATE'], scom['30D MA'], color='red', label='30D MA')
plt.title("Safaricom Price and 30-Day MA")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
mean_returns = df.groupby('CODE')['return_1d'].mean().sort_values()
mean_returns.head(10).plot(kind='bar', color='crimson', title='Bottom 10 Stocks by Avg Daily Return')
plt.tight_layout(); plt.grid(True); plt.show()
mean_returns.tail(10).plot(kind='bar', color='green', title='Top 10 Stocks by Avg Daily Return')
plt.tight_layout(); plt.grid(True); plt.show()

In [None]:
spread_avg = df.groupby('CODE')['spread_pct'].mean().sort_values(ascending=False).head(10)
spread_avg.plot(kind='bar', title='Top 10 Stocks by Avg Intraday Spread', color='purple')
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
top3 = most_traded.head(3).index.tolist()
plt.figure(figsize=(12, 5))
for code in top3:
    tmp = df[df['CODE'] == code]
    plt.plot(tmp['DATE'], tmp['Volume'].rolling(20).mean(), label=code)
plt.title("20-Day Rolling Volume (Top 3 Stocks)")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
up_ratio = df.groupby('CODE')['price_up'].mean().sort_values(ascending=False).head(10)
up_ratio.plot(kind='bar', color='green', title='Top 10 by Upward Price Ratio')
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
top_stock = df['CODE'].value_counts().idxmax()
vol_data = df[df['CODE'] == top_stock].copy()
vol_data['rolling_vol'] = vol_data['return_1d'].rolling(10).std()
plt.plot(vol_data['DATE'], vol_data['rolling_vol'])
plt.title(f"10D Rolling Volatility: {top_stock}")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
spread_clusters = df.groupby('CODE')['spread_pct'].mean()
spread_clusters.plot(kind='hist', bins=30, edgecolor='black', title="Avg Spread Distribution")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
activity_table = df.groupby(['CODE', 'year']).size().unstack(fill_value=0)
display(activity_table.head(10))  # Show sample of stock activity

### Section 3.3: Feature Engineering

In [None]:
features_to_normalize = ['Day Price', 'Day High', 'Day Low', 'Previous', 'Volume']
df_scaled = df.copy() 


for code in df_scaled['CODE'].unique():
    mask = df_scaled['CODE'] == code
    subset = df_scaled.loc[mask, features_to_normalize]
    if subset.dropna().shape[0] > 1 and not (subset.nunique() == 1).any():
        scaler = StandardScaler()
        df_scaled.loc[mask, features_to_normalize] = scaler.fit_transform(subset)
    else:
        df_scaled.loc[mask, features_to_normalize] = np.nan


def add_features(group):
    group = group.sort_values("DATE").copy()
    group["momentum_5d"] = group["Day Price"].pct_change(periods=5, fill_method=None)
    group["ma_5"] = group["Day Price"].rolling(window=5).mean()
    group["ma_10"] = group["Day Price"].rolling(window=10).mean()
    group["ma_diff"] = group["ma_5"] - group["ma_10"]
    group["volatility_10d"] = group["return_1d_clipped"].rolling(window=10).std()
    group["hl_spread"] = (group["Day High"] - group["Day Low"]) / group["Day Price"]
    group["vol_zscore"] = (
        (group["Volume"] - group["Volume"].rolling(window=10).mean()) /
        group["Volume"].rolling(window=10).std()
    )
    return group


df_features_list = []
for code, group in df_scaled.groupby("CODE"):
    features = add_features(group)
    df_features_list.append(features)

df_features = pd.concat(df_features_list, axis=0)


df_features['ma_20'] = df_features.groupby("CODE")['Day Price'].transform(lambda x: x.rolling(window=20).mean())


df_features = df_features.dropna(subset=[
    "momentum_5d", "ma_5", "ma_10", "ma_diff",
    "volatility_10d", "hl_spread", "vol_zscore", "ma_20"
])


print(df_features[['CODE', 'DATE', 'Day Price', 'momentum_5d', 'ma_diff', 'volatility_10d']].head())


### Section 3.4: Chart-Based Image Construction for CNN Training

In [None]:

start_time = time.time()
process = psutil.Process(os.getpid())
print("📌 Starting final image generation with OHLC style...")


output_dir = "cnn_final_OHLC_BW"
os.makedirs(output_dir, exist_ok=True)
window = 20
label_horizon = 5
img_size = 32
chart_height = 18
volume_start = 20
label_data = []


csv_path = "NSE_merged_cleaned_2007_2020.csv"
df = pd.read_csv(csv_path, skiprows=[1])
df.columns = df.columns.str.strip()
df['DATE'] = pd.to_datetime(df['DATE'], errors='coerce')

numeric_cols = ['12m Low', '12m High', 'Day Low', 'Day High',
                'Day Price', 'Previous', 'Change', 'Change%', 'Volume', 'Adjust']
df[numeric_cols] = df[numeric_cols].replace(',', '', regex=True).apply(pd.to_numeric, errors='coerce')
df = df.dropna(subset=['DATE', 'CODE', 'Day Price', 'Day High', 'Day Low', 'Volume'])
df = df.sort_values(['CODE', 'DATE']).reset_index(drop=True)
df['ma_20'] = df.groupby("CODE")['Day Price'].transform(lambda x: x.rolling(window=20).mean())
df = df.dropna(subset=['ma_20'])


for code in df['CODE'].unique():
    stock = df[df['CODE'] == code].reset_index(drop=True)
    if len(stock) < window + label_horizon:
        continue

    for i in range(len(stock) - window - label_horizon):
        snippet = stock.iloc[i:i + window]
        if snippet[['Day Price', 'Day High', 'Day Low', 'Volume', 'ma_20']].isnull().any().any():
            continue

        price_cols = ['Day Price', 'ma_20', 'Day High', 'Day Low']
        price_min = snippet[price_cols].min().min()
        price_max = snippet[price_cols].max().max()
        vol_max = snippet['Volume'].max()

        def scale(series, out_min, out_max):
            return ((series - price_min) / (price_max - price_min + 1e-6) *
                    (out_max - out_min) + out_min).clip(out_min, out_max)

        close_scaled = scale(snippet['Day Price'], 0, chart_height)
        ma_scaled = scale(snippet['ma_20'], 0, chart_height - 2)
        high_scaled = scale(snippet['Day High'], 0, chart_height)
        low_scaled = scale(snippet['Day Low'], 0, chart_height)
        volume_scaled = (snippet['Volume'] / (vol_max + 1e-6) *
                         (img_size - volume_start - 1)).clip(0, img_size - volume_start - 1)

        img = Image.new("1", (img_size, img_size), 0)
        draw = ImageDraw.Draw(img)
        x_step = img_size / window

        
        for j in range(window):
            x = int(j * x_step)
            high_y = max(0, int(chart_height - high_scaled.iloc[j]))
            low_y = min(chart_height, int(chart_height - low_scaled.iloc[j]))
            close_y = min(chart_height, max(0, int(chart_height - close_scaled.iloc[j])))

            
            draw.line([(x, high_y), (x, low_y)], fill=1)
            
            draw.line([(x - 1, close_y), (x + 1, close_y)], fill=1)

        
        for j in range(1, window):
            x0 = int((j - 1) * x_step)
            x1 = int(j * x_step)
            y0 = int(chart_height - ma_scaled.iloc[j - 1])
            y1 = int(chart_height - ma_scaled.iloc[j])
            steps = max(abs(x1 - x0), abs(y1 - y0))
            for s in range(0, steps, 2):  
                xi = int(x0 + s * (x1 - x0) / steps)
                yi = int(y0 + s * (y1 - y0) / steps)
                if 0 <= xi < img_size and 0 <= yi < volume_start:
                    img.putpixel((xi, yi), 1)

        
        for j in range(window):
            x = int(j * x_step)
            v_height = int(volume_scaled.iloc[j])
            vol_top = img_size - 1
            vol_bottom = img_size - 1 - v_height
            draw.line([(x, vol_bottom), (x, vol_top)], fill=1)

        
        current_price = stock.iloc[i + window]['Day Price']
        future_price = stock.iloc[i + window + label_horizon - 1]['Day Price']
        label = int(future_price > current_price)

        date_str = snippet['DATE'].iloc[-1].strftime('%Y%m%d')
        fname = f"{code}_{date_str}.png"
        img.save(os.path.join(output_dir, fname))
        label_data.append((fname, label))

    gc.collect()


labels_df = pd.DataFrame(label_data, columns=["filename", "label"])
labels_df.to_csv(os.path.join(output_dir, "labels.csv"), index=False)


elapsed = time.time() - start_time
ram_used = process.memory_info().rss / 1024 / 1024
print(f"\n✅ Final image generation complete.")
print(f"⏱️ Runtime: {elapsed:.2f} seconds")
print(f"🧠 RAM Used: {ram_used:.2f} MB")


In [None]:
img_dir = "cnn_final_OHLC_BW"
labels_path = os.path.join(img_dir, "labels.csv")


labels_df = pd.read_csv(labels_path)


labels_df["exists"] = labels_df["filename"].apply(lambda x: os.path.exists(os.path.join(img_dir, x)))


total = len(labels_df)
missing = (~labels_df["exists"]).sum()
print(f"✅ Total samples in labels.csv: {total}")
print(f"⚠️ Missing image files: {missing}")
print("\nLabel Distribution:")
print(labels_df["label"].value_counts())


missing_files = labels_df[~labels_df["exists"]]
if not missing_files.empty:
    print("\nMissing Files Preview:")
    print(missing_files.head())



### Section 3.5: CNN Model Training & Evaluation


In [None]:

start_time = time.time()
process = psutil.Process(os.getpid())


label_path = "cnn_bw_images_v2/labels.csv"
label_df = pd.read_csv(label_path)


def load_bw_image(filepath, size=(32, 32)):
    img = Image.open(filepath).convert('L').resize(size)
    arr = np.array(img)
    arr = np.where(arr == 255, 1, 0)  
    return np.expand_dims(arr, axis=-1)


label_df = label_df[label_df['filename'].apply(lambda f: os.path.exists(f))]


X = np.array([load_bw_image(f) for f in label_df['filename']])
y = label_df['label'].values


X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)


model = Sequential([
    Input(shape=(32, 32, 1)),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Flatten(),
    Dropout(0.3),
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=0.001),
              loss='binary_crossentropy',
              metrics=['accuracy'])


history = model.fit(
    X_train, y_train,
    epochs=10,
    batch_size=32,
    validation_split=0.2
)


test_loss, test_acc = model.evaluate(X_test, y_test)
y_pred_prob = model.predict(X_test).ravel()
y_pred_binary = (y_pred_prob > 0.5).astype(int)

print("✅ Test Accuracy:", test_acc)
print("✅ ROC AUC:", roc_auc_score(y_test, y_pred_prob))
print("\nClassification Report:\n", classification_report(y_test, y_pred_binary))
print("\nConfusion Matrix:\n", confusion_matrix(y_test, y_pred_binary))


plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title("CNN Training Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


elapsed_time = time.time() - start_time
ram_used = process.memory_info().rss / 1024 / 1024

print(f"\n⏱️ Total runtime: {elapsed_time:.2f} seconds")
print(f"🧠 RAM used: {ram_used:.2f} MB")
