In [None]:
import os
import re
import glob
import time
import asyncio
import requests
import aiohttp
import nest_asyncio

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import scrapy
from scrapy_playwright.page import PageMethod
from bs4 import BeautifulSoup

from fuzzywuzzy import fuzz, process

from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build


### Getting the ACRA list for scraping to get the names

In [None]:
# Load the Bronze parquet into acra_data_filtered_by_industry; stop immediately
# when the file is missing so later cells never run against an undefined frame.
parquet_path = "./Staging/Bronze/bronze_data_1.parquet"
if not os.path.exists(parquet_path):
    raise FileNotFoundError(f"Parquet file not found at {parquet_path}")

acra_data_filtered_by_industry = pd.read_parquet(parquet_path, engine="fastparquet")
print(f"Loaded {len(acra_data_filtered_by_industry)} rows from {parquet_path}")
print(acra_data_filtered_by_industry.shape)

# Column listing, used to confirm the frame carries the expected fields.
print(acra_data_filtered_by_industry.columns.tolist())


### Merge Silver_data_2_Phone with acra_data_filtered_by_industry to get the ACRA-registered name

### with phone

In [None]:
# Load the Silver parquet of records that already have a phone number; raise
# early when the file is absent.
parquet_path = "./Staging/Silver/Silver_data_2_Phone.parquet"
if not os.path.exists(parquet_path):
    raise FileNotFoundError(f"Parquet file not found at {parquet_path}")

Silver_data_2_Phone = pd.read_parquet(parquet_path, engine="fastparquet")
print(f"Loaded {len(Silver_data_2_Phone)} rows from {parquet_path}")
print(Silver_data_2_Phone.shape)

In [None]:
# Work on an independent (deep) copy so the loaded Silver frame stays untouched.
With_Phone_df = Silver_data_2_Phone.copy(deep=True)

In [None]:
def create_search_queries(df):
    """Build one search query per row that has a usable ENTITY_NAME.

    NOTE: this notebook defines `create_search_queries` again in later cells;
    the definition executed last is the one in effect.

    Args:
        df: DataFrame whose rows may carry 'ENTITY_NAME' and
            'operational_address'. A missing column is treated as empty text.

    Returns:
        A list of dicts with keys 'idx' (the row's index label),
        'entity_name' (stripped text) and 'search_query'. Rows whose
        ENTITY_NAME is missing or blank are skipped.
    """
    def _as_text(value):
        # None, pd.NA and float NaN all mean "missing". Converting them with
        # str() would leak the words "None" / "<NA>" / "nan" into the query.
        if value is None or value is pd.NA:
            return ''
        if isinstance(value, float) and value != value:
            return ''
        text = str(value).strip()
        # Keep treating the literal text 'nan' as missing (values that were
        # stringified upstream).
        return '' if text == 'nan' else text

    queries = []
    for idx, row in df.iterrows():
        entity_name = _as_text(row.get('ENTITY_NAME', ''))
        if not entity_name:
            continue

        address = _as_text(row.get('operational_address', ''))
        if address:
            search_query = f"{entity_name} {address} Singapore"
        else:
            search_query = f"{entity_name} Singapore"

        queries.append({'idx': idx, 'entity_name': entity_name, 'search_query': search_query})
    return queries


# Attach ENTITY_NAME from acra_data_filtered_by_industry to With_Phone_df via UEN.
# - The lookup is reduced to one row per UEN so the left join cannot multiply
#   rows of With_Phone_df.
# - An ENTITY_NAME column already present (e.g. from a previous run of this
#   cell) is dropped first; otherwise the merge would emit ENTITY_NAME_x /
#   ENTITY_NAME_y and the print below would raise KeyError.
acra_entity_names = (
    acra_data_filtered_by_industry[['UEN', 'ENTITY_NAME']]
    .drop_duplicates(subset='UEN', keep='first')
)
With_Phone_df = (
    With_Phone_df
    .drop(columns=['ENTITY_NAME'], errors='ignore')
    .merge(acra_entity_names, on='UEN', how='left', validate='many_to_one')
)

