This notebook needs a GPU with at least 16 GB of VRAM to run at a reasonable speed. Such a GPU can be accessed through Google Colab or GPU cloud services like RunPod. Without one, it will take far too long to run (around 20 hours).

In [1]:
from transformers import pipeline
from pydub import AudioSegment
import torch
import numpy as np
import time
import requests
import re
import os
import gc
import tempfile

2023-08-20 21:30:01.395909: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# link.txt should be a text file with one URL per line, each pointing to an audio file.
# If you already have a collection of audio files then you can delete the download parts of the code.
with open("link.txt") as file:
  # Strip the trailing newline that each line carries and drop blank lines,
  # so every entry is a clean URL that can be passed straight to requests.
  podcast_links = [line.strip() for line in file if line.strip()]

In [None]:
# Loads the model we'll use for transcription.
# Use the GPU when one is visible to torch; otherwise fall back to the CPU so the
# cell still runs (much more slowly) instead of failing on a machine without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"
pipe = pipeline("automatic-speech-recognition", model="lorenzoncina/whisper-small-en-4-epochs", device=device)
# Collects one transcript string per processed audio file.
final_transcript_list = []

In [4]:
import re

def get_audio_file_type(url):
    """Return the audio extension found in ``url``.

    The extension must be ``.mp3`` or ``.m4a`` (case-insensitive) and must be
    followed either by a ``?`` or by the end of the string.

    Returns:
        ``'mp3'`` or ``'m4a'`` in lower case, or ``'unknown'`` when neither
        extension is found in that position.
    """
    found = re.search(r'\.(mp3|m4a)(\?|$)', url, flags=re.IGNORECASE)
    return found.group(1).lower() if found else 'unknown'


In [None]:
def download_audio(url, file_format, index, chunk_size=8192, timeout=60):
    """Stream the audio at ``url`` to ``audio_file_{index}.{file_format}``.

    The body is written piece by piece so the whole file never has to be held
    in memory.

    Args:
        url: Address of the audio file to fetch.
        file_format: Extension used for the saved file (e.g. ``"mp3"``).
        index: Number embedded in the saved file name.
        chunk_size: Number of bytes requested per streamed piece.
        timeout: Seconds to wait for the server to connect / send data before
            giving up, so a stalled server cannot hang the notebook forever.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=timeout) as audio_response:
        # Fail early rather than saving an HTML error page as an "audio" file.
        audio_response.raise_for_status()
        with open(f'audio_file_{index}.{file_format}', 'wb') as audio_file:
            for chunk in audio_response.iter_content(chunk_size=chunk_size):
                if chunk:
                    audio_file.write(chunk)

In [None]:
def convert_m4a_to_mp3(index):
    """Convert ``audio_file_{index}.m4a`` to ``audio_file_{index}.mp3``.

    The source ``.m4a`` file is deleted once the ``.mp3`` has been written.

    Args:
        index: Number embedded in the audio file names.
    """
    input_filename = f"audio_file_{index}.m4a"
    output_filename = f"audio_file_{index}.mp3"
    audio = AudioSegment.from_file(input_filename, format="m4a")
    # export() hands back the open output file; close it so the handle is not
    # leaked and the mp3 can be deleted later on every platform.
    audio.export(output_filename, format="mp3").close()
    os.remove(input_filename)

In [None]:
def transcribe_audio(file_path, chunk_size=30000):
    """Transcribe an mp3 file piece by piece and return the joined text.

    The audio is cut into consecutive slices of ``chunk_size`` milliseconds.
    Each slice is exported to a temporary wav file, passed to the global
    ``pipe`` transcription pipeline, and the resulting texts are joined with
    single spaces.

    Args:
        file_path: Path of the mp3 file to transcribe.
        chunk_size: Slice length in milliseconds. The default is 30 seconds
            because Whisper models consume at most 30 seconds of audio per
            call; longer slices are truncated or rejected depending on the
            transformers version.

    Returns:
        The transcript of the whole file as one string (empty for empty audio).
    """
    audio = AudioSegment.from_mp3(file_path)
    transcriptions = []

    # Stepping by chunk_size also yields the final, shorter slice, so no
    # separate "remaining audio" branch is needed.
    for start_time in range(0, len(audio), chunk_size):
        chunk = audio[start_time:start_time + chunk_size]
        with tempfile.NamedTemporaryFile(suffix=".wav") as temp_chunk:
            # export() returns an open handle on the wav file; close it so
            # handles do not pile up over a long recording.
            chunk.export(temp_chunk.name, format="wav").close()
            res = pipe(temp_chunk.name)
            transcriptions.append(res['text'])
        # Free the slice right away to keep memory use flat.
        del chunk
        gc.collect()

    return ' '.join(transcriptions)

In [None]:
# Download, transcribe and clean up every audio file listed in podcast_links.
with requests.Session() as session:
    for i, link in enumerate(podcast_links):
        # Lines read from the text file may carry a trailing newline; skip blanks.
        link = link.strip()
        if not link:
            continue

        # By default every link is already a direct audio URL.
        audio_url = link

        # Uncomment the part below if the links are web pages that embed the
        # audio URL instead (unnecessary if you already have the audio urls)
        # response = session.get(link)
        # content = response.text
        # response.close()

        # pattern = r'"media_url":"(https:\\/\\/[^"]+\.(mp3|m4a)[^"]*)"'
        # match = re.search(pattern, content)

        # if not match:
        #     print("No audio URL found.")
        #     continue

        # audio_url = match.group(1).replace("\\", "")
        # print("Audio URL:", audio_url)

        file_type = get_audio_file_type(audio_url)
        if file_type == 'unknown':
            # Only mp3/m4a are handled below; anything else would be saved
            # under a name the transcription step never opens.
            print(f"Skipping unsupported audio URL: {audio_url}")
            continue

        download_audio(audio_url, file_type, i)
        print(f"Audio downloaded as 'audio_file_{i}.{file_type}'")

        if file_type == "m4a":
            convert_m4a_to_mp3(i)

        file_path = f"audio_file_{i}.mp3"
        try:
            transcription = transcribe_audio(file_path)
            final_transcript_list.append(transcription)
            print(transcription)
        finally:
            # Remove the audio even when transcription fails, so that failed
            # runs do not leave large files behind.
            os.remove(file_path)
            print("File deleted successfully")

In [6]:
import csv

# Write one transcript per row (single column) to transcribed_audio.csv.
# The file must be bound with "as" so the writer targets this file;
# newline="" lets the csv module control line endings, and utf-8 keeps
# non-ASCII characters in the transcripts intact.
with open("transcribed_audio.csv", "w", newline="", encoding="utf-8") as csv_file:
    writer = csv.writer(csv_file)
    writer.writerows([snippet] for snippet in final_transcript_list)

NameError: name 'final_transcript_list' is not defined