Import Libraries

In [40]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.onnx
import time
import pandas as pd
from scipy.ndimage import zoom
from giza_actions.action import action
from giza_actions.task import task
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Define the model


In [41]:
input_size = 99
output_size = 1

# Define hidden layers sizes and dropout rates
hidden_size = [256, 128, 64]
dropout_rates = [0.1, 0.1]
learning_rate = 0.001
num_epochs = 100
batch_size = 64

In [42]:
class RegressionNN(nn.Module):
    def __init__(self, input_size, hidden_size=[256, 128, 64], output_size=1, dropout_rates=[0.1, 0.1]):
        super(RegressionNN, self).__init__()
        self.layers = nn.ModuleList()
        self.batch_norms = nn.ModuleList()
        self.dropouts = nn.ModuleList()

        layer_sizes = [input_size] + hidden_size
        for i in range(len(layer_sizes) - 1):
            self.layers.append(nn.Linear(layer_sizes[i], layer_sizes[i+1]))
            self.batch_norms.append(nn.BatchNorm1d(layer_sizes[i+1]))
            self.dropouts.append(nn.Dropout(dropout_rates[min(i, len(dropout_rates)-1)]))
        
        self.output_layer = nn.Linear(hidden_size[-1], output_size)
        
    def forward(self, x):
        for i in range(len(self.layers)):
            x = self.layers[i](x)
            x = self.batch_norms[i](x)
            x = nn.ReLU()(x)
            x = self.dropouts[i](x)
            
        x = self.output_layer(x)
        return x


In [43]:
@task(name=f'Prepare Datasets')
def prepare_datasets(X, y):
    print("Prepare dataset...")
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    X_train_tensor = torch.tensor(X_train.values.astype(np.float32))
    y_train_tensor = torch.tensor(y_train.values.astype(np.float32)).view(-1, 1)
    X_test_tensor = torch.tensor(X_test.values.astype(np.float32))
    y_test_tensor = torch.tensor(y_test.values.astype(np.float32)).view(-1, 1)

    print("✅ Datasets prepared successfully")

    return X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor


 `@task(name='my_unique_name', ...)`


