In [1]:
!pip install nengo nengo-loihi
import nengo
import nengo_loihi



In [2]:
import torch
val_path = 'final_project/val_dataset.pth'
train_path = 'final_project/train_dataset.pth'
# Load the model
test_val_dataset_loaded= torch.load(val_path)
train_dataset_loaded= torch.load(train_path)

In [None]:
import os
import numpy as np

def load_npz_events(file_path):
    try:
        data = np.load(file_path, allow_pickle=True)
        return {key: data[key] for key in ['t', 'x', 'y', 'p']}
    except Exception as e:
        print(f"Error loading the file {file_path}: {e}")
        return None

directory = 'extracted_content/content/CIFAR/events_np'

# List all subdirectories (categories) inside the "events_np" directory
subdirectories = [subdir for subdir in os.listdir(directory) if os.path.isdir(os.path.join(directory, subdir))]

# Iterate through each subdirectory
for subdir in subdirectories:
    subdir_path = os.path.join(directory, subdir)
    
    # List all files with the ".npz" extension in the subdirectory
    files = [file for file in os.listdir(subdir_path) if file.endswith('.npz')]
    
    # Iterate through each file in the subdirectory
    for file_name in files:
        file_path = os.path.join(subdir_path, file_name)
        
        # Use your load_npz_events function to load specific keys
        loaded_data = load_npz_events(file_path)
        if loaded_data is not None:
            print(f"Loaded data from {file_path}: {loaded_data}")

print("Loading of all valid files complete.")


In [5]:
def print_original_data_structure(event_data):
    print("Original Data Structure:")
    for key in event_data:
        print(f"{key}: shape = {event_data[key].shape}, sample = {event_data[key][:10]}")


event_data = np.load('extracted_content/content/CIFAR/events_np/airplane/cifar10_airplane_979.npz')
print_original_data_structure(event_data)


Original Data Structure:
t: shape = (148123,), sample = [ 0  2  7 11 16 21 25 29 34 38]
x: shape = (148123,), sample = [117 113 112 115 114 124 125 127 126 120]
y: shape = (148123,), sample = [60 60 60 60 60 60 60 60 60 60]
p: shape = (148123,), sample = [1 1 1 1 1 1 1 1 1 1]


In [6]:
# Debugging: Print loaded event_data and its keys
print("Loaded event_data keys:", list(event_data.keys()))
print("t:", event_data['t'][:10])
print("x:", event_data['x'][:10])
print("y:", event_data['y'][:10])
print("p:", event_data['p'][:10])


Loaded event_data keys: ['t', 'x', 'y', 'p']
t: [ 0  2  7 11 16 21 25 29 34 38]
x: [117 113 112 115 114 124 125 127 126 120]
y: [60 60 60 60 60 60 60 60 60 60]
p: [1 1 1 1 1 1 1 1 1 1]


In [7]:
def preprocess_cifar10_dvs_data(event_data, img_size=(128, 128)):
    """
    Preprocess CIFAR-10 DVS event data into a format suitable for the SNN.
    :param event_data: CIFAR-10 DVS data (npz file contents).
    :param img_size: Tuple of the image size (height, width).
    :return: Preprocessed data.
    """
    height, width = img_size
    processed_data = np.zeros((height, width, 2))  # Assuming 2 channels for polarity (ON and OFF)

    for t, x, y, p in zip(event_data['t'], event_data['x'], event_data['y'], event_data['p']):
        if 0 <= x < width and 0 <= y < height:
            processed_data[y, x, int(p)] = 1  # Set spike at (x, y) for the given polarity

    return processed_data  # Return the 3D array without flattening

def temporal_binning(event_data, bin_size, img_size=(128, 128)):
    """
    Temporally bin the CIFAR-10 DVS event data.
    :param event_data: CIFAR-10 DVS data (npz file contents).
    :param bin_size: Size of each time bin in microseconds.
    :param img_size: Tuple of the image size (height, width).
    :return: List of binned data.
    """
    if event_data is None or 't' not in event_data:
        return []  # Return an empty list if event_data is None or 't' key is missing

    height, width = img_size
    max_time = np.max(event_data['t'])
    if max_time <= 0:
        return []  # Return an empty list if max_time is non-positive

    num_bins = int(np.ceil(max_time / bin_size))
    binned_data = [np.zeros((height, width, 2)) for _ in range(num_bins)]

    for x, y, p in zip(event_data['x'], event_data['y'], event_data['p']):
        if 0 <= x < width and 0 <= y < height:
            bin_index = int(0)  # Ignore 't' and use a constant bin_index
            if 0 <= bin_index < num_bins:
                binned_data[bin_index][y, x, int(p)] = 1

    return [bin_data.flatten() for bin_data in binned_data]



