In [10]:
import torch
import pytubefix

In [11]:
# Language of the debate to process. "english" selects the English video in the
# download cell; any other value selects the German one.
language = "german"
# Name of the audio file the download cell writes (one file per language).
m4a_file = f"short_video_{language}.m4a"


In [41]:
# Pick the video that matches the configured language.
if language == "english":
    yt = pytubefix.YouTube("https://www.youtube.com/watch?v=f5s-Dvul22A")
else:
    yt = pytubefix.YouTube("https://www.youtube.com/watch?v=68gLTp_mIAw&t=58s")

# Prefer the 128 kbps audio-only stream. Not every video offers that bitrate, and
# .first() returns None in that case, so fall back to the highest-bitrate MP4
# audio stream (MP4 because the file is saved and later parsed as .m4a).
audio_stream = yt.streams.filter(only_audio=True, abr="128kbps").first()
if audio_stream is None:
    audio_stream = (
        yt.streams.filter(only_audio=True, subtype="mp4").order_by("abr").last()
    )
if audio_stream is None:
    raise RuntimeError("No audio-only MP4 stream is available for this video")

# Last expression of the cell: download() returns the path of the saved file.
audio_stream.download(filename=m4a_file)

'/Users/anniherrmann/tvdebates/short_video_german.m4a'

In [42]:
from pydub import AudioSegment


# WAV copy of the downloaded audio; the file name depends on the language.
wav_filename = f"debate_{language}.wav"

# Decode the downloaded m4a file and re-encode it as WAV.
sound = AudioSegment.from_file(m4a_file, format='m4a')
# export() hands back the output file object; close it so the handle is not
# left open for the rest of the kernel session.
file_handle = sound.export(wav_filename, format='wav')
file_handle.close()

In [43]:
# Use the "mps" device with half precision when PyTorch reports it as available;
# otherwise stay on the CPU with single precision.
if torch.backends.mps.is_available():
    device, torch_dtype = "mps", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32
torch_device = torch.device(device)

In [44]:
# Read the access token from the local "access_token" file.
# Strip surrounding whitespace: a trailing newline left by an editor would
# otherwise become part of the token.
with open("access_token", "r") as f:
    hf_access_token = f.read().strip()

In [47]:
from pyannote.audio import Pipeline
# Load the pretrained speaker-diarization pipeline using the access token.
pipeline = Pipeline.from_pretrained(
  "pyannote/speaker-diarization-3.1",
  use_auth_token=hf_access_token)

# NOTE(review): pyannote is understood to return None (instead of raising) when
# the pipeline cannot be downloaded, e.g. invalid token or model terms not
# accepted - confirm for the installed version. Fail here with a clear message
# rather than with an AttributeError on the next line.
if pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'; "
        "check the access token and that the model's terms were accepted"
    )

pipeline.to(torch_device)

# run the pipeline on an audio file (num_speakers=None: no speaker count is given)
diarization = pipeline(wav_filename, num_speakers=None)

speaker_file = "timestamps_" + language + ".rttm"

# dump the diarization output to disk using RTTM format
with open(speaker_file, "w") as rttm:
    diarization.write_rttm(rttm)

In [48]:

# Print every diarization turn on its own line: start time, stop time, speaker label.
for turn, _, speaker in diarization.itertracks(yield_label=True):
    begin_s, end_s = turn.start, turn.end
    print(f"start={begin_s:.1f}s stop={end_s:.1f}s {speaker}")

