[![Fixel Algorithms](https://fixelalgorithms.co/images/CCExt.png)](https://fixelalgorithms.gitlab.io)

# AI Program

## Deep Learning - Convolutional Neural Network - MNIST Stroke

> Notebook by:
> - Royi Avital RoyiAvital@fixelalgorithms.com

## Revision History

| Version | Date       | User        |Content / Changes                                                   |
|---------|------------|-------------|--------------------------------------------------------------------|
| 1.0.000 | 30/08/2025 | Royi Avital | First version                                                      |

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/FixelAlgorithmsTeam/FixelCourses/blob/master/AIProgram/2024_02/0002PointLine.ipynb)

In [None]:
# Import Packages

# General Tools
import numpy as np
import scipy as sp
import pandas as pd

# Scientific Python

# Image Processing & Computer Vision
import skimage as ski

# Machine Learning
from sklearn.datasets import fetch_openml

# Deep Learning
import torch
import torch.nn            as nn
import torch.nn.functional as F
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader, Dataset
import torchinfo
import torchvista

# Miscellaneous
import json
import math
import os
import pickle
from platform import python_version
import random

import onedrivedownloader

# Typing 
from typing import Callable, Dict, List, Literal, Optional, Self, Tuple
from numpy.typing import NDArray

# Visualization
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
import seaborn as sns

# Jupyter
from IPython import get_ipython

## Notations

* <font color='red'>(**?**)</font> Question to answer interactively.
* <font color='blue'>(**!**)</font> Simple task to add code for the notebook.
* <font color='green'>(**@**)</font> Optional / Extra self practice.
* <font color='brown'>(**#**)</font> Note / Useful resource / Food for thought.

Code Notations:

```python
someVar    = 2; #<! Notation for a variable
vVector    = np.random.rand(4) #<! Notation for 1D array
mMatrix    = np.random.rand(4, 3) #<! Notation for 2D array
tTensor    = np.random.rand(4, 3, 2, 3) #<! Notation for nD array (Tensor)
tuTuple    = (1, 2, 3) #<! Notation for a tuple
lList      = [1, 2, 3] #<! Notation for a list
dDict      = {1: 3, 2: 2, 3: 1} #<! Notation for a dictionary
oObj       = MyClass() #<! Notation for an object
dfData     = pd.DataFrame() #<! Notation for a data frame
dsData     = pd.Series() #<! Notation for a series
hObj       = plt.Axes() #<! Notation for an object / handler / function handler
```

### Code Exercise

 - Single line fill

```python
valToFill = ???
```

 - Multi Line to Fill (At least one)

 ```python
 # You need to start writing
 ?????
 ```

 - Section to Fill

```python
#===========================Fill This===========================#
# 1. Explanation about what to do.
# !! Remarks to follow / take under consideration.
mX = ???

?????
#===============================================================#
```

In [None]:
# Configuration
# %matplotlib inline

# warnings.filterwarnings('ignore')

# Seed NumPy's global RNG and Python's `random` module so the random draws are repeatable.
# NOTE(review): PyTorch's RNG is not seeded in this cell.
seedNum = 512
np.random.seed(seedNum)
random.seed(seedNum)

# Matplotlib default color palette
lMatPltLibclr = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
# sns.set_theme() #<! Apply SeaBorn theme
# sns.set_palette('tab10')

# `True` when the string representation of the IPython shell contains 'google.colab'
runInGoogleColab = 'google.colab' in str(get_ipython())

In [None]:
# Constants

# Plot styling defaults (Not referenced in the visible part of the notebook)
FIG_SIZE_DEF    = (8, 8)
ELM_SIZE_DEF    = 50
CLASS_COLOR     = ('b', 'r')
EDGE_COLOR      = 'k'
MARKER_SIZE_DEF = 10
LINE_WIDTH_DEF  = 2

TU_MNIST_IMG_SIZE = (28, 28) #<! Used to reshape a flat MNIST sample into an image

π = math.pi

BASE_NAME   = 'FixelCourses'
DATA_FOLDER = 'DataSets'

# The working directory truncated right after the last (Case insensitive) occurrence of `BASE_NAME`.
# If `BASE_NAME` is not part of the working directory, `find()` returns -1 and the whole working directory is kept.
BASE_PATH = os.getcwd()[:(len(os.getcwd()) - (os.getcwd()[::-1].lower().find(BASE_NAME.lower()[::-1])))]
DATA_PATH = os.path.join(BASE_PATH, DATA_FOLDER)

In [None]:
# Course Packages


In [None]:
# Auxiliary Functions

def ParseMnistStrokeSample( sampleIdx: int, /, *, dataPath: str = '.', dataSet: Literal['Train', 'Test'] = 'Train' ) -> Tuple[List[NDArray], int]:

    fileName = f'{dataSet}_{(sampleIdx):05d}.json' #<! Filenames are 0-59_999

    with open(os.path.join(dataPath, fileName), 'r') as hFile:
        dData = json.load(hFile)
    
    lS       = dData['strokes']
    labelIdx = dData['label']

    numStrokes = len(lS)
    lXy        = []
    for ii in range(numStrokes):
        lSi = lS[ii]
        numPts = len(lSi)
        mXy = np.zeros(shape = (numPts, 2))
        for jj in range(numPts):
            mXy[jj] = lSi[jj]['x'], lSi[jj]['y']
        
        lXy.append(mXy)

    return lXy, labelIdx

def PlotStroke( lXy: List[NDArray], /, *, labelIdx: Optional[int] = None, hA: Optional[plt.Axes] = None, tFigSize: Tuple[float, float] = (6.4, 4.8) ) -> plt.Axes:
    """
    Draws each stroke as a scatter plot on a single axes.

    Input:
      - lXy      - A list of arrays, column 0 is used as `x` and column 1 as `y`.
      - labelIdx - If given, displayed in the title of the axes.
      - hA       - The axes to draw on. If `None` a new figure is created.
      - tFigSize - The size of the figure created when `hA` is `None`.
    Output:
      - hA       - The axes which was drawn on.
    """

    if hA is None:
        _, hA = plt.subplots(figsize = tFigSize)

    hA.set_aspect('equal')

    # A scatter per stroke, the legend entries are 1 based
    for strokeNum, mStroke in enumerate(lXy, start = 1):
        hA.scatter(mStroke[:, 0], mStroke[:, 1], label = f'Stroke: {strokeNum:02d}')

    # Make the `y` axis grow downwards
    yBottom, yTop = hA.get_ylim()
    if yTop > yBottom:
        hA.invert_yaxis()

    # Fixed view limits: `x` in [0, 27], `y` from 27 (Bottom) to 0 (Top)
    hA.set_xlim((0, 27))
    hA.set_ylim((27, 0))

    if labelIdx is not None:
        hA.set_title(f'Label: {labelIdx}')

    return hA

def TransformStroke( lXy: List[NDArray], numGridPts: int, /, *, interpCls: Callable = sp.interpolate.make_smoothing_spline ) -> NDArray:

    mXY = np.concatenate(lXy, axis = 0) #<! Concatenate all strokes into a single "stroke"
    vT  = np.linspace(0, 1, mXY.shape[0]) #<! Parametric curve
    vTT = np.linspace(0, 1, numGridPts) #<! Parametric curve to be aligned to

    oIntrp = interpCls(vT, mXY) #<! Interpolator

    mXY = oIntrp(vTT)

    return mXY

def PlotMnistImages( mX: NDArray, vY: NDArray, numRows: int, numCols: Optional[int] = None, tuImgSize: Tuple = (28, 28), randomChoice: bool = True, lClasses: Optional[List] = None, hF: Optional[plt.Figure] = None ) -> plt.Figure:
    """
    Displays a grid of `numRows x numCols` samples as images with their labels.

    Input:
      - mX           - The samples, a sample per row. Each row is reshaped into `tuImgSize`.
      - vY           - The labels, indexed by the index of the sample.
      - numRows      - The number of rows of the grid.
      - numCols      - The number of columns of the grid. If `None`, set to `numRows`.
      - tuImgSize    - The shape of a single image. Length 2 is displayed with a
                       gray color map, length 3 is displayed as is.
      - randomChoice - If `True` the samples are drawn by `np.random.choice()`
                       (Global NumPy RNG, may repeat), otherwise the first
                       `numRows * numCols` samples are displayed.
      - lClasses     - If given, the title shows `lClasses[vY[idx]]` instead of `vY[idx]`.
      - hF           - A figure to draw on (Its axes are used by order). If `None`
                       a new figure is created.
    Output:
      - hF           - The figure which was drawn on.
    Raises:
      - ValueError   - If the length of `tuImgSize` is neither 2 nor 3.
    """

    # Validate once, before any figure is created (The check does not depend on the sample)
    numImgDims = len(tuImgSize)
    if numImgDims == 2:
        dImShowArgs = {'cmap': 'gray'}
    elif numImgDims == 3:
        dImShowArgs = {}
    else:
        raise ValueError(f'The length of the image size tuple is {len(tuImgSize)} which is not supported')

    numSamples = mX.shape[0]

    if numCols is None:
        numCols = numRows

    if hF is None:
        hF, hA = plt.subplots(numRows, numCols, figsize = (numCols * 3, numRows * 3))
    else:
        hA = hF.axes

    vHa = np.atleast_1d(hA).flat #<! `np.atleast_1d()` to support a single axes (Not an array)

    for kk in range(numRows * numCols):
        idx = np.random.choice(numSamples) if randomChoice else kk
        mI  = np.reshape(mX[idx, :], tuImgSize)

        hAx = vHa[kk]
        hAx.imshow(mI, **dImShowArgs)
        # Hide all ticks and tick labels
        hAx.tick_params(axis = 'both', left = False, top = False, right = False, bottom = False,
                        labelleft = False, labeltop = False, labelright = False, labelbottom = False)

        labelVal = vY[idx] if lClasses is None else lClasses[vY[idx]]
        hAx.set_title(f'Index = {idx}, Label = {labelVal}')

    return hF

def PlotLabelsHistogram( vY: NDArray, hA: Optional[plt.Axes] = None, lClass: Optional[List] = None, xLabelRot: Optional[int] = None ) -> plt.Axes:
    """
    Draws a bar plot of the number of occurrences of each unique label.

    Input:
      - vY        - The labels.
      - hA        - The axes to draw on. If `None` a new (8, 6) figure is created.
      - lClass    - If given, used as the tick labels of the `x` axis instead of
                    the unique values.
      - xLabelRot - If given, the rotation applied to each tick label of the `x` axis.
    Output:
      - hA        - The axes which was drawn on.
    """

    if hA is None:
        _, hA = plt.subplots(figsize = (8, 6))

    # The unique values (Sorted by `np.unique()`) and the count of each
    vUniqueVal, vValCount = np.unique(vY, return_counts = True)
    lTickText = [f'{uniqueVal}' for uniqueVal in vUniqueVal]

    hA.bar(vUniqueVal, vValCount, width = 0.9, align = 'center')
    hA.set_title('Histogram of Classes / Labels')
    hA.set_xlabel('Class')
    hA.set_ylabel('Count')
    hA.set_xticks(vUniqueVal, lTickText)

    # Replace the values by the names of the classes
    if lClass is not None:
        hA.set_xticklabels(lClass)

    if xLabelRot is not None:
        for hTickLabel in hA.get_xticklabels():
            hTickLabel.set_rotation(xLabelRot)

    return hA

## 1D Convolutional Neural Network (CNN)


* <font color='brown'>(**#**)</font> _Data Leakage_ is a common mistake during the feature engineering phase.

### Features for 1D Signal Classification

One way to classify different features of 1D signals would be:

 - Statistical Features  
   Treat the data as a set of values.    
   Summarize data using descriptive statistics.  
   Insensitive to the ordering of the observations included in the set.  
   <font color='magenta'>Example</font>: Mean, Variance, Skewness, Kurtosis, Percentiles, Entropy.
 - Temporal Features  
   Features analyze the changes and patterns in data over time.  
   Sensitive to the order of the samples.  
   Captures temporal correlations, trends and rate of changes.  
   <font color='magenta'>Example</font>: Auto Correlation, Zero Crossing Rate, Slope (Trend), Number of Peaks.
 - Spectral Features
 - Structural Features


In [None]:
# Parameters

fileUrl     = 'https://technionmail-my.sharepoint.com/:u:/g/personal/royia_technion_ac_il/EUXCDJ40oItKofZ9E5tmSfMB_QZlZ3-N_-uc7WYGafQf8Q?e=rxEgx0' #<! OneDrive
dataSetName = 'MNISTStroke' #<! Name of the data set folder (And of the downloaded ZIP file) under `DATA_PATH`

# Referenced only by commented out code in the visible part of the notebook
numSamplesTrain = 10_000
numSamplesTest  = 1_000

numImg = 3

# Features
numGridPts   = 32 #<! Number of points each sample is resampled to (See `TransformStroke()`)
interpModel  = sp.interpolate.PchipInterpolator #<! Interpolator used for the resampling
flatFeatures = False #<! If `True`, `MNISTStrokeDataset` returns the features as a flat vector

# Cross Validation
numKFold = 5

# Visualization
exportFig = False

## Generate Data


### The MNIST Stroke Dataset

The MNIST Data Set is the "Hello World" dataset of Machine Learning.


* <font color='red'>(**?**)</font> Will the solution of the Squared Euclidean Distance be the same as the Euclidean Distance?

In [None]:
# Generate / Load Data
# Fetches the `mnist_784` data set from OpenML as NumPy arrays (Requires network access or a local cache).

mX, vY = fetch_openml('mnist_784', version = 1, return_X_y = True, as_frame = False, parser = 'auto')
vY = vY.astype(np.int_) #<! The labels are strings, convert to integer

print(f'The features data shape: {mX.shape}')
print(f'The labels data shape: {vY.shape}')
print(f'The unique values of the labels: {np.unique(vY)}')

In [None]:
# Generate / Load Data
# Downloads and unzips the stroke data set only if its folder does not exist under `DATA_PATH`.

if (not os.path.isdir(os.path.join(DATA_PATH, dataSetName))):
    # NOTE(review): Presumably `download()` returns the folder the archive was unzipped into - verify.
    dataSetPath = onedrivedownloader.download(fileUrl, filename = os.path.join(DATA_PATH, dataSetName + '.zip'), unzip = True, unzip_path = DATA_PATH)
    dataSetPath = os.path.join(dataSetPath, dataSetName)
else:
    dataSetPath = os.path.join(DATA_PATH, dataSetName)

In [None]:
# Parse Single File
# Draws a random index in {0, 1, ..., 59_999} and parses the matching `Train` file (Default `dataSet`).

sampleIdx = random.randrange(60_000)
lXy, labelIdx = ParseMnistStrokeSample(sampleIdx, dataPath = dataSetPath)

In [None]:
# Plot the Sample
# NOTE(review): Assumes sample `sampleIdx` of the OpenML images matches the stroke file of the same index - verify.
hF, vHa = plt.subplots(nrows = 1, ncols = 2, figsize = (12.8, 4.8))
vHa = vHa.flat

# Left: The MNIST image (Flat sample reshaped into `TU_MNIST_IMG_SIZE`)
hA = vHa[0]
hA.imshow(np.reshape(mX[sampleIdx], TU_MNIST_IMG_SIZE), cmap = 'gray', vmin = 0, vmax = 255)

# Right: The parsed strokes
hA = vHa[1]
hA = PlotStroke(lXy, hA = hA)
hA.legend();

In reality the order of the points matters (First to last).

Idea:
 - Data Level:
    - Padding.
    - Interpolation.
    - Clustering.
 - Model
    - Model for _variable length_ sequences (RNN, Transformers).

In [None]:
# Interpolation
# Resamples the strokes into a single curve of `numGridPts` points.
# The result is wrapped by a list as `PlotStroke()` expects a list of strokes.

lXXYY = [TransformStroke(lXy, numGridPts, interpCls = interpModel)]

hF, vHa = plt.subplots(nrows = 1, ncols = 3, figsize = (19.2, 4.8))
vHa = vHa.flat

# Left: The MNIST image
hA = vHa[0]
hA.imshow(np.reshape(mX[sampleIdx], TU_MNIST_IMG_SIZE), cmap = 'gray', vmin = 0, vmax = 255)

# Middle: The original strokes
hA = vHa[1]
hA = PlotStroke(lXy, hA = hA)
hA.legend();

# Right: The resampled curve
hA = vHa[2]
hA = PlotStroke(lXXYY, hA = hA)
hA.legend();

In [None]:
# Interpolator Effect
# Resamples the same strokes with several interpolators and displays the results side by side.

# Pairs of (Display name, Callable `f(x, y)` which builds the interpolator)
tuInterpModel = (
    ('Cubic Spline', sp.interpolate.CubicSpline),
    ('Akima', sp.interpolate.Akima1DInterpolator),
    ('PChip', sp.interpolate.PchipInterpolator),
    ('BSpline', sp.interpolate.make_interp_spline),
    ('Piece Wise Linear', lambda x, y: sp.interpolate.make_interp_spline(x, y, k = 1)),
    ('Smooth Spline', sp.interpolate.make_smoothing_spline),
)

# One axes for the original strokes and one per interpolator
hF, vHa = plt.subplots(nrows = 1, ncols = 1 + len(tuInterpModel), figsize = (18, 4))
vHa = vHa.flat

hA = vHa[0]
hA = PlotStroke(lXy, hA = hA)
hA.set_title('Original Strokes')
hA.legend();

for ii, (interpModelName, oInterpModel) in enumerate(tuInterpModel):
    hA = vHa[ii + 1] #<! The first axes is taken by the original strokes
    lXXYY = [TransformStroke(lXy, numGridPts, interpCls = oInterpModel)]
    PlotStroke(lXXYY, hA = hA)
    hA.set_title(interpModelName);

In [None]:
# PyTorch Data Loader
class MNISTStrokeDataset(Dataset):
    oDefInt = sp.interpolate.PchipInterpolator
    def __init__( self, dataPath: str, dataSet: Literal['Test', 'Train'], /, *, numGridPts: int = 32, interpModel: Callable = oDefInt, flatFeatures: bool = False ) -> None:
        TEST_FILE_NAME  = 'TEST.pkl'
        TRAIN_FILE_NAME = 'TRAIN.pkl'

        if dataSet not in ['Test', 'Train']:
            raise ValueError(f'The value of `"dataSet"` = {dataSet} must be either `"dataSet"` or `"Test"`')
        
        match dataSet:
            case 'Test':
                dataFileName = TEST_FILE_NAME
            case 'Train':
                dataFileName = TRAIN_FILE_NAME
        
        dataFilePath = os.path.join(dataPath, dataFileName)
        if os.path.isfile(dataFilePath):
            # Load RAW data
            with open(dataFilePath, 'rb') as hFile:
                dData = pickle.load(hFile)
                lS = dData['lStrokes']
                lY = dData['lY']                    
        else:
            # Generate RAW data and save
            lFiles = os.listdir(dataPath)
            lFiles = [f for f in lFiles if dataSet in f]
            lFiles.sort()

            lS = [] #<! Strokes per Image
            lY = [] #<! Labels

            for ii in range(len(lFiles)):
                lXy, labelIdx = ParseMnistStrokeSample(ii, dataPath = dataPath, dataSet = dataSet)
                lS.append(lXy)
                lY.append(labelIdx)
            
            dData = {'lStrokes': lS, 'lY': lY}
            # Save RAW data
            with open(dataFilePath, 'wb') as hFile:
                pickle.dump(dData, hFile)
        
        lX = [] #<! Features
        for ii in range(len(lS)):
            lXy = lS[ii]
            mXY = TransformStroke(lXy, numGridPts, interpCls = interpModel)
            lX.append(mXY)
        
        self.dataPath     = dataPath
        self.dataSet      = dataSet
        self.numGridPts   = numGridPts
        self.interpModel  = interpModel
        self.flatFeatures = flatFeatures
        
        self.lS = lS
        self.lX = lX
        self.lY = lY
        self.numSamples = len(lX)

    def __len__( self: Self ) -> int:
        
        return self.numSamples

    def __getitem__( self: Self, idx: int ) -> Tuple[NDArray, int]:
        
        mX   = self.lX[idx] #<! Features (numGridPts, 2)
        valY = self.lY[idx] #<! Label

        mX = mX.astype(np.float32) #<! PyTorch default float on GPU's

        if self.flatFeatures:
            # Return a flat vector of features
            return np.ravel(mX), valY
        
        return mX, valY

In [None]:
# Define PyTorch Dataset
# Both data sets are read from the same folder and share the same features configuration.
dsTrain = MNISTStrokeDataset(dataSetPath, 'Train', numGridPts = numGridPts, interpModel = interpModel, flatFeatures = flatFeatures)
dsTest  = MNISTStrokeDataset(dataSetPath, 'Test', numGridPts = numGridPts, interpModel = interpModel, flatFeatures = flatFeatures)

In [None]:
# DataSet as Iterator
# Fetches a random sample by indexing the data set and displays it.

sampleIdx = random.randrange(len(dsTrain))
# NOTE(review): Overwrites the notebook level `mX` (The MNIST images matrix) with the features of a single sample.
mX, valY = dsTrain[sampleIdx]

hF, hA = plt.subplots(nrows = 1, ncols = 1, figsize = (6.4, 4.8))

# NOTE(review): Assumes `flatFeatures` is `False` as `PlotStroke()` indexes the columns of a 2D array.
hA = PlotStroke([mX], hA = hA)
hA.set_title(f'Label: {valY}');

In [None]:
# Pre Processing

# The image is in the range {0, 1, ..., 255}
# We scale it into [0, 1]

#===========================Fill This===========================#
# 1. Scale the values into the [0, 1] range.
# mX = mX / 255.0

#===============================================================#

In [None]:
# Train Test (Validation) Split

#===========================Fill This===========================#
# 1. Split the data such that the Train Data has `numSamplesTrain`.
# 2. Split the data such that the Test Data has `numSamplesTest`.
# 3. The distribution of the classes must match the original data.

# numClass = len(np.unique(vY))
# mXTrain, mXTest, vYTrain, vYTest = train_test_split(mX, vY, test_size = numSamplesTest, train_size = numSamplesTrain, shuffle = True, stratify = vY)

#===============================================================#

# print(f'The training features data shape: {mXTrain.shape}')
# print(f'The training labels data shape  : {vYTrain.shape}')
# print(f'The test features data shape    : {mXTest.shape}')
# print(f'The test labels data shape      : {vYTest.shape}')
# print(f'The unique values of the labels : {np.unique(vY)}')

### Explore the Data

In [None]:
# Plot the Data

# hF = PlotMnistImages(mX, vY, numImg)

In [None]:
# Distribution of Labels

# hA = PlotLabelsHistogram(vY)
# plt.show()

In [None]:
# Mean Image per Class

# tI = np.zeros(shape = (numClass, ) + TU_MNIST_IMG_SIZE)

# for ii in range(numClass):
#     vIdx = vY == ii
#     vF = np.mean(mX[vIdx], axis = 0) #<! (numFeatures, )
#     tI[ii] = np.reshape(vF, TU_MNIST_IMG_SIZE)