<H2><center>Transaction Risk Scoring and Analysis</center></H2>

<H4>Description:</H4> This notebook uses multi-shot in-context learning (synthetic transaction examples supplied in the prompt) to have Mixtral-8x7B-Instruct-v0.1 assign risk scores to financial transactions. The model assesses each transaction's risk based on the entities extracted from it. It combines the transaction details with data from external sources: the SEC EDGAR database, the OFAC Sanctions List, Wikidata, and recent news articles.

In [None]:
import json
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from thefuzz import fuzz, process
from openai import OpenAI
import re
import nltk
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from fuzzywuzzy import fuzz
import numpy as np
import requests
import random
import csv
import os

In [None]:
class EntityExtractor:
    """Extract and categorise named entities from transaction text with an LLM.

    Requests go through the OpenAI client pointed at ``base_url``; the API key
    is read from the ``OpenRouter_API_key`` environment variable.
    """

    def __init__(self, base_url="https://openrouter.ai/api/v1"):
        self.client = OpenAI(
            base_url=base_url,
            api_key=os.getenv("OpenRouter_API_key"))
        self.model = "mistralai/mistral-small-3.1-24b-instruct:free"
        # Number of times the request is attempted before giving up.
        self.max_retries = 5

    @staticmethod
    def _parse_entity_list(raw_text):
        """Return the JSON list contained in ``raw_text``, or None if there is none.

        The text is tried as-is first; if that fails and it spans more than two
        lines, the first and last lines are dropped (the model sometimes wraps
        its answer in a code fence despite the prompt) and parsing is retried.
        """
        if not isinstance(raw_text, str):
            return None

        candidates = [raw_text]
        lines = raw_text.strip().splitlines()
        if len(lines) > 2:
            candidates.append("\n".join(lines[1:-1]).strip())

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            # The prompt asks for a JSON list; anything else is treated as a miss.
            if isinstance(parsed, list):
                return parsed
        return None

    def extract_entities(self, transaction_text: str) -> list:
        """Ask the model for the entities mentioned in ``transaction_text``.

        Args:
            transaction_text: Free-form description of a transaction.

        Returns:
            A list of ``{"entity": ..., "category": ...}`` objects as produced
            by the model, or ``{"error": "Failed to parse entity extraction
            response"}`` if no attempt yielded a parseable JSON list.
        """
        prompt_text = f"""
            Extract all named entities from the following transaction text and classify each entity into one of these categories:
            Person, Politically Exposed Person, Corporation, Bank, Government Agency, Non-Profit Organization, or Shell Company.
            - If the entity's category is ambiguous, classify it as "Corporation".
            - Do NOT include IBANs, VPNs, IPs, addresses, account numbers, tax IDs, location, cities, or countries.
            - Banks should be extracted separately (e.g., "Swiss Bank", "Cayman National Bank") without IBAN/account numbers.
            - Include people with titles (Mr., Mrs., Dr., etc.) as Person.
            - Output only a JSON list of objects. Each object must have exactly two keys: 
              "entity" (the entity name) and "category" (the classified category).
            - Do not output into code block, print as raw text

            Transaction Text:
            {transaction_text}

            Output Format **(Do not output into code block, print as raw text):**
            Eg: 
            [
                {{
                    "entity": "Acme Corp",
                    "category": "Corporation"
                }},
                {{
                    "entity": "SovCo Capital Partners",
                    "category": "Corporation"
                }}
            ]
        """
        for _ in range(self.max_retries):
            try:
                completion = self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {
                            "role": "user",
                            "content": [{"type": "text", "text": prompt_text}]
                        }
                    ]
                )
                extracted_text = completion.choices[0].message.content
            except Exception:
                # Best-effort retry: the request itself failed, so there is no
                # response text to parse for this attempt.
                continue

            extracted_entities = self._parse_entity_list(extracted_text)
            if extracted_entities is not None:
                return extracted_entities

        return {"error": "Failed to parse entity extraction response"}

