In [None]:
!pip install google-search-results
!pip install requests beautifulsoup4
!pip install pandas
!pip install fuzzywuzzy
!pip install selenium



# I STEP - Sanction list check

In [None]:
import pandas as pd

# URLs for the OFAC CSV files
# NOTE(review): url1 presumably holds the SDN primary names and url2 the alternate
# names of the consolidated list; SDN aliases and consolidated primary names are not
# loaded here - confirm that this selection is intended.
url1 = 'https://www.treasury.gov/ofac/downloads/sdn.csv'
url2 = 'https://www.treasury.gov/ofac/downloads/consolidated/cons_alt.csv'

# The OFAC CSV exports carry no header row: header=None keeps the first record from
# being consumed as column names. dtype=str keeps every field textual so the combined
# list can be sorted below.
df1 = pd.read_csv(url1, header=None, dtype=str, on_bad_lines='skip')
sanction_list_url1 = df1.iloc[:, 1].dropna().unique()  # second column: name

df2 = pd.read_csv(url2, header=None, dtype=str, on_bad_lines='skip')
sanction_list_url2 = df2.iloc[:, 3].dropna().unique()  # fourth column: alternate name

# Combine the names from both CSV files; sorting makes the output file reproducible
# (plain set ordering changes from run to run)
sanction_list = sorted(set(sanction_list_url1) | set(sanction_list_url2))

# Create a DataFrame from the combined names
sanction_list_df = pd.DataFrame({'Sanctioned Names': sanction_list})

# Save the DataFrame to a CSV file
sanction_list_df.to_csv('sanction_list.csv', index=False)

print("Sanctioned names saved to 'sanction_list.csv'")

Sanctioned names saved to 'sanction_list.csv'


In [None]:
import pandas as pd
from fuzzywuzzy import fuzz

# Input workbook with the companies to screen, and the sanction list CSV to screen against
uploaded_file_path = 'BELGIUM_companies_short.xlsx'
sanction_list_file_path = 'sanction_list.csv'

# Read both inputs into DataFrames
companies_df = pd.read_excel(uploaded_file_path)
sanction_list_df = pd.read_csv(sanction_list_file_path)

# Lower-case every sanctioned name once, so matching is case-insensitive
lowered_names = sanction_list_df['Sanctioned Names'].str.lower()
sanctioned_names = lowered_names.tolist()

# Function for approximate matching
def approximate_match(name, sanctioned_names, threshold=85, match_score=46):
    """Score a name against the sanction list using fuzzy string matching.

    :param name: Name to check. Non-string values (for example NaN coming from
        an empty spreadsheet cell) are treated as "no match".
    :param sanctioned_names: Iterable of lower-cased sanctioned names;
        non-string entries are ignored.
    :param threshold: Minimum ``fuzz.ratio`` similarity required for a match.
    :param match_score: Score returned when a match is found (46 by default).
    :return: ``match_score`` if any sanctioned name reaches the threshold,
        0 otherwise.
    """
    # A blank cell arrives as float NaN, which has no .lower(); it cannot match anything.
    if not isinstance(name, str):
        return 0

    name = name.lower()
    for sanctioned_name in sanctioned_names:
        if not isinstance(sanctioned_name, str):
            continue
        if fuzz.ratio(name, sanctioned_name) >= threshold:
            return match_score  # Match found
    return 0  # No match

# Score each company name against the sanction list (threshold left at its default)
companies_df['Score_Step_1'] = companies_df['Name'].apply(
    approximate_match, args=(sanctioned_names,)
)

# Write the scored companies to a new workbook
output_file_path = 'Step_1_evaluated_companies.xlsx'
companies_df.to_excel(output_file_path, index=False)

print(f"Evaluation complete with approximate matching. Results saved to {output_file_path}")

Evaluation complete with approximate matching. Results saved to Step_1_evaluated_companies.xlsx


# II STEP - Company status / active check

In [None]:
import pandas as pd
import numpy as np
import re
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from time import sleep
from multiprocessing import Pool

