## Import Necessary Libraries

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import groq.api as g
import groq.runner.tsp as tsp
from groqflow import groqit
import onnxruntime as ort
from groq.runtime import driver as runtime
import groq.runtime
import time, timeit

import numpy as np

import os
import sys, re
import subprocess
import matplotlib.pyplot as plt

253539:INFO:[001.57GB/566.58GB]:<frozen importlib._bootstrap>:688 GroqAPI log level: INFO
253539:INFO:[001.57GB/566.58GB]:<frozen importlib._bootstrap>:688 random seed: 1118292051


Authorization required, but no authorization protocol specified
Authorization required, but no authorization protocol specified
Authorization required, but no authorization protocol specified



## Define Model

In [2]:
class Quantizer(torch.autograd.Function):
    """Uniform fake-quantization with a straight-through gradient.

    The forward pass snaps every element of ``x`` onto one of ``2 ** n_bits``
    evenly spaced levels spanning ``[x.min(), x.max()]`` and returns the
    de-quantized (floating point) values. The backward pass forwards the
    incoming gradient unchanged.
    """

    @staticmethod
    def forward(ctx, x, n_bits):
        """Quantizes the input x to n_bits precision.

        Args:
            ctx: autograd context (unused; nothing is saved for backward).
            x: tensor to quantize.
            n_bits: number of bits; must be >= 1.

        Returns:
            A tensor shaped like ``x`` whose values lie on the quantization
            grid anchored at ``x.min()``.

        Raises:
            ValueError: if ``n_bits`` is smaller than 1.
        """
        if n_bits < 1:
            raise ValueError(f"n_bits must be >= 1, got {n_bits}")

        qmin, qmax = 0, 2 ** n_bits - 1
        x_min = x.min()
        scale = (x.max() - x_min) / (qmax - qmin)
        # A constant tensor has zero range; clamp the step instead of branching
        # so the op stays traceable and never divides by zero.
        scale = scale.clamp(min=torch.finfo(scale.dtype).tiny)

        # Shift by x_min (the zero point) before rounding. Without the shift,
        # negative values are clamped to 0 and ranges with x.min() > 0
        # saturate at qmax.
        levels = torch.round((x - x_min) / scale).clamp(qmin, qmax)
        return levels * scale + x_min

    @staticmethod
    def backward(ctx, grad_output):
        """Straight-through estimator for gradient computation.

        Returns the upstream gradient for ``x`` and ``None`` for ``n_bits``,
        which is not differentiable.
        """
        return grad_output, None

class ReconSmallPhaseModel(nn.Module):
    """Convolutional encoder/decoder that predicts a phase image.

    The encoder stacks six down-sampling blocks and the decoder (``decoder2``)
    stacks six up-sampling blocks followed by a 3x3 convolution to a single
    channel and a ``Tanh``. The input, the encoder output and the decoder
    output are each passed through ``Quantizer`` with ``n_bits`` bits.

    Args:
        nconv: base number of convolution filters; deeper blocks use
            multiples of it.
        n_bits: number of bits handed to ``Quantizer``.
    """

    def __init__(self, nconv: int = 16, n_bits: int = 8):
        super().__init__()
        self.nconv = nconv
        self.n_bits = n_bits  # bit width used by every Quantizer call

        # (in_channels, out_channels) of each encoder stage, in order.
        encoder_plan = [
            (1, nconv),
            (nconv, nconv * 2),
            (nconv * 2, nconv * 4),
            (nconv * 4, nconv * 8),
            (nconv * 8, nconv * 16),
            (nconv * 16, nconv * 32),
        ]
        encoder_layers = []
        for c_in, c_out in encoder_plan:
            encoder_layers.extend(self.down_block(c_in, c_out))
        self.encoder = nn.Sequential(*encoder_layers)

        # The amplitude branch (decoder1) is disabled in this model; only the
        # phase branch below is built.

        # (in_channels, out_channels) of each phase-decoder stage, in order.
        decoder_plan = [
            (nconv * 32, nconv * 16),
            (nconv * 16, nconv * 8),
            (nconv * 8, nconv * 8),
            (nconv * 8, nconv * 4),
            (nconv * 4, nconv * 2),
            (nconv * 2, nconv * 1),
        ]
        decoder_layers = []
        for c_in, c_out in decoder_plan:
            decoder_layers.extend(self.up_block(c_in, c_out))
        decoder_layers.append(nn.Conv2d(nconv * 1, 1, 3, stride=1, padding=(1, 1)))
        decoder_layers.append(nn.Tanh())
        self.decoder2 = nn.Sequential(*decoder_layers)

    def down_block(self, filters_in, filters_out):
        """Return the layers of one encoder stage as a list.

        Two 3x3 same-padding convolutions, each followed by ReLU, then a
        2x2 max-pool.
        """
        first_conv = nn.Conv2d(
            in_channels=filters_in,
            out_channels=filters_out,
            kernel_size=3,
            stride=1,
            padding=(1, 1),
        )
        second_conv = nn.Conv2d(filters_out, filters_out, 3, stride=1, padding=(1, 1))
        return [first_conv, nn.ReLU(), second_conv, nn.ReLU(), nn.MaxPool2d((2, 2))]

    def up_block(self, filters_in, filters_out):
        """Return the layers of one decoder stage as a list.

        Two 3x3 same-padding convolutions, each followed by ReLU, then a
        2x bilinear up-sampling.
        """
        first_conv = nn.Conv2d(filters_in, filters_out, 3, stride=1, padding=(1, 1))
        second_conv = nn.Conv2d(filters_out, filters_out, 3, stride=1, padding=(1, 1))
        upsample = nn.Upsample(scale_factor=2, mode='bilinear')
        return [first_conv, nn.ReLU(), second_conv, nn.ReLU(), upsample]

    def forward(self, x):
        """Run quantize -> encode -> quantize -> decode -> quantize -> scale by pi."""
        with torch.cuda.amp.autocast():
            quantized_input = Quantizer.apply(x, self.n_bits)

            latent = self.encoder(quantized_input)
            latent = Quantizer.apply(latent, self.n_bits)

            ph = self.decoder2(latent)
            ph = Quantizer.apply(ph, self.n_bits)

            # The decoder ends in Tanh (-1 to 1); multiplying by pi maps the
            # output onto the phase range.
            ph = ph * np.pi

        return ph

