In [102]:
import pandas as pd
import numpy as np
from tqdm import trange
import torch

## Load and Organize Data

In [103]:
cannabis_df = pd.read_csv("cannabis_full.csv")
cannabis_df.head()

Unnamed: 0,Strain,Type,Rating,Effects,Flavor,Creative,Energetic,Tingly,Euphoric,Relaxed,...,Ammonia,Minty,Tree,Fruit,Butter,Pineapple,Tar,Rose,Plum,Pear
0,100-Og,hybrid,4.0,"Creative,Energetic,Tingly,Euphoric,Relaxed","Earthy,Sweet,Citrus",1.0,1.0,1.0,1.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,98-White-Widow,hybrid,4.7,"Relaxed,Aroused,Creative,Happy,Energetic","Flowery,Violet,Diesel",1.0,1.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,1024,sativa,4.4,"Uplifted,Happy,Relaxed,Energetic,Creative","Spicy/Herbal,Sage,Woody",1.0,1.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,13-Dawgs,hybrid,4.2,"Tingly,Creative,Hungry,Relaxed,Uplifted","Apricot,Citrus,Grapefruit",1.0,0.0,1.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,24K-Gold,hybrid,4.6,"Happy,Relaxed,Euphoric,Uplifted,Talkative","Citrus,Earthy,Orange",0.0,0.0,0.0,1.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [104]:
def split_dataset(data, target_column, test_ratio):
    np.random.seed(42)
    num_test = int(len(data) * test_ratio)
    
    shuffled_indices = np.random.permutation(len(data))
    test_indices = shuffled_indices[:num_test]
    train_indices = shuffled_indices[num_test:]

    train_df = data.iloc[train_indices]
    test_df = data.iloc[test_indices]

    X_train = train_df.drop(target_column, axis=1)
    y_train = train_df[target_column]

    X_test = test_df.drop(target_column, axis=1)
    y_test = test_df[target_column]

    return X_train, y_train, X_test, y_test

In [105]:
c_df = cannabis_df.dropna()
c_df = c_df[c_df["Type"].isin(["sativa", "indica"])]
c_df["Type_is_sativa"] = np.where(c_df["Type"] == "sativa", 1, 0)
c_df = c_df.drop(["Strain", "Effects", "Flavor", "Type"], axis=1)

X_train, y_train, X_test, y_test = split_dataset(c_df, target_column="Type_is_sativa", test_ratio=0.2)
X_train["intercept"] = 1
X_test["intercept"] = 1

In [106]:
len(X_train.columns)

66

## NN Functions

### Numpy Version

In [107]:
def precision(y, yhat):
    true_positives = np.sum((y == 1) & (yhat == 1))
    predicted_positives = np.sum(yhat == 1)
    return true_positives / predicted_positives

def recall(y, yhat):
    true_positives = np.sum((y == 1) & (yhat == 1))
    actual_positives = np.sum(y == 1)
    return true_positives / actual_positives

def f1_score(precision, recall):
    return 2 * precision * recall / (precision + recall)

In [108]:
# Activation Functions

def relu(x):
    return np.maximum(0, x)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def softplus(x):
    return np.log(1 + np.exp(x))

In [109]:
# Loss Functions

def squared_error(y, yhat):
    n = len(y)
    l = 1 / n * np.power(np.sum(yhat - y), 2)
    return l

def svc_hinge_loss(y, yhat):
    n = len(y)
    l = 1 / n * np.sum(np.where(y * yhat) > 1, y * yhat, 0)
    return l

In [110]:
# Gradient Functions

def sigmoid_grad(z):
    left = np.power(1 / (1 + np.exp(-z)), 2)
    right = -np.exp(-z)
    return left * right

def relu_grad(u):
    return u if u > 0 else 0

def softplus_grad(n):
    num = np.exp(n)
    den = np.exp(n) + 1
    return num / den

def squared_error_grad(y, yhat):
    n = len(y)
    dl = 2 / n * np.sum(yhat - y)
    return dl

def svc_hinge_grad(y, yhat):
    n = len(y)
    dl = 1 / n * np.sum(np.where(y * yhat) > 1, yhat, 0)
    return dl

In [111]:
def forward_pass(X, W, V, activation_f):
    Z = X.dot(W)
    Q = activation_f(Z)
    n = Q.dot(V)
    p = activation_f(n)
    
    return Z, Q, n, p


def backpropogation(v, eta, activation_gradient_f, loss_gradient_f):
    dl = loss_gradient_f(v['y'], v['p'])
    dp = activation_gradient_f(v['n'])
    dn = v['Q']
    V_i = (dl * dp).dot(dn)

    dv = activation_gradient_f(v['Z'])
    # dv = v['Z']
    dz = v['X']
    W_i = ((V_i * dv).T @ dz).T

    assert v['W'].shape == W_i.shape
    assert v['V'].shape == V_i.shape

    V = v['V'] + (V_i * eta)
    W = v['W'] + (W_i * eta)

    return W, V


# https://numpy.org/doc/stable/reference/generated/numpy.allclose.html
# atol - absolute tolerance (additive)
# rtol - relative tolerance (multiplicative)
# atol more important than rtol here since values are small?
def stopping_condition(W, W_new, V, V_new):
    return np.allclose(W, W_new, atol=0.5, rtol=0.1) and np.allclose(V, V_new, atol=0.5, rtol=0.1)


