# import

In [1]:
import pprint
import os
import sys

from dotenv import load_dotenv

from langchain_ollama import OllamaLLM, ChatOllama, OllamaEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

from langchain_community.vectorstores import FAISS

from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain.chains import create_history_aware_retriever

from langchain_core.prompts import MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage

import psycopg
from langchain_postgres.vectorstores import PGVector
from SPARQLWrapper import SPARQLWrapper, JSON, POST, N3
from urllib.parse import urljoin

import concurrent
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from threading import Lock

from rdflib import Graph, URIRef, Literal, Namespace
from rdflib.namespace import RDF, RDFS, OWL, FOAF, XSD, SKOS, DCTERMS
import re
from collections import Counter, defaultdict
import json
import datetime
import logging
import time

import urllib.parse
from typing import List, Tuple, Dict, Set, DefaultDict
import asyncio
from asyncio import Lock, Semaphore
import aiohttp
import logging
import aiofiles
import json
from asyncio import Lock
from more_itertools import chunked
import nest_asyncio
import random

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [24]:
# Root logging configuration and this module's logger (the two calls are independent).
logging.basicConfig(level=logging.INFO)  # Change to WARNING or ERROR in production
logger = logging.getLogger(__name__)

# loading env variables

In [25]:
# Load variables from a .env file so the os.getenv() lookups later in the notebook can see them.
load_dotenv()

True

# LangChain & Ollama

## check ollama status

In [None]:
# !curl --location 'http://127.0.0.1:11434/api/generate' \
# --header 'Content-Type: application/json' \
# --data '{ \
#     "model": "llama3.2:3b", \
#     "prompt": "hello llama!",  \
#     "options": { \
#         "temperature": 0 \
#     } \
# }' \
# | python -m json.tool

In [None]:
# !curl http://localhost:11434/api/tags

## Ollama model: configuration

In [26]:
# Completion-style Ollama client used by the chains below (temperature fixed at 0).
llm = OllamaLLM(model="llama3.2:3b", temperature=0)
# llm = OllamaLLM(model="deepseek-r1:8b", temperature=0)
# Bare expression: the notebook displays the object's repr.
llm

OllamaLLM(model='llama3.2:3b', temperature=0.0)

In [27]:
# Chat-style Ollama client for the same model, used for message-list prompts.
chat_ollama = ChatOllama(model="llama3.2:3b", temperature=0)
# Bare expression: the notebook displays the object's repr.
chat_ollama

ChatOllama(model='llama3.2:3b', temperature=0.0)

## testing llm: invoke


In [None]:
# Smoke test: send a single prompt to the completion model and log the reply.
# llm.invoke(input="tell me a joke")
response = llm.invoke("hello ollama!")

# response = llm.invoke("Create an agent that uses Ollama function calling in Langchain.")

logger.info(response)

In [None]:
# Smoke test for the chat model: a (role, text) message list with a system
# instruction followed by the human sentence to translate.
messages = [
    ("system", "You are a helpful translator. Translate the user sentence to French."),
    ("human", "I love programming."),
]
chat_ollama.invoke(messages)

## testing llm: chat prompt template

In [None]:
# Build a two-message prompt template; "{input}" is filled in at invoke time.
chat_prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a world class technical documentation writer."),
    ("user", "{input}")
])

# Pipe the rendered prompt into the LLM.
chain = chat_prompt_template | llm

response = chain.invoke({"input": "how can langsmith help with testing?"})

print(response)

## testing llm: chat prompt template & StrOutputParser

In [None]:
# Same prompt as the previous cell, with a StrOutputParser appended to the chain.
chat_prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a world class technical documentation writer."),
    ("user", "{input}")
])

output_parser = StrOutputParser()

# prompt -> llm -> string parser
chain = chat_prompt_template | llm | output_parser

response = chain.invoke({"input": "how can langsmith help with testing?"})

print(response)

## create vector store & a retriever

In [None]:
# 1. select a specific datasource. In this case a web page.
# 2. save extracted content from the web page as docs.
# 3. index the docs using FAISS vector store.
# 4. convert the vector store to retriever.

web_base_loader = WebBaseLoader("https://docs.smith.langchain.com/user_guide")

docs = web_base_loader.load()

