In [6]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import Sequential
from tensorflow.keras.losses import MeanSquaredError, BinaryCrossentropy
from tensorflow.keras.activations import sigmoid
import logging
logging.getLogger("tensorflow").setLevel(logging.ERROR)
tf.autograph.set_verbosity(0)

2023-05-18 17:54:45.989681: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [76]:
def sigmoid(z):
    return 1 / (1 + np.exp(-z))

# def sigmoid_derivative(z):
#     return np.exp(-z) / ((1 + np.exp(-z)) ** 2)

def sigmoid_derivative_2(a):
    return a * (1 - a)

def cost_squared(y_expected, y_predicted):
    return (y_expected - y_predicted) ** 2 / 2

def cost_squared_derivative(y_expected, y_predicted):
    return -(y_expected - y_predicted)

def cost_inner(y_expected, y_predicted):
    return y_expected * np.log(y_predicted) + (1 - y_expected) * np.log(1 - y_predicted)

def cost_derivative(y_expected, y_predicted):
    top = y_expected - y_predicted
    bottom = y_predicted * (1 - y_predicted)
    return top / bottom

# I expect x_train to be an array of [x_1, x_2] vectors
# y_train is an array of scalar values
# W_A, W_B and W_C are [w1, w2] vectors
def model(x_train, y_train, W_A, W_B, W_C, training_rate):    
    B_A = B_B = B_C = 0
    m = x_train.shape[0]
    J = 0
    dwa = np.zeros(2)
    dwb = np.zeros(2)
    dwc = np.zeros(2)

    for i in range(m):
        a0_1 = x_train[i][0]
        a0_2 = x_train[i][1]

        z1_1 = W_A[0] * a0_1 + W_A[1] * a0_2 + B_A
        z1_2 = W_B[0] * a0_1 + W_B[1] * a0_2 + B_B

        dz1_1_da0_1 = W_A[0]
        dz1_1_da0_2 = W_A[1]
        dz1_2_db0_1 = W_B[0]
        dz1_2_db0_2 = W_B[1]

        a1_1 = sigmoid(z1_1)
        a1_2 = sigmoid(z1_2)

        # da1_1_z1_1 = sigmoid_derivative(z1_1)
        # da1_2_z1_2 = sigmoid_derivative(z1_2)
        da1_1_z1_1 = sigmoid_derivative_2(a1_1)
        da1_2_z1_2 = sigmoid_derivative_2(a1_2)

        z2_1 = W_C[0] * a1_1 + W_C[1] * a1_2 + B_C

        dz2_1_a1_1 = W_C[0]
        dz2_1_a1_2 = W_C[1]

        dz2_1_w_c0 = a1_1
        dz2_1_w_c1 = a1_2

        a2_1 = sigmoid(z2_1)
        
        # da2_1_z2_1 = sigmoid_derivative(z2_1)
        da2_1_z2_1 = sigmoid_derivative_2(a2_1)

        # diff = -1 * cost_inner(y_train[i], a2_1)
        # diff = a2_1 - y_train[i]
        # J = J + diff / m
        # dDiff_da2_1 = cost_derivative(y_train[i], a2_1)
        # dDiff_da2_1 = diff

        # lets try squated error
        # diff_sq = cost_squared(y_train[i], a2_1)
        # J = J + diff_sq / m

        diff_plain = a2_1 - y_train[i]
        J = J + diff_plain / m

        dj_squared = cost_squared_derivative(y_train[i], a2_1)
        dDiff_da2_1 = dj_squared

        dj_dwc_0 = dDiff_da2_1 * da2_1_z2_1 * dz2_1_w_c0
        dj_dwc_1 = dDiff_da2_1 * da2_1_z2_1 * dz2_1_w_c1
        dwc[0] = dwc[0] + dj_dwc_0 / m
        dwc[1] = dwc[1] + dj_dwc_1 / m

        dj_dwb_0 = dDiff_da2_1 * da2_1_z2_1 * dz2_1_a1_1 * da1_1_z1_1 * dz1_2_db0_1
        dj_dwb_1 = dDiff_da2_1 * da2_1_z2_1 * dz2_1_a1_2 * da1_2_z1_2 * dz1_2_db0_2
        dwb[0] = dwb[0]  + dj_dwb_0 / m
        dwb[1] = dwb[1]  + dj_dwb_1 / m

        dj_dwa_0 = dDiff_da2_1 * da2_1_z2_1 * dz2_1_a1_1 * da1_1_z1_1 * dz1_1_da0_1
        dj_dwa_1 = dDiff_da2_1 * da2_1_z2_1 * dz2_1_a1_2 * da1_2_z1_2 * dz1_1_da0_2
        dwa[0] = dwa[0]  + dj_dwa_0 / m
        dwa[1] = dwa[1]  + dj_dwa_1 / m

    return (W_A - training_rate * dwa, W_B - training_rate * dwb, W_C - training_rate * dwc, J)


In [77]:
def predict(x1, x2, W_A, W_B, W_C):    
    B_A = B_B = B_C = 0

    a0_1 = x1
    a0_2 = x2

    z1_1 = W_A[0] * a0_1 + W_A[1] * a0_2 + B_A
    z1_2 = W_B[0] * a0_1 + W_B[1] * a0_2 + B_B

    a1_1 = sigmoid(z1_1)
    a1_2 = sigmoid(z1_2)

    z2_1 = W_C[0] * a1_1 + W_C[1] * a1_2 + B_C

    a2_1 = sigmoid(z2_1)

    return a2_1

In [67]:
def create_coffee_dataset():
    """
    Copied from lab_coffee_utils
    """
    rng = np.random.default_rng(2)
    X = rng.random(400).reshape(-1,2)
    X[:,1] = X[:,1] * 4 + 11.5          # 12-15 min is best
    X[:,0] = X[:,0] * (285-150) + 150  # 350-500 F (175-260 C) is best
    Y = np.zeros(len(X))
    
    i=0
    for t,d in X:
        y = -3/(260-175)*t + 21
        if (t > 175 and t < 260 and d > 12 and d < 15 and d<=y ):
            Y[i] = 1
        else:
            Y[i] = 0
        i += 1

    return (X, Y.reshape(-1,1))

In [68]:
x_train, y_train = create_coffee_dataset()
print(x_train.shape, y_train.shape)

(200, 2) (200, 1)


In [78]:
W_A = np.random.randn(2) #np.array([0.3092, 0.134])
W_B = np.random.randn(2) #np.array([0.233, 0.7862])
W_C = np.random.randn(2) #np.array([0.3674, 0.63])
J = 0

norm_l = tf.keras.layers.Normalization(axis=-1)
norm_l.adapt(x_train)

sample_data = norm_l(x_train).numpy()

epochs = 5000
training_rate = 0.01

for e in range(epochs):
    W_A, W_B, W_C, J = model(sample_data, y_train, W_A, W_B, W_C, training_rate)
    # if e % 100 == 0:
    #     print(f'J[{e}] = {J}')

print(W_A, W_B, W_C, J)

[-1.64939025  4.72355177] [1.71382025 0.50859957] [-0.68456199 -1.05665233] [0.09039993]


In [81]:
test_data = np.array([
    [200,13.9],  # postive example
    [200,17] # negative example
])   

sample1 = norm_l(np.array([[200,0.1]])).numpy()[0]
result1 = predict(sample1[0], sample1[1], W_A, W_B, W_C)

sample2 = norm_l(np.array([[200,140]])).numpy()[0]
result2 = predict(sample2[0], sample2[1], W_A, W_B, W_C)

print(result1, result2)

0.49970873924664855 0.1491587599901085
