In [26]:
from pathlib import Path
import sys
import re
import sqlite3

# SQLite database file opened by the create_*_db / insert_* helpers below.
dbFile = "podcasts.db"

from gliner import GLiNER

# Named-entity model. get_entities() reads this module-level `model` at call
# time, so rebinding `model` in a later cell changes what it uses.
model = GLiNER.from_pretrained("urchade/gliner_medium-v2.1")
from ollama import Client
import tiktoken
from chonkie import SDPMChunker

# Put the parent of the current working directory at the front of sys.path.
# This has to run before the `app.utils` import below.
# NOTE(review): presumably the `app` package lives in that parent directory — confirm.
parent_dir = str(Path().resolve().parents[0])
sys.path.insert(0, parent_dir)

from app.utils import ShowMetadataList, ShowMetadata, metadata_to_dict, sanitize

# One ShowMetadata entry per show.
# NOTE(review): the positional arguments look like (show name, dates file,
# titles file) judging by the file names — confirm against app.utils.
data = ShowMetadataList(
    shows=[
        ShowMetadata(
            "rotl",
            Path("../files/meta/rotl_dates.txt"),
            Path("../files/meta/rotl_titles.txt"),
        ),
        ShowMetadata(
            "roadwork",
            Path("../files/meta/roadwork_dates.txt"),
            Path("../files/meta/roadwork_titles.txt"),
        ),
    ]
)

# get_meta() indexes this as dates_titles[show]["dates"][episode_number] and
# dates_titles[show]["titles"][episode_number].
dates_titles = metadata_to_dict(data)


def get_meta(file, show_name="rotl"):
    """Look up the episode metadata for a transcript file.

    The episode number is the part of the file name before the first "_-_".
    It is used as the key into the module-level ``dates_titles`` mapping.

    Args:
        file: Path-like object exposing a ``name`` attribute.
        show_name: Key of the show in ``dates_titles``. Defaults to "rotl",
            which is what this function previously hard-coded.

    Returns:
        Tuple ``(file_name, episode_number, episode_date, episode_title)``.

    Raises:
        KeyError: If the show or the episode number is missing from
            ``dates_titles`` (assuming it is built from plain dicts).
    """
    file_name = file.name
    episode_number = file_name.split("_-_")[0]
    show_meta = dates_titles[show_name]
    episode_date = show_meta["dates"][episode_number]
    episode_title = show_meta["titles"][episode_number]
    return (file_name, episode_number, episode_date, episode_title)


def get_entities(transcript):
    """Collect the distinct "Person" entities found in a transcript.

    Each line is expected to look like "speaker: text". Only the text part is
    passed to the module-level GLiNER ``model``.

    Args:
        transcript: Transcript as one string, one utterance per line.

    Returns:
        Sorted list of the unique entity strings the model reported.
    """
    names = set()
    for line in transcript.splitlines():
        # Split on the first ": " only, so utterances that themselves contain
        # ": " no longer break the unpacking. Lines with no speaker separator
        # (blank lines included) are skipped instead of raising ValueError.
        _speaker, separator, text = line.partition(": ")
        if not separator:
            continue
        for entity in model.predict_entities(text, ["Person"], threshold=0.5):
            names.add(entity["text"])
    return sorted(names)


# Ollama client used by ask_llm(). The host URL is hard-coded here.
client = Client(host="https://mlkyway.anselbrandt.net/ollama")


def ask_llm(context, model="gemma2:27b"):
    """Send ``context`` as a single user message and return the reply text.

    Args:
        context: Prompt text, sent as the content of one "user" message.
        model: Model name passed to the chat call. Inside this function the
            parameter shadows the module-level GLiNER ``model``.

    Returns:
        The ``["message"]["content"]`` field of the chat response.
    """
    user_message = {"role": "user", "content": context}
    reply = client.chat(model=model, messages=[user_message])
    return reply["message"]["content"]


def clean(text):
    """Extract the second blank-line-separated paragraph of an LLM reply.

    The reply is split on blank lines and the second paragraph is returned
    with every "*" character removed. If the reply contains no blank line
    there is no second paragraph, so the whole reply is used instead of
    raising IndexError.

    Args:
        text: Raw reply text.

    Returns:
        The selected paragraph without "*" characters.
    """
    paragraphs = text.split("\n\n")
    body = paragraphs[1] if len(paragraphs) > 1 else paragraphs[0]
    return body.replace("*", "")


