# Fixed Point Precision Study on LSTM-250


This notebook is the final stage of model preparation for benchmarking. The next stage after this notebook is to implement the model in C using the knowledge gained here.
The fixed-point operations defined here try to simulate the computations performed in SPAR. This might change over time.

**NOTE:**
The programs/code snippets in this notebook follow C-like interfaces on purpose.
This is done so that the code can be easily translated into C for the next stage of the study.

**Goals:**
- Load the Model and Dataset from the SQLite3 databases.
- Validate numpy model on the dataset.
- Define Fixed-Point methods.
- Implement model in fixed-point.
- Experiment with Fixed-Point precisions.

# Load the Floating-Point Model and Dataset

Here, the model and the dataset exported by the Export-DB notebook are loaded and verified.

In [None]:
# Delete the cache and import sqlite3 utilities
!rm -rf __pycache__/
from utilsqlite3 import *

## Load the Dataset

In [None]:
# Load and check the dataset table
# Path of the SQLite3 file that holds the exported test dataset
Dataset_path = './saved/timit_test_data-79.68p.s3db'

# List the tables in the database as a quick sanity check
table_names = getTableNames(Dataset_path)
print('table_names:', table_names)

# Read the header table
# Each header record is used as a key/value pair: column 1 is the key, column 2 the value
header_records =  getRecords(Dataset_path, 'Header')
header_dict = {}
print('')
for r in header_records: 
    print(r[1:3])
    header_dict[r[1]] = r[2]
    

# Get the table names
# The header stores the names of the tables that hold the actual data
Data_table = header_dict['dataset.table']
FeatureSeq_table = header_dict['feature_sequence.table']
Label_table = header_dict['labels.table']
print('')
print('Data_table:', Data_table)
print('Feature_table:', FeatureSeq_table)
print('Label_table:', Label_table)

In [None]:
# Read the label_to_index dictionary
# Each record of the labels table is unpacked below as an (index, label) pair
labels_records = getRecords(Dataset_path, Label_table)
print('labels_records:', labels_records)

# Build both lookup directions from the same records
Label_to_index = {label:index for (index, label) in labels_records}
Index_to_label = {index:label for (index, label) in labels_records}
print('Label_to_index:', Label_to_index)
print('Index_to_label:', Index_to_label)

### Build the Dataset Array

In [None]:
import numpy as np
import dataclasses
from dataclasses import dataclass
from typing import List


# Dataset item class
@dataclass
class DataItem:
    """One dataset entry: a labelled feature sequence plus the prediction stored with it.

    The notebook builds items with ``feature_seq`` as a 2D numpy array, but the
    field is annotated as a nested list; both forms are accepted by
    ``getItemSummary``.
    """
    label: str                      # label text
    label_index: int                # index of the true label
    predicted_index: int            # prediction recorded in the dataset table
    sequence_len: int               # number of feature vectors in feature_seq
    feature_seq: List[List[float]]  # one feature vector per time step

    def getItemSummary(self):
        """Return a one-line string summary; the sequence is reported by shape only."""
        # np.shape() works for nested lists as well as ndarrays, whereas the
        # `.shape` attribute exists only on ndarrays.
        seq_shape = np.shape(self.feature_seq)
        return str((self.label, self.label_index, self.predicted_index, self.sequence_len, seq_shape))

In [None]:
# Read the features and data-item records then merge them
# Make the sequence_id:feature_sequence map
feat_records = getRecords(Dataset_path, FeatureSeq_table)
feat_records.sort()    # sort by (seq-id, row-index)
seq_rec_map = {}
for fitem in feat_records:    # fitem: (seq-id, row-index, col_0, col_1 ...)
    seq_id, row_index = fitem[0], fitem[1]
    feat_vec = fitem[2:]
    if seq_id not in seq_rec_map:
        seq_rec_map[seq_id] = []
    # After sorting, the rows of a sequence must arrive as 0, 1, 2, ... with no gaps
    assert len(seq_rec_map[seq_id]) == row_index, "EROR: Row index not sorted"
    seq_rec_map[seq_id].append(feat_vec)

    
# Read the data-items and put them in DataItem array
# Dataset starts empty here; it is filled from data_records in the next cell
Dataset = []
data_records = getRecords(Dataset_path, Data_table)
data_schema = getColNames(Dataset_path, Data_table)
print('data_schema:', data_schema)

In [None]:
# Build one DataItem per data record.
# Columns used: 1..3 -> (label, label_index, predicted_index), 4 -> sequence length, 5 -> sequence id
for drec in data_records:
    seq_len = drec[4]
    seq_id = drec[5]
    feat_seq = np.array(seq_rec_map[seq_id])
    # The stored length must agree with the number of feature rows collected for this sequence
    assert len(feat_seq) == seq_len, f"EROR: Feature Sequence length mismatch, seq_id: {seq_id}"
    label, label_index, pred_index = drec[1:4]   
    item = DataItem(label, label_index, pred_index, seq_len, feat_seq)
    Dataset.append(item)

# Show the dataset size and a summary of the first item
item = Dataset[0]
print('Dataset:', len(Dataset))
print('item:', item.getItemSummary())

In [None]:
# Remove the scratch names created by the dataset-loading cells above
del table_names, seq_id, seq_len, seq_rec_map, row_index, r, pred_index
del label, label_index, labels_records, item, header_records, header_dict
del fitem, feat_vec, feat_seq, feat_records
del drec, data_schema, data_records

