## Imports

In [1]:
# General data manipulation
import numpy as np
import pandas as pd

# Data visualization
import matplotlib.pyplot as plt

# Utilities
import json
import os
import joblib

# Parallel processing
import concurrent.futures

# Google Cloud Storage
from google.cloud import storage

## File Classification and Storage

In [2]:
# Classification state persisted across runs in all_files.json:
#   files[0]  all file paths already seen
#   files[1]  paths successfully read and validated, alpha and beta angles in degrees
#   files[2]  paths successfully read and validated, alpha and beta angles in radians
if os.path.exists("all_files.json"):
    with open("all_files.json", "r") as f:
        files = json.load(f)
else:
    # First run: start from empty lists so the following cells do not fail with NameError
    files = [[], [], []]

In [3]:
# Initialize the Google Cloud Storage (GCS) client
client = storage.Client()

# GCS folder to read files from, written as "<bucket>/<prefix>"
gcs_path = "featurestore-spinewise/"

# Extract the bucket name and the object-name prefix
bucket_name = gcs_path.split("/")[0]
prefix = "/".join(gcs_path.split("/")[1:])
bucket = client.bucket(bucket_name)

# List all object paths available under the GCS prefix
paths = [f"gs://{bucket_name}/{blob.name}" for blob in bucket.list_blobs(prefix=prefix)]

filePaths_id = []  # Unique file names, in first-seen order
filePaths = []     # Corresponding full paths (gs://.../file.parquet)
# Set mirror of filePaths_id: O(1) membership instead of scanning the list for every file
_seen_file_names = set()

# Keep only one path per file name (each file name is unique even if stored in multiple folders)
for file in paths:
    if file.endswith(".parquet"):
        file_n = file.split("/")[-1]
        if file_n not in _seen_file_names:
            _seen_file_names.add(file_n)
            filePaths_id.append(file_n)
            filePaths.append(file)

# Display stats: (files to process in GCS, files already seen in the JSON)
len(filePaths), len(files[0])

(12510, 12510)

In [34]:
# Attempt to read a parquet file and check it has the required columns and enough data
def read_check_parquet(i, path, min_rows=11250, required_columns=("alpha_angle", "beta_angle"), verbose=True):
    """Read a parquet file and return it only if it is usable for classification.

    Parameters
    ----------
    i : int
        Index of the file in the processing loop; printed as a progress marker.
    path : str
        Path or URI passed to ``pd.read_parquet`` (e.g. ``gs://bucket/file.parquet``).
    min_rows : int, default 11250
        Minimum number of rows the file must contain.
    required_columns : iterable of str, default ("alpha_angle", "beta_angle")
        Column names that must all be present.
    verbose : bool, default True
        Print ``i`` before reading.

    Returns
    -------
    pandas.DataFrame or None
        The loaded frame. Returns ``None`` if a required column is missing or the
        frame has fewer than ``min_rows`` rows.

    Raises
    ------
    Exception
        Whatever ``pd.read_parquet`` raises (missing file, corrupted data, ...)
        is propagated to the caller.
    """
    if verbose:
        print(i)
    df = pd.read_parquet(path)
    # Reject files that cannot be classified: missing angle columns or too few rows
    if not set(required_columns).issubset(df.columns) or len(df) < min_rows:
        return None
    return df

# Maximum time (seconds) allowed to read one file before it is skipped
READ_TIMEOUT_S = 15
# Radian angles are assumed to lie within [-π/2 ; 3π/2] ≈ [-1.57 ; 4.71].
# Any value outside these looser bounds means the file stores angles in degrees.
RADIAN_UPPER_BOUND, RADIAN_LOWER_BOUND = 5, -2

id_file = [file.split("/")[-1] for file in files[0]]  # Already seen file names (used to skip duplicates)
_seen_names = set(id_file)  # Set mirror of id_file for O(1) membership checks

try:
    # Loop through all new files to validate and classify them
    for i, path in enumerate(filePaths):
        name = path.split("/")[-1]
        if name in _seen_names:
            continue

        # Read in a worker thread so a slow file can be abandoned after the timeout.
        # Do NOT use `with ThreadPoolExecutor(...)`: its exit calls shutdown(wait=True),
        # which blocks until the read finishes and makes the timeout useless.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(read_check_parquet, i, path)
        try:
            res = future.result(timeout=READ_TIMEOUT_S)
            if res is not None:
                alpha, beta = res["alpha_angle"], res["beta_angle"]
                if (alpha.max() > RADIAN_UPPER_BOUND or alpha.min() < RADIAN_LOWER_BOUND
                        or beta.max() > RADIAN_UPPER_BOUND or beta.min() < RADIAN_LOWER_BOUND):
                    files[1].append(path)  # Add to degree list
                else:
                    files[2].append(path)  # Add to radian list
        except concurrent.futures.TimeoutError:
            print(f"File {i} exceeded loading time (>{READ_TIMEOUT_S}s), skipped")
        except Exception as exc:
            # Unreadable/corrupted file: log it and continue instead of aborting the whole run
            print(f"File {i} could not be read ({type(exc).__name__}: {exc}), skipped")
        finally:
            # Return immediately; a timed-out read keeps running in its abandoned thread
            executor.shutdown(wait=False)

        # Mark the file as seen (skipped files too, so they are not retried on every run)
        files[0].append(path)
        id_file.append(name)
        _seen_names.add(name)
finally:
    # Save the file lists even if the loop is interrupted, so progress is not lost
    with open("all_files.json", "w") as f:
        json.dump(files, f, indent=4)

len(filePaths), len(files[0])

(12510, 12510)

In [4]:
# The JSON can list more files than GCS holds (files may have been removed from the bucket):
# keep only the entries that still exist, then persist the pruned lists.
_available = set(filePaths)  # O(1) membership instead of scanning the list per entry

file0 = [f for f in files[0] if f in _available]
file1 = [f for f in files[1] if f in _available]
file2 = [f for f in files[2] if f in _available]

# Replace the state with the pruned lists; previously these were computed but never saved
files = [file0, file1, file2]

with open("all_files.json", "w") as f:
    json.dump(files, f, indent=4)

len(filePaths), len(files[0])

(12510, 12510)

## How to Save and Load Models/Scalers using joblib

In [24]:
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Files the fitted artifacts are written to (current working directory)
MODEL_PATH = "rd_iris_model.joblib"
SCALER_PATH = "scaler.joblib"

# --- Step 1: fit a scaler and train a model on an example dataset (Iris) ---
X, Y = load_iris(return_X_y=True)
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.3, random_state=42
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# fit() returns the estimator itself, so construction and training can be chained
model = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_train_scaled, Y_train)

# --- Step 2: persist both artifacts. The scaler is saved too, because new data
# must be transformed exactly like the training data before predicting ---
joblib.dump(model, MODEL_PATH)
print("Model saved")
joblib.dump(scaler, SCALER_PATH)
print("Scaler saved\n")

# --- Step 3: reload them later (e.g. in another session) to make predictions.
# joblib files are pickle-based: only load artifacts from a trusted source ---
loaded_model = joblib.load(MODEL_PATH)
print("Model loaded")
loaded_scaler = joblib.load(SCALER_PATH)
print("Scaler loaded\n")

X_test_scaled = loaded_scaler.transform(X_test)
predictions = loaded_model.predict(X_test_scaled)
print("Example predictions :", predictions[:5])

Model saved
Scaler saved

Model loaded
Scaler loaded

Example predictions : [1 0 2 1 1]