# print(f"type(docs) : {type(docs)} \n")
# print(f"len(docs) : {len(docs)}\n")
# print(f"docs: {docs} \n")
# type(f"docs[0] : {docs[0]} \n")
# print(f"docs[0].page_content : {docs[0].page_content} \n")

# Split the loaded page into chunks using the splitter's default settings.
recursive_character_text_splitter = RecursiveCharacterTextSplitter()
documents = recursive_character_text_splitter.split_documents(documents=docs)


# print(type(documents))
# print(len(documents))
# print(documents)
# print(documents[0])
# print(documents[2])

# NOTE(review): the chat model "llama3.2:3b" is used for embeddings here, while the
# later "embeddings" section uses a dedicated embedding model — confirm this is intended.
ollama_embedding = OllamaEmbeddings(model="llama3.2:3b")
vector_store = FAISS.from_documents(
    documents=documents, embedding=ollama_embedding)


# print(f"vector_store.index.ntotal: {vector_store.index.ntotal}")
# print(f"vector_store._get_retriever_tags() : {vector_store._get_retriever_tags()}")
# print(f"vector_store.index_to_docstore_id : {vector_store.index_to_docstore_id}")
# print(f"type(vector_store.index_to_docstore_id) : {type(vector_store.index_to_docstore_id)}")

vector_store_retriever = vector_store.as_retriever()
print(f"vector_store_retriever: {vector_store_retriever}")

## document chain

In [None]:
# 5. create a chat prompt template
# 6. create a stuff document chain that accepts a llm model and chat prompt template & we can also run stuff document chain by passing in documents directly

# The template exposes two variables: {context} (the documents) and {input} (the question).
chat_prompt_template = ChatPromptTemplate.from_template(
    """Answer the following question based only on the provided context:

<context>
{context}
</context>

Question: {input}"""
)

documents_chain = create_stuff_documents_chain(
    llm=llm, prompt=chat_prompt_template)
# Here every split document is passed as context directly; no retrieval step is involved.
response = documents_chain.invoke(
    {
        "input": "how can langsmith help with testing?",
        "context": documents
    }
)
print(response)

## retrieval chain

In [None]:
# 7. create a document retrieval chain that takes vector store retriever and stuff document chain

# Only "input" is supplied here; the context is expected to come from the retriever.
retrieval_chain = create_retrieval_chain(
    vector_store_retriever, documents_chain)
response = retrieval_chain.invoke(
    {"input": "how can langsmith help with testing?"})

# print(type(response))
pprint.pprint(response, indent=4)

## conversation retrieval chain

In [None]:
# Prompt that asks the LLM to turn the conversation so far (chat_history + latest
# input) into a search query for the retriever.
chat_prompt_template = ChatPromptTemplate.from_messages([
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}"),
    ("user", "Given the above conversation, generate a search query to look up to get information relevant to the conversation")
])

history_aware_retriever_chain = create_history_aware_retriever(
    llm, vector_store_retriever, chat_prompt_template)

In [None]:
# Answering prompt: system message carrying {context}, then the chat history,
# then the latest user input.
chat_prompt_template = ChatPromptTemplate.from_messages([
    ("system",
     "Answer the user's questions based on the below context:\n\n{context}"),
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}")
])

# Rebinds retrieval_chain so that it uses the history-aware retriever.
document_chain = create_stuff_documents_chain(llm, chat_prompt_template)
retrieval_chain = create_retrieval_chain(
    history_aware_retriever_chain, document_chain)

# A minimal two-turn history so that the follow-up "tell me how" needs context.
chat_history = [HumanMessage(
    content="Can LangSmith help test my LLM applications?"), AIMessage(content="Yes!")]

response = retrieval_chain.invoke({
    "chat_history": chat_history,
    "input": "tell me how"
})

pprint.pprint(response)

# embeddings

### initialize embedding model

In [4]:
# Embedding model passed to PGVector below; the commented lines are alternatives.
# Rebinds the `ollama_embedding` name used earlier for the FAISS example.
# ollama_embedding = OllamaEmbeddings(model="mxbai-embed-large:335m")
# ollama_embedding = OllamaEmbeddings(model="nomic-embed-text:latest")
ollama_embedding = OllamaEmbeddings(model="bge-m3:567m")

### connect to pgvector