def normalize_data(data, method='min-max'):
    """
    Normalize the input data.
    :param data: The data to be normalized (expected as a flattened array).
    :param method: The method of normalization ('min-max' or 'standard').
    :return: Normalized data.
    """
    if method == 'min-max':
        data_min = np.min(data)
        data_max = np.max(data)
        return (data - data_min) / (data_max - data_min)
    elif method == 'standard':
        data_mean = np.mean(data)
        data_std = np.std(data)
        return (data - data_mean) / data_std
    else:
        raise ValueError("Unknown normalization method.")


In [8]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split

# Define your custom dataset class
class CustomNPZDataset(Dataset):
    def __init__(self, root_dir, extensions=('.npz',), samples_per_class=1):
        self.file_paths = []
        self.labels = []
        self.classes = os.listdir(root_dir)
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        
        for cls in self.classes:
            cls_folder = os.path.join(root_dir, cls)
            if os.path.isdir(cls_folder):
                cls_files = [file for file in os.listdir(cls_folder) if file.endswith(extensions)]
                cls_files = cls_files[:samples_per_class]  # Select only a certain number of samples
                for file in cls_files:
                    file_path = os.path.join(cls_folder, file)
                    # Load the .npz file directly
                    try:
                        data = np.load(file_path, allow_pickle=True)
                        self.file_paths.append(file_path)
                        self.labels.append(self.class_to_idx[cls])
                    except Exception as e:
                        print(f"Error loading the file {file_path}: {e}")

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        data = load_npz_events(file_path)
        if data is None:
            # Handle the case where data couldn't be loaded
            pass
        label = self.labels[idx]
        return data, label

# Create an instance of the dataset
dataset = CustomNPZDataset(root_dir='extracted_content/content/CIFAR/events_np')

# Define your preprocessing and splitting function
def preprocess_and_split_data(dataset, bin_size=10000, normalization_method='min-max', split_ratios=(0.7, 0.15, 0.15)):
    """
    Preprocess the dataset and split it into training, validation, and testing sets.
    """
    # Preprocess all data
    preprocessed_data = []
    for i in range(len(dataset)):
        data, label = dataset[i]
        processed = preprocess_cifar10_dvs_data(data)
        binned = temporal_binning(processed, bin_size=bin_size)
        normalized = [normalize_data(frame.flatten(), method=normalization_method) for frame in binned]
        preprocessed_data.append((normalized, label))
    
    # Calculate split sizes
    total_size = len(preprocessed_data)
    train_size = int(split_ratios[0] * total_size)
    val_size = int(split_ratios[1] * total_size)
    test_size = total_size - train_size - val_size

    # Split the data
    train_data = preprocessed_data[:train_size]
    val_data = preprocessed_data[train_size:train_size + val_size]
    test_data = preprocessed_data[train_size + val_size:]

    return train_data, val_data, test_data




# Use the preprocess_and_split_data() function to preprocess and split the data
original_train_data, original_val_data, original_test_data = preprocess_and_split_data(dataset)


Error loading the file extracted_content/content/CIFAR/events_np/cat/cifar10_cat_629.npz: File is not a zip file
Error loading the file extracted_content/content/CIFAR/events_np/truck/cifar10_truck_598.npz: File is not a zip file
Error loading the file extracted_content/content/CIFAR/events_np/frog/cifar10_frog_564.npz: File is not a zip file


  if event_data is None or 't' not in event_data:


In [9]:
def print_processed_data_structure(processed_data):
    print("Processed Data Structure:")
    if isinstance(processed_data, list):  # For temporally binned data
        print(f"Number of bins: {len(processed_data)}")
        for i, bin_data in enumerate(processed_data[:3]):  # Print first few bins
            print(f"Bin {i}: shape = {bin_data.shape}, sample = {bin_data[:10]}")
    else:  # For flattened data
        print(f"Shape = {processed_data.shape}, sample = {processed_data[:10]}")

#Apply preprocessing and then print structure
processed_data = preprocess_cifar10_dvs_data(event_data)
print_processed_data_structure(processed_data)

#For temporal binning
binned_data = temporal_binning(event_data, bin_size=10000)
print_processed_data_structure(binned_data)

# For normalization
normalized_data = normalize_data(processed_data, method='min-max')
print_processed_data_structure(normalized_data)


