# In [None]:
# 1_preprocess_data.ipynb
#
# This notebook preprocesses the Cylinder Flow Dataset with memory optimizations:
# - Loads the data from MinIO
# - Extracts velocity fields
# - Creates the snapshot matrix
# - Performs mean subtraction
# - Normalizes the data if needed
# - Uploads processed data back to MinIO

import os
import numpy as np
import h5py
import matplotlib.pyplot as plt
import json
from datetime import datetime
import boto3
from botocore.client import Config
import tempfile
import io
import gc  # For garbage collection
import psutil  # For memory tracking

# --- Memory management configuration ---
MEMORY_EFFICIENT = True        # Allow datasets above the size threshold to be processed in chunks
ENABLE_MEMORY_TRACKING = True  # Make print_memory_usage() report the process memory
CHUNK_SIZE = 10                # Number of snapshots handled per chunk
DOWNSAMPLE = False             # Subsample the spatial grid after loading
DOWNSAMPLE_FACTOR = 2          # Keep every Nth grid point when DOWNSAMPLE is on

# Cap the thread pools of the numerical backends to limit memory usage.
# NOTE(review): numpy is already imported at the top of the file when these
# variables are assigned; threading runtimes commonly read them at load time,
# so confirm that the limits actually take effect from here.
for _thread_var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "NUMEXPR_MAX_THREADS"):
    os.environ[_thread_var] = "2"

# Function to track memory usage
def print_memory_usage(label=""):
    """Print the resident set size of the current process in megabytes.

    Does nothing when ENABLE_MEMORY_TRACKING is false.

    Args:
        label: Text placed after "Memory usage" to identify the call site.
    """
    if not ENABLE_MEMORY_TRACKING:
        return
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    print(f"Memory usage {label}: {rss_bytes / 1024 / 1024:.1f} MB")

# Report the memory footprint before anything is downloaded or loaded
print_memory_usage("at start")

