In [45]:
import numpy as np
import pandas as pd
import joblib
import os
from collections import Counter
from math import log, exp
np.random.seed(42)


# dataset + split + scaler
def generate_or_load_dataset(path='upi_fraud_dataset.csv', n_samples=3000, seed=42):
    if os.path.exists(path):
        df = pd.read_csv(path)
        print("Loaded dataset:", df.shape)
        return df
    np.random.seed(seed)
    data = {
        'trans_hour': np.random.randint(0, 24, n_samples),
        'trans_day': np.random.randint(1, 32, n_samples),
        'trans_month': np.random.randint(1, 13, n_samples),
        'trans_year': np.random.choice([2022, 2023], n_samples),
        'category': np.random.randint(0, 15, n_samples),
        'upi_number': np.random.randint(9000000000, 9999999999, n_samples),
        'age': np.random.randint(18, 80, n_samples),
        'trans_amount': np.random.exponential(1000, n_samples),
        'state': np.random.randint(1, 36, n_samples),
        'zip': np.random.randint(100000, 999999, n_samples),
    }
    fraud_prob = (
        (data['trans_hour'] < 6) * 0.3 +
        (data['trans_amount'] > 5000) * 0.4 +
        (data['age'] < 25) * 0.2 +
        np.random.random(n_samples) * 0.3
    )
    data['fraud_risk'] = (fraud_prob > 0.6).astype(int)
    df = pd.DataFrame(data)
    df.to_csv(path, index=False)
    print("Generated dataset:", df.shape)
    return df

def train_test_split_np(X, y, test_size=0.2, seed=42):
    np.random.seed(seed)
    n = X.shape[0]
    idx = np.random.permutation(n)
    test_n = int(n * test_size)
    return X[idx[test_n:]], X[idx[:test_n]], y[idx[test_n:]], y[idx[:test_n]]

class StandardScalerManual:
    def fit(self, X):
        self.mean_ = np.mean(X, axis=0)
        self.scale_ = np.std(X, axis=0, ddof=0)
        self.scale_[self.scale_ == 0] = 1.0
        return self
    def transform(self, X):
        return (X - self.mean_) / self.scale_
    def fit_transform(self, X):
        return self.fit(X).transform(X)
    def save(self, fname):
        joblib.dump({'mean': self.mean_, 'scale': self.scale_}, fname)
    @classmethod
    def load(cls, fname):
        data = joblib.load(fname)
        s = cls()
        s.mean_ = data['mean']
        s.scale_ = data['scale']
        return s