In [5]:
# Format: postgresql+psycopg://user:password@host:port/dbname
# Database connection details, read from the environment (.env).
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Percent-encode the credentials: reserved URL characters such as '@', ':' or '/'
# in a raw user name or password would otherwise produce a malformed URL.
# NOTE(review): this assumes the .env values are stored raw, not already URL-encoded.
CONNECTION_STRING = (
    f"postgresql+psycopg://{urllib.parse.quote_plus(DB_USER or '')}:"
    f"{urllib.parse.quote_plus(DB_PASSWORD or '')}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
# Name of the PGVector collection that holds the DBpedia documents.
COLLECTION_NAME = "dbpedia_docs"

In [6]:
# Open (or create) the PGVector collection; any failure is logged with its
# traceback and then exit(1) is called.
logger.info(f"\nConnecting to PGVector '{COLLECTION_NAME}'...")
try:
    # If the collection table doesn't exist, PGVector will try to create it.
    vectorstore = PGVector(
        connection=CONNECTION_STRING,
        embeddings=ollama_embedding,
        collection_name=COLLECTION_NAME,
        use_jsonb=True
        # pre_delete_collection=True
        # Use pre_delete_collection=True if you want to clear the collection on every run (USE WITH CAUTION!)
        # pre_delete_collection=False,
    )
    print(f"connection successfull!")
except psycopg.OperationalError as e:
    # Driver-level connection failure.
    logger.exception(f"\nDatabase Connection Error: {e}")
    exit(1)
except Exception as e:
    # Anything else raised while setting up the store.
    logger.exception(f"\nAn error occurred during PGVector connection: {e}")
    exit(1)

INFO:__main__:
Connecting to PGVector 'dbpedia_docs'...


connection successfull!


### connect to ontotext graph db and fetch all the entities and the description

#### Constants

In [28]:
# GraphDB location, read from the environment.
GRAPHDB_BASE_URL = os.getenv("GRAPHDB_BASE_URL")
GRAPHDB_REPOSITORY = os.getenv("GRAPHDB_REPOSITORY")

# Format: {base_url}/repositories/{repository_id}
# NOTE(review): .strip('/') trims slashes from both ends of the base URL, and this
# line raises AttributeError when GRAPHDB_BASE_URL is unset (os.getenv returns None).
SPARQL_ENDPOINT = urljoin(GRAPHDB_BASE_URL.strip('/') + '/', f"repositories/{GRAPHDB_REPOSITORY}")

# JSONL output file with one {"iri", "description"} record per instance.
# NOTE(review): the base directory is a hard-coded, machine-specific Windows path.
OUTPUT_FILENAME_DIR = os.path.join("c:\\Users\\deepa\\data\\workspace\\notebooks", "datasets", "instance_description")
OUTPUT_FILENAME = os.path.join(OUTPUT_FILENAME_DIR, "instance_description.jsonl")

# Text files that collect the IRIs of classes / instances that could not be processed.
FAILED_LOG_DIR = os.path.join("c:\\Users\\deepa\\data\\workspace\\notebooks", "datasets", "failed")
FAILED_CLASS_LOG = os.path.join(FAILED_LOG_DIR, "failed_class_iri.txt")
FAILED_INSTANCE_LOG = os.path.join(FAILED_LOG_DIR, "failed_instance_iri.txt")
# Size of the request semaphore that bounds concurrent DESCRIBE calls.
MAX_CONCURRENT_REQUESTS = 5
# Number of instances gathered concurrently per batch in main().
BATCH_SIZE = 100

In [29]:
# Synchronisation primitives shared by the async helpers and workers.
# NOTE(review): `Lock` is imported from both threading and asyncio in the import
# cell; the asyncio import comes last and these locks are used with `async with`,
# so they are presumably asyncio locks — confirm.
instance_lock = Lock()        # guards appends to FAILED_INSTANCE_LOG
class_lock = Lock()           # guards appends to FAILED_CLASS_LOG
output_file_lock = Lock()     # guards appends to OUTPUT_FILENAME
request_semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)  # bounds concurrent DESCRIBE requests

In [30]:
# NOTE(review): this cell repeats the previous cell's assignments verbatim;
# running it replaces the lock and semaphore objects with fresh ones.
instance_lock = Lock()
class_lock = Lock()
output_file_lock = Lock()
request_semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

#### helper function

