In [16]:
import os
import openai
import tarfile
import replicate
import base64
import ffmpeg

In [10]:
def get_openai_key(path='../openai_key.txt'):
    """
    Reads the OpenAI API key from a plain-text file.

    If the file does not exist, an empty placeholder file is created at that
    location (so there is an obvious place to paste the key) and an empty
    string is returned. A warning is emitted whenever the resulting key is
    empty, because an empty key otherwise only surfaces later as an
    authentication failure.

    Parameters:
    - path: Location of the key file. Defaults to '../openai_key.txt'.

    Returns:
    - The file contents with surrounding whitespace stripped ('' when the
      file is missing or blank).
    """
    import warnings  # local import: keeps this cell independent of the import cell

    # Create an empty placeholder so the read below cannot fail on a missing file
    if not os.path.isfile(path):
        with open(path, 'w') as file:
            file.write('')

    # Read the key, dropping the trailing newline most editors add
    with open(path, 'r') as file:
        key = file.read().strip()

    if not key:
        warnings.warn(
            f"OpenAI key file '{path}' is empty; paste your API key into it.",
            stacklevel=2,
        )
    return key


# Load the key from the key file and assign it to the openai module.
# NOTE(review): get_openai_key() returns '' when the key file is missing or
# blank, so this can assign an empty key — confirm the file is populated.
openai.api_key = get_openai_key()

In [11]:
def get_replicate_token(path='../replicate_token.txt'):
    """
    Reads the Replicate API token from a plain-text file.

    If the file does not exist, an empty placeholder file is created at that
    location (so there is an obvious place to paste the token) and an empty
    string is returned. A warning is emitted whenever the resulting token is
    empty, because an empty token otherwise only surfaces later as an
    authentication failure.

    Parameters:
    - path: Location of the token file. Defaults to '../replicate_token.txt'.

    Returns:
    - The file contents with surrounding whitespace stripped ('' when the
      file is missing or blank).
    """
    import warnings  # local import: keeps this cell independent of the import cell

    # Create an empty placeholder so the read below cannot fail on a missing file
    if not os.path.isfile(path):
        with open(path, 'w') as file:
            file.write('')

    # Read the token, dropping the trailing newline most editors add
    with open(path, 'r') as file:
        token = file.read().strip()

    if not token:
        warnings.warn(
            f"Replicate token file '{path}' is empty; paste your API token into it.",
            stacklevel=2,
        )
    return token

# Publish the token as the REPLICATE_API_TOKEN environment variable.
# NOTE(review): presumably this is where the `replicate` client looks for its
# credentials — verify against the client documentation.
os.environ["REPLICATE_API_TOKEN"] = get_replicate_token()

In [15]:

def extract_tar_files(tar_dir, output_dir):
    """
    Extracts all .tar files found in tar_dir to output_dir, skipping extraction if
    the output directory for the .tar file already exists.

    Each archive `name.tar` is unpacked into `output_dir/name`. Because the
    existence of that directory is what marks an archive as done, a directory
    left behind by a failed or interrupted extraction is removed before the
    error is re-raised; otherwise the next run would skip the archive forever.

    Parameters:
    - tar_dir: Directory containing .tar files (not searched recursively).
    - output_dir: Directory where extracted files will be saved.

    Raises:
    - OSError if tar_dir cannot be listed or a directory cannot be created.
    - tarfile.TarError if an archive cannot be read or extracted.
    """
    import shutil  # local import: only needed for cleanup after a failed extraction

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Use the "data" extraction filter where the runtime provides it: it refuses
    # members that would land outside the destination (absolute paths, "..",
    # escaping links). Older interpreters without the feature fall back to the
    # previous behaviour.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    # Iterate over each file in the tar_dir
    for file in os.listdir(tar_dir):
        if not file.endswith(".tar"):
            continue

        # Construct full file path
        tar_path = os.path.join(tar_dir, file)
        # Construct the specific output path for this tar file
        specific_output_dir = os.path.join(output_dir, os.path.splitext(file)[0])

        # If directory exists, skip the extraction
        if os.path.exists(specific_output_dir):
            print(f"Skipping {file}, directory {specific_output_dir} already exists.")
            continue

        os.makedirs(specific_output_dir)
        try:
            # "r:" opens the archive strictly as an uncompressed tar
            with tarfile.open(tar_path, "r:") as tar:
                tar.extractall(path=specific_output_dir, **extract_kwargs)
        except BaseException:
            # Also covers KeyboardInterrupt from an interrupted cell: drop the
            # partial directory so a re-run retries this archive.
            shutil.rmtree(specific_output_dir, ignore_errors=True)
            raise
        print(f"Extracted {file} to {specific_output_dir}")


