In [77]:
import os
import shutil
import json
import time
import logging
import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload

In [78]:
# Load settings from a local .env file before anything reads the environment.
load_dotenv(override=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

# --- Credentials and resource identifiers (None when the variable is unset) ---
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
GOOGLE_CREDS_JSON = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_DRIVE_FOLDER_ID = os.getenv("GOOGLE_DRIVE_FOLDER_ID")

# --- Request tuning; both values must parse as integers ---
MAX_RETRIES = int(os.getenv("MAX_RETRIES", 3))
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))
PAGE_START = 1

# --- HTTP headers, built once and shared by every request ---
SERPER_API_KEY_Headers = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}
OPENROUTER_API_KEY_Headers = {
    "Authorization": f"Bearer {OPENROUTER_API_KEY}",
    "Content-Type": "application/json",
}

# --- Endpoints / model selection ---
model = "gpt-3.5-turbo"  # sent to OpenRouter as "openai/<model>"
base_url = "https://google.serper.dev/places"

logging.info("Environment variables loaded successfully")

2026-02-07 01:57:25,860 | INFO | Environment variables loaded successfully


In [80]:
def load_input_config(path="input.txt"):
    """Read the search parameters from a ``KEY=VALUE`` text file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Whitespace around both the key and the value is stripped, so
    ``COUNTRY = USA`` and ``COUNTRY=USA`` are equivalent.

    Recognised keys:
        BUSINESS_TYPE: search phrase.
        COUNTRY: country name.
        ZIP_CODES: comma-separated list; empty entries are dropped.
        PAGE_START: first page to request (defaults to 1).
        PAGE_END: last page to request (defaults to PAGE_START when the key
            is absent or blank, and a warning is logged).

    Args:
        path: Location of the config file. Defaults to ``"input.txt"`` in the
            current working directory.

    Returns:
        tuple: ``(BUSINESS_TYPE, COUNTRY, ZIP_CODES, Page_Start, Page_End)``
        where the first two are ``str`` or ``None``, ``ZIP_CODES`` is a list
        of ``str`` and the page bounds are ``int``.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If PAGE_START or PAGE_END is present but not an integer.
    """
    config = {}

    with open(path, "r") as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            # Strip both sides so "KEY = value" is not stored under "KEY ".
            config[key.strip()] = value.strip()

    BUSINESS_TYPE = config.get("BUSINESS_TYPE")
    COUNTRY = config.get("COUNTRY")
    ZIP_CODES = [z.strip() for z in config.get("ZIP_CODES", "").split(",") if z.strip()]
    Page_Start = int(config.get("PAGE_START") or 1)

    page_end_raw = config.get("PAGE_END")
    if page_end_raw:
        Page_End = int(page_end_raw)
    else:
        # int(None) used to raise a TypeError here; fetch a single page instead.
        logging.warning(f"PAGE_END not set, defaulting to PAGE_START ({Page_Start})")
        Page_End = Page_Start

    logging.info(f"Business Type: {BUSINESS_TYPE}")
    logging.info(f"Country: {COUNTRY}")
    logging.info(f"ZIP Codes: {', '.join(ZIP_CODES)}")
    logging.info(f"Pages: {Page_Start} to {Page_End}")

    return BUSINESS_TYPE, COUNTRY, ZIP_CODES, Page_Start, Page_End

In [81]:
def safe_post(url, headers, payload, retries=3, delay=2, timeout=30):
    """POST ``payload`` as JSON and return the decoded JSON body, retrying on failure.

    Args:
        url: Endpoint to call.
        headers: HTTP headers to send.
        payload: JSON-serialisable request body.
        retries: Maximum number of attempts.
        delay: Seconds to sleep between attempts.
        timeout: Seconds passed to ``requests.post`` as its timeout, so a
            stalled connection cannot block the notebook indefinitely.

    Returns:
        The parsed JSON response on HTTP 200, otherwise ``None``. ``None`` is
        returned immediately on 401/403 (retrying cannot fix a bad key) and
        after every attempt has failed for any other reason.
    """
    for attempt in range(retries):
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=timeout)

            if response.status_code == 200:
                return response.json()

            if response.status_code in (401, 403):
                logging.error(
                    f"API KEY ERROR ({response.status_code}): {response.text}"
                )
                return None

            logging.warning(
                f"Request failed ({response.status_code}): {response.text}"
            )

        # ValueError covers a 200 response whose body is not valid JSON on
        # requests versions where that error is not a RequestException.
        except (requests.exceptions.RequestException, ValueError) as e:
            logging.warning(f"Request exception: {e}")

        # Only wait when another attempt will actually follow.
        if attempt < retries - 1:
            time.sleep(delay)

    return None