In [46]:
# Logistic Regression
class LogisticRegressionSGD:
    def __init__(self, lr=0.02, n_iter=2000, l2=0.0, verbose=False):
        self.lr = lr; self.n_iter = n_iter; self.l2 = l2; self.verbose = verbose
    def _sigmoid(self, z):
        z = np.clip(z, -500, 500)
        return 1.0 / (1.0 + np.exp(-z))
    def fit(self, X, y):
        n, d = X.shape
        self.w = np.zeros(d)
        self.b = 0.0
        for it in range(self.n_iter):
            z = X.dot(self.w) + self.b
            preds = self._sigmoid(z)
            error = preds - y
            gw = (X.T.dot(error)) / n + self.l2 * self.w
            gb = np.sum(error) / n
            self.w -= self.lr * gw
            self.b -= self.lr * gb
            if self.verbose and it % (self.n_iter//5+1) == 0:
                loss = -np.mean(y*np.log(preds+1e-12) + (1-y)*np.log(1-preds+1e-12))
                print(f"[LogReg] iter {it} loss {loss:.4f}")
        return self
    def predict_proba(self, X):
        p = self._sigmoid(X.dot(self.w) + self.b)
        return np.vstack([1-p, p]).T
    def predict(self, X, thr=0.5):
        return (self.predict_proba(X)[:,1] >= thr).astype(int)
    

In [47]:
# SVM
class LinearSVM_SGD:
    def __init__(self, lr=0.005, n_iter=2000, C=1.0, verbose=False):
        self.lr = lr; self.n_iter = n_iter; self.C = C; self.verbose = verbose
    def fit(self, X, y):
        # y in {0,1} -> {-1,+1}
        y_ = (y*2 - 1).astype(float)
        n,d = X.shape
        self.w = np.zeros(d)
        self.b = 0.0
        for it in range(self.n_iter):
            margins = y_ * (X.dot(self.w) + self.b)
            idx = margins < 1
            if idx.any():
                grad_w = self.w - (self.C/n) * (X[idx].T.dot(y_[idx]))
                grad_b = - (self.C/n) * np.sum(y_[idx])
            else:
                grad_w = self.w
                grad_b = 0.0
            self.w -= self.lr * grad_w
            self.b -= self.lr * grad_b
            if self.verbose and it % (self.n_iter//5+1) == 0:
                loss = 0.5*np.dot(self.w,self.w) + self.C*np.mean(np.maximum(0,1-margins))
                print(f"[SVM] iter {it} loss {loss:.4f}")
        return self
    def decision_function(self, X): return X.dot(self.w) + self.b
    def predict(self, X): return (self.decision_function(X) >= 0).astype(int)
    def predict_proba(self, X):
        s = self.decision_function(X)
        p = 1.0/(1.0+np.exp(-s))
        return np.vstack([1-p,p]).T


In [48]:
# Decision Tree  + Random Forest
def gini(y):
    if len(y)==0: return 0.0
    ps = np.bincount(y)/len(y)
    return 1.0 - np.sum(ps*ps)

class DecisionTreeNode:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature=feature; self.threshold=threshold; self.left=left; self.right=right; self.value=value

class DecisionTree:
    def __init__(self, max_depth=10, min_samples_split=2, max_features=None):
        self.max_depth=max_depth; self.min_samples_split=min_samples_split; self.max_features=max_features
    def fit(self, X, y):
        self.n_features_ = X.shape[1]
        self.root = self._grow_tree(X,y,0)
    def _best_split(self, X, y):
        n,d = X.shape
        features = range(d) if self.max_features is None else np.random.choice(d, self.max_features, replace=False)
        best=None; best_gain=0.0
        parent = gini(y)
        for feat in features:
            vals = X[:,feat]; uniq = np.unique(vals)
            if uniq.shape[0]==1: continue
            thr_candidates = (uniq[:-1]+uniq[1:])/2.0
            for thr in thr_candidates:
                left_idx = vals <= thr; right_idx = ~left_idx
                if left_idx.sum()==0 or right_idx.sum()==0: continue
                g_left = gini(y[left_idx]); g_right = gini(y[right_idx])
                gain = parent - (left_idx.sum()/n)*g_left - (right_idx.sum()/n)*g_right
                if gain > best_gain:
                    best_gain = gain
                    best = (feat, thr, left_idx, right_idx)
        return best, best_gain
    def _grow_tree(self, X, y, depth):
        n = len(y)
        num_pos = np.sum(y==1)
        if depth >= self.max_depth or n < self.min_samples_split or num_pos==0 or num_pos==n:
            return DecisionTreeNode(value=int(round(np.mean(y))))
        split, gain = self._best_split(X,y)
        if split is None or gain<=0:
            return DecisionTreeNode(value=int(round(np.mean(y))))
        feat, thr, left_idx, right_idx = split
        left = self._grow_tree(X[left_idx], y[left_idx], depth+1)
        right = self._grow_tree(X[right_idx], y[right_idx], depth+1)
        return DecisionTreeNode(feature=feat, threshold=thr, left=left, right=right)
    def _predict_one(self, x, node):
        if node.value is not None: return node.value
        if x[node.feature] <= node.threshold:
            return self._predict_one(x, node.left)
        else:
            return self._predict_one(x, node.right)
    def predict(self, X):
        return np.array([self._predict_one(x, self.root) for x in X])
    def predict_proba(self, X):
        p = self.predict(X)
        return np.vstack([1-p, p]).T

class RandomForest:
    def __init__(self, n_estimators=100, max_depth=12, min_samples_split=5, max_features=None, seed=42):
        self.n_estimators=n_estimators; self.max_depth=max_depth; self.min_samples_split=min_samples_split
        self.max_features=max_features; self.seed=seed
    def fit(self, X, y):
        np.random.seed(self.seed)
        self.trees=[]
        n=X.shape[0]
        for i in range(self.n_estimators):
            idx = np.random.choice(n, n, replace=True)
            Xb = X[idx]; yb = y[idx]
            tree = DecisionTree(max_depth=self.max_depth, min_samples_split=self.min_samples_split, max_features=self.max_features)
            tree.fit(Xb, yb)
            self.trees.append(tree)
    def predict_proba(self, X):
        probs = np.array([t.predict_proba(X)[:,1] for t in self.trees])  # shape (n_trees, n_samples)
        avg = np.mean(probs, axis=0)
        return np.vstack([1-avg, avg]).T
    def predict(self, X):
        return (self.predict_proba(X)[:,1] >= 0.5).astype(int)



In [49]:
# Gradient Boosting
class GradientBoostingStumps:
    def __init__(self, n_estimators=50, learning_rate=0.1):
        self.n_estimators=n_estimators; self.lr=learning_rate
    def fit(self, X, y):
        n,d = X.shape
        p = np.clip(np.mean(y), 1e-6, 1-1e-6)
        F0 = 0.5 * np.log(p/(1-p))
        self.base_score = F0
        self.trees=[]; self.scalars=[]
        F = np.full(n, F0)
        for m in range(self.n_estimators):
            p = 1.0/(1.0+np.exp(-F))
            residual = y - p
            best=None; best_loss=float('inf')
            for feat in range(d):
                vals = X[:,feat]; uniq = np.unique(vals)
                if uniq.shape[0]==1: continue
                thr_cands = (uniq[:-1]+uniq[1:])/2.0
                for thr in thr_cands:
                    left = vals <= thr; right = ~left
                    if left.sum()==0 or right.sum()==0: continue
                    left_mean = residual[left].mean(); right_mean = residual[right].mean()
                    loss = ((residual[left]-left_mean)**2).sum() + ((residual[right]-right_mean)**2).sum()
                    if loss < best_loss:
                        best_loss = loss
                        best = (feat, thr, left_mean, right_mean)
            if best is None: break
            self.trees.append(best)
            self.scalars.append(1.0)
            feat, thr, lm, rm = best
            F += self.lr * (np.where(X[:,feat] <= thr, lm, rm))
        return self
    def predict_proba(self, X):
        n = X.shape[0]
        F = np.full(n, self.base_score)
        for (feat,thr,lm,rm), s in zip(self.trees, self.scalars):
            F += self.lr * (np.where(X[:,feat] <= thr, lm, rm) * s)
        p = 1.0/(1.0+np.exp(-F))
        return np.vstack([1-p, p]).T
    def predict(self, X):
        return (self.predict_proba(X)[:,1] >= 0.5).astype(int)


In [50]:
# MLP (NumPy) — dense network
def relu(x): return np.maximum(0, x)
def drelu(x): return (x > 0).astype(float)
def softmax(z):
    z = z - np.max(z, axis=1, keepdims=True)
    ez = np.exp(z)
    return ez / np.sum(ez, axis=1, keepdims=True)

class MLP_Numpy:
    def __init__(self, layer_sizes, lr=0.005, n_epochs=500, batch_size=42, verbose=True):
        self.layer_sizes = layer_sizes
        self.lr = lr; self.n_epochs=n_epochs; self.batch_size=batch_size; self.verbose=verbose
        self._init_weights()
    def _init_weights(self):
        rng = np.random.RandomState(123)
        self.weights=[]; self.biases=[]
        for i in range(len(self.layer_sizes)-1):
            in_dim=self.layer_sizes[i]; out_dim=self.layer_sizes[i+1]
            W = rng.normal(scale=np.sqrt(2.0/(in_dim+out_dim)), size=(in_dim,out_dim))
            b = np.zeros(out_dim)
            self.weights.append(W); self.biases.append(b)
    def fit(self, X, y):
        n = X.shape[0]
        # build one-hot
        y_enc = np.zeros((n,2))
        y_enc[np.arange(n), y] = 1
        for epoch in range(self.n_epochs):
            perm = np.random.permutation(n)
            X_sh = X[perm]; y_sh = y_enc[perm]
            for i in range(0, n, self.batch_size):
                xb = X_sh[i:i+self.batch_size]; yb = y_sh[i:i+self.batch_size]
                activations = [xb]; pre_acts=[]
                for W,b in zip(self.weights[:-1], self.biases[:-1]):
                    z = activations[-1].dot(W) + b
                    pre_acts.append(z)
                    activations.append(relu(z))
                z = activations[-1].dot(self.weights[-1]) + self.biases[-1]
                pre_acts.append(z)
                probs = softmax(z)
                activations.append(probs)
                delta = (probs - yb) / max(1, xb.shape[0])
                grads_w=[]; grads_b=[]
                # last layer grads
                a_prev = activations[-2]
                gw = a_prev.T.dot(delta); gb = delta.sum(axis=0)
                grads_w.append(gw); grads_b.append(gb)
                # backprop for remaining layers
                delta = delta.dot(self.weights[-1].T) * drelu(pre_acts[-2])
                for l in range(len(self.weights)-2, -1, -1):
                    a_prev = activations[l]
                    gw = a_prev.T.dot(delta); gb = delta.sum(axis=0)
                    grads_w.append(gw); grads_b.append(gb)
                    if l > 0:
                        delta = delta.dot(self.weights[l].T) * drelu(pre_acts[l-1])
                grads_w = grads_w[::-1]; grads_b = grads_b[::-1]
                for idx in range(len(self.weights)):
                    self.weights[idx] -= self.lr * grads_w[idx]
                    self.biases[idx] -= self.lr * grads_b[idx]
            if self.verbose and epoch % (self.n_epochs//5 + 1) == 0:
                preds = self.predict(X)
                acc = (preds == y).mean()
                print(f"[MLP] epoch {epoch} acc {acc:.4f}")
        return self
    def predict_proba(self, X):
        a = X
        for W,b in zip(self.weights[:-1], self.biases[:-1]):
            a = relu(a.dot(W) + b)
        logits = a.dot(self.weights[-1]) + self.biases[-1]
        return softmax(logits)
    def predict(self, X):
        return np.argmax(self.predict_proba(X), axis=1)
   

In [51]:
# CNN
class Conv1D_Simple:
    def __init__(self, in_channels, out_channels, kernel_size, lr=0.005):
        # in_channels should be 1 for our tabular shape (features,1)
        self.kernel_size = kernel_size
        self.out_channels = out_channels
        # kernel shape (out_channels, kernel_size)
        rng = np.random.RandomState(123)
        self.W = rng.normal(scale=0.1, size=(out_channels, kernel_size))
        self.b = np.zeros(out_channels)
        self.lr = lr
    def forward(self, X):
        # X shape (N, L, 1) -> treat last dim as channel 1
        N, L, C = X.shape
        K = self.kernel_size
        outL = L - K + 1
        Y = np.zeros((N, outL, self.out_channels))
        for n in range(N):
            for o in range(self.out_channels):
                for i in range(outL):
                    segment = X[n, i:i+K, 0]  # shape (K,)
                    Y[n, i, o] = np.dot(self.W[o], segment) + self.b[o]
        self.cache = (X, Y)
        return Y
    def backward(self, dY):
        X, Y = self.cache
        N, L, C = X.shape
        K = self.kernel_size
        outL = L - K + 1
        dW = np.zeros_like(self.W)
        db = np.zeros_like(self.b)
        dX = np.zeros_like(X)
        for n in range(N):
            for o in range(self.out_channels):
                for i in range(outL):
                    seg = X[n, i:i+K, 0]
                    dW[o] += dY[n, i, o] * seg
                    db[o] += dY[n, i, o]
                    dX[n, i:i+K, 0] += dY[n, i, o] * self.W[o]
        # update
        self.W -= self.lr * dW / N
        self.b -= self.lr * db / N
        return dX


In [52]:
def train_hybrid_rf_cnn_numPy(X_train, X_test, y_train, y_test, rf_model,
                              conv_filters=16, kernel_size=5, conv_lr=0.005, dense_lr=0.001, epochs=60):
    # Fit RF
    rf_model.fit(X_train, y_train)
    rf_train_probs = rf_model.predict_proba(X_train)[:,1].reshape(-1,1)
    rf_test_probs = rf_model.predict_proba(X_test)[:,1].reshape(-1,1)
    
    X_train_cnn = np.hstack([X_train, rf_train_probs])[:,:,np.newaxis]
    X_test_cnn  = np.hstack([X_test, rf_test_probs])[:,:,np.newaxis]
    N, L, _ = X_train_cnn.shape

    # Conv layer
    conv = Conv1D_Simple(in_channels=1, out_channels=conv_filters, kernel_size=kernel_size, lr=conv_lr)

    outL = L - kernel_size + 1
    hidden_dim = outL * conv_filters

    rng = np.random.RandomState(123)
    W1 = rng.normal(scale=0.1, size=(hidden_dim, 64))
    b1 = np.zeros(64)
    W2 = rng.normal(scale=0.1, size=(64, 1))
    b2 = np.zeros(1)

    def sigmoid(x): return 1/(1+np.exp(-x))

    for epoch in range(epochs):
        conv_out = conv.forward(X_train_cnn)
        flat = conv_out.reshape(N, -1)
        z1 = flat.dot(W1) + b1
        a1 = np.maximum(0, z1)
        logits = a1.dot(W2) + b2
        probs = sigmoid(logits).reshape(-1)
        loss = -np.mean(y_train*np.log(probs+1e-12) + (1-y_train)*np.log(1-probs+1e-12))
        
        # Backprop dense
        dlogits = (probs - y_train).reshape(-1,1)/N
        dW2 = a1.T.dot(dlogits)
        db2 = dlogits.sum(axis=0)
        da1 = dlogits.dot(W2.T)
        dz1 = da1 * (z1>0)
        dW1 = flat.T.dot(dz1)
        db1 = dz1.sum(axis=0)
        dflat = dz1.dot(W1.T)
        # Update dense
        W2 -= dense_lr * dW2
        b2 -= dense_lr * db2
        W1 -= dense_lr * dW1
        b1 -= dense_lr * db1
        # Backprop into conv
        dconv = dflat.reshape(conv_out.shape)
        conv.backward(dconv)
        
        if epoch % max(1, epochs//5)==0:
            preds = (probs>=0.5).astype(int)
            acc = (preds==y_train).mean()
            print(f"[HybridRF+CNN] epoch {epoch} loss {loss:.4f} train acc {acc:.4f}")

    # Evaluate
    conv_out_test = conv.forward(X_test_cnn)
    flat_test = conv_out_test.reshape(X_test_cnn.shape[0], -1)
    a1_test = np.maximum(0, flat_test.dot(W1) + b1)
    logits_test = a1_test.dot(W2) + b2
    probs_test = sigmoid(logits_test).reshape(-1)
    acc_test = ((probs_test>=0.5).astype(int) == y_test).mean()
    
    return acc_test*100


In [53]:
def train_hybrid_gb_dnn(X_train, X_test, y_train, y_test, gb_model=None,
                        mlp_layers=None, mlp_lr=0.005, mlp_epochs=300):
    # Increase n_estimators and reduce learning rate for GB
    if gb_model is None:
        gb_model = GradientBoostingStumps(n_estimators=300, learning_rate=0.05)
    gb_model.fit(X_train, y_train)
    
    gb_probs_train = gb_model.predict_proba(X_train)[:,1].reshape(-1,1)
    gb_probs_test  = gb_model.predict_proba(X_test)[:,1].reshape(-1,1)
    X_train_hybrid = np.hstack([X_train, gb_probs_train])
    X_test_hybrid  = np.hstack([X_test, gb_probs_test])
    
    if mlp_layers is None:
        mlp_layers = [X_train_hybrid.shape[1], 256, 128, 64, 2]  # deeper network
    
    mlp = MLP_Numpy(
        mlp_layers,
        lr=mlp_lr,
        n_epochs=mlp_epochs,
        batch_size=32,  # smaller batch helps convergence
        verbose=True
    )
    mlp.fit(X_train_hybrid, y_train)
    preds = mlp.predict(X_test_hybrid)
    acc = (preds == y_test).mean()
    return acc*100


In [54]:
# Train everything and print accuracies
os.makedirs('models', exist_ok=True)
df = generate_or_load_dataset(n_samples=2000)   # tweak n_samples if desired
X = df.iloc[:,:-1].values.astype(float)
y = df.iloc[:,-1].values.astype(int)

X_train, X_test, y_train, y_test = train_test_split_np(X,y,test_size=0.2,seed=42)
scaler = StandardScalerManual()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
scaler.save('models/scaler_manual_numPy.joblib')

scores = {}

# Logistic Regression (from scratch)
lr = LogisticRegressionSGD(lr=0.02, n_iter=2000)
lr.fit(X_train_scaled, y_train)
scores['Logistic Regression'] = round((lr.predict(X_test_scaled) == y_test).mean()*100, 2)

# Linear SVM (from scratch)
svm = LinearSVM_SGD(lr=0.005, n_iter=2000, C=1.0)
svm.fit(X_train_scaled, y_train)
scores['SVM'] = round((svm.predict(X_test_scaled) == y_test).mean()*100, 2)

# Random Forest (from scratch)
rf = RandomForest(
    n_estimators=100, 
    max_depth=12, 
    min_samples_split=5, 
    max_features=int(np.sqrt(X.shape[1])), 
    seed=42
)
rf.fit(X_train_scaled, y_train)
scores['Random Forest'] = round((rf.predict(X_test_scaled) == y_test).mean()*100, 2)

# Neural Network (from scratch)
mlp = MLP_Numpy(
    [X_train_scaled.shape[1], 128, 64, 2], 
    lr=0.005, 
    n_epochs=500, 
    batch_size=42, 
    verbose=True
)
mlp.fit(X_train_scaled, y_train)
scores['Neural Network'] = round((mlp.predict(X_test_scaled) == y_test).mean()*100, 2)

# Hybrid RF + CNN (from scratch CNN)
rf_for_hybrid = RandomForest(
    n_estimators=100, 
    max_depth=12, 
    min_samples_split=5, 
    max_features=int(np.sqrt(X.shape[1])), 
    seed=42
)
acc_rf_cnn = train_hybrid_rf_cnn_numPy(
    X_train_scaled, X_test_scaled, y_train, y_test, rf_for_hybrid,
    conv_filters=16, kernel_size=5, conv_lr=0.005, dense_lr=0.001, epochs=60
)
scores['Hybrid RF + CNN'] = round(acc_rf_cnn, 2)

# Hybrid GB + DNN (from scratch)
acc_gb_dnn = train_hybrid_gb_dnn(
    X_train_scaled, X_test_scaled, y_train, y_test,
    gb_model=GradientBoostingStumps(n_estimators=300, learning_rate=0.05),
    mlp_layers=[X_train_scaled.shape[1]+1, 256, 128, 64, 2],
    mlp_lr=0.005, mlp_epochs=300
)
scores['Hybrid GB + DNN'] = round(acc_gb_dnn, 2)



Loaded dataset: (2666, 11)
[MLP] epoch 0 acc 0.5241
[MLP] epoch 101 acc 0.8659
[MLP] epoch 202 acc 0.8889
[MLP] epoch 303 acc 0.8940
[MLP] epoch 404 acc 0.9025
[HybridRF+CNN] epoch 0 loss 0.7114 train acc 0.3812
[HybridRF+CNN] epoch 12 loss 0.7107 train acc 0.3835
[HybridRF+CNN] epoch 24 loss 0.7101 train acc 0.3872
[HybridRF+CNN] epoch 36 loss 0.7094 train acc 0.3915
[HybridRF+CNN] epoch 48 loss 0.7087 train acc 0.3910
[MLP] epoch 0 acc 0.6128
[MLP] epoch 61 acc 0.9006
[MLP] epoch 122 acc 0.9255
[MLP] epoch 183 acc 0.9541
[MLP] epoch 244 acc 0.9700
