In this notebook we will go through the coffee roasting problem discussed in the course. First implementing it using tensorflow then only numpy.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

In [2]:
def coffee_roasting_dataset():
    rng = np.random.default_rng(42)
    X = rng.random(400).reshape(-1,2)   # X will be 2-d array of shape (200,2)
    X[:,0] = X[:,0] * (285-150) + 150   # scaling to range (150,285) - temparature in C
    X[:,1] = X[:,1] * 5 + 11            # scaling to range (11,16) - time in mins
    Y = np.zeros(len(X))

    i=0
    for temp,time in X:
        y = -3/(260-175)*temp + 21      # (175-260,12-15) is good temp,time range, y=mx+c line defining upper bounds
        if (temp > 175 and temp < 260 and time > 12 and time < 15 and time<=y ):
            Y[i] = 1
        else:
            Y[i] = 0
        i += 1

    return (X, Y.reshape(-1,1))

In the above code cell, the coffee data we will work with is generated.
1. First we take 400 random numbers between 0 and 1, reshape it into 200 samples with 2 features, that wil be temparature and time duration.
2. The random numbers are scaled to take values between desired range for temparature and time duration.
3. We find the line that will help us define the upper bound by solving the following equations: $$ 12 = 260m+c \\ 15= 175m+c $$
4. We check if temp and time are in desired range, and put target labels on it.

In [3]:
def normalisation(X):
    norm1 = tf.keras.layers.Normalization(axis=1)
    norm1.adapt(X)
    Xnorm = norm1(X)
    return Xnorm

In [4]:
def normalization_summary(X,Xn):

    result = f"""
    Before normalisation: \n
    Minimum temparature: {np.min(X[:,0]):.2f}, Maximum temparature: {np.max(X[:,0]):.2f} \n
    Minimum duration: {np.min(X[:,1]):.2f}, Maximum duration: {np.max(X[:,1]):.2f}\n

    After normalisation: \n
    Minimum temparature: {np.min(Xn[:,0]):.2f}, Maximum temparature: {np.max(Xn[:,0]):.2f} \n
    Minimum duration: {np.min(Xn[:,1]):.2f}, Maximum duration: {np.max(Xn[:,1]):.2f}\n"""

    return result

### Data

In [5]:
X,Y = coffee_roasting_dataset()
print(X.shape, Y.shape)

(200, 2) (200, 1)


In [6]:
Xn = normalisation(X)
print(normalization_summary(X,Xn))


    Before normalisation: 

    Minimum temparature: 151.66, Maximum temparature: 282.24 

    Minimum duration: 11.04, Maximum duration: 15.96


    After normalisation: 

    Minimum temparature: -1.65, Maximum temparature: 1.74 

    Minimum duration: -1.77, Maximum duration: 1.70



In [7]:
# below we will tile to increase training sample size
Xt = np.tile(Xn,(1000,1))
Yt= np.tile(Y,(1000,1))
print(Xt.shape, Yt.shape)

(200000, 2) (200000, 1)


## Tensorflow implementation

In [8]:
tf.random.set_seed(1234)
model = Sequential(
    [
        tf.keras.Input(shape=(2,)),
        Dense(3, activation='sigmoid', name = 'layer1'),
        Dense(1, activation='sigmoid', name = 'layer2')
     ]
)

In [9]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 layer1 (Dense)              (None, 3)                 9         
                                                                 
 layer2 (Dense)              (None, 1)                 4         
                                                                 
Total params: 13 (52.00 Byte)
Trainable params: 13 (52.00 Byte)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [10]:
# The following weights are isntantiated in model
W1, b1 = model.get_layer("layer1").get_weights()
W2, b2 = model.get_layer("layer2").get_weights()
print(f"W1{W1.shape}:\n", W1, f"\n b1{b1.shape}:", b1)
print(f"W2{W2.shape}:\n", W2, f"\n b2{b2.shape}:", b2)

W1(2, 3):
 [[-0.01135743  0.17004311 -0.9350109 ]
 [-0.51178366  0.665051    0.06859589]] 
 b1(3,): [0. 0. 0.]
W2(3, 1):
 [[-0.89698267]
 [-0.83132803]
 [ 0.02676749]] 
 b2(1,): [0.]


In [11]:
model.compile(
    loss = tf.keras.losses.BinaryCrossentropy(),
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.01),
)

model.fit(Xt,Yt,epochs=10,)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7a9bf57a6980>

In [12]:
# Updating weights after model fit
W1, b1 = model.get_layer("layer1").get_weights()
W2, b2 = model.get_layer("layer2").get_weights()
print("W1:\n", W1, "\n b1:", b1)
print("W2:\n", W2, "\n b2:", b2)

