# Data Ingestion (Bronze 1)

- Data ingestion from ACRA, MasterDB and SSIC mapping, this data will be merged and filter with selected industry to get the specific company havent been researched by MR.

In [1]:
import os
import requests
import aiohttp
import asyncio
import nest_asyncio
import pandas as pd
import time
import scrapy
from scrapy_playwright.page import PageMethod
from bs4 import BeautifulSoup
import nest_asyncio
import glob
import numpy as np
import matplotlib.pyplot as plt
from fuzzywuzzy import fuzz, process
import re


### Getting Master DB via Google API

In [2]:

#Service Acc: masterdb@axiomatic-atlas-476707-k8.iam.gserviceaccount.com

from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build

sheet_id = '1ipwIl7fciIlddvOUqGLpNlVQufw7Xd26Qa-YuJcx-xE'

SCOPES = ['https://www.googleapis.com/auth/spreadsheets']

# Prefer env var GOOGLE_APPLICATION_CREDENTIALS, else fall back to local credentials.json
SERVICE_ACCOUNT_FILE = os.getenv("GOOGLE_APPLICATION_CREDENTIALS", "credentials.json")
if not os.path.exists(SERVICE_ACCOUNT_FILE):
    raise FileNotFoundError(
        f"Service account file not found at '{SERVICE_ACCOUNT_FILE}'. "
        "Set GOOGLE_APPLICATION_CREDENTIALS to the full path, or place credentials.json next to this notebook."
    )

credentials = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
service = build('sheets', 'v4', credentials=credentials)
sheet = service.spreadsheets()


# Using a large range to ensure we get all data (sheets typically don't exceed 1000 columns)
range_a1 = "'MASTER DATABASE 2025 Template'!A:ZZ"
result = sheet.values().get(spreadsheetId=sheet_id, range=range_a1).execute()
values = result.get('values', [])

if values:
    header = values[0]
    data_rows = values[1:]
    
    # Find the maximum number of columns across all rows
    max_len = max([len(header)] + [len(r) for r in data_rows]) if data_rows else len(header)
    
    # Extend header if data rows have more columns
    if len(header) < max_len:
        header = header + [f'col_{i+1}' for i in range(len(header), max_len)]
    
    # Normalize all rows to have the same length
    normalized_rows = [row + [''] * (max_len - len(row)) for row in data_rows]
    
    Master_DB_df = pd.DataFrame(normalized_rows, columns=header)
    print(f"Successfully loaded {len(Master_DB_df)} rows and {len(Master_DB_df.columns)} columns")
else:
    Master_DB_df = pd.DataFrame()
    print("No data found in the sheet")

# --- CLEANING AND STANDARDIZING THE MASTER DB ---

# --- HELPER FUNCTIONS ---
def clean_uen(u: str) -> str | None:
    """Clean UEN: remove non-alphanumeric, convert to uppercase."""
    if pd.isna(u) or u == '':
        return None
    cleaned = re.sub(r"[^A-Z0-9]", "", str(u).upper().strip())
    return None if cleaned == '' else cleaned

def clean_text(text: str) -> str | None:
    """Clean text: strip, uppercase, handle NaN values."""
    if pd.isna(text) or text == '':
        return None
    text = str(text).strip().upper()
    return None if text in ('', 'NAN', 'NONE') else text