def process_companies(company_chunk, sleep_time=10, wait_timeout=20):
    """Look up the status of each company on the KBO public search site.

    One headless Chrome session is opened for the whole chunk and is always
    closed again, even when the loop is interrupted.

    :param company_chunk: Iterable of company names to look up.
    :param sleep_time: Seconds to pause after the search form has loaded.
    :param wait_timeout: Timeout in seconds for every explicit Selenium wait.
    :return: Tuple ``(result_chunk, successful_count)``: a list of dicts with
        the keys OriginalCompanyName, CleanedCompanyName, Status and Timestamp,
        and the number of companies whose cleaned name was found in the
        result table.
    """
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    wait = WebDriverWait(driver, wait_timeout)

    base_url = "https://kbopub.economie.fgov.be/kbopub/zoeknaamfonetischform.html?lang=en"
    result_chunk = []
    successful_count = 0

    company_types = [
        "VZW", "BVBA", "BV", "NV", "CV", "CVBA", "SPRL", "SCRL", "ASBL",
        "Comm.V", "SComm", "VOF", "SNC", "GIE", "AIE", "SE", "Partnership"
    ]
    # Compiled once per chunk; re.escape keeps the "." in "Comm.V" a literal dot.
    legal_form_pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(form) for form in company_types) + r')\b',
        flags=re.IGNORECASE,
    )

    def clean_company_name(company_name):
        """Remove legal-form tokens (NV, BVBA, ...) from the name."""
        return legal_form_pattern.sub('', company_name).strip()

    try:
        for company_name in company_chunk:
            # Reset per company so the error row below never reuses the previous
            # company's cleaned name (or hits an unbound variable).
            clean_name = None
            try:
                clean_name = clean_company_name(company_name)
                driver.get(base_url)
                wait.until(EC.presence_of_element_located((By.TAG_NAME, 'body')))
                sleep(sleep_time)
                search_box = wait.until(EC.presence_of_element_located((By.ID, "searchWord")))
                search_box.clear()
                search_box.send_keys(clean_name)

                # Untick the filter checkbox when it is selected
                checkbox = driver.find_element(By.ID, "filterEnkelActieve")
                if checkbox.is_selected():
                    checkbox.click()

                search_button = wait.until(EC.element_to_be_clickable((By.NAME, "actionNPRP")))
                search_button.click()
                wait.until(EC.presence_of_element_located((By.TAG_NAME, 'body')))

                try:
                    page_text = driver.find_element(By.TAG_NAME, "body").text
                    if "no result found for this search term.".lower() in page_text.lower():
                        print(f"No result for {company_name}")
                        result_chunk.append({
                            'OriginalCompanyName': company_name,
                            'CleanedCompanyName': clean_name,
                            'Status': "No result found for this search term",
                            'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        })
                        continue
                except NoSuchElementException:
                    pass

                rows = wait.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, '#onderneminglistfonetisch tbody tr')))
                status = "not found in KBO data table"
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                for row in rows:
                    name_cell = row.find_element(By.CLASS_NAME, 'benaming').text.strip()
                    # Only an exact (case-insensitive) name match counts as found
                    if name_cell.lower() == clean_name.lower():
                        status_cell = row.find_elements(By.TAG_NAME, 'td')[1].text.strip()
                        status = re.sub(r'\s+', ' ', status_cell).strip()
                        successful_count += 1
                        break

                result_chunk.append({
                    'OriginalCompanyName': company_name,
                    'CleanedCompanyName': clean_name,
                    'Status': status,
                    'Timestamp': timestamp
                })

            except Exception as e:
                # Best effort: record the failure and carry on with the next company.
                print(f"Failed to process {company_name}: {type(e).__name__}")
                result_chunk.append({
                    'OriginalCompanyName': company_name,
                    'CleanedCompanyName': clean_name,
                    'Status': "error",
                    'Timestamp': "N/A"
                })
    finally:
        # Always release the browser process, also on KeyboardInterrupt.
        driver.quit()

    return result_chunk, successful_count