print(f"Rows: {len(With_Phone_df)}")
print(f"ENTITY_NAME filled: {With_Phone_df['ENTITY_NAME'].notna().sum()}")


In [None]:
# Preview the first 20 rows of the key columns after the merge.
With_Phone_df.loc[:, ["UEN", "ENTITY_NAME", "Phones"]].head(20)

### Merge Silver_data_2_No_Phone with acra_data_filtered_by_industry to get the ACRA-registered name

### without phone

In [None]:
# Load the Silver parquet of records that have no phone number yet; raise
# early when the file is absent.
parquet_path = "./Staging/Silver/Silver_data_2_No_Phone.parquet"
if not os.path.exists(parquet_path):
    raise FileNotFoundError(f"Parquet file not found at {parquet_path}")

Silver_data_2_No_Phone = pd.read_parquet(parquet_path, engine="fastparquet")
print(f"Loaded {len(Silver_data_2_No_Phone)} rows from {parquet_path}")
print(Silver_data_2_No_Phone.shape)

In [None]:
# Work on an independent (deep) copy so the loaded Silver frame stays untouched.
No_Phone_df = Silver_data_2_No_Phone.copy(deep=True)

### Formatting the duplicate phones and source 

In [None]:
# Reset the columns that the Google Maps lookup fills in later.
No_Phone_df['Phones'] = None
No_Phone_df['PIC Source 1'] = None

# A bare `.is_unique` expression in the middle of a cell is evaluated and then
# thrown away, so print the result to make the check visible.
print(f"No_Phone_df UEN unique: {No_Phone_df['UEN'].is_unique}")
print(No_Phone_df.columns.tolist())

In [None]:
def create_search_queries(df):
    """Build one search query per row that has a usable ENTITY_NAME.

    NOTE: this notebook also defines `create_search_queries` in an earlier and
    in a later cell; the definition executed last is the one in effect.

    Args:
        df: DataFrame whose rows may carry 'ENTITY_NAME' and
            'operational_address'. A missing column is treated as empty text.

    Returns:
        A list of dicts with keys 'idx' (the row's index label),
        'entity_name' (stripped text) and 'search_query'. Rows whose
        ENTITY_NAME is missing or blank are skipped.
    """
    def _as_text(value):
        # None, pd.NA and float NaN all mean "missing". Converting them with
        # str() would leak the words "None" / "<NA>" / "nan" into the query.
        if value is None or value is pd.NA:
            return ''
        if isinstance(value, float) and value != value:
            return ''
        text = str(value).strip()
        # Keep treating the literal text 'nan' as missing (values that were
        # stringified upstream).
        return '' if text == 'nan' else text

    queries = []
    for idx, row in df.iterrows():
        entity_name = _as_text(row.get('ENTITY_NAME', ''))
        if not entity_name:
            continue

        address = _as_text(row.get('operational_address', ''))
        if address:
            search_query = f"{entity_name} {address} Singapore"
        else:
            search_query = f"{entity_name} Singapore"

        queries.append({'idx': idx, 'entity_name': entity_name, 'search_query': search_query})
    return queries


# Attach ENTITY_NAME from acra_data_filtered_by_industry to No_Phone_df via UEN.
# - The lookup is reduced to one row per UEN so the left join cannot multiply
#   rows of No_Phone_df.
# - An ENTITY_NAME column already present (e.g. from a previous run of this
#   cell) is dropped first; otherwise the merge would emit ENTITY_NAME_x /
#   ENTITY_NAME_y and the print below would raise KeyError.
acra_entity_names = (
    acra_data_filtered_by_industry[['UEN', 'ENTITY_NAME']]
    .drop_duplicates(subset='UEN', keep='first')
)
No_Phone_df = (
    No_Phone_df
    .drop(columns=['ENTITY_NAME'], errors='ignore')
    .merge(acra_entity_names, on='UEN', how='left', validate='many_to_one')
)

