In [None]:
# https://docs.python.org/3/library/threading.html

# Threading is useful for I/O-bound tasks, such as file operations or network requests, where most of the time is
# spent waiting for external resources.


In [1]:

# By "chat model" we mean an LLM that operates on chat messages
import os 
import json

from langchain_openai import ChatOpenAI 
from dotenv import load_dotenv
from langchain_core.prompts import PromptTemplate

from pydantic import BaseModel, Field
from typing import Literal 

## check performance in agent workflow 
from typing import TypedDict, Annotated, List, Dict, Optional
from langchain_core.messages import BaseMessage, AnyMessage, ToolMessage,HumanMessage, AIMessage, SystemMessage
from langgraph.graph import add_messages , START, END , StateGraph
from IPython.display import Image, display 
from langgraph.checkpoint.memory import MemorySaver

from googleapiclient.discovery import build
from urllib.parse import urlparse

# prepare model for embeddings  
from langchain_openai import OpenAIEmbeddings 
from langchain_core.tools import tool, StructuredTool


from sklearn.metrics.pairwise import cosine_similarity
import numpy as np 
import numpy.typing as npt

from tavily import TavilyClient 

import subprocess
from IPython.display import Image, display


## API key configuration
# `load_dotenv` is imported above; call it so keys stored in a local `.env`
# file are loaded into the process environment. With its default
# override=False it does not replace variables that are already set.
load_dotenv()

# Read the keys from the environment. Never print them: notebook outputs are
# often committed or shared, and a printed key is a leaked key.
api_key_var = os.environ.get("OPENAI_API_KEY")   # OpenAI
tavily_api = os.environ.get("TAVILY_API_KEY")    # Tavily search
api_google = os.environ.get("GOOGLE_API_KEY")    # Google APIs






In [2]:
## Generate summary of extracted content
# Shared chat model for the threading experiments below. No API key is passed
# explicitly; ChatOpenAI falls back to the OPENAI_API_KEY environment variable
# (already handled by the configuration cell), so the key is not re-read here.
llm_dummy = ChatOpenAI(
    model="gpt-4.1-mini",
    temperature=0.2,  # low temperature -> more stable, less creative summaries
    max_tokens=500,   # cap on tokens generated per response
    top_p=0.95,
    timeout=15,       # request timeout, so a stuck call cannot hang a worker thread indefinitely
)

# Five sample paragraphs used as summarization inputs for the threading demo.
dummy_statements = [
    "Our compliance dashboard aggregates alerts from transaction monitoring, sanctions screening, and adverse media feeds. Analysts triage cases using severity, age, and source credibility. Weekly reviews track false positives, throughput, and SLA adherence. Quarterly audits compare model outcomes with policy thresholds to justify tuning and ensure regulatory alignment.",
    "The research assistant ingests URLs in batches, cleans boilerplate, and extracts salient facts. Summaries capture entities, dates, amounts, and jurisdictions while discarding opinion. A consolidation step merges duplicates across outlets. Final outputs include provenance links, confidence notes, and a timeline suitable for auditors and investigative follow-ups.",
    "For parallel processing, a bounded thread pool dispatches fetch and summarize jobs while honoring rate limits. Retries use exponential backoff with jitter. Structured errors capture domain, stage, attempt, and message. Results are reassembled in input order, enabling deterministic runs, easy diffing, and quick root-cause analysis when failures occur.",
    "Risk evaluation weighs confirmed fines, ongoing investigations, and settlements more heavily than allegations. Jurisdictional diversity and recency increase materiality. Where facts conflict, the system elevates disagreement and tags items for manual review. Strict schema validation prevents malformed claims from propagating, improving trust in downstream scoring and reporting components.",
    "Token accounting records input and output usage per stage, per URL, and per batch. Counters roll up to daily totals by model family to forecast cost and throughput. Anomalies trigger alerts when variance exceeds thresholds. These metrics inform capacity planning, budget approvals, and pragmatic choices about model selection."
]