## Gather Dataset

In [3]:
class PtychographyDataset(Dataset):
    """In-memory dataset built from one or more ``.npz`` archives.

    Each archive must provide a ``"reciprocal"`` array (used as the model
    input) and a ``"real"`` array whose phase angle (``np.angle``) is used as
    the label. Samples from all archives are concatenated along dimension 0.

    Args:
        npz_files: an iterable of archive paths, or a single path.
    """

    def __init__(self, npz_files):
        # A bare path would otherwise be iterated character by character.
        if isinstance(npz_files, (str, os.PathLike)):
            npz_files = [npz_files]

        # Per-file tensors, concatenated once after the loop
        all_inputs = []
        all_labels = []

        for npz_file in npz_files:
            # np.load keeps the archive open until closed; torch.tensor copies
            # the arrays, so the handle can be released right after reading.
            with np.load(npz_file) as data:
                inputs = torch.tensor(data["reciprocal"], dtype=torch.float32)
                labels = torch.tensor(np.angle(data["real"]), dtype=torch.float32)

            # 3-D inputs have no channel axis yet: add one (C=1) to both
            # inputs and labels.
            if len(inputs.shape) == 3:
                inputs = inputs.unsqueeze(1)
                labels = labels.unsqueeze(1)

            all_inputs.append(inputs)
            all_labels.append(labels)

        # Concatenate all files along the sample dimension
        self.inputs = torch.cat(all_inputs, dim=0)
        self.labels = torch.cat(all_labels, dim=0)

    def __len__(self):
        """Number of samples across all loaded archives."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return the ``(input, label)`` pair at position ``idx``."""
        return self.inputs[idx], self.labels[idx]
    
# Scan archive(s) to load; the name is singular but the value is a list
npz_file = [
    "/home/sfowler/Downloads/NewScans/scan780.npz",
]
checkpoint_path = "checkpoint_q.pth"

dataset = PtychographyDataset(npz_files=npz_file)

# Deterministic (unshuffled) loader over the dataset, four samples per batch
test_dataloader = DataLoader(dataset=dataset, batch_size=4, shuffle=False)

## Export to ONNX and IOP

In [4]:
# Take the first sample of the first batch, keeping a leading batch axis of 1
inputs, _ = next(iter(test_dataloader))
input_im_tensor = inputs[:1]

# Sanity-check the example input shape before exporting
print(f"Input image shape: {input_im_tensor.shape}")

input_size = 128
# NOTE(review): the model is exported with freshly initialised weights;
# checkpoint_path is not loaded in this cell — confirm this is intended.
model = ReconSmallPhaseModel()

# Destination files for the exported ONNX graph and the compiled IOP
onnx_model = "/home/sfowler/Groq_PtychoNN/Onnx/quantized_PtychoNN.onnx"
onnx_model_path = onnx_model
iop_file_path = "/home/sfowler/Groq_PtychoNN/IOP/quantized_PtychoNN.iop"

# Report the working directory (the commented-out compile step below uses it)
current_directory = os.getcwd()
print(f"current_directory: {current_directory}")

# Export the PyTorch model to ONNX, traced with the single example image
torch.onnx.export(
    model,
    (input_im_tensor,),
    onnx_model,
    input_names=["input_image"],
    output_names=["output"],
    opset_version=14,
)

# GroqFlow compilation (currently disabled):
# try:
#     # groqit automatically compiles the model and returns the package path
#     package_path = groqit(onnx_model_path, inputs={"input_image": input_im_tensor}, cache_dir=os.path.join(current_directory, "../.cache"), groqview = True, rebuild="always", build_name = "edgePtychoNN")
#
#     # The .iop file is located in the generated package directory
#     print(f"Compilation successful. IOP file saved to: {iop_file_path}")
#
# except Exception as e:
#     print(f"Error during compilation: {e}")

Input image shape: torch.Size([1, 1, 128, 128])
current_directory: /home/sfowler/Groq_PtychoNN


