Scraping Data from the Web

Install dependencies

In [1]:
pip install requests python-dotenv

Collecting requests
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting charset_normalizer<4,>=2 (from requests)
  Downloading charset_normalizer-3.4.3-cp313-cp313-win_amd64.whl.metadata (37 kB)
Collecting idna<4,>=2.5 (from requests)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting urllib3<3,>=1.21.1 (from requests)
  Downloading urllib3-2.5.0-py3-none-any.whl.metadata (6.5 kB)
Collecting certifi>=2017.4.17 (from requests)
  Downloading certifi-2025.8.3-py3-none-any.whl.metadata (2.4 kB)
Downloading requests-2.32.5-py3-none-any.whl (64 kB)
Downloading charset_normalizer-3.4.3-cp313-cp313-win_amd64.whl (107 kB)
Using cached idna-3.10-py3-none-any.whl (70 kB)
Downloading urllib3-2.5.0-py3-none-any.whl (129 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Downloading certifi-2025.8.3-py3-none-any.whl (161 kB)
Installing collected packages: 

Import Packages

In [8]:
import os, time, csv, random, sys
from typing import Dict, List, Tuple, Optional
from dotenv import load_dotenv
import requests

Load the API key and configure the HTTP session

In [9]:
# Pull variables from a local .env file into the process environment, then read
# the Places API key from it; stop right away if the key is missing or empty.
load_dotenv()
API_KEY = os.getenv("PLACES_API_KEY")
if API_KEY is None or API_KEY == "":
    raise RuntimeError("ERROR: PLACES_API_KEY not found in .env")

# Endpoints on places.googleapis.com used by the functions below.
TEXT_SEARCH_URL   = "https://places.googleapis.com/v1/places:searchText"
PLACE_DETAILS_URL = "https://places.googleapis.com/v1/places/{place_id}"

# One shared HTTP session: every request sent through it carries the JSON
# content type and the API key header.
SESSION = requests.Session()
SESSION.headers["Content-Type"] = "application/json"
SESSION.headers["X-Goog-Api-Key"] = API_KEY

def backoff_sleep(attempt):
    """
    Pause for an exponentially growing, jittered delay.

    The delay is 2 ** attempt seconds plus a random fraction of a second,
    capped at 20 seconds.
    """
    ceiling = 20
    delay = 2 ** attempt + random.random()
    if delay > ceiling:
        delay = ceiling
    time.sleep(delay)

In [10]:
# Bounding box used to restrict searches: low and high latitude/longitude corners.
SG_RECT = dict(
    low_lat=1.130,
    low_lng=103.600,
    high_lat=1.475,
    high_lng=104.100,
)

# Free-text search terms; the harvest runs one Text Search per entry.
QUERIES = [
    "hawker centre",
    "food court",
    "coffee shop",
    "bar",
    "nightclub",
    "karaoke",
    "massage",
    "mobile phone repair",
    "electronics store",
    "budget hotel",
    "hostel",
    "car workshop",
    "clinic",
    "pawn shop",
    "arcade",
    "supermarket",
    "convenience store",
    "tourist attraction",
]

In [11]:
def text_search_batch(query, rect, page_size=20, max_pages=3, timeout=30, max_retries=3):
    """
    Run one Text Search query restricted to a rectangle and collect its result pages.

    Args:
        query: Free-text search string, sent as "textQuery".
        rect: Mapping with "low_lat", "low_lng", "high_lat" and "high_lng" keys.
        page_size: Value sent as "pageSize".
        max_pages: Maximum number of result pages to request.
        timeout: Seconds to wait for each HTTP request before giving up on it.
        max_retries: Extra attempts per page after a connection error, a timeout,
            or a 429/5xx response. Each retry waits via backoff_sleep().

    Returns:
        A list of place dicts. The field mask requests only "id" and "displayName".

    Raises:
        requests.HTTPError: if a page still has an error status after the retries.
        requests.ConnectionError, requests.Timeout: if the last attempt fails
            at the network level.
    """
    body = {
        "textQuery": query,
        "locationRestriction": {
            "rectangle": {
                "low": {"latitude": rect["low_lat"], "longitude": rect["low_lng"]},
                "high": {"latitude": rect["high_lat"], "longitude": rect["high_lng"]},
            }
        },
        "pageSize": page_size
    }
    headers = {"X-Goog-FieldMask": "places.id,places.displayName,nextPageToken"}

    # Status codes treated as transient and therefore worth another attempt.
    retryable_statuses = {429, 500, 502, 503, 504}
    last_attempt = max(0, max_retries)

    places = []
    next_token = None
    for _ in range(max_pages):
        # Follow-up pages resend the same body plus the token from the previous page.
        payload = dict(body)
        if next_token:
            payload["pageToken"] = next_token

        for attempt in range(last_attempt + 1):
            try:
                r = SESSION.post(TEXT_SEARCH_URL, headers=headers, json=payload, timeout=timeout)
            except (requests.ConnectionError, requests.Timeout):
                if attempt == last_attempt:
                    raise
            else:
                if r.status_code not in retryable_statuses or attempt == last_attempt:
                    break
            backoff_sleep(attempt)

        # Any remaining error (non-retryable, or retries exhausted) is surfaced to the caller.
        r.raise_for_status()
        data = r.json()
        places.extend(data.get("places", []) or [])
        next_token = data.get("nextPageToken")
        if not next_token:
            break
    return places

In [12]:
def place_details_with_reviews(place_id, verbose=False, include_key_in_photo_url=False):
    """
    Fetch Place Details (New) with reviews + first photo URL.

    Args:
        place_id: Place identifier substituted into PLACE_DETAILS_URL.
        verbose: When True, print a short message for every failed lookup.
        include_key_in_photo_url: When True, append "&key=<API_KEY>" to the photo
            URL so it can be fetched directly. It is off by default because the
            URL is persisted by callers (e.g. written to CSV), and embedding the
            key there would leak the secret. Consumers of a key-less URL must
            append "&key=<their key>" themselves at request time.

    Returns:
        (meta, reviews_list). On any failure (network error, non-200 status,
        or a body that is not valid JSON) returns (None, []).
    """
    field_mask = ",".join([
        "id","displayName","googleMapsUri","types","location",
        "rating","userRatingCount","reviews","photos"
    ])
    headers = {"X-Goog-FieldMask": field_mask}
    url = PLACE_DETAILS_URL.format(place_id=place_id)

    # Network-level failures are handled like a non-200 response: skip this place.
    try:
        r = SESSION.get(url, headers=headers, timeout=30)
    except requests.RequestException as exc:
        if verbose:
            print(f"[Details] ERROR for {place_id} — {exc}")
        return None, []

    if r.status_code != 200:
        if verbose:
            print(f"[Details] ERROR for {place_id} — {r.status_code} {r.text[:200]}")
        return None, []

    try:
        data = r.json()
    except ValueError:
        if verbose:
            print(f"[Details] ERROR for {place_id} — invalid JSON {r.text[:200]}")
        return None, []

    # Build a URL for the *first* photo if present
    photo_url = None
    photos = data.get("photos") or []
    if photos:
        ref = photos[0].get("name")  # photo resource name
        if ref:
            photo_url = f"https://places.googleapis.com/v1/{ref}/media?maxHeightPx=400"
            if include_key_in_photo_url:
                photo_url += f"&key={API_KEY}"

    location = data.get("location") or {}
    meta = {
        "place_id": data.get("id"),
        "place_name": (data.get("displayName") or {}).get("text"),
        "gmaps_url": data.get("googleMapsUri"),
        "types": ",".join(data.get("types", [])) if data.get("types") else None,
        "place_rating": data.get("rating"),
        "place_user_rating_count": data.get("userRatingCount"),
        "lat": location.get("latitude"),
        "lng": location.get("longitude"),
        "photo_url": photo_url,
    }
    reviews = data.get("reviews", []) or []
    return meta, reviews


In [13]:
def harvest_reviews(
    queries,
    rect,
    target_reviews=5000,          # total rows desired in the CSV (existing + new)
    per_place_cap=5,
    oversample_places=2000,
    csv_name="google_reviews_singapore.csv"
):
    """
    Collect reviews for places matching `queries` inside `rect` and append them to a CSV.

    Resume-safe:
      - If the CSV exists, new rows are appended and reviews already present
        (same place_id + publish_time) are skipped.
      - If the CSV already holds at least `target_reviews` rows, returns
        immediately without issuing any API request.
      - The header row is written only when the file is missing or empty.
      - Writes 'photo_url' (first photo) per place.

    Args:
        queries: Iterable of text queries, each passed to text_search_batch().
        rect: Bounding-box mapping passed through to text_search_batch().
        target_reviews: Stop once the file holds this many rows (existing + new).
        per_place_cap: Maximum number of reviews taken from one place.
        oversample_places: Maximum number of candidate places to inspect;
            a falsy value means no cap.
        csv_name: Path of the CSV file to create or extend.

    Returns:
        (csv_name, total_rows_in_file)
    """
    # --- load existing rows (if any) to de-dup ---
    file_exists = os.path.exists(csv_name)
    existing_pairs, total_existing = _load_existing_review_keys(csv_name)
    if file_exists:
        print(f"[Resume] Found existing file with {total_existing} rows")

    # Nothing to do: avoid spending API quota when the target is already met.
    if total_existing >= target_reviews:
        print(f"[Harvest] {csv_name} already has {total_existing} rows "
              f"(target {target_reviews}); nothing to do.")
        return csv_name, total_existing

    # --- collect unique candidate place IDs via Text Search ---
    ids, seen = [], set()
    for q in queries:
        for p in text_search_batch(q, rect, page_size=20, max_pages=3):
            pid = p.get("id")
            if pid and pid not in seen:
                seen.add(pid)
                ids.append(pid)
    random.shuffle(ids)
    if oversample_places:
        ids = ids[:oversample_places]
    print(f"[Harvest] Candidate place_ids to inspect: {len(ids)}")

    fieldnames = [
        "place_id","place_name","gmaps_url","types","place_rating","place_user_rating_count",
        "lat","lng","photo_url",
        "review_rating","review_text","review_language",
        "relative_time","publish_time","author"
    ]
    # A file that already contains a header but no data rows must not get a second header.
    needs_header = not file_exists or os.path.getsize(csv_name) == 0

    total_rows = total_existing
    # --- open CSV for append; the context manager closes it on every exit path ---
    with open(csv_name, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()

        for i, pid in enumerate(ids, 1):
            meta, reviews = place_details_with_reviews(pid, verbose=False)
            if not meta:
                continue

            for rv in reviews[:per_place_cap]:
                pair = (meta["place_id"], rv.get("publishTime"))
                if pair in existing_pairs:
                    continue  # skip duplicate (already in CSV)

                writer.writerow(_review_to_row(meta, rv))
                existing_pairs.add(pair)
                total_rows += 1

                if total_rows >= target_reviews:
                    print(f"[Harvest] Reached target {target_reviews}. Wrote: {csv_name}")
                    return csv_name, total_rows

            if i % 25 == 0:
                print(f"[Harvest] Processed {i} places — CSV now has {total_rows} rows")

    print(f"[Harvest] Finished candidates. CSV has {total_rows} rows. Wrote: {csv_name}")
    return csv_name, total_rows


def _load_existing_review_keys(csv_name):
    """Return (set of (place_id, publish_time) pairs, data-row count) for csv_name; (set(), 0) if absent."""
    pairs, row_count = set(), 0
    if not os.path.exists(csv_name):
        return pairs, row_count
    # newline="" lets the csv module handle line breaks inside quoted review text.
    with open(csv_name, "r", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            row_count += 1
            pid = row.get("place_id")
            pub = row.get("publish_time")
            if pid and pub:
                pairs.add((pid, pub))
    return pairs, row_count


def _review_to_row(meta, rv):
    """Merge place metadata with one review dict into a CSV row dict."""
    original = rv.get("originalText") or {}
    translated = rv.get("text") or {}
    # "text" is expected to be a {"text", "languageCode"} object; tolerate a plain string too.
    if not isinstance(translated, dict):
        translated = {"text": translated}
    return {
        **meta,  # includes photo_url
        "review_rating": rv.get("rating"),
        "review_text": original.get("text") or translated.get("text") or "",
        "review_language": original.get("languageCode") or translated.get("languageCode"),
        "relative_time": rv.get("relativePublishTimeDescription"),
        "publish_time": rv.get("publishTime"),
        "author": (rv.get("authorAttribution") or {}).get("displayName"),
    }


In [14]:
# Run the harvest with the query list and bounding box defined earlier.
csv_path, n = harvest_reviews(
    queries=QUERIES,
    rect=SG_RECT,
    target_reviews=5000,      # desired total rows in the file, counting rows already there
    per_place_cap=5,          # API returns up to 5 reviews/place
    oversample_places=2500,   # inspect more place IDs to improve the odds of hitting the target
    csv_name="google_reviews_singapore.csv",
)
print("Result:", csv_path)

[Harvest] Candidate place_ids to inspect: 1055
[Harvest] Processed 25 places — CSV now has 120 rows
[Harvest] Processed 50 places — CSV now has 241 rows
[Harvest] Processed 75 places — CSV now has 358 rows
[Harvest] Processed 100 places — CSV now has 468 rows
[Harvest] Processed 125 places — CSV now has 584 rows
[Harvest] Processed 150 places — CSV now has 701 rows
[Harvest] Processed 175 places — CSV now has 811 rows
[Harvest] Processed 200 places — CSV now has 927 rows
[Harvest] Processed 225 places — CSV now has 1051 rows
[Harvest] Processed 250 places — CSV now has 1167 rows
[Harvest] Processed 275 places — CSV now has 1285 rows
[Harvest] Processed 300 places — CSV now has 1400 rows
[Harvest] Processed 325 places — CSV now has 1516 rows
[Harvest] Processed 350 places — CSV now has 1636 rows
[Harvest] Processed 375 places — CSV now has 1755 rows
[Harvest] Processed 400 places — CSV now has 1877 rows
[Harvest] Processed 425 places — CSV now has 1991 rows
[Harvest] Processed 450 place