start=0.0s stop=4.0s SPEAKER_01
start=4.0s stop=4.5s SPEAKER_02
start=4.5s stop=4.5s SPEAKER_01
start=5.4s stop=6.4s SPEAKER_02
start=7.5s stop=8.1s SPEAKER_02
start=9.0s stop=12.9s SPEAKER_02
start=17.8s stop=51.5s SPEAKER_01
start=51.9s stop=55.6s SPEAKER_01
start=56.3s stop=57.7s SPEAKER_01
start=58.7s stop=58.7s SPEAKER_01
start=58.7s stop=92.0s SPEAKER_02
start=92.0s stop=105.8s SPEAKER_01
start=106.8s stop=107.0s SPEAKER_01
start=107.0s stop=115.1s SPEAKER_02
start=115.9s stop=123.6s SPEAKER_02
start=124.8s stop=140.3s SPEAKER_02
start=133.3s stop=133.8s SPEAKER_00
start=133.8s stop=134.0s SPEAKER_03
start=134.0s stop=134.3s SPEAKER_00
start=140.6s stop=160.1s SPEAKER_02
start=160.3s stop=163.7s SPEAKER_02
start=164.5s stop=165.8s SPEAKER_02
start=166.2s stop=169.6s SPEAKER_02
start=169.9s stop=188.0s SPEAKER_02
start=188.6s stop=204.4s SPEAKER_02
start=204.7s stop=226.6s SPEAKER_02
start=226.8s stop=237.4s SPEAKER_01
start=237.9s stop=243.0s SPEAKER_02
start=243.4s stop=248.0s S

In [49]:
# Saving the speakers and timestamps into a file
from datetime import timedelta

def format_time(seconds):
    """Format a duration in seconds as the string form of a ``timedelta``.

    The value is rounded to whole seconds first, so the result looks like
    ``h:mm:ss`` (for example ``0:01:00``); the hour field is not zero-padded.
    """
    whole_seconds = round(seconds)
    return str(timedelta(seconds=whole_seconds))

# Filename for the output
speaker_file = "formatted_timestamps.txt"

# Minimum duration for a speech segment in seconds
min_duration = 1.0

# Maximum permitted break between two segments of the same speaker
max_pause = 1.0


def _write_merged_segment(out, start, end, speaker):
    """Write one merged speaker segment as a single line to the open file ``out``.

    ``start`` and ``end`` are in seconds; the duration is rounded to two decimals.
    """
    out.write(
        f"Start: {format_time(start)}, Dauer: {round(end - start, 2)}s, "
        f"Ende: {format_time(end)}, Sprecher: {speaker}\n"
    )


with open(speaker_file, "w", encoding="utf-8") as file:
    # State of the segment currently being merged (None until the first one starts)
    merged_start = None
    merged_end = None
    merged_speaker = None

    # Iterating over the segments
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        # Only consider segments whose (rounded) duration reaches the threshold
        duration = round(turn.end - turn.start, 2)
        if duration < min_duration:
            continue

        # Same speaker as before and the break is short enough: extend the
        # running segment. max() keeps the end from moving backwards if this
        # turn finishes before the running segment does.
        continues_previous = merged_speaker is not None and speaker == merged_speaker
        if continues_previous and (turn.start - merged_end) <= max_pause:
            merged_end = max(merged_end, turn.end)
            continue

        # Otherwise the running segment (if any) is complete: write it out
        if merged_speaker is not None:
            _write_merged_segment(file, merged_start, merged_end, merged_speaker)

        # Start new segment
        merged_start, merged_end, merged_speaker = turn.start, turn.end, speaker

    # Write last merged segment, if present
    if merged_speaker is not None:
        _write_merged_segment(file, merged_start, merged_end, merged_speaker)

In [50]:
# Maps each diarization label to the name of the person speaking.
# Labels that are not listed here have no name assigned.
speaker_names = dict([
    ('SPEAKER_01', 'Moderator'),
    ('SPEAKER_02', 'Robert Habeck (Grüne)'),
])

# Read and write the file explicitly as UTF-8: the names contain non-ASCII
# characters, and the platform default encoding is not UTF-8 everywhere.
with open(speaker_file, 'r', encoding='utf-8') as f:
    lines = f.readlines()

# Add names to the file
new_lines = []
for line in lines:
    for speaker, speaker_name in speaker_names.items():
        if speaker in line:
            name_suffix = f', {speaker_name}'
            labelled = line.strip()
            # Only append the name once, so re-running this cell does not
            # keep adding the same name to already annotated lines.
            if not labelled.endswith(name_suffix):
                labelled += name_suffix
            line = labelled + '\n'
            break
    new_lines.append(line)