print(f"Rows: {len(No_Phone_df)}")
print(f"ENTITY_NAME filled: {No_Phone_df['ENTITY_NAME'].notna().sum()}")


In [None]:
# Preview the first 20 rows of the key columns after the merge.
No_Phone_df.loc[:, ["UEN", "ENTITY_NAME", "Phones"]].head(20)

In [None]:
# Show the merged No_Phone_df as this cell's output.
No_Phone_df

In [None]:
# =============================================================================
# GOOGLE MAPS PHONE NUMBER SEARCH - COST OPTIMIZED
# =============================================================================

import os
from apify_client import ApifyClient
import pandas as pd
import time
import re
from fuzzywuzzy import fuzz
from dotenv import load_dotenv

# Read APIFY_API_KEY from the environment (load_dotenv() also picks up a local
# .env file) and fail fast when it is missing, rather than handing a None token
# to ApifyClient and only discovering the problem once batches start running.
load_dotenv()
apify_api_key = os.getenv("APIFY_API_KEY")
if not apify_api_key:
    raise RuntimeError("APIFY_API_KEY is not set; define it in the environment or in a .env file")
client = ApifyClient(apify_api_key)

# Number of search queries submitted per scraper run.
BATCH_SIZE = 800
# Minimum fuzzy score a Google result needs to count as a match.
# NOTE(review): fuzzywuzzy scorers report 0-100, so a threshold of 1 accepts
# almost any candidate — confirm this is intentional.
FUZZY_MATCH_THRESHOLD = 1
# Search over a copy so building the queries cannot modify No_Phone_df.
companies_to_search = No_Phone_df.copy()


def validate_singapore_phone(phone):
    """Normalise a raw phone value to '+65' followed by 8 digits.

    Whitespace and the characters - ( ) . | / + are removed first. The
    remainder is accepted when it is an 8-digit number starting with 6, 8 or 9,
    optionally preceded by the '65' country code (10 characters in total).

    Args:
        phone: Raw phone value; any falsy value yields None.

    Returns:
        The number formatted as '+65XXXXXXXX', or None when it does not match.
    """
    if not phone:
        return None

    local_pattern = r'^[689]\d{7}$'
    digits = re.sub(r'[\s\-\(\)\.\|/\+]', '', str(phone))

    # With a leading country code only the trailing 8 characters are checked.
    has_country_code = digits.startswith('65') and len(digits) == 10
    candidate = digits[2:] if has_country_code else digits

    if re.match(local_pattern, candidate):
        return f"+65{candidate}"
    return None


def create_search_queries(df):
    """Build one search query per row that has a usable ENTITY_NAME.

    NOTE: earlier cells of this notebook define the same function; this
    definition, being executed last, is the one the main loop uses.

    Args:
        df: DataFrame whose rows may carry 'ENTITY_NAME' and
            'operational_address'. A missing column is treated as empty text.

    Returns:
        A list of dicts with keys 'idx' (the row's index label),
        'entity_name' (stripped text) and 'search_query'. Rows whose
        ENTITY_NAME is missing or blank are skipped.
    """
    def _as_text(value):
        # None, pd.NA and float NaN all mean "missing". Converting them with
        # str() would leak the words "None" / "<NA>" / "nan" into the query.
        if value is None or value is pd.NA:
            return ''
        if isinstance(value, float) and value != value:
            return ''
        text = str(value).strip()
        # Keep treating the literal text 'nan' as missing (values that were
        # stringified upstream).
        return '' if text == 'nan' else text

    queries = []
    for idx, row in df.iterrows():
        entity_name = _as_text(row.get('ENTITY_NAME', ''))
        if not entity_name:
            continue
        address = _as_text(row.get('operational_address', ''))
        if address:
            search_query = f"{entity_name} {address} Singapore"
        else:
            search_query = f"{entity_name} Singapore"
        queries.append({'idx': idx, 'entity_name': entity_name, 'search_query': search_query})
    return queries


