In [None]:
from whisperspeech.pipeline import Pipeline

class TTSProcessor:
    def __init__(self, num_processes_per_gpu, num_gpus):
        """TTS Processor to generate TTS from prompts."""
        self.num_processes_per_gpu = num_processes_per_gpu
        self.num_gpus = num_gpus

    def distribute_pipe(self):
        """Distribute the pipeline."""
        self.pipes = []
        for gpu_id in range(self.num_gpus):
            for pipe_id in range(self.num_processes_per_gpu):
                self.pipes.append(Pipeline(s2a_ref="collabora/whisperspeech:s2a-q4-tiny-en+pl.model", ))


In [8]:
import torch

# random float32 tensor with normal distribution
x = torch.randn(10, 2, dtype=torch.float32).numpy()

In [25]:
import pyarrow as pa

# convert to pyarrow tensor
x_pa = pa.Tensor.from_numpy(x)
x_pa

<pyarrow.Tensor>
type: float
shape: (10, 2)
strides: (8, 4)

In [10]:
x_pa

<pyarrow.Tensor>
type: float
shape: (10, 2)
strides: (8, 4)

In [36]:
schema = pa.schema([pa.field("feature", pa.list_(pa.float32())), pa.field("label", pa.list_(pa.float32()))])

In [31]:
import torchaudio


tensor, sr = torchaudio.load("/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_audio/audio_2499.wav")

In [32]:
tensor.squeeze(0).numpy()

array([-0.00079372, -0.00075542, -0.00076741, ...,  0.00136764,
       -0.00015275, -0.00142147], dtype=float32)

In [44]:
import pyarrow as pa
import torchaudio

# Define the schema
schema = pa.schema([
    pa.field("feature", pa.list_(pa.float32())),
    pa.field("label", pa.list_(pa.float32()))
])

# Load the audio file
tensor, sr = torchaudio.load("/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_audio/audio_2499.wav")

In [53]:
import pyarrow as pa
import torchaudio

# Define the schema
schema = pa.schema([
    pa.field("feature", pa.list_(pa.float32())),
    pa.field("label", pa.list_(pa.float32()))
])

# Load an audio file
tensor, sr = torchaudio.load("/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_audio/audio_2499.wav")
print(f"Loaded audio file with sample rate: {sr}")

# Open the file sink for writing
with pa.OSFile("data.arrow", "wb") as sink:
    writer = pa.ipc.RecordBatchFileWriter(sink, schema)
    print("Opened data.arrow for writing.")

    # Prepare the data as pyarrow arrays
    feature_array = pa.array([tensor.squeeze(0).numpy()], type=pa.list_(pa.float32()))
    label_array = pa.array([tensor.squeeze(0).numpy()], type=pa.list_(pa.float32()))
    print("Prepared pyarrow arrays for feature and label.")

    # Create a RecordBatch from the arrays
    batch = pa.RecordBatch.from_arrays([feature_array, label_array], schema)
    print("Created RecordBatch with schema:", schema)

    # Write the batch to the file
    writer.write_batch(batch)
    print("Wrote RecordBatch to data.arrow.")

    # Close the writer explicitly
    writer.close()
    print("Closed the writer.")


Loaded audio file with sample rate: 24000
Opened data.arrow for writing.
Prepared pyarrow arrays for feature and label.
Created RecordBatch with schema: feature: list<item: float>
  child 0, item: float
label: list<item: float>
  child 0, item: float
Wrote RecordBatch to data.arrow.
Closed the writer.


  batch = pa.RecordBatch.from_arrays([feature_array, label_array], schema)


In [6]:
import pyarrow as pa

try:
    with pa.ipc.open_file('/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_tokens_v4.arrow') as reader:
        table = reader.read_all()
    print("File opened successfully")
except pa.lib.ArrowInvalid:
    print("File is corrupted, need to try recovery")

File is corrupted, need to try recovery


In [7]:
import pyarrow as pa

try:
    with open('/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_tokens_v4.arrow', 'rb') as file:
        reader = pa.ipc.open_stream(file)
        batches = list(reader)
        if batches:
            table = pa.Table.from_batches(batches)
            print(f"Recovered {len(batches)} record batches")
        else:
            print("No valid record batches found")
except Exception as e:
    print(f"Recovery failed: {str(e)}")

Recovery failed: Invalid flatbuffers message.


In [11]:
import pyarrow as pa