In [3]:
class CompanyMatcher:
    """Two-stage company-name matcher.

    A TF-IDF / cosine-similarity pass shortlists candidates from the database,
    then ``fuzz.ratio`` on the normalised names picks the best of the shortlist.
    """

    def __init__(self, database):
        self.lemmatizer = WordNetLemmatizer()
        self.raw_database = database
        # Normalised copy of every name, index-aligned with raw_database.
        self.database = list(map(self.preprocess_text, database))

        self.vectorizer = TfidfVectorizer()
        # The vectorizer is only fitted when there is something to fit on.
        self.vectors = self.vectorizer.fit_transform(self.database) if self.database else None

    def preprocess_text(self, text):
        """Lower-case, strip punctuation, drop corporate filler words, lemmatize."""
        filler_words = {"corporation", "limited", "ltd", "solutions", "technologies",
                        "consulting", "consultancy", "services", "systems", "group",
                        "inc", "pvt", "plc", "co"}

        lowered = text.lower()
        alnum_only = re.sub(r'[^a-z0-9\s]', '', lowered)
        collapsed = re.sub(r'\s+', ' ', alnum_only).strip()

        kept_tokens = (token for token in collapsed.split() if token not in filler_words)
        return " ".join(self.lemmatizer.lemmatize(token) for token in kept_tokens)

    def get_top_cosine_matches(self, query, top_n=5):
        """Return up to ``top_n`` ``(raw_name, cosine_score)`` pairs, best first."""
        if not self.database:
            return []

        query_vector = self.vectorizer.transform([self.preprocess_text(query)])
        scores = cosine_similarity(query_vector, self.vectors).flatten()

        limit = min(top_n, len(scores))
        ranked_indices = np.argsort(-scores)[:limit]
        return [(self.raw_database[idx], scores[idx]) for idx in ranked_indices]

    def apply_fuzzy_matching(self, query, candidates):
        """Pick the candidate whose normalised name has the highest fuzz ratio.

        Returns ``(None, None)`` when ``candidates`` is empty, otherwise
        ``(name, ratio)``; the cosine score carried in each candidate is ignored.
        """
        if not candidates:
            return None, None

        normalised_query = self.preprocess_text(query)
        scored = [
            (name, fuzz.ratio(normalised_query, self.preprocess_text(name)))
            for name, _ in candidates
        ]
        return max(scored, key=lambda pair: pair[1])

    def find_best_match(self, query, top_n=5):
        """Shortlist by cosine similarity, then return the fuzzy-best ``(name, score)``."""
        shortlist = self.get_top_cosine_matches(query, top_n)
        return self.apply_fuzzy_matching(query, shortlist)

