# ESL Speech Analysis (Remote Kernel)


# Cell 0: Environment Setup (run once)
# For .m4a support, pydub needs ffmpeg available in the runtime.
!pip install faster-whisper language-tool-python pydub openai replicate -U typing_extensions

In [19]:
import ipywidgets as widgets
from IPython.display import display

# Single-file, in-memory upload control restricted to the two supported formats.
upload = widgets.FileUpload(multiple=False, accept=".wav,.m4a")
display(upload)

# Module-level slots filled in by the upload callback and read by later cells:
# the raw file content and the name it was uploaded under.
AUDIO_BYTES = AUDIO_FILENAME = None


def _iter_uploaded_files(value):
    # ipywidgets can return dict-like (v7) or tuple/list (v8)
    if hasattr(value, "items"):
        for name, file_info in value.items():
            yield name, file_info
    elif isinstance(value, (list, tuple)):
        for file_info in value:
            name = file_info.get("name") if isinstance(file_info, dict) else None
            yield name, file_info


def store_audio(change):
    """Observer callback that caches the uploaded file in module globals.

    ``change`` is accepted to satisfy the observer signature but is not read;
    the widget's current ``upload.value`` is inspected instead. For every
    dict-shaped entry, ``AUDIO_BYTES`` is set from its ``"content"`` key and
    ``AUDIO_FILENAME`` from the entry's name, and a confirmation line is
    printed once both are truthy.
    """
    global AUDIO_BYTES, AUDIO_FILENAME

    payload = upload.value
    if not payload:
        return

    for key, entry in _iter_uploaded_files(payload):
        if not isinstance(entry, dict):
            continue
        AUDIO_BYTES = entry.get("content")
        # Prefer the name supplied by the iterator, else the entry's own "name".
        AUDIO_FILENAME = key or entry.get("name")
        if not (AUDIO_BYTES and AUDIO_FILENAME):
            continue
        print(f"Audio file '{AUDIO_FILENAME}' is now ready in memory for other cells.")


# Automatically trigger when a file is uploaded: register store_audio as an
# observer of the widget's "value" trait.
upload.observe(store_audio, names="value")


FileUpload(value=(), accept='.wav,.m4a', description='Upload')

In [20]:
# Cell 1: Load audio (.wav or .m4a) - prefer the in-memory upload, otherwise the most recent file in ./audio
import os
import io
import tempfile
from pydub import AudioSegment

AUDIO_DIR = "audio"
SUPPORTED_EXTS = {".wav", ".m4a"}

# Resolve AUDIO_PATH from one of two sources, in priority order:
#   1. the in-memory upload (AUDIO_BYTES / AUDIO_FILENAME), written into AUDIO_DIR;
#   2. the most recently modified supported file already present in AUDIO_DIR.
# In both cases an .m4a input is converted to a mono, 16 kHz .wav.
if not (globals().get("AUDIO_BYTES") and AUDIO_FILENAME):
    # No usable upload: look on disk.
    if not os.path.isdir(AUDIO_DIR):
        raise FileNotFoundError(f"Directory not found: {AUDIO_DIR}")

    candidates = [
        path
        for path in (os.path.join(AUDIO_DIR, name) for name in os.listdir(AUDIO_DIR))
        if os.path.splitext(path)[1].lower() in SUPPORTED_EXTS
        and os.path.isfile(path)
    ]

    if not candidates:
        raise FileNotFoundError(
            "No .wav or .m4a files found in ./audio. Add a file and try again."
        )

    # Newest file by modification time wins.
    INPUT_PATH = max(candidates, key=os.path.getmtime)
    ext = os.path.splitext(INPUT_PATH)[1].lower()

    if ext == ".wav":
        AUDIO_PATH = INPUT_PATH
    elif ext == ".m4a":
        AUDIO_PATH = os.path.splitext(INPUT_PATH)[0] + ".wav"
        audio = AudioSegment.from_file(INPUT_PATH, format="m4a")
        # Convert to mono/16k for best Whisper results
        audio = audio.set_channels(1)
        audio = audio.set_frame_rate(16000)
        audio.export(AUDIO_PATH, format="wav")
    else:
        raise ValueError("Unsupported file type. Use .wav or .m4a.")

    print(f"Audio file loaded: {AUDIO_PATH}")