# Shared chunker instance. Later cells call chunker(transcript) and read
# `.text` from each returned chunk.
# NOTE(review): parameter meanings (threshold, chunk_size, skip_window, delim)
# are defined by chonkie's SDPMChunker — confirm against its documentation
# before tuning.
chunker = SDPMChunker(
    embedding_model="minishlab/potion-base-8M",
    threshold=0.5,
    chunk_size=512,
    min_sentences=1,
    skip_window=1,
    delim="\n",
)


def num_tokens(string: str) -> int:
    """Return how many tokens ``string`` encodes to under "cl100k_base"."""
    token_ids = tiktoken.get_encoding("cl100k_base").encode(string)
    return len(token_ids)


def clean_name_response(text):
    """Drop blank lines and "Let me know if" lines from a multi-line reply.

    Single-line input (no newline character) is returned unchanged, even if
    it contains the "Let me know if" phrase.

    Args:
        text: Reply text.

    Returns:
        The remaining lines joined with single newlines, or ``text`` itself
        when it has no newline.
    """
    if "\n" not in text:
        return text
    collapsed = re.sub(r"\n+", "\n", text)
    kept = filter(lambda row: "Let me know if" not in row, collapsed.splitlines())
    return "\n".join(kept)


def clean_transcript(file):
    """Read a transcript and keep only well-formed "speaker: text" paragraphs.

    The file is split on blank lines. A paragraph is kept only when splitting
    it on ": " gives exactly two parts; everything else is dropped.

    Args:
        file: Path of the transcript file, opened in text mode with the
            platform default encoding (as before).

    Returns:
        The kept paragraphs joined with single newlines.
    """
    # The context manager closes the handle; the previous bare
    # open(file).read() left that to the garbage collector.
    with open(file) as handle:
        paragraphs = handle.read().split("\n\n")
    return "\n".join(
        paragraph for paragraph in paragraphs if len(paragraph.split(": ")) == 2
    )


def get_names(transcript):
    """Ask the LLM which detected entities in ``transcript`` are people's names.

    The entity list from get_entities() is appended to a fixed question,
    sent through ask_llm(), and the reply is reduced with clean().

    Args:
        transcript: Transcript text in the format get_entities() expects.

    Returns:
        List with one whitespace-stripped string per line of the cleaned reply.
    """
    question = "Which of these are people's names? Only return results if they are people's names."
    candidates = get_entities(transcript)
    prompt = f"{question}\n\n{candidates}"
    cleaned_reply = clean(ask_llm(prompt))
    return [row.strip() for row in cleaned_reply.splitlines()]


def create_chunks_db():
    """Create the ``chunks`` table in the ``dbFile`` database if it is missing."""
    schema = """CREATE TABLE IF NOT EXISTS chunks (
        id integer primary key,
        filename text,
        showname text,
        episode text,
        title text,
        date text,
        idx integer,
        chunk text
        )"""
    connection = sqlite3.connect(dbFile)
    connection.execute(schema)
    connection.commit()
    connection.close()


def create_names_db():
    """Create the ``names`` table in the ``dbFile`` database if it is missing.

    The ``name`` column is declared ``text``: insert_name() stores a person's
    name string there, and the previous ``integer`` declaration would make
    SQLite coerce numeric-looking values to integers.

    Because of ``IF NOT EXISTS``, a database that already has a ``names``
    table keeps its existing column types.
    """
    conn = sqlite3.connect(dbFile)
    try:
        conn.execute(
            """CREATE TABLE IF NOT EXISTS names (
        id integer primary key,
        filename text,
        showname text,
        episode text,
        title text,
        date text,
        name text,
        text text
        )"""
        )
        conn.commit()
    finally:
        # Release the connection even when table creation fails.
        conn.close()


def insert_chunks(
    chunks, file_name, show_name, episode_number, episode_title, episode_date
):
    """Insert every chunk of one episode into the ``chunks`` table.

    Each chunk is stored with its position in ``chunks`` as ``idx``. All rows
    are committed together at the end.

    Args:
        chunks: Iterable of chunk texts, in order.
        file_name: Transcript file name, stored in ``filename``.
        show_name: Show identifier, stored in ``showname``.
        episode_number: Stored in ``episode``.
        episode_title: Stored in ``title``.
        episode_date: Stored in ``date``.
    """
    rows = [
        (file_name, show_name, episode_number, episode_title, episode_date, idx, chunk)
        for idx, chunk in enumerate(chunks)
    ]
    conn = sqlite3.connect(dbFile)
    try:
        # Columns are named explicitly so the insert does not depend on the
        # table's column order; `id` is omitted and assigned by SQLite.
        conn.executemany(
            "INSERT INTO chunks "
            "(filename, showname, episode, title, date, idx, chunk) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            rows,
        )
        conn.commit()
    finally:
        # Release the connection even when an insert fails.
        conn.close()


