In [6]:
# Based on https://visualstudiomagazine.com/articles/2020/11/24/pytorch-accuracy.aspx

# banknote_bnn.py
# Banknote classification
# PyTorch 1.6.0-CPU Anaconda3-2020.02  Python 3.7.6
# Windows 10 

import os
import glob
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import normalize
import csv
import pandas as pd
import random
from collections import defaultdict
from joblib import dump, load

import numpy as np
import torch as T
device = T.device("cuda")  # apply to Tensor or Module
T.set_default_tensor_type(T.cuda.FloatTensor)

In [7]:
locations = ['not_crossing_wen', 'not_crossing_nick', 'not_crossing']
locations_crossing = ['crossing_wen', 'crossing_nick', 'crossing']

not_crossing = defaultdict(list)
crossing = defaultdict(list)

In [8]:
# Read only the valid data into dict

for location in locations:
    path = 'data/' + location + '/'
    for filename in glob.glob(path + '*.csv'):
        name = filename.replace(path, '')[:-4]

        df=pd.read_csv(filename)
        if len(df.columns) >= 5 * 2: # 2 columns per point for x & y
            for data in df.values:
                not_crossing[name].append(data)
                
for location in locations_crossing:
    path = 'data/' + location + '/'
    for filename in glob.glob(path + '*.csv'):
        name = filename.replace(path, '')[:-4]

        df=pd.read_csv(filename)
        if len(df.columns) >= 5 * 2: # 2 columns per point for x & y
            for data in df.values:
                crossing[name].append(data)

In [9]:
file = "567891011121314151617"

cross_arr = np.asarray(crossing[file])
cross_arr = [np.append(d, 0) for d in cross_arr]

not_cross_arr = np.asarray(not_crossing[file])
not_cross_arr = [np.append(d, 1) for d in not_cross_arr]

combined_arr = cross_arr + not_cross_arr

random.shuffle(combined_arr)

N = len(combined_arr)