## Load the Trained Model

In [None]:
# Load and check the model parameters table
# Path of the SQLite3 file that holds the trained model
Model_path = './saved/trained-lstm250-79.68p.s3db'

table_names = getTableNames(Model_path)
print('table_names:', table_names)

# Read the header table
# Each header record is used as a key/value pair: column 1 is the key, column 2 the value
header_records =  getRecords(Model_path, 'Header')
Header_dict = {}
print('')
for r in header_records: 
    print(r[1:3])
    Header_dict[r[1]] = r[2]

    
# Extract model performance summary
# NOTE(review): the ':.2f' format below requires 'accuracy' to be stored as a number - confirm
Accuracy = Header_dict['accuracy']
Correct_count = Header_dict['correct_count']
Model_perf = f'Model Performance:   accuracy: {Accuracy:.2f}%   correct_count: {Correct_count}'  # to be used later

In [None]:
# Read the Hparam table
# Records are used as key/value pairs (column 1 -> column 2); later cells read
# Hparam['hidden_size'] and Hparam['input_size']
hparam_records = getRecords(Model_path, 'Hparam_T')
Hparam = {}
for r in hparam_records: 
    print(r[1:3])
    Hparam[r[1]] = r[2]

In [None]:
# Loads a table written by createMatrixTable() (Export-DB notebook) as a list of row tuples
def readMatrixTable(db_path, table_name):
    """Read a matrix table and return its rows in row-number order.

    db_path:    path of the SQLite3 database file
    table_name: name of the matrix table to read
    Returns a list with one entry per row; the leading row-number column of
    every record is dropped.
    """
    ordered_rows = getRecords(db_path, table_name)
    ordered_rows.sort()     # records start with row_no, so this orders the rows
    return [row[1:] for row in ordered_rows]


# Loads every listed parameter table into a {table_name: numpy array} dictionary
def readModelParam(db_path, table_names):
    """Read the given parameter tables and return them as numpy arrays.

    db_path:     path of the SQLite3 database file
    table_names: iterable of matrix-table names
    Returns a dict keyed by table name. A table with exactly one row is
    returned as a 1D array (vector); any other table as an array of its rows.
    """
    params_by_name = {}
    for table in table_names:
        rows = readMatrixTable(db_path, table)
        # A single-row table is treated as a vector and flattened to 1D
        params_by_name[table] = np.array(rows[0]) if len(rows) == 1 else np.array(rows)
    return params_by_name
        

# Get the parameter table names
# One weight_ih/weight_hh/bias_ih/bias_hh group per LSTM layer (l0..l2), then the FC layer
param_tables = [
    'lstm_weight_ih_l0',
    'lstm_weight_hh_l0',
    'lstm_bias_ih_l0',
    'lstm_bias_hh_l0',
    'lstm_weight_ih_l1',
    'lstm_weight_hh_l1',
    'lstm_bias_ih_l1',
    'lstm_bias_hh_l1',
    'lstm_weight_ih_l2',
    'lstm_weight_hh_l2',
    'lstm_bias_ih_l2',
    'lstm_bias_hh_l2',
    'fc_weight',
    'fc_bias',
]

# Load all tables and print shape/dtype of each as a sanity check
model_params = readModelParam(Model_path, param_tables)
for k, v in model_params.items():
    print(f'{k:20}:', v.shape, v.dtype)
    

# Delete names
del k, v

In [None]:
# Model Parameters class
@dataclass
class lstm250_Params:
    """Container for the parameters of the 3-layer LSTM and the final FC layer.

    Field order matters: the notebook constructs this class positionally.
    The same class is also reused by convertParamsFxp() to hold the
    fixed-point versions of the parameters, so the np.ndarray annotations
    describe only the floating-point instance.
    """
    # Layer 0: weight_ih multiplies the layer input, weight_hh the previous hidden state
    lstm_weight_ih_l0: np.ndarray
    lstm_weight_hh_l0: np.ndarray
    lstm_bias_ih_l0: np.ndarray
    lstm_bias_hh_l0: np.ndarray
    # Layer 1
    lstm_weight_ih_l1: np.ndarray
    lstm_weight_hh_l1: np.ndarray
    lstm_bias_ih_l1: np.ndarray
    lstm_bias_hh_l1: np.ndarray
    # Layer 2
    lstm_weight_ih_l2: np.ndarray
    lstm_weight_hh_l2: np.ndarray
    lstm_bias_ih_l2: np.ndarray
    lstm_bias_hh_l2: np.ndarray
    # Fully connected output layer
    fc_weight: np.ndarray
    fc_bias: np.ndarray


# Instantiate the model parameter class with float32 datatype
# Arguments are positional, so their order must match the field order of lstm250_Params
Model_params = lstm250_Params(
    model_params['lstm_weight_ih_l0'].astype(np.float32),
    model_params['lstm_weight_hh_l0'].astype(np.float32),
    model_params['lstm_bias_ih_l0'].astype(np.float32),
    model_params['lstm_bias_hh_l0'].astype(np.float32),
    model_params['lstm_weight_ih_l1'].astype(np.float32),
    model_params['lstm_weight_hh_l1'].astype(np.float32),
    model_params['lstm_bias_ih_l1'].astype(np.float32),
    model_params['lstm_bias_hh_l1'].astype(np.float32),
    model_params['lstm_weight_ih_l2'].astype(np.float32),
    model_params['lstm_weight_hh_l2'].astype(np.float32),
    model_params['lstm_bias_ih_l2'].astype(np.float32),
    model_params['lstm_bias_hh_l2'].astype(np.float32),
    model_params['fc_weight'].astype(np.float32),
    model_params['fc_bias'].astype(np.float32),
)