In [2]:
# Allow nested asyncio event loops (useful in environments like Jupyter)
# NOTE(review): nest_asyncio is imported unconditionally in the import cell, so the
# ImportError branch below looks unreachable from this call — confirm.
try:
    nest_asyncio.apply()
except ImportError:
    pass


In [31]:
async def log_failed_instance(instance_iri: str):
    """Append ``instance_iri`` as one line to FAILED_INSTANCE_LOG.

    The append is done while holding ``instance_lock`` so that concurrent
    workers do not interleave their writes.
    """
    entry = instance_iri + "\n"
    async with instance_lock:
        async with aiofiles.open(FAILED_INSTANCE_LOG, 'a', encoding='utf-8') as log_file:
            await log_file.write(entry)

In [32]:
async def log_failed_class(ontology_class: str):
    """Append ``ontology_class`` as one line to FAILED_CLASS_LOG.

    The append is done while holding ``class_lock`` so that concurrent
    callers do not interleave their writes.
    """
    entry = ontology_class + "\n"
    async with class_lock:
        async with aiofiles.open(FAILED_CLASS_LOG, 'a', encoding='utf-8') as log_file:
            await log_file.write(entry)

In [33]:
def get_sparql(return_format=JSON):
    """Return a new SPARQLWrapper for SPARQL_ENDPOINT.

    Args:
        return_format: SPARQLWrapper return-format constant (defaults to JSON).
    """
    client = SPARQLWrapper(SPARQL_ENDPOINT)
    client.setReturnFormat(return_format)
    return client

#### All Classes

In [34]:
def fetch_classes() -> List[str]:
    """Return the IRIs of all ``owl:Class`` resources in the model graph.

    The query keeps only classes whose name after the
    ``http://dbpedia.org/ontology/`` prefix consists of ASCII characters, and
    orders the result by IRI. Any error is logged and an empty list is
    returned instead of raising.
    """
    logger.info("Fetching ontology classes from model graph")
    class_query = r"""
    PREFIX owl: <http://www.w3.org/2002/07/owl#>
    SELECT ?class
    FROM <http://dbpedia.org/model>
    WHERE { ?class a owl:Class .
      FILTER(regex(STRAFTER(STR(?class), "http://dbpedia.org/ontology/"), "^[\\x00-\\x7F]+$")) }
    ORDER BY ?class
    """
    try:
        client = get_sparql(return_format=JSON)
        client.setQuery(class_query)
        payload = client.query().convert()
        class_iris = []
        for binding in payload['results']['bindings']:
            class_iris.append(binding['class']['value'])
        return class_iris
    except Exception as e:
        logger.exception(f"[Error] Fetching classes: {e}")
        return []

#### All instances for a class

In [35]:
# Strong references to in-flight logging tasks: the event loop itself only keeps
# weak references, so an unreferenced task may be garbage-collected mid-flight.
_PENDING_CLASS_LOG_TASKS: Set[asyncio.Task] = set()


