In [1]:
import pandas as pd

# Company tax IDs to crawl, held as strings.
tax_ids = ["22817612"]

# Single-column frame; the crawl cell reads the IDs back from its "tax_id" column.
df = pd.DataFrame(data=tax_ids, columns=["tax_id"])

In [2]:
from fin_groups.db import OwnershipDB
from fin_groups.crawler import CompanyCrawler
import os
import time
import random
from typing import List

# 1. Initialize: create the OwnershipDB handle for "ownership.db" and a
#    CompanyCrawler that is given that handle.
db = OwnershipDB("ownership.db")
# NOTE(review): run_company_crawler builds its own CompanyCrawler from the `db`
# argument it receives, so this module-level `crawler` is not used by that function.
crawler = CompanyCrawler(db=db)

def run_company_crawler(db, tax_ids: List[str], cache_file: str = "processed_ids.txt", delay_range: tuple = (0.5, 1.5), max_retries: int = 3) -> dict:
    """
    Runs the crawler with a local cache file to prevent redundant calls after a crash.

    Args:
        db: Database handle passed straight to ``CompanyCrawler(db=...)``.
        tax_ids: Tax IDs to crawl. Each value is converted to ``str``, stripped of
            surrounding whitespace and left-padded with zeros to 8 characters.
            Duplicates (after normalization) are crawled only once.
        cache_file: Text file with one processed ID per line. IDs found here are
            skipped; every ID whose crawl completes is appended immediately.
        delay_range: ``(low, high)`` bounds, in seconds, of the random pause made
            before every crawl attempt.
        max_retries: Maximum number of attempts per ID (values below 1 are treated
            as 1). After a failed attempt the function waits ``5 * attempt`` seconds
            before trying again.

    Returns:
        Dict with the counters ``"success"`` (owners returned), ``"empty"`` (crawl
        completed with no owners) and ``"failed"`` (all attempts raised).
    """
    crawler = CompanyCrawler(db=db)

    # 1. Load already processed IDs from cache.
    #    Blank lines are ignored so they are neither counted nor treated as an ID.
    processed_ids = set()
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            processed_ids = {line.strip() for line in f if line.strip()}

    # 2. Normalize the input once, dropping IDs that are already cached and
    #    duplicates within this call (order of first occurrence is kept).
    pending_ids = []
    seen = set(processed_ids)
    for tid in tax_ids:
        normalized = str(tid).strip().zfill(8)
        if normalized not in seen:
            seen.add(normalized)
            pending_ids.append(normalized)

    total_original = len(tax_ids)
    total_pending = len(pending_ids)

    print(f"?? Cache: {len(processed_ids)} already processed.")
    print(f"?? Starting crawl for {total_pending} remaining companies (Total: {total_original})...\n")

    stats = {"success": 0, "failed": 0, "empty": 0}
    attempts_allowed = max(1, int(max_retries))

    for index, tid_str in enumerate(pending_ids, 1):
        for attempt in range(1, attempts_allowed + 1):
            try:
                # Random pause before every request, including retries.
                time.sleep(random.uniform(*delay_range))

                owners = crawler.crawl_company(tid_str)

                # 3. Update the cache file immediately so a crash does not lose progress.
                #    An ID is "processed" whether it returned owners or was confirmed
                #    empty, because empty companies should not be re-scraped every run.
                with open(cache_file, "a") as f:
                    f.write(f"{tid_str}\n")

                # Counters are updated only after the cache write succeeded, so a
                # failing write followed by a retry cannot count the same ID twice.
                if owners:
                    print(f"[{index}/{total_pending}] ID: {tid_str} ? Success ({len(owners)} owners)")
                    stats["success"] += 1
                else:
                    print(f"[{index}/{total_pending}] ID: {tid_str} ??  No owners found")
                    stats["empty"] += 1
                break

            except Exception as e:
                if attempt < attempts_allowed:
                    # Linear back-off: 5s after the first failure, 10s after the second, ...
                    print(f"[{index}/{total_pending}] ID: {tid_str} ?? Error: {e}. Retrying...")
                    time.sleep(5 * attempt)
                else:
                    print(f"[{index}/{total_pending}] ID: {tid_str} ? Failed: {e}")
                    stats["failed"] += 1

    print(f"\n{'='*30}\n?? CRAWL COMPLETE\n? New: {stats['success']}\n??  Empty: {stats['empty']}\n? Failed: {stats['failed']}\n{'='*30}")
    return stats

# Execution: crawl every ID stored in the "tax_id" column of df.
run_company_crawler(db, tax_ids=df["tax_id"].tolist())

?? Cache: 0 already processed.
?? Starting crawl for 1 remaining companies (Total: 1)...