In [44]:
@task(name=f'Create Loaders')
def create_data_loaders(x_train, y_train, x_test, y_test):
    print("Create loaders...")

    train_loader = DataLoader(TensorDataset(x_train, y_train), batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(TensorDataset(x_test, y_test), batch_size=batch_size, shuffle=False)

    print("✅ Loaders created!")

    return train_loader, test_loader


 `@task(name='my_unique_name', ...)`


In [45]:
@task(name=f'Train model')
def train_model(train_loader):
    print("Train model...")

    model = RegressionNN(input_size, hidden_size, output_size=1).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()
        train_loss = 0.0
        for i, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
        
    print("✅ Model trained successfully")
    return model


 `@task(name='my_unique_name', ...)`


In [46]:
@task(name=f'Test model')
def test_model(model, test_loader):
    predictions = []
    actuals = []
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            predictions.extend(outputs.view(-1).cpu().numpy())  # Flatten outputs and move to CPU
            actuals.extend(targets.view(-1).cpu().numpy())  # Flatten targets and move to CPU

    predictions = np.array(predictions)
    actuals = np.array(actuals)

    # Calculate MAE
    mae = np.mean(np.abs(predictions - actuals))

    # Calculate MAPE
    mape = np.mean(np.abs((actuals - predictions) / actuals)) * 100
    print('Mean absolute error:', mae)
    print('MAPE:', mape)
            


 `@task(name='my_unique_name', ...)`


Action the Task to train, test the model and convert it to onnx format

In [47]:
@task(name=f'Convert To ONNX')
def convert_to_onnx(model, X_train, onnx_file_path):
    # Make sure the model is in evaluation mode
    model.eval()

    sample_input = torch.randn(1, X_train.shape[1]).to(device)
    # Export the model to ONNX format
    torch.onnx.export(model,                        
                  sample_input,                 
                  onnx_file_path,              
                  export_params=True,           
                  opset_version=12,             
                  do_constant_folding=True,     
                  input_names=['input'],        
                  output_names=['output'],      
                  dynamic_axes={'input': {0: 'batch_size'}, 
                                'output': {0: 'batch_size'}})

    print(f"Model has been converted to ONNX and saved as {onnx_file_path}")

@action(name=f'Execution', log_prints=True )
def execution():
    #read csv
    sales = pd.read_csv('sales.csv')
    X = sales[pd.Series(sales.columns)[pd.Series(sales.columns)!='amount_original'].values]
    y = sales['amount_original']  # Your target vector
    x_train, y_train, x_test, y_test = prepare_datasets(X, y)
    train_loader, test_loader = create_data_loaders(
        x_train, y_train, x_test, y_test)
    model = train_model(train_loader)
    test_model(model, test_loader)


    onnx_file_path = "cryptopunks.onnx"
    convert_to_onnx(model, x_train, onnx_file_path) 
    
execution()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `tuple`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `tuple`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `RegressionNN`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

In [61]:
from giza_actions.model import GizaModel
import requests
from utils.utils import *
MODEL_ID = 404  
VERSION_ID = 1

#Punk ID for prediction
ID_PUNK = 11

relevant_columns_model = ['amount_original', 'CP Price',
       'Category', 'Front Beard Dark', 'VR', 'Clown Eyes Green',
       'Buck Teeth', 'Wild Hair', 'Silver Chain', 'Cigarette',
       'Purple Eye Shadow', 'Pigtails', 'Handlebars', 'Normal Beard',
       'Blonde Bob', 'Muttonchops', 'Smile', 'Shaved Head', 'Mustache',
       'Mohawk Dark', 'Straight Hair', 'Choker', 'Regular Shades',
       'Peak Spike', 'Tassle Hat', 'Dark Hair', 'Knitted Cap', 'Bandana',
       'Pink With Hat', 'Gold Chain', 'Mohawk', 'Welding Goggles',
       'Cap Forward', 'Tiara', 'Purple Lipstick', 'Small Shades',
       'Stringy Hair', 'Do-rag', 'Wild White Hair', 'Frown', 'Red Mohawk',
       'Half Shaved', 'Clown Hair Green', 'Vampire Hair', 'Beanie',
       'Clown Nose', 'Messy Hair', 'Blonde Short', 'Mole', 'Purple Hair',
       'Chinstrap', 'Orange Side', 'Hot Lipstick', 'Horned Rim Glasses', 'Cap',
       'Green Eye Shadow', 'Nerd Glasses', 'Rosy Cheeks', 'Pilot Helmet',
       'Straight Hair Dark', 'Medical Mask', 'Frumpy Hair', 'Wild Blonde',
       'Hoodie', 'Earring', 'Big Shades', 'Spots', 'Headband', 'Goat',
       'Big Beard', 'Classic Shades', 'Clown Eyes Blue', 'Blue Eye Shadow',
       'Cowboy Hat', 'Luxurious Beard', 'Crazy Hair', 'Normal Beard Black',
       'Fedora', 'Straight Hair Blonde', 'Vape', 'Mohawk Thin', 'Front Beard',
       '3D Glasses', 'Police Cap', 'Top Hat', 'Shadow Beard', 'Eye Mask',
       'Black Lipstick', 'Eye Patch', 'Pipe', 'relative_month_number',  
       'skin_Arab', 'skin_Black', 'skin_Latino', 'skin_white', 'Gender_Alien',
        'Gender_Ape','Gender_Female', 'Gender_Male', 'Gender_Zombie' ]

############### Non-Verifiable ###############
@task(name=f'Prediction from local onnx ')    
def prediction(id_punk, eth_price, floor_price, relative_month_number, MODEL_ID, VERSION_ID):

    metadata = pd.read_csv('punk_metadata_for_pred.csv')
    
    punk_metadata = metadata[metadata.ID == id_punk]

    predict_price_punk_today = punk_metadata.copy()
    predict_price_punk_today['CP Price'] = floor_price
    predict_price_punk_today['relative_month_number'] = relative_month_number

    
    entry = predict_price_punk_today[pd.Series(relevant_columns_model)[pd.Series(relevant_columns_model)!='amount_original']]

    model = GizaModel(model_path="./cryptopunks.onnx")

    result = model.predict(
        input_feed={"input": entry.iloc[0].astype(np.float32).values.reshape(1, len(entry.columns))}, verifiable=False
    )

    # return result
    return result, punk_metadata.url.iloc[0]

@action(name=f'Execution: Prediction from local onnx', log_prints=True)
def execution():    
    floor_price = get_punk_floor_today()
    eth = get_ethereum_price_history()
    eth_price = eth.iloc[-1]['Ethereum_Price']
    sales = pd.read_csv('sales.csv')

    relative_month_number = sales.relative_month_number.max()

    result, url = prediction(ID_PUNK, eth_price, floor_price, relative_month_number, MODEL_ID, VERSION_ID)
    
    

    return f"Result: {result[0][0]} ETH for punk {ID_PUNK} with url {url}"

execution()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


'Result: 58.99652099609375 ETH for punk 11 with url https://www.larvalabs.com/cryptopunks/cryptopunk11.png'