def clean_ssic_code(value) -> int | None:
    """Convert SSIC code to integer, handling empty strings and invalid values."""
    if pd.isna(value) or value == '':
        return None
    try:
        # Remove any non-numeric characters and convert
        cleaned = re.sub(r"[^0-9]", "", str(value).strip())
        return int(cleaned) if cleaned else None
    except (ValueError, TypeError):
        return None

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Convert column names to uppercase, replace non-alphanumeric with single underscore."""
    new_cols = [
        re.sub(r"_+", "_", re.sub(r"[^A-Z0-9]", "_", col.upper().strip())).strip("_")
        for col in df.columns
    ]
    df.columns = new_cols
    return df

# --- PROCESS DATA ---
# Select relevant columns first (more efficient than copying entire dataframe)
columns_to_keep = [
    "Company Registration Number (UEN)",
    "ACRA REGISTERED NAME",
    "Brand/Deal Name/Business Name",
    "Primary SSIC Code",
    "PIC NAME 1 Contact Number",
    "PIC 1 email address",
    "Website URL",
    "Parent Industry Type",
    "Sub Industry"
]

# Filter columns that exist in the dataframe
existing_cols = [c for c in columns_to_keep if c in Master_DB_df.columns]
if not existing_cols:
    raise ValueError("None of the required columns found in the dataframe")

master_db_df = Master_DB_df[existing_cols].copy()

# Standardize column names
master_db_df = standardize_columns(master_db_df)

# Find and process UEN column
uen_cols = [c for c in master_db_df.columns if "UEN" in c]
if not uen_cols:
    raise ValueError("UEN column not found after standardization")
uen_col = uen_cols[0]

# Clean UEN using vectorized operations (faster than apply)
master_db_df["UEN"] = master_db_df[uen_col].astype(str).str.upper().str.replace(r"[^A-Z0-9]", "", regex=True)
master_db_df["UEN"] = master_db_df["UEN"].replace(['', 'NAN', 'NONE'], None)
master_db_df = master_db_df.drop(columns=[uen_col])

# Rename columns
rename_map = {
    "BRAND_DEAL_NAME_BUSINESS_NAME": "BRAND_NAME",
    "PRIMARY_SSIC_CODE": "SSIC_CODE",
}
master_db_df = master_db_df.rename(columns={k: v for k, v in rename_map.items() if k in master_db_df.columns})

# Clean text columns using vectorized operations
for col in ["ACRA_REGISTERED_NAME", "BRAND_NAME"]:
    if col in master_db_df.columns:
        master_db_df[col] = (
            master_db_df[col].astype(str).str.strip().str.upper()
            .replace(['', 'NAN', 'NONE'], None)
        )

# Convert SSIC_CODE to integer (handles empty strings and invalid values)
if "SSIC_CODE" in master_db_df.columns:
    master_db_df["SSIC_CODE"] = master_db_df["SSIC_CODE"].apply(clean_ssic_code)

# Keep only required columns that exist
required_cols = ["UEN", "ACRA_REGISTERED_NAME", "BRAND_NAME", "SSIC_CODE"]
available_cols = [c for c in required_cols if c in master_db_df.columns]
master_db_df = master_db_df[available_cols].copy()

# Filter out rows with missing or empty UEN
master_db_df = master_db_df[
    master_db_df["UEN"].notna() & 
    (master_db_df["UEN"].astype(str).str.strip() != "")
].copy()

print(f"Final dataset: {len(master_db_df)} rows, {len(master_db_df.columns)} columns")
master_db_df



RefreshError: ('invalid_grant: Invalid JWT Signature.', {'error': 'invalid_grant', 'error_description': 'Invalid JWT Signature.'})

### Getting ACRA Data (Filter by Live, Live Company only & non relevant ssic code)
- last downloaded oct 25

In [None]:

folder_path = "Acra_Data"

# Get all CSV file paths inside the folder
csv_files = glob.glob(os.path.join(folder_path, "*.csv"))

# Read and combine all CSVs
# Using low_memory=False to avoid DtypeWarning for mixed types
df = pd.concat((pd.read_csv(f, low_memory=False) for f in csv_files), ignore_index=True)


df.columns = df.columns.str.upper()


acra_data = df[[
    "UEN",
    "ENTITY_NAME",
    "BUSINESS_CONSTITUTION_DESCRIPTION",
    "ENTITY_TYPE_DESCRIPTION",
    "ENTITY_STATUS_DESCRIPTION",
    "REGISTRATION_INCORPORATION_DATE",
    "PRIMARY_SSIC_CODE",
    "SECONDARY_SSIC_CODE",
    "STREET_NAME",
    "POSTAL_CODE"
]].copy()

# Convert to proper data types
acra_data['UEN'] = acra_data['UEN'].astype('string')
acra_data['ENTITY_NAME'] = acra_data['ENTITY_NAME'].astype('string')
acra_data['BUSINESS_CONSTITUTION_DESCRIPTION'] = acra_data['BUSINESS_CONSTITUTION_DESCRIPTION'].astype('string')
acra_data['ENTITY_TYPE_DESCRIPTION'] = acra_data['ENTITY_TYPE_DESCRIPTION'].astype('string')
acra_data['ENTITY_STATUS_DESCRIPTION'] = acra_data['ENTITY_STATUS_DESCRIPTION'].astype('string')
acra_data['REGISTRATION_INCORPORATION_DATE'] = pd.to_datetime(acra_data['REGISTRATION_INCORPORATION_DATE'], errors='coerce')

# Clean string columns â€” trim, remove extra spaces, uppercase
for col in [
    'UEN',
    'ENTITY_NAME',
    'BUSINESS_CONSTITUTION_DESCRIPTION',
    'ENTITY_TYPE_DESCRIPTION',
    'ENTITY_STATUS_DESCRIPTION',
    'STREET_NAME',
    'POSTAL_CODE'
]:
    acra_data[col] = (
        acra_data[col]
        .fillna('')
        .str.strip()
        .str.replace(r'\s+', ' ', regex=True)
        .str.upper()
    )

# Replace placeholders with NaN for standardization
acra_data.replace(['NA', 'N/A', '-', ''], np.nan, inplace=True)

# Convert registration date to dd-mm-yyyy string (optional)
acra_data['REGISTRATION_INCORPORATION_DATE'] = acra_data['REGISTRATION_INCORPORATION_DATE'].dt.strftime('%d-%m-%Y')

# Filter only live entities (LIVE COMPANY or LIVE)
acra_data = acra_data[
    acra_data['ENTITY_STATUS_DESCRIPTION'].isin(['LIVE COMPANY', 'LIVE'])
].reset_index(drop=True)

# Exclude specific PRIMARY_SSIC_CODE values (supposedly the data would be 600k plus but when we exclude this would lessen)
exclude_codes = [
    46900, 47719, 47749, 47539, 47536, 56123,
    10711, 10712, 10719, 10732, 10733, 93209
]

acra_data = acra_data[~acra_data['PRIMARY_SSIC_CODE'].isin(exclude_codes)].reset_index(drop=True)

In [None]:
acra_data

### Getting SSIC Industry code

In [None]:
# --- CONFIG ---
file_path = "./SSIC_Code/mapped_ssic_code.xlsx"

# --- LOAD DATA ---
mapped_ssic_code = pd.read_excel(file_path)

# --- STANDARDIZE COLUMN NAMES ---
# Uppercase, strip spaces, replace spaces with underscores
mapped_ssic_code.columns = (
    mapped_ssic_code.columns
    .str.strip()
    .str.upper()
    .str.replace(" ", "_")
)

# --- KEEP ONLY DESIRED COLUMNS ---
columns_to_keep = ["PARENT_INDUSTRY", "INDUSTRY_TYPE", "SUB_INDUSTRY", "SSIC_CODES", "DESCRIPTION"]
mapped_ssic_code = mapped_ssic_code[columns_to_keep].copy()

# --- CLEAN SSIC_CODES COLUMN ---
mapped_ssic_code["SSIC_CODES"] = (
    pd.to_numeric(mapped_ssic_code["SSIC_CODES"], errors="coerce")  # safely convert to numeric
    .fillna(0)
    .astype(int)
)

# --- CLEAN TEXT COLUMNS ---
text_cols = ["PARENT_INDUSTRY", "INDUSTRY_TYPE", "SUB_INDUSTRY", "DESCRIPTION"]
mapped_ssic_code[text_cols] = mapped_ssic_code[text_cols].apply(
    lambda col: col.astype(str).str.strip().str.title()
)

# --- REMOVE DUPLICATES & RESET INDEX ---
mapped_ssic_code = mapped_ssic_code.drop_duplicates().reset_index(drop=True)

mapped_ssic_code.head()


### Merge ACRA data with SSIC code

In [None]:
# Convert PRIMARY_SSIC_CODE to int
acra_data["PRIMARY_SSIC_CODE"] = (
    pd.to_numeric(acra_data["PRIMARY_SSIC_CODE"], errors="coerce")
    .fillna(0)
    .astype(int)
)

# Merge based on SSIC code
acra_data_filtered = acra_data.merge(
    mapped_ssic_code,
    how="left",
    left_on="PRIMARY_SSIC_CODE",
    right_on="SSIC_CODES"
)

# Optional: drop the duplicate 'SSIC CODES' column (keep only PRIMARY_SSIC_CODE)
acra_data_filtered = acra_data_filtered.drop(columns=["SSIC_CODES"], errors="ignore")


### FIlter Acra data with Master DB to get list of companies havent been researched  by MR

In [None]:

# Ensure both UEN columns are strings for accurate matching
acra_data_filtered['UEN'] = acra_data_filtered['UEN'].astype(str).str.strip().str.upper()
master_db_df['UEN'] = master_db_df['UEN'].astype(str).str.strip().str.upper()

# Filter out rows in acra_data_filtered whose UEN is already in master_db_df
acra_data_filtered = acra_data_filtered[~acra_data_filtered['UEN'].isin(master_db_df['UEN'])]

acra_data_filtered.shape

### Filter by  Industry

In [None]:
# tuition, child care and education training

ssic_codes = [
    "85332", "8536", "85360", "85403", "85404",
    "855", "8550", "85501", "85502", "85503", "85504", "85505", "85506", "85507", "85508", "85509",
    "856", "8560", "85601", "85602", "85609",
    "87022", "8891", "88911", "88912", "88991",
    "96094"
]



acra_data_filtered_by_industry = acra_data_filtered[
    (
        (acra_data_filtered["ENTITY_STATUS_DESCRIPTION"].str.lower() == "live") |
        (acra_data_filtered["ENTITY_STATUS_DESCRIPTION"].str.lower() == "live company")
    )
    &
    (acra_data_filtered["PRIMARY_SSIC_CODE"].astype(str).isin(ssic_codes))
]


acra_data_filtered_by_industry

### Filter with existing Fresh Leads

In [None]:
### Filter with existing Fresh Leads

# --- UPDATE HERE: Remove rows if UEN exists in recordowl_results.xlsx ---
Fresh_Leads = pd.read_excel("./Golden_Data/Fresh_Leads_Nov11_Latest.xlsx")

# Select only the required columns
columns_to_select = [
    "Company Registration Number (UEN)",
    "ACRA REGISTERED NAME",
    "Brand/Deal Name/Business Name"
]

# Check which columns exist and select them
existing_columns = [col for col in columns_to_select if col in Fresh_Leads.columns]

if len(existing_columns) != len(columns_to_select):
    missing = [col for col in columns_to_select if col not in Fresh_Leads.columns]
    print(f"Warning: Missing columns: {missing}")
    print(f"Available columns: {list(Fresh_Leads.columns)}")

Fresh_Leads = Fresh_Leads[existing_columns]

if "Company Registration Number (UEN)" in Fresh_Leads.columns and "UEN" in acra_data_filtered_by_industry.columns:
    filtered = acra_data_filtered_by_industry[~acra_data_filtered_by_industry["UEN"].isin(Fresh_Leads["Company Registration Number (UEN)"])]
else:
    raise ValueError("Column 'UEN' not found in one of the dataframes.")

# get sample data 
acra_data_filtered_by_industry = filtered.sample(n=10).reset_index(drop=True)

acra_data_filtered_by_industry.head(10)


In [None]:
acra_data_filtered_by_industry.to_parquet("./Staging/Bronze/bronze_data_1.parquet", index=False, engine="fastparquet")