In [1]:
import io
import os
import subprocess
import tempfile
import time

import boto3
import numpy as np
import pandas as pd
import torch
import whisper

In [29]:
import pyarrow as pa
import pyarrow.parquet as pq
import awswrangler as wr

In [2]:
def tokenise(audio_np_array: np.ndarray) -> torch.Tensor:
    """
    Stand-in tokeniser that returns random tokens for an audio clip.

    The audio samples themselves are never inspected; only the argument's type
    is validated. The call sleeps briefly to mimic model inference latency.

    Args:
    - audio_np_array (np.ndarray): The audio clip as a NumPy array.

    Returns:
    - torch.Tensor: A random 1D int16 tensor whose length is drawn from 20 to 1000 inclusive.

    Raises:
    - ValueError: If ``audio_np_array`` is not a NumPy array.
    """
    if isinstance(audio_np_array, np.ndarray):
        # Pause to imitate the time a real model would need.
        time.sleep(0.15)

        # The upper bound of randint is exclusive, so 1001 allows a length of 1000.
        n_tokens = np.random.randint(20, 1001)
        return torch.randint(-32768, 32767, (n_tokens,), dtype=torch.int16)

    raise ValueError("Input should be a NumPy array")



def transcribe_tokenize(file, s3, model, bucket_name):
    """
    Transcribe and tokenize a single audio object stored in S3.

    Keys that do not end in ``.flac`` are skipped and reported as a pair of
    NaNs. For ``.flac`` keys the object is downloaded into a temporary
    directory that is deleted again before the function returns, so no audio
    files accumulate on local disk.

    Args:
    - file (str): Object key of the audio file inside the bucket.
    - s3 (s3 Object): AWS S3 client; ``download_file(bucket, key, local_path)`` is called on it.
    - model (whisper model): Whisper model object; ``transcribe(audio)`` is called on it.
    - bucket_name (str): AWS Bucket Name.

    Returns:
    - text (str): Transcribed text from whisper, or NaN for non-flac keys.
    - tokenized (np.ndarray): The output of ``tokenise`` converted to a NumPy array, or NaN for non-flac keys.
    """
    if not file.endswith('.flac'):
        return (np.nan, np.nan)

    # The scratch directory (and the downloaded file in it) is removed when the
    # ``with`` block exits, even if the download or decoding raises.
    with tempfile.TemporaryDirectory() as scratch_dir:
        # Only the base name is used locally, so the key may have any prefix.
        local_audio_path = os.path.join(scratch_dir, os.path.basename(file))
        s3.download_file(bucket_name, file, local_audio_path)

        # Decode once; the same waveform feeds both the transcription and the tokeniser.
        audio = whisper.load_audio(local_audio_path)

    text = model.transcribe(audio)["text"]
    tokenized = np.array(tokenise(audio))

    return (text, tokenized)


In [3]:
# Credentials are read from the environment so that no secret is stored in the
# notebook. When a variable is unset the value is None, in which case boto3
# falls back to its own credential lookup.
# NOTE: the key pair that used to be hard-coded here must be treated as leaked
# and rotated.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")

# Bucket that holds the audio files, and the S3-compatible endpoint serving it.
bucket_name = 'data-engineer-test'
endpoint_url = 'https://bdadc4417ecd7714dd7d42a104a276c2.r2.cloudflarestorage.com'