# Show the parameter info
# Prints shape and dtype of every field to confirm the float32 conversion
for field in dataclasses.fields(Model_params):
    field_value = getattr(Model_params, field.name)
    print(f'{field.name:18}:', field_value.shape, field_value.dtype)

In [None]:
# Remove the scratch names created by the model-loading cells above
del table_names, r, param_tables, model_params, header_records
del field, field_value

# Verify Model on the Dataset

In [None]:
# Logistic sigmoid, evaluated with numpy
def npSigmoid(x):
    """Return 1 / (1 + exp(-x)), element-wise for array input."""
    exp_neg = np.exp(-x)
    return 1 / (1 + exp_neg)


# One LSTM cell step in floating point
# weight_ih: weights applied to the input x
# weight_hh: weights applied to the previous hidden state h_prev
def npLSTMCell(x, h_prev, c_prev, weight_ih, weight_hh, bias_ih, bias_hh, input_size, hidden_size):
    """Compute one LSTM cell update and return the new (h, c).

    The pre-activation vector is split into four consecutive chunks of
    hidden_size entries, in the order: input gate, forget gate, candidate
    cell value, output gate. input_size is accepted for interface symmetry
    and is not used in the computation.
    """
    n = hidden_size
    pre_act = weight_ih @ x  +  weight_hh @ h_prev  +  bias_ih + bias_hh

    in_gate = npSigmoid(pre_act[:n])
    forget_gate = npSigmoid(pre_act[n:2*n])
    cell_cand = np.tanh(pre_act[2*n:3*n])    # candidate cell state
    out_gate = npSigmoid(pre_act[3*n:])

    c_next = forget_gate * c_prev + in_gate * cell_cand   # new cell state
    h_next = out_gate * np.tanh(c_next)                   # new hidden state
    return h_next, c_next


# Three stacked LSTM layers, one time step
def npLSTM3(x, h_prev, c_prev, lstm_weights, input_size, hidden_size):
    """Run one time step through the 3 stacked LSTM layers.

    x:            input vector of length input_size
    h_prev/c_prev: previous hidden/cell states, one entry per layer
    lstm_weights: object exposing lstm_weight_ih_l{k}, lstm_weight_hh_l{k},
                  lstm_bias_ih_l{k}, lstm_bias_hh_l{k} for k = 0, 1, 2
    Returns ((h0, h1, h2), (c0, c1, c2)) for this step.
    """
    # Perform compatibility checks
    layer_count = 3
    assert len(h_prev) == layer_count, "You need to provide initial values for all layers"
    assert len(c_prev) == layer_count, "You need to provide initial values for all layers"
    assert len(x) == input_size, "Input size mismatch"

    h_out, c_out = [], []
    layer_in, layer_in_size = x, input_size
    for layer in range(layer_count):
        h_cur, c_cur = npLSTMCell(layer_in, h_prev[layer], c_prev[layer],
                                  getattr(lstm_weights, f'lstm_weight_ih_l{layer}'),
                                  getattr(lstm_weights, f'lstm_weight_hh_l{layer}'),
                                  getattr(lstm_weights, f'lstm_bias_ih_l{layer}'),
                                  getattr(lstm_weights, f'lstm_bias_hh_l{layer}'),
                                  layer_in_size, hidden_size)
        h_out.append(h_cur)
        c_out.append(c_cur)
        # The hidden output of this layer feeds the next layer
        layer_in, layer_in_size = h_cur, hidden_size
    return tuple(h_out), tuple(c_out)
    
    

# Fully connected (affine) layer
def npFClayer(x, weight, bias):
    """Return weight @ x + bias."""
    projected = weight @ x
    return projected + bias

In [None]:
# Complete Model using numpy
def npModel(feat_seq):
    """Run a feature sequence through the 3-layer LSTM and the FC layer.

    feat_seq: iterable of feature vectors, one per time step.
    Returns the FC-layer output computed from the last hidden state of the
    third LSTM layer. Uses the module-level Hparam and Model_params.

    For an empty sequence the states never leave their zero initial value,
    so the FC layer is applied to a zero hidden state; previously this case
    failed because the per-step result was never assigned.
    """
    hidden_size = Hparam['hidden_size']
    input_size = Hparam['input_size']

    # Initial states: zeros for all 3 layers
    h_prev = np.zeros((3, hidden_size))
    c_prev = np.zeros((3, hidden_size))

    # Pass the sequence through the LSTM stack, carrying the states forward
    for tok in feat_seq:
        h_prev, c_prev = npLSTM3(tok, h_prev, c_prev, Model_params, input_size, hidden_size)

    last_hidden = h_prev[2]   # hidden state of the last layer after the final step
    return npFClayer(last_hidden, Model_params.fc_weight, Model_params.fc_bias)


# Select an item to run through the numpy model for validation
# Smoke run only: the output shape is printed, the values are not checked here
item = Dataset[0]
out_np = npModel(item.feature_seq)
print('out_np:', out_np.shape)

