In [28]:
import json

def format_time(seconds):
    """Convert an offset in seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    The value is rounded once to whole milliseconds and then split with
    integer arithmetic. Truncating the float fraction instead (the previous
    approach) turned e.g. ``1.001`` into ``00:00:01,000`` because
    ``1.001 - 1`` is slightly below ``0.001`` in binary floating point.

    Args:
        seconds: Offset in seconds (int or float). Negative values are
            clamped to zero, since SRT has no negative timestamps.

    Returns:
        The timestamp as a string, e.g. ``"01:01:01,500"``.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    whole_seconds, milliseconds = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{whole_seconds:02},{milliseconds:03}"

def json_to_srt(json_path, srt_path):
    """Write the segments of a transcript JSON file as an SRT subtitle file.

    The JSON document must contain a ``'segments'`` list whose items have
    ``'start'`` and ``'end'`` (seconds) and ``'text'`` keys. Cues are numbered
    from 1 in list order.

    Both files are opened as UTF-8 explicitly so that non-ASCII transcripts do
    not depend on the platform's default encoding. Leading/trailing whitespace
    of each segment text is stripped before it is written as the cue text.

    Args:
        json_path: Path of the transcript JSON file to read.
        srt_path: Path of the SRT file to create (overwritten if it exists).

    Returns:
        ``srt_path``, unchanged.
    """
    with open(json_path, 'r', encoding='utf-8') as source:
        data = json.load(source)

    with open(srt_path, 'w', encoding='utf-8') as target:
        for index, segment in enumerate(data['segments'], start=1):
            start = format_time(segment['start'])
            end = format_time(segment['end'])
            text = segment['text'].strip()
            # An SRT cue is: counter, time range, text, then a blank line.
            target.write(f"{index}\n{start} --> {end}\n{text}\n\n")

    return srt_path

In [86]:
import os
import json

def _absorb_segment(target, other, prepend=False):
    """Merge ``other`` into ``target`` in place.

    With ``prepend=False`` ``other`` is treated as following ``target``: the
    end time is extended and text/words are appended. With ``prepend=True``
    ``other`` precedes ``target``: the start time is moved back and text/words
    are placed in front. A ``'words'`` key is only written when at least one
    of the two segments has one.
    """
    if prepend:
        target['start'] = other['start']
        target['text'] = other['text'] + " " + target['text']
        words = other.get('words', []) + target.get('words', [])
    else:
        target['end'] = other['end']
        target['text'] = target['text'] + " " + other['text']
        words = target.get('words', []) + other.get('words', [])
    if 'words' in target or 'words' in other:
        target['words'] = words


def adjust_transcript_segments(file_path, save_dir=None, min_segment_duration=2, max_segment_duration=12, max_gap_for_merge=0.5, verbose=False):
    """Merge transcript segments so their durations fall in a target range.

    The transcript is read from ``file_path`` (JSON with a ``'segments'`` list
    whose items have ``'start'``, ``'end'``, ``'text'`` and optionally
    ``'words'``) and processed in two passes:

    1. Consecutive segments are merged when the gap between them is at most
       ``max_gap_for_merge`` and the merged span is at most
       ``max_segment_duration``.
    2. Every segment still shorter than ``min_segment_duration`` is merged
       into the previously kept segment, or otherwise into the following one,
       as long as the merged span stays within ``max_segment_duration``. A
       short segment that cannot be merged either way is kept as is (and
       reported by the warning below) rather than being dropped.

    A warning is printed for every resulting segment that is longer than
    ``max_segment_duration`` or shorter than ``min_segment_duration``.

    Args:
        file_path: Path of the transcript JSON file.
        save_dir: If given, the adjusted transcript is written to this
            directory under the same file name; the directory is created when
            missing.
        min_segment_duration: Minimum desired segment length in seconds.
        max_segment_duration: Maximum allowed length in seconds of a merged
            segment.
        max_gap_for_merge: Largest gap in seconds bridged by the first pass.
        verbose: Print the segments before and after adjustment.

    Returns:
        The loaded transcript dict with ``'segments'`` replaced by the
        adjusted list.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    segments = data['segments']

    if verbose:
        print("Original segments:")
        for segment in segments:
            print(f"{round(segment['end'] - segment['start'], 2)}s: {segment['text']}")

    # First pass: merge segments that are close together if it results in staying under max duration
    merged_segments = []
    current = None
    for segment in segments:
        if current is None:
            current = segment
            continue

        gap = segment['start'] - current['end']
        combined_duration = segment['end'] - current['start']
        if gap <= max_gap_for_merge and combined_duration <= max_segment_duration:
            _absorb_segment(current, segment)
        else:
            merged_segments.append(current)
            current = segment
    if current is not None:
        merged_segments.append(current)

    # Second pass: fold too-short segments into a neighbour where possible.
    final_segments = []
    for index, segment in enumerate(merged_segments):
        duration = segment['end'] - segment['start']
        if duration >= min_segment_duration:
            final_segments.append(segment)
            continue

        # The merge target must be the segment actually kept in the output,
        # which is not necessarily merged_segments[index - 1].
        previous = final_segments[-1] if final_segments else None
        following = merged_segments[index + 1] if index + 1 < len(merged_segments) else None

        if previous is not None and segment['end'] - previous['start'] <= max_segment_duration:
            _absorb_segment(previous, segment)
        elif following is not None and following['end'] - segment['start'] <= max_segment_duration:
            # The enlarged following segment is evaluated on the next
            # iteration, so it must not be skipped.
            _absorb_segment(following, segment, prepend=True)
        else:
            # No legal merge: keep the short segment instead of losing it.
            final_segments.append(segment)

    data['segments'] = final_segments

    # Check for any segments that are still too long or too short
    for segment in final_segments:
        duration = segment['end'] - segment['start']
        if duration > max_segment_duration:
            print(
                f"Warning: Segment longer than {max_segment_duration} seconds {duration:.2f}: {segment['text']}"
            )
        elif duration < min_segment_duration:
            print(
                f"Warning: Segment shorter than {min_segment_duration} seconds {duration:.2f}: {segment['text']}"
            )

    if verbose:
        print("\nAdjusted segments:")
        for segment in data['segments']:
            print(f"{round(segment['end'] - segment['start'], 2)}s: {segment['text']}")

    # Save the adjusted data back to a new JSON file
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, os.path.basename(file_path))
        if verbose:
            print(f"Saving adjusted transcript to {save_path}")
        with open(save_path, 'w', encoding='utf-8') as file:
            json.dump(data, file, indent=4)

    return data