# Save changes directly in the original file
with open(speaker_file, 'w', encoding='utf-8') as f:
    f.writelines(new_lines)

In [34]:
# Split the WAV audio into consecutive pieces of at most 10 minutes each.
chunk_length_ms = 10 * 60 * 1000  # 10 min in milliseconds
audio = AudioSegment.from_wav(wav_filename)
chunk_starts_ms = range(0, len(audio), chunk_length_ms)
chunks = [audio[start_ms:start_ms + chunk_length_ms] for start_ms in chunk_starts_ms]

# Filled by the transcription loop: collected segments and the running time offset.
final_transcript = []
offset = 0

In [24]:
import whisper_timestamped as whisper
# Load the "medium" Whisper model once; the transcription loop reuses it for every chunk.
model = whisper.load_model("medium")

In [35]:
# Start from an empty transcript so that re-running this cell does not append
# the same segments a second time.
final_transcript = []

for idx, chunk in enumerate(chunks):
    # Export the chunk to a temporary WAV file; export() returns the open
    # output file object, which is closed right away.
    chunk_filename = f"chunk_{idx}.wav"
    chunk.export(chunk_filename, format="wav").close()

    # Transcribe the chunk
    result = whisper.transcribe(model, chunk_filename, temperature=0.2)

    # Start of this chunk within the full recording, in seconds. Derived from
    # the chunk index rather than accumulated across runs, so the value does
    # not depend on state left over from an earlier execution of this cell.
    offset = idx * chunk_length_ms / 1000  # Convert ms to seconds

    # Shift segment and word timestamps from chunk-relative to recording-relative
    for segment in result["segments"]:
        segment["start"] += offset
        segment["end"] += offset
        for word in segment.get("words", []):
            word["start"] += offset
            word["end"] += offset

    final_transcript.extend(result["segments"])


Detected language: German


100%|██████████| 60000/60000 [17:42<00:00, 56.48frames/s]


Detected language: German


100%|██████████| 60000/60000 [16:05<00:00, 62.12frames/s]


Detected language: German


100%|██████████| 60000/60000 [14:08<00:00, 70.72frames/s]


Detected language: German


100%|██████████| 60000/60000 [15:52<00:00, 63.01frames/s]


Detected language: German


100%|██████████| 60000/60000 [13:26<00:00, 74.37frames/s]


Detected language: German


100%|██████████| 60000/60000 [12:00<00:00, 83.29frames/s]


Detected language: German


100%|██████████| 19692/19692 [03:42<00:00, 88.41frames/s]


In [38]:
# Print every transcribed word together with its start and end time.
for segment in final_transcript:
    # .get() avoids a KeyError for segments that carry no "words" entry
    words = segment.get("words", [])
    for word in words:
        start_s, end_s, text = word["start"], word["end"], word["text"]
        print(f"{start_s:.2f}s - {end_s:.2f}s: {text}")

0.16s - 0.42s: Wenn
0.42s - 0.76s: die
0.76s - 1.22s: FDP
1.22s - 1.56s: am
1.56s - 2.04s: Sonntag
2.04s - 2.22s: an
2.22s - 2.46s: der
2.46s - 3.60s: 5-Prozent-Hürde
3.60s - 4.06s: scheitern
4.06s - 4.36s: sollte
4.36s - 4.44s: ...
5.40s - 5.42s: ...
5.42s - 5.60s: hat
5.60s - 5.78s: sie
5.78s - 6.02s: selbst
6.02s - 6.26s: Schuld.
7.44s - 7.64s: Wären
7.64s - 7.80s: Sie
7.80s - 8.22s: traurig?
9.04s - 9.82s: Ich
9.82s - 9.98s: hab
9.98s - 10.18s: keine
10.18s - 10.60s: Emotionen
10.60s - 10.74s: zur
10.74s - 11.06s: FDP.
11.42s - 11.62s: Aber
11.62s - 11.80s: das
11.80s - 11.94s: ist
11.94s - 12.10s: hart
12.10s - 12.34s: selbst
12.34s - 12.90s: erarbeitet.
17.82s - 18.32s: Als
18.32s - 19.12s: Superminister
19.12s - 19.56s: wollte
19.56s - 19.80s: er
19.80s - 19.96s: das
19.96s - 20.24s: grüne
20.24s - 21.04s: Wirtschaftswunder
21.04s - 21.62s: einläuten.
21.70s - 21.98s: Doch
21.98s - 22.20s: statt
22.20s - 22.38s: dem
22.38s - 23.04s: Wunder
23.04s - 23.36s: kam
23.36s - 23.78s: d