## Validate the Model on the Entire Dataset

In [None]:
# Index of the largest entry of a numpy array
def get_likely_index_np(nparray):
    """Return the index of the maximum value (flattened index for N-D input)."""
    return np.argmax(nparray)


# Prediction driver for the floating-point numpy model
def predict_npModel(item):
    """Return the predicted class index for a dataset item.

    The item's feature sequence is run through npModel() and the index of
    the largest output is returned.
    """
    return get_likely_index_np(npModel(item.feature_seq))


# item: DataItem(label, label_index, predicted_index, sequence_len, feature_seq)
item = Dataset[120]
print(item.getItemSummary())

# Predict with the numpy model; compare by eye with predicted_index printed above
pred = predict_npModel(item)
print('pred:', pred)

In [None]:
from tqdm.auto import tqdm


run_validation = False

# Validate the given prediction function on a dataset
def validateModel(predict_fn, dataset=None):
    """Run predict_fn over a dataset and report accuracy statistics.

    predict_fn: callable taking one dataset item and returning a predicted index
    dataset:    iterable of items exposing .predicted_index and .label_index;
                defaults to the module-level Dataset
    Returns (accuracy, correct_count, expect_miss, total_count), where
    expect_miss counts items whose prediction differs from the prediction
    stored in the dataset. For an empty dataset the accuracy is reported as
    0.0 instead of dividing by zero.
    """
    if dataset is None:
        dataset = Dataset
    expect_miss = 0      # no. of mismatches between stored prediction and predict_fn's prediction
    total_count = 0
    correct_count = 0
    for item in tqdm(dataset):
        pred = predict_fn(item)
        if pred != item.predicted_index: expect_miss += 1   # differs from prediction in dataset
        if pred == item.label_index: correct_count += 1     # matches the actual label-index
        total_count += 1
    # Compute and print statistics
    accuracy = (100.0 * correct_count) / total_count if total_count else 0.0
    print(f'Validation accuracy: {accuracy:.2f}%   correct_count: {correct_count}   expected-miss: {expect_miss}   total_count: {total_count}')
    return accuracy, correct_count, expect_miss, total_count


# Run on entire test-dataset
# Guarded by the run_validation flag set above
if run_validation:
    accuracy, correct_count, expect_miss, total_count = validateModel(predict_npModel)
    # Model_perf holds the accuracy/correct_count read from the model database header
    print('Expected', Model_perf)
    del accuracy, correct_count, expect_miss, total_count
else:
    print("INFO: Not running validation here")

# Define Fixed-Point Methods

In [None]:
# Delete the cache before importing
!rm -rf __pycache__/
from AK_FixedPoint import *

# Run Unit tests to make sure everything is okay
!python3 unittest_fxp.py

In [None]:
#Run unit tests to make sure everything is okay
!python3 unittest_fxp_math.py

## Matrix Operations

In [None]:
from math import inf as INF


# Performs matrix-vector multiplication in fixed point and keeps track of errors.
# status_obj: instance of fxp_Status to get the accumulated status back (optional)
# Returns the output vector and a tuple with intermediate results for debugging: result, (...)
def fxp_matmul_mv(fxp_mat, fxp_vec, status_obj=None, debug=False):
    """Multiply a fixed-point matrix by a fixed-point vector.

    Steps, operating on the raw `_data` values:
      1. element-wise products of every matrix row with the vector, checked
         against a 2x wider fixed-point object;
      2. each product is shifted right by the vector's fraction width to get
         back to the original scale (lower fraction bits are discarded
         *before* accumulation);
      3. the scaled products are summed along each row.
    After each step fxp_fitData() is called on the holding object, and when
    status_obj is given its returned status is merged into status_obj.

    fxp_mat:    fixed-point object whose _data is a 2D numpy array
    fxp_vec:    fixed-point object whose _data is a 1D numpy array
    status_obj: optional fxp_Status that receives the accumulated status
    debug:      print the per-step status (only when status_obj is given)
    Returns (fxp_accum, (prod_np, fxp_prod, prod_np_down, fxp_prod_down,
    accum_np, fxp_accum)).

    NOTE(review): the scale-down uses fxp_vec's fraction width while the
    result containers are derived from fxp_mat - this presumably assumes both
    operands share the same precision; confirm.
    """
    # Make sure all assumptions are met
    assert len(fxp_mat._data.shape) == 2, "fxp_mat must be built from a 2D Numpy array"
    assert len(fxp_vec._data.shape) == 1, "fxp_vec must be built from a 1D Numpy array"
    assert fxp_mat._data.shape[1] == fxp_vec._data.shape[0], "Matrix column count not equal vector length"

    # Get the data-type parameters
    f_width = fxp_vec._frac_width
    compute_status = status_obj is not None

    # multiply row-wise
    prod_np = (fxp_mat._data * fxp_vec._data)   # multiplying raw values
    # compute error status for multiplying into 2x wider result (less likely to have errors in this step)
    fxp_prod = fxp_makeWider(fxp_mat, 2)  # build 2x wider fxp object
    fxp_prod._data = prod_np              # copy the raw product values
    prod_stat = fxp_fitData(fxp_prod, compute_status)   # now fit within this precision
    if compute_status:
        if debug: print('prod_stat:', prod_stat)
        fxp_accumulateStatus(status_obj, prod_stat)  # record the multiplication errors

    # Now scale down to original precision before accumulation; record error status
    prod_np_down = prod_np >> f_width       # discard lower fraction bits
    fxp_prod_down = fxp_makeSame(fxp_mat)   # fxp object with original precision
    fxp_prod_down._data = prod_np_down
    prod_down_stat = fxp_fitData(fxp_prod_down, compute_status)
    if compute_status:
        if debug: print('prod_down_stat:', prod_down_stat)
        fxp_accumulateStatus(status_obj, prod_down_stat)  # accumulate the scaling errors

    # accumulate along rows; record error status
    accum_np = np.sum(fxp_prod_down._data, axis=1)
    fxp_accum = fxp_makeSame(fxp_vec)
    fxp_accum._data = accum_np
    accum_stat = fxp_fitData(fxp_accum, compute_status)
    if compute_status:
        if debug: print('accum_stat:', accum_stat)
        fxp_accumulateStatus(status_obj, accum_stat)  # accumulate the summation errors
    return fxp_accum, (prod_np, fxp_prod, prod_np_down, fxp_prod_down, accum_np, fxp_accum)
    
    
    