In [254]:
from pydub import AudioSegment, silence
import json

def find_silences(audio_path, min_silence_len=1000, silence_thresh=-40):
    """Return the silent stretches of an audio file as (start, end) pairs in seconds.

    The file is loaded with ``AudioSegment.from_file`` and scanned with
    ``silence.detect_silence``; ``min_silence_len`` and ``silence_thresh`` are
    forwarded to that call unchanged.
    """
    audio = AudioSegment.from_file(audio_path)
    spans_ms = silence.detect_silence(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )

    # The detector's values are treated as milliseconds; report seconds.
    spans_sec = []
    for begin_ms, finish_ms in spans_ms:
        spans_sec.append((begin_ms / 1000.0, finish_ms / 1000.0))
    return spans_sec


def add_intelligent_padding(data, max_padding=0.5):
    """Pad the start and end of every segment without running into its neighbours.

    Between two consecutive segments each side receives half of the gap, but
    no more than ``max_padding``. When segments touch or overlap (gap <= 0)
    no padding is applied, so speech is never trimmed. The start of the first
    segment is padded by up to ``max_padding`` without going below time zero,
    and the end of the last segment is padded by ``max_padding``.

    Args:
        data: Transcript dict with a ``'segments'`` list whose items have
            numeric ``'start'`` and ``'end'`` values in seconds. The segments
            are modified in place; pass a copy to keep the original.
        max_padding: Maximum padding in seconds added to each side.

    Returns:
        The same ``data`` object.
    """
    segments = data['segments']
    if not segments:
        return data

    # Leading side of the first segment: pad, but never before time zero.
    first_segment = segments[0]
    first_segment['start'] -= min(max_padding, max(first_segment['start'], 0))

    for current_segment, next_segment in zip(segments, segments[1:]):
        # Available room between the end of this segment and the start of the next.
        gap = next_segment['start'] - current_segment['end']

        # Half the gap per side, capped at max_padding and never negative.
        padding = max(0, min(max_padding, gap / 2))

        current_segment['end'] += padding
        next_segment['start'] -= padding

    # Nothing follows the last segment, so it gets the full padding.
    segments[-1]['end'] += max_padding

    return data


