# Modern Slavery Compliance Analysis Notebook
## Implementation of Phases 1A and 1B
Last updated: 08 Oct 2025, 09:50 AM AEDT

In [None]:
# Install required packages (if not present)
!pip install polars loguru -q

import glob
import json
import os
import re

import polars as pl
from loguru import logger

try:
    from google.colab import drive
except ImportError:
    # Not running inside Google Colab; `drive` is left as None.
    drive = None

# Configuration
# Pick the data directory: the project folder on Google Drive when the Colab
# `drive` helper is available, otherwise the current working directory.
# The check is on `drive` itself because a failed `google.colab` import
# happens at import time, before this cell body runs, so it can never be
# caught by a try/except placed around `drive.mount`.
if drive is not None:
    drive.mount('/content/drive', force_remount=True)
    DRIVE_PATH = '/content/drive/MyDrive/ModernSlaveryProject/'
    logger.info("Google Drive mounted successfully.")
else:
    DRIVE_PATH = './'
    logger.warning("Not in Google Colab. Using local directory.")

# File paths
# Raw source extracts, all located directly under DRIVE_PATH.
abr_bulk_path, asic_names_path, asic_company_path, acnc_charity_path = (
    os.path.join(DRIVE_PATH, filename)
    for filename in (
        'abn_bulk_data.jsonl',
        'BUSINESS_NAMES_202510.csv',
        'COMPANY_202509.csv',
        'acnc-registered-charities.csv',
    )
)

# Every workbook matching the pattern, in sorted filename order.
ato_tax_paths = sorted(
    glob.glob(
        os.path.join(
            DRIVE_PATH,
            'CorporateTaxTransparency/*-corporate-report-of-entity-tax-information.xlsx',
        )
    )
)

# Intermediate and final outputs (all Parquet).
abr_intermediate_path, asic_intermediate_path = (
    os.path.join(DRIVE_PATH, filename)
    for filename in (
        'intermediate_abr_pairs.parquet',
        'intermediate_asic_pairs.parquet',
    )
)
identity_universe_path = os.path.join(DRIVE_PATH, 'abn_name_lookup.parquet')  # Changed to Parquet
obligation_universe_path = os.path.join(DRIVE_PATH, 'obligated_entities.parquet')  # Changed to Parquet


In [None]:
# Phase 1A: Build the Universe of Identity

def validate_abn(abn_str):
    """Return True if *abn_str* passes the Australian Business Number checksum.

    The check follows the published ABN algorithm: subtract 1 from the first
    digit, multiply each of the 11 digits by its positional weight, sum the
    products, and require the total to be divisible by 89.

    Args:
        abn_str: Candidate ABN. Must be a ``str`` of exactly 11 ASCII digits
            with no spaces or separators; anything else is rejected.

    Returns:
        ``True`` when the checksum holds, ``False`` otherwise (including for
        non-string input, wrong length, or non-digit characters).
    """
    # `isascii()` is required in addition to `isdigit()`: characters such as
    # superscript digits satisfy `isdigit()` but make `int()` raise.
    if not (
        isinstance(abn_str, str)
        and len(abn_str) == 11
        and abn_str.isascii()
        and abn_str.isdigit()
    ):
        return False
    weights = (10, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19)
    digits = [int(ch) for ch in abn_str]
    # The algorithm subtracts 1 from the leading digit before weighting;
    # without this step genuine ABNs fail and "00000000000" passes.
    digits[0] -= 1
    total = sum(w * d for w, d in zip(weights, digits))
    return total % 89 == 0