def generate_url_summary(raw_content: str, llm, results:dict, idx:int):
    """Summarize one text with ``llm`` and store the result in ``results[idx]``.

    Designed as a ``threading.Thread`` target: a thread cannot return a value,
    so the summary is written into the shared ``results`` dict instead. Each
    call writes a distinct key, and a single dict item assignment is atomic in
    CPython, so no lock is used.

    Args:
        raw_content: Text to summarize.
        llm: Chat model exposing ``invoke(prompt)`` that returns an object with
            a string ``content`` attribute.
        results: Shared dict that receives the output, keyed by ``idx``.
        idx: Position of ``raw_content`` in the input list; used as the key so
            outputs can be matched to inputs regardless of completion order.

    On failure ``results[idx]`` is set to a string starting with ``"Error:"``
    instead of being left missing.

    NOTE(review): a later cell redefines ``generate_url_summary`` with a
    different (5-argument) signature; once that cell runs, this version is
    shadowed and callers using the 4-argument form will fail.
    """
    prompt = f"Extract main idea from text {raw_content}"

    try:
        response = llm.invoke(prompt)
        results[idx] = response.content.strip()
    except Exception as e:
        # join() does not re-raise a worker's exception; without this, results
        # would simply lack this key and the caller could not tell which item
        # failed. Log the cause and record an explicit error entry.
        print(f"Error summarizing item {idx}: {e}")
        results[idx] = f"Error: could not generate summary ({e})"
      



In [3]:
import threading
import time  # not used in this cell

# Shared output: each worker writes only its own integer key, and a single
# dict item assignment is atomic in CPython, so no lock is needed here.
results = {}

# 1) Build one Thread per input. Nothing runs yet; this only binds the
#    target function and its arguments.
threads = [
    threading.Thread(target=generate_url_summary, args=(text, llm_dummy, results, idx))
    for idx, text in enumerate(dummy_statements)
]

# 2) Launch every worker so the LLM requests overlap in time.
#    A Thread object can be started only once (a second start() raises RuntimeError).
for worker in threads:
    worker.start()

# 3) Block until all workers are done before reading `results`.
for worker in threads:
    worker.join()

print(results)