def add_buffer_padding(data, buffer_time=0.2, max_allowed_gap=0.3):
    """Extend the end of every segment by a small buffer.

    For each segment except the last, the gap to the next segment decides the
    extension:

    * gap larger than ``max_allowed_gap``: extend by ``buffer_time`` (capped
      at the gap so the segment cannot run into the next one);
    * otherwise: extend by half the gap, capped at ``buffer_time``.

    The extension is never negative, so touching or overlapping segments are
    left unchanged. The last segment is always extended by ``buffer_time``.
    Start times are not modified.

    Args:
        data: Transcript dict with a ``'segments'`` list whose items have
            numeric ``'start'`` and ``'end'`` values in seconds. The segments
            are modified in place; pass a copy to keep the original.
        buffer_time: Desired extension in seconds.
        max_allowed_gap: Gap in seconds above which the full buffer is used.

    Returns:
        The same ``data`` object.
    """
    segments = data['segments']
    last_index = len(segments) - 1

    for i, current_segment in enumerate(segments):
        if i == last_index:
            extension = buffer_time
        else:
            gap_to_next = segments[i + 1]['start'] - current_segment['end']
            if gap_to_next > max_allowed_gap:
                room = gap_to_next
            else:
                # True division: floor division (//) would round every
                # sub-two-second gap down to 0.0.
                room = gap_to_next / 2
            extension = max(0.0, min(buffer_time, room))

        current_segment['end'] += extension

    return data


In [None]:
from copy import deepcopy
import os


json_file = './srt/test.json'
srt_folder = "srt"
audio_folder = "wav"

# Derive the matching audio path from path components rather than substring
# replacement, which would also rewrite "srt"/"json" inside a file name.
# Assumes the transcript sits directly inside the transcript folder and the
# audio folder is its sibling.
transcript_dir, transcript_name = os.path.split(json_file)
audio_file = os.path.join(
    os.path.dirname(transcript_dir),
    audio_folder,
    os.path.splitext(transcript_name)[0] + '.wav',
)

# Call the function with the path to your JSON file
data_1 = adjust_transcript_segments(
    json_file,
    min_segment_duration=1.7,
    max_segment_duration=12,
    verbose=False
)

# print average, min, max gap between segments
gaps = [
    following['start'] - current['end']
    for current, following in zip(data_1['segments'], data_1['segments'][1:])
]
if gaps:
    print(f"Average gap between segments: {sum(gaps)/len(gaps):.2f}")
    print(f"Minimum gap between segments: {min(gaps):.2f}")
    print(f"Maximum gap between segments: {max(gaps):.2f}")
else:
    # With fewer than two segments there is no gap; avoid dividing by zero.
    print("Fewer than two segments: no gaps to report.")


# Each padding strategy works on its own copy because both modify segments in place.
data_2 = add_intelligent_padding(deepcopy(data_1), max_padding=0.3)
data_3 = add_buffer_padding(deepcopy(data_1), buffer_time=0.3, max_allowed_gap=0.3)


In [None]:
# Print timing and play audio for each segment under the three variants
# (data_1: merged only, data_2: intelligent padding, data_3: buffer padding).

# Load the audio once; re-reading the file for every displayed clip is wasted work.
full_audio = AudioSegment.from_file(audio_file)

for variants in zip(
        data_1['segments'],
        data_2['segments'],
        data_3['segments'],
    ):

    print(variants[0]['text'])

    # start, end, duration (seconds) for each variant
    for segment in variants:
        print(f"{segment['start']:.2f}, {segment['end']:.2f}, {segment['end']-segment['start']:.2f}")

    # Segment times are in seconds; the slice bounds are scaled by 1000.
    for segment in variants:
        display(
            full_audio[segment['start']*1000:segment['end']*1000]
        )
    print('---')
    

In [None]:
import glob
import json
import os

book = "."
srt_folder = "srt"
audio_folder = "wav"
processed_folder = "processed"

# All transcript JSON files of the book, in name order
srt_list = sorted(glob.glob(f"{book}/{srt_folder}/*.json"))

# The output folder must exist before anything is written into it.
os.makedirs(f"{book}/{processed_folder}", exist_ok=True)

for json_path in srt_list:
    print(json_path)
    data = adjust_transcript_segments(
        json_path,
        min_segment_duration=1.7,
        max_segment_duration=12
    )

    # Build sibling paths from the file stem instead of substring replacement,
    # which would also rewrite "srt"/"json" occurring inside a file name.
    stem = os.path.splitext(os.path.basename(json_path))[0]

    # Matching audio file; not used by the processing below yet.
    audio_file = f"{book}/{audio_folder}/{stem}.wav"
    data = add_intelligent_padding(data, max_padding=0.3)

    # save the adjusted data back to a new JSON file
    save_path = f"{book}/{processed_folder}/{stem}.json"
    with open(save_path, 'w', encoding='utf-8') as out_file:
        json.dump(data, out_file, indent=4)

    # save srt file
    json_to_srt(save_path, f"{book}/{processed_folder}/{stem}.srt")