def _record_failed_class(ontology_class: str) -> None:
    """Record a failed class IRI, with or without a running event loop."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (e.g. called directly from a cell or from a worker
        # thread): fall back to a plain synchronous append.
        try:
            with open(FAILED_CLASS_LOG, 'a', encoding='utf-8') as f:
                f.write(ontology_class + "\n")
        except OSError as e:
            logger.warning(f"Could not record failed class {ontology_class}: {e}")
        return
    task = loop.create_task(log_failed_class(ontology_class))
    _PENDING_CLASS_LOG_TASKS.add(task)
    task.add_done_callback(_PENDING_CLASS_LOG_TASKS.discard)


def fetch_instances_for_class(ontology_class: str) -> List[str]:
    """Return the IRIs of all instances typed as ``ontology_class``.

    Queries the model and data graphs, ordered by instance IRI. On any error
    the exception is logged, the class IRI is recorded in the failed-class
    log, and an empty list is returned.

    Args:
        ontology_class: Full IRI of the class (without angle brackets).
    """
    logger.info(f"Fetching instances of class {ontology_class}")
    instance_query = f"""
    SELECT ?instance
    FROM <http://dbpedia.org/model>
    FROM <http://dbpedia.org/data>
    WHERE {{ BIND(<{ontology_class}> AS ?entity) ?instance a ?entity . }}
    ORDER BY ?instance
    """
    try:
        sparql = get_sparql(return_format=JSON)
        sparql.setQuery(instance_query)
        results = sparql.query().convert()
        return [b['instance']['value'] for b in results['results']['bindings']]
    except Exception as e:
        logger.exception(f"[Error] Fetching instances for {ontology_class}: {e}")
        _record_failed_class(ontology_class)
        return []

#### Describe Instance

In [36]:
def describe_instance(instance_iri: str, retries: int = 3, delay: float = 1.0) -> str:
    """Run ``DESCRIBE <instance_iri>`` and return the N3 result as text.

    Args:
        instance_iri: IRI of the resource to describe (without angle brackets).
        retries: Maximum number of attempts.
        delay: Base delay in seconds; the wait doubles after each failed
            attempt and gets up to 0.5 s of random jitter added.

    Returns:
        The decoded response text, or ``None`` when every attempt failed.
    """
    logger.info(f"Describing instance {instance_iri}")
    query = f"DESCRIBE <{instance_iri}>"
    for attempt in range(1, retries + 1):
        try:
            client = get_sparql(return_format=N3)
            client.setQuery(query)
            raw = client.query().convert()
            if isinstance(raw, bytes):
                return raw.decode('utf-8')
            return str(raw)
        except Exception as e:
            logger.warning(f"[Retry {attempt}/{retries}] Error describing {instance_iri}: {e}")
            if attempt >= retries:
                # Last attempt: log with traceback and fall out of the loop.
                logger.exception(f"[Error] Failed after {retries} retries: {instance_iri}")
                continue
            backoff = delay * (2 ** (attempt - 1))
            time.sleep(backoff + random.uniform(0, 0.5))
    return None

In [None]:
# instance_iri = "http://dbpedia.org/resource/Roger_Federer"
# response = describe_instance(instance_iri)
# print(response)

#### transform describe output to Key-Value and Tuple format

In [37]:
# URI label extraction utility
def get_label_from_uri(uri_str: str) -> str:
    """Derive a human-readable label from an angle-bracketed URI term.

    The label is the URI fragment if present, otherwise the last path
    segment. It is percent-decoded, ``_``/``-`` become spaces, and a space is
    inserted before each capital letter that is not at the start
    (``birthPlace`` -> ``birth Place``).

    Args:
        uri_str: An RDF term. Anything that is not a ``<...>`` string is
            returned unchanged via ``str()``.

    Returns:
        The label, or the bare URI when it cannot be parsed.
    """
    if not (isinstance(uri_str, str) and uri_str.startswith('<') and uri_str.endswith('>')):
        return str(uri_str)
    uri = uri_str.strip('<>')
    try:
        parsed = urllib.parse.urlparse(uri)
    except ValueError:
        # urlparse rejects malformed authorities (e.g. an unbalanced '[').
        return uri
    part = parsed.fragment or uri.split('/')[-1]
    decoded = urllib.parse.unquote(part)
    label = re.sub(r'(?<!^)(?=[A-Z])', ' ', decoded.replace('_', ' ').replace('-', ' '))
    # Collapse the double spaces produced when a separator precedes a capital.
    return re.sub(r'\s+', ' ', label).strip() or part

# Clean RDF term utility
def clean_value(rdf_term: str) -> str:
    """Reduce an RDF term to a display string.

    * ``<uri>`` terms are converted with ``get_label_from_uri``.
    * Quoted literals lose their quotes and any trailing language tag or
      datatype; escaped quotes and backslashes inside are unescaped.
      Both ``"..."`` and ``\"\"\"...\"\"\"`` forms are handled.
    * Anything else is returned stripped of surrounding whitespace.

    A literal whose closing quote cannot be found is returned unchanged.
    """
    t = rdf_term.strip()
    if t.startswith('<') and t.endswith('>'):
        return get_label_from_uri(t)
    if t.startswith('"'):
        if t.startswith('"""'):
            m = re.match(r'"""(.*?)"""', t, re.DOTALL)
        else:
            # Content is any run of non-quote characters or backslash escapes,
            # so an escaped quote does not end the literal early.
            m = re.match(r'"((?:[^"\\]|\\.)*)"', t)
        if not m:
            return t
        return re.sub(r'\\(["\\])', r'\1', m.group(1))
    return t