{4: 'The main idea is that token accounting tracks usage and aggregates data to forecast costs and throughput, detect anomalies, and support capacity planning, budgeting, and model selection decisions.', 0: 'The compliance dashboard consolidates alerts from various monitoring sources, enabling analysts to prioritize cases based on severity and credibility, while regular reviews and audits ensure accuracy, efficiency, and regulatory compliance.', 2: 'The main idea is that a bounded thread pool manages parallel fetch and summarize tasks within rate limits, using exponential backoff with jitter for retries, structured error tracking, and reassembling results in input order to ensure deterministic execution and facilitate debugging.', 1: 'The research assistant processes batches of URLs to clean and extract key factual information, summarizes important details while excluding opinions, consolidates duplicates, and produces final outputs with source links, confidence levels, and timelines f

In [None]:
# threading.Thread creates a worker for my function, but the thread is not started yet;
   # we use it to assign work to each thread

# t.start() - now we begin the execution
# t.join() - wait for the thread to finish before continuing execution

# A thread can't return values directly, so instead we use
  # a shared dictionary, or
  # a queue (queue.Queue)
# Good for: I/O-bound tasks (API calls, file reading, web scraping)

#### Prepare snippet for main code 

In [7]:
## Generate summary of extracted content

## dummy data
# Sample URL -> article text mapping standing in for fetched page content.
url_to_content = {
    "https://news.example.com/article/alpha": "Alpha Corp announced a strategic partnership to expand its AI-driven risk analytics across European markets. The deal includes data-sharing agreements, a joint research roadmap, and a pilot program with three banks scheduled for Q1 next year.",
    "https://blog.example.org/posts/data-pipelines": "A step-by-step guide to building reliable data pipelines with Python and PostgreSQL. Covers schema design, idempotent ETL jobs, observability basics, and how to version transformations to ensure reproducibility.",
    "https://docs.example.dev/quickstart": "Quickstart for the Example SDK: install the package, authenticate with an API key, initialize the client, and call the /summarize endpoint. Includes code snippets, rate limit notes, and links to pagination and error-handling sections.",
    # The time range uses a real en dash; it was previously mojibake ("â€“") from a
    # UTF-8 dash decoded as cp1252, which would be fed verbatim to the model.
    "https://portal.example.net/status/2025-11-07": "Incident report: elevated latency in EU-West between 14:05–14:42 EET due to a misconfigured cache layer. Mitigated by rolling back the config and purging stale entries. Postmortem to follow with action items on alert thresholds and canary tests."
}

# Shared output for the worker threads: url -> model answer or "Error: ..." text.
# Re-running this cell resets it.
thread_outcomes = {}

def generate_url_summary(raw_content: str, llm, entity_name: str, thread_outcomes: dict, url: str,
                         max_content_length: int = 8000):
    """Screen one article for money-laundering mentions and store the verdict.

    Intended as a ``threading.Thread`` target. Instead of returning a value, it
    writes exactly one entry, ``thread_outcomes[url]``. That entry is either
    the model's stripped answer or a string starting with ``"Error:"``.

    NOTE(review): this redefines, and therefore shadows, the earlier
    ``generate_url_summary(raw_content, llm, results, idx)``. Cells that use
    the 4-argument form must run before this definition.

    Args:
        raw_content: Article text to analyze. Empty, whitespace-only, or None
            input is recorded as an error without calling the model.
        llm: Chat model exposing ``invoke(prompt)``, which returns an object
            with a string ``content`` attribute.
        entity_name: Company name inserted into the prompt.
        thread_outcomes: Shared dict receiving the result, keyed by ``url``.
        url: Source URL. Used as the result key and in log output.
        max_content_length: Character budget for ``raw_content``. Longer text
            is cut to this length and suffixed with ``"...[truncated]"`` to
            keep the prompt within the model's limits.
    """
    print(f"Thread for {url} starting...")

    # Input validation: don't spend a model call on empty content.
    if not raw_content or not raw_content.strip():
        thread_outcomes[url] = "Error: No content provided for analysis"
        return

    # Truncate overly long content (prevents token-limit issues).
    if len(raw_content) > max_content_length:
        raw_content = raw_content[:max_content_length] + "...[truncated]"

    prompt = f"""
            You are analyzing an article about {entity_name} company for anti-money laundering due diligence.

            Content to analyze:
            {raw_content}

            Return an answer if the information contains any mention of money laundering;
            if none, respond that the data in the article is irrelevant.

            """

    try:
        response = llm.invoke(prompt)
    except Exception as e:
        # Log the real cause for debugging; store a stable message for the caller.
        error_msg = f"Error generating reputation summary: {str(e)}"
        print(error_msg)
        thread_outcomes[url] = "Error: Could not generate reputation summary due to processing issues"
        return

    # Validate the response shape before using it. Content that is missing or
    # not a string cannot be stripped and is reported as an invalid response.
    content = getattr(response, "content", None)
    if not response or not isinstance(content, str):
        thread_outcomes[url] = "Error: Invalid response from language model"
        return

    thread_outcomes[url] = content.strip()
    


In [8]:
# Now run the AML screening with threads: one worker per URL.
import threading

threads = []
entity_name = "Example Corp"

for url, raw_content in url_to_content.items():
    thread = threading.Thread(
        target=generate_url_summary,
        args=(raw_content, llm_dummy, entity_name, thread_outcomes, url),
    )
    threads.append(thread)

for thread in threads:
    thread.start()

# Join through the loop variable itself. Joining a single leftover handle
# would wait for only one worker, and the print below could then show
# partial results.
for thread in threads:
    thread.join()

print(thread_outcomes)


Thread for https://news.example.com/article/alpha starting...
Thread for https://blog.example.org/posts/data-pipelines starting...
Thread for https://docs.example.dev/quickstart starting...
Thread for https://portal.example.net/status/2025-11-07 starting...
{'https://blog.example.org/posts/data-pipelines': 'The data in the article is irrelevant.', 'https://docs.example.dev/quickstart': 'Data in article is irrelevant.', 'https://news.example.com/article/alpha': 'The data in the article is irrelevant.', 'https://portal.example.net/status/2025-11-07': 'The data in the article is irrelevant to money laundering.'}