def insert_name(
    name, text, file_name, show_name, episode_number, episode_title, episode_date
):
    """Insert one (name, text) record for an episode into the ``names`` table.

    Args:
        name: Person's name, stored in ``name``.
        text: Associated text, stored in ``text``.
        file_name: Transcript file name, stored in ``filename``.
        show_name: Show identifier, stored in ``showname``.
        episode_number: Stored in ``episode``.
        episode_title: Stored in ``title``.
        episode_date: Stored in ``date``.
    """
    conn = sqlite3.connect(dbFile)
    try:
        # Columns are named explicitly so the insert does not depend on the
        # table's column order; `id` is omitted and assigned by SQLite.
        conn.execute(
            "INSERT INTO names "
            "(filename, showname, episode, title, date, name, text) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (
                file_name,
                show_name,
                episode_number,
                episode_title,
                episode_date,
                name,
                text,
            ),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert fails.
        conn.close()

Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]



In [27]:
# Collect every transcript chunk that mentions one of the target names and
# write one prompt file per name, ending in a "Who is <name>?" question.
transcripts_dir = Path("../files/bulk/rotl")  # was `dir`, which shadowed the builtin
# Match on the real extension; a substring test also accepts e.g. "x.txt.bak".
files = [file for file in transcripts_dir.iterdir() if file.suffix == ".txt"]

names_dir = Path() / "names"
names_dir.mkdir(exist_ok=True)

target_names = ["Ariella", "Marlo", "Eleanor", "Madeline"]

results = {}
for file in sorted(files):
    # The metadata values are not used below; the lookup is kept so the cell
    # still raises for a file that get_meta() cannot resolve.
    file_name, episode_number, episode_date, episode_title = get_meta(file)
    transcript = clean_transcript(file)
    chunks = [chunk.text for chunk in chunker(transcript)]
    for chunk in chunks:
        for name in target_names:
            if name in chunk:
                results.setdefault(name, []).append(chunk)

for name, chunks in results.items():
    out_path = names_dir / f"{sanitize(name)}.txt"
    # Join outside the f-string: reusing the same quote type and a backslash
    # inside a replacement field is a SyntaxError before Python 3.12.
    joined_chunks = "\n".join(chunks)
    with open(out_path, "w") as f:
        f.write(f"{joined_chunks}\n\nWho is {name}?")

In [31]:
# Send every prompt file in names_dir to the LLM and store each reply under
# results/ using the same file name.
# Sorted so the processing order is reproducible; iterdir() order is arbitrary.
names_files = sorted(names_dir.iterdir())
results_dir = Path() / "results"
results_dir.mkdir(exist_ok=True)

for file in names_files:
    # The context manager closes the handle (the bare open().read() did not).
    with open(file, "r") as f:
        context = f.read()
    # Named `answer` rather than `results`, which is the name -> chunks dict
    # built by the previous cell.
    answer = ask_llm(context)
    out_path = results_dir / file.name
    with open(out_path, "w") as f:
        f.write(answer)

In [38]:
# GLiNER is already imported in the first cell; this import is redundant.
from gliner import GLiNER

# WARNING: this rebinds the module-level `model` to a different checkpoint.
# get_entities() looks `model` up at call time, so every call made after this
# cell runs uses this checkpoint instead of the one loaded in the first cell.
model = GLiNER.from_pretrained("knowledgator/gliner-multitask-large-v0.5")

Fetching 10 files:   0%|          | 0/10 [00:00<?, ?it/s]

In [None]:
# Concatenate every transcript into one file, with the blank lines between
# paragraphs collapsed to single newlines.
# Sorted so the output is reproducible (iterdir() order is arbitrary); the
# extension is matched exactly instead of by substring.
files = sorted(
    file for file in Path("../files/bulk/rotl").iterdir() if file.suffix == ".txt"
)

transcripts = []

for file in files:
    # The context manager closes each handle (the bare open().read() did not).
    with open(file, "r") as f:
        transcript = f.read().split("\n\n")
    text = "\n".join(transcript)
    transcripts.append(text)

with open("Roderick_on_the_Line.txt", "w") as f:
    f.write("\n".join(transcripts))