In [8]:
import os
import asyncio
import threading
import time
import base64
from datetime import datetime, timedelta
from datetime import time
from typing import List, Dict, Any, Optional
import sqlite3
from contextlib import asynccontextmanager
import json
from fastapi.responses import FileResponse


from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from dotenv import load_dotenv
import uvicorn
#from playsound import playsound
import io

# LangGraph and LangChain imports
from langgraph.graph import StateGraph, END, START
from langchain_core.messages import HumanMessage
from langchain_groq import ChatGroq
from fastapi.staticfiles import StaticFiles 
from typing import TypedDict,Annotated,Literal
from langchain.prompts import PromptTemplate

import nest_asyncio
nest_asyncio.apply()

# Load environment variables
load_dotenv()

True

We are going to Define States

In [9]:
class Location(BaseModel):
    latitude:  float = Field(..., ge=-90.0, le=90.0, description="Degrees north or south of the equator")
    longitude: float = Field(..., ge=-180.0, le=180.0, description="Degrees east or west of prime meridian")

    # Optional: custom string pattern instead of float
    @validator("latitude", "longitude", pre=True)
    def parse_str_to_float(cls, v):
        # Allow passing "12.345" or 12.345
        return float(v)


C:\Users\asus\AppData\Local\Temp\ipykernel_12104\1250525760.py:6: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator("latitude", "longitude", pre=True)


In [10]:
class Summarization(BaseModel):
    summary: str
    location: Location
    time: datetime
    distress_words: List[str]
    threat_level: float


In [11]:
class WorkflowState(BaseModel):
    location:    Location
    real_time:   str
    TAudio:      str | None
    threat_score:float
    consent:     Literal["Yes", "No"]
    gesture:     Literal["Yes", "No"]
    sudden_acc:  Literal["Yes", "No"]
    message:str
    analysis: Optional[Summarization] = None


Just an Example

Defining_A_Node for threat analysis

In [12]:
class Workflow:
    def __init__(self):
        self.llm = ChatGroq(
            model="openai/gpt-oss-120b",  # Using available model
            temperature=0.1,
        )

        self.template1 = """
You are an AI Assistant who is specially designed to analyse the transcripted audio file which is received from the phone of a woman who might get harassed at {real_time}. You have to generate a threat level between 5 to 10 based on the following transcript. If this prompt is running which exclusively means that there is night time and user input already a dangerous place:

Audio Transcript: {TAudio}

Remember:
Provide high rating if you found some distress sentences like "don't touch me", "I will complain about you", "Please don't do this to me"... and many other distress sentences.

Return your response in the exact JSON format:
{{
    "summary": "Provide a proper summary for the event",
    "threat_level":5.0-10.0,
    "distress_words": ["list", "of", "distress", "words"]
}}
"""

        self.template2 = """
You are an AI Assistant who is specially designed to analyse the transcripted audio file which is received from the phone of a woman who is not around the peak night time. You have to generate a threat level between 4 to 10 based on the following transcript and since some relaxation would be there as this is not night time. Thus, threat level could be low but provide high level if you hear serious distress words that you think might lead something dangerous to the woman:

Audio Transcript: {TAudio}

Remember:
Provide high rating if you found some distress sentences like "don't touch me", "I will complain about you", "Please don't do this to me"... and many other distress sentences.

Return your response in the exact JSON format:
{{
    "summary": "Provide a proper summary for the event",
    "threat_level": 4.0-8.0,
    "distress_words": ["list", "of", "distress", "words"]
}}
"""

    async def Analysis_Node(self, state: WorkflowState):
        night_start = time(0, 0)    # 00:00 (12:00 AM)
        night_end = time(4, 0)
        timestamp = datetime.fromisoformat(state.real_time)      # 04:00 (4:00 AM)

        if (
            state.consent == 'No'
            and not (night_start <= timestamp.time() <= night_end)
            and state.gesture == 'No'
            and state.sudden_acc == 'No'
        ):
            return 'END'

        # Use the correct template based on night time
        if night_start <= timestamp.time() <= night_end:
            template = self.template1
        else:
            template = self.template2

        prompt = PromptTemplate(
            input_variables=['real_time', 'TAudio'],
            template=template
        )

        response = await self.llm.ainvoke(prompt.format(
            real_time=state.real_time,
            TAudio=state.TAudio
        ))

        try:
            result = json.loads(response.content)
            obj = Summarization(
            location=state.location,
             time=state.real_time,            # Pydantic will parse ISO string to datetime
            summary=result["summary"],
            threat_level=result["threat_level"],
            distress_words=result["distress_words"],
    )
            
            return {"analysis": obj} 
        except Exception as e:
            return Summarization(
                location=state.location,
                summary=f"Groq API error: {str(e)}",
                time=state.real_time,
                distress_words=[],
                threat_level=0.0
            )

    async def Flow(self):
        workflow = StateGraph(WorkflowState)
        # Adding the Nodes
        workflow.add_node("Analysis", self.Analysis_Node)
        workflow.set_entry_point("Analysis")
        workflow.add_edge("Analysis", END)
        return workflow.compile()

In [15]:
async def main():
    # Create workflow instance
    workflow_instance = Workflow()
    app = await workflow_instance.Flow()
    
    # Dummy distress conversation transcript
    distress_transcript = """
    Woman: "Please stop following me, I don't know you"
    Unknown person: "Come on, don't be like that"
    Woman: "Don't touch me! Leave me alone!"
    Unknown person: "Just talk to me for a minute"
    Woman: "Help! Someone help me! Don't do this to me!"
    Woman: "I will call the police if you don't stop"
    Unknown person: "Nobody can hear you here"
    Woman: "Please don't hurt me, I haven't done anything"
    """
    
    # Create input state with all required fields
    input_state = WorkflowState(
        location=Location(latitude=12.9716, longitude=77.5946),  # Bangalore coordinates
        real_time="2025-08-18T19:29:00",  # 2:30 AM (night time)
        TAudio=distress_transcript,
        threat_score=0,
        consent="Yes",
        gesture="Yes", 
        sudden_acc="Yes",
        message="Dangerous"
    )
    
    # Run the workflow
    result = await app.ainvoke(input_state)

    print(type(result))
    print(result['analysis'])
   

    
# Run the workflow
if __name__ == "__main__":
    await main()


<class 'dict'>
summary='A woman is being pursued and verbally harassed by an unknown individual. She repeatedly pleads for the person to stop, asks for help, threatens to call the police, and expresses fear of being touched or harmed.' location=Location(latitude=12.9716, longitude=77.5946) time=datetime.datetime(2025, 8, 18, 19, 29) distress_words=['stop following me', "don't touch me", 'help', 'someone help me', 'I will call the police', "please don't hurt me", "I haven't done anything"] threat_level=7.8