# Function to convert NumPy types to Python native types for JSON serialization
def json_serialize(obj):
    """Recursively convert NumPy values into JSON-serializable Python natives.

    Containers are walked so that NumPy values nested inside dictionaries,
    lists, tuples or named tuples are converted as well.

    Args:
        obj: Any value. NumPy scalars become ``bool``/``int``/``float``,
            arrays become nested lists, named tuples become dictionaries,
            other tuples and lists become lists, and dictionaries are rebuilt
            with converted values (keys are left untouched).

    Returns:
        The converted value; objects of any other type are returned unchanged.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {key: json_serialize(value) for key, value in obj.items()}
    if isinstance(obj, tuple) and hasattr(obj, '_asdict'):
        # Named tuples keep their field names
        return {key: json_serialize(value) for key, value in obj._asdict().items()}
    if isinstance(obj, (list, tuple)):
        return [json_serialize(item) for item in obj]
    return obj

def save_dict_to_json(data_dict, filepath):
    """Write a dictionary to a JSON file after NumPy value conversion.

    Every top-level value is passed through json_serialize before dumping.

    Args:
        data_dict: Mapping to write.
        filepath: Destination path; an existing file is overwritten.
    """
    converted = {name: json_serialize(entry) for name, entry in data_dict.items()}

    with open(filepath, 'w') as out_file:
        json.dump(converted, out_file, indent=2)

# Scratch directory on local disk for downloaded and generated files
temp_dir = tempfile.mkdtemp()
print(f"Using temporary directory: {temp_dir}")

# Connect to MinIO
print("Connecting to MinIO...")
s3_endpoint = os.environ.get('S3_ENDPOINT', 'http://minio:9000')

# The endpoint must carry a scheme; assume plain HTTP when none was given
_has_scheme = s3_endpoint.startswith(('http://', 'https://'))
if s3_endpoint and not _has_scheme:
    s3_endpoint = "http://" + s3_endpoint
    print(f"Adding http:// prefix to endpoint: {s3_endpoint}")

# Credentials fall back to 'minioadmin' when the environment provides none
s3_access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'minioadmin')
s3_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'minioadmin')

_client_options = {
    "endpoint_url": s3_endpoint,
    "aws_access_key_id": s3_access_key,
    "aws_secret_access_key": s3_secret_key,
    "config": Config(signature_version='s3v4'),
}
s3 = boto3.client('s3', **_client_options)

# Load parameters from previous step
MINIO_BUCKET = 'rom-data'
MINIO_OUTPUT_PREFIX = 'rom-pipeline/outputs'

# Fetch the parameters file from MinIO. Any failure (missing object,
# connection problem, unreadable JSON) falls back to a default set.
params_key = f"{MINIO_OUTPUT_PREFIX}/params.json"
params_path = os.path.join(temp_dir, 'params.json')
try:
    s3.download_file(MINIO_BUCKET, params_key, params_path)

    with open(params_path, 'r') as params_file:
        params = json.load(params_file)

    print("Loaded parameters successfully")
except Exception as err:
    print(f"Error loading parameters, using defaults: {err}")
    params = dict(
        dataset_name="cylinder",
        minio_bucket=MINIO_BUCKET,
        minio_output_prefix=MINIO_OUTPUT_PREFIX,
    )

# Record when this step ran and which options it used; entries with the same
# names coming from the downloaded file are overwritten.
params["preprocessing_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
params["preprocessing_options"] = {
    "mean_subtraction": True,
    "normalization": True,
    "memory_efficient": MEMORY_EFFICIENT,
    "chunk_size": CHUNK_SIZE,
    "downsample": DOWNSAMPLE,
    "downsample_factor": DOWNSAMPLE_FACTOR if DOWNSAMPLE else None,
}

# Check for the completion marker of the data-fetching step. A missing marker
# only produces a warning; processing continues regardless.
marker_key = f"{MINIO_OUTPUT_PREFIX}/data_download_completed.txt"
try:
    s3.head_object(Bucket=MINIO_BUCKET, Key=marker_key)
except Exception as err:
    print(f"Warning: Previous step completion marker not found: {err}")
    print("Continuing anyway...")
else:
    print("Previous step (data fetching) completed successfully")

# List objects in the data directory to check what's available
print("\nListing files in MinIO data directory:")
data_prefix = f"{MINIO_OUTPUT_PREFIX}/data/"
response = s3.list_objects_v2(Bucket=MINIO_BUCKET, Prefix=data_prefix)

# Report what the listing returned
if 'Contents' in response:
    print(f"Found {len(response['Contents'])} objects in {MINIO_BUCKET}/{data_prefix}:")
    for entry in response['Contents']:
        print(f"  - {entry['Key']} ({entry['Size']} bytes)")
else:
    print(f"No objects found in {MINIO_BUCKET}/{data_prefix}")

# With nothing to process, synthesize a small dataset and upload it
if not response.get('Contents'):
    print(f"Warning: No data files found in {MINIO_BUCKET}/{data_prefix}")
    print("Creating sample data since none was found...")

    # Sample dataset dimensions (same code as in fetch_data notebook)
    n_snapshots = 50
    n_x, n_y = 100, 50

    # Grid coordinates; 'ij' indexing makes axis 0 follow x and axis 1 follow y
    x = np.linspace(0, 10, n_x)
    y = np.linspace(-2.5, 2.5, n_y)
    X, Y = np.meshgrid(x, y, indexing='ij')

    # Cylinder at x=2 and the region downstream of it; neither changes in time
    cylinder_x, cylinder_y = 2, 0
    cylinder_radius = 0.5
    distance = np.sqrt((X - cylinder_x)**2 + (Y - cylinder_y)**2)
    mask = distance < cylinder_radius
    wake_mask = (X > cylinder_x) & (distance > cylinder_radius)

    # Time-independent pieces of the flow: uniform left-to-right stream that is
    # zero inside the cylinder, plus the spatial part of the wake oscillation
    base_u = np.ones_like(X) * 1.0
    base_u[mask] = 0
    wake_offset = 0.5 * (X[wake_mask] - cylinder_x)
    wake_envelope = np.exp(-(Y[wake_mask]**2) / 0.5)

    # Generate the time-varying velocity fields
    velocity = np.zeros((n_snapshots, n_x, n_y, 2))
    for t in range(n_snapshots):
        phase = 0.2 * t

        # Transverse component: zero everywhere except the oscillating wake
        v = np.zeros_like(Y)
        v[wake_mask] = 0.3 * np.sin(wake_offset + phase) * wake_envelope

        # Add random noise (u is drawn before v)
        u = base_u + 0.02 * np.random.randn(*base_u.shape)
        v += 0.02 * np.random.randn(*v.shape)

        # Store u and v components
        velocity[t, :, :, 0] = u
        velocity[t, :, :, 1] = v

    # Save to HDF5 file
    sample_file = os.path.join(temp_dir, 'cylinder_flow.h5')
    with h5py.File(sample_file, 'w') as f:
        f.create_dataset('velocity', data=velocity)
        f.create_dataset('x', data=x)
        f.create_dataset('y', data=y)
        f.create_dataset('time', data=np.linspace(0, 10, n_snapshots))

    print(f"Created sample dataset at {sample_file}")

    # Upload to MinIO
    s3_key = f"{MINIO_OUTPUT_PREFIX}/data/cylinder_flow.h5"
    s3.upload_file(sample_file, MINIO_BUCKET, s3_key)
    print(f"Uploaded sample dataset to {s3_key}")

    # List again so the download step below sees the new object
    response = s3.list_objects_v2(Bucket=MINIO_BUCKET, Prefix=data_prefix)

# Download every listed HDF5 object into the scratch directory
print("\nDownloading data files from MinIO:")
hdf5_keys = [
    item['Key'] for item in response.get('Contents', [])
    if item['Key'].endswith(('.h5', '.hdf5'))
]
for key in hdf5_keys:
    # Objects are stored locally under their base name only
    local_path = os.path.join(temp_dir, os.path.basename(key))

    print(f"  Downloading {key} to {local_path}")
    s3.download_file(MINIO_BUCKET, key, local_path)

print_memory_usage("after downloading data")

# Find HDF5 files in the scratch directory. os.listdir returns entries in
# arbitrary order, so the paths are sorted to make the file picked below
# reproducible between runs.
h5_files = sorted(
    os.path.join(temp_dir, f) for f in os.listdir(temp_dir)
    if f.endswith(('.h5', '.hdf5'))
)

if not h5_files:
    raise FileNotFoundError(f"No HDF5 files found in downloaded data")

print(f"Found {len(h5_files)} HDF5 files: {[os.path.basename(f) for f in h5_files]}")

# Load data from the first file (alphabetically); any others are ignored
h5_file = h5_files[0]
print(f"Loading data from {os.path.basename(h5_file)}")

try:
    with h5py.File(h5_file, 'r') as f:
        # Walk the file once, printing every entry and remembering its name
        print("Available datasets:")
        keys = []

        def print_structure(name, obj):
            """Print one line describing an HDF5 entry and record its name."""
            summary = f"  - {name}: {type(obj).__name__}"
            if isinstance(obj, h5py.Dataset):
                summary += f", Shape: {obj.shape}, Dtype: {obj.dtype}"
            print(summary)
            keys.append(name)

        f.visititems(print_structure)

        # Rank the datasets: most dimensions first, then most elements
        datasets = sorted(
            ((name, f[name]) for name in keys if isinstance(f[name], h5py.Dataset)),
            key=lambda item: (-len(item[1].shape), -np.prod(item[1].shape)),
        )

        if not datasets:
            raise ValueError("No datasets found in the HDF5 file")

        print("\nPotential data candidates (sorted by size):")
        for name, dataset in datasets:
            print(f"  - {name}: Shape {dataset.shape}, Size {np.prod(dataset.shape)}")

        # The top-ranked dataset is taken to be the velocity data
        velocity_key = datasets[0][0]
        print(f"\nUsing '{velocity_key}' as velocity data")

        # Estimate the in-memory size from shape and dtype before reading anything
        velocity_ds = f[velocity_key]
        data_shape = velocity_ds.shape
        data_size_bytes = np.prod(data_shape) * np.dtype(velocity_ds.dtype).itemsize
        data_size_mb = data_size_bytes / (1024 * 1024)
        print(f"Data shape: {data_shape}, Estimated memory required: {data_size_mb:.1f} MB")

        if MEMORY_EFFICIENT and data_size_mb > 1000:  # More than 1GB
            # Too large to read at once: keep only shape and dtype, and let
            # the later processing read the file chunk by chunk
            print("Dataset is large - will process in chunks later")
            data = None
            data_dtype = velocity_ds.dtype
        else:
            # Small enough to read in one go
            data = velocity_ds[:]

            # Optionally downsample the data to reduce memory usage.
            # NOTE(review): for 3-D data axes 1 and 2 are subsampled whatever
            # they represent, and the coordinate arrays read below are not
            # subsampled to match - confirm before enabling DOWNSAMPLE.
            if DOWNSAMPLE and len(data_shape) in (3, 4):
                step = DOWNSAMPLE_FACTOR
                print(f"Downsampling data by factor of {DOWNSAMPLE_FACTOR}")
                if len(data_shape) == 4:  # (time, x, y, components)
                    data = data[:, ::step, ::step, :]
                else:  # (time, x, y) or similar
                    data = data[:, ::step, ::step]
                print(f"Shape reduced from {data_shape} to {data.shape}")

            print(f"Data successfully loaded")

        # Coordinate arrays stored under any of the usual names
        coord_data = {}
        coord_keys = ["x", "y", "z", "X", "Y", "Z", "coord", "coords", "coordinates", "grid"]
        for key in coord_keys:
            if key in f:
                coord_data[key] = f[key][:]
                print(f"Found coordinates '{key}': {coord_data[key].shape}")

        # Time stamps, stored as "time" or else "t"
        time_data = None
        for time_key in ("time", "t"):
            if time_key in f:
                time_data = f[time_key][:]
                print(f"Found time data: {time_data.shape}")
                break

except Exception as e:
    print(f"Error loading data: {str(e)}")
    raise

print_memory_usage("after loading data")

# Determine data dimensions and structure based on the shape.
# When the dataset was left on disk for chunked processing, only its stored
# shape is available.
shape = data_shape if data is None else data.shape

# `time_first` records which axis indexes snapshots (axis 0 when True, the last
# axis when False). The snapshot matrix further down is built from this single
# decision instead of re-deriving the layout from `shape`.
if len(shape) == 2:  # (time, points) or (points, time)
    n_dims = 1
    structured_grid = False
    time_first = shape[0] < shape[1]
    if time_first:  # Likely (time, points)
        n_snapshots, n_points = shape
        print(f"Detected (time, points) format: {n_snapshots} snapshots, {n_points} points, {n_dims} dimensions")
    else:  # Likely (points, time)
        n_points, n_snapshots = shape
        print(f"Detected (points, time) format: {n_snapshots} snapshots, {n_points} points, {n_dims} dimensions")

elif len(shape) == 3:
    if shape[2] in (2, 3):  # Likely (time, points, dimensions)
        # Tested before the size comparisons: a trailing axis of 2 or 3 is
        # almost always the smallest axis, so the (x, y, time) test below would
        # otherwise claim it and read vector components as 2-3 snapshots.
        n_snapshots, n_points, n_dims = shape
        structured_grid = False
        time_first = True
        print(f"Detected (time, points, dimensions) format: {n_snapshots} snapshots, {n_points} points, {n_dims} dimensions")

    elif shape[0] < shape[1] and shape[0] < shape[2]:  # Likely (time, x, y)
        n_snapshots = shape[0]
        grid_shape = (shape[1], shape[2])
        n_points = np.prod(grid_shape)
        n_dims = 1
        structured_grid = True
        time_first = True
        print(f"Detected (time, x, y) format: {n_snapshots} snapshots, grid shape {grid_shape}, {n_dims} dimensions")

    elif shape[2] < shape[0] and shape[2] < shape[1]:  # Likely (x, y, time)
        n_snapshots = shape[2]
        grid_shape = (shape[0], shape[1])
        n_points = np.prod(grid_shape)
        n_dims = 1
        structured_grid = True
        time_first = False
        print(f"Detected (x, y, time) format: {n_snapshots} snapshots, grid shape {grid_shape}, {n_dims} dimensions")

    elif shape[1] in (2, 3):  # Likely (points, dimensions, time)
        n_points, n_dims, n_snapshots = shape
        structured_grid = False
        time_first = False
        print(f"Detected (points, dimensions, time) format: {n_snapshots} snapshots, {n_points} points, {n_dims} dimensions")

    else:
        raise ValueError(f"Could not determine data structure from shape {shape}")

elif len(shape) == 4:  # Likely (time, x, y, components)
    if shape[0] < shape[1] and shape[0] < shape[2]:  # Typical for time series
        n_snapshots = shape[0]
        grid_shape = (shape[1], shape[2])
        n_points = np.prod(grid_shape)
        n_dims = shape[3]
        structured_grid = True
        time_first = True
        print(f"Detected (time, x, y, components) format: {n_snapshots} snapshots, grid shape {grid_shape}, {n_dims} dimensions")
    else:
        raise ValueError(f"Could not determine data structure from shape {shape}")
else:
    raise ValueError(f"Unsupported data shape: {shape}")

# Options chosen earlier in this step
mean_subtraction = params["preprocessing_options"]["mean_subtraction"]
normalization = params["preprocessing_options"]["normalization"]

# For POD the snapshot matrix has shape (n_points*n_dims, n_snapshots):
# one flattened snapshot per column
n_rows = n_points * n_dims

# Create snapshot matrix and process mean/fluctuations
if MEMORY_EFFICIENT and data is None:
    # Only the 4-D layout can be read chunk-wise; fail before allocating the
    # full-size result arrays rather than inside the first loop iteration
    if len(shape) != 4:
        raise NotImplementedError(f"Chunked processing not implemented for shape {shape}")

    print(f"Processing data in chunks of {CHUNK_SIZE} snapshots to save memory")
    # Initialize arrays for the final results
    snapshot_matrix = np.zeros((n_rows, n_snapshots))
    mean_flow = np.zeros((n_rows, 1))
    n_chunks = (n_snapshots - 1) // CHUNK_SIZE + 1

    # Read the file chunk by chunk, filling the matrix and accumulating the
    # per-row sum from which the mean is derived afterwards
    with h5py.File(h5_file, 'r') as f:
        for chunk_start in range(0, n_snapshots, CHUNK_SIZE):
            chunk_end = min(chunk_start + CHUNK_SIZE, n_snapshots)
            chunk_number = chunk_start // CHUNK_SIZE + 1
            print(f"Processing chunk {chunk_number} of {n_chunks}: snapshots {chunk_start+1}-{chunk_end}")

            # (chunk, x, y, components) -> (n_rows, chunk): every snapshot is
            # flattened in C order into one column
            chunk_data = f[velocity_key][chunk_start:chunk_end]
            chunk_columns = chunk_data.reshape(chunk_end - chunk_start, n_rows).T

            snapshot_matrix[:, chunk_start:chunk_end] = chunk_columns
            mean_flow += chunk_columns.sum(axis=1, keepdims=True, dtype=np.float64)

            # Free memory
            del chunk_data
            del chunk_columns
            gc.collect()
            print_memory_usage(f"after processing chunk {chunk_number}")

    if n_snapshots > 0:
        mean_flow /= n_snapshots

    # Compute fluctuations
    print("Computing fluctuations...")
    if mean_subtraction:
        # Process fluctuations in chunks to save memory
        fluctuations = np.zeros_like(snapshot_matrix)
        for chunk_start in range(0, n_snapshots, CHUNK_SIZE):
            chunk_end = min(chunk_start + CHUNK_SIZE, n_snapshots)
            print(f"Computing fluctuations for chunk {chunk_start//CHUNK_SIZE + 1}")
            fluctuations[:, chunk_start:chunk_end] = snapshot_matrix[:, chunk_start:chunk_end] - mean_flow
        print("Mean subtraction applied")
    else:
        # Normalization below works in place, so it must not operate on the
        # raw snapshot matrix itself; copy only when it will actually run
        fluctuations = snapshot_matrix.copy() if normalization else snapshot_matrix
        print("Mean subtraction skipped")

    # Normalize the data if needed
    if normalization:
        # Compute norm in chunks
        print("Computing Frobenius norm...")
        frob_norm_sq = 0
        for chunk_start in range(0, n_snapshots, CHUNK_SIZE):
            chunk_end = min(chunk_start + CHUNK_SIZE, n_snapshots)
            chunk = fluctuations[:, chunk_start:chunk_end]
            frob_norm_sq += np.sum(chunk**2)

        frob_norm = np.sqrt(frob_norm_sq)
        if frob_norm > 0:
            print(f"Normalizing data (Frobenius norm: {frob_norm:.4f})...")

            # Normalize in chunks
            for chunk_start in range(0, n_snapshots, CHUNK_SIZE):
                chunk_end = min(chunk_start + CHUNK_SIZE, n_snapshots)
                fluctuations[:, chunk_start:chunk_end] /= frob_norm

            print(f"Normalization applied")
        else:
            # Dividing by a zero (or NaN) norm would fill the matrix with
            # NaN/inf; leave the data unscaled and record a neutral factor
            print("Warning: Frobenius norm is zero or NaN; data left unscaled")
            frob_norm = 1.0

        # Save the normalization factor for later use
        params["preprocessing_options"]["normalization_factor"] = float(frob_norm)
    else:
        print("Normalization skipped")

else:
    # Small datasets, or MEMORY_EFFICIENT disabled: everything is in memory.
    # Create snapshot matrix - reshape data into a 2D matrix
    if len(shape) == 2:
        # Already a matrix; only the orientation may need to change
        snapshot_matrix = data.T if time_first else data
    else:
        # Flatten each snapshot in C order into one column
        if time_first:
            columns = data.reshape(n_snapshots, n_rows).T
        else:
            columns = data.reshape(n_rows, n_snapshots)
        snapshot_matrix = np.zeros((n_rows, n_snapshots))
        snapshot_matrix[...] = columns

    print(f"Created snapshot matrix with shape: {snapshot_matrix.shape}")
    print_memory_usage("after creating snapshot matrix")

    # Compute mean flow
    mean_flow = np.mean(snapshot_matrix, axis=1, keepdims=True)
    print(f"Mean flow shape: {mean_flow.shape}")

    # Subtract mean (centering the data)
    if mean_subtraction:
        fluctuations = snapshot_matrix - mean_flow
        print("Mean subtraction applied")
    else:
        fluctuations = snapshot_matrix
        print("Mean subtraction skipped")

    # Normalize the data if needed
    if normalization:
        # Compute the Frobenius norm of the fluctuation matrix
        frob_norm = np.linalg.norm(fluctuations)
        if frob_norm > 0:
            # Out-of-place division, so snapshot_matrix is never modified
            fluctuations = fluctuations / frob_norm
            print(f"Normalization applied (Frobenius norm: {frob_norm:.4f})")
        else:
            # Dividing by a zero (or NaN) norm would fill the matrix with
            # NaN/inf; leave the data unscaled and record a neutral factor
            print("Warning: Frobenius norm is zero or NaN; data left unscaled")
            frob_norm = 1.0

        # Save the normalization factor for later use
        params["preprocessing_options"]["normalization_factor"] = float(frob_norm)
    else:
        print("Normalization skipped")

print_memory_usage("after processing")

# Free memory that's not needed anymore
# `data` is None when the dataset was processed in chunks, so there is nothing
# to drop in that case.
# NOTE(review): for 2-D input the snapshot matrix is taken from `data` directly
# (as `data` or `data.T`) rather than copied, so removing the name here is not
# expected to release that array - confirm if memory matters for such inputs.
if data is not None:
    del data
gc.collect()
print_memory_usage("after cleanup")

def _plot_field_and_upload(values, scalar_cmap, scalar_label, magnitude_label, title, filename):
    """Render one flattened field as an image, save it and upload it to MinIO.

    Scalar fields (n_dims == 1) are drawn directly; vector fields are drawn as
    the magnitude over their components.

    Args:
        values: Array holding one flattened snapshot (n_points * n_dims values).
        scalar_cmap: Colormap used for a scalar field.
        scalar_label: Colorbar label used for a scalar field.
        magnitude_label: Colorbar label used for a vector field.
        title: Figure title.
        filename: File name used locally and under the visualizations prefix.

    Returns:
        The "<bucket>/<key>" location the image was uploaded to.
    """
    plt.figure(figsize=(10, 6))

    if n_dims == 1:
        # Scalar field
        image = values.reshape(grid_shape)
        cmap, label = scalar_cmap, scalar_label
    else:
        # Vector field - plot magnitude
        components = values.reshape(grid_shape[0], grid_shape[1], n_dims)
        image = np.sqrt(np.sum(components**2, axis=2))
        cmap, label = 'viridis', magnitude_label

    # grid_shape is (x, y) while imshow draws axis 0 vertically: transpose so
    # that x runs along the horizontal axis, and put y index 0 at the bottom,
    # which makes the axis labels below match the picture
    plt.imshow(image.T, cmap=cmap, origin='lower')
    plt.colorbar(label=label)

    plt.title(title)
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.tight_layout()

    # Save visualization to local file
    local_path = os.path.join(temp_dir, filename)
    plt.savefig(local_path)

    # Upload to MinIO
    remote_key = f"{MINIO_OUTPUT_PREFIX}/visualizations/{filename}"
    s3.upload_file(local_path, MINIO_BUCKET, remote_key)
    return f"{MINIO_BUCKET}/{remote_key}"


# Images are only produced when the points form a regular (x, y) grid
if structured_grid:
    # Visualize the mean flow
    uploaded_to = _plot_field_and_upload(
        mean_flow,
        'viridis',
        'Mean Velocity',
        'Mean Velocity Magnitude',
        'Mean Flow',
        'mean_flow.png',
    )
    print(f"Mean flow visualization uploaded to {uploaded_to}")

    # Visualize the fluctuation field of the first snapshot
    uploaded_to = _plot_field_and_upload(
        fluctuations[:, 0],
        'RdBu_r',
        'Velocity Fluctuation',
        'Velocity Fluctuation Magnitude',
        'First Fluctuation Snapshot',
        'first_fluctuation.png',
    )
    print(f"First fluctuation visualization uploaded to {uploaded_to}")

# Create an empty "preprocessed/" placeholder object in MinIO (best effort)
try:
    s3.put_object(Bucket=MINIO_BUCKET, Key=f"{MINIO_OUTPUT_PREFIX}/preprocessed/")
except Exception as err:
    print(f"Warning: Could not create preprocessed directory: {err}")

# Save preprocessed data to local files then upload to MinIO
print("Saving processed data...")

# Each array is written as <name>.npy locally and uploaded under the same name
_arrays_to_save = (
    ('snapshot_matrix', snapshot_matrix),
    ('mean_flow', mean_flow),
    ('fluctuations', fluctuations),
)
for _array_name, _array in _arrays_to_save:
    _local_npy = os.path.join(temp_dir, f"{_array_name}.npy")
    np.save(_local_npy, _array)
    s3.upload_file(
        _local_npy,
        MINIO_BUCKET,
        f"{MINIO_OUTPUT_PREFIX}/preprocessed/{_array_name}.npy",
    )

# Describe the processed data with plain Python values so it serializes to JSON
metadata = dict(
    n_snapshots=int(n_snapshots),
    n_points=int(n_points),
    n_dims=int(n_dims),
    structured_grid=bool(structured_grid),
    preprocessing_params=params["preprocessing_options"],
)

# The grid shape only exists for structured data
if structured_grid:
    metadata["grid_shape"] = tuple(int(extent) for extent in grid_shape)

# Write the metadata next to the arrays
metadata_path = os.path.join(temp_dir, 'metadata.json')
save_dict_to_json(metadata, metadata_path)
s3.upload_file(
    metadata_path,
    MINIO_BUCKET,
    f"{MINIO_OUTPUT_PREFIX}/preprocessed/metadata.json",
)

# Update and save parameters for the next step; this overwrites the shared
# params.json in the output prefix
params["preprocessing_metadata"] = metadata
params_path = os.path.join(temp_dir, 'params.json')
save_dict_to_json(params, params_path)
s3.upload_file(
    params_path,
    MINIO_BUCKET,
    f"{MINIO_OUTPUT_PREFIX}/params.json",
)

# Upload completion marker
marker_path = os.path.join(temp_dir, 'preprocessing_completed.txt')
with open(marker_path, 'w') as marker_file:
    marker_file.write("Preprocessing completed successfully")

s3.upload_file(
    marker_path,
    MINIO_BUCKET,
    f"{MINIO_OUTPUT_PREFIX}/preprocessing_completed.txt",
)

print("\nPreprocessing completed successfully!")
print(f"Preprocessed data uploaded to: {MINIO_BUCKET}/{MINIO_OUTPUT_PREFIX}/preprocessed/")

# List what is now stored under the preprocessed prefix as a final check
print("\nVerifying uploaded files:")
response = s3.list_objects_v2(Bucket=MINIO_BUCKET, Prefix=f"{MINIO_OUTPUT_PREFIX}/preprocessed/")
if 'Contents' not in response:
    print("No objects found in preprocessed location")
else:
    for item in response['Contents']:
        print(f"  - {item['Key']} ({item['Size']} bytes)")

print_memory_usage("at end")