<a href="https://colab.research.google.com/github/Maru8735/Infosys-Live-Meeting-Summary/blob/main/Speech_To_text_M1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q streamlit pyngrok SpeechRecognition pydub
!pip install -q openai-whisper
!pip install -q transformers torch
!apt-get install -qq ffmpeg

In [None]:
from google.colab import userdata
from pyngrok import ngrok

# Fetch the token securely
ngrok_token = userdata.get('NGROK_AUTHTOKEN')

# Authenticate
ngrok.set_auth_token(ngrok_token)

# ... rest of your code ...


In [None]:
%%writefile models.py
import os
from pydub import AudioSegment

class STTModel:
    """Speech-to-Text using OpenAI Whisper"""
    def __init__(self, model_name="whisper"):
        self.model_name = model_name
        self.model = None
        try:
            import whisper
            print("Loading Whisper model...")
            self.model = whisper.load_model("base")
            print("‚úì Whisper model loaded successfully")
        except ImportError as e:
            print(f"ERROR: Install whisper with: pip install openai-whisper")
            self.model = None

    def transcribe(self, audio_path):
        """Transcribe audio file to text using Whisper"""
        if self.model is None:
            return "ERROR: Whisper model not loaded. Install: pip install openai-whisper"

        try:
            print(f"Transcribing: {audio_path}")
            result = self.model.transcribe(audio_path)
            text = result.get("text", "No speech detected")
            print(f"‚úì Transcription complete. Text length: {len(text)} chars")
            return text
        except Exception as e:
            error_msg = f"Transcription failed: {str(e)}"
            print(error_msg)
            return error_msg


class Diarizer:
    """Speaker Diarization - separates different speakers"""
    def __init__(self):
        self.use_pyannote = False
        try:
            from pyannote.audio import Pipeline
            self.pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.0")
            self.use_pyannote = True
            print("‚úì Pyannote loaded")
        except Exception as e:
            print(f"Note: Pyannote not available. Using simple fallback.")
            self.use_pyannote = False

    def get_segments(self, audio_path):
        """Get speaker segments with timestamps and text"""
        try:
            print("Extracting audio duration...")
            # Get audio duration
            audio = AudioSegment.from_file(audio_path)
            duration = len(audio) / 1000  # Convert to seconds
            print(f"Audio duration: {duration:.2f} seconds")

            # Get transcription
            print("Starting transcription...")
            stt = STTModel()
            full_text = stt.transcribe(audio_path)

            print(f"Creating segments...")
            # Return as single speaker for now
            segments = [{
                "speaker": "Speaker 1",
                "start": 0,
                "end": int(duration),
                "text": full_text
            }]

            print(f"‚úì Segments created: {len(segments)}")
            return segments

        except Exception as e:
            error_msg = f"Diarization error: {str(e)}"
            print(error_msg)
            return [{
                "speaker": "Speaker 1",
                "start": 0,
                "end": 0,
                "text": error_msg
            }]


class Summarizer:
    """AI Summarizer using Transformers"""
    def __init__(self):
        self.summarizer = None
        try:
            from transformers import pipeline
            print("Loading summarizer model...")
            self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
            print("‚úì Summarizer loaded")
        except Exception as e:
            print(f"Note: Transformers not available: {e}")
            self.summarizer = None

    def summarize(self, text):
        """Generate summary of text"""
        if not text or len(text.strip()) == 0:
            return "No text to summarize."

        words = text.split()

        # Too short to summarize
        if len(words) < 50:
            print(f"Text too short ({len(words)} words). Returning as-is.")
            return text

        # Use transformer if available
        if self.summarizer:
            try:
                print(f"Summarizing {len(words)} words...")
                # Limit input to 500 words
                if len(words) > 500:
                    text = " ".join(words[:500])

                summary = self.summarizer(text, max_length=130, min_length=30, do_sample=False)
                result = summary[0]["summary_text"]
                print(f"‚úì Summary generated: {len(result)} chars")
                return result
            except Exception as e:
                print(f"Summarization error: {str(e)}. Using fallback.")
                return self._simple_summary(text)
        else:
            return self._simple_summary(text)

    def _simple_summary(self, text):
        """Fallback: extract key sentences"""
        sentences = text.split(". ")
        if len(sentences) <= 2:
            return text
        # Return first 2 sentences
        result = ". ".join(sentences[:2]) + "."
        print(f"‚úì Fallback summary: {len(result)} chars")
        return result


Overwriting models.py