In [4]:
# S3 client pointed at the custom endpoint, authenticated with the key pair
# configured above.
s3 = boto3.client(
    's3',
    endpoint_url=endpoint_url,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [5]:
# Load the Whisper "base" checkpoint used for every transcription below.
model = whisper.load_model("base")

# A single list_objects call returns at most 1000 keys, so walk every page of
# the listing instead of silently dropping the rest of the bucket.
paginator = s3.get_paginator('list_objects')
object_records = [
    entry
    for page in paginator.paginate(Bucket=bucket_name)
    for entry in page.get('Contents', [])  # a page has no 'Contents' when it lists nothing
]

# One row per object; nested fields such as Owner are flattened into dotted
# column names (e.g. Owner.ID).
process_df = pd.json_normalize(object_records)

process_df.tail()

Unnamed: 0,Key,LastModified,ETag,Size,StorageClass,Owner.DisplayName,Owner.ID
995,wav48_silence_trimmed/p226/p226_270_mic1.flac,2023-08-24 00:28:38.048000+00:00,"""6c9e6cd6f9af143688bf91df30dcaeb2""",147053,STANDARD,bdadc4417ecd7714dd7d42a104a276c2,bdadc4417ecd7714dd7d42a104a276c2
996,wav48_silence_trimmed/p226/p226_270_mic2.flac,2023-08-24 00:28:38.027000+00:00,"""a8a712d22b068c5b9a614fbd571a973c""",135809,STANDARD,bdadc4417ecd7714dd7d42a104a276c2,bdadc4417ecd7714dd7d42a104a276c2
997,wav48_silence_trimmed/p226/p226_271_mic1.flac,2023-08-24 00:28:38.139000+00:00,"""1a4e1d710be9e661e5b5f6fdc96d5fd3""",96778,STANDARD,bdadc4417ecd7714dd7d42a104a276c2,bdadc4417ecd7714dd7d42a104a276c2
998,wav48_silence_trimmed/p226/p226_271_mic2.flac,2023-08-24 00:28:38.194000+00:00,"""d1ab352749c2eaad120333b763899086""",91856,STANDARD,bdadc4417ecd7714dd7d42a104a276c2,bdadc4417ecd7714dd7d42a104a276c2
999,wav48_silence_trimmed/p226/p226_272_mic1.flac,2023-08-24 00:28:38.226000+00:00,"""01a71e53b399d29ee3e9c5db91214de4""",151981,STANDARD,bdadc4417ecd7714dd7d42a104a276c2,bdadc4417ecd7714dd7d42a104a276c2


In [6]:
# Transcribe and tokenise every listed object. Each call returns a
# (text, tokens) pair, which result_type='expand' spreads over the two new
# columns; keys that are not .flac files yield NaN in both.
process_df[['transcribed_text', 'tokenized_info']] = process_df.apply(
    lambda row: transcribe_tokenize(row['Key'], s3, model, bucket_name),
    axis=1,
    result_type='expand',
)



In [8]:
# Optional: save the processed frame to CSV
# process_df.to_csv('processed_df.csv')

In [16]:
# Split the LastModified timestamp into date / hour / minute / second columns.
# The timestamp is parsed once and each part is read through the .dt accessor.
last_modified = pd.to_datetime(process_df['LastModified'])
for column_name, component in (
    ('LastModifiedDate', 'date'),
    ('LastModifiedHour', 'hour'),
    ('LastModifiedMinute', 'minute'),
    ('LastModifiedSecond', 'second'),
):
    process_df[column_name] = getattr(last_modified.dt, component)

In [39]:
def _parent_folder(key):
    """Return the path component just before the file name of a ``.flac`` key, or NaN for any other key."""
    if not key.endswith('.flac'):
        return np.nan
    return key.split('/')[-2]


process_df['FolderName'] = process_df['Key'].apply(_parent_folder)

In [42]:
# One group per (folder, date, hour, minute) combination. With pandas' default
# groupby settings, rows whose FolderName is NaN are left out of every group.
partition_columns = ['FolderName', 'LastModifiedDate', 'LastModifiedHour', 'LastModifiedMinute']
partitioned_data = process_df.groupby(partition_columns)

In [50]:
# Write each partition to Parquet and upload it to S3.
output_columns = ['Key', 'transcribed_text', 'tokenized_info']
for partition_key, partition_df in partitioned_data:
    print(partition_key)

    # Join the key parts with underscores (e.g. "p225_2023-08-24_0_21") rather
    # than using the tuple's repr, which would put quotes, parentheses and
    # spaces into the object key.
    partition_name = '_'.join(str(part) for part in partition_key)

    table = pa.Table.from_pandas(partition_df[output_columns])

    # Serialise into memory and upload from there, so no local .parquet files
    # are left behind after the run.
    parquet_buffer = io.BytesIO()
    pq.write_table(table, parquet_buffer)
    parquet_buffer.seek(0)  # rewind so the upload reads from the start

    s3.upload_fileobj(parquet_buffer, bucket_name, f'output_parquet/{partition_name}.parquet')

('p225', datetime.date(2023, 8, 24), 0, 21)
('p225', datetime.date(2023, 8, 24), 0, 22)
('p225', datetime.date(2023, 8, 24), 0, 23)
('p226', datetime.date(2023, 8, 24), 0, 28)