if __name__ == '__main__':
    start_time = datetime.now()
    print(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Load the Excel file. Blank cells are dropped and the names are handed to numpy
    # as a plain array of str: splitting the pandas Series itself goes through a
    # deprecated pandas code path (presumably the source of the warning fragment in
    # the recorded output) and would pass NaN "names" to the workers.
    uploaded_file_path = 'BELGIUM_companies_short.xlsx'
    company_list = pd.read_excel(uploaded_file_path)['Name'].dropna().astype(str).to_numpy()

    # Never start more Chrome workers than there are companies to look up
    num_workers = max(1, min(10, len(company_list)))

    # Split the list of companies into chunks for multiprocessing
    company_chunks = np.array_split(company_list, num_workers)
    with Pool(num_workers) as pool:
        results = pool.map(process_companies, company_chunks)

    # Combine all results; explicit columns keep the frame usable when nothing came back
    all_results = [item[0] for item in results]
    successful_count = sum(item[1] for item in results)
    result_df = pd.DataFrame(
        [item for sublist in all_results for item in sublist],
        columns=['OriginalCompanyName', 'CleanedCompanyName', 'Status', 'Timestamp'],
    )

    # Define the scoring dictionary
    status_scores = {
        "ENT LP Active": 1,
        "ENT LP Stopped": 5,
        "error": 2,
        "EU Active": 1,
        "EU Stopped": 5,
        "No result found for this search term": 2,
        "not found in KBO data table": 2
    }

    # Map the 'Status' column to scores; statuses missing from the dictionary score 0
    result_df['Score'] = result_df['Status'].map(status_scores).fillna(0)

    # Save the updated DataFrame to a new CSV file
    result_df.to_csv('Step_2_company_status_report_with_scores.csv', index=False)

    end_time = datetime.now()
    print(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total time taken: {end_time - start_time}")
    print(f"Total successfully found statuses: {successful_count}")

Start time: 2024-12-23 22:35:15


  return bound(*args, **kwds)


No result for Zwick Roell Belux CV
No result for Van Laer-Mazet/Chris
No result for Brugs Motoren Bedrijf nv
No result for Zzlite
End time: 2024-12-23 22:37:33
Total time taken: 0:02:17.728790
Total successfully found statuses: 13


In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import pandas as pd
import re
from datetime import datetime
from multiprocessing import Pool
import numpy as np
from time import sleep

def process_companies(company_chunk, sleep_time=15, wait_timeout=45):
    """Look up the status of each company on the KBO public search site.

    One headless Chrome session is opened for the whole chunk and is always
    closed again, even when the loop is interrupted.

    :param company_chunk: Iterable of company names to look up.
    :param sleep_time: Seconds to pause after the search form has loaded.
    :param wait_timeout: Timeout in seconds for every explicit Selenium wait.
    :return: Tuple ``(result_chunk, successful_count)``: a list of dicts with
        the keys OriginalCompanyName, CleanedCompanyName, Status and Timestamp,
        and the number of companies whose cleaned name was found in the
        result table.
    """
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    wait = WebDriverWait(driver, wait_timeout)

    base_url = "https://kbopub.economie.fgov.be/kbopub/zoeknaamfonetischform.html?lang=en"
    result_chunk = []
    successful_count = 0

    company_types = [
        "VZW", "BVBA", "BV", "NV", "CV", "CVBA", "SPRL", "SCRL", "ASBL",
        "Comm.V", "SComm", "VOF", "SNC", "GIE", "AIE", "SE", "Partnership"
    ]
    # Compiled once per chunk; re.escape keeps the "." in "Comm.V" a literal dot.
    legal_form_pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(form) for form in company_types) + r')\b',
        flags=re.IGNORECASE,
    )

    def clean_company_name(company_name):
        """Remove legal-form tokens (NV, BVBA, ...) from the name."""
        return legal_form_pattern.sub('', company_name).strip()

    try:
        for company_name in company_chunk:
            # Reset per company so the error row below never reuses the previous
            # company's cleaned name (or hits an unbound variable).
            clean_name = None
            try:
                clean_name = clean_company_name(company_name)
                driver.get(base_url)
                wait.until(EC.presence_of_element_located((By.TAG_NAME, 'body')))
                sleep(sleep_time)
                search_box = wait.until(EC.presence_of_element_located((By.ID, "searchWord")))
                search_box.clear()
                search_box.send_keys(clean_name)

                # Untick the filter checkbox when it is selected
                checkbox = driver.find_element(By.ID, "filterEnkelActieve")
                if checkbox.is_selected():
                    checkbox.click()

                search_button = wait.until(EC.element_to_be_clickable((By.NAME, "actionNPRP")))
                search_button.click()
                wait.until(EC.presence_of_element_located((By.TAG_NAME, 'body')))

                try:
                    page_text = driver.find_element(By.TAG_NAME, "body").text
                    if "no result found for this search term.".lower() in page_text.lower():
                        print(f"No result for {company_name}")
                        result_chunk.append({
                            'OriginalCompanyName': company_name,
                            'CleanedCompanyName': clean_name,
                            'Status': "No result found for this search term",
                            'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        })
                        continue
                except NoSuchElementException:
                    pass

                rows = wait.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, '#onderneminglistfonetisch tbody tr')))
                status = "not found in KBO data table"
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                for row in rows:
                    name_cell = row.find_element(By.CLASS_NAME, 'benaming').text.strip()
                    # Only an exact (case-insensitive) name match counts as found
                    if name_cell.lower() == clean_name.lower():
                        status_cell = row.find_elements(By.TAG_NAME, 'td')[1].text.strip()
                        status = re.sub(r'\s+', ' ', status_cell).strip()
                        successful_count += 1
                        break

                result_chunk.append({
                    'OriginalCompanyName': company_name,
                    'CleanedCompanyName': clean_name,
                    'Status': status,
                    'Timestamp': timestamp
                })

            except Exception as e:
                # Best effort: record the failure and carry on with the next company.
                print(f"Failed to process {company_name}: {type(e).__name__}")
                result_chunk.append({
                    'OriginalCompanyName': company_name,
                    'CleanedCompanyName': clean_name,
                    'Status': "error",
                    'Timestamp': "N/A"
                })
    finally:
        # Always release the browser process, also on KeyboardInterrupt.
        driver.quit()

    return result_chunk, successful_count


