In [1]:
import os
import requests
import time
import json
from google.cloud import storage, bigquery
from datetime import datetime

# Load Yelp API key from environment variable
API_KEY = os.getenv("YELP_API_KEY")  
# Why? Because we NEVER hardcode secrets in code. Industry standard.

YELP_URL = "https://api.yelp.com/v3/businesses/search"



In [2]:
API_KEY

'_TZZJbs55ENUDK2lRDU0lwmVf1DodAq4ja16ahgD9C_dd1WA8o1bEiCBetWi9cmFS9tdWiH5GKzAUKSh97-RjA62cALkEmAT4VslMrSdO1CzE8FFYVQdMZxw2-JLaHYx'

In [3]:
YELP_URL = "https://api.yelp.com/v3/businesses/search"

# Parameters for the Yelp API
params = {
    "location": "Orange County, CA",
    "categories": "asian restaurants",
    "limit": 50
}

In [4]:
# Make the request
headers = {"Authorization": f"Bearer {API_KEY}"}
response = requests.get(YELP_URL, headers=headers, params=params)

if response.status_code != 200:
    raise Exception(f"Yelp API failed: {response.text}")

data = response.json()

In [5]:
# Save to local as JSON
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
filename = f"yelp_restaurants_{timestamp}.json"

with open(filename, "w") as f:
    json.dump(data, f)

In [6]:
# Upload to Google Cloud Storage (bronze)
storage_client = storage.Client()
bucket = storage_client.bucket("dri-raw-96")
blob = bucket.blob(f"yelp/{filename}")
blob.upload_from_filename(filename)

In [2]:
import os, google.auth
print("GOOGLE_APPLICATION_CREDENTIALS_DEV =", os.getenv("GOOGLE_APPLICATION_CREDENTIALS_DEV"))
creds, proj = google.auth.default()
print("ADC project =", proj)

GOOGLE_APPLICATION_CREDENTIALS_DEV = /app/gcp-sa-dri-dev.json
ADC project = dri-dev-96


In [9]:
from datetime import datetime, timezone

# Add ingestion timestamp to each record
ingested_at = datetime.now(timezone.utc).isoformat()
rows = [
    {**biz, "_ingested_at": ingested_at}
    for biz in data["businesses"]
]

# Define table_id
table_id = "dri-dev-96.bronze.yelp_restaurants"

# Load into BigQuery
job = bq_client.load_table_from_json(
    rows,
    table_id
)

job.result()
print(f"Inserted {len(rows)} rows into {table_id}")

Inserted 50 rows into dri-dev-96.bronze.yelp_restaurants


In [None]:
## Restarting with new code

In [10]:
# src/ingestion/yelp_ingest.py
import os, sys, json, time, argparse, logging
from datetime import datetime, timezone

import requests                     # HTTP calls to Yelp
from google.cloud import storage    # write files to GCS
from google.cloud import bigquery   # optional: load raw table to BQ

# Console logger (simple and readable)
log = logging.getLogger("yelp_ingest")
log.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
log.addHandler(handler)

In [11]:
def make_timestamps():
    # Folder partition (day) and precise row timestamp (UTC)
    dt = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    ingested_at = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return dt, ingested_at

In [12]:
def resolve_env():
    env = os.getenv("ENV", "dev").lower()
    if env not in ("dev", "prod"):
        raise ValueError("ENV must be 'dev' or 'prod'")

    if env == "dev":
        return {
            "env": env,
            "project": os.getenv("GCP_PROJECT_ID_DEV"),
            "bucket":  os.getenv("GCS_BUCKET_NAME_DEV"),
            "keyfile": os.getenv("GOOGLE_APPLICATION_CREDENTIALS_DEV"),
        }
    else:
        return {
            "env": env,
            "project": os.getenv("GCP_PROJECT_ID_PROD"),
            "bucket":  os.getenv("GCS_BUCKET_NAME_PROD"),
            "keyfile": os.getenv("GOOGLE_APPLICATION_CREDENTIALS_PROD"),
        }


In [13]:
def parse_args():
    p = argparse.ArgumentParser(description="Ingest Yelp businesses to GCS bronze (optional BQ).")
    p.add_argument("--term", type=str, help="search term, e.g. 'korean bbq'")
    p.add_argument("--location", type=str, help="e.g. 'Los Angeles, CA'")
    p.add_argument("--latitude", type=float, help="latitude if no location")
    p.add_argument("--longitude", type=float, help="longitude if no location")
    p.add_argument("--categories", type=str, help="e.g. 'korean,bbq'")
    p.add_argument("--per-page", type=int, default=50, help="Yelp max is 50")
    p.add_argument("--max-records", type=int, default=150, help="total cap to fetch")
    p.add_argument("--dt", type=str, help="override partition date YYYY-MM-DD")
    p.add_argument("--write-bq", action="store_true", help="also load a raw BQ table")
    return p.parse_args()


In [14]:
YELP_SEARCH_URL = "https://api.yelp.com/v3/businesses/search"