In [None]:
# Test
# Small integer matrix/vector so the fixed-point result can be compared by eye
mat_inp = [
    [1, 2, 3, 4],
    [2, 5, 7, 2],
    [9, 3, 5, 0],
]
vec_inp = [4, 8, 1, 2]
mat_np = np.array(mat_inp)
vec_np = np.array(vec_inp)
res_np = mat_np @ vec_np   # reference result computed by numpy
print(res_np)

# Deliberately narrow precision for this test
total_width = 10
frac_width = 4
stat = fxp_Status(False, 0, -INF, INF, -INF, INF)   # status object that fxp_matmul_mv accumulates into
fxp_mat_inp, _ = fxp_ctor(total_width, frac_width, mat_np)
fxp_vec_inp, _ = fxp_ctor(total_width, frac_width, vec_np)
fxp_result, dbg = fxp_matmul_mv(fxp_mat_inp, fxp_vec_inp, stat, debug=True)


print('Overall status:', stat)

print('')
fxp_printInfo(fxp_result)
fxp_printValue(fxp_result)

In [None]:
# Check the intermediate results
# dbg is the debug tuple returned by fxp_matmul_mv in the previous cell
prod_np, fxp_prod, prod_np_down, fxp_prod_down, accum_np, fxp_accum = dbg
print(mat_np * vec_np)   # reference element-wise products
print('')

# Raw products carry twice the fraction bits, so shift by 2*frac_width to see the integer part
print(prod_np >> (2*frac_width))

print('')
fxp_printInfo(fxp_prod)
fxp_printValue(fxp_prod)

print('')
fxp_printInfo(fxp_prod_down)
fxp_printValue(fxp_prod_down)


print('')
fxp_printInfo(fxp_accum)
fxp_printValue(fxp_accum)


In [None]:
# Remove scratch names left over from the matmul test and the earlier validation cells
del vec_np, vec_inp, total_width, stat, res_np, prod_np, prod_np_down, pred, out_np
del mat_np, mat_inp, item, hparam_records, fxp_vec_inp, fxp_prod, fxp_prod_down, fxp_mat_inp
del fxp_accum, frac_width, dbg, accum_np

## Activation Functions

In [None]:
# Reference fxp-sigmoid: evaluated in float, then re-quantized to the input's precision
def fxpSigmoid_accurate(fxp_num):
    """Apply sigmoid to a fixed-point object via a floating-point round trip.

    The value is converted to float, passed through npSigmoid(), and
    converted back with fxp_ctor() using fxp_num's own total/fraction widths.
    Returns (fxp_result, status) as produced by fxp_ctor().
    """
    total_w, frac_w = fxp_num._total_width, fxp_num._frac_width
    sig_float = npSigmoid(fxp_getAsFloat(fxp_num))
    fxp_sig, stat = fxp_ctor(total_w, frac_w, sig_float)
    return fxp_sig, stat


# Test sigmoid activation
inp_vec = [0, 0.1, 0.2, 0.3, 1, 2, 3]

# Floating-point reference
inp_vec_np = np.array(inp_vec)
out_vec_np = npSigmoid(inp_vec_np)
print('out_vec_np:', out_vec_np)

# Fixed-point result with total width 30 and fraction width 15
fxp_inp, _ = fxp_ctor(30, 15, inp_vec)
fxp_sig, _ = fxpSigmoid_accurate(fxp_inp)
print('fxp_sig:', fxp_getAsFloat(fxp_sig))


# assertion test
# Fail if any element differs from the float reference by more than the tolerance
tolerance = 1e-4
diff = np.abs(out_vec_np - fxp_getAsFloat(fxp_sig))
if (diff > tolerance).any(): assert 0, f"EROR: Sigmoid mismatch {diff}"
    

# Delete names
del inp_vec, inp_vec_np, out_vec_np, fxp_inp, fxp_sig, tolerance, diff

In [None]:
# Reference fxp-tanh: evaluated in float, then re-quantized to the input's precision
def fxpTanh_accurate(fxp_num):
    """Apply tanh to a fixed-point object via a floating-point round trip.

    The value is converted to float, passed through np.tanh(), and converted
    back with fxp_ctor() using fxp_num's own total/fraction widths.
    Returns (fxp_result, status) as produced by fxp_ctor().
    """
    total_w, frac_w = fxp_num._total_width, fxp_num._frac_width
    tanh_float = np.tanh(fxp_getAsFloat(fxp_num))
    fxp_tanh, stat = fxp_ctor(total_w, frac_w, tanh_float)
    return fxp_tanh, stat