[{'name': 'ФІЗИЧНІ ТА ЮРИДИЧНІ ОСОБИ', 'profile_link': None, 'country': None, 'role': 'Засновник', 'amount_uah': 3820000, 'share_percent': 100}, {'name': 'Сенчик Олександр Васильович', 'profile_link': 'https://opendatabot.ua/p/senchyk-oleksandr-vasylovych-h3AEVNL-pUlBFZK8qYH6Sg', 'country': 'Україна', 'role': 'Кінцевий бенефіціарний власник', 'amount_uah': None, 'share_percent': None}, {'name': 'Сенчик Олександр Олександрович', 'profile_link': 'https://opendatabot.ua/p/senchyk-oleksandr-oleksandrovych-ys9JRbFNUQ4PxV6PJ8tkww', 'country': 'Україна', 'role': 'Кінцевий бенефіціарний власник', 'amount_uah': None, 'share_percent': None}]
[1/1] ID: 22817612 ? Success (3 owners)

?? CRAWL COMPLETE
? New: 1
??  Empty: 0
? Failed: 0


In [4]:
from typing import Set
from fin_groups.db import OwnershipDB
from fin_groups.normalize import company_entity_id

def get_group_tax_ids(db: OwnershipDB, tax_id: str) -> Set[str]:
    """
    Given a company tax_id, return all tax_ids belonging to the same ownership group.

    Args:
        db: Ownership database; this function uses its ``get_entity``,
            ``extract_group_ids`` and ``query_rows`` methods.
        tax_id: Tax ID of any company in the group. It is converted to ``str``,
            stripped and zero-padded to 8 characters before lookup.

    Returns:
        Set of non-NULL ``tax_id`` values of the entities in the group, or an empty
        set when the company is unknown or the group is empty.
    """

    # Normalize like the crawl driver in this notebook does (str + zero-pad to 8),
    # so IDs that lost their leading zeros still resolve to the stored entity.
    tax_id = str(tax_id).strip().zfill(8)
    company_id = company_entity_id("UA", tax_id)

    # Ensure entity exists
    if not db.get_entity(company_id):
        return set()

    # Get connected component (ownership group)
    group_entity_ids = db.extract_group_ids(company_id)

    if not group_entity_ids:
        return set()

    # SQLite caps the number of bound parameters per statement (999 in older
    # builds), so large groups are queried in chunks instead of one huge IN (...).
    entity_ids = list(group_entity_ids)
    chunk_size = 500
    group_tax_ids: Set[str] = set()

    for start in range(0, len(entity_ids), chunk_size):
        chunk = entity_ids[start:start + chunk_size]
        placeholders = ",".join("?" * len(chunk))

        rows = db.query_rows(
            f"""
        SELECT DISTINCT tax_id
        FROM entities
        WHERE entity_id IN ({placeholders})
          AND tax_id IS NOT NULL
        """,
            tuple(chunk)
        )
        group_tax_ids.update(row["tax_id"] for row in rows)

    return group_tax_ids

# Fresh handle on the ownership database for this lookup.
db = OwnershipDB("ownership.db")

# Show every tax ID in the ownership group that contains company 38803128.
get_group_tax_ids(db=db, tax_id="38803128")

{'33988668', '37829040', '38803128', '39765398', '42826722'}

In [4]:
import sqlite3
import pandas as pd
import os

DB_PATH = "ownership.db"  # or full Drive path if mounted

def query_db(sql: str, params: tuple = (), db_path: str = None):
    """
    Execute SQL query on SQLite DB and return pandas DataFrame

    Args:
        sql: SQL statement to run.
        params: Values bound to the statement's placeholders.
        db_path: Path of the SQLite file to query. Defaults to the module-level
            ``DB_PATH`` when omitted, so existing calls behave as before.

    Returns:
        DataFrame with the query result.

    Raises:
        FileNotFoundError: If the database file does not exist.
    """
    path = DB_PATH if db_path is None else db_path

    # sqlite3.connect() silently creates a missing file, which would turn a wrong
    # path into an empty database; fail loudly instead.
    if not os.path.exists(path):
        raise FileNotFoundError(f"Database not found at {path}")

    conn = sqlite3.connect(path)
    try:
        return pd.read_sql_query(sql, conn, params=params)
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

# Owned entities whose non-NULL ownership shares sum to more than 100.
df_bad = query_db(sql="""
    SELECT
        owned_id,
        SUM(share_percent) AS total_share
    FROM ownerships
    WHERE share_percent IS NOT NULL
    GROUP BY owned_id
    HAVING total_share > 100
""")

# Number of affected entities (depends on the current contents of the database).
print(len(df_bad))

# The tax ID is taken as the last ":"-separated field of owned_id.
df_bad = df_bad.assign(
    tax_id=lambda frame: frame["owned_id"].str.split(":").str[-1]
)
bad_tax_ids = df_bad["tax_id"].tolist()

# Export the list so these companies can be reprocessed.
df_bad.to_excel("companies_to_reprocess.xlsx", index=False)

0