# Simplify N3 data into summary
def process_n3_simplified(n3_data: str) -> str:
    """Summarise line-per-triple RDF text around its most frequent subject.

    Each non-empty, non-comment line is expected to look like
    ``<subject> <predicate> object .`` (the subject may also be a blank node
    ``_:id``). Lines that do not match are logged and skipped. The URI that
    occurs most often as a subject is treated as the main entity.

    Returns:
        A multi-line summary with the entity IRI, a label, its outgoing
        properties (``predicate: values``) and incoming triples, or one of
        ``"No valid triples found."`` / ``"No URI subjects found."``.
    """
    triples, subjects = [], []
    pat = re.compile(r'^\s*(<[^>]+>|_:\S+)\s+(<[^>]+>)\s+(.*)\s*\.\s*$')
    for ln in n3_data.splitlines():
        ln = ln.strip()
        if not ln or ln.startswith('#'):
            continue
        m = pat.match(ln)
        if m:
            s, p, o = m.groups()
            triples.append((s, p, o))
            if s.startswith('<'):
                subjects.append(s)
        else:
            logger.warning(f"Skipping malformed triple: {ln}")
    if not triples:
        return "No valid triples found."
    if not subjects:
        return "No URI subjects found."

    main = Counter(subjects).most_common(1)[0][0]
    subj_iri = main.strip('<>')
    main_lbl = get_label_from_uri(main)

    # props: predicate label -> set of cleaned object values for the main entity.
    # inc:   (subject, predicate label, main label) for triples pointing at it.
    props, inc = defaultdict(set), set()
    for s, p, o in triples:
        lbl = get_label_from_uri(p).lower()
        if s == main:
            props[lbl].add(clean_value(o.strip()))
        elif o.strip() == main:
            inc.add((clean_value(s), lbl, main_lbl))

    # Pick the label deterministically (sets have no stable iteration order).
    labels = sorted(props.get('label', ()))
    out = [f"IRI: {subj_iri}", f"label: {labels[0] if labels else main_lbl}"]
    if props:
        out.append("\nOutgoing Relationships:")
        for k in sorted(props):
            out.append(f"{k}: {', '.join(sorted(props[k]))}")
    if inc:
        out.append("\nIncoming Relationships:")
        for s, p, o in sorted(inc):
            out.append(f"({s}, {p}, {o})")
    return '\n'.join(out)

In [None]:
# output = process_n3_simplified(response)
# print(output)

#### worker

In [38]:
async def process_instance_worker(instance_iri: str):
    """Describe one instance, summarise it and append it to the output file.

    The blocking ``describe_instance`` call runs in the default executor
    while holding ``request_semaphore``. A successful description is written
    to OUTPUT_FILENAME as one JSON line ``{"iri", "description"}``.

    An instance is recorded in the failed-instance log when the description
    could not be fetched (``describe_instance`` returned ``None``) or when
    any step raised. This coroutine does not raise.
    """
    try:
        async with request_semaphore:
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(None, describe_instance, instance_iri)
        if data is None:
            # All retries failed: record it so the instance can be retried later.
            await log_failed_instance(instance_iri)
            return
        if not data:
            # Empty response: nothing to summarise or save.
            return
        desc = process_n3_simplified(data)
        rec = {"iri": instance_iri, "description": desc}
        async with output_file_lock:
            async with aiofiles.open(OUTPUT_FILENAME, 'a', encoding='utf-8') as f:
                await f.write(json.dumps(rec) + '\n')
        logger.debug(f"Saved: {instance_iri}")
    except Exception as e:
        logger.exception(f"[Error] Worker {instance_iri}: {e}")
        # Await the write (instead of fire-and-forget) so that it completes
        # before the batch ends; stay best-effort if the log is unwritable.
        try:
            await log_failed_instance(instance_iri)
        except Exception as log_error:
            logger.warning(f"Could not record failed instance {instance_iri}: {log_error}")

#### main method

In [39]:
async def _count_lines(path: str) -> int:
    """Return the number of lines in ``path``; 0 when the file does not exist."""
    try:
        async with aiofiles.open(path, 'r', encoding='utf-8') as f:
            count = 0
            # An async generator cannot be passed to sum(), so count explicitly.
            async for _ in f:
                count += 1
            return count
    except FileNotFoundError:
        return 0