# Test tanh activation (positive and negative inputs)
inp_vec = [0, 0.1, -0.1, 0.2, -0.2, 0.3, -0.3, 1, 2, 3, -1, -2, -3]

# Floating-point reference
inp_vec_np = np.array(inp_vec)
out_vec_np = np.tanh(inp_vec_np)
print('out_vec_np:', out_vec_np)

# Fixed-point result with total width 30 and fraction width 15
fxp_inp,  _ = fxp_ctor(30, 15, inp_vec)
fxp_tanh, _ = fxpTanh_accurate(fxp_inp)
print('fxp_tanh:', fxp_getAsFloat(fxp_tanh))


# assertion test
# Fail if any element differs from the float reference by more than the tolerance
tolerance = 1e-4
diff = np.abs(out_vec_np - fxp_getAsFloat(fxp_tanh))
if (diff > tolerance).any(): assert 0, f"EROR: Tanh mismatch {diff}"
    

# Delete names
del inp_vec, inp_vec_np, out_vec_np, fxp_inp, fxp_tanh, tolerance, diff

# Implement Model in Fixed-Point

In [None]:
from dataclasses import asdict


# Convert Model Parameters to fixed point
# Precision used for the fixed-point model: total bit width and number of fraction bits
Fxp_total_width = 30
Fxp_frac_width = 25


# Given an instance of lstm250_Params, convert the parameters into fixed point numbers
# params: instance of lstm250_Params
def convertParamsFxp(params, total_width, frac_width):
    """Return a new lstm250_Params whose fields are fixed-point objects.

    params:      lstm250_Params holding the floating-point parameters
    total_width: total width passed to fxp_ctor() for every parameter
    frac_width:  fraction width passed to fxp_ctor() for every parameter
    A warning line is printed for each parameter whose conversion status
    reports an overflow.
    """
    fxp_param_dict = {}
    for pname, value in asdict(params).items():
        fxp_value, stat = fxp_ctor(total_width, frac_width, value)
        # The warning text was previously built but never emitted
        if stat.overflow: print(f"WARN: Overflow of {pname}, count: {stat.overflow_count}")
        fxp_param_dict[pname] = fxp_value
    return lstm250_Params(**fxp_param_dict)


# Fixed-point copy of the model parameters, used by all fixed-point model code below
Fxp_model_param = convertParamsFxp(Model_params, Fxp_total_width, Fxp_frac_width)
fxp_printInfo(Fxp_model_param.fc_weight)
# Bare expression: in a notebook the raw fixed-point values of fc_bias are shown as the cell output
Fxp_model_param.fc_bias._data

## Model Definition in Fixed-Point

In [None]:
# One LSTM cell step in fixed point
# weight_ih: weights applied to the input x
# weight_hh: weights applied to the previous hidden state h_prev
def fxp_LSTMCell(x, h_prev, c_prev, weight_ih, weight_hh, bias_ih, bias_hh, input_size, hidden_size):
    """Fixed-point counterpart of npLSTMCell(); returns the new (h, c).

    All operands are fixed-point objects. The gate vector is split into four
    consecutive chunks of hidden_size raw entries, in the order: input gate,
    forget gate, candidate cell value, output gate. input_size is accepted
    for interface symmetry and is not used in the computation.
    """
    # gates = weight_ih @ x  +  weight_hh @ h_prev  +  bias_ih + bias_hh
    ih_term, _ = fxp_matmul_mv(weight_ih, x)
    hh_term, _ = fxp_matmul_mv(weight_hh, h_prev)
    weighted = fxp_add(ih_term, hh_term)
    bias_sum = fxp_add(bias_ih, bias_hh)
    gates = fxp_add(weighted, bias_sum)

    # Copy the gate object and keep only the raw values in [lo:hi]
    def gate_slice(lo, hi):
        part = fxp_copy(gates)
        part._data = part._data[lo:hi]
        return part

    n = hidden_size
    in_pre = gate_slice(0, n)
    forget_pre = gate_slice(n, 2*n)
    cand_pre = gate_slice(2*n, 3*n)
    out_pre = gate_slice(3*n, None)

    # Gate activations
    in_gate, _ = fxpSigmoid_accurate(in_pre)
    forget_gate, _ = fxpSigmoid_accurate(forget_pre)
    cell_cand, _ = fxpTanh_accurate(cand_pre)
    out_gate, _ = fxpSigmoid_accurate(out_pre)

    # c = f * c_prev + i * g
    kept = fxp_mult(forget_gate, c_prev)
    added = fxp_mult(in_gate, cell_cand)
    c = fxp_add(kept, added)
    # h = o * tanh(c)
    c_act, _ = fxpTanh_accurate(c)
    h = fxp_mult(out_gate, c_act)
    return h, c



