In [None]:
import datasets
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
from gluonts.dataset.arrow import ArrowWriter
import pyarrow as pa
import pandas as pd

def convert_to_arrow_chunked(
    path: Union[str, Path],
    dataset,
    chunk_size: int = 10000,
    compression: str = "lz4",
):
    """Write ``dataset`` to an Arrow IPC file, ``chunk_size`` rows at a time.

    Each row of the output holds two fields: ``"start"`` (the first element of
    the row's ``timestamp`` entry) and ``"target"`` (the row's ``target`` entry
    unchanged). The Arrow schema is taken from the first chunk and reused for
    every later chunk.

    Args:
        path: Destination file; it is created or overwritten.
        dataset: Object supporting ``len()`` and slice indexing, where a slice
            exposes ``'target'`` and ``'timestamp'`` columns by key.
        chunk_size: Number of rows converted and written per iteration.
        compression: Accepted for interface compatibility but currently not
            applied; the writer is created with pyarrow's default options.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``dataset`` is empty
            (with no rows there is no schema to write).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    total_rows = len(dataset)
    if total_rows == 0:
        raise ValueError("dataset is empty; there is no schema to write")

    writer = None
    rows_written = 0

    # Normalise to str so that both str and Path destinations are handled.
    with pa.OSFile(str(path), 'wb') as f:
        try:
            for i in range(0, total_rows, chunk_size):
                chunk = dataset[i:i + chunk_size]

                # Pair every series with its timestamps; only the first
                # timestamp is kept, as the series start.
                chunk_data = [
                    {"start": timestamp_val[0], "target": target_val}
                    for target_val, timestamp_val in zip(chunk['target'], chunk['timestamp'])
                ]

                # Convert to Arrow table
                df_chunk = pd.DataFrame(chunk_data)
                table_chunk = pa.Table.from_pandas(df_chunk)

                # Initialize writer with schema from first chunk
                if writer is None:
                    writer = pa.ipc.new_file(f, table_chunk.schema)

                writer.write(table_chunk)

                # Count converted rows explicitly: len(chunk) on a column-keyed
                # mapping is the number of columns, not the number of rows.
                rows_written += len(chunk_data)
                print(f"Processed {rows_written} / {total_rows} rows")
        finally:
            # Finalise the file footer even when a chunk fails part-way.
            if writer is not None:
                writer.close()

# Load each training corpus from the Hub and convert it to an Arrow file in chunks
for corpus_name, arrow_path in (
    ("training_corpus_tsmixup_10m", "../data/tsmixup-data.arrow"),
    ("training_corpus_kernel_synth_1m", "../data/kernelsynth-data.arrow"),
):
    ds = datasets.load_dataset("autogluon/chronos_datasets", corpus_name, split="train")
    # Return numpy values when the dataset is indexed
    ds.set_format("numpy")
    convert_to_arrow_chunked(arrow_path, ds, chunk_size=10000)

In [3]:
import pyarrow as pa
import random

def sample_arrow_file(input_path: str, output_path: str, sample_ratio: float = 0.1, seed: "int | None" = None):
    """Copy a random subset of the record batches of an Arrow IPC file.

    Sampling is done per record batch, not per row: ``int(n * sample_ratio)``
    of the ``n`` batches in ``input_path`` are chosen without replacement and
    written to ``output_path`` in their original order, using the input schema.

    Args:
        input_path: Arrow IPC file to read.
        output_path: Destination file; it is created or overwritten. It should
            differ from ``input_path``, since it is opened for writing while
            the input is still being read.
        sample_ratio: Fraction of batches to keep, between 0 and 1 inclusive.
        seed: Optional seed for a reproducible selection. When ``None`` the
            module-level ``random`` generator is used.

    Raises:
        ValueError: If ``sample_ratio`` is outside ``[0, 1]``.
    """
    # Fail before any file is opened or truncated.
    if not 0.0 <= sample_ratio <= 1.0:
        raise ValueError(f"sample_ratio must be between 0 and 1, got {sample_ratio}")

    rng = random if seed is None else random.Random(seed)

    # Open the source explicitly so its handle is released when we are done.
    with pa.OSFile(str(input_path), 'rb') as source:
        reader = pa.ipc.open_file(source)
        total_batches = reader.num_record_batches

        # Calculate how many batches to keep
        num_batches_to_keep = int(total_batches * sample_ratio)
        selected_indices = sorted(rng.sample(range(total_batches), num_batches_to_keep))

        # Write selected batches to new file; the writer context closes it
        # (and writes the footer) even if copying a batch fails.
        with pa.OSFile(str(output_path), 'wb') as sink:
            with pa.ipc.new_file(sink, reader.schema) as writer:
                for idx in selected_indices:
                    writer.write(reader.get_batch(idx))

    print(f"Sampled {num_batches_to_keep} batches out of {total_batches} total batches")

# Keep roughly 10% of the record batches of the converted TSMixup file.
# Paths are relative to the notebook's working directory, matching the
# "../data/" location the conversion cell writes to, instead of a
# machine-specific absolute home directory.
input_file = "../data/tsmixup-data.arrow"
output_file = "../data/tsmixup-data-10percent.arrow"
sample_arrow_file(input_file, output_file, 0.1)

Sampled 100 batches out of 1000 total batches