train_arr = combined_arr[:4*N//5]
test_arr = combined_arr[4*N//5:]

np.savetxt('train.csv', train_arr, delimiter=',')
np.savetxt('test.csv', test_arr, delimiter=',')

num_features = len(cross_arr[0])-1

In [12]:
class BestDataset(T.utils.data.Dataset):

	def __init__(self, src_file, num_rows=None):
		all_data = np.loadtxt(src_file, delimiter=",", skiprows=0,
			dtype=np.float32)  # strip IDs off
		all_data = np.loadtxt(open(src_file, "rb"), delimiter=",", skiprows=0)
		print(len(all_data[0]))
		self.x_data = T.tensor(all_data[:,0:num_features],
			dtype=T.float32).to(device)
		self.y_data = T.tensor(all_data[:,num_features],
			dtype=T.float32).to(device)

		# n_vals = len(self.y_data)
		# self.y_data = self.y_data.reshape(n_vals,1)
		self.y_data = self.y_data.reshape(-1,1)

	def __len__(self):
		return len(self.x_data)

	def __getitem__(self, idx):
		if T.is_tensor(idx):
			idx = idx.tolist()
		preds = self.x_data[idx,:]  # idx rows, all 4 cols
		lbl = self.y_data[idx,:]    # idx rows, the 1 col
		sample = { 'predictors' : preds, 'target' : lbl }
		# sample = dict()   # or sample = {}
		# sample["predictors"] = preds
		# sample["target"] = lbl
# 		print(sample)
		return sample

# ---------------------------------------------------------

def accuracy(model, ds):
	# ds is a iterable Dataset of Tensors
	n_correct = 0; n_wrong = 0

	# alt: create DataLoader and then enumerate it
	for i in range(len(ds)):
		inpts = ds[i]['predictors']
		target = ds[i]['target']    # float32  [0.0] or [1.0]
		with T.no_grad():
			oupt = model(inpts)

		# avoid 'target == 1.0'
		if target < 0.5 and oupt < 0.5:  # .item() not needed
			n_correct += 1
		elif target >= 0.5 and oupt >= 0.5:
			n_correct += 1
		else:
			n_wrong += 1

	return (n_correct * 1.0) / (n_correct + n_wrong)

# ---------------------------------------------------------

def acc_coarse(model, ds):
	inpts = ds[:]['predictors']  # all rows
	targets = ds[:]['target']    # all target 0s and 1s
	with T.no_grad():
		oupts = model(inpts)         # all computed ouputs
	pred_y = oupts >= 0.5        # tensor of 0s and 1s
	num_correct = T.sum(targets==pred_y)
	acc = (num_correct.item() * 1.0 / len(ds))  # scalar
	return acc

# ----------------------------------------------------------

def my_bce(model, batch):
	# mean binary cross entropy error. somewhat slow
	sum = 0.0
	inpts = batch['predictors']
	targets = batch['target']
	with T.no_grad():
		oupts = model(inpts)
	for i in range(len(inpts)):
		oupt = oupts[i]
		# should prevent log(0) which is -infinity
		if targets[i] >= 0.5:  # avoiding == 1.0
			sum += T.log(oupt)
		else:
			sum += T.log(1 - oupt)

	return -sum / len(inpts)

# ----------------------------------------------------------

class Net(T.nn.Module):
	def __init__(self):
		super(Net, self).__init__()
		self.hid1 = T.nn.Linear(num_features, 8)  # 4-(8-8)-1
		self.hid2 = T.nn.Linear(8, 8)
		self.oupt = T.nn.Linear(8, 1)

		T.nn.init.xavier_uniform_(self.hid1.weight) 
		T.nn.init.zeros_(self.hid1.bias)
		T.nn.init.xavier_uniform_(self.hid2.weight) 
		T.nn.init.zeros_(self.hid2.bias)
		T.nn.init.xavier_uniform_(self.oupt.weight) 
		T.nn.init.zeros_(self.oupt.bias)

	def forward(self, x):
		z = T.tanh(self.hid1(x)) 
		z = T.tanh(self.hid2(z))
		z = T.sigmoid(self.oupt(z)) 
		return z

# ----------------------------------------------------------

def main():
	# 0. get started
	print("\nBanknote authentication using PyTorch \n")
	T.manual_seed(1)
	np.random.seed(1)

	# 1. create Dataset and DataLoader objects
	print("Creating Banknote train and test DataLoader ")

# 	train_file = "./banknote_k20_train.txt"
# 	test_file = "./banknote_k20_test.txt"

# 	train_ds = BanknoteDataset(train_file)  # all rows
# 	test_ds = BanknoteDataset(test_file)

	train_ds = BestDataset("train.csv")
	test_ds = BestDataset("test.csv")# all rows

	bat_size = 10
	train_ldr = T.utils.data.DataLoader(train_ds,
		batch_size=bat_size, shuffle=True)
	# test_ldr = T.utils.data.DataLoader(test_ds,
	#   batch_size=1, shuffle=False)  # not needed

	# 2. create neural network
	print("Creating 4-(8-8)-1 binary NN classifier ")
	net = Net().to(device)

	# 3. train network
	print("\nPreparing training")
	net = net.train()  # set training mode
	lrn_rate = 0.01
	loss_obj = T.nn.BCELoss()  # binary cross entropy
	optimizer = T.optim.SGD(net.parameters(),
		lr=lrn_rate)
	max_epochs = 500
	ep_log_interval = 10
	print("Loss function: " + str(loss_obj))
	print("Optimizer: SGD")
	print("Learn rate: 0.01")
	print("Batch size: 10")
	print("Max epochs: " + str(max_epochs))

	print("\nStarting training")
	for epoch in range(0, max_epochs):
		epoch_loss = 0.0            # for one full epoch
		epoch_loss_custom = 0.0
		num_lines_read = 0

		for (batch_idx, batch) in enumerate(train_ldr):
			X = batch['predictors']  # [10,4]  inputs
			Y = batch['target']      # [10,1]  targets
			oupt = net(X)            # [10,1]  computeds 

			loss_val = loss_obj(oupt, Y)   # a tensor
			epoch_loss += loss_val.item()  # accumulate
			# epoch_loss += loss_val  # is OK
			# epoch_loss_custom += my_bce(net, batch)

			optimizer.zero_grad() # reset all gradients
			loss_val.backward()   # compute all gradients
			optimizer.step()      # update all weights

		if epoch % ep_log_interval == 0:
			print("epoch = %4d   loss = %0.4f" % \
				(epoch, epoch_loss))
			# print("custom loss = %0.4f" % epoch_loss_custom)
			# print("")
	print("Done ")

# ----------------------------------------------------------

	# 4. evaluate model
	net = net.eval()
	acc_train = accuracy(net, train_ds)
	print("\nAccuracy on train data = %0.2f%%" % \
		(acc_train * 100))
	acc_test = accuracy(net, test_ds)
	print("Accuracy on test data = %0.2f%%" % \
		(acc_test * 100))

	# acc_train_c = acc_coarse(net, train_ds)
	# print("Accuracy on train data = %0.2f%%" % \
	#  (acc_train_c * 100))
	# acc_test_c = acc_coarse(net, test_ds)
	# print("Accuracy on test data = %0.2f%%" % \
	#  (acc_test_c * 100))

	# 5. save model
	print("\nSaving trained model state_dict \n")
	path = "./Models/banknote_sd_model.pth"
	T.save(net.state_dict(), path)

	# print("\nSaving entire model \n")
	# path = ".\\Models\\banknote_full_model.pth"
	# T.save(net, path

	# print("\nSaving trained model as ONNX \n")
	# path = ".\\Models\\banknote_onnx_model.onnx"
	# dummy = T.tensor([[0.5, 0.5, 0.5, 0.5]],
	#   dtype=T.float32).to(device)
	# T.onnx.export(net, dummy, path,
	#   input_names=["input1"],
	#  output_names=["output1"])

	# model = Net()  # later . . 
	# model.load_state_dict(T.load(path))

	# 6. make a prediction 
	raw_inpt = np.array([[4.670006930828095038e-01,2.772299349308013916e-01,4.581054151058197021e-01,2.819681465625762939e-01,4.726324081420897882e-01,3.808141648769378662e-01,4.454198181629180908e-01,3.854113519191741943e-01,4.704093933105468750e-01,4.648146331310272217e-01,4.108781218528748114e-01,4.509174823760985773e-01,4.516118168830872137e-01,4.790165424346923828e-01,4.657082557678223211e-01,4.795077741146088202e-01,4.269051551818848211e-01,6.205608248710632324e-01,4.691285192966461182e-01,6.179753541946411133e-01,4.600642025470734198e-01,7.593259215354919434e-01,4.798959791660308838e-01,7.504850029945373535e-01,4.634187221527100164e-01,2.808338105678558350e-01]],
		dtype=np.float32) # should be crossing
# 	norm_inpt = raw_inpt / 20
	unknown = T.tensor(raw_inpt,
		dtype=T.float32).to(device) 

	print("Setting normalized inputs to:")
	for x in unknown[0]:
		print("%0.3f " % x, end="")

	net = net.eval()
	with T.no_grad():
		raw_out = net(unknown)    # a Tensor
	pred_prob = raw_out.item()  # scalar, [0.0, 1.0]

	print("\nPrediction prob = %0.6f " % pred_prob)
	if pred_prob < 0.5:
		print("Prediction = crossing")
	else:
		print("Prediction = not_crossing")

	print("\nEnd Banknote demo")



In [13]:
if __name__== "__main__":
	main()


Banknote authentication using PyTorch 

Creating Banknote train and test DataLoader 
27
27
Creating 4-(8-8)-1 binary NN classifier 

Preparing training
Loss function: BCELoss()
Optimizer: SGD
Learn rate: 0.01
Batch size: 10
Max epochs: 500

Starting training
epoch =    0   loss = 85.5967
epoch =   10   loss = 69.4232
epoch =   20   loss = 51.9655
epoch =   30   loss = 46.9999
epoch =   40   loss = 45.3146
epoch =   50   loss = 44.1043
epoch =   60   loss = 44.0869
epoch =   70   loss = 43.4022
epoch =   80   loss = 42.2456
epoch =   90   loss = 41.7404
epoch =  100   loss = 39.9229
epoch =  110   loss = 39.0081
epoch =  120   loss = 38.5179
epoch =  130   loss = 36.7395
epoch =  140   loss = 35.9684
epoch =  150   loss = 34.1167
epoch =  160   loss = 32.0421
epoch =  170   loss = 33.0791
epoch =  180   loss = 32.5502
epoch =  190   loss = 30.4329
epoch =  200   loss = 29.3044
epoch =  210   loss = 29.1486
epoch =  220   loss = 27.0744
epoch =  230   loss = 26.7962
epoch =  240   loss 

In [5]:
# Below is code to load and use model in any python code, with dependencies and for CUDA of course

import numpy as np
import torch
device = torch.device("cuda")  # apply to Tensor or Module
torch.set_default_tensor_type(torch.cuda.FloatTensor)

num_features = 26

class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.hid1 = torch.nn.Linear(num_features, 8)  # 4-(8-8)-1
        self.hid2 = torch.nn.Linear(8, 8)
        self.oupt = torch.nn.Linear(8, 1)

        torch.nn.init.xavier_uniform_(self.hid1.weight) 
        torch.nn.init.zeros_(self.hid1.bias)
        torch.nn.init.xavier_uniform_(self.hid2.weight) 
        torch.nn.init.zeros_(self.hid2.bias)
        torch.nn.init.xavier_uniform_(self.oupt.weight) 
        torch.nn.init.zeros_(self.oupt.bias)

    def forward(self, x):
        z = torch.tanh(self.hid1(x)) 
        z = torch.tanh(self.hid2(z))
        z = torch.sigmoid(self.oupt(z)) 
        return z

path = "./Models/banknote_sd_model.pth"
model = Net()  # later . . 
model.load_state_dict(torch.load(path))
model.eval()
raw_inpt = np.array([[5.064111351966857910e-01,3.407363891601562500e-01,4.424785971641541082e-01,3.454131484031677246e-01,5.245444178581237793e-01,4.076796472072600763e-01,4.272206425666809082e-01,4.171701073646545410e-01,5.400559306144714355e-01,4.632427692413329523e-01,4.187252819538117010e-01,4.752222895622252863e-01,4.997749328613281250e-01,4.866386353969573975e-01,4.575124680995941162e-01,4.877757132053375244e-01,5.110443830490112305e-01,5.903016328811645508e-01,4.606600701808929998e-01,5.934466123580932617e-01,5.149625539779663086e-01,6.890525221824645996e-01,4.601623713970183771e-01,6.958268284797668457e-01,4.746918976306914728e-01,3.439677357673645020e-01]],
    dtype=np.float32) # should be not_crossing
inpt = torch.tensor(raw_inpt, dtype=torch.float32).to(device) 
raw_out = model(inpt)
pred_prob = raw_out.item()
if pred_prob < 0.5:
    print("Prediction = crossing")
else:
    print("Prediction = not_crossing")

Prediction = not_crossing