# Three stacked LSTM layers in fixed point, one time step
def fxp_LSTM3(x, h_prev, c_prev, lstm_weights, input_size, hidden_size):
    """Run one time step through the 3 stacked fixed-point LSTM layers.

    x:            fixed-point input vector; its raw data must have input_size entries
    h_prev/c_prev: previous hidden/cell states, one fixed-point object per layer
    lstm_weights: object exposing lstm_weight_ih_l{k}, lstm_weight_hh_l{k},
                  lstm_bias_ih_l{k}, lstm_bias_hh_l{k} for k = 0, 1, 2
    Returns ((h0, h1, h2), (c0, c1, c2)) for this step.
    """
    # Perform compatibility checks
    layer_count = 3
    assert len(h_prev) == layer_count, "You need to provide initial values for all layers"
    assert len(c_prev) == layer_count, "You need to provide initial values for all layers"
    assert len(x._data) == input_size, "Input size mismatch"

    h_out, c_out = [], []
    layer_in, layer_in_size = x, input_size
    for layer in range(layer_count):
        h_cur, c_cur = fxp_LSTMCell(layer_in, h_prev[layer], c_prev[layer],
                                    getattr(lstm_weights, f'lstm_weight_ih_l{layer}'),
                                    getattr(lstm_weights, f'lstm_weight_hh_l{layer}'),
                                    getattr(lstm_weights, f'lstm_bias_ih_l{layer}'),
                                    getattr(lstm_weights, f'lstm_bias_hh_l{layer}'),
                                    layer_in_size, hidden_size)
        h_out.append(h_cur)
        c_out.append(c_cur)
        # The hidden output of this layer feeds the next layer
        layer_in, layer_in_size = h_cur, hidden_size
    return tuple(h_out), tuple(c_out)
    
    

# Fully connected (affine) layer in fixed point
def fxp_FClayer(x, weight, bias):
    """Return weight @ x + bias computed with the fixed-point helpers."""
    projected, _ = fxp_matmul_mv(weight, x)   # debug tuple is discarded
    return fxp_add(projected, bias)



def lstm250_forward_fxp(params, feat_seq, debug=False):
    """Fixed-point forward pass of the complete model.

    params:   lstm250_Params instance holding fixed-point parameters
    feat_seq: iterable of fixed-point feature vectors, one per time step
    debug:    accepted for interface compatibility; currently unused
    Returns the fixed-point FC-layer output computed from the last hidden
    state of the third LSTM layer.

    The parameters are taken from `params` rather than from the module-level
    fixed-point parameter set. For an empty sequence the FC layer is applied
    to the zero initial hidden state.

    NOTE(review): the initial states are still built with the module-level
    Fxp_total_width / Fxp_frac_width, so `params` is presumably expected to
    use the same precision - confirm.
    """
    hidden_size = Hparam['hidden_size']
    input_size = Hparam['input_size']

    # Initial states: zeros for all 3 layers, converted to fixed-point
    h0_3 = np.zeros((3, hidden_size))
    c0_3 = np.zeros((3, hidden_size))
    h_prev = [fxp_ctor(Fxp_total_width, Fxp_frac_width, h0)[0]  for h0 in h0_3]
    c_prev = [fxp_ctor(Fxp_total_width, Fxp_frac_width, c0)[0]  for c0 in c0_3]

    # Pass the sequence through the LSTM stack, carrying the states forward
    for tok in feat_seq:
        h_prev, c_prev = fxp_LSTM3(tok, h_prev, c_prev, params, input_size, hidden_size)

    last_hidden = h_prev[2]   # hidden state of the last layer after the final step
    return fxp_FClayer(last_hidden, params.fc_weight, params.fc_bias)


# Uses the forward pass and converts the result into predicted_index
# NOTE(review): the "150" in the name looks like a typo for "250"; the name is kept for existing callers
def lstm150_predict_fxp(params, feat_seq_fxp, debug=False):
    """Return the predicted class index for a fixed-point feature sequence.

    params:       lstm250_Params instance holding fixed-point parameters;
                  forwarded to lstm250_forward_fxp()
    feat_seq_fxp: iterable of fixed-point feature vectors
    debug:        forwarded to lstm250_forward_fxp()
    The index is taken over the raw `_data` values of the output vector.
    """
    out_vec = lstm250_forward_fxp(params, feat_seq_fxp, debug=debug)
    return np.argmax(out_vec._data)   # index of the largest output entry

### Unit Tests on Model Parts

In [None]:
def test01_fxp_LSTMCell():
    """Compare one fixed-point LSTM cell step (layer 0) against the float cell.

    Uses the first feature vector of Dataset[0] with zero initial states and
    checks only the hidden-state output.
    """
    print('\n---- test01_fxp_LSTMCell ----')
    hidden_size = Hparam['hidden_size']
    input_size = Hparam['input_size']
    first_vec = Dataset[0].feature_seq[0]
    # Initial states
    zero_h = np.zeros(hidden_size)
    zero_c = np.zeros(hidden_size)

    # Floating point reference
    h_ref, _ = npLSTMCell(first_vec, zero_h, zero_c,
                          Model_params.lstm_weight_ih_l0,
                          Model_params.lstm_weight_hh_l0,
                          Model_params.lstm_bias_ih_l0,
                          Model_params.lstm_bias_hh_l0,
                          input_size, hidden_size)

    # Fixed point result
    h0_fxp, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, zero_h)
    c0_fxp, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, zero_c)
    x_fxp, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, first_vec)
    hx, _ = fxp_LSTMCell(x_fxp, h0_fxp, c0_fxp,
                         Fxp_model_param.lstm_weight_ih_l0,
                         Fxp_model_param.lstm_weight_hh_l0,
                         Fxp_model_param.lstm_bias_ih_l0,
                         Fxp_model_param.lstm_bias_hh_l0,
                         input_size, hidden_size)

    # Compare
    tolerance = 1e-5
    diff_h = np.abs(h_ref - fxp_getAsFloat(hx))
    print('diff_h min, max:', np.min(diff_h), np.max(diff_h))
    assert not (diff_h > tolerance).any(), f"EROR: diff_h.max: {np.max(diff_h)}"

    
    
    
