In [None]:
!pip install python-dotenv
!pip install openai-whisper


Collecting openai-whisper
  Downloading openai-whisper-20240927.tar.gz (800 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.0/800.0 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper)
  Downloading tiktoken-0.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting triton>=2.0.0 (from openai-whisper)
  Downloading triton-3.0.0-1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.3 kB)
Downloading triton-3.0.0-1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (209.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m209.4/209.4 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tiktoken-0.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
[

1. Setting Up the Environment

In [None]:
import os
import sys
import re
from dotenv import load_dotenv
import librosa
import numpy as np
import pandas as pd
from io import StringIO, BytesIO
import logging
from collections import Counter

# Load environment variables from .env file
load_dotenv()

# Set up logging: console output via basicConfig plus a persistent log file.
log_dir = '/app/logs'
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'speech_pace_logfile.log')

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

logger = logging.getLogger()

# Attach the file handler to the root logger.
# After basicConfig() the root logger always has at least one handler, so a
# "no handlers yet" check would never attach the file handler. Instead, look
# for an existing FileHandler that targets the same file, so that re-running
# this cell neither skips the handler nor adds a duplicate.
_log_file_abs = os.path.abspath(log_file)  # FileHandler stores an absolute path
file_handler = next(
    (
        handler for handler in logger.handlers
        if isinstance(handler, logging.FileHandler)
        and handler.baseFilename == _log_file_abs
    ),
    None,
)
if file_handler is None:
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(file_handler)

2. Defining Speech Pace

In [None]:
def define_pace(zcr):
    """Translate a zero-crossing-rate score into a speech-pace label.

    Args:
        zcr: Numeric score to classify.

    Returns:
        One of 'very slow', 'slow', 'medium slow', 'medium', 'medium fast',
        'fast' or 'very fast'. Each band includes its lower bound and excludes
        its upper bound; anything not below 0.5 is 'very fast'.
    """
    # (exclusive upper bound, label) pairs in ascending order: the first bound
    # that lies above the score decides the label.
    pace_bands = (
        (0.01, 'very slow'),
        (0.05, 'slow'),
        (0.1, 'medium slow'),
        (0.25, 'medium'),
        (0.35, 'medium fast'),
        (0.5, 'fast'),
    )
    for upper_bound, label in pace_bands:
        if zcr < upper_bound:
            return label
    return 'very fast'

3. Analyzing the Speech Speed

In [None]:
def analyze_speech_speed(audio_data, frame_length=2048, hop_length=512):
    """Return the mean zero-crossing rate of the short-time energy derivative.

    The signal is cut into overlapping windows, the energy (sum of squared
    samples) of each window is computed, and the zero-crossing rate of the
    frame-to-frame energy difference is averaged into a single score.

    Args:
        audio_data: 1-D sequence of audio samples (anything ``np.asarray`` can
            convert to a float array).
        frame_length: Number of samples per energy window. The same value is
            passed to ``librosa.feature.zero_crossing_rate``.
        hop_length: Step between successive energy windows. The same value is
            passed to ``librosa.feature.zero_crossing_rate``.

    Returns:
        The mean zero-crossing rate as a NumPy float.

    Raises:
        ValueError: If the input yields fewer than two energy windows, so no
            energy difference can be computed.
    """
    # Work in float64 so integer PCM input neither overflows when squared nor
    # gets rejected downstream for not being floating point.
    samples = np.asarray(audio_data, dtype=np.float64)

    # np.sum runs in C; the builtin sum() would iterate sample by sample in Python.
    energy = np.array([
        np.sum(np.square(samples[start:start + frame_length]))
        for start in range(0, len(samples), hop_length)
    ])
    if energy.size < 2:
        raise ValueError(
            "audio_data is too short to compute an energy difference "
            f"(got {len(samples)} samples, need more than {hop_length})"
        )

    energy_diff = np.diff(energy)
    zcr = np.mean(librosa.feature.zero_crossing_rate(y=energy_diff, frame_length=frame_length, hop_length=hop_length))
    return zcr

4. Retrieving Data from Local Storage

In [None]:
def read_audio_from_local(audio_file_path):
    """Load the raw bytes of an audio file stored on the local file system.

    Args:
        audio_file_path: Path of the file to open in binary mode.

    Returns:
        The file contents as ``bytes``, or ``None`` when opening or reading
        fails (the failure is logged as an error).
    """
    try:
        with open(audio_file_path, 'rb') as stream:
            payload = stream.read()
    except Exception as e:
        logger.error(f"An error occurred while reading the audio file from local storage: {str(e)}")
        payload = None
    return payload


5. Transcription Using OpenAI Whisper

In [None]:
import whisper

def transcribe_audio(audio_path, model_name="base"):
    """Transcribe an audio file with Whisper and return its segments as a table.

    Args:
        audio_path: Path of the audio file handed to ``model.transcribe``.
        model_name: Name of the Whisper model to load. Defaults to "base",
            which is the model this function always used before the parameter
            existed.

    Returns:
        A DataFrame built from ``result['segments']``.

    NOTE(review): the columns are whatever Whisper puts in each segment; this
    function does not rename them or add a 'speaker' column, whereas
    speech_pace_for_transcription_segments reads 'start_time', 'end_time' and
    'speaker'. Confirm the mapping before chaining the two.
    """
    # The model is loaded on every call; reuse a loaded model at the call site
    # if many files are transcribed in one session.
    model = whisper.load_model(model_name)
    result = model.transcribe(audio_path)
    return pd.DataFrame(result['segments'])

6. Analyzing and Saving Results

In [None]:
import os
import pandas as pd
import librosa
import logging
from io import BytesIO

# Initialize logger
# NOTE(review): the setup cell in section 1 already calls logging.basicConfig();
# per the standard-library docs a repeated call does nothing when the root
# logger already has handlers - confirm this second call is still wanted.
logging.basicConfig(level=logging.INFO)
# Rebinds the notebook-wide `logger` name (bound to the root logger in
# section 1) to a logger named after the current module.
logger = logging.getLogger(__name__)

def read_audio_from_local(audio_file_path):
    """Return the binary contents of a local audio file.

    Args:
        audio_file_path: Path of the file to read.

    Returns:
        The file contents as ``bytes``; ``None`` if the file cannot be opened
        or read, in which case the error is logged.
    """
    try:
        with open(audio_file_path, 'rb') as source:
            contents = source.read()
    except Exception as e:
        logger.error(f"Error reading audio file: {str(e)}")
        return None
    else:
        return contents

def analyze_speech_speed(segment_audio):
    """Return the mean zero-crossing rate of an audio segment.

    The rate is computed by ``librosa.feature.zero_crossing_rate`` with its
    default framing and then averaged over all frames.

    NOTE: this definition replaces the energy-based ``analyze_speech_speed``
    defined in section 3 once this cell has been run.

    Args:
        segment_audio: Audio samples of the segment to score.

    Returns:
        The mean of the per-frame zero-crossing rates.
    """
    return librosa.feature.zero_crossing_rate(segment_audio).mean()

def define_pace(zcr):
    """Classify a zero-crossing-rate score into a speech-pace category.

    Args:
        zcr: Numeric score to classify.

    Returns:
        'very slow' below 0.01, 'slow' below 0.05, 'medium slow' below 0.1,
        'medium' below 0.25, 'medium fast' below 0.35, 'fast' below 0.5,
        otherwise 'very fast'.
    """
    # Guard clauses in ascending order: each check only runs when every
    # smaller threshold has already been ruled out.
    if zcr < 0.01:
        return 'very slow'
    if zcr < 0.05:
        return 'slow'
    if zcr < 0.1:
        return 'medium slow'
    if zcr < 0.25:
        return 'medium'
    if zcr < 0.35:
        return 'medium fast'
    if zcr < 0.5:
        return 'fast'
    return 'very fast'

def speech_pace_for_transcription_segments(df_transcript, audio_id, destination, audio_file_path, date):
    """Score the speech pace of every transcript segment and save the result as CSV.

    For each row of ``df_transcript`` whose speaker is not 'IVR', the matching
    slice of the audio is extracted, leading/trailing silence is trimmed, the
    slice is scored with ``analyze_speech_speed`` and labelled with
    ``define_pace``. Segments that are empty or fail to process are logged and
    skipped. The rows are written to ``<destination>/<audio_id>_speech_pace.csv``.

    Args:
        df_transcript: DataFrame with 'start_time', 'end_time' and 'speaker'
            columns; the times are converted to sample offsets with librosa,
            which interprets them as seconds.
        audio_id: Identifier stored in the 'audio_id' column and used in the
            output file name.
        destination: Directory that receives the CSV; created if missing.
        audio_file_path: Path of the audio file to analyse.
        date: Not used by this function; kept so existing callers keep working.

    Returns:
        None. If the audio cannot be read or decoded, an error is logged and
        no file is written.
    """
    result_columns = ['start_time', 'end_time', 'speech_pace', 'speaker', 'audio_id', 'zcr']
    records = []

    audio_data = read_audio_from_local(audio_file_path)
    if not audio_data:
        logger.error(f"Failed to read audio data for audio_id {audio_id}")
        return

    try:
        # sr=None keeps the file's native sampling rate.
        audio, sample_rate = librosa.load(BytesIO(audio_data), sr=None, mono=True)
    except Exception as e:
        logger.error(f"An error occurred while loading the audio data for audio_id {audio_id}: {str(e)}")
        return

    filtered_df = df_transcript[df_transcript['speaker'] != 'IVR']
    for index, row in filtered_df.iterrows():
        start_time = row['start_time']
        end_time = row['end_time']
        speaker = row['speaker']

        logger.info(f"Processing segment {index}: start_time={start_time}, end_time={end_time}, speaker={speaker}")

        try:
            # `audio` is indexed by sample, so the times must become sample
            # offsets. time_to_frames would return STFT frame indices (one per
            # hop of samples) and slice a segment far shorter than requested.
            start_sample = librosa.time_to_samples(start_time, sr=sample_rate)
            end_sample = librosa.time_to_samples(end_time, sr=sample_rate)
            segment_audio = audio[start_sample:end_sample]

            if len(segment_audio) == 0:
                logger.warning(f"Segment {index} is empty before trimming for audio_id {audio_id}. Skipping this segment.")
                continue

            # Trim silent parts from the segment
            segment_audio, _ = librosa.effects.trim(segment_audio)

            if len(segment_audio) == 0:
                logger.warning(f"Segment {index} is empty after trimming for audio_id {audio_id}. Skipping this segment.")
                continue

            zcr_segment = analyze_speech_speed(segment_audio)
            pace_segment = define_pace(zcr_segment)

            logger.info(f"Computed ZCR for segment {index}: {zcr_segment}")

            records.append({
                'start_time': start_time,
                'end_time': end_time,
                'speech_pace': pace_segment,
                'speaker': speaker,
                'audio_id': audio_id,
                'zcr': zcr_segment,
            })
        except Exception as e:
            # Best effort: one bad segment must not abort the whole file.
            logger.error(f"An error occurred while processing segment {index} for audio_id {audio_id}: {str(e)}")
            continue

    # Explicit columns keep the CSV header even when no segment was scored.
    # 'IVR' rows were already excluded above, so no second filter is needed.
    df_result = pd.DataFrame(records, columns=result_columns)

    # Save the result locally, creating the target directory when necessary.
    if destination:
        os.makedirs(destination, exist_ok=True)
    result_file_path = os.path.join(destination, f"{audio_id}_speech_pace.csv")
    df_result.to_csv(result_file_path, index=False)
    logger.info(f"File successfully saved locally for {audio_id} at {result_file_path}")

# Example data for testing: a synthetic transcript of back-to-back segments.
segment_duration = 10  # Duration of each segment in seconds
num_segments = 295 // segment_duration  # Number of full segments

# One row per full segment; speakers alternate, starting with 'Speaker 1'.
segment_starts = [segment_duration * k for k in range(num_segments)]
df_transcript = pd.DataFrame({
    'start_time': segment_starts,
    'end_time': [begin + segment_duration for begin in segment_starts],
    'speaker': ['Speaker 2' if k % 2 else 'Speaker 1' for k in range(num_segments)]
})

# When the full segments stop short of 295, append one shorter closing segment.
if num_segments * segment_duration < 295:
    extra_segment = pd.DataFrame({
        'start_time': [num_segments * segment_duration],
        'end_time': [295],
        'speaker': ['Speaker 1']  # Speaker assigned to the closing segment
    })
    df_transcript = pd.concat([df_transcript, extra_segment], ignore_index=True)

# Inputs for the run
audio_id = "winston_churchill_speech"
audio_file_path = "/content/winston-churchill-the-threat-of-germany.wav"  # Change this to your actual path
date = "2024-09-28"
destination = "results"

# The output directory must exist before the CSV is written.
os.makedirs(destination, exist_ok=True)

# Score every segment and write the CSV.
speech_pace_for_transcription_segments(df_transcript, audio_id, destination, audio_file_path, date)


In [None]:
def speech_pace_for_transcription_segments(df_transcript, audio_id, destination, audio_file_path, date):
    """Label the speech pace of each transcript segment and write the table to CSV.

    Rows of ``df_transcript`` whose speaker is 'IVR' are ignored. For every
    other row the corresponding audio slice is extracted, silence is trimmed
    from both ends, and the slice is scored with ``analyze_speech_speed`` and
    labelled with ``define_pace``. A segment that is empty or raises during
    processing is logged as an error and skipped.

    Args:
        df_transcript: DataFrame with 'start_time', 'end_time' and 'speaker'
            columns; the times are converted to sample offsets with librosa,
            which interprets them as seconds.
        audio_id: Identifier stored in the 'audio_id' column and used in the
            output file name.
        destination: Directory that receives ``<audio_id>_speech_pace.csv``;
            created if missing.
        audio_file_path: Path of the audio file to analyse.
        date: Not used by this function; kept so existing callers keep working.

    Returns:
        None. If the audio cannot be read or decoded, an error is logged and
        no file is written.
    """
    speech_pace_data = {'start_time': [], 'end_time': [], 'speech_pace': [], 'speaker': [], 'audio_id': [], 'zcr': []}
    audio_data = read_audio_from_local(audio_file_path)
    if not audio_data:
        logger.error(f"Failed to read audio data for audio_id {audio_id}")
        return

    try:
        # sr=None keeps the file's native sampling rate.
        audio, sample_rate = librosa.load(BytesIO(audio_data), sr=None, mono=True)
    except Exception as e:
        logger.error(f"An error occurred while loading the audio data for audio_id {audio_id}: {str(e)}")
        return

    filtered_df = df_transcript[df_transcript['speaker'] != 'IVR']
    for index, row in filtered_df.iterrows():
        start_time = row['start_time']
        end_time = row['end_time']
        speaker = row['speaker']

        try:
            # `audio` is a sample array, so convert times to sample offsets.
            # time_to_frames would yield STFT frame indices and cut a slice
            # that is far too short and in the wrong place.
            start_sample = librosa.time_to_samples(start_time, sr=sample_rate)
            end_sample = librosa.time_to_samples(end_time, sr=sample_rate)
            segment_audio = audio[start_sample:end_sample]

            if len(segment_audio) == 0:
                raise ValueError("Segment audio is empty")

            # Trim silent parts from the segment
            segment_audio, _ = librosa.effects.trim(segment_audio)

            # An all-silent segment trims down to nothing; report it clearly
            # instead of letting the feature extraction fail on empty input.
            if len(segment_audio) == 0:
                raise ValueError("Segment audio is empty after trimming")

            zcr_segment = analyze_speech_speed(segment_audio)
            pace_segment = define_pace(zcr_segment)

            speech_pace_data['start_time'].append(start_time)
            speech_pace_data['end_time'].append(end_time)
            speech_pace_data['speech_pace'].append(pace_segment)
            speech_pace_data['speaker'].append(speaker)
            speech_pace_data['audio_id'].append(audio_id)
            speech_pace_data['zcr'].append(zcr_segment)
        except Exception as e:
            # Best effort: one bad segment must not abort the whole file.
            logger.error(f"An error occurred while processing segment {index} for audio_id {audio_id}: {str(e)}")
            continue

    # 'IVR' rows were already excluded above, so no second filter is needed.
    df_result = pd.DataFrame(speech_pace_data)

    # Save the result locally in the directory the caller asked for (this
    # used to be hard-coded to "results", ignoring `destination`).
    if destination:
        os.makedirs(destination, exist_ok=True)
    result_file_path = os.path.join(destination, f"{audio_id}_speech_pace.csv")
    df_result.to_csv(result_file_path, index=False)
    logger.info(f"File successfully saved locally for {audio_id} at {result_file_path}")

In [None]:
# Build a custom transcript of fixed-length segments and run the analysis on it.
segment_duration = 10  # Duration of each segment in seconds
num_segments = 295 // segment_duration  # Number of full segments

# Full segments: consecutive windows whose speaker alternates by position.
segment_ids = range(num_segments)
df_transcript = pd.DataFrame({
    'start_time': [segment_duration * k for k in segment_ids],
    'end_time': [segment_duration * (k + 1) for k in segment_ids],
    'speaker': ['Speaker 1' if k % 2 == 0 else 'Speaker 2' for k in segment_ids]
})

# Append a closing segment when the full segments do not reach 295.
covered = num_segments * segment_duration
if covered < 295:
    extra_segment = pd.DataFrame({
        'start_time': [covered],
        'end_time': [295],
        'speaker': ['Speaker 1']  # Speaker assigned to the closing segment
    })
    df_transcript = pd.concat([df_transcript, extra_segment], ignore_index=True)

# Inputs for the run against the actual audio file
destination = "results"
os.makedirs(destination, exist_ok=True)
audio_id = "winston_churchill_speech"
audio_file_path = "/content/winston-churchill-the-threat-of-germany.wav"
date = "2024-09-28"


# Score every segment and write the CSV.
speech_pace_for_transcription_segments(df_transcript, audio_id, destination, audio_file_path, date)