if __name__ == '__main__':
    start_time = datetime.now()
    print(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Blank names are dropped and the column is handed to numpy as a plain array of
    # str, so np.array_split does not have to split a pandas Series and the workers
    # never receive NaN as a company name.
    company_list = (
        pd.read_csv('random_sample_2500_5.csv', encoding='latin-1', sep=',')['Name']
        .dropna()
        .astype(str)
        .to_numpy()
    )
    # Never start more Chrome workers than there are companies to look up
    num_workers = max(1, min(5, len(company_list)))

    company_chunks = np.array_split(company_list, num_workers)
    with Pool(num_workers) as pool:
        results = pool.map(process_companies, company_chunks)

    # Flatten the per-worker results; explicit columns keep the CSV header stable
    # even when no rows were produced
    all_results = [item[0] for item in results]
    successful_count = sum(item[1] for item in results)
    result_df = pd.DataFrame(
        [item for sublist in all_results for item in sublist],
        columns=['OriginalCompanyName', 'CleanedCompanyName', 'Status', 'Timestamp'],
    )
    result_df.to_csv('company_status_random_sample_2500_5.csv', index=False)

    end_time = datetime.now()
    print(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total time taken: {end_time - start_time}")
    print(f"Total successfully found statuses: {successful_count}")

# III STEP - web scraping

## 3.1 STEP - web scraping using API

In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup, Comment
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
import re
from googleapiclient.discovery import build
from requests.exceptions import RequestException, SSLError

import os

# Set up your API keys. They are read from the environment so real credentials never
# have to be saved inside the notebook; 'xxx' remains the placeholder fallback.
google_api_key = os.environ.get('GOOGLE_API_KEY', 'xxx')  # Your Google API key
google_cse_id = os.environ.get('GOOGLE_CSE_ID', 'xxx')  # Your Custom Search Engine ID

# Load the Excel file to get company names
#df = pd.read_csv('Offshore Leaks-entities.csv', low_memory=False, encoding='utf-8')
df = pd.read_excel('BELGIUM_companies_short.xlsx', engine='openpyxl')
# company_names is consumed by the thread pool at the end of this cell, so it has to
# be defined here; the workbook's company column is called 'Name'.
company_names = df['Name'][0:20].tolist()  # Processing the first 20 companies

# Keywords and associated scores
keywords_score_30 = [
    "sanctions", "criminal", "crime", "corruption", "shell company", "criminal case", "arrested"
]
keywords_score_5 = [
    "court", "accusation", "penalty", "investigation", "insolvency", "violation", "debt", "blackmail"
]
keywords_score_minus_1 = ["stock"]  # Negative scoring for "stock"

# Score given to pages that contain none of the keywords
score_no_words = 0

def google_search(search_term, api_key, cse_id, start_index=1):
    """Run one Google Custom Search query and return its result items.

    :param search_term: Query string.
    :param api_key: Google API key.
    :param cse_id: Custom Search Engine ID.
    :param start_index: Index of the first result to request.
    :return: List of result items; an empty list when the response has no
        items or when building the client or running the query fails.
    """
    try:
        # Building the client sits inside the try as well: a failure here must not
        # escape into the thread pool and abort the whole batch.
        service = build("customsearch", "v1", developerKey=api_key)
        res = service.cse().list(q=search_term, cx=cse_id, start=start_index).execute()
        return res.get('items', [])
    except Exception as e:
        print(f"Failed to search for {search_term} with error: {e}")
        return []

def extract_text_from_url(url):
    """Download a page and return its visible text.

    :param url: Address of the page to fetch.
    :return: The page text joined with single spaces; ``""`` when the request
        fails or the status is not 200; ``"Non-text content skipped"`` when
        the response is not HTML.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5'
    }
    try:
        # NOTE: certificate verification is switched off for this request.
        response = requests.get(url, headers=headers, verify=False, timeout=10)
    except (RequestException, SSLError) as e:
        # Report the failure on the console instead of returning it as page text:
        # the message contains the URL, whose words would otherwise be keyword-scored.
        print(f"Request failed for {url}: {e}")
        return ""

    if response.status_code != 200:
        return ""
    if 'text/html' not in response.headers.get('Content-Type', ''):
        return "Non-text content skipped"

    soup = BeautifulSoup(response.text, 'html.parser')
    # Drop page chrome and code so only readable content remains
    for tag in soup(["script", "style", "header", "footer", "form", "nav"]):
        tag.extract()
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        comment.extract()
    return ' '.join(soup.stripped_strings)

def clean_text(text):
    """Collapse every run of whitespace into one space and trim both ends."""
    return re.sub(r'\s+', ' ', text).strip()

def calculate_score(text):
    """
    Return the score of the first keyword tier that occurs in the text.

    Tiers are tried in the order 30, 5, -1; matching is a case-insensitive
    substring test. Text without any keyword (including empty text) gets
    score_no_words.
    """
    lowered = text.lower()
    tiers = (
        (keywords_score_30, 30),
        (keywords_score_5, 5),
        (keywords_score_minus_1, -1),
    )
    for tier_keywords, tier_score in tiers:
        for keyword in tier_keywords:
            if keyword in lowered:
                return tier_score
    return score_no_words

def process_company(company_name):
    """Search for a company and score the text of every result page.

    Returns one dict per search hit with the keys company, url,
    extracted_text and score. Non-text hits are recorded with a placeholder
    text and the score_no_words score.
    """
    company_data = []
    for hit in google_search(company_name, google_api_key, google_cse_id):
        link = hit['link']
        page_text = extract_text_from_url(link)
        if page_text == "Non-text content skipped":
            row_text = "Skipped due to non-text content"
            row_score = score_no_words
        else:
            row_text = clean_text(page_text)
            row_score = calculate_score(row_text)
        company_data.append({
            'company': company_name,
            'url': link,
            'extracted_text': row_text,
            'score': row_score
        })
    return company_data

# Use ThreadPoolExecutor to process companies in parallel
data = []
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(process_company, name): name for name in company_names}
    for future in as_completed(futures):
        try:
            data.extend(future.result())
        except Exception as e:
            # One failing company must not discard the rows already collected for
            # all the others; report it and keep going.
            print(f"Processing failed for {futures[future]}: {e}")

# Convert list of dicts to DataFrame
df_results = pd.DataFrame(data)

# Save the DataFrame to a CSV file with proper encoding and escaping
output_file_path = 'Step_3.1_company_analysis_with_scores.csv'  # Replace with desired file path
df_results.to_csv(output_file_path, index=False, escapechar='\\', encoding='utf-8', quoting=csv.QUOTE_ALL)

print(f"Data saved to {output_file_path}.")

## V-2 (second version of the 3.1 API scraper)

In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup, Comment
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
import re
from googleapiclient.discovery import build
from requests.exceptions import RequestException, SSLError

import os

# Set up your API keys. They are read from the environment so real credentials never
# have to be saved inside the notebook; 'xxx' remains the placeholder fallback.
google_api_key = os.environ.get('GOOGLE_API_KEY', 'xxx')  # Your Google API key
google_cse_id = os.environ.get('GOOGLE_CSE_ID', 'xxx')   # Your Custom Search Engine ID

# Load the Excel file to get company names
df = pd.read_excel('BELGIUM_companies_short.xlsx', engine='openpyxl')
company_names = df['Name'][:500].tolist()  # Process 500 companies

# Keywords and associated scores: each matching keyword adds 30 points ...
keywords_score_30 = [
    "sanctions", "criminal", "crime", "corruption", "shell company", "offshore",
    "criminal case", "arrested", "fraud", "money laundering",
    "embezzlement", "terrorism financing", "bribery", "tax evasion",
    "illicit funds", "smuggling", "seized assets", "fines",
    "indictment", "prosecuted", "wanted", "scam", "scandal"
]

# ... 5 points ...
keywords_score_5 = [
    "court", "accusation", "penalty", "investigation",
    "insolvency", "violation", "debt", "blackmail", "lawsuit",
    "default", "litigation", "settlement", "audit", "suspicious",
    "foreclosure", "dispute", "breach", "illegal transaction",
    "arbitration", "compliance failure", "tax fraud"
]

# ... or subtracts 1 point
keywords_score_minus_1 = ["stock"]
# Starting score of every page before keywords are counted
score_no_words = 0

# Exclude domains
exclude_domains = [
    'dictionary.com', 'wiktionary.org', 'merriam-webster.com',
    'facebook.com', 'twitter.com', 'vimeo.com', 'youtube.com',
    'linkedin.com', 'reddit.com', 'quora.com', 'instagram.com',
    'tiktok.com', 'pinterest.com', 'justia.com'
]

# Detect dictionary-like content from API results
dictionary_keywords = [
    "definition", "meaning", "dictionary", "thesaurus", "pronunciation"
]


# API-based Google Search
def google_search(search_term, api_key, cse_id, start_index=1):
    """Run one Google Custom Search query and return its result items.

    :param search_term: Query string.
    :param api_key: Google API key.
    :param cse_id: Custom Search Engine ID.
    :param start_index: Index of the first result to request.
    :return: List of result items; an empty list when the response has no
        items or when building the client or running the query fails.
    """
    try:
        # Building the client sits inside the try as well: a failure here must not
        # escape into the thread pool and abort the whole batch.
        service = build("customsearch", "v1", developerKey=api_key)
        res = service.cse().list(q=search_term, cx=cse_id, start=start_index).execute()
        return res.get('items', [])
    except Exception as e:
        print(f"Failed to search for {search_term} with error: {e}")
        return []


# Check if a URL belongs to an excluded domain
def is_valid_url(url):
    """Return False when the URL's host is an excluded domain or one of its subdomains.

    The host name is compared instead of the whole URL, so an excluded domain
    that merely appears in a path, a query string or a look-alike host no
    longer rejects the page. URLs without a parsable host fall back to the
    plain substring test.
    """
    from urllib.parse import urlparse  # local import: this cell does not import it

    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        host = ""
    if not host:
        return not any(domain in url for domain in exclude_domains)
    return not any(
        host == domain or host.endswith("." + domain)
        for domain in exclude_domains
    )


# Filter out dictionary or irrelevant pages from API results
def is_dictionary_page(api_result):
    """Tell whether a search hit's title or snippet contains a dictionary keyword.

    Missing 'title' / 'snippet' entries count as empty text; the comparison
    is case-insensitive.
    """
    searchable_fields = (
        api_result.get('title', '').lower(),
        api_result.get('snippet', '').lower(),
    )
    for field in searchable_fields:
        for keyword in dictionary_keywords:
            if keyword in field:
                return True
    return False


# Extract and clean text from URLs
def extract_text_from_url(url):
    """Download a page and return its visible text.

    :param url: Address of the page to fetch.
    :return: The page text joined with single spaces; ``""`` when the request
        fails; ``"Non-text content skipped"`` when the status is not 200 or
        the response is not HTML.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5'
    }
    try:
        # NOTE: certificate verification is switched off for this request.
        response = requests.get(url, headers=headers, verify=False, timeout=10)
    except (RequestException, SSLError) as e:
        # Report the failure on the console instead of returning it as page text:
        # the message contains the URL, whose words would otherwise be keyword-scored.
        print(f"Request failed for {url}: {e}")
        return ""

    if response.status_code != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
        return "Non-text content skipped"

    soup = BeautifulSoup(response.text, 'html.parser')

    # Remove scripts and irrelevant tags
    for tag in soup(["script", "style", "header", "footer", "form", "nav"]):
        tag.extract()
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        comment.extract()

    return ' '.join(soup.stripped_strings)


# Clean extracted text
def clean_text(text):
    """Return the text with whitespace runs squeezed to one space and the ends trimmed."""
    squeezed = re.sub(r'\s+', ' ', text)
    trimmed = squeezed.strip()
    return trimmed


# Calculate scores based on keyword proximity
def calculate_score_with_reason(text, snippet, company_name):
    """Score a page by the keywords tied to the company and list those keywords.

    A keyword counts when it shares a sentence (split on ".", "!" or "?") with
    the company name in the page text, or when it occurs in the search
    snippet. Every counted keyword adds its tier weight (30, 5 or -1) to the
    score, which starts at score_no_words.

    Returns ``(score, keywords)``; the keyword list is
    ``["No relevant keywords"]`` when nothing matched.
    """
    snippet_lower = snippet.lower()
    company_lower = company_name.lower()
    # Only sentences that mention the company can produce a sentence-level hit
    company_sentences = [
        sentence
        for sentence in re.split(r'[.!?]', text.lower())
        if company_lower in sentence
    ]

    def is_hit(keyword):
        in_company_sentence = any(keyword in sentence for sentence in company_sentences)
        return in_company_sentence or keyword in snippet_lower

    tiers = (
        (keywords_score_30, 30),
        (keywords_score_5, 5),
        (keywords_score_minus_1, -1),
    )
    matching_keywords = []
    score = score_no_words
    for tier_keywords, weight in tiers:
        for keyword in tier_keywords:
            if is_hit(keyword):
                matching_keywords.append(keyword)
                score += weight

    return score, matching_keywords or ["No relevant keywords"]


# Process Each Company
def process_company(company_name):
    """Search for a company and score every relevant result page.

    Hits that look like dictionary pages or that point to an excluded domain
    are skipped.

    :param company_name: Company to search for.
    :return: List of dicts with the keys company, url, extracted_text (first
        300 characters), score and matched_keywords.
    """
    results = google_search(company_name, google_api_key, google_cse_id)
    company_data = []

    for result in results:
        url = result['link']

        # Skip dictionary or irrelevant results
        if is_dictionary_page(result) or not is_valid_url(url):
            continue

        extracted_text = extract_text_from_url(url)
        extracted_text = clean_text(extracted_text)

        # A hit may come without a snippet; treat it as empty text, in line with
        # is_dictionary_page, instead of raising KeyError.
        snippet = result.get('snippet', '')
        score, reasons = calculate_score_with_reason(extracted_text, snippet, company_name)

        company_data.append({
            'company': company_name,
            'url': url,
            'extracted_text': extracted_text[:300],
            'score': score,
            'matched_keywords': ', '.join(reasons)
        })

    return company_data


# Process companies in parallel
data = []
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(process_company, name): name for name in company_names}
    for future in as_completed(futures):
        try:
            data.extend(future.result())
        except Exception as e:
            # One failing company must not discard the rows already collected for
            # all the others; report it and keep going.
            print(f"Processing failed for {futures[future]}: {e}")

