### Speaker diarization

In [None]:
import requests
import time
from dotenv import load_dotenv
load_dotenv()
import os
from csv import writer

# Speaker-diarized transcription of one recording through the AssemblyAI REST
# API: upload the audio, request a transcript with speaker labels, poll until
# the job finishes, then write one CSV row per utterance.
base_url = "https://api.assemblyai.com"

# Read the key directly instead of formatting it into an f-string: formatting
# would silently turn a missing AAI_KEY into the literal string "None".
api_key = os.getenv("AAI_KEY")
if not api_key:
    raise RuntimeError("AAI_KEY is not set; add it to the environment or the .env file")

headers = {
    "authorization": api_key
}

# Upload the local recording; the response carries the URL to transcribe.
with open("Transcription/Case 07.m4a", "rb") as f:
    response = requests.post(base_url + "/v2/upload",
                             headers=headers,
                             data=f)
# Fail with the HTTP error itself rather than a KeyError on the JSON below.
response.raise_for_status()

upload_url = response.json()["upload_url"]

data = {
    "audio_url": upload_url,  # You can also use a URL to an audio or video file on the web
    "speaker_labels": True
}

url = base_url + "/v2/transcript"
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()

transcript_id = response.json()['id']
polling_endpoint = base_url + "/v2/transcript/" + transcript_id

# Poll every 3 seconds until the job reports 'completed' or 'error'.
while True:
    poll_response = requests.get(polling_endpoint, headers=headers)
    poll_response.raise_for_status()
    transcription_result = poll_response.json()

    if transcription_result['status'] == 'completed':
        print("Transcript ID:", transcript_id)
        break

    elif transcription_result['status'] == 'error':
        raise RuntimeError(f"Transcription failed: {transcription_result['error']}")

    else:
        time.sleep(3)

# Write the utterances inside a `with` block so the CSV is flushed and closed
# even if a row fails to serialize.
with open("transcribe.csv", "w", newline="", encoding="utf-8") as csv_file:
    transcription = writer(csv_file)
    # NOTE: the "word" column holds the full utterance text, not a single word.
    transcription.writerow(["speaker", "start", "end", "word"])
    for utterance in transcription_result['utterances']:
        print(f"Speaker {utterance['speaker']}: {utterance['text']} from {utterance['start']} to {utterance['end']}")
        transcription.writerow([utterance["speaker"], utterance["start"], utterance["end"], utterance["text"]])



### Transcription only

In [None]:
import requests
import time
from dotenv import load_dotenv
import os
from csv import writer
load_dotenv()

# Plain (non-diarized) transcription of one recording through the AssemblyAI
# REST API: upload the audio, request a transcript, poll until the job
# finishes, then write one CSV row per recognized word.
base_url = "https://api.assemblyai.com"

# Stop early with a clear message when the key is missing instead of sending
# unauthenticated requests.
api_key = os.getenv('AAI_KEY')
if not api_key:
    raise RuntimeError("AAI_KEY is not set; add it to the environment or the .env file")

headers = {
    "authorization": api_key
}

# Upload the local recording; the response carries the URL to transcribe.
with open("Transcription/Case 07.m4a", "rb") as f:
    response = requests.post(base_url + "/v2/upload",
                             headers=headers,
                             data=f)
# Fail with the HTTP error itself rather than a KeyError on the JSON below.
response.raise_for_status()

upload_url = response.json()["upload_url"]
print(upload_url)

data = {
    "audio_url": upload_url,  # You can also use a URL to an audio or video file on the web
    "speech_model": "universal"
    # "speech_model": "slam-1"
}

url = base_url + "/v2/transcript"
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()

transcript_id = response.json()['id']
polling_endpoint = base_url + "/v2/transcript/" + transcript_id

# Poll every 3 seconds until the job reports 'completed' or 'error'.
while True:
    poll_response = requests.get(polling_endpoint, headers=headers)
    poll_response.raise_for_status()
    transcription_result = poll_response.json()

    if transcription_result['status'] == 'completed':
        print("Transcript ID:", transcript_id)
        break

    elif transcription_result['status'] == 'error':
        raise RuntimeError(f"Transcription failed: {transcription_result['error']}")

    else:
        time.sleep(3)

print(transcription_result)

# Write the words inside a `with` block so the CSV is flushed and closed even
# if a row fails to serialize.
with open("transcribe.csv", "w", encoding="utf-8", newline="") as csv_file:
    csv_writer = writer(csv_file)
    csv_writer.writerow(["word", "start", "end"])
    for result in transcription_result["words"]:
        print(f"{result['text']}, from {result['start']} to {result['end']}")
        csv_writer.writerow([result["text"], result["start"], result["end"]])


In [None]:
# Dump every raw entry of the transcript's "words" list, one per line, to
# inspect which fields each entry carries. Relies on `transcription_result`
# left behind by an earlier transcription cell.
for result in transcription_result["words"]:
    print(result)

In [None]:
# Print each word with its start and end values. Relies on
# `transcription_result` left behind by an earlier transcription cell.
# Single quotes are used inside the f-string so the cell also parses on
# interpreters older than Python 3.12.
for result in transcription_result["words"]:
    print(f"{result['text']}, from {result['start']} to {result['end']}")


In [None]:
# Build the pretrained pyannote speaker-diarization pipeline, run it on the
# case recording, and save the result in RTTM format.
from dotenv import load_dotenv
import os

# Load environment settings first; the Hugging Face token is read right after.
load_dotenv()
HF_KEY = os.getenv("HF_KEY")