W1:
 [[ -0.4007091  18.571972  -23.231821 ]
 [-11.409808   19.832968   -0.4668565]] 
 b1: [-12.909796    1.5053629 -25.76176  ]
W2:
 [[-46.954582]
 [-62.214245]
 [-58.76812 ]] 
 b2: [21.383799]


We will use the above values of weights and bias for testing our numpy implementaiton.

### Predictions:

In [13]:
X_test = np.array([
    [200,13.9],  # postive example
    [200,17]])   # negative example
X_testn = normalisation(X_test)
predictions = model.predict(X_testn)
print("Predictions = \n", predictions)

Predictions = 
 [[9.9999732e-01]
 [1.8516688e-18]]


In [14]:
yhat = (predictions >= 0.5).astype(int)
print(f"decisions = \n{yhat}")

decisions = 
[[1]
 [0]]


## Numpy implementation

We will use the same code for dataset generation and normalisation.

In [15]:
def g(z):
    sig = 1/(1 + np.exp(-z))
    return sig

In [16]:
def my_dense(a_in, W, b, g):
    """
    Computes a dense layer
    Args:
      a_in (ndarray (n, )) : Data, 1 example
      W    (ndarray (n,j)) : Weight matrix, n features per unit, j units
      b    (ndarray (j, )) : bias vector, j units
      g    activation function (e.g. sigmoid, relu..)
    Returns
      a_out (ndarray (j,))  : j units
    """
    units = W.shape[1]        # number of neurons (units)
    a_out = np.zeros(units)
    for j in range(units):
        w = W[:,j]            # column-wise, neuron by neuron
        z = np.dot(w, a_in) + b[j]
        a_out[j] = g(z)       # sigmoid activation
    return(a_out)

In [17]:
def my_sequential(x, W1, b1, W2, b2):
    a1 = my_dense(x,  W1, b1, g)
    a2 = my_dense(a1, W2, b2, g)
    return(a2)

In [18]:
def my_predict(X, W1, b1, W2, b2):
    m = X.shape[0]
    p = np.zeros((m,1))
    for i in range(m):
        p[i,0] = my_sequential(X[i], W1, b1, W2, b2)
    return p

### Testing the numpy code
We will use the testing set above and model weights to check if numpy implementation is correct. It should give the same value as predicted using tf.

In [19]:
X_test = np.array([
    [200,13.9],  # postive example
    [200,17]])   # negative example
X_testn = normalisation(X_test)

In [20]:
my_predictions = my_predict(X_testn,W1,b1,W2,b2)
print("Predictions = \n", my_predictions)

yhat = (my_predictions >= 0.5).astype(int)
print(f"decisions = \n{yhat}")

Predictions = 
 [[9.99997288e-01]
 [1.85166542e-18]]
decisions = 
[[1]
 [0]]


Predictions are the same.

## Numpy Vectorized implementation

In [21]:
def my_vec_dense(a_in, W, b, g):
    """
    Computes a dense layer faster, because this is vectorized.
    Args:
      a_in (ndarray (n, )) : Data, 1 example
      W    (ndarray (n,j)) : Weight matrix, n features per unit, j units
      b    (ndarray (j, )) : bias vector, j units
      g    activation function (e.g. sigmoid, relu..)
    Returns
      a_out (ndarray (j,))  : j units
    """
    z = a_in@W +b
    a_out = g(z)
    return a_out

The following functions are same except they use vectorized dense (``my_vc_dense``) now.

In [22]:
def my_vec_sequential(x, W1, b1, W2, b2):
    a1 = my_vec_dense(x,  W1, b1, g)
    a2 = my_vec_dense(a1, W2, b2, g)
    return(a2)

In [23]:
def my_vec_predict(X, W1, b1, W2, b2):
    m = X.shape[0]
    p = np.zeros((m,1))
    for i in range(m):
        p[i,0] = my_vec_sequential(X[i], W1, b1, W2, b2)
    return p

Now we will test the vectorized code

In [24]:
X_testn.numpy() # converting to numpy array is not necessary here

array([[ 0., -1.],
       [ 0.,  1.]], dtype=float32)

In [25]:
my_vec_predictions = my_vec_sequential(X_testn.numpy(), W1, b1, W2, b2)
print("Predictions = \n", my_vec_predictions)

yhat = (my_vec_predictions >= 0.5).astype(int)
print(f"decisions = \n{yhat}")

Predictions = 
 [[9.9999726e-01]
 [1.8516690e-18]]
decisions = 
[[1]
 [0]]