In [82]:
def fetch_business_data(ZIP_CODES, Page_Start, Page_End, BUSINESS_TYPE, COUNTRY, all_businesses, seen_cids):
    """Query the Serper places endpoint per ZIP code and page, collecting unique places.

    Results are accumulated in place: one dict per new place is appended to
    ``all_businesses`` and its CID is added to ``seen_cids``. Places without a
    CID, or whose CID is already in ``seen_cids``, are skipped.

    Args:
        ZIP_CODES: Iterable of ZIP code strings. Entries that are not purely
            digits are logged and skipped.
        Page_Start: First page number to request (inclusive).
        Page_End: Last page number to request (inclusive).
        BUSINESS_TYPE: Search phrase sent as the ``q`` field.
        COUNTRY: Combined with each ZIP code to form the ``location`` field.
        all_businesses: List that receives the place records.
        seen_cids: Set of CIDs already collected.

    Returns:
        None.
    """
    for zip_code in ZIP_CODES:

        if not zip_code.isdigit():
            logging.warning(f"Invalid ZIP skipped: {zip_code}")
            continue

        logging.info(f"Processing ZIP: {zip_code}")

        for page in range(Page_Start, Page_End + 1):
            payload = {
                "q": BUSINESS_TYPE,
                "location": f"{zip_code}, {COUNTRY}",
                "page": page
            }

            # Honour the configured retry settings, as generate_ai_descriptions does.
            data = safe_post(
                base_url,
                SERPER_API_KEY_Headers,
                payload,
                retries=MAX_RETRIES,
                delay=REQUEST_DELAY,
            )

            # A failed request ends pagination for this ZIP only.
            if not data:
                logging.warning(f"No data returned for ZIP {zip_code} page {page}")
                break

            places = data.get("places", [])

            if not places:
                logging.info(
                    f"No places found on page {page} for ZIP {zip_code}, stopping pagination"
                )
                break

            logging.info(f"Found {len(places)} places on page {page} for ZIP {zip_code}")

            for place in places:
                cid = place.get("cid")

                if not cid or cid in seen_cids:
                    continue

                seen_cids.add(cid)

                all_businesses.append({
                    "Business Name": place.get("title"),
                    "Address": place.get("address"),
                    "Phone": place.get("phoneNumber"),
                    "Website": place.get("website"),
                    "Google Rating": place.get("rating"),
                    "Review Count": place.get("ratingCount"),
                    "Category": place.get("category"),
                    "Latitude": place.get("latitude"),
                    "Longitude": place.get("longitude"),
                    "CID": cid,
                    "GMB URL": f"https://www.google.com/maps?cid={cid}",
                    "Source ZIP": zip_code,
                    "Source Query": f"{BUSINESS_TYPE} {COUNTRY} {zip_code}",
                    # Filled in afterwards by the AI description step.
                    "About": ""
                })

    logging.info(f"Total unique businesses collected: {len(all_businesses)}")

In [83]:
def _extract_completion_text(data):
    """Return the first choice's message text from a chat-completion payload, or None."""
    if not isinstance(data, dict):
        return None
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        return None
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    return content if isinstance(content, str) else None


def generate_ai_descriptions(all_businesses):
    """Fill each business's ``"About"`` field with a generated description.

    One chat-completion request is sent to OpenRouter per business. When the
    request fails, or the response does not contain a usable text message,
    the record's ``"About"`` value is left unchanged and a warning is logged,
    so a single malformed response no longer aborts the whole run.

    Args:
        all_businesses: List of dicts, modified in place. Each dict must have
            the keys ``"Business Name"``, ``"Category"``, ``"Address"``,
            ``"Google Rating"`` and ``"Review Count"``.

    Returns:
        None.

    Raises:
        KeyError: If a business dict lacks one of the keys used in the prompt.
    """
    for business in all_businesses:
        prompt = f"""
        Write a short professional SEO-friendly description.
        Name: {business['Business Name']}
        Category: {business['Category']}
        Address: {business['Address']}
        Rating: {business['Google Rating']}
        Reviews: {business['Review Count']}
        """
        headers = OPENROUTER_API_KEY_Headers
        payload = {
            "model": f"openai/{model}",
            "messages": [{"role": "user", "content": prompt}]
        }
        data = safe_post("https://openrouter.ai/api/v1/chat/completions", headers, payload, retries=MAX_RETRIES,
                         delay=REQUEST_DELAY)

        description = _extract_completion_text(data)
        if description is not None:
            business["About"] = description
        else:
            logging.warning(f"No AI description returned for: {business['Business Name']}")

    logging.info("AI descriptions generated")