# Imported after load_dotenv() on purpose, matching the original cell order.
from pyannote.audio import Pipeline

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HF_KEY)

# Hand the recording to the pipeline as an open binary stream.
with open("Transcription/Case 07.m4a", "rb") as audio_stream:
    diarization = pipeline(audio_stream)

# Persist the diarization output to disk as RTTM.
with open("audio.rttm", "w") as rttm_out:
    diarization.write_rttm(rttm_out)


In [None]:
# Build the pretrained pyannote speaker-diarization 3.1 pipeline, run it on the
# converted WAV recording, and save the result in both RTTM and LAB formats.
from dotenv import load_dotenv
import os

# Load environment settings first; the Hugging Face token is read right after.
load_dotenv()
HF_KEY = os.getenv("HF_KEY")

# Imported after load_dotenv() on purpose, matching the original cell order.
from pyannote.audio import Pipeline

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_KEY)

# Run the pipeline with the speaker count bounded between 2 and 5.
# Variants tried earlier, kept for reference:
#   diarization = pipeline(wav_stream)
#   diarization = pipeline(wav_stream, num_speakers=3)
with open("Transcription/Case 07_converted.wav", "rb") as wav_stream:
    diarization = pipeline(wav_stream, min_speakers=2, max_speakers=5)

# Persist the diarization output to disk as RTTM ...
with open("audio.rttm", "w") as rttm_out:
    diarization.write_rttm(rttm_out)

# ... and again as a LAB file.
with open("audio.lab", "w") as lab_out:
    diarization.write_lab(lab_out)


In [None]:
# Show the built-in help for the `diarization` object produced by one of the
# pyannote cells above, to browse the methods it offers.
help(diarization)

In [None]:
import subprocess
import os

def convert_audio_with_ffmpeg(input_path, output_path, sample_rate=16000, channels=1, *, overwrite=False):
    """Convert an audio file by invoking the ``ffmpeg`` command-line tool.

    Args:
        input_path: Path of the audio file passed to FFmpeg via ``-i``.
        output_path: Path FFmpeg writes the converted audio to.
        sample_rate: Value passed to FFmpeg's ``-ar`` option.
        channels: Value passed to FFmpeg's ``-ac`` option.
        overwrite: When true, pass ``-y`` so an existing ``output_path`` is
            replaced. When false (the default) no overwrite flag is added.

    Returns:
        True when FFmpeg exits successfully, False when FFmpeg is missing,
        exits with a non-zero status, or any other error occurs. Diagnostics
        are printed rather than raised.
    """
    # Accept path-like objects as well as plain strings; the command is also
    # joined into a string for logging, which requires str items.
    command = ["ffmpeg"]
    if overwrite:
        command.append("-y")
    command += [
        "-i",
        str(input_path),
        "-ar", str(sample_rate),
        "-ac", str(channels),
        str(output_path),
    ]

    print(f"Executing FFmpeg command: {' '.join(command)}")

    try:
        # stdout/stderr are captured, so any confirmation prompt FFmpeg prints
        # (e.g. when the output file already exists) would be invisible and the
        # call would wait forever on inherited stdin. Detaching stdin makes
        # that case end instead of hanging.
        result = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            check=True,
        )

        print("FFmpeg conversion successful!")
        print("STDOUT:\n", result.stdout)
        print("STDERR:\n", result.stderr)
        return True

    except FileNotFoundError:
        # Raised by subprocess when the ffmpeg executable cannot be located.
        print("ERROR: FFmpeg not found. Please ensure FFmpeg is installed and added to your system's PATH.")
        return False
    except subprocess.CalledProcessError as e:
        # check=True turns a non-zero exit status into this exception.
        print(f"ERROR: FFmpeg conversion failed with exit code {e.returncode}")
        print("STDOUT:\n", e.stdout)
        print("STDERR:\n", e.stderr)
        print(f"Command used: {' '.join(e.cmd)}")
        return False
    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure to caller.
        print(f"An unexpected error occurred during FFmpeg conversion: {e}")
        return False

# Script entry point: convert the case recording from .m4a to .wav, creating a
# short placeholder input with FFmpeg first when the recording is missing.
if __name__ == "__main__":
    input_m4a_path = "Transcription/Case 07.m4a"
    output_wav_path = "Transcription/Case 07_converted.wav"

    if not os.path.exists(input_m4a_path):
        print(f"'{input_m4a_path}' not found. Please place an .m4a file there or change path.")
        try:
            # Ask FFmpeg's lavfi `anullsrc` source for 5 seconds of audio so the
            # conversion step below has something to work on.
            subprocess.run(["ffmpeg", "-f", "lavfi", "-i", "anullsrc=r=44100:cl=mono,atempo=1.0", "-t", "5", input_m4a_path], check=True, capture_output=True)
            print(f"Dummy '{input_m4a_path}' created for testing.")
        except (FileNotFoundError, subprocess.CalledProcessError):
            print("Could not create dummy file. Ensure FFmpeg is installed.")
            # `exit()` is the interactive helper injected by the `site` module
            # and would report success (status 0); raise SystemExit with a
            # non-zero status so callers can detect the failure.
            raise SystemExit(1)


    if convert_audio_with_ffmpeg(input_m4a_path, output_wav_path):
        print(f"Successfully converted '{input_m4a_path}' to '{output_wav_path}'")
    else:
        print(f"Failed to convert '{input_m4a_path}'. Check errors above.")
        