In [1]:
import numpy as np
import os
from glob import glob
from tqdm import tqdm
import psutil

In [2]:

def check_memory_usage(data_size):
    """Check if there's enough memory to process the data."""
    available_memory = psutil.virtual_memory().available
    return available_memory > data_size * 2  # Factor of 2 for safety margin


In [3]:

def get_total_size(file_list):
    """Calculate total size of all arrays in the first file to estimate memory requirements."""
    with np.load(file_list[0], allow_pickle=True) as data:
        return sum(arr.nbytes for arr in data.values())



In [4]:
def merge_npz_files_batch(file_list, output_file, batch_size=5):
    """
    Merge multiple .npz files using a batch approach to balance speed and memory usage.
    
    Parameters:
    - file_list: List of paths to .npz files
    - output_file: Path to save the final merged .npz file
    - batch_size: Number of files to process at once
    """
    if not file_list:
        raise ValueError("No files provided to merge")
    
    # Estimate memory requirements
    single_file_size = get_total_size(file_list[0])
    estimated_batch_size = single_file_size * batch_size
    
    # Adjust batch size based on available memory
    while not check_memory_usage(estimated_batch_size) and batch_size > 1:
        batch_size = max(1, batch_size - 1)
        estimated_batch_size = single_file_size * batch_size
    
    print(f"Using batch size of {batch_size} files")
    
    # Initialize with first file to get the structure
    with np.load(file_list[0], allow_pickle=True) as first_file:
        merged_data = {key: first_file[key] for key in first_file.files}
    
    # Process remaining files in batches
    remaining_files = file_list[1:]
    for i in tqdm(range(0, len(remaining_files), batch_size)):
        batch_files = remaining_files[i:i + batch_size]
        batch_data = []
        
        # Load batch into memory
        for file in batch_files:
            with np.load(file, allow_pickle=True) as data:
                batch_data.append({key: data[key] for key in data.files})
        
        # Merge batch with existing data
        for key in merged_data:
            arrays_to_concat = [merged_data[key]] + [data[key] for data in batch_data]
            merged_data[key] = np.concatenate(arrays_to_concat, axis=0)
        
        # Clear batch data from memory
        batch_data.clear()
    
    # Save final result
    print(f"Saving merged data to {output_file}")
    np.savez_compressed(output_file, **merged_data)
    print("Merge complete!")



In [5]:
# Example usage
def merge_files(input_path, output_path):
    """
    Wrapper function to handle the merging process with error handling.
    
    Parameters:
    - input_path: Path pattern to find .npz files
    - output_path: Path to save the merged file
    """
    try:
        file_list = sorted(glob(input_path))
        if not file_list:
            raise ValueError(f"No .npz files found matching pattern: {input_path}")
        
        print(f"Found {len(file_list)} files to merge")
        merge_npz_files_batch(file_list, output_path)
        
    except Exception as e:
        print(f"Error during merge: {str(e)}")
        raise

In [8]:
input_path = "E:/L2RPN/Dreamer_V3_Implimentation/data-20241123T131314Z-001/data/*.npz" # Adjust path as needed
output_path = 'E:/L2RPN/Dreamer_V3_Implimentation/merged_data.npz'

In [9]:
merge_files(input_path,output_path)

Found 683 files to merge
Error during merge: [Errno 2] No such file or directory: 'E'


FileNotFoundError: [Errno 2] No such file or directory: 'E'