Processed Data Structure:
Shape = (128, 128, 2), sample = [[[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 0.]]

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [0. 0.]
  [1. 0.]
  [1. 1.]]

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 0.]]

 ...

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 1.]]

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 0.]]

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 0.]]]
Processed Data Structure:
Number of bins: 135
Bin 0: shape = (32768,), sample = [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
Bin 1: shape = (32768,), sample = [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Bin 2: shape = (32768,), sample = [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Processed Data Structure:
Shape = (128, 128, 2), sample = [[[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 0.]]

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [0. 0.]
  [1. 0.]
  [1. 1.]]

 [[1. 1.]
  [1. 1.]
  [1. 1.]
  ...
  [1. 0.]
  [1. 0.]
  [1. 0.]]

 ...

 [[1. 1.]
  [1. 

In [10]:
# Define batch sizes
batch_size = 2  # You can adjust this value as needed

# Create DataLoader instances with the specified batch size
train_loader = DataLoader(original_train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(original_val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(original_test_data, batch_size=batch_size, shuffle=False)


In [11]:
import nengo
from nengo_loihi import Simulator

# Define the network structure
with nengo.Network() as net:
    # Loihi compatible LIF neuron parameters
    lif_params = {
        'tau_rc': 0.02,  # membrane time constant
        'tau_ref': 0.002, # refractory period
        # other parameters as needed
    }

    # Input layer - adapt size as needed
    input_layer = nengo.Ensemble(n_neurons=128*128*2, dimensions=1, neuron_type=nengo.LIF(**lif_params))

    # Hidden layers
    hidden_layer_1 = nengo.Ensemble(128, 1, neuron_type=nengo.LIF(**lif_params))
    hidden_layer_2 = nengo.Ensemble(128, 1, neuron_type=nengo.LIF(**lif_params))

    # Output layer
    output_layer = nengo.Ensemble(10, 1, neuron_type=nengo.LIF(**lif_params))

    # Connections
    nengo.Connection(input_layer, hidden_layer_1)
    nengo.Connection(hidden_layer_1, hidden_layer_2)
    nengo.Connection(hidden_layer_2, output_layer)


In [None]:
import nengo
from nengo_loihi import Simulator
import numpy as np
import torch
from torch.utils.data import DataLoader

# Create a Nengo Simulator
with Simulator(net, precompute=True) as sim:
    num_epochs = 5  # Define the number of training epochs

    # Define a learning rule if you are using a custom one
    learning_rule = nengo_loihi.learning_rules.PES()

    # Create a connection between output_layer and a learning rule if needed
    nengo.Connection(output_layer.neurons, learning_rule.learning_signal)

    # Create a connection from learning_rule to output_layer for weight updates
    nengo.Connection(learning_rule.learning_signal, output_layer.neurons, transform=0.001)  # Adjust the transform as needed

    # Training loop
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = len(train_loader)

        for batch_idx, (inputs, targets) in enumerate(train_loader):
            # Reset the simulator at the beginning of each batch
            sim.reset()

            for input_frame in inputs:
                # Run the network with the current input frame
                sim.run(input_frame)

                # Compute the loss and apply learning rules here
                # You need to define how to compute loss based on the network's output
                # You may need to backpropagate errors if using gradient-based learning

                # Calculate loss (modify this according to your specific loss function)
                output_data = sim.data[output_layer].clip(0, 1)  # Clip to prevent issues with the range
                loss = your_loss_function(output_data, targets)

                # Apply learning rule (PES, STDP, etc.)
                learning_rule.error = loss
                sim.step()

                # Accumulate the loss for the epoch
                total_loss += loss

        # Print or log the average loss for the current epoch
        average_loss = total_loss / num_batches
        print(f"Epoch [{epoch+1}/{num_epochs}] - Loss: {average_loss:.4f}")

    # Validation
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, targets in val_loader:
            sim.reset()

            for input_frame in inputs:
                sim.run(input_frame)

            # Evaluate the network's output and calculate accuracy
            # You need to define how to evaluate the network's predictions
            # Here, we assume a simple classification task
            output_data = sim.data[output_layer].clip(0, 1)
            predicted_labels = np.argmax(output_data, axis=1)
            correct_labels = targets.numpy()
            num_correct = np.sum(predicted_labels == correct_labels)

            total_correct += num_correct
            total_samples += len(inputs)

    accuracy = total_correct / total_samples
    print(f"Validation Accuracy: {accuracy*100:.2f}%")

    # Testing (similar to validation)
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            sim.reset()

            for input_frame in inputs:
                sim.run(input_frame)

            # Evaluate the network's output and calculate accuracy
            # You need to define how to evaluate the network's predictions
            # Here, we assume a simple classification task
            output_data = sim.data[output_layer].clip(0, 1)
            predicted_labels = np.argmax(output_data, axis=1)
            correct_labels = targets.numpy()
            num_correct = np.sum(predicted_labels == correct_labels)

            total_correct += num_correct
            total_samples += len(inputs)

    accuracy = total_correct / total_samples
    print(f"Test Accuracy: {accuracy*100:.2f}%")



In [None]:
import os
import numpy as np

def load_npz_events(file_path, time_scale=0.01):
    try:
        data = np.load(file_path, allow_pickle=True)
        
        # Check if 't' is in the keys before scaling down
        if 't' in data:
            # Scale down the 't' values by a factor of 100
            data['t'] = data['t'] * time_scale
        
        return data
    except Exception as e:
        if 'not a zip file' in str(e):
            print(f"Skipped non-zip file: {file_path}")
        else:
            print(f"Error loading the file {file_path}: {e}")
        return None

directory = 'extracted_content/content/CIFAR/events_np'

# List all subdirectories (categories) inside the "events_np" directory
subdirectories = [subdir for subdir in os.listdir(directory) if os.path.isdir(os.path.join(directory, subdir))]

# Specify the time scale factor
time_scale_factor = 0.01  # Adjust as needed

# Iterate through each subdirectory
for subdir in subdirectories:
    subdir_path = os.path.join(directory, subdir)
    
    # List all files with the ".npz" extension in the subdirectory
    files = [file for file in os.listdir(subdir_path) if file.endswith('.npz')]
    
    # Iterate through each file in the subdirectory
    for file_name in files:
        file_path = os.path.join(subdir_path, file_name)
        
        # Use your load_npz_events function to load and downsample the time values
        data = load_npz_events(file_path, time_scale_factor)
        if data is not None:
            print(f"Loaded and downscaled data from {file_path}: {data}")


In [4]:
import os
import numpy as np

def load_npz_events(file_path, downsample_factor=10):
    try:
        data = np.load(file_path, allow_pickle=True)
        
        # Create a new dictionary for downsampled data
        downsampled_data = {
            'x': data['x'] // downsample_factor,
            'y': data['y'] // downsample_factor,
            'p': data['p'] , # Keep 'p' unchanged
            't': data['t'] // (downsample_factor*10) #reduce time by 100
        }
        
        return downsampled_data
    except Exception as e:
        print(f"Error loading the file {file_path}: {e}")
        return None

directory = 'extracted_content/content/CIFAR/events_np'

# List all subdirectories (categories) inside the "events_np" directory
subdirectories = [subdir for subdir in os.listdir(directory) if os.path.isdir(os.path.join(directory, subdir))]

# Specify the downsample factor
downsample_factor = 10

# Iterate through each subdirectory
for subdir in subdirectories:
    subdir_path = os.path.join(directory, subdir)
    
    # List all files with the ".npz" extension in the subdirectory
    files = [file for file in os.listdir(subdir_path) if file.endswith('.npz')]
    
    # Iterate through each file in the subdirectory
    for file_name in files:
        file_path = os.path.join(subdir_path, file_name)
        
        # Use your load_npz_events function to load and downsample the data
        data = load_npz_events(file_path, downsample_factor)
        if data is not None:
            print(f"Loaded and downsampled data from {file_path}: {data}")


Loaded and downsampled data from extracted_content/content/CIFAR/events_np/automobile/cifar10_automobile_939.npz: {'x': array([10, 10,  3, ..., 10,  8,  9], dtype=uint32), 'y': array([ 8,  8, 10, ...,  5, 11, 11], dtype=uint32), 'p': array([1, 0, 0, ..., 0, 1, 1]), 't': array([    0,     0,     0, ..., 12578, 12578, 12578], dtype=uint32)}
Loaded and downsampled data from extracted_content/content/CIFAR/events_np/automobile/cifar10_automobile_973.npz: {'x': array([ 9, 11,  9, ...,  4,  7,  3], dtype=uint32), 'y': array([ 7,  9, 12, ...,  8, 10,  9], dtype=uint32), 'p': array([0, 0, 1, ..., 1, 0, 1]), 't': array([    0,     0,     0, ..., 12832, 12832, 12832], dtype=uint32)}
Error loading the file extracted_content/content/CIFAR/events_np/automobile/cifar10_automobile_16.npz: File is not a zip file
Error loading the file extracted_content/content/CIFAR/events_np/automobile/cifar10_automobile_101.npz: File is not a zip file
Loaded and downsampled data from extracted_content/content/CIFAR/