df_results = pd.DataFrame(data)
df_results.to_csv('Step_3.1_company_analysis_with_scores.csv', index=False, escapechar='\\', encoding='utf-8', quoting=csv.QUOTE_ALL)

print("Data saved successfully.")

## 3.2 STEP - web scraping using BeautifulSoup

In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup, Comment
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
import re
from requests.exceptions import RequestException, SSLError
import time
from urllib.parse import quote
import random

# Read the company workbook and keep the first 20 names for this run
df = pd.read_excel('BELGIUM_companies_short.xlsx', engine='openpyxl')
#df = pd.read_csv('Offshore Leaks-entities.csv', low_memory=False, encoding='utf-8')
company_names = df['Name'].iloc[:20].tolist()

# Keywords worth 30 points each
keywords_score_30 = [
    "sanctions", "criminal", "crime", "corruption",
    "shell company", "offshore", "criminal case", "arrested",
    "fraud", "money laundering", "embezzlement", "terrorism financing",
    "bribery", "tax evasion", "illicit funds", "smuggling",
    "seized assets", "fines", "indictment", "prosecuted",
    "wanted", "scam", "scandal",
]

# Keywords worth 5 points each
keywords_score_5 = [
    "court", "accusation", "penalty", "investigation", "insolvency",
    "violation", "debt", "blackmail", "lawsuit", "default",
    "litigation", "settlement", "audit", "suspicious", "foreclosure",
    "dispute", "breach", "illegal transaction", "arbitration",
    "compliance failure", "tax fraud",
]

