# Ethos
This is actually quite a fancy file concatenator: it merges files, but makes sure that the state at the end of one file matches the state at the start of the next, so that you have continuity.  It expects "Sam-style" files.

In [1]:
import os
import pandas as pd
import numpy as np
from pathlib import Path

def merge_parquet_files(input_folder, output_folder, size_limit_mb=2, min_length_mb=1, match_up=True):
    Path(output_folder).mkdir(parents=True, exist_ok=True)

    current_df = None
    file_counter = 1
    current_size_mb = 0
    saved_files = []
    last_time = 0
    dt = None
    last_channel = None

    parquet_files = sorted([f for f in os.listdir(input_folder) if f.endswith('.parquet')])

    for file in parquet_files:
        file_path = os.path.join(input_folder, file)
        df = pd.read_parquet(file_path)

        # Ensure the Time and Channels columns exist
        if "Time" not in df.columns or "Channels" not in df.columns:
            raise ValueError(f"Time or Channels column not found in file {file}")

        # Calculate dt if not already set
        if dt is None:
            dt = round(df["Time"].diff().mean(), 3)
            print(f"Calculated dt: {dt}")
        if match_up:
            # Optionally find the correct joining point
            if last_channel is not None:
                joining_index = df.index[df['Channels'] == last_channel].tolist()
                if joining_index:
                    joining_index = joining_index[0]
                    print(f"Joining point found at index {joining_index} in file {file}")
                    df = df.iloc[joining_index:]
                else:
                    print(f"Warning: No matching Channels value found in file {file}")

        # Adjust time to be continuous
        if current_df is not None:
            time_shift = last_time + dt - df["Time"].iloc[0]
            df["Time"] += time_shift

        if current_df is None:
            current_df = df
        else:
            current_df = pd.concat([current_df, df], ignore_index=True)

        last_time = current_df["Time"].iloc[-1]
        last_channel = current_df["Channels"].iloc[-1]

        # Check the size of the current DataFrame
        current_size_mb = current_df.memory_usage(deep=True).sum() / 1024 / 1024

        # If size limit is reached, save the current DataFrame and start a new one
        if current_size_mb >= size_limit_mb:
            output_file = os.path.join(output_folder, f'merged_file_{file_counter}.parquet')
            current_df.to_parquet(output_file, index=False)
            print(f"Saved {output_file} (Size: {current_size_mb:.2f} MB)")
            
            saved_files.append((output_file, current_size_mb))
            
            # Reset for the next file, but keep the last row to ensure continuity
            current_df = current_df.iloc[[-1]]
            file_counter += 1
            current_size_mb = current_df.memory_usage(deep=True).sum() / 1024 / 1024

    # Handle the last file
    if current_df is not None:
        if current_size_mb < min_length_mb and len(saved_files) > 0:
            # Append to the penultimate file
            penultimate_file, penultimate_size = saved_files[-1]
            penultimate_df = pd.read_parquet(penultimate_file)
            merged_df = pd.concat([penultimate_df, current_df], ignore_index=True)
            merged_size_mb = merged_df.memory_usage(deep=True).sum() / 1024 / 1024
            
            merged_df.to_parquet(penultimate_file, index=False)
            print(f"Appended to {penultimate_file} (New size: {merged_size_mb:.2f} MB)")
        else:
            # Save as a new file
            output_file = os.path.join(output_folder, f'merged_file_{file_counter}.parquet')
            current_df.to_parquet(output_file, index=False)
            print(f"Saved {output_file} (Size: {current_size_mb:.2f} MB)")

    print("Merging complete!")

# Example run: merge everything under 'source' into ~10 MB files in 'merged',
# folding a final chunk smaller than 3 MB into the previous output file.
input_folder = 'source'
output_folder = 'merged'
merge_parquet_files(
    input_folder=input_folder,
    output_folder=output_folder,
    size_limit_mb=10,
    min_length_mb=3,
    match_up=True,
)

Calculated dt: 0.1
Joining point found at index 201 in file regularSimv103OffyB_101.parquet
Joining point found at index 0 in file regularSimv103OffyB_102.parquet
Joining point found at index 0 in file regularSimv103OffyB_103.parquet
Joining point found at index 0 in file regularSimv103OffyB_104.parquet
Joining point found at index 1 in file regularSimv103OffyB_105.parquet
Joining point found at index 0 in file regularSimv103OffyB_106.parquet
Joining point found at index 930 in file regularSimv103OffyB_107.parquet
Joining point found at index 597 in file regularSimv103OffyB_108.parquet
Saved merged/merged_file_1.parquet (Size: 10.26 MB)
Joining point found at index 1 in file regularSimv103OffyB_109.parquet
Joining point found at index 4182 in file regularSimv103OffyB_110.parquet
Joining point found at index 632 in file regularSimv103OffyB_111.parquet
Joining point found at index 1107 in file regularSimv103OffyB_112.parquet
Joining point found at index 173 in file regularSimv103OffyB_11