def test02_fxp_FClayer():
    """Compare the fixed-point FC layer against the float FC layer.

    The layer-0 input weights and bias are used as stand-in FC parameters so
    that the first feature vector of Dataset[0] can serve as the input.
    """
    print('\n---- test02_fxp_FClayer ----')
    item = Dataset[0]
    x = item.feature_seq[0]
    wt = Model_params.lstm_weight_ih_l0
    bs = Model_params.lstm_bias_ih_l0

    # compute floating-point result
    fc_fl = npFClayer(x, wt, bs)

    # compute fixed-point result
    x , _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, x)
    wt, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, wt)
    bs, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, bs)
    fc_fxp = fxp_FClayer(x, wt, bs)

    # Compare
    tolerance = 1e-5
    diff_fc = np.abs(fc_fl - fxp_getAsFloat(fc_fxp))
    print('diff_fc min, max:', np.min(diff_fc), np.max(diff_fc))
    # The message must use diff_fc; an undefined name here would mask the
    # real failure with a NameError
    assert not (diff_fc > tolerance).any(), f"EROR: diff_fc.max: {np.max(diff_fc)}"

        
        
        
def test03_fxp_LSTM3():
    """Compare one fixed-point 3-layer LSTM step against the float version.

    Uses the first feature vector of Dataset[2] with zero initial states and
    checks all six outputs (h0, h1, h2, c0, c1, c2).
    """
    print('\n---- test03_fxp_LSTM3 ----')
    hidden_size = Hparam['hidden_size']
    input_size = Hparam['input_size']
    zero_h = np.zeros((3, hidden_size))
    zero_c = np.zeros((3, hidden_size))
    first_vec = Dataset[2].feature_seq[0]

    # Floating point reference; tuple concatenation gives (h0, h1, h2, c0, c1, c2)
    h_ref, c_ref = npLSTM3(first_vec, zero_h, zero_c, Model_params, input_size, hidden_size)
    all_fl = h_ref + c_ref

    # Fixed point result
    x_fxp, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, first_vec)
    h0_fxp = [fxp_ctor(Fxp_total_width, Fxp_frac_width, row)[0]  for row in zero_h]
    c0_fxp = [fxp_ctor(Fxp_total_width, Fxp_frac_width, row)[0]  for row in zero_c]
    h_fxp, c_fxp = fxp_LSTM3(x_fxp, h0_fxp, c0_fxp, Fxp_model_param, input_size, hidden_size)
    all_fxp = h_fxp + c_fxp

    # Compare each output vector in turn
    tolerance = 1e-5
    for idx, (ref, got) in enumerate(zip(all_fl, all_fxp)):
        diff = np.abs(ref - fxp_getAsFloat(got))
        print(f'diff[{idx}] min, max:', np.min(diff), np.max(diff))
        assert not (diff > tolerance).any(), f"EROR: diff[{idx}].max: {np.max(diff)}"
    

def test04_fxp_lstm250_forward():
    """Compare the fixed-point forward pass against the float model.

    Only the first 15 feature vectors of Dataset[0] are used.
    """
    print('\n---- test04_fxp_lstm250_forward ----')
    short_seq = Dataset[0].feature_seq[:15]

    # Floating point reference
    out_fl = npModel(short_seq)

    # Fixed-point result
    short_seq_fxp = []
    for vec in short_seq:
        vec_fxp, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, vec)
        short_seq_fxp.append(vec_fxp)
    out_fxp = lstm250_forward_fxp(Fxp_model_param, short_seq_fxp)

    # Compare
    tolerance = 1e-4
    assert out_fl.shape == out_fxp._data.shape, "EROR: Output shape mismatch"
    diff = np.abs(out_fl - fxp_getAsFloat(out_fxp))
    print('diff min, max:', np.min(diff), np.max(diff))
    assert not (diff > tolerance).any(), f"EROR: diff.max: {np.max(diff)}"

        
def test05_fxp_lstm250_predict():
    """Print the float and fixed-point predictions for Dataset[0].

    The two predictions are only printed for visual comparison; nothing is
    asserted here.
    """
    print('\n---- test05_fxp_lstm250_predict ----')
    sample = Dataset[0]
    print('pred_fl:', predict_npModel(sample))

    # Compute fixed point prediction on the full sequence
    seq_fxp = []
    for vec in sample.feature_seq:
        vec_fxp, _ = fxp_ctor(Fxp_total_width, Fxp_frac_width, vec)
        seq_fxp.append(vec_fxp)
    print('pred_fxp:', lstm150_predict_fxp(Fxp_model_param, seq_fxp))
    
    
    
# Run tests
# Each test raises AssertionError on a mismatch, which stops the cell at the first failure
test01_fxp_LSTMCell()
test02_fxp_FClayer()
test03_fxp_LSTM3()
test04_fxp_lstm250_forward()
test05_fxp_lstm250_predict()