# Keywords that subtract one point
keywords_score_minus_1 = ["stock"]

# Starting score of every page before keywords are counted
score_no_words = 0

# Pool of User-Agent strings that requests rotate through
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)",
    "Mozilla/5.0 (iPad; CPU OS 13_2 like Mac OS X)",
]

def random_headers():
    """Build request headers carrying one randomly chosen User-Agent."""
    chosen_agent = random.choice(user_agents)
    return {"User-Agent": chosen_agent}

# Domains to Exclude
exclude_domains = [
    'dictionary.com', 'wiktionary.org', 'merriam-webster.com',
    'facebook.com', 'twitter.com', 'vimeo.com', 'youtube.com',
    'linkedin.com', 'reddit.com', 'quora.com', 'instagram.com',
    'tiktok.com', 'pinterest.com', 'justia.com'
]

def is_valid_url(url):
    """Return False when the URL's host is an excluded domain or one of its subdomains.

    The host name is compared instead of the whole URL, so an excluded domain
    that merely appears in a path, a query string or a look-alike host no
    longer rejects the page. URLs without a parsable host fall back to the
    plain substring test.
    """
    from urllib.parse import urlparse  # local import: the cell only imports `quote`

    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        host = ""
    if not host:
        return not any(domain in url for domain in exclude_domains)
    return not any(
        host == domain or host.endswith("." + domain)
        for domain in exclude_domains
    )

