# Using Croissant in Machine Learning Pipelines 🥐

Croissant provides a single-file JSON-LD format for machine learning (ML) datasets that describes the data sources, the data structure and other relevant metadata. The standardized format aims to improve the discoverability, accessibility and interoperability of ML datasets. In this notebook we demonstrate how an example Croissant file, linked to a dataset from the UKCEH Environmental Information Data Centre (EIDC), can be used in an ML pipeline.
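Because a Croissant file is plain JSON-LD, its structure can be inspected with Python's standard `json` module before any ML library is involved. The sketch below uses a small, hypothetical inline document (not the EIDC file) and assumes the Croissant 1.0 key names `recordSet`, `field` and `dataType`; `list_record_sets` is a helper written here for illustration.

```python
import json

# A tiny, hypothetical Croissant-style document (illustration only, not the EIDC file).
# Real files also carry an "@context", a "distribution" (file objects) and "source" mappings.
EXAMPLE_JSONLD = """
{
  "@type": "sc:Dataset",
  "name": "example-dunes",
  "recordSet": [
    {
      "@type": "cr:RecordSet",
      "@id": "rs-example",
      "field": [
        {"@type": "cr:Field", "@id": "X", "name": "X", "dataType": "sc:Float"},
        {"@type": "cr:Field", "@id": "WindSpeed", "name": "WindSpeed", "dataType": "sc:Float"}
      ]
    }
  ]
}
"""


def list_record_sets(doc: dict) -> dict:
    """Map each record set's @id to the names of its fields (assumes Croissant 1.0 key names)."""
    summary = {}
    for record_set in doc.get("recordSet", []):
        rs_id = record_set.get("@id", record_set.get("name"))
        summary[rs_id] = [field.get("name") for field in record_set.get("field", [])]
    return summary


doc = json.loads(EXAMPLE_JSONLD)
print(list_record_sets(doc))  # {'rs-example': ['X', 'WindSpeed']}
```

Loading a real Croissant file with `json.load` and passing it to the same helper should list its record sets (such as `rs-abberfraw`), provided the file uses these key names.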

In [1]:
%%capture --no-display
# Install the necessary libraries (the %%capture cell magic must be the first line of the cell)
# Install mlcroissant from source, together with the system packages it needs
!apt-get install -y python3-dev graphviz libgraphviz-dev pkg-config
!pip install "git+https://github.com/${GITHUB_REPOSITORY:-mlcommons/croissant}.git@${GITHUB_HEAD_REF:-main}#subdirectory=python/mlcroissant&egg=mlcroissant[dev]"
!pip install array_record
!pip install tfds-nightly
!pip install tensorflow
!pip install torch
!apt-get install -y tree


In [4]:
# Importing necessary libraries
from mlcroissant import Dataset
import tensorflow_datasets as tfds
import torch
from tqdm import tqdm
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, r2_score

## Loading the data

The underlying data described in the Croissant file can currently be loaded directly with either the [mlcroissant](https://github.com/mlcommons/croissant/tree/main/python/mlcroissant) Python library or the [TensorFlow Datasets CroissantBuilder](https://www.tensorflow.org/datasets/format_specific_dataset_builders#croissantbuilder). Here we demonstrate both.

In [3]:
# Load the dataset from the Croissant file using mlcroissant
croissant_file_path = "../../croissantSpikeZip.json"
dataset = Dataset(jsonld=croissant_file_path)  # Parse the Croissant JSON-LD metadata
metadata = dataset.metadata.to_json()  # Convert the metadata to a JSON-serializable dict
records = dataset.records(record_set="rs-abberfraw")  # Iterate over the records of the Abberfraw record set
print(metadata["description"])  # Display the description of the dataset
df = pd.DataFrame(records)  # Convert the records to a pandas DataFrame
print(df.dtypes)  # Display the data types of the columns
df.head()  # Display the first 5 records

This data contains values of bare sand area, modelled wind speed, aspect and slope at a 2.5 m spatial resolution for four UK coastal dune fields, Abberfraw (Wales), Ainsdale (England), Morfa Dyffryn (Wales), Penhale (England). Data is stored as a .csv file. Data is available for 620,756.25 m2 of dune at Abberfraw, 550,962.5 m2 of dune at Ainsdale, 1,797,756.25 m2 of dune at Morfa Dyffryn and 2,275,056.25 m2 of dune at Penhale. All values were calculated from aerial imagery and digital terrain models collected between 2014 and 2016.
id                int64
X               float64
Y               float64
Aspect          float64
Slope           float64
WindSpeed       float64
BareSand_it1    float64
dtype: object


|   | id | X | Y | Aspect | Slope | WindSpeed | BareSand_it1 |
|---|---|---|---|---|---|---|---|
| 0 | 91619 | 235341.25 | 368183.75 | 147.39559 | 5.947186 | 1.552845 | 34.0 |
| 1 | 91620 | 235341.25 | 368181.25 | 183.392563 | 7.696039 | 1.610589 | 40.0 |
| 2 | 91621 | 235341.25 | 368178.75 | 174.296432 | 5.17079 | 1.56776 | 16.0 |
| 3 | 91622 | 235341.25 | 368176.25 | 264.810909 | 2.708107 | 1.461571 | 16.0 |
| 4 | 91623 | 235341.25 | 368173.75 | 172.195933 | 11.810599 | 1.431683 | 4.0 |
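The records above already have plain column names. In other Croissant files, mlcroissant may key each record by a field `@id` of the form `record_set/field` and return text values as `bytes`. The hypothetical helper below (an assumption about such files, not part of mlcroissant) strips that prefix and decodes bytes before the DataFrame is built. It runs on synthetic records, so it does not need the EIDC data; the `site` field is invented for illustration.

```python
from typing import Any, Dict, Iterable, Iterator


def tidy_records(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: strip 'record_set/' prefixes from keys and decode bytes values."""
    for record in records:
        tidy = {}
        for key, value in record.items():
            # Keep only the part after the last '/' (e.g. 'rs-abberfraw/X' -> 'X')
            name = key.rsplit("/", 1)[-1]
            # Decode UTF-8 bytes to str; leave numbers and other types unchanged
            tidy[name] = value.decode("utf-8") if isinstance(value, bytes) else value
        yield tidy


# Synthetic records in the assumed prefixed form (not real output from the EIDC file)
raw = [
    {"rs-abberfraw/id": 91619, "rs-abberfraw/X": 235341.25, "rs-abberfraw/site": b"Abberfraw"},
]
print(list(tidy_records(raw)))  # [{'id': 91619, 'X': 235341.25, 'site': 'Abberfraw'}]
# In the notebook this would be used as: df = pd.DataFrame(tidy_records(records))
```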


In [2]:
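# Load the dataset from the Croissant file using the TensorFlow Datasets CroissantBuilder
import sys

builder = tfds.core.dataset_builders.CroissantBuilder(
    jsonld="../../croissantSpikeZip.json",
    record_set_ids=["rs-abberfraw"],
    file_format="array_record",
    data_dir="../../mlworkflow-examples/default_examples/ukceh/data/croissant_ukceh",
)
print(f"Dataset's description:\n{builder.info.description}\n")
print(f"Dataset's citation:\n{builder.info.citation}\n")
print(f"Dataset's features:\n{builder.info.features}")

# Inside Jupyter, sys.argv holds the kernel's own arguments (e.g. `-f <connection file>`),
# which the Apache Beam option parser invoked during preparation appears to reject (SystemExit: 2).
# Keeping only the program name is a common workaround, not a TFDS API.
sys.argv = sys.argv[:1]
builder.download_and_prepare()  # Download and prepare the dataset
# Slice the single "default" split into contiguous 75% / 25% train and test portions
train, test = builder.as_data_source(split=["default[:75%]", "default[75%:]"])

print(f"Train dataset size: {len(train)}")
print(f"Test dataset size: {len(test)}")

for i in range(5):
    print(train[i])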

Dataset's description:
This data contains values of bare sand area, modelled wind speed, aspect and slope at a 2.5 m spatial resolution for four UK coastal dune fields, Abberfraw (Wales), Ainsdale (England), Morfa Dyffryn (Wales), Penhale (England). Data is stored as a .csv file. Data is available for 620,756.25 m2 of dune at Abberfraw, 550,962.5 m2 of dune at Ainsdale, 1,797,756.25 m2 of dune at Morfa Dyffryn and 2,275,056.25 m2 of dune at Penhale. All values were calculated from aerial imagery and digital terrain models collected between 2014 and 2016.

Dataset's citation:
Smyth, T.A.G. (2022). Bare sand, wind speed, aspect and slope at four English and Welsh coastal sand dunes, 2014-2016. NERC EDS Environmental Information Data Centre. https://doi.org/10.5285/972599af-0cc3-4e0e-a4dc-2fab7a6dfc85

Dataset's features:
FeaturesDict({
    'Aspect': float32,
    'BareSand_it1': float32,
    'Slope': float32,
    'WindSpeed': float32,
    'X': float32,
    'Y': float32,
    'id': int64,
})


## Machine Learning Pipelines

The focus here is simply to demonstrate that, once loaded from the Croissant file, the data can easily be fed into different ML frameworks. To highlight this, we show a scikit-learn pipeline that ingests a pandas DataFrame and a PyTorch pipeline that ingests the data source provided by the TensorFlow Datasets CroissantBuilder. Note that the specific model and choice of inputs/outputs are not significant and are not intended to address a sensible scientific question.

### scikit-learn Pipeline

The code below defines and trains a neural network to predict the bare sand value (`BareSand_it1`) of each 2.5 m grid cell from its wind speed, aspect and slope, using scikit-learn's multi-layer perceptron regressor (`MLPRegressor`).

In [None]:
# Define features and target variable
X = df[["WindSpeed", "Aspect", "Slope"]]
y = df["BareSand_it1"]

# Split dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a simple regression model
model = MLPRegressor(hidden_layer_sizes=(64, 32), activation='relu', solver='adam', max_iter=500, random_state=42)
model.fit(X_train, y_train)

# Make predictions
y_train_pred = model.predict(X_train)  # Predictions on training set
y_test_pred = model.predict(X_test)  # Predictions on test set

# Evaluate the model
mse_train = mean_squared_error(y_train, y_train_pred)
r2_train = r2_score(y_train, y_train_pred)
mse_test = mean_squared_error(y_test, y_test_pred)
r2_test = r2_score(y_test, y_test_pred)

print(f"Training Set - Mean Squared Error: {mse_train}")
print(f"Training Set - R-squared Value: {r2_train}")
print(f"Test Set - Mean Squared Error: {mse_test}")
print(f"Test Set - R-squared Value: {r2_test}")

The MSE and R-squared metrics are poor. This is expected: the aim is not to carry out sensible scientific research but to show how data described by a Croissant file can be loaded and put through an ML pipeline.
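For reference, the two metrics follow the standard definitions: MSE is the mean of the squared residuals, and R-squared = 1 - SS_res / SS_tot compares the residual sum of squares with the spread of the targets around their mean. A model that always predicts the mean therefore scores 0, and a worse model scores below 0. The plain-Python sketch below is written for illustration; it mirrors the definitions rather than scikit-learn's own implementation.

```python
def mse(y_true, y_pred):
    """Mean of the squared differences between targets and predictions."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_t = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_t) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


y_true = [34.0, 40.0, 16.0, 16.0, 4.0]  # BareSand_it1 values from the first five rows shown earlier
always_mean = [22.0] * 5  # A baseline that always predicts the mean of y_true (110 / 5 = 22)
print(mse(y_true, always_mean), r2(y_true, always_mean))  # 172.8 0.0
```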

### PyTorch Pipeline

The provided code defines and trains a neural network model using PyTorch for a regression task on tabular data.

First we set up data loaders, which split the data into batches: groups of samples processed together in one forward and backward pass through the model. Here the batch size is 128. A `RandomSampler` draws elements from the training dataset in random order; because `num_samples` is set to the length of the training dataset and sampling is without replacement by default, every sample is visited exactly once per epoch.

For the test set no sampler is specified (sampler=None), so the data will be loaded sequentially. The batch_size parameter is also set to 128.

In [None]:
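batch_size = 128
# Visit every training example once per epoch, in a random order
train_sampler = torch.utils.data.RandomSampler(train, num_samples=len(train))
train_loader = torch.utils.data.DataLoader(
    train,
    sampler=train_sampler,
    batch_size=batch_size,
)
# sampler=None: test examples are read sequentially
test_loader = torch.utils.data.DataLoader(
    test,
    sampler=None,
    batch_size=batch_size,
)

# Inspect the first training batch: a dict mapping each field name to a batched tensor
print(next(iter(train_loader)))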
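The sampling and batching described above can be mimicked in plain Python to see what one epoch looks like: a random permutation of all indices (what `RandomSampler` yields when sampling without replacement with `num_samples` equal to the dataset length), cut into consecutive batches of 128, with a smaller final batch because `DataLoader`'s `drop_last` defaults to `False`. This is an illustrative sketch, not PyTorch's implementation; `epoch_batches` and the dataset size of 1000 are hypothetical.

```python
import math
import random


def epoch_batches(n, batch_size, shuffle, seed=0):
    """Split indices 0..n-1 into batches for one epoch, optionally in a random order."""
    indices = list(range(n))
    if shuffle:
        random.Random(seed).shuffle(indices)  # Random permutation: each index appears exactly once
    return [indices[i:i + batch_size] for i in range(0, n, batch_size)]


n = 1000  # Hypothetical dataset size for illustration
train_batches = epoch_batches(n, 128, shuffle=True)
test_batches = epoch_batches(n, 128, shuffle=False)
print(len(train_batches), math.ceil(n / 128))  # 8 8
print(len(train_batches[-1]))  # 104: the final batch is smaller (1000 - 7 * 128)
print(sorted(i for b in train_batches for i in b) == list(range(n)))  # True
```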

We define a `TabularRegressor` class: a feedforward neural network with two hidden layers, the first with 64 neurons and the second with 32, each followed by a ReLU activation. The final linear layer outputs a single value for regression.

The forward method defines the forward pass of the model, where the input features are passed through the hidden layers and activation functions, and finally through the regression layer to produce the output.

In [None]:
# Defining the model

class TabularRegressor(torch.nn.Module): # Define a simple feedforward neural network
    def __init__(self, input_dim): # Define the model's architecture
        super().__init__()  # Call the parent class's constructor
        self.hidden1 = torch.nn.Linear(input_dim, 64) # Define the first hidden layer
        self.hidden2 = torch.nn.Linear(64, 32) # Define the second hidden layer
        self.relu = torch.nn.ReLU() # Define the activation function
        self.regressor = torch.nn.Linear(32, 1) # Define the output layer

    def forward(self, features): # Define the forward pass
        x = self.hidden1(features) # Pass the input through the first hidden layer
        x = self.relu(x) # Apply the activation function
        x = self.hidden2(x) # Pass the input through the second hidden layer
        x = self.relu(x) # Apply the activation function
        x = self.regressor(x) # Pass the input through the output layer
        return x # Return the output

# Extract feature names and target name
feature_names = ['Aspect', 'Slope', 'WindSpeed']
target_name = 'BareSand_it1'
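As a quick check on the architecture described above, the number of trainable parameters follows from the layer sizes: each `Linear(in, out)` layer has `in * out` weights plus `out` biases, and ReLU has none. With 3 input features this gives (3 × 64 + 64) + (64 × 32 + 32) + (32 × 1 + 1) = 256 + 2080 + 33 = 2369. The sketch below does the arithmetic; `count_mlp_parameters` is a helper written here for illustration.

```python
def count_mlp_parameters(layer_sizes):
    """Weights plus biases of a stack of fully connected layers, e.g. [3, 64, 32, 1]."""
    return sum(n_in * n_out + n_out for n_in, n_out in zip(layer_sizes, layer_sizes[1:]))


print(count_mlp_parameters([3, 64, 32, 1]))  # 2369
```

In the notebook, `sum(p.numel() for p in model.parameters())` on a `TabularRegressor(3)` instance should give the same number.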

The model is initialized and trained for 5 epochs. Predictions are then made on the test set, and the MSE and R-squared metrics are computed for both the training and test data.

In [None]:
# Model initialization
input_dim = len(feature_names)  # Number of input features
model = TabularRegressor(input_dim)  # Initialize the model
optimizer = torch.optim.Adam(model.parameters())  # Initialize the optimizer
loss_function = torch.nn.MSELoss()  # Initialize the loss function

num_epochs = 5  # Number of passes over the whole training set

print('Training...')
model.train()  # Set the model to training mode
for epoch in range(num_epochs):
    epoch_loss = 0.0
    # Reset each epoch so the training R-squared reflects only the final epoch's predictions
    all_train_targets = []
    all_train_predictions = []
    for example in tqdm(train_loader):  # Training data is loaded in batches by train_loader
        features = torch.stack([example[feature] for feature in feature_names], dim=1).float()
        target = example[target_name].unsqueeze(dim=1).float()
        prediction = model(features)  # Forward pass: features -> predictions
        loss = loss_function(prediction, target)  # Mean squared error for this batch
        optimizer.zero_grad()  # Clear gradients from the previous step
        loss.backward()  # Compute gradients by backpropagation
        optimizer.step()  # Update the model parameters
        epoch_loss += loss.item()
        # squeeze(dim=1) keeps a 1-D list even when the last batch holds a single example
        all_train_targets.extend(target.squeeze(dim=1).tolist())
        all_train_predictions.extend(prediction.detach().squeeze(dim=1).tolist())

    epoch_loss /= len(train_loader)  # Mean of the per-batch losses for this epoch
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

train_r2 = r2_score(all_train_targets, all_train_predictions)
print(f'Training R-squared (final epoch): {train_r2:.4f}')

print('Testing...')  # Evaluate the model on the test data
model.eval()  # Set the model to evaluation mode
total_loss = 0.0
num_examples = 0
all_targets = []
all_predictions = []
with torch.no_grad():  # Gradients are not needed for evaluation
    for example in tqdm(test_loader):  # Test data is loaded in batches by test_loader
        features = torch.stack([example[feature] for feature in feature_names], dim=1).float()
        target = example[target_name].unsqueeze(dim=1).float()
        prediction = model(features)  # Forward pass: features -> predictions
        loss = loss_function(prediction, target)  # Mean squared error for this batch
        total_loss += loss.item() * features.shape[0]  # Weight the batch mean by batch size
        num_examples += features.shape[0]
        all_targets.extend(target.squeeze(dim=1).tolist())
        all_predictions.extend(prediction.squeeze(dim=1).tolist())

# Named test_mse so it does not shadow scikit-learn's mean_squared_error function
test_mse = total_loss / num_examples
test_r2 = r2_score(all_targets, all_predictions)
print(f'\nMean Squared Error: {test_mse:.4f}')
print(f'R-squared: {test_r2:.4f}')
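The training loop reports the mean of the per-batch losses, whereas the test loop weights each batch's loss by its size. Because `MSELoss` already averages within a batch, the two agree only when every batch has the same size; in the first scheme a small final batch counts as much as a full one. The plain-Python sketch below uses made-up batch losses to show the difference; the function names are chosen here for illustration.

```python
def mean_of_batch_means(batch_losses):
    """Average of per-batch mean losses (as in the training loop above)."""
    return sum(loss for loss, _ in batch_losses) / len(batch_losses)


def example_weighted_mean(batch_losses):
    """Average loss per example: weight each batch mean by its size (as in the test loop)."""
    total = sum(loss * size for loss, size in batch_losses)
    return total / sum(size for _, size in batch_losses)


# Hypothetical (mean loss, batch size) pairs: two full batches and a small final batch
batches = [(100.0, 128), (120.0, 128), (400.0, 4)]
print(f"{mean_of_batch_means(batches):.2f}")  # 206.67
print(f"{example_weighted_mean(batches):.2f}")  # 114.46
```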

As with the scikit-learn model, the poor MSE and R-squared values are expected: the aim is only to show how data loaded via the Croissant format can be put through an ML pipeline, not to answer a scientific question.