In [84]:
def create_csv_output(all_businesses):
    """Write the collected records to a date-stamped CSV in the ``output`` folder.

    The folder is created when missing. The file is named
    ``Google_Map_Extraction_<YYYYMMDD>.csv`` using today's date and is written
    without the index, encoded as UTF-8 with a BOM.

    Args:
        all_businesses: Records (list of dicts) to convert into a DataFrame.

    Returns:
        tuple: ``(csv_path, df, OUTPUT_DIR)`` — the path of the written file,
        the DataFrame built from the records, and the output folder name.
    """
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)

    date_stamp = datetime.today().strftime("%Y%m%d")
    file_name = f"Google_Map_Extraction_{date_stamp}.csv"
    target_path = os.path.join(output_dir, file_name)

    frame = pd.DataFrame(all_businesses)
    frame.to_csv(target_path, index=False, encoding="utf-8-sig")

    row_count = frame.shape[0]
    logging.info(f"CSV created at: {target_path} with {row_count} rows")

    return target_path, frame, output_dir

In [85]:
def upload_to_google_sheets(df):
    """Append rows of ``df`` that are not yet present to a Google Sheets tab.

    The spreadsheet named ``Google_Map_Extraction_`` is looked up through the
    Drive API (ignoring trashed files) and created when missing. A tab named
    after today's date (``YYYYMMDD``) is added if it does not exist. CIDs
    already stored on that tab are read back, and only rows whose CID is not
    among them are appended. The header row is written when the tab has no
    data rows.

    Credentials are loaded from the service-account file named by the
    ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.

    Args:
        df: DataFrame of business records. It must contain a ``"CID"``
            column; an empty frame or one without that column is skipped
            without contacting Google.

    Returns:
        None.

    Raises:
        HttpError: For Google API failures other than "sheet already exists"
            and a failed read of the existing rows.
    """
    # Bail out before any API call. An empty frame has no "CID" column, which
    # previously raised KeyError after the spreadsheet had already been created.
    if df is None or df.empty or "CID" not in df.columns:
        logging.info("No business records to upload. Skipping Google Sheets.")
        return

    SCOPES = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive"
    ]

    creds = Credentials.from_service_account_file(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"), scopes=SCOPES)
    sheets_service = build("sheets", "v4", credentials=creds)
    drive_service = build("drive", "v3", credentials=creds)

    SPREADSHEET_NAME = "Google_Map_Extraction_"
    today = datetime.today().strftime("%Y%m%d")
    sheet_title = f"{today}"

    # trashed=false keeps a spreadsheet sitting in the bin from being reused.
    response = drive_service.files().list(
        q=(
            f"name='{SPREADSHEET_NAME}' "
            "and mimeType='application/vnd.google-apps.spreadsheet' "
            "and trashed=false"
        ),
        spaces="drive"
    ).execute()
    files = response.get("files", [])

    if files:
        spreadsheet_id = files[0]["id"]
        logging.info(f"Found existing spreadsheet: {SPREADSHEET_NAME}")
    else:
        spreadsheet_body = {
            'properties': {'title': SPREADSHEET_NAME},
            'sheets': [{'properties': {'title': sheet_title}}]
        }
        spreadsheet = sheets_service.spreadsheets().create(body=spreadsheet_body).execute()
        spreadsheet_id = spreadsheet["spreadsheetId"]

        permission = {'type': 'user', 'role': 'writer', 'emailAddress': creds.service_account_email}
        drive_service.permissions().create(fileId=spreadsheet_id, body=permission).execute()

        logging.info(f"Spreadsheet created: {SPREADSHEET_NAME}")

    # Ensure today's tab exists; a freshly created spreadsheet already has it,
    # in which case the API reports "already exists" and that is fine.
    try:
        sheets_service.spreadsheets().batchUpdate(
            spreadsheetId=spreadsheet_id,
            body={"requests": [{"addSheet": {"properties": {"title": sheet_title}}}]}
        ).execute()
        logging.info(f"Sheet created: {sheet_title}")
    except HttpError as e:
        if "already exists" in str(e):
            logging.info(f"Sheet {sheet_title} already exists")
        else:
            raise

    existing_cids = set()
    sheet_empty = True

    # Assumes the sheet's columns are laid out in the same order as df's.
    CID_COL_INDEX = df.columns.get_loc("CID")

    try:
        existing_data = sheets_service.spreadsheets().values().get(
            spreadsheetId=spreadsheet_id,
            range=f"{sheet_title}!A2:Z"
        ).execute()

        rows = existing_data.get("values", [])
        if rows:
            sheet_empty = False

        for row in rows:
            if len(row) > CID_COL_INDEX:
                cid = row[CID_COL_INDEX]
                if cid:
                    # Normalise to str so the comparison below is type-safe.
                    existing_cids.add(str(cid))

        logging.info(f"Loaded {len(existing_cids)} existing CIDs from sheet")

    except HttpError:
        logging.warning("Sheet empty or unreadable, assuming no existing data")

    def clean_cell(cell):
        """Make one value safe to send: NaN/None -> None, strings flattened and capped."""
        if cell is None or (isinstance(cell, float) and pd.isna(cell)):
            return None
        if isinstance(cell, str):
            cell = cell.replace("\n", " ").replace("\r", " ").replace('"', "'")
            # Cap at 50000 characters per cell.
            if len(cell) > 50000:
                cell = cell[:50000]
            return cell
        return cell

    filtered_rows = []
    for _, row in df.iterrows():
        cid = row["CID"]
        # Compare as text: a numeric CID in df must still match the sheet's string.
        if str(cid) not in existing_cids:
            filtered_rows.append([clean_cell(cell) for cell in row.tolist()])
        else:
            logging.info(f"Skipped duplicate CID {cid}")

    if not filtered_rows:
        logging.info("No new records found. Nothing appended.")
    else:
        if sheet_empty:
            values = [df.columns.tolist()] + filtered_rows
        else:
            values = filtered_rows

        body = {"values": values}
        sheets_service.spreadsheets().values().append(
            spreadsheetId=spreadsheet_id,
            range=f"{sheet_title}!A1",
            valueInputOption="RAW",
            insertDataOption="INSERT_ROWS",
            body=body
        ).execute()
        logging.info(f"Appended {len(filtered_rows)} new records to Google Sheets")

In [86]:
def cleanup_output_directory(OUTPUT_DIR):
    """Remove ``OUTPUT_DIR`` and everything inside it.

    Logs a warning and does nothing when the path does not exist.

    Args:
        OUTPUT_DIR: Path of the directory to delete.

    Returns:
        None.
    """
    # Guard clause: nothing to remove.
    if not os.path.exists(OUTPUT_DIR):
        logging.warning(f"{OUTPUT_DIR} not found, nothing to delete")
        return

    shutil.rmtree(OUTPUT_DIR)
    logging.info(f"{OUTPUT_DIR} deleted after successful processing")


In [87]:
# Pipeline entry point: load config -> fetch places -> describe -> write CSV.
if __name__ == "__main__":
    # Accumulators that fetch_business_data fills in place.
    all_businesses = []
    seen_cids = set()

    # Read the search parameters from input.txt.
    BUSINESS_TYPE, COUNTRY, ZIP_CODES, Page_Start, Page_End = load_input_config()

    # Collect de-duplicated place records for every ZIP code and page.
    fetch_business_data(ZIP_CODES, Page_Start, Page_End, BUSINESS_TYPE, COUNTRY, all_businesses, seen_cids)

    # Fill each record's "About" field through the OpenRouter chat API.
    generate_ai_descriptions(all_businesses)

    # Persist the results as a date-stamped CSV in the output folder.
    csv_path, df, OUTPUT_DIR = create_csv_output(all_businesses)

    # Google Sheets upload is currently disabled.
    # upload_to_google_sheets(df)

    # Output-folder cleanup is currently disabled, so the CSV is kept on disk.
    # cleanup_output_directory(OUTPUT_DIR)

2026-02-07 01:57:46,723 | INFO | Business Type: software house
2026-02-07 01:57:46,724 | INFO | Country: USA
2026-02-07 01:57:46,724 | INFO | ZIP Codes: 11223, 12345
2026-02-07 01:57:46,725 | INFO | Pages: 1 to 2
2026-02-07 01:57:46,725 | INFO | Processing ZIP: 11223
2026-02-07 01:57:48,683 | INFO | Found 10 places on page 1 for ZIP 11223
2026-02-07 01:57:50,209 | INFO | Found 10 places on page 2 for ZIP 11223
2026-02-07 01:57:50,210 | INFO | Processing ZIP: 12345
2026-02-07 01:57:51,667 | INFO | Found 10 places on page 1 for ZIP 12345
2026-02-07 01:57:53,183 | INFO | Found 10 places on page 2 for ZIP 12345
2026-02-07 01:57:53,184 | INFO | Total unique businesses collected: 40
2026-02-07 01:59:54,040 | INFO | AI descriptions generated
2026-02-07 01:59:54,048 | INFO | CSV created at: output\Google_Map_Extraction_20260207.csv with 40 rows
