In [1]:
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from tqdm import tqdm


In [3]:

# Function to process the file in chunks and write to Parquet
def process_file_to_parquet(file_path, batch_size):
    """Stream a header/value text file into numbered Parquet batch files.

    The input is read line by line. A line starting with ``"TBP:"`` names a
    column; every following non-blank line is parsed as a float and appended
    to that column until the next header line appears. Each time the current
    column reaches ``batch_size`` values, every column seen so far is passed
    to ``write_batch_to_parquet`` and then emptied (column names are kept).
    Whatever remains at end of file is written as one final batch.

    Args:
        file_path: Path of the text file to read.
        batch_size: Number of values in the current column that triggers a
            batch write. Must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1, or if a value line
            cannot be parsed as a float (the message names the line number).
    """
    if batch_size < 1:
        # A non-positive size would trigger a write on every single line.
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    data = {}
    current_column = None
    batch_number = 0

    with open(file_path, 'r') as file:
        progress = tqdm(file, desc="Processing file")
        for line_number, line in enumerate(progress, start=1):
            line = line.strip()
            if not line:
                # Blank lines carry no value; float('') would abort the run.
                continue

            if line.startswith("TBP:"):
                # Header line: switch to (and register, if new) this column.
                current_column = line
                data.setdefault(current_column, [])
                continue

            if current_column is None:
                # Lines before the first header have no column; they are dropped.
                continue

            try:
                value = float(line)
            except ValueError as exc:
                raise ValueError(
                    f"{file_path}, line {line_number}: cannot parse {line!r} as a float"
                ) from exc
            data[current_column].append(value)

            # Only an append can push the current column over the threshold.
            if len(data[current_column]) >= batch_size:
                write_batch_to_parquet(data, batch_number)
                batch_number += 1
                # Keep the known column names, drop the values already written.
                data = {key: [] for key in data}

    # Flush the partially filled last batch, if it holds any values.
    if any(data.values()):
        write_batch_to_parquet(data, batch_number)

def write_batch_to_parquet(batch, batch_number):
    """Write one batch of columns to ``output_batch_<batch_number>.parquet``.

    Columns shorter than the longest one are padded with ``None`` so that all
    columns have equal length. Every column is stored as ``float64``: without
    an explicit type, a column that is entirely ``None`` would be inferred as
    Arrow's ``null`` type, giving the batch files differing schemas.

    The lists in ``batch`` are not modified; padding is done on new lists.

    Args:
        batch: Mapping of column name to a list of float values (or None).
        batch_number: Integer used to build the output file name.

    Returns:
        None. Nothing is written when ``batch`` contains no columns.
    """
    if not batch:
        # max() over zero columns would raise; there is nothing to write.
        return

    max_length = max(len(values) for values in batch.values())

    # Pad short columns into fresh lists so the caller's data stays untouched.
    padded = {
        column: values if len(values) == max_length
        else values + [None] * (max_length - len(values))
        for column, values in batch.items()
    }

    # Fix the column type so all-None columns do not become type `null`.
    schema = pa.schema([(column, pa.float64()) for column in padded])
    table = pa.Table.from_pydict(padded, schema=schema)

    # Save the batch to a Parquet file in the current working directory.
    pq.write_table(table, f'output_batch_{batch_number}.parquet')
    print(f"Batch {batch_number} saved to output_batch_{batch_number}.parquet")


In [4]:
# Input file to convert (relative path)
file_path = '../data/20210518_142318.lag'

# Values collected per column before a batch is written out;
# tune this to the memory available.
batch_size = 100_000


In [None]:
# Convert the input file, writing Parquet batch files as it is read
process_file_to_parquet(file_path=file_path, batch_size=batch_size)

print("All batches processed and saved.")

Processing file: 303898815it [02:54, 1908258.48it/s]