In [1]:
import os
import uuid
import json
import math
from dataclasses import dataclass
from functools import lru_cache
from typing import List, Dict, Any, Tuple

from dotenv import load_dotenv
load_dotenv()

True

In [2]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import QdrantVectorStore
from langchain_core.documents import Document

In [3]:
from langgraph.graph import StateGraph, END
from typing import TypedDict

In [4]:
import librosa
import soundfile as sf
from pydub import AudioSegment
from transformers import pipeline as hf_pipeline



In [5]:
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

from neo4j import GraphDatabase

In [6]:
from sqlalchemy import (
    create_engine, Table, Column, Text, Integer, TIMESTAMP,
    JSON, MetaData, ARRAY
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.engine import Engine
from datetime import datetime, timezone


In [7]:
from pydantic import BaseModel, Field

In [8]:
@dataclass
class Config:
    # Models
    stt_model: str = "openai/whisper-small"
    embed_model: str = "Qwen/Qwen3-Embedding-0.6B"
    clean_llm_model: str = "gpt-4o-mini"
    extract_llm_model: str = "gpt-4o-mini"

    # Audio
    sample_rate: int = 16000
    window_seconds: int = 30
    loudness_target_lufs: float = -23.0  # placeholder (demo)
    do_vad: bool = True  # placeholder (demo)

    # Paths
    audio_folder: str = "./raw_data/audio"

    # Vector DB (Qdrant)
    qdrant_url: str = os.getenv("QDRANT_URL", "http://localhost:6333")
    qdrant_api_key: str = os.getenv("QDRANT_API_KEY", None)
    qdrant_collection: str = "rag_main"
    qdrant_vector_size: int = 1024
    qdrant_distance: str = "Cosine"

    # Graph DB (Neo4j)
    neo4j_url: str = os.getenv("NEO4J_URL", "bolt://localhost:7687")
    neo4j_user: str = os.getenv("NEO4J_USER", "neo4j")
    neo4j_password: str = os.getenv("NEO4J_PASSWORD", "password")

    # SQL (Postgres)
    pg_url: str = os.getenv("PG_URL", "postgresql+psycopg2://postgres:postgres@localhost:5432/ragdb")

    # OpenAI
    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")

CFG = Config()

In [9]:
def init_sql_engine() -> Engine:
    """Create a SQLAlchemy engine for ``CFG.pg_url`` using the 2.0-style API (``future=True``)."""
    return create_engine(CFG.pg_url, future=True)

def init_sql_schema(engine: Engine):
    """Declare the pipeline's Postgres tables and create any that do not exist yet.

    Tables: ``documents`` (one row per ingested file), ``chunks`` (text chunks),
    ``vdb_refs`` (chunks written to Qdrant) and ``gdb_triples`` (audit copy of
    extracted triples). Returns a dict mapping table name -> ``Table``.
    """
    metadata = MetaData()

    def _tz_timestamp():
        # every timestamp column is timezone-aware
        return TIMESTAMP(timezone=True)

    tables = {}
    tables["documents"] = Table(
        "documents", metadata,
        Column("doc_id", Text, primary_key=True),
        Column("title", Text),
        Column("language", Text),
        Column("source", Text),
        Column("file", Text),
        Column("author", Text),
        Column("created_at", _tz_timestamp()),
        Column("knowledge_tags", ARRAY(Text)),
        Column("role_restriction", ARRAY(Text)),
        Column("lineage", JSONB),
    )
    tables["chunks"] = Table(
        "chunks", metadata,
        Column("chunk_id", Text, primary_key=True),
        Column("doc_id", Text),
        Column("segments", JSONB),
        Column("token_estimate", Integer),
        Column("created_at", _tz_timestamp()),
        Column("text", Text),
    )
    tables["vdb_refs"] = Table(
        "vdb_refs", metadata,
        Column("chunk_id", Text, primary_key=True),
        Column("collection", Text),
        Column("vector_dim", Integer),
        Column("inserted_at", _tz_timestamp()),
    )
    # confidence is an Integer column; insert_triple stores the 0-1 score scaled by 100
    tables["gdb_triples"] = Table(
        "gdb_triples", metadata,
        Column("triple_id", Text, primary_key=True),
        Column("s", Text),
        Column("p", Text),
        Column("o", Text),
        Column("doc_id", Text),
        Column("chunk_id", Text),
        Column("confidence", Integer),
        Column("created_at", _tz_timestamp()),
    )

    metadata.create_all(engine)
    return tables

In [10]:
def init_qdrant() -> QdrantClient:
    """Connect to Qdrant and make sure ``CFG.qdrant_collection`` exists.

    The existence check uses ``collection_exists`` and a missing collection is
    created with ``create_collection``. An existing collection (and its vectors)
    is therefore never dropped. Connection or auth errors propagate to the caller
    instead of being mistaken for "collection missing".
    """
    client = QdrantClient(url=CFG.qdrant_url, api_key=CFG.qdrant_api_key)
    if not client.collection_exists(CFG.qdrant_collection):
        client.create_collection(
            collection_name=CFG.qdrant_collection,
            vectors_config=VectorParams(
                size=CFG.qdrant_vector_size,
                # Distance is a str-valued enum: "Cosine" -> Distance.COSINE
                distance=Distance(CFG.qdrant_distance),
            ),
        )
    return client

In [11]:
def init_neo4j_driver():
    """Return a Neo4j driver for ``CFG.neo4j_url``; the caller owns it and should ``close()`` it."""
    credentials = (CFG.neo4j_user, CFG.neo4j_password)
    return GraphDatabase.driver(CFG.neo4j_url, auth=credentials)

In [12]:
def get_clean_llm():
    """Chat model for transcript cleanup (``CFG.clean_llm_model``, temperature 0 for stable output)."""
    return ChatOpenAI(
        api_key=CFG.openai_api_key,
        model=CFG.clean_llm_model,
        temperature=0,
    )

def get_extract_llm():
    """Chat model for entity/triple extraction (``CFG.extract_llm_model``, temperature 0)."""
    return ChatOpenAI(
        api_key=CFG.openai_api_key,
        model=CFG.extract_llm_model,
        temperature=0,
    )

def get_embeddings():
    """Hugging Face embedding model named by ``CFG.embed_model`` (Qwen3-Embedding-0.6B by default)."""
    model_name = CFG.embed_model
    return HuggingFaceEmbeddings(model_name=model_name)

In [13]:
def resample_to_16k_mono(in_path: str, out_path: str):
    """Load ``in_path`` as mono at its native rate, resample it to ``CFG.sample_rate`` and write ``out_path``."""
    target_sr = CFG.sample_rate
    samples, native_sr = librosa.load(in_path, sr=None, mono=True)
    resampled = librosa.resample(samples, orig_sr=native_sr, target_sr=target_sr)
    sf.write(out_path, resampled, target_sr)

def split_audio_30s(path_16k: str) -> List[str]:
    """Cut ``path_16k`` into consecutive ``CFG.window_seconds``-long mono WAV windows.

    Each window is exported next to the input as ``<path_16k>.NN.wav``, where NN
    is the zero-based window index. Returns the written paths in order; the last
    window may be shorter than the others.
    """
    rate = CFG.sample_rate
    window_ms = CFG.window_seconds * 1000
    # force mono at the configured rate before slicing
    audio = AudioSegment.from_file(path_16k).set_channels(1).set_frame_rate(rate)
    export_params = ["-ac", "1", "-ar", str(rate)]

    written: List[str] = []
    for index, start_ms in enumerate(range(0, len(audio), window_ms)):
        target = f"{path_16k}.{index:02d}.wav"
        audio[start_ms:start_ms + window_ms].export(target, format="wav", parameters=export_params)
        written.append(target)
    return written

@lru_cache(maxsize=None)
def _get_asr_pipeline(model_name: str):
    """Build the Hugging Face ASR pipeline for ``model_name`` once and reuse it.

    Loading Whisper weights dominates STT start-up cost. Caching keeps it to one
    load per model for the lifetime of the kernel instead of one per document.
    """
    return hf_pipeline(
        "automatic-speech-recognition",
        model=model_name,
        chunk_length_s=None,  # inputs are already split into CFG.window_seconds windows
        return_timestamps=True,
    )


def stt_whisper_batch(chunk_paths: List[str], language: str|None=None) -> List[Dict[str, Any]]:
    """Transcribe each audio window in ``chunk_paths`` with the configured Whisper model.

    Args:
        chunk_paths: paths of the audio windows, in playback order.
        language: Whisper language code (e.g. "en"); falsy lets Whisper auto-detect.

    Returns:
        One dict per input: ``{"file": path, "text": str, "chunks": timestamped chunks or None}``.
    """
    asr = _get_asr_pipeline(CFG.stt_model)
    # Only pass generate_kwargs when a language is forced, exactly as before.
    call_kwargs = {"generate_kwargs": {"language": language}} if language else {}
    results = []
    for p in chunk_paths:
        out = asr(p, **call_kwargs)
        results.append({"file": p, "text": out["text"], "chunks": out.get("chunks")})
    return results

def merge_segments_text(stt_results: List[Dict[str, Any]]) -> str:
    """Join per-window transcripts into one newline-separated transcript.

    Segments whose text is missing, None, empty or whitespace-only (e.g. silent
    windows) are dropped. Otherwise they would inject blank lines, producing a
    double newline that the downstream text splitter treats as a paragraph break.
    """
    stripped = ((r.get("text") or "").strip() for r in stt_results)
    return "\n".join(t for t in stripped if t)


In [14]:

# Prompt + chat model for transcript cleanup. The chain yields the chat model's
# output (a message object from ChatOpenAI), not a plain str.
_clean_messages = [
    ("system",
     "You are a professional text cleaner. Given raw ASR transcript, remove fillers, "
     "fix casing/punctuation, preserve meaning, do not hallucinate. Keep technical terms."),
    ("user", "Raw transcript:\n\n{raw}\n\nReturn the cleaned transcript."),
]
CLEAN_PROMPT = ChatPromptTemplate.from_messages(_clean_messages)

clean_chain = CLEAN_PROMPT | get_clean_llm()

In [15]:
def to_chunks(text: str, doc_id: str, file_name: str, language: str) -> List[Document]:
    """Split ``text`` into overlapping chunks (1200 chars, 180 overlap) as LangChain Documents.

    Chunk ids are ``ch_<doc_id>_NN`` with NN starting at 01. Every chunk shares the
    same UTC ``created_at`` timestamp and carries provenance and access metadata.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1200,
        chunk_overlap=180,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    pieces = splitter.split_text(text)
    created_at = datetime.now(timezone.utc).isoformat()

    def _make_doc(position: int, piece: str) -> Document:
        # a fresh metadata dict (and role list) per chunk, never shared
        return Document(
            page_content=piece,
            metadata={
                "chunk_id": f"ch_{doc_id}_{position:02d}",
                "doc_id": doc_id,
                "file": file_name,
                "source": "audio_ingestion",
                "language": language,
                "role_restriction": ["public_read"],
                "created_at": created_at,
            },
        )

    return [_make_doc(position, piece) for position, piece in enumerate(pieces, start=1)]


In [16]:
def upsert_qdrant(client: QdrantClient, docs: List[Document], embeddings: HuggingFaceEmbeddings):
    """Embed ``docs`` and write them into ``CFG.qdrant_collection``; returns the vector store.

    Point ids are derived deterministically from each Document's ``chunk_id``
    metadata (UUIDv5). This ties every Qdrant point to its SQL ``chunks`` and
    ``vdb_refs`` rows. Re-indexing the same chunk overwrites the point instead of
    adding a duplicate. Documents without a ``chunk_id`` get a random id. An empty
    ``docs`` list performs no write.
    """
    vs = QdrantVectorStore(
        client=client,
        collection_name=CFG.qdrant_collection,
        embeddings=embeddings,
    )
    if not docs:
        return vs

    ids = []
    for d in docs:
        chunk_id = d.metadata.get("chunk_id")
        point_id = uuid.uuid5(uuid.NAMESPACE_URL, chunk_id) if chunk_id else uuid.uuid4()
        ids.append(str(point_id))  # Qdrant accepts UUID strings as point ids
    vs.add_documents(docs, ids=ids)
    return vs


In [17]:
# Structured-output schema for the extraction LLM. Documented with `#` comments
# only: class docstrings would become part of the JSON schema sent to the model.

# One named entity mentioned in a chunk.
class Entity(BaseModel):
    name: str  # canonical name
    aliases: List[str] = Field(default_factory=list)  # alternate surface forms; empty if none

# One (subject, predicate, object) relation extracted from a chunk.
class Triple(BaseModel):
    s: str  # subject
    p: str  # predicate (the prompt asks for simple verbs such as "uses", "requires")
    o: str  # object
    confidence: float  # model-reported confidence; the prompt asks for 0.0-1.0 (not validated here)

# Top-level result returned by extract_chain for one chunk.
class ExtractionResult(BaseModel):
    entities: List[Entity]
    triples: List[Triple]

# System instructions for entity + triple extraction over one chunk of transcript.
_extract_system = (
    "You are an information extraction system for RAG indexing. Extract key entities "
    "(canonical names + aliases) and relation triples (subject, predicate, object) from the text. "
    "Use domain-agnostic simple predicates (uses, queries, requires, includes, mitigates, etc.). "
    "Return only facts present in the text. Confidence 0.0-1.0."
)
EXTRACT_PROMPT = ChatPromptTemplate.from_messages(
    [("system", _extract_system), ("user", "Text:\n\n{chunk}\n\nReturn structured JSON.")]
)

# LCEL: prompt -> chat model constrained to produce an ExtractionResult instance
_extract_llm = get_extract_llm().with_structured_output(ExtractionResult)
extract_chain = EXTRACT_PROMPT | _extract_llm



In [18]:
NEO4J_MERGE = """
MERGE (s:Entity {name: $s})
MERGE (o:Entity {name: $o})
MERGE (s)-[r:REL {predicate: $p}]->(o)
ON CREATE SET r.provenance = [{doc_id: $doc_id, chunk_id: $chunk_id}],
              r.confidence = $confidence,
              r.created_at = datetime()
ON MATCH SET  r.provenance = coalesce(r.provenance, []) + {doc_id: $doc_id, chunk_id: $chunk_id},
              r.confidence = CASE WHEN r.confidence < $confidence THEN $confidence ELSE r.confidence END
"""

def upsert_neo4j(driver, triples: List[Triple], doc_id: str, chunk_id: str):
    with driver.session() as s:
        for t in triples:
            if t.confidence < 0.8:
                continue
            s.run(NEO4J_MERGE, s=t.s, p=t.p, o=t.o,
                  doc_id=doc_id, chunk_id=chunk_id, confidence=t.confidence)

In [19]:
def insert_document(engine: Engine, documents_tbl, doc: Dict[str, Any]):
    """Insert one ``doc`` row into ``documents_tbl`` in its own committed transaction."""
    statement = documents_tbl.insert().values(**doc)
    with engine.begin() as conn:
        conn.execute(statement)

def insert_chunk(engine: Engine, chunks_tbl, row: Dict[str, Any]):
    """Insert one chunk ``row`` into ``chunks_tbl`` in its own committed transaction."""
    statement = chunks_tbl.insert().values(**row)
    with engine.begin() as conn:
        conn.execute(statement)

def insert_vdb_ref(engine: Engine, vdb_tbl, chunk_id: str, dim: int):
    """Record that ``chunk_id`` was indexed into ``CFG.qdrant_collection`` with ``dim``-sized vectors."""
    with engine.begin() as conn:
        row = dict(
            chunk_id=chunk_id,
            collection=CFG.qdrant_collection,
            vector_dim=dim,
            inserted_at=datetime.now(timezone.utc),
        )
        conn.execute(vdb_tbl.insert().values(**row))

def insert_triple(engine: Engine, gdb_tbl, tri: Triple, doc_id: str, chunk_id: str):
    """Insert one extracted triple into ``gdb_tbl`` (audit/provenance copy).

    The 0-1 confidence is stored as an integer percentage. ``round`` is used
    instead of ``int`` truncation: binary floats make e.g.
    ``0.57 * 100 == 56.99999999999999``, which ``int`` would record as 56.
    Each call gets a fresh random ``triple_id`` and commits its own transaction.
    """
    with engine.begin() as conn:
        conn.execute(gdb_tbl.insert().values(
            triple_id=str(uuid.uuid4()),
            s=tri.s, p=tri.p, o=tri.o,
            doc_id=doc_id, chunk_id=chunk_id,
            confidence=round(tri.confidence * 100),
            created_at=datetime.now(timezone.utc)
        ))

In [20]:
class PipeState(TypedDict):
    """Shared LangGraph state threaded through every ingestion node (see build_graph)."""
    doc_id: str  # "doc_<8 hex chars>", generated by new_doc_state
    title: str  # input file name without its extension
    language: str  # "en"/"id" hint from the file name, or "auto" when no hint was found
    file_path: str  # input path; node_preprocess_audio replaces it with the resampled ".16k.wav" path
    file_name: str  # basename of the original input file
    transcript_raw_segments: List[Dict[str, Any]]  # per-window STT dicts: {"file", "text", "chunks"}
    transcript_full: str  # window transcripts joined with newlines
    transcript_clean: str  # LLM-cleaned transcript
    chunks: List[Document]  # transcript chunks with chunk_id/doc_id metadata
    extraction: Dict[str, Any]  # summary counts written by node_extract_graph

def node_preprocess_audio(state: PipeState) -> PipeState:
    """LangGraph node: resample the input audio to mono at ``CFG.sample_rate``.

    Writes ``<file_path>.16k.wav`` next to the source file and points
    ``state["file_path"]`` at it so the downstream nodes read the resampled copy.
    """
    source_path = state["file_path"]
    resampled_path = source_path + ".16k.wav"
    resample_to_16k_mono(source_path, resampled_path)
    state["file_path"] = resampled_path
    return state

def node_stt(state: PipeState) -> PipeState:
    """LangGraph node: split the resampled audio into windows and transcribe them.

    ``new_doc_state`` stores a missing language hint as the sentinel ``"auto"``.
    That is not a Whisper language code, so it (like an empty or missing value)
    is mapped to ``None`` to let Whisper auto-detect the language.
    """
    parts = split_audio_30s(state["file_path"])
    lang = state.get("language")
    whisper_lang = None if not lang or lang.lower() == "auto" else lang
    stt_results = stt_whisper_batch(parts, language=whisper_lang)
    state["transcript_raw_segments"] = stt_results
    state["transcript_full"] = merge_segments_text(stt_results)
    return state

def node_clean(state: PipeState) -> PipeState:
    """LangGraph node: clean the raw transcript with the LLM and store plain text.

    ``clean_chain`` ends in a chat model, so it normally yields a message object
    (e.g. AIMessage). Its ``.content`` is extracted; calling ``str()`` on the message
    would store its repr (``content='...' additional_kwargs=...``). An empty
    transcript skips the LLM call so the model cannot invent text from nothing.
    """
    raw = state["transcript_full"]
    if not raw or not raw.strip():
        state["transcript_clean"] = ""
        return state

    cleaned = clean_chain.invoke({"raw": raw})
    if isinstance(cleaned, str):
        text = cleaned
    else:
        content = getattr(cleaned, "content", None)
        # fall back to str() only for unexpected (non-str) content shapes
        text = content if isinstance(content, str) else str(cleaned)
    state["transcript_clean"] = text
    return state

def node_chunk(state: PipeState) -> PipeState:
    """LangGraph node: split the cleaned transcript into Documents via ``to_chunks``."""
    state["chunks"] = to_chunks(
        text=state["transcript_clean"],
        doc_id=state["doc_id"],
        file_name=state["file_name"],
        language=state["language"] or "auto",
    )
    return state

def node_persist_sql_docs_chunks(state: PipeState) -> PipeState:
    """LangGraph node: persist the document row and its chunk rows to Postgres.

    Ensures the schema exists. Inserts one ``documents`` row (static demo tags and
    lineage) and one ``chunks`` row per Document in ``state["chunks"]``. The engine
    created here is disposed before returning, so each processed document does not
    leave an open connection pool behind.
    """
    engine = init_sql_engine()
    try:
        tables = init_sql_schema(engine)

        # documents
        doc_row = {
            "doc_id": state["doc_id"],
            "title": state["title"],
            "language": state["language"] or "auto",
            "source": "audio_ingestion",
            "file": state["file_name"],
            "author": "narrator",
            "created_at": datetime.now(timezone.utc),
            "knowledge_tags": ["RAG","LLM","retrieval","vector_database","best_practices"],
            "role_restriction": ["public_read"],
            "lineage": {
                "stt_model": CFG.stt_model,
                "preclean_llm": CFG.clean_llm_model,
                "embed_model": CFG.embed_model,
                "audio_window": f"{CFG.window_seconds}s"
            }
        }
        insert_document(engine, tables["documents"], doc_row)

        # chunks
        for d in state["chunks"]:
            row = {
                "chunk_id": d.metadata["chunk_id"],
                "doc_id": d.metadata["doc_id"],
                # placeholder; could be filled in if thematic partitioning is added
                "segments": ["auto_topic"],
                # rough size: whitespace-separated word count, not model tokens
                "token_estimate": len(d.page_content.split()),
                "created_at": datetime.now(timezone.utc),
                "text": d.page_content
            }
            insert_chunk(engine, tables["chunks"], row)
    finally:
        engine.dispose()

    return state

def node_vector_index(state: PipeState) -> PipeState:
    """LangGraph node: embed the chunks into Qdrant and record one ``vdb_refs`` row per chunk.

    Returns immediately when there are no chunks (e.g. an empty transcript), which
    also avoids loading the embedding model for nothing. The Qdrant client and the
    SQL engine opened here are released before returning.
    """
    docs = state["chunks"]
    if not docs:
        return state

    qclient = init_qdrant()
    engine = init_sql_engine()
    try:
        upsert_qdrant(qclient, docs, get_embeddings())

        # persist vdb refs in SQL
        tables = init_sql_schema(engine)
        for d in docs:
            # records the configured vector size, not a dimension measured from the embeddings
            insert_vdb_ref(engine, tables["vdb_refs"], d.metadata["chunk_id"], CFG.qdrant_vector_size)
    finally:
        engine.dispose()
        qclient.close()
    return state

def node_extract_graph(state: PipeState) -> PipeState:
    """LangGraph node: extract entities/triples per chunk and persist them.

    Triples with confidence >= 0.8 are upserted into Neo4j (via ``upsert_neo4j``)
    and copied to the SQL ``gdb_triples`` audit table. The Neo4j driver and SQL
    engine opened here are closed before returning. ``state["extraction"]``
    receives the chunk count and the total number of extracted (not only
    persisted) triples.
    """
    # run extraction per chunk (could be batched; sequential for clarity)
    results: List[Tuple[Document, ExtractionResult]] = []
    for d in state["chunks"]:
        res = extract_chain.invoke({"chunk": d.page_content})
        if res is None:
            # defensive: treat a missing structured result as "nothing extracted"
            res = ExtractionResult(entities=[], triples=[])
        results.append((d, res))

    # persist to Neo4j & SQL
    driver = init_neo4j_driver()
    engine = init_sql_engine()
    try:
        tables = init_sql_schema(engine)
        for d, res in results:
            # Neo4j upsert
            upsert_neo4j(driver, res.triples, doc_id=d.metadata["doc_id"], chunk_id=d.metadata["chunk_id"])
            # SQL triples (audit/provenance), same threshold as upsert_neo4j's default
            for tri in res.triples:
                if tri.confidence >= 0.8:
                    insert_triple(engine, tables["gdb_triples"], tri, d.metadata["doc_id"], d.metadata["chunk_id"])
    finally:
        driver.close()
        engine.dispose()

    state["extraction"] = {
        "total_chunks": len(results),
        "total_triples": sum(len(r.triples) for _, r in results)
    }
    return state

In [21]:
def build_graph():
    """Compile the linear ingestion graph.

    The nodes run in a fixed chain: preprocess_audio -> stt -> clean -> chunk ->
    persist_sql_docs_chunks -> vector_index -> extract_graph -> END.
    """
    steps = [
        ("preprocess_audio", node_preprocess_audio),
        ("stt", node_stt),
        ("clean", node_clean),
        ("chunk", node_chunk),
        ("persist_sql_docs_chunks", node_persist_sql_docs_chunks),
        ("vector_index", node_vector_index),
        ("extract_graph", node_extract_graph),
    ]

    workflow = StateGraph(PipeState)
    for step_name, step_fn in steps:
        workflow.add_node(step_name, step_fn)

    workflow.set_entry_point(steps[0][0])
    # wire each step to the next one, then the last step to END
    for (src, _src_fn), (dst, _dst_fn) in zip(steps, steps[1:]):
        workflow.add_edge(src, dst)
    workflow.add_edge(steps[-1][0], END)
    return workflow.compile()


In [22]:
def new_doc_state(file_path: str, language: str|None):
    """Build the initial PipeState for one audio file.

    A fresh random ``doc_<8 hex chars>`` id is generated on every call. The title
    is the file name without its extension, and a missing language hint is stored
    as "auto". Transcript, chunk and extraction fields start empty.
    """
    file_name = os.path.basename(file_path)
    return PipeState(
        doc_id="doc_" + uuid.uuid4().hex[:8],
        title=os.path.splitext(file_name)[0],
        language=language if language else "auto",
        file_path=file_path,
        file_name=file_name,
        transcript_raw_segments=[],
        transcript_full="",
        transcript_clean="",
        chunks=[],
        extraction={},
    )

def discover_audio_files(folder: str) -> List[Tuple[str, str]]:
    # Return list[(path, language_hint)]
    # Example assumption: file names contain _en/_id
    outs = []
    for name in os.listdir(folder):
        if not name.lower().endswith((".wav", ".mp3", ".m4a")):
            continue
        lang = "en" if "_en" in name.lower() else ("id" if "_id" in name.lower() else None)
        outs.append((os.path.join(folder, name), lang))
    return outs

def main():
    """Ingest every audio file in ``CFG.audio_folder`` through the LangGraph pipeline.

    Each backend is probed once up front, so a misconfiguration fails before any
    slow STT work starts. Probe connections are released immediately; each
    pipeline node opens its own.
    """
    # Fail fast: every probe below actually talks to its backend.
    qclient = init_qdrant()  # collection_exists/create -> network round-trip
    qclient.close()
    driver = init_neo4j_driver()
    try:
        driver.verify_connectivity()  # constructing the driver alone does not connect
    finally:
        driver.close()
    engine = init_sql_engine()
    try:
        init_sql_schema(engine)  # create_all() opens a connection
    finally:
        engine.dispose()

    graph = build_graph()

    if not os.path.isdir(CFG.audio_folder):
        print(f"Audio folder not found: {CFG.audio_folder}. Create it and put your files there (e.g., *_en.wav, *_id.wav).")
        return
    files = discover_audio_files(CFG.audio_folder)
    if not files:
        print(f"No audio files found in {CFG.audio_folder}. Put your files there (e.g., *_en.wav, *_id.wav).")
        return

    for fpath, lang in files:
        print(f"Processing: {fpath} (lang={lang or 'auto'})")
        state = new_doc_state(fpath, lang)
        final = graph.invoke(state)
        print("Done:", final["doc_id"], final.get("extraction"))


In [23]:
if __name__ == "__main__":
    main()

KeyboardInterrupt: 