def fetch_yelp(api_key, *, term=None, location=None, latitude=None, longitude=None,
               categories=None, per_page=50, max_records=150):
    headers = {"Authorization": f"Bearer {api_key}"}
    results = []
    offset = 0

    while len(results) < max_records:
        page_limit = min(per_page, max_records - len(results))
        params = {"limit": page_limit, "offset": offset}
        if term: params["term"] = term
        if location: params["location"] = location
        if latitude is not None and longitude is not None:
            params["latitude"] = latitude; params["longitude"] = longitude
        if categories: params["categories"] = categories

        # retry up to 4 times on 429/5xx with simple exponential backoff
        for attempt in range(1, 5):
            resp = requests.get(YELP_SEARCH_URL, headers=headers, params=params, timeout=30)
            if resp.status_code == 200:
                break
            if resp.status_code in (429, 500, 502, 503, 504) and attempt < 4:
                sleep_s = 2 ** (attempt - 1)  # 1s,2s,4s
                log.warning(json.dumps({"event":"retry", "status":resp.status_code, "sleep":sleep_s}))
                time.sleep(sleep_s)
                continue
            raise RuntimeError(f"Yelp failed: {resp.status_code} {resp.text[:300]}")

        items = resp.json().get("businesses", [])
        results.extend(items)
        log.info(json.dumps({"event":"page", "page_count":len(items), "total":len(results), "offset":offset}))
        if not items or len(items) < page_limit:
            break
        offset += page_limit

    return results


In [15]:
def write_gcs_ndjson(bucket_url, dt, records, shard="0001"):
    assert bucket_url.startswith("gs://"), "bucket must start with gs://"
    bucket_name, *maybe_prefix = bucket_url.replace("gs://","").split("/", 1)
    root = maybe_prefix[0] if maybe_prefix else ""
    path = (f"{root}/bronze/yelp/dt={dt}/part-{shard}.json"
            if root else f"bronze/yelp/dt={dt}/part-{shard}.json")

    body = "\n".join(json.dumps(r, ensure_ascii=False) for r in records)

    client = storage.Client()
    blob = client.bucket(bucket_name).blob(path)
    blob.upload_from_string(body, content_type="application/json")
    uri = f"gs://{bucket_name}/{path}"
    log.info(json.dumps({"event":"gcs_write", "uri":uri, "records":len(records)}))
    return uri


In [16]:
def load_bq_raw(project_id, dataset, table, gcs_uri):
    client = bigquery.Client(project=project_id)
    job_cfg = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
    )
    table_id = f"{project_id}.{dataset}.{table}"
    job = client.load_table_from_uri(gcs_uri, table_id, job_config=job_cfg)
    job.result()
    log.info(json.dumps({"event":"bq_load", "table":table_id, "uri":gcs_uri}))


In [17]:
def main():
    args = parse_args()
    cfg = resolve_env()

    api_key = os.getenv("YELP_API_KEY")
    if not api_key:
        raise EnvironmentError("Missing YELP_API_KEY")

    # Sanity: required envs for chosen environment
    for k in ("project","bucket","keyfile"):
        if not cfg.get(k):
            raise EnvironmentError(f"Missing {k} for ENV={cfg['env']}")

    # Basic arg validation: need either location OR lat/lon
    if not args.location and (args.latitude is None or args.longitude is None):
        raise ValueError("Provide --location OR both --latitude and --longitude")

    # Timestamps (folder + row)
    dt = args.dt or make_timestamps()[0]
    ingested_at = make_timestamps()[1]

    # Fetch from Yelp
    t0 = time.time()
    rows = fetch_yelp(
        api_key,
        term=args.term, location=args.location,
        latitude=args.latitude, longitude=args.longitude,
        categories=args.categories, per_page=args.per_page, max_records=args.max_records
    )
    log.info(json.dumps({"event":"fetch_done","count":len(rows),"seconds":round(time.time()-t0,2)}))
    if not rows:
        log.warning(json.dumps({"event":"no_results"}))
        return

    # Add row-level timestamp for lineage
    enriched = [{**r, "ingested_at": ingested_at} for r in rows]

    # Write to GCS bronze (partitioned)
    gcs_uri = write_gcs_ndjson(cfg["bucket"], dt, enriched, shard="0001")

    # Optionally load a raw BQ table (one table per day)
    if args.write_bq:
        table = f"yelp_raw_{dt.replace('-','')}"  # e.g., yelp_raw_20250815
        load_bq_raw(cfg["project"], "bronze", table, gcs_uri)

if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] [--term TERM] [--location LOCATION]
                             [--latitude LATITUDE] [--longitude LONGITUDE]
                             [--categories CATEGORIES] [--per-page PER_PAGE]
                             [--max-records MAX_RECORDS] [--dt DT]
                             [--write-bq]
ipykernel_launcher.py: error: unrecognized arguments: --f=/root/.local/share/jupyter/runtime/kernel-v3cc263cd4ccc201afa3d0c80f69a2889d03eeb922.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [18]:
# scripts/ingest_yelp.py
import argparse

def build_parser():
    p = argparse.ArgumentParser()
    p.add_argument("--term")
    p.add_argument("--location")
    p.add_argument("--latitude", type=float)
    p.add_argument("--longitude", type=float)
    p.add_argument("--categories")
    p.add_argument("--per-page", type=int, default=50)
    p.add_argument("--max-records", type=int, default=100)
    p.add_argument("--dt")
    p.add_argument("--write-bq", action="store_true")
    return p

def main(args):
    # your logic here, using args.term, args.location, etc.
    ...

if __name__ == "__main__":
    parser = build_parser()
    # NOTE: parse_known_args ignores any extra flags (like Jupyter's --f=...)
    args, _ = parser.parse_known_args()
    main(args)


In [19]:
# stop/remove old container name if exists
docker stop dri-dev 2>/dev/null || true
docker rm dri-dev 2>/dev/null || true

docker run -it --name dri-dev \
  --env-file .env \
  -e ENV=dev \
  -e DBT_TARGET=dev \
  -p 8888:8888 \
  -v "$(pwd)":/app \
  -v "$(pwd)/gcp-sa-dri-dev.json":/app/gcp-sa-dev.json:ro \
  dri:latest bash


SyntaxError: invalid syntax (1004905283.py, line 2)