def fit(X, y, eta, activation_funcs, loss_funcs, max_iter=1e6):
    W = np.random.random((X.shape[1], 3))
    V = np.random.random(3)
    for _ in trange(int(max_iter)):
        Z, Q, n, p = forward_pass(X, W, V, activation_funcs['activation_f'])

        values = {'W': W, 'V': V, 'Z': Z, 'Q': Q, 'n': n, 'p': p, 'X': X, 'y': y}

        W_new, V_new = backpropogation(values, eta, activation_funcs['gradient_f'], loss_funcs['gradient_f'])

        while not stopping_condition(W, W_new, V, V_new):
            print("stopping")
            return W_new, V_new
        
        W = W_new
        V = V_new

    return W, V

In [115]:
relu_funs = {'activation_f': relu, 'gradient_f': relu_grad}
sigmoid_funcs = {'activation_f': sigmoid, 'gradient_f': sigmoid_grad}
softplus_funcs = {'activation_f': softplus, 'gradient_f': softplus_grad}

squared_error_funcs = {'loss_f': squared_error, 'gradient_f': squared_error_grad}
svc_hinge_funcs = {'loss_f': svc_hinge_loss, 'gradient_f': svc_hinge_grad}

### Torch Version

In [29]:
# Activation Functions

def relu(x):
    return torch.maximum(0, x)

def sigmoid(x):
    return 1 / (1 + torch.exp(-x))

def softplus(x):
    return torch.log(1 + torch.exp(x))

In [30]:
# Loss Functions

def squared_error(y, yhat):
    n = len(y)
    l = 1 / n * torch.pow(torch.sum(yhat - y), 2)
    return l

In [31]:
# Gradient Functions

def squared_error_grad(y, yhat):
    n = len(y)
    dl = 2 / n * torch.sum(yhat - y)
    return dl

def sigmoid_grad(z):
    left = torch.pow(1 / (1 + torch.exp(-z)), 2)
    right = -torch.exp(-z)
    return left * right

def softplus_grad(n):
    num = torch.exp(n)
    den = torch.exp(n) + 1
    return num / den

In [47]:
def forward_pass(X, W, V, activation_f):
    Z = torch.matmul(X, W)
    Q = activation_f(Z)
    n = torch.matmul(Q, V)
    p = activation_f(n)
    
    return Z, Q, n, p


def backpropogation(v, eta, activation_gradient_f, loss_gradient_f):
    dl = loss_gradient_f(v['y'], v['p'])
    dp = activation_gradient_f(v['n'])
    dn = v['Q']
    V_i = (dl * dp).matmul(dn)

    dv = activation_gradient_f(v['Z'])
    dz = v['X']
    W_i =  (V_i * dv).t().matmul(dz).t()

    assert v['W'].shape == W_i.shape
    assert v['V'].shape == V_i.shape

    V = v['V'] + (V_i * eta)
    W = v['W'] + (W_i * eta)

    return W, V


# https://numpy.org/doc/stable/reference/generated/numpy.allclose.html
# atol - absolute tolerance (additive)
# rtol - relative tolerance (multiplicative)
# atol more important than rtol here since values are small?
def stopping_condition(W, W_new, V, V_new):
    return torch.allclose(W, W_new, atol=1, rtol=1) and torch.allclose(V, V_new, atol=1, rtol=1)


def fit(X, y, eta, activation_funcs, loss_funcs, max_iter=1e6):
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    X = torch.tensor(X, dtype=torch.float32).to(device)
    y = torch.tensor(y, dtype=torch.float32).to(device)
    W = torch.rand((X.shape[1], 3), device=device)
    V = torch.rand(3, device=device)
    for _ in trange(int(max_iter)):
        Z, Q, n, p = forward_pass(X, W, V, activation_funcs['activation_f'])

        values = {'W': W, 'V': V, 'Z': Z, 'Q': Q, 'n': n, 'p': p, 'X': X, 'y': y}

        W_new, V_new = backpropogation(values, eta, activation_funcs['gradient_f'], loss_funcs['gradient_f'])

        while not stopping_condition(W, W_new, V, V_new):
            print("stopping")
            return W_new, V_new
        
        W = W_new
        V = V_new

    return W, V

sigmoid_funcs = {'activation_f': sigmoid, 'gradient_f': sigmoid_grad}
squared_error_funcs = {'loss_f': squared_error, 'gradient_f': squared_error_grad}
W, V = fit(X_train.values, y_train.values, eta=0.001, activation_funcs=sigmoid_funcs, loss_funcs=squared_error_funcs, max_iter=5e6)


  0%|          | 1840/5000000 [00:16<12:14:04, 113.48it/s]


KeyboardInterrupt: 

## NN with Squared-Error Loss

CHECK DOUBLE SIGMOID... sus

In [112]:

W, V = fit(X_train.values, y_train.values, eta=0.0001, activation_funcs=sigmoid_funcs, loss_funcs=squared_error_funcs, max_iter=1e4)

100%|██████████| 10000/10000 [00:03<00:00, 2788.90it/s]


In [113]:
_, _, _, p = forward_pass(X_test.values, W, V, sigmoid)
predictions = np.where(p > 0.5, 1, 0)

pr = precision(predictions, y_test.values)
re = recall(predictions, y_test.values)
f1 = f1_score(pr, re)
pr, re, f1

  return true_positives / actual_positives


(0.0, nan, nan)

In [114]:
p.max(), p.min()

(0.3798647419819812, 0.3648935234972355)

## NN with SVC Loss

In [None]:
sigmoid_funcs = {'activation_f': sigmoid, 'gradient_f': sigmoid_grad}
svc_funcs = {'loss_f': squared_error, 'gradient_f': squared_error_grad}
W, V = fit(X_train.values, y_train.values, eta=0.00001, activation_funcs=sigmoid_funcs, loss_funcs=squared_error_funcs, max_iter=1e6)