def run_google_places_scraper(client, search_queries_batch):
    """Run the "compass/crawler-google-places" actor for one batch of queries.

    Args:
        client: Object exposing `.actor(id).call(run_input=...)`,
            `.run(id).wait_for_finish()` and `.dataset(id).iterate_items()`.
        search_queries_batch: Iterable of dicts, each with a 'search_query' key.

    Returns:
        A tuple `(items, error)`: on success `items` is the list of dataset
        items and `error` is None; on any failure `items` is an empty list and
        `error` is a message string. Exceptions are caught, printed and
        reported through `error`; they are never raised to the caller.
    """
    run_input = {
        "searchStringsArray": [entry['search_query'] for entry in search_queries_batch],
        # One place per query and every optional enrichment switched off.
        "maxCrawledPlacesPerSearch": 1,
        "language": "en",
        "scrapeContacts": False,
        "scrapePlaceDetailPage": False,
        "scrapeTableReservationProvider": False,
        "scrapeDirectories": False,
        "includeWebResults": False,
        "maxReviews": 0,
        "maxImages": 0,
        "scrapeReviewsPersonalData": False,
        "skipClosedPlaces": False,
        "maxQuestions": 0,
        "maximumLeadsEnrichmentRecords": 0,
    }
    try:
        run = client.actor("compass/crawler-google-places").call(run_input=run_input)
        response_is_usable = bool(run) and isinstance(run, dict) and 'id' in run
        if not response_is_usable:
            print(f"ERROR: API returned invalid response: {run}")
            return [], "API returned invalid response"

        # NOTE(review): wait_for_finish() is assumed to return a dict; if it
        # returns None the AttributeError ends up in the handler below.
        status = client.run(run["id"]).wait_for_finish().get('status', 'UNKNOWN')
        if status in ('FAILED', 'TIMED-OUT', 'ABORTED'):
            print(f"ERROR: Actor run {status}")
            return [], f"Actor run {status}"

        if status != "SUCCEEDED" or "defaultDatasetId" not in run:
            print(f"ERROR: Scraping failed with status: {status}")
            return [], f"Scraping failed: {status}"

        scraped_items = list(client.dataset(run["defaultDatasetId"]).iterate_items())
        return scraped_items, None
    except Exception as exc:
        print(f"ERROR: {type(exc).__name__}: {str(exc)}")
        return [], f"Error: {type(exc).__name__}: {str(exc)}"


def fuzzy_match_company(entity_name, google_results, threshold=FUZZY_MATCH_THRESHOLD):
    """Pick the Google result whose name best matches `entity_name`.

    Each result's 'title' (or 'name' when the title is empty) is compared with
    the entity name after upper-casing and stripping both. A result's score is
    the highest of four fuzz scorers: ratio, partial_ratio, token_sort_ratio
    and token_set_ratio. On a tie the earlier result wins.

    Args:
        entity_name: Company name to look for.
        google_results: Iterable of result dicts.
        threshold: Minimum score for the best result to be accepted.

    Returns:
        `(result, score)` when the best score reaches `threshold`, otherwise
        `(None, score)`. Returns `(None, 0)` when either input is empty.
    """
    if not google_results or not entity_name:
        return None, 0

    target = entity_name.upper().strip()
    top_candidate = None
    top_score = 0

    for candidate in google_results:
        label = candidate.get('title', '') or candidate.get('name', '')
        if not label:
            continue
        label_norm = label.upper().strip()
        candidate_score = max(
            scorer(target, label_norm)
            for scorer in (
                fuzz.ratio,
                fuzz.partial_ratio,
                fuzz.token_sort_ratio,
                fuzz.token_set_ratio,
            )
        )
        # Strict comparison: a later result must beat the current best.
        if candidate_score > top_score:
            top_candidate, top_score = candidate, candidate_score

    if top_score >= threshold:
        return top_candidate, top_score
    return None, top_score