In [46]:
import json

# Store the full transcript (segments with their word timestamps) as JSON.
output_filename = "transcription_Robert_Habeck.json"

with open(output_filename, "w", encoding="utf-8") as f:
    # ensure_ascii=False keeps non-ASCII characters readable in the file
    f.write(json.dumps(final_transcript, ensure_ascii=False, indent=4))

print(f"Saved to {output_filename}")


Saved to transcription_Robert_Habeck.json


In [52]:
# Collect the text of every transcript segment, one segment per line
segment_texts = [chunk["text"] for chunk in final_transcript]
transcribed_text = "\n".join(segment_texts)

# Write the plain-text transcript to disk
with open("transcription.txt", "w", encoding="utf-8") as file:
    file.write(transcribed_text)

In [55]:
# Merge speaker diarization and transcription
min_duration = 1.0  # Minimum duration for a speech segment in seconds
tolerance = 0.2  # Allows small deviations for the first words

# Keep only the diarization turns that last at least min_duration seconds.
diarization_segments = [
    (segment.start, segment.end, speaker)
    for segment, _, speaker in diarization.itertracks(yield_label=True)
    if segment.end - segment.start >= min_duration
]


def _speaker_at(time_s):
    """Return the label of the first kept segment that covers ``time_s``.

    A segment also matches up to ``tolerance`` seconds before its start.
    Returns None when no segment matches.
    """
    for start, end, speaker in diarization_segments:
        if start - tolerance <= time_s <= end:
            return speaker
    return None


def _speaker_line(speaker, words):
    """Format one output line: the speaker's name followed by the joined words."""
    speaker_name = speaker_names.get(speaker, "Unknown Speaker")
    return f"{speaker_name}: {' '.join(words)}"


combined_output = []
merged_speaker = None  # speaker whose words are currently being collected
merged_text = []       # words collected for merged_speaker
pending_text = []      # words seen before any speaker could be identified

# Check which speaker is assigned to each word in the transcription
for segment in final_transcript:
    for word in segment.get("words", []):
        closest_speaker = _speaker_at(word["start"])

        if closest_speaker is None:
            # No speaker matches: append the word to the last speaker. Before
            # the first speaker is known, buffer it instead of dropping it.
            if merged_speaker is not None:
                merged_text.append(word["text"])
            else:
                pending_text.append(word["text"])
            continue

        if closest_speaker == merged_speaker:
            # If the speaker remains the same, add the word
            merged_text.append(word["text"])
            continue

        # Speaker change: write the previous speaker's text, then start anew
        if merged_speaker is not None:
            combined_output.append(_speaker_line(merged_speaker, merged_text))
        merged_speaker = closest_speaker
        # Buffered leading words are attributed to the first identified speaker
        merged_text = pending_text + [word["text"]]
        pending_text = []

# Add last merged speaker, if present
if merged_speaker is not None:
    combined_output.append(_speaker_line(merged_speaker, merged_text))
elif pending_text:
    # No speaker was identified at all: keep the words under a placeholder name
    combined_output.append(f"Unknown Speaker: {' '.join(pending_text)}")

# Save output
output_file = "combined_output.txt"
with open(output_file, "w", encoding="utf-8") as f:
    for line in combined_output:
        f.write(line + "\n")