else:
    # An upload is available: validate its extension, then persist it.
    ext = os.path.splitext(AUDIO_FILENAME)[1].lower()
    if ext not in SUPPORTED_EXTS:
        raise ValueError("Unsupported file type. Use .wav or .m4a.")

    os.makedirs(AUDIO_DIR, exist_ok=True)
    if ext != ".m4a":
        # .wav uploads are written to disk unchanged.
        AUDIO_PATH = os.path.join(AUDIO_DIR, AUDIO_FILENAME)
        with open(AUDIO_PATH, "wb") as f:
            f.write(AUDIO_BYTES)
    else:
        AUDIO_PATH = os.path.join(
            AUDIO_DIR, os.path.splitext(AUDIO_FILENAME)[0] + ".wav"
        )
        audio = AudioSegment.from_file(io.BytesIO(AUDIO_BYTES), format="m4a")
        audio = audio.set_channels(1)
        audio = audio.set_frame_rate(16000)
        audio.export(AUDIO_PATH, format="wav")

    print(f"Audio file loaded from upload: {AUDIO_PATH}")


# Cell 0c: Replicate diarization (Whisper + diarization as a service)
#
# Authentication: REPLICATE_API_TOKEN must already be present in the kernel's
# environment (export it before starting the kernel, or inject it from a
# secrets manager). Do NOT paste the token into this notebook: notebooks are
# shared and committed, and a token in source is a leaked token. Any token
# that was previously hard-coded in this cell should be revoked and rotated.

import os
import replicate

# Ensure audio is prepared (run Cell 1 first to set AUDIO_PATH)
if "AUDIO_PATH" not in globals():
    raise RuntimeError("AUDIO_PATH not set. Run the audio load cell first.")

if not os.environ.get("REPLICATE_API_TOKEN"):
    raise RuntimeError(
        "REPLICATE_API_TOKEN not set. Export it in the kernel environment and re-run."
    )

# Replicate diarization settings
NUM_SPEAKERS = 2  # set to None to autodetect
GROUP_SEGMENTS = True  # merge short same-speaker segments

# Run diarization on Replicate (pin to a model version)

model_id = "thomasmol/whisper-diarization:1495a9cddc83b2203b0d8d3516e38b80fd1572ebc4bc5700ac1da56a9b3ed886"
# The file handle must stay open for the duration of the call, so the request
# is issued inside the `with` block.
with open(AUDIO_PATH, "rb") as f:
    input_payload = {
        "file": f,
        "output": "json",
        "group_segments": GROUP_SEGMENTS,
    }
    # Only send num_speakers when it is set; omitting it leaves detection to the model.
    if NUM_SPEAKERS:
        input_payload["num_speakers"] = NUM_SPEAKERS
    replicate_output = replicate.run(
        model_id,
        input=input_payload,
    )

# Save results for downstream use
REPLICATE_DIARIZATION = replicate_output

# print("Replicate diarization output:")
# print(replicate_output)

# print("Replicate diarization output:")
# print(replicate_output)


# Cell 2: Human-readable diarization output (from Replicate)

import re