# ---- Main Execution ----
# Build every query up front and work out how many actor runs are needed.
search_queries = create_search_queries(companies_to_search)
total_queries = len(search_queries)
num_batches = -(-total_queries // BATCH_SIZE)  # ceiling division

print(f"Total search queries: {total_queries}")
print(f"Number of batches: {num_batches}")

if not total_queries:
    print("WARNING: No search queries generated. Check that ENTITY_NAME column exists and has values.")

# One record per query is collected here: {'idx', 'GMaps_Phone', 'GMaps_Status'}.
all_results = []
phones_found = 0

for batch_idx in range(0, total_queries, BATCH_SIZE):
    batch_num = batch_idx // BATCH_SIZE + 1
    batch = search_queries[batch_idx:batch_idx + BATCH_SIZE]
    print(f"\nProcessing batch {batch_num}/{num_batches} ({len(batch)} queries)...")

    items, error = run_google_places_scraper(client, batch)

    if error:
        # A failed run marks every query of the batch as 'error'.
        print(f"  Batch {batch_num} error: {error}")
        for query in batch:
            all_results.append({'idx': query['idx'], 'GMaps_Phone': None, 'GMaps_Status': 'error'})
        continue

    print(f"  Got {len(items)} results from Google Maps")

    # Group the returned places by the search string that produced them.
    results_by_query = {}
    for item in items:
        results_by_query.setdefault(item.get('searchString', ''), []).append(item)

    batch_phones = 0
    for query in batch:
        record = {'idx': query['idx'], 'GMaps_Phone': None, 'GMaps_Status': 'no_results'}
        google_results = results_by_query.get(query['search_query'], [])
        if google_results:
            best_match, score = fuzzy_match_company(query['entity_name'], google_results)
            if best_match:
                raw_phone = best_match.get('phone') or best_match.get('phoneUnformatted')
                validated_phone = validate_singapore_phone(raw_phone) if raw_phone else None
                if validated_phone:
                    phones_found += 1
                    batch_phones += 1
                # 'matched' is recorded even when the phone failed validation.
                record['GMaps_Phone'] = validated_phone
                record['GMaps_Status'] = 'matched'
            else:
                record['GMaps_Status'] = 'no_match'
        all_results.append(record)

    print(f"  Batch {batch_num}: {batch_phones} phones found")

    # Short pause between runs; skipped after the final batch.
    if batch_idx + BATCH_SIZE < total_queries:
        time.sleep(2)

# Write every validated phone back onto the row it was looked up for and tag
# the source as "Google".
for result in all_results:
    has_matched_phone = result['GMaps_Phone'] and result['GMaps_Status'] == 'matched'
    if not has_matched_phone:
        continue
    idx = result['idx']
    if idx not in No_Phone_df.index:
        continue
    No_Phone_df.loc[idx, 'Phones'] = result['GMaps_Phone']
    No_Phone_df.loc[idx, 'PIC Source 1'] = "Google"

# Partition No_Phone_df by whether a phone is now present: a row has a phone
# when 'Phones' is neither missing nor the empty string.
has_phone_mask = No_Phone_df['Phones'].notna() & (No_Phone_df['Phones'] != '')

Google_Mapped_Scrapped_with_Phone = No_Phone_df[has_phone_mask].copy()
Google_Mapped_Scrapped_No_Phone = No_Phone_df[~has_phone_mask].copy()

# Distinct phone numbers among the rows that received one.
unique_phones = Google_Mapped_Scrapped_with_Phone['Phones'].nunique()

# Final summary
divider = '=' * 50
print(f"\n{divider}")
print("SUCCESS")
print(f"Unique phones found: {unique_phones}")
print(f"No phone: {len(Google_Mapped_Scrapped_No_Phone)}")
print(f"Google_Mapped_Scrapped_with_Phone: {len(Google_Mapped_Scrapped_with_Phone)} rows")
print(f"Google_Mapped_Scrapped_No_Phone: {len(Google_Mapped_Scrapped_No_Phone)} rows")

In [None]:
# List the columns available on the Google-matched frame.
print(list(Google_Mapped_Scrapped_with_Phone.columns))


In [None]:
# Keep only the output columns, in their final order.
Google_Mapped_Scrapped_with_Phone = Google_Mapped_Scrapped_with_Phone[
    [
        "UEN",
        "ENTITY_NAME",
        "Phones",
        "PIC Source 1",
        "Emails",
        "Website",
        "Facebook",
        "LinkedIn",
        "Instagram",
        "TikTok",
        "operational_street",
        "operational_unit",
        "operational_postal_code",
        "operational_address",
    ]
]
Google_Mapped_Scrapped_with_Phone.head(5)

In [None]:

# Keep only the output columns, in the same order as the Google-matched frame,
# so the two frames can be concatenated column-for-column.
With_Phone_df = With_Phone_df[
    [
        "UEN",
        "ENTITY_NAME",
        "Phones",
        "PIC Source 1",
        "Emails",
        "Website",
        "Facebook",
        "LinkedIn",
        "Instagram",
        "TikTok",
        "operational_street",
        "operational_unit",
        "operational_postal_code",
        "operational_address",
    ]
]
With_Phone_df.head(5)

In [None]:
# Uniqueness checks on both frames before combining them. List-valued phone
# cells are converted to tuples because lists are unhashable.
phones_unique_1 = With_Phone_df["Phones"].apply(lambda x: tuple(x) if isinstance(x, list) else x).is_unique
phones_unique_2 = Google_Mapped_Scrapped_with_Phone["Phones"].apply(lambda x: tuple(x) if isinstance(x, list) else x).is_unique
uen_unique_1 = With_Phone_df["UEN"].is_unique
uen_unique_2 = Google_Mapped_Scrapped_with_Phone["UEN"].is_unique

print(f"With_Phone_df Phones unique: {phones_unique_1}")
print(f"Google_Mapped Phones unique: {phones_unique_2}")
print(f"With_Phone_df UEN unique: {uen_unique_1}")
print(f"Google_Mapped UEN unique: {uen_unique_2}")

if all([phones_unique_1, phones_unique_2, uen_unique_1, uen_unique_2]):
    # Append only UENs that are not already in With_Phone_df. The per-frame
    # checks above cannot see an overlap between the two frames, and without
    # this filter re-running the cell would append the same rows a second time.
    already_present = Google_Mapped_Scrapped_with_Phone["UEN"].isin(With_Phone_df["UEN"])
    rows_to_append = Google_Mapped_Scrapped_with_Phone[~already_present]
    if already_present.any():
        print(f"Skipped {int(already_present.sum())} rows whose UEN is already in With_Phone_df")
    With_Phone_df = pd.concat([With_Phone_df, rows_to_append], ignore_index=True)
    print(f"Appended. With_Phone_df now has {len(With_Phone_df)} rows")
else:
    print("NOT appended — duplicate values detected")


In [None]:
# (rows, columns) of With_Phone_df at this point in the notebook.
With_Phone_df.shape

In [None]:
# Write With_Phone_df to the Gold staging folder. The folder is created first
# so the write does not fail on a fresh checkout where it does not exist yet.
gold_output_path = "./Staging/Gold/Gold_Scrapped_Data_1.parquet"
os.makedirs(os.path.dirname(gold_output_path), exist_ok=True)
With_Phone_df.to_parquet(gold_output_path, index=False, engine="fastparquet")