async def main():
    """Export a description for every instance of every ontology class.

    For each class returned by ``fetch_classes`` the instances are fetched
    and processed in batches of BATCH_SIZE concurrent workers. Afterwards
    the line counts of the failed-instance and failed-class logs are
    reported. These logs are opened in append mode elsewhere, so the counts
    include entries from earlier runs.
    """
    os.makedirs(OUTPUT_FILENAME_DIR, exist_ok=True)
    os.makedirs(FAILED_LOG_DIR, exist_ok=True)
    classes = fetch_classes()
    total = 0
    for idx, cls in enumerate(classes, 1):
        instances = fetch_instances_for_class(cls)
        if not instances:
            continue
        for chunk in chunked(instances, BATCH_SIZE):
            await asyncio.gather(*(process_instance_worker(i) for i in chunk))
        total += len(instances)
        logger.info(f"Processed class {idx}/{len(classes)}: {len(instances)} instances")
    logger.info(f"Total instances processed: {total}")

    # Log summaries
    for label, path in (("instances", FAILED_INSTANCE_LOG), ("classes", FAILED_CLASS_LOG)):
        try:
            failed = await _count_lines(path)
        except (OSError, UnicodeDecodeError) as e:
            logger.warning(f"Could not read {path}: {e}")
        else:
            logger.info(f"Failed {label}: {failed}")

#### main method call

In [None]:
# Run the pipeline. If asyncio.run() refuses because a loop is already running
# (as in a notebook), reuse that loop; any other RuntimeError is re-raised.
try:
    asyncio.run(main())
except RuntimeError as err:
    if 'asyncio.run()' not in str(err):
        raise
    asyncio.get_event_loop().run_until_complete(main())

INFO:__main__:Fetching ontology classes from model graph
INFO:__main__:Fetching instances of class http://dbpedia.org/ontology/AcademicConference
INFO:__main__:Describing instance http://dbpedia.org/resource/AAAI/ACM_Conference_on_AI,_Ethics,_and_Society
INFO:__main__:Describing instance http://dbpedia.org/resource/AAAI_Conference_on_Artificial_Intelligence
INFO:__main__:Describing instance http://dbpedia.org/resource/ACM/IEEE_Virtual_Reality_International_Conference
INFO:__main__:Describing instance http://dbpedia.org/resource/ACM_Conference_on_Hypertext_and_Social_Media
INFO:__main__:Describing instance http://dbpedia.org/resource/ACM_Multimedia
INFO:__main__:Describing instance http://dbpedia.org/resource/Al_Hidayah_(organisation)
INFO:__main__:Describing instance http://dbpedia.org/resource/Alan_Turing_Year
INFO:__main__:Describing instance http://dbpedia.org/resource/Americas_Conference_on_Information_Systems
INFO:__main__:Describing instance http://dbpedia.org/resource/Australian

In [None]:
# main()

## read the entity Description from the output file

In [None]:
def read_one_jsonl(filename):
    """Lazily yield the JSON objects stored one per line in ``filename``.

    Blank lines are skipped. A line that is not valid JSON is reported with
    its line number and skipped. A missing file or any other error is
    reported via ``print`` and ends the iteration; nothing is raised.
    """
    try:
        with open(filename, "r", encoding="utf-8") as handle:
            line_number = 0
            for raw in handle:
                line_number += 1
                text = raw.strip()
                if text:
                    try:
                        yield json.loads(text)  # hand back the parsed record
                    except json.JSONDecodeError:
                        print(f"Warning: Skipping invalid JSON on line {line_number} in {filename}")
    except FileNotFoundError:
        print(f"Error: File not found - {filename}")
    except Exception as e:
        print(f"An unexpected error occurred while reading {filename}: {e}")

# Example Usage:
# Stream the saved records one at a time and print the IRI and description of each.
for record in read_one_jsonl(OUTPUT_FILENAME):
    print("---"*50)
    print(f"IRI: {record.get('iri')}")
    print(f"Description: {record.get('description')}")
    # Process the record here

In [None]:
# Summarize the following information about the entity http://dbpedia.org/resource/100_Word_Story:

# Name: 100 Word Story
# Abbreviation: 100 Word Story
# Type: Academic Journal, Periodical Literature, Written Work, Creative Work
# First published in: 2011
# Frequency of publication: Quarterly
# Academic discipline: Literary Magazine
# Editor: Grant Faulkner
# Homepage: http://www.100wordstory.org/