def _format_time(sec):
    m = int(sec // 60)
    s = sec % 60
    return f"{m:02d}:{s:05.2f}"


def _normalize_text(t):
    t = t.lower()
    t = re.sub(r"\s+", " ", t).strip()
    t = re.sub(r"[\W_]+", "", t)
    return t


def _dedupe_sentences(text):
    """Drop sentences that immediately repeat the one before them.

    ``text`` is split after ``.``, ``!`` or ``?`` followed by whitespace.
    Sentences are compared through ``_normalize_text``, so case, spacing and
    punctuation differences do not matter. A sentence whose normalized form is
    empty is always kept. The surviving sentences are joined with single
    spaces.
    """
    kept = []
    previous_key = None
    for sentence in re.split(r"(?<=[.!?])\s+", text.strip()):
        if not sentence:
            continue
        key = _normalize_text(sentence)
        is_repeat = bool(key) and key == previous_key
        if not is_repeat:
            kept.append(sentence)
            previous_key = key
    return " ".join(kept)


def _merge_segments(segments):
    """Collapse raw diarization segments into one entry per speaker turn.

    For each segment (a dict read through ``.get``):

    * segments with no text are skipped;
    * a missing or ``"UNKNOWN"`` speaker inherits the previous turn's speaker
      when there is one;
    * the text is passed through ``_dedupe_sentences``;
    * a segment whose normalized text equals that of the previously accepted
      segment is skipped as an exact repeat;
    * consecutive segments from the same speaker are merged: the turn's
      ``end`` is moved forward and the text appended after a space.

    Returns a list of dicts with keys ``speaker``, ``start``, ``end``, ``text``.
    """
    turns = []
    previous_key = None
    for segment in segments:
        raw_text = (segment.get("text") or "").strip()
        if not raw_text:
            continue

        label = segment.get("speaker") or "UNKNOWN"
        # If speaker is UNKNOWN, stick with previous speaker when possible
        if label == "UNKNOWN" and turns:
            label = turns[-1]["speaker"]

        cleaned = _dedupe_sentences(raw_text)
        key = _normalize_text(cleaned)
        if key and key == previous_key:
            # Skip exact repeat chunk
            continue
        previous_key = key

        begin = segment.get("start", 0.0)
        finish = segment.get("end", 0.0)
        continues_turn = bool(turns) and turns[-1]["speaker"] == label
        if continues_turn:
            # Merge consecutive same-speaker chunks
            current = turns[-1]
            current["end"] = finish
            current["text"] = current["text"] + " " + cleaned
        else:
            turns.append(
                {"speaker": label, "start": begin, "end": finish, "text": cleaned}
            )
    return turns


def _pretty_print_replicate(output):
    """Print a speaker-labelled, time-stamped transcript of a diarization result.

    ``output`` may be a dict carrying a ``"segments"`` entry or a list of
    segments. A short explanatory message is printed instead when ``output``
    is empty, has no segments, or nothing survives ``_merge_segments``.
    """
    if not output:
        print(
            "No Replicate output available. Run the Replicate diarization cell first."
        )
        return

    # Replicate returns a dict with `segments` or a list in some cases
    if isinstance(output, dict):
        segments = output.get("segments")
    elif isinstance(output, list):
        segments = output
    else:
        segments = None

    if not segments:
        print("No segments found in Replicate output.")
        return

    turns = _merge_segments(segments)
    if not turns:
        print("No usable segments after merging.")
        return

    print("HUMAN-READABLE DIARIZATION:\n")
    previous_speaker = None
    for turn in turns:
        who = turn["speaker"]
        begin = _format_time(turn.get("start", 0.0))
        finish = _format_time(turn.get("end", 0.0))
        body = turn["text"]
        if who == previous_speaker:
            # Same speaker as the previous line: print the text without a header.
            print(body)
            continue
        print(f"[{who}] {begin}–{finish}: {body}")
        previous_speaker = who


# Use saved output from the Replicate cell. globals().get(...) yields None when
# that cell has not run, in which case the function prints a hint instead of
# raising NameError.
_pretty_print_replicate(globals().get("REPLICATE_DIARIZATION"))

Audio file loaded from upload: audio/famous people.wav
HUMAN-READABLE DIARIZATION:

[SPEAKER_01] 00:01.95–00:12.91: We've been talking about a well-known person that you admire, and I'd like to discuss with you one or two more general questions related to this. Let's consider, first of all, famous people in your country.
[SPEAKER_00] 00:13.15–00:13.55: Yeah.
[SPEAKER_01] 00:15.05–00:17.33: What kind of people become famous in China?
[SPEAKER_00] 00:18.21–00:48.01: You know, those actors, especially the movie actors and the sports stars, they are very famous now in China because they can be seen by the people every day during the movie on the advertisements. They can be seen all the times, so they are very famous. And those people who are very rich and who have a really big company, they are also on the TV, on the news, so they are very famous as well.
[SPEAKER_01] 00:48.99–00:53.99: What's different about people who were famous in the past with people who are famous these days?
[SPEAKE

In [23]:
# Cell 4: ESL Error Detection (OpenAI via HTTP)
import os
import json
import requests

OPENAI_MODEL = "gpt-5.2"
# Read the key from the environment instead of embedding it in the notebook:
# notebooks are shared and committed, and a key in source is a leaked key.
# Any key that was previously hard-coded here should be revoked and rotated.
# An unset variable yields "" so the `if not api_key` check below reports it.
api_key = os.environ.get("OPENAI_API_KEY", "")


def _extract_transcript_from_replicate(output):
    if isinstance(output, dict):
        if "text" in output and isinstance(output["text"], str):
            return output["text"].strip()
        segments = output.get("segments")
        if isinstance(segments, list):
            parts = []
            for seg in segments:
                if isinstance(seg, dict):
                    t = (seg.get("text") or "").strip()
                    if t:
                        parts.append(t)
            if parts:
                return " ".join(parts)
    return None


# Use Whisper transcript if available; otherwise fall back to Replicate output.
# A non-empty `transcript` global left by an earlier cell is used as-is; only
# when it is missing or empty is the text rebuilt from REPLICATE_DIARIZATION.
# NOTE(review): the system prompt later in this cell expects speaker labels
# such as [SPEAKER_00], but the fallback joins segment text without adding
# any - confirm the model receives the labels it needs.
if "transcript" not in globals() or not transcript:
    transcript = _extract_transcript_from_replicate(
        globals().get("REPLICATE_DIARIZATION")
    )
    if not transcript:
        raise RuntimeError(
            "Transcript not available. Run the Whisper/WhisperX cell first."
        )


def get_issue_type(match):
    """Return the issue/rule identifier of ``match``, or ``"UNKNOWN"``.

    Lookup order:

    1. dict: its ``"type"`` entry (default ``"UNKNOWN"``);
    2. object with a ``ruleId`` attribute, then one with ``rule_id``;
    3. object with a ``rule`` attribute: the rule's ``"id"`` key when the rule
       is a dict containing it, otherwise the rule's ``id`` attribute.
    """
    if isinstance(match, dict):
        return match.get("type", "UNKNOWN")

    for attr in ("ruleId", "rule_id"):
        if hasattr(match, attr):
            return getattr(match, attr)

    # A missing `rule` and a `rule` of None both end at the "UNKNOWN" default.
    rule = getattr(match, "rule", None)
    if isinstance(rule, dict) and "id" in rule:
        return rule["id"]
    return getattr(rule, "id", "UNKNOWN")


# Ask the OpenAI chat-completions endpoint for ESL issues in `transcript` and
# store them in `matches` (a list of dicts). Without an API key the request is
# skipped and `matches` is left empty.
if not api_key:
    print("OPENAI_API_KEY not set. Set it to enable OpenAI-based ESL checks.")
    matches = []
else:
    # `schema` is referenced by the request below but is not defined in any
    # cell visible here, so a fresh kernel would fail with NameError. Provide a
    # fallback only when no earlier cell has defined one. Its fields mirror
    # what the reporting loop reads: "message", "context" and "suggestion".
    # Strict structured output needs every property listed as required and
    # additionalProperties disabled.
    if "schema" not in globals():
        schema = {
            "type": "object",
            "properties": {
                "issues": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "message": {"type": "string"},
                            "context": {"type": "string"},
                            "suggestion": {"type": "string"},
                        },
                        "required": ["message", "context", "suggestion"],
                        "additionalProperties": False,
                    },
                },
            },
            "required": ["issues"],
            "additionalProperties": False,
        }

    system_msg = (
        '''
You are an ESL grammar and fluency analyst focused on IELTS Speaking assessment.

Input: a transcript with multiple speakers labeled [SPEAKER_00], [SPEAKER_01], etc.

TASK
1) Identify the second speaker in order of first appearance.
2) Analyze ONLY that speaker’s speech.
3) Produce a teacher-facing feedback TABLE that groups common errors and prioritizes them by IELTS impact.

NON-NEGOTIABLE RULES
- Use ONLY errors that explicitly appear in the transcript.
- Quote the student’s exact words for every example.
- Do NOT invent, generalize, or paraphrase student language.
- Group similar errors together under a clear error category.
- Explanations must reflect the REAL grammatical or lexical issue.
- Do NOT mention grammar forms that are not used in the correction.
- Prioritize errors that most affect IELTS bands (Coherence, Lexical Resource, Grammar).

OUTPUT FORMAT (STRICT TABLE)

Title: On-the-Fly Feedback Table (Speaker 00)

Table columns (exactly these, in this order):
1) Error Group (sorted from highest to lowest IELTS impact)
2) Student Examples (exact phrases from transcript)
3) Better Versions
4) Explanation (clear, teacher-ready, 1–2 short lines)

SORTING RULE
- Order rows by highest IELTS impact first:
  1) Coherence / clause structure / logic
  2) Collocation and word choice
  3) Fixed phrases / prepositions
  4) Verb forms and agreement
  5) Pronouns, fillers, repetition

STYLE CONSTRAINTS
- Concise but clear explanations
- No paragraphs outside the table
- No teaching activities or advice
- Teacher-facing language suitable for quick explanation in class
        '''
    )
    user_msg = f"Transcript:\n{transcript}"

    payload = {
        "model": OPENAI_MODEL,
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        # Constrain the reply to JSON matching `schema` so it can be parsed below.
        "response_format": {
            "type": "json_schema",
            "json_schema": {"name": "esl_issues", "schema": schema, "strict": True},
        },
    }
    r = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=60,
    )
    # Surface HTTP errors (bad key, quota, invalid request) immediately.
    r.raise_for_status()
    resp = r.json()

    if "choices" in resp and len(resp["choices"]) > 0:
        # The message content is itself a JSON document; the issues list lives
        # under its "issues" key.
        content_str = resp["choices"][0]["message"]["content"]
        data = json.loads(content_str)
        matches = data.get("issues", [])
    else:
        raise ValueError(
            f"Unexpected response format from OpenAI: {json.dumps(resp, indent=2)}"
        )