# Unpack every archive in ../data/raw/archives into its own folder under ../data/raw.
extract_tar_files("../data/raw/archives", "../data/raw")


Skipping juan-saenz.tar, directory ../data/raw/juan-saenz already exists.


In [18]:
def convert_m4a_to_mp3(root_directory):
    """
    Converts all M4A files found in the given root directory and its subdirectories to MP3,
    if the MP3 doesn't already exist.

    The MP3 is written next to the source file with the same base name. The
    extension match is case-insensitive, so `.M4A` files are converted too.
    Because an existing MP3 marks a file as already converted, a partial MP3
    left by a failed or interrupted conversion is deleted before the error is
    re-raised, so the next run retries it.

    Parameters:
    - root_directory: The root directory to scan for M4A files.
    """
    for subdir, _dirs, files in os.walk(root_directory):
        for filename in files:
            if not filename.lower().endswith(".m4a"):
                continue

            stem = filename[:-4]  # filename without the 4-character ".m4a" suffix
            m4a_path = os.path.join(subdir, filename)
            mp3_path = os.path.join(subdir, stem + '.mp3')

            # Check if the mp3 file already exists
            if os.path.exists(mp3_path):
                print(f"{stem}.mp3 already exists. No conversion necessary.")
                continue

            try:
                # Run ffmpeg to convert m4a to mp3
                ffmpeg.input(m4a_path).output(mp3_path).run()
            except BaseException:
                # Also covers KeyboardInterrupt from an interrupted cell:
                # remove any partial output so it is not mistaken for a
                # finished conversion on the next run.
                if os.path.exists(mp3_path):
                    os.remove(mp3_path)
                raise

            print(f"Converted {filename} to {stem}.mp3")


# Convert every .m4a file under ../data/raw (including subdirectories) to .mp3
convert_m4a_to_mp3('../data/raw')




Converted interview-saenz.m4a to interview-saenz.mp3


In [19]:
import os
import base64
import replicate  # Assuming this is a valid library or API client that you have access to