def safe_read_arrow(file_path):
    try:
        with pa.memory_map(file_path, 'r') as source:
            reader = pa.ipc.open_file(source)
            return [batch for batch in reader]
    except pa.ArrowInvalid as e:
        print(f"Error reading file: {e}")
        return None

recovered_data = safe_read_arrow('/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_tokens_v4.arrow')
if recovered_data:
    print(f"Successfully recovered {sum(len(batch) for batch in recovered_data)} records")

Error reading file: Not an Arrow file


In [44]:
import pyarrow as pa
import pyarrow.ipc as ipc

def read_partial_arrow_data(file_path, chunk_size=1024):
    try:
        with open(file_path, 'rb') as f:
            offset = 0
            batches = []
            while True:
                f.seek(offset)
                try:
                    reader = ipc.open_stream(f)
                    while True:
                        try:
                            batch = reader.read_next_batch()
                            batches.append(batch)
                        except StopIteration:
                            break
                except Exception as e:
                    print(f"Error at offset {offset}: {e}")
                offset += chunk_size
                if offset >= f.tell():
                    break
            if batches:
                table = pa.Table.from_batches(batches)
                return table
            else:
                print("No valid data found in the file.")
                return None
    except Exception as e:
        print(f"Error opening file: {e}")
        return None

# Replace 'your_file.arrow' with the path to your IPC file
table = read_partial_arrow_data('/home/root/Workspace/synthetic_data_generation/sound_instruct_llama3/data/new_tokens_v4.arrow')
if table:
    print(table)
else:
    print("Failed to recover data from the Arrow file.")


Error at offset 0: Invalid flatbuffers message.
Error at offset 1024: Invalid flatbuffers message.
Error at offset 2048: Invalid IPC stream: negative continuation token
No valid data found in the file.
Failed to recover data from the Arrow file.


In [18]:
import pyarrow as pa
import pyarrow.csv as csv

class CSVWriter:
    def __init__(self, file_path, schema):
        self.file_path = file_path
        self.schema = schema
        self.writer = None
        self.open()

    def open(self):
        self.writer = csv.CSVWriter(self.file_path, self.schema)

    def write(self, batch):
        self.writer.write(batch)

    def close(self):
        if self.writer:
            self.writer.close()

In [10]:
# Create a CSV writer for this process
schema = pa.schema(
    [
        pa.field("index", pa.int64()),
        pa.field("audio", pa.string()),
        pa.field("tokens", pa.string()),
    ]
)

writer = CSVWriter("data.csv", schema)

In [15]:
import json
import numpy as np

def create_random_array_string(size=15):
    # random float32 array
    array = np.random.rand(size).astype(np.float32)
    return json.dumps(array.tolist())

In [16]:
# Create random array
import numpy as np

batch = [
    pa.array([1, 2, 3, 4, 5], type=pa.int64()),
    pa.array([create_random_array_string() for _ in range(5)], type=pa.string()),
    pa.array([create_random_array_string() for _ in range(5)], type=pa.string())
]

In [17]:
batch_table = pa.table.from_arrays(
        batch, schema=csv_writer.schema
    )

[<pyarrow.lib.Int64Array object at 0x7ffaadc932e0>
 [
   1,
   2,
   3,
   4,
   5
 ],
 <pyarrow.lib.StringArray object at 0x7ffaa790f9a0>
 [
   "[0.4400988221168518, 0.5239095687866211, 0.723219096660614, 0.2820580303668976, 0.7444203495979309, 0.017944473773241043, 0.3287590742111206, 0.8090250492095947, 0.10484332591295242, 0.3647204041481018, 0.7928389310836792, 0.9845169186592102, 0.12505963444709778, 0.14084535837173462, 0.2445572465658188]",
   "[0.1545468121767044, 0.138200044631958, 0.6591715216636658, 0.4447644352912903, 0.3121277987957001, 0.5649989247322083, 0.46367937326431274, 0.5253438353538513, 0.18653614819049835, 0.6859237551689148, 0.2547590136528015, 0.6078174114227295, 0.058871880173683167, 0.5905410051345825, 0.8234201669692993]",
   "[0.4271077811717987, 0.8178958892822266, 0.8086280226707458, 0.02417149767279625, 0.9339762330055237, 0.10169157385826111, 0.24186016619205475, 0.21639375388622284, 0.38006070256233215, 0.3688339591026306, 0.472318559885025, 0.136996