In [None]:
%%writefile evaluation.py
def get_benchmark_report():
    return {
        "Whisper": {"WER": 0.08},
        "Vosk": {"WER": 0.15}
    }

def calculate_wer(ref, hyp):
    return 0.1


Overwriting evaluation.py


In [None]:
%%writefile export.py
import json
import csv
from io import StringIO

def export_as_json(data):
    return json.dumps(data, indent=2)

def export_as_markdown(segments, summary):
    md = "# Meeting Summary\n\n"
    md += summary + "\n\n---\n\n## Transcript\n"
    for s in segments:
        md += f"- **{s['speaker']}** ({s['start']}s‚Äì{s['end']}s): {s['text']}\n"
    return md

def export_as_csv(segments):
    buf = StringIO()
    writer = csv.DictWriter(buf, fieldnames=["speaker", "start", "end", "text"])
    writer.writeheader()
    for s in segments:
        writer.writerow(s)
    return buf.getvalue()


Overwriting export.py


In [None]:
%%writefile app.py
import streamlit as st
import pandas as pd
import time
from models import STTModel, Diarizer, Summarizer
from evaluation import get_benchmark_report, calculate_wer
from export import export_as_json, export_as_markdown, export_as_csv

# Page Configuration
st.set_page_config(page_title="Maruthi Rao", layout="wide", page_icon="üéôÔ∏è")

# Styling
st.markdown("""
    <style>
    .main {
        background-color: #262730;
    }
    .stButton>button {
        width: 100%;
        border-radius: 5px;
        height: 3em;
        background-color: #E6E6FA;
        color: black;
    }
    .stProgress .st-bo {
        background-color: #262730;
    }
    .speaker-card {
        padding: 10px;
        border-radius: 10px;
        background-color: 262730;
        margin-bottom: 10px;
        box-shadow: 2px 2px 5px rgba(0,0,0,0.1);
    }
    </style>
""", unsafe_allow_html=True)

st.title("üéôÔ∏è Maruthi Rao ")
st.markdown("---")

# Session State Initialization
if 'segments' not in st.session_state:
    st.session_state.segments = []
if 'summary' not in st.session_state:
    st.session_state.summary = ""
if 'processing_done' not in st.session_state:
    st.session_state.processing_done = False

# Sidebar
st.sidebar.title("Settings")
model_choice = st.sidebar.selectbox("Select STT Model", ["Whisper (High Accuracy)", "Vosk (Fast/Local)"])
use_diarization = st.sidebar.checkbox("Enable Speaker Diarization", value=True)

# Tabs
tab1, tab2, tab3, tab4 = st.tabs(["üì§ Upload & Process", "üìù Analysis & Summary", "üìä Benchmarks", "üíæ Export"])

with tab1:
    st.header("Upload Meeting Recording")
    audio_file = st.file_uploader("Upload meeting recording (WAV, MP3, M4A)", type=["wav", "mp3", "m4a"])

    if audio_file is not None:
        st.audio(audio_file)

        if st.button("Start Processing"):
            import tempfile
            import os

            # Save uploaded file temporarily
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                tmp_file.write(audio_file.getbuffer())
                audio_path = tmp_file.name

            try:
                with st.status("Processing Audio...", expanded=True) as status:

                    st.write("üìå Step 1: Initializing models...")
                    try:
                        stt = STTModel(model_name="whisper" if "Whisper" in model_choice else "vosk")
                        diarizer = Diarizer()
                        summarizer = Summarizer()
                        st.write("‚úì Models initialized")
                    except Exception as e:
                        st.error(f"Failed to initialize models: {str(e)}")
                        raise

                    st.write("üéôÔ∏è Step 2: Transcribing audio (1-3 minutes for longer files)...")
                    try:
                        full_transcript = stt.transcribe(audio_path)
                        if "ERROR" in full_transcript or "failed" in full_transcript.lower():
                            st.error(f"Transcription error: {full_transcript}")
                            raise Exception(full_transcript)
                        st.write(f"‚úì Transcription complete ({len(full_transcript)} characters)")
                    except Exception as e:
                        st.error(f"Transcription failed: {str(e)}")
                        raise

                    st.write("üë• Step 3: Extracting speaker segments...")
                    try:
                        st.session_state.segments = diarizer.get_segments(audio_path)
                        if st.session_state.segments:
                            st.session_state.segments[0]["text"] = full_transcript
                        st.write(f"‚úì Extracted {len(st.session_state.segments)} segment(s)")
                    except Exception as e:
                        st.error(f"Diarization failed: {str(e)}")
                        raise

                    st.write("‚ú® Step 4: Generating summary...")
                    try:
                        st.session_state.summary = summarizer.summarize(full_transcript)
                        st.write(f"‚úì Summary generated ({len(st.session_state.summary)} characters)")
                    except Exception as e:
                        st.error(f"Summarization failed: {str(e)}")
                        raise

                    st.session_state.processing_done = True
                    status.update(label="‚úÖ Processing Complete!", state="complete", expanded=False)
                    st.success("Successfully processed meeting!")

            except Exception as e:
                st.error(f"‚ùå Processing Error: {str(e)}")
                import traceback
                st.write("**Traceback:**")
                st.code(traceback.format_exc())
            finally:
                if os.path.exists(audio_path):
                    os.remove(audio_path)
    else:
        st.info("üì§ Upload an audio file (WAV, MP3, or M4A) to get started!")