def run_transcription_and_save_script(raw_directory, prompt="A 1 to 1 interview",
                                      num_speakers=2, language="it"):
    """
    Goes through all subdirectories in the raw directory, checks for .mp3 files, and if a corresponding
    script file does not exist, processes the mp3 for diarization and saves the output to a script file.

    The script file is named `script-<suffix>.txt`, where `<suffix>` is the text after the
    last '-' in the name of the folder that contains the mp3 (the whole folder name when it
    has no '-'). Only the last path component is used, so dashes or separators elsewhere in
    the path cannot leak into the file name. There is one script per folder: once it exists,
    every mp3 in that folder is skipped.

    Parameters:
    - raw_directory: The root directory to scan for mp3 files and save script files.
    - prompt: Prompt text sent to the model with the audio.
    - num_speakers: Number of speakers passed to the model.
    - language: Language code passed to the model.
    """
    model = "thomasmol/whisper-diarization:b9fd8313c0d492bf1ce501b3d188f945389327730773ec1deb6ef233df6ea119"

    for subdir, _, files in os.walk(raw_directory):
        # Derive the script name from the containing folder's own name only.
        # normpath drops a trailing separator so basename is never empty.
        folder_name = os.path.basename(os.path.normpath(subdir))
        script_name = f"script-{folder_name.split('-')[-1]}.txt"
        script_path = os.path.join(subdir, script_name)

        for filename in files:
            if not filename.endswith(".mp3"):
                continue

            # Check if the script file already exists
            if os.path.exists(script_path):
                continue

            mp3_path = os.path.join(subdir, filename)

            # Read the .mp3 file and encode it as a base64 data URI
            with open(mp3_path, 'rb') as file:
                data = base64.b64encode(file.read()).decode('utf-8')
            file_data = f"data:application/octet-stream;base64,{data}"

            # Prepare the input for the replicate API
            input_data = {
                "file": file_data,
                "prompt": prompt,
                "file_url": "",
                "num_speakers": num_speakers,
                "language": language
            }

            # Call the replicate.run function and wait for the output
            output = replicate.run(model, input=input_data)
            print(output)

            # format_dialogue reads output['segments'], so the model output is
            # expected to be a mapping with that key.
            formatted_lines = format_dialogue(output)

            # Save the formatted text to a script file
            save_to_file(formatted_lines, script_path)

            print(f"Script saved as '{script_path}'.")

def format_dialogue(data):
    """
    Turns diarization output into one display line per segment.

    Parameters:
    - data: Mapping with a 'segments' entry; each segment provides the keys
      'speaker', 'text', 'start' and 'end'.

    Returns:
    - A list of strings shaped like "<speaker> (<start> - <end>): <text>",
      in the same order as the segments.
    """
    def render(piece):
        who, said = piece['speaker'], piece['text']
        began, ended = piece['start'], piece['end']
        return f"{who} ({began} - {ended}): {said}"

    return list(map(render, data['segments']))

def save_to_file(lines, filename):
    """
    Writes the given lines to a text file, one per line, encoded as UTF-8.

    The encoding is fixed rather than left to the platform default so that
    accented transcript text is written identically on every machine.
    An existing file is overwritten.

    Parameters:
    - lines: Iterable of strings; a newline is appended to each.
    - filename: Path of the file to write.
    """
    with open(filename, 'w', encoding='utf-8') as file:
        file.writelines(line + '\n' for line in lines)

# Transcribe every .mp3 under ../data/raw whose folder does not yet have a script file.
run_transcription_and_save_script('../data/raw')


{'language': 'it', 'num_speakers': 2, 'segments': [{'avg_logprob': -0.1530612188638473, 'end': '6.78', 'speaker': 'SPEAKER_00', 'start': '1.46', 'text': 'Ok, alcune domande su quanto fatto e alcune domande più in generale.', 'words': [{'end': 1.98, 'probability': 0.92626953125, 'start': 1.46, 'word': 'Ok,'}, {'end': 2.58, 'probability': 0.999755859375, 'start': 2.1, 'word': 'alcune'}, {'end': 3.1, 'probability': 1, 'start': 2.58, 'word': 'domande'}, {'end': 3.32, 'probability': 0.99609375, 'start': 3.1, 'word': 'su'}, {'end': 3.86, 'probability': 0.99951171875, 'start': 3.32, 'word': 'quanto'}, {'end': 4.26, 'probability': 0.9462890625, 'start': 3.86, 'word': 'fatto'}, {'end': 4.6, 'probability': 0.98193359375, 'start': 4.26, 'word': 'e'}, {'end': 5.72, 'probability': 0.999755859375, 'start': 4.6, 'word': 'alcune'}, {'end': 6.1, 'probability': 1, 'start': 5.72, 'word': 'domande'}, {'end': 6.22, 'probability': 1, 'start': 6.1, 'word': 'più'}, {'end': 6.36, 'probability': 1, 'start': 6.2