In [None]:
class CompanyScreening:
    """Screen a company name against SEC EDGAR, an OFAC name list, Wikidata and NewsAPI.

    The OFAC list is read once from ``ofac_list_file`` (one name per line) and
    indexed with ``CompanyMatcher``. The NewsAPI key is read from the
    ``News_API_key`` environment variable.
    """

    # Seconds to wait on each outbound HTTP request before requests raises a timeout.
    REQUEST_TIMEOUT = 30

    def __init__(self, ofac_list_file):
        self.SEC_BASE_URL = "https://www.sec.gov/cgi-bin/browse-edgar"
        self.WIKIDATA_URL = "https://query.wikidata.org/sparql"
        self.NEWS_API_KEY = os.getenv("News_API_key")
        self.OFAC_LIST_FILE = ofac_list_file
        with open(self.OFAC_LIST_FILE, "r", encoding="utf-8") as f:
            # Blank lines carry no name and would only add empty documents to the matcher.
            self.ofac_companies = [line.strip().lower() for line in f if line.strip()]
        self.matcher = CompanyMatcher(self.ofac_companies)

    @staticmethod
    def _escape_sparql_literal(text):
        """Escape ``text`` for use inside a double-quoted SPARQL string literal."""
        return (
            text.replace("\\", "\\\\")
                .replace('"', '\\"')
                .replace("\n", "\\n")
                .replace("\r", "\\r")
        )

    def check_sec_edgar(self, company_name):
        """Look the company up in the EDGAR company browser (Atom output).

        Returns:
            ``{"SEC Registered": "No"}`` when the request does not return 200 or
            the body contains "No matching companies"; otherwise a dict with
            "SEC Registered", "CIK" and up to three "Recent 8-K Filings" titles
            dated within roughly the last three years (3 * 365 days).
        """
        params = {"action": "getcompany", "company": company_name, "output": "atom"}
        headers = {"User-Agent": "XXX (xxx@yyy.com)"}
        response = requests.get(
            self.SEC_BASE_URL, params=params, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        if response.status_code != 200 or "No matching companies" in response.text:
            return {
                "SEC Registered": "No"
            }

        soup = BeautifulSoup(response.text, "xml")
        cik_tag = soup.find("cik")
        cik = cik_tag.text.strip() if cik_tag else "Not available"

        three_years_ago = datetime.now() - timedelta(days=3*365)
        recent_8k_filings = []
        for filing in soup.find_all("entry"):
            title_tag = filing.find("title")
            updated_tag = filing.find("updated")
            if title_tag is None or updated_tag is None:
                # Entry without the fields we filter on; nothing to report.
                continue
            try:
                filing_date = datetime.strptime(updated_tag.text[:10], "%Y-%m-%d")
            except ValueError:
                continue

            if "8-K" in title_tag.text and filing_date >= three_years_ago:
                recent_8k_filings.append(title_tag.text)

        return {
            "SEC Registered": "Yes",
            "CIK": cik,
            "Recent 8-K Filings": recent_8k_filings[:3] if recent_8k_filings else "None"
        }

    def check_ofac_sanctions(self, company_name, threshold=85):
        """Report whether the best OFAC-list match scores at least ``threshold``.

        The matcher returns ``(None, None)`` when it has no candidates (for
        example an empty list file); that is reported as "not sanctioned".
        """
        match, score = self.matcher.find_best_match(company_name)
        is_hit = score is not None and score >= threshold
        return {
            "OFAC Sanctioned": "Yes" if is_hit else "No",
            "Closest OFAC Database Match": match if is_hit else "None",
        }

    def check_wikidata_scandals(self, company_name):
        """Query Wikidata for a business whose English label equals ``company_name``.

        Returns:
            ``{"Error": ...}`` on a non-200 response, ``{"Status": ...}`` when
            nothing matches, otherwise the QID, the English description and
            either ``(scandal_label, scandal_url)`` from the first result row or
            "No known scandals".
        """
        # The name is embedded in a quoted SPARQL literal, so quotes and
        # backslashes in it must be escaped or the query becomes malformed.
        safe_name = self._escape_sparql_literal(company_name)

        query = f"""
        SELECT ?company ?companyLabel ?industry ?industryLabel ?scandal ?scandalLabel ?description WHERE {{
          ?company rdfs:label "{safe_name}"@en.
          ?company wdt:P31 wd:Q4830453.  # Instance of (Business/Company)
          OPTIONAL {{ ?company wdt:P452 ?industry. }}  # Industry type
          OPTIONAL {{ ?company schema:description ?description. FILTER (LANG(?description) = "en") }}

          # Looking for scandals
          OPTIONAL {{ ?company wdt:P793 ?scandal. }}  # Significant events (may include fraud cases, controversies)
          OPTIONAL {{ ?company wdt:P5053 ?scandal. }} # Cause of dissolution (bankruptcy, fraud)
          OPTIONAL {{ ?company wdt:P2416 ?scandal. }} # Scandals (direct connection)

          SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
        }}
        """
        response = requests.get(
            self.WIKIDATA_URL,
            params={"query": query, "format": "json"},
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            return {"Error": "Failed to fetch data"}

        data = response.json().get("results", {}).get("bindings", [])

        if not data:
            return {"Status": "Not Found in Wikidata"}

        # Only the first result row is used.
        result = data[0]
        company_qid = result["company"]["value"].split("/")[-1]
        description = result.get("description", {}).get("value", "No description available")

        scandals = "No known scandals"
        if "scandal" in result:
            scandal_qid = result["scandal"]["value"].split("/")[-1]
            # Fall back to the QID if the label binding is absent from the row.
            scandal_name = result.get("scandalLabel", {}).get("value", scandal_qid)
            scandal_link = f"https://www.wikidata.org/wiki/{scandal_qid}"
            scandals = (scandal_name, scandal_link)

        return {
            "Wikidata_QID": company_qid,
            "Description": description,
            "Scandals": scandals,
        }

    def get_news(self, company_name):
        """Return up to three NewsAPI article titles that contain the company name."""
        # Passing the query as params lets requests URL-encode names that
        # contain characters such as '&', '#' or '+'.
        response = requests.get(
            "https://newsapi.org/v2/everything",
            params={
                "q": company_name,
                "language": "en",
                "sortBy": "publishedAt",
                "apiKey": self.NEWS_API_KEY,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return {"Recent News": "Error fetching news"}

        news_data = response.json()
        articles = news_data.get("articles", [])

        needle = company_name.lower()
        # An article's title may be missing or null; treat that as an empty title.
        titles = [(article.get("title") or "") for article in articles]
        filtered_articles = [title for title in titles if needle in title.lower()]
        return {"Recent News": filtered_articles[:3] if filtered_articles else "No relevant news found"}

    def screen_company(self, company_name):
        """Run every check and merge the results into one dict keyed by field name."""
        result = {"Company": company_name}
        result.update(self.check_sec_edgar(company_name))
        result.update(self.check_ofac_sanctions(company_name))
        result.update(self.check_wikidata_scandals(company_name))
        result.update(self.get_news(company_name))
        return result

In [5]:
class TransactionAnalyzer:
    """Extract entities from a transaction and screen every corporation found."""

    def __init__(self, ofac_list_file):
        self.entity_extractor = EntityExtractor()
        self.company_screening = CompanyScreening(ofac_list_file)

    def analyze_transaction(self, transaction_text):
        """Return a JSON string with screening details for each extracted corporation.

        Args:
            transaction_text: Free-form description of the transaction.

        Returns:
            A JSON document (indent 4) with a "Corporation Details" list. If
            entity extraction failed, the list is empty and the extractor's
            message is included under "Entity Extraction Error".
        """
        extracted_entities = self.entity_extractor.extract_entities(transaction_text)
        final_results = {"Corporation Details": []}

        if not isinstance(extracted_entities, list):
            # The extractor reports failure as {"error": ...}; iterating that
            # dict would yield string keys and break the lookups below.
            if isinstance(extracted_entities, dict) and "error" in extracted_entities:
                final_results["Entity Extraction Error"] = extracted_entities["error"]
            return json.dumps(final_results, indent=4)

        for entity in extracted_entities:
            # Skip items the model did not shape as {"entity": ..., "category": ...}.
            if not isinstance(entity, dict):
                continue
            if entity.get("category") == "Corporation" and entity.get("entity"):
                company_details = self.company_screening.screen_company(entity["entity"])
                final_results["Corporation Details"].append(company_details)

        return json.dumps(final_results, indent=4)

In [None]:
class RiskAnalyzer:
    """Score a transaction with a chat model prompted with in-context examples.

    Requests are POSTed to the Together AI chat-completions endpoint; the API
    key is read from the ``TogetherAI_API_Key`` environment variable.
    """

    # Seconds to wait for the completion before requests raises a timeout.
    REQUEST_TIMEOUT = 120

    def __init__(self):
        self.api_key = os.getenv("TogetherAI_API_Key")
        self.api_url = "https://api.together.xyz/v1/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def _build_prompt(self, input_texts, output_texts, test_input, num_examples):
        """Assemble the few-shot prompt from randomly sampled example pairs."""
        pairs = list(zip(input_texts, output_texts))
        # random.sample raises ValueError if asked for more items than exist,
        # so the request is capped at the number of available pairs.
        sample_size = max(0, min(num_examples, len(pairs)))
        examples = random.sample(pairs, sample_size)

        icl_prompt = "".join(f"Input: {inp}\nOutput: {out}\n\n" for inp, out in examples)
        icl_prompt += f"Input: {test_input}\n Output:"
        return icl_prompt

    def analyze_risk(self, input_texts, output_texts, test_input, num_examples=50):
        """Return the model's risk assessment for ``test_input``.

        Args:
            input_texts: Example inputs for the prompt.
            output_texts: Example outputs, paired positionally with ``input_texts``.
            test_input: The text to be scored.
            num_examples: Maximum number of example pairs sampled into the
                prompt (fewer are used if fewer are available).

        Returns:
            The content of the first completion choice, or a string starting
            with "Error: " that carries the response payload when the response
            does not have the expected shape.
        """
        data = {
            "model": "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "messages": [{
                "role": "user",
                "content": self._build_prompt(input_texts, output_texts, test_input, num_examples),
            }],
        }

        response = requests.post(
            self.api_url, headers=self.headers, json=data, timeout=self.REQUEST_TIMEOUT
        )

        try:
            payload = response.json()
        except ValueError:
            # Body was not JSON (e.g. an HTML error page); report it verbatim.
            return f"Error: {response.text}"

        try:
            return payload["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            return f"Error: {payload}"


In [7]:
def read_sections(path, separator="---"):
    """Read a UTF-8 text file and return its non-empty sections, stripped.

    The file content is split on ``separator``; sections that are empty after
    stripping whitespace are dropped.
    """
    with open(path, "r", encoding="utf-8") as f:
        return [section.strip() for section in f.read().split(separator) if section.strip()]


# Three files are loaded the same way; the first and third are later paired
# up as in-context example inputs and outputs.
details = read_sections("corporate_details.txt")
transactions = read_sections("synthetic_unstructured_data.txt")
risk = read_sections("risk_summary.txt")

In [8]:
# Join each of the first 56 transactions with the corporate details at the
# same position, separated by a single space.
combined_inputs = [
    f"{txn} {detail}"
    for txn, detail in zip(transactions[:56], details[:56])
]

In [9]:
# Rebinds `transactions` to the sections of unstructured_data.txt; the value
# it held before came from synthetic_unstructured_data.txt.
with open("unstructured_data.txt", "r", encoding="utf-8") as f:
    content = f.read()
    transactions = [section for section in map(str.strip, content.split('---')) if section]

In [10]:
# Append one transaction text per CSV row to `transactions`.
with open("structured_data.csv", newline="", encoding="utf-8") as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        if "TransactionText" in row:
            txn_text = row["TransactionText"]
        else:
            # No free-text column: render the row as "key: value" lines.
            txn_text = "\n".join(f"{key}: {value}" for key, value in row.items())
        # Append in both cases; previously only the fallback branch appended,
        # so rows with a TransactionText column were silently dropped.
        transactions.append(txn_text)

In [11]:
# Build the entity-extraction + screening pipeline backed by the OFAC name list file.
analyzer = TransactionAnalyzer(ofac_list_file="ofac_list.txt")

In [12]:
# Screen the entities of transaction 0, then score the transaction together
# with its screening results.
result_json = analyzer.analyze_transaction(transactions[0])
risk_model_input = f"{transactions[0]}\n{result_json}"
# The scorer gets its own name: rebinding `analyzer` to a RiskAnalyzer made
# this cell fail on re-run, because `analyze_transaction` no longer existed on it.
risk_analyzer = RiskAnalyzer()
result = risk_analyzer.analyze_risk(combined_inputs, risk, risk_model_input)
print(result)

 Risk Score: 0.75

Reason: The transaction involves a consulting fee paid to a company in Switzerland, with the funds processed via an intermediary in the British Virgin Islands (BVI), a jurisdiction known for its weak financial regulations and transparency. The sender's IP address shows a VPN connection from Panama, adding to the concerns about potential money laundering or tax evasion. Although Global Horizona Consulting LLC is not sanctioned, the unusual circumstances surrounding the transaction warrant a higher risk score.


In [13]:
# Transaction 1: screen its entities, then score it alongside the screening results.
analyzer = TransactionAnalyzer("ofac_list.txt")
result_json = analyzer.analyze_transaction(transactions[1])
risk_model_input = "\n".join([transactions[1], result_json])
result = RiskAnalyzer().analyze_risk(
    combined_inputs[:56],
    risk,
    risk_model_input,
)
print(result)

 Risk Score: 0.9
Reason: The transaction involves Quantum Holdings Ltd, a company with a beneficiary owner linked to the OFAC SDN List in 2022. The funds are routed through Deutsche Bank Frankfurt and Emirates NBD Dubai, raising concerns about potential money laundering. The approval by Mr. Viktor Petzov, who is linked to the OFAC SDN List, further increases the risk level.


In [14]:
# Transaction 2: screen its entities, then score it alongside the screening results.
# The TransactionAnalyzer is created here rather than inherited from whichever
# cell ran last, so the cell works on its own and on re-run.
analyzer = TransactionAnalyzer("ofac_list.txt")
result_json = analyzer.analyze_transaction(transactions[2])
risk_model_input = f"{transactions[2]}\n{result_json}"
risk_analyzer = RiskAnalyzer()
result = risk_analyzer.analyze_risk(combined_inputs[:56], risk, risk_model_input)
print(result)

 Risk Score: 0.2
Reason: Legitimate business transaction between two registered corporations in the same country, no red flags detected.


In [15]:
# Transaction 3: screen its entities, then score it alongside the screening results.
analyzer = TransactionAnalyzer("ofac_list.txt")
result_json = analyzer.analyze_transaction(transactions[3])
risk_model_input = f"{transactions[3]}\n{result_json}"
# Separate name for the scorer so `analyzer` keeps referring to one kind of object.
risk_analyzer = RiskAnalyzer()
result = risk_analyzer.analyze_risk(combined_inputs[:56], risk, risk_model_input)
print(result)

 Risk Score: 0.2
Reason: Legitimate grant disbursement from a reputable global health foundation to a well-known charity organization. No red flags detected.


In [16]:
# Transaction 4: screen its entities, then score it alongside the screening results.
analyzer = TransactionAnalyzer("ofac_list.txt")
result_json = analyzer.analyze_transaction(transactions[4])
risk_model_input = f"{transactions[4]}\n{result_json}"
# Separate name for the scorer so `analyzer` keeps referring to one kind of object.
risk_analyzer = RiskAnalyzer()
result = risk_analyzer.analyze_risk(combined_inputs[:56], risk, risk_model_input)
print(result)

 Risk Score: 0.2
Reason: Standard business transaction between two registered entities for the purchase of office supplies.


In [17]:
# Transaction 5: screen its entities, then score it alongside the screening results.
analyzer = TransactionAnalyzer("ofac_list.txt")
result_json = analyzer.analyze_transaction(transactions[5])
risk_model_input = f"{transactions[5]}\n{result_json}"
# Separate name for the scorer so `analyzer` keeps referring to one kind of object.
risk_analyzer = RiskAnalyzer()
result = risk_analyzer.analyze_risk(combined_inputs[:56], risk, risk_model_input)
print(result)

 Risk Score: 0.5
Reason: Large donation from a US-based organization to a Cayman Islands-based entity. Further scrutiny required due to the Cayman Islands' reputation as a tax haven.


In [18]:
# Transaction 6: screen its entities, then score it alongside the screening results.
analyzer = TransactionAnalyzer("ofac_list.txt")
result_json = analyzer.analyze_transaction(transactions[6])
risk_model_input = f"{transactions[6]}\n{result_json}"
# Separate name for the scorer so `analyzer` keeps referring to one kind of object.
risk_analyzer = RiskAnalyzer()
result = risk_analyzer.analyze_risk(combined_inputs[:56], risk, risk_model_input)
print(result)

 Risk Score: 0.9
Reason: Oceanic Holdings LLC is OFAC sanctioned, raising significant concerns about this offshore investment in Panama.