def build_abr_intermediate(source_path, output_path, force_rerun=False):
    """Extract (ABN, Name, CleanName) rows from the ABR JSON-lines file.

    Each input line is parsed as one JSON object. For records whose ``ABN``
    passes ``validate_abn``, one row is emitted for the main entity's
    ``NonIndividualNameText`` (when present) and one per entry in
    ``BusinessName`` that has a ``BusinessNameText``. ``CleanName`` is the
    name with every character outside ``A-Z a-z 0-9`` and whitespace removed,
    then stripped and upper-cased.

    Args:
        source_path: Path to the JSON-lines input file (read as UTF-8).
        output_path: Path of the Parquet file to write.
        force_rerun: When False (default) and ``output_path`` already exists,
            the function logs and returns without doing any work.

    Returns:
        None. The result is written to ``output_path``.
    """
    if os.path.exists(output_path) and not force_rerun:
        logger.info(f"Intermediate file '{os.path.basename(output_path)}' exists. Skipping.")
        return
    logger.info(f"Ingesting from '{os.path.basename(source_path)}'...")

    # Compiled once; the same cleaning rule is applied to every name.
    non_alnum = re.compile(r'[^A-Za-z0-9\s]')
    records = []

    def add_pair(abn, name):
        # Missing, empty or non-string names are skipped rather than allowed
        # to raise inside the regex call and abort the whole ingest.
        if isinstance(name, str) and name:
            records.append({
                'ABN': abn,
                'Name': name,
                'CleanName': non_alnum.sub('', name).strip().upper(),
            })

    with open(source_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            if line_no % 2_000_000 == 0:
                logger.info(f"Processed {line_no:,} lines")
            try:
                record = json.loads(line)
                abn = record.get('ABN')
                if not abn or not validate_abn(abn):
                    continue

                main_entity = record.get('MainEntity') or {}
                non_individual = main_entity.get('NonIndividualName') or {}
                add_pair(abn, non_individual.get('NonIndividualNameText'))

                business_names = record.get('BusinessName') or []
                # Defensive: tolerate a single object where a list is expected.
                if isinstance(business_names, dict):
                    business_names = [business_names]
                for bn in business_names:
                    add_pair(abn, bn.get('BusinessNameText'))
            except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
                # Structurally unexpected records are logged and skipped, the
                # same way undecodable lines are.
                logger.warning(f"Line {line_no}: JSON error - {e}")

    # An explicit schema keeps the three columns (and their order) even when
    # no record qualified, so the downstream concat still lines up.
    df = pl.DataFrame(
        records,
        schema={'ABN': pl.Utf8, 'Name': pl.Utf8, 'CleanName': pl.Utf8},
    )
    df.write_parquet(output_path)
    logger.info(f"Extracted {len(df):,} pairs to '{os.path.basename(output_path)}'")

def build_asic_intermediate(source_path, output_path, force_rerun=False):
    """Extract (ABN, Name, CleanName) rows from the ASIC business-names file.

    Reads the ``BN_NAME`` and ``BN_ABN`` columns of a tab-separated file,
    drops rows where either is null, keeps only rows whose ABN passes
    ``validate_abn``, and derives ``CleanName`` (characters outside
    ``A-Z a-z 0-9`` and whitespace removed, stripped, upper-cased).

    Args:
        source_path: Path to the tab-separated input file.
        output_path: Path of the Parquet file to write.
        force_rerun: When False (default) and ``output_path`` already exists,
            the function logs and returns without doing any work.

    Returns:
        None. The result is written to ``output_path`` with columns in the
        order ``ABN``, ``Name``, ``CleanName``.
    """
    if os.path.exists(output_path) and not force_rerun:
        logger.info(f"Intermediate file '{os.path.basename(output_path)}' exists. Skipping.")
        return
    logger.info(f"Ingesting from '{os.path.basename(source_path)}'...")
    df = pl.read_csv(source_path, separator='\t', columns=['BN_NAME', 'BN_ABN'], dtypes={'BN_NAME': pl.Utf8, 'BN_ABN': pl.Utf8})
    df = (
        df.rename({'BN_NAME': 'Name', 'BN_ABN': 'ABN'})
        .filter(pl.col('ABN').is_not_null() & pl.col('Name').is_not_null())
        # `map_elements` / `strip_chars` replace `apply` / `strip`, which are
        # deprecated and no longer available in current polars releases.
        .filter(pl.col('ABN').map_elements(validate_abn, return_dtype=pl.Boolean))
        .with_columns(
            pl.col('Name')
            .str.replace_all(r'[^A-Za-z0-9\s]', '')
            .str.strip_chars()
            .str.to_upper()
            .alias('CleanName')
        )
        # Fixed column order so this file can be stacked vertically with the
        # ABR intermediate, which is written as ABN, Name, CleanName.
        .select(['ABN', 'Name', 'CleanName'])
    )
    df.write_parquet(output_path)
    logger.info(f"Extracted {len(df):,} pairs to '{os.path.basename(output_path)}'")

def combine_identity_universe(abr_path, asic_path, output_path):
    """Merge the ABR and ASIC intermediates into one de-duplicated lookup.

    Args:
        abr_path: Parquet file with ``ABN``, ``Name`` and ``CleanName`` columns.
        asic_path: Parquet file with the same three columns.
        output_path: Path of the combined Parquet file to write.

    Returns:
        None. Rows containing any null are dropped, exact duplicates across
        the three columns are removed, and the result is written to
        ``output_path``.
    """
    logger.info("Combining Identity Universe...")
    columns = ['ABN', 'Name', 'CleanName']
    # A vertical concat matches columns by position, so both inputs are put
    # into the same explicit order first; the two intermediates are produced
    # by different code paths and need not share a column order on disk.
    df_abr = pl.read_parquet(abr_path).select(columns)
    df_asic = pl.read_parquet(asic_path).select(columns)
    df = pl.concat([df_abr, df_asic], how='vertical')
    # Dropping nulls first leaves fewer rows for the de-duplication pass; the
    # resulting set of rows is the same either way.
    df = df.drop_nulls().unique(subset=columns)
    df.write_parquet(output_path)
    logger.info(f"Final Identity Universe saved with {len(df):,} unique pairs to '{os.path.basename(output_path)}'")

# Execute Phase 1A
# Build both intermediates first, then merge them into the identity lookup.
build_abr_intermediate(
    source_path=abr_bulk_path,
    output_path=abr_intermediate_path,
)
build_asic_intermediate(
    source_path=asic_names_path,
    output_path=asic_intermediate_path,
)
combine_identity_universe(
    abr_path=abr_intermediate_path,
    asic_path=asic_intermediate_path,
    output_path=identity_universe_path,
)


In [None]:
# Phase 1B: Build the Universe of Obligation

def get_threshold(row):
    """Return the income threshold that applies to one entity-year row.

    Args:
        row: Mapping with a ``'Year'`` string whose part before the first
            ``'-'`` parses as an integer (e.g. ``'2021-22'``) and, for years
            starting before 2022, an ``'ASIC_Type'`` entry.

    Returns:
        ``200_000_000`` for rows whose year starts before 2022 and whose
        ``ASIC_Type`` is ``'APTY'``; ``100_000_000`` in every other case.
    """
    first_year = int(row['Year'].split('-')[0])
    # NOTE(review): 'APTY' is presumably the ASIC code for proprietary
    # companies, which had the higher threshold before 2022 - confirm.
    higher_threshold_applies = first_year < 2022 and row['ASIC_Type'] == 'APTY'
    return 200_000_000 if higher_threshold_applies else 100_000_000

def build_obligation_universe(ato_paths, asic_path, acnc_path, identity_path, output_path):
    """Build the per-ABN table of obligated entities and write it to Parquet.

    Steps:
      1. Read ``ABN``, ``Year`` and ``Total income $`` from the
         ``'Income tax details'`` sheet of every workbook in ``ato_paths``.
      2. Left-join the ASIC company file on ``ABN`` to obtain ``ASIC_Type``.
      3. Keep entity-years whose total income is at least ``get_threshold``
         for that row.
      4. Add every ABN whose ``CharitySize`` is ``'Large'`` in the ACNC file.
      5. Produce one row per ABN with an ``Obligated_<Year>`` column (1/0) for
         each year seen in step 3, plus one ``Name`` from the identity lookup.

    Args:
        ato_paths: Non-empty sequence of ATO workbook paths.
        asic_path: Tab-separated ASIC company file with ``ABN`` and ``Type``.
        acnc_path: ACNC CSV with ``ABN`` and ``CharitySize``.
        identity_path: Parquet lookup with at least ``ABN`` and ``Name``.
        output_path: Path of the Parquet file to write.

    Returns:
        None. The result is written to ``output_path``.

    Raises:
        ValueError: If ``ato_paths`` is empty.
    """
    logger.info("Building Universe of Obligation...")

    if not ato_paths:
        raise ValueError("No ATO Corporate Tax Transparency workbooks were supplied.")

    # Process ATO Corporate Tax Transparency Reports
    # `schema_overrides` is the read_excel keyword for column dtypes; the
    # column subset is taken with an explicit select after reading.
    all_corporate_df = pl.concat([
        pl.read_excel(
            file,
            sheet_name='Income tax details',
            schema_overrides={'ABN': pl.Utf8, 'Year': pl.Utf8, 'Total income $': pl.Float64},
        )
        .select(['ABN', 'Year', 'Total income $'])
        .rename({'Total income $': 'TotalIncome'})
        .filter(
            pl.col('ABN').is_not_null()
            & pl.col('Year').is_not_null()
            & pl.col('TotalIncome').is_not_null()
            & (pl.col('TotalIncome') > 0)
        )
        for file in ato_paths
    ])
    all_corporate_df = all_corporate_df.with_columns([
        # Undo a float-style rendering ("123.0") and restore leading zeros.
        pl.col('ABN').str.replace(r'\.0$', '').str.zfill(11),
        pl.col('TotalIncome').cast(pl.Int64)
    ])

    # Join with ASIC for company type (uses the `asic_path` argument rather
    # than a module-level path).
    asic_df = pl.read_csv(asic_path, separator='\t', columns=['ABN', 'Type'], dtypes={'ABN': pl.Utf8, 'Type': pl.Utf8})
    asic_df = asic_df.rename({'Type': 'ASIC_Type'}).filter(pl.col('ABN').is_not_null())
    corporate_df = all_corporate_df.join(asic_df, on='ABN', how='left')

    # Apply thresholds. `get_threshold` reads 'Year' and 'ASIC_Type' from a
    # row, so it is fed a struct of those two columns. The comparison is a
    # separate step because an expression cannot reference a column created
    # in the same `with_columns` call.
    corporate_df = corporate_df.with_columns(
        pl.struct(['Year', 'ASIC_Type'])
        .map_elements(get_threshold, return_dtype=pl.Int64)
        .alias('Threshold')
    )
    corporate_df = corporate_df.filter(pl.col('TotalIncome') >= pl.col('Threshold'))

    # Process ACNC Charities (uses the `acnc_path` argument).
    acnc_df = pl.read_csv(acnc_path, columns=['ABN', 'CharitySize'], dtypes={'ABN': pl.Utf8, 'CharitySize': pl.Utf8})
    charity_df = acnc_df.filter(
        (pl.col('CharitySize') == 'Large') & pl.col('ABN').is_not_null()
    ).select(['ABN'])

    # Combine obligated entities. Charities carry no year, so they get a null
    # 'Year' to give both frames the same columns for the vertical concat.
    # Rows are de-duplicated on (ABN, Year) so per-year information survives.
    years = sorted(corporate_df['Year'].unique().to_list())
    obligated_df = pl.concat(
        [
            corporate_df.select(['ABN', 'Year']),
            charity_df.with_columns(pl.lit(None, dtype=pl.Utf8).alias('Year')),
        ],
        how='vertical',
    ).unique()

    # Pivot for year-specific flags: 1 only when the ABN met the threshold in
    # that year.
    # NOTE(review): charity-only ABNs end up with 0 in every year column;
    # confirm whether they need their own flag.
    flag_columns = [f'Obligated_{year}' for year in years]
    pivot_df = obligated_df.group_by('ABN').agg([
        (pl.col('Year') == year).fill_null(False).any().cast(pl.Int32).alias(column)
        for year, column in zip(years, flag_columns)
    ])

    # Attach one name per ABN. The lookup may hold several names per ABN;
    # joining it directly would duplicate rows, so it is reduced first.
    # NOTE(review): the alphabetically smallest name is an arbitrary but
    # deterministic choice - confirm which name should be reported.
    names_df = (
        pl.read_parquet(identity_path)
        .select(['ABN', 'Name'])
        .join(pivot_df.select('ABN'), on='ABN', how='semi')
        .group_by('ABN')
        .agg(pl.col('Name').min())
    )
    pivot_df = (
        pivot_df.join(names_df, on='ABN', how='left')
        .select(['ABN', 'Name', *flag_columns])
        .sort('ABN')
    )

    # Save
    pivot_df.write_parquet(output_path)
    logger.info(f"Universe of Obligation saved with {len(pivot_df):,} entities to '{os.path.basename(output_path)}'")

# Execute Phase 1B
# Relies on the identity lookup written at `identity_universe_path`.
build_obligation_universe(
    ato_paths=ato_tax_paths,
    asic_path=asic_company_path,
    acnc_path=acnc_charity_path,
    identity_path=identity_universe_path,
    output_path=obligation_universe_path,
)