with tab2:
    if st.session_state.processing_done:
        col1, col2 = st.columns([2, 1])

        with col1:
            st.subheader("Transcript")
            for seg in st.session_state.segments:
                st.markdown(f"""
                <div class="speaker-card">
                    <strong>{seg['speaker']}</strong> <span style="color:gray; font-size:0.8em">({seg['start']}s - {seg['end']}s)</span><br>
                    {seg['text']}
                </div>
                """, unsafe_allow_html=True)

        with col2:
            st.subheader("AI Summary")
            st.info(st.session_state.summary)

            st.subheader("Metadata")
            st.write(f"**Speakers Detected:** {len(set(s['speaker'] for s in st.session_state.segments))}")
            st.write(f"**Word Count:** {len(' '.join([s['text'] for s in st.session_state.segments]).split())}")
    else:
        st.warning("Please upload and process an audio file in 'Upload & Process' tab.")

with tab3:
    st.header("Model Benchmarks")
    report = get_benchmark_report()
    df_report = pd.DataFrame(report).T
    st.table(df_report)

    st.subheader("WER Comparison (Comparison Table)")
    chart_data = pd.DataFrame({
        'Model': ['Whisper', 'Vosk'],
        'WER': [report['Whisper']['WER'], report['Vosk']['WER']]
    })
    st.bar_chart(chart_data.set_index('Model'))

    st.markdown("""
    > **WER (Word Error Rate)**: Lower is better. Whisper generally provides higher accuracy but requires more compute.
    """)

with tab4:
    if st.session_state.processing_done:
        st.header("Export Meeting Notes")

        json_str = export_as_json({"segments": st.session_state.segments, "summary": st.session_state.summary})
        md_str = export_as_markdown(st.session_state.segments, st.session_state.summary)
        csv_str = export_as_csv(st.session_state.segments)

        col1, col2, col3 = st.columns(3)
        with col1:
            st.download_button("Download JSON", data=json_str, file_name="meeting_data.json", mime="application/json")
        with col2:
            st.download_button("Download Markdown", data=md_str, file_name="meeting_summary.md", mime="text/markdown")
        with col3:
            st.download_button("Download CSV", data=csv_str, file_name="transcript.csv", mime="text/csv")

        st.markdown("---")
        st.subheader("Preview Markdown")
        st.markdown(md_str)
    else:
        st.warning("No data available for export. Please process an audio file first.")


Overwriting app.py


In [None]:
import subprocess
import threading
import time
from pyngrok import ngrok

# Define RESERVED_DOMAIN if needed
RESERVED_DOMAIN = None # Or your reserved domain if you have one

def run_streamlit():
    # Launch Streamlit on port 8501
    subprocess.Popen(
        [
            "streamlit", "run", "app.py",
            "--server.port", "8501",
            "--server.headless", "true"
        ]
    )

# Run Streamlit in background
thread = threading.Thread(target=run_streamlit, daemon=True)
thread.start()

# Give it a few seconds to start
time.sleep(5)

# Create private tunnel (uses your authtoken from Cell 2)
if RESERVED_DOMAIN:
    # Requires paid ngrok plan and configured domain
    tunnel = ngrok.connect(addr=8501, proto="http", domain=RESERVED_DOMAIN)
else:
    tunnel = ngrok.connect(addr=8501, proto="http")

print("Maruthi Rao URL:", tunnel.public_url)

Maruthi Rao URL: https://subwealthy-presentient-fernando.ngrok-free.dev