# Report every issue. Dict entries are read by key (they come from the parsed
# OpenAI response); any other entry is read through its `.message` and
# `.context` attributes.
print("ESL ISSUES:\n")
for m in matches:
    if not isinstance(m, dict):
        print(f"- {m.message}")
        print(f"  Context: {m.context}")
        print()
        continue

    print(f"- {m.get('message', '')}")
    print(f"  Context: {m.get('context', '')}")
    # The suggestion line is omitted when the field is missing or empty.
    if m.get("suggestion"):
        print(f"  Suggestion: {m.get('suggestion')}")
    print()

ESL ISSUES:

- On-the-Fly Feedback Table (Speaker 00)

| Error Group (sorted from highest to lowest IELTS impact) | Student Examples (exact phrases from transcript) | Better Versions | Explanation (clear, teacher-ready, 1–2 short lines) |
|---|---|---|---|
| Coherence / clause structure / logic | "I think sometimes people need some casual social life that if they have a hobby actually they could probably" | "I think people sometimes want a casual social life, and having a hobby can help." | The sentence structure is tangled ("that if...") and weakens clarity and coherence. |
| Coherence / clause structure / logic | "Actually everyone could do it because as we know it's very expensive" | "Actually, not everyone can do it because, as we know, it’s very expensive." | Logical connector is wrong ("everyone could" conflicts with "very expensive"). |
| Coherence / clause structure / logic | "if he really addicted to a sports car" | "if he’s really addicted to sports cars" | Missing verb "is/’