# Bing Search Scraper
def bing_search_scrape(company):
    """Scrape the Bing result page for the quoted company name.

    Sleeps a random 3-7 seconds before the request. Returns a list of dicts
    with the keys title, link and snippet for every result that has a link,
    passes is_valid_url and has a title or a snippet. Returns an empty list
    on a non-200 response or on any exception.
    """
    search_url = "https://www.bing.com/search?q=" + quote(f'"{company}"')
    time.sleep(random.uniform(3, 7))

    try:
        response = requests.get(search_url, headers=random_headers(), timeout=10)
        if response.status_code != 200:
            print(f"Failed to fetch results for {company}: {response.status_code}")
            return []

        hits = []
        page = BeautifulSoup(response.text, "html.parser")
        for item in page.find_all('li', class_='b_algo'):
            anchor = item.find('a')
            if not anchor or 'href' not in anchor.attrs:
                continue

            href = anchor['href']
            heading = item.find('h2')
            paragraph = item.find('p')
            title = heading.text if heading else ""
            snippet = paragraph.text if paragraph else ""

            # Keep the hit only when the domain is allowed and there is some text
            if is_valid_url(href) and (title or snippet):
                hits.append({
                    "title": title,
                    "link": href,
                    "snippet": snippet
                })
        return hits
    except Exception as e:
        print(f"Error scraping Bing for {company}: {e}")
        return []

# Terms whose presence marks a page as dictionary-like content
dictionary_keywords = [
    "definition", "meaning", "dictionary", "thesaurus", "pronunciation"
]

def is_dictionary_page(soup):
    """Tell whether a parsed page looks like a dictionary entry.

    The page title, the description meta tag and all h1/h2 headings are
    checked, case-insensitively, for any of the dictionary keywords.
    """
    def mentions_dictionary(fragment):
        lowered = fragment.lower()
        return any(word in lowered for word in dictionary_keywords)

    # Page title
    if soup.title and mentions_dictionary(soup.title.text):
        return True

    # Description meta tag
    description_tag = soup.find("meta", {"name": "description"})
    if description_tag and mentions_dictionary(description_tag.get("content", "")):
        return True

    # Top-level headings
    return any(mentions_dictionary(heading.text) for heading in soup.find_all(['h1', 'h2']))


# Extract Relevant Text from URL (Updated)
def extract_text_from_url(url, company_name):
    """Download a page and return the text most relevant to the company.

    :param url: Address of the page to fetch.
    :param company_name: Company whose mentions are looked for.
    :return: Up to 2000 characters of whitespace-collapsed text taken from the
        h1/h2/h3/p elements that mention the company, or from the whole page
        when none does; ``"Dictionary content skipped"`` for dictionary pages;
        ``"Non-text content skipped"`` when the status is not 200 or the
        response is not HTML; ``""`` when the request fails.
    """
    try:
        response = requests.get(url, headers=random_headers(), timeout=10)
    except (RequestException, SSLError) as e:
        # Report the failure on the console instead of returning it as page text:
        # the message contains the URL, whose words would otherwise be keyword-scored.
        print(f"Request failed for {url}: {e}")
        return ""

    if response.status_code != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
        return "Non-text content skipped"

    soup = BeautifulSoup(response.text, 'html.parser')

    # Filter out dictionary pages
    if is_dictionary_page(soup):
        return "Dictionary content skipped"

    # Clean unwanted sections
    for tag in soup(["script", "style", "header", "footer", "form", "nav"]):
        tag.extract()
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        comment.extract()

    # Prefer headings and paragraphs that mention the company
    company_lower = company_name.lower()
    relevant_text = ""
    for tag in soup.find_all(['h1', 'h2', 'h3', 'p']):
        if company_lower in tag.text.lower():
            relevant_text += tag.text + " "

    # Fall back to the whole page when the company is not mentioned in them
    if not relevant_text:
        relevant_text = ' '.join(soup.stripped_strings)

    return re.sub(r'\s+', ' ', relevant_text[:2000])

# Calculate Scores Based on Text
def calculate_score_with_reason(text, snippet, company_name):
    """Score a page by the keywords tied to the company and list those keywords.

    A keyword counts when it
      * shares a sentence (split on ".", "!" or "?") with the company name, or
      * occurs as a whole word within 500 characters of the company name, or
      * occurs in the search snippet.
    Every counted keyword adds its tier weight (30, 5 or -1) to the score,
    which starts at score_no_words. When no keyword counted at all, each
    financial-distress term found anywhere in the text adds 5.

    :param text: Extracted page text.
    :param snippet: Search-result snippet for the page.
    :param company_name: Company the page was found for.
    :return: ``(score, keywords)``; the keyword list is
        ``["No relevant keywords"]`` when nothing matched.
    """
    text_lower = text.lower()
    snippet_lower = snippet.lower()
    company_lower = company_name.lower()
    # Split once; only sentences mentioning the company can give a sentence-level hit
    company_sentences = [
        sentence for sentence in re.split(r'[.!?]', text_lower)
        if company_lower in sentence
    ]

    # Direct sentence match for precision
    def keyword_in_same_sentence(keyword):
        return any(keyword in sentence for sentence in company_sentences)

    # Proximity match for broader detection
    def keyword_near_company(keyword):
        # Every occurrence is inspected: the first one may be far from the company
        # while a later one is close to it.
        for match in re.finditer(rf"\b{re.escape(keyword)}\b", text_lower):
            window = text_lower[max(0, match.start() - 500):match.end() + 500]
            if company_lower in window:
                return True
        return False

    def is_hit(keyword):
        return (
            keyword_in_same_sentence(keyword)
            or keyword_near_company(keyword)
            or keyword in snippet_lower
        )

    matching_keywords = []
    score = score_no_words
    # High-risk, medium-risk and low-risk/neutral tiers, in that order
    tiers = (
        (keywords_score_30, 30),
        (keywords_score_5, 5),
        (keywords_score_minus_1, -1),
    )
    for tier_keywords, weight in tiers:
        for keyword in tier_keywords:
            if is_hit(keyword):
                matching_keywords.append(keyword)
                score += weight

    # Boost for financial distress mentions
    if not matching_keywords:
        for term in ["insolvency", "bankruptcy", "liquidation", "dissolved"]:
            if term in text_lower:
                score += 5
                matching_keywords.append(term)

    return score, matching_keywords or ["No relevant keywords"]

# Process Each Company
def process_company(company_name):
    """Search Bing for a company and score every usable result page.

    Pages reported as non-text or as dictionary content are skipped.

    :param company_name: Company to search for.
    :return: List of dicts with the keys company, url, extracted_text (first
        300 characters), score and matched_keywords.
    """
    results = bing_search_scrape(company_name)
    company_data = []
    # Markers returned by extract_text_from_url for pages that must not be scored
    skipped_markers = ("Non-text content skipped", "Dictionary content skipped")

    for result in results:
        url = result['link']
        snippet = result['snippet']
        extracted_text = extract_text_from_url(url, company_name)

        if extracted_text in skipped_markers:
            continue

        # Whitespace is normalised inline: this cell does not define clean_text,
        # so calling it would depend on an earlier cell having been run.
        extracted_text = re.sub(r'\s+', ' ', extracted_text).strip()
        score, reasons = calculate_score_with_reason(extracted_text, snippet, company_name)
        company_data.append({
            'company': company_name,
            'url': url,
            'extracted_text': extracted_text[:300],
            'score': score,
            'matched_keywords': ', '.join(reasons)
        })
    return company_data

# Multithreading for Efficiency
data = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(process_company, name): name for name in company_names}
    for future in as_completed(futures):
        try:
            data.extend(future.result())
        except Exception as e:
            # One failing company must not discard the rows already collected for
            # all the others; report it and keep going.
            print(f"Processing failed for {futures[future]}: {e}")

df_results = pd.DataFrame(data)
df_results.to_csv('Step_3.2_company_analysis_with_scores.csv', index=False, encoding='utf-8', quoting=csv.QUOTE_ALL, escapechar='\\')

print("Data saved successfully.")

Data saved successfully.



# IV STEP - companies scoring



In [None]:
import pandas as pd

# Load the data files
belgium_companies = pd.read_excel('BELGIUM_companies_short.xlsx')
step_1 = pd.read_excel('Step_1_evaluated_companies.xlsx')
step_2 = pd.read_csv('Step_2_company_status_report_with_scores.csv')
step_3 = pd.read_csv('Step_3.2_company_analysis_with_scores.csv')

# Standardize company name columns
step_1 = step_1.rename(columns={'Name': 'company_name'})
step_2 = step_2.rename(columns={'OriginalCompanyName': 'company_name'})
step_3 = step_3.rename(columns={'company': 'company_name'})
belgium_companies = belgium_companies.rename(columns={'Name': 'company_name'})

# Combine all score files. Every step must deliver its score in the same 'Score'
# column: without the rename, the Step 1 sanction scores would stay in a separate
# 'Score_Step_1' column that the sum below never reads.
all_scores = pd.concat(
    [
        step_1[['company_name', 'Score_Step_1']].rename(columns={'Score_Step_1': 'Score'}),
        step_2[['company_name', 'Score']],
        step_3[['company_name', 'score']].rename(columns={'score': 'Score'}),
    ],
    ignore_index=True,
)

# Filter for companies in the Belgium list
filtered_scores = all_scores[all_scores['company_name'].isin(belgium_companies['company_name'])]

# Group by company and sum scores
total_scores = filtered_scores.groupby('company_name')['Score'].sum().reset_index()

# Apply risk level based on total score
def assign_risk_level(score):
    """Translate a total score into a risk level.

    The bands are contiguous, so fractional scores cannot fall between them:
    above 30 -> 'prohibited', 7 up to 30 -> 'high', 1 up to (but excluding)
    7 -> 'medium', below 1 -> 'low'.

    :param score: Total score of a company.
    :return: The risk level; 'no risk' only for values that compare with
        nothing, such as NaN.
    """
    if score > 30:
        return 'prohibited'
    if score >= 7:
        return 'high'
    if score >= 1:
        return 'medium'
    if score < 1:
        return 'low'
    # Reached only when every comparison above is False (e.g. NaN)
    return 'no risk'

# Label every company with the risk level that belongs to its summed score
risk_levels = total_scores['Score'].map(assign_risk_level)
total_scores['risk_level'] = risk_levels

# Write the final table to disk and confirm on the console
total_scores.to_csv('Step_4_company_risk_scores.csv', index=False)
print("Risk scoring completed and saved as 'Step_4_company_risk_scores.csv'")

Risk scoring completed and saved as 'Step_4_company_risk_scores.csv'
