# Mini Streaming Pipeline v2: RapidAPI (Amazon) → Transform → MongoDB

**Changes in v2**
- The secrets file now accepts a **`rapidhost:`** line alongside `rapidkey:` and `mongodb-server:`.
- The API base URL and the `x-rapidapi-host` header both use the configured `rapidhost` value.
- Otherwise unchanged: the same small, free‑tier‑friendly pipeline with streaming progress logs.

### Secrets file format (3 lines):
```
rapidkey: YOUR_RAPID_KEY
rapidhost: real-time-amazon-data.p.rapidapi.com
mongodb-server: YOUR_MONGODB_URI
```
Set `SECRETS_TXT` to the path of this file, then run all cells. Keep the file out of version control — it contains live credentials.

In [None]:
import sys, subprocess
def pip_install(pkgs):
    """Quietly ``pip install`` *pkgs* into the interpreter running this notebook.

    Any failure (including a malformed *pkgs* argument) is reported as a
    warning rather than raised, so the remaining cells still run.
    """
    try:
        cmd = [sys.executable, '-m', 'pip', 'install', '--quiet', *pkgs]
        subprocess.check_call(cmd)
    except Exception as err:
        print('[WARN] pip install issue:', err)

# Install the runtime dependencies used by the cells below; a failed install
# only prints a warning (see pip_install).
pip_install(['requests', 'pymongo', 'tenacity'])

import os, time, json, re
from datetime import datetime
import requests
from pymongo import MongoClient
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

In [None]:
RAPID_KEY = '1316cdc108msh9ae1403350f00f2p16a7ecjsndfc84249a446'
RAPID_HOST = 'real-time-amazon-data.p.rapidapi.com'
MONGODB_URI = 'mongodb+srv://admin:adminpassword@assignment3.dhfn7vh.mongodb.net/?retryWrites=true&w=majority&appName=Assignment3'

print('RAPID_KEY:', ('****' + RAPID_KEY[-6:]) if RAPID_KEY else None)
print('RAPID_HOST:', RAPID_HOST)
print('MONGODB_URI:', ('...' + MONGODB_URI[-24:]) if MONGODB_URI else None)


RAPID_KEY: ****49a446
RAPID_HOST: real-time-amazon-data.p.rapidapi.com
MONGODB_URI: ...rity&appName=Assignment3


In [None]:
# Product-type keywords looked for in listing titles.
# NOTE: classify_type returns the FIRST entry (in list order) that occurs as a
# substring of the title, so 'gaming laptop', '4k tv' and 'smart tv' are
# shadowed by the shorter 'laptop' / 'tv' entries listed before them.
ELECTRONICS_TYPE = [
    'laptop', 'notebook', 'ultrabook', 'gaming laptop',
    'tv', 'television', 'oled', 'qled', '4k tv', 'smart tv'
]

# Brand names matched as whole words in the lower-cased title (first match wins).
ELECTRONICS_BRAND = [
    'apple','lenovo','dell','hp','asus','acer','msi','razer','gigabyte','alienware','microsoft','huawei','xiaomi',
    'samsung','lg','sony','tcl','hisense','panasonic','philips','toshiba','vizio'
]


def classify_type(title: str):
    """Return the first ELECTRONICS_TYPE keyword contained in *title*, else 'other'.

    Matching is a case-insensitive substring test; ``None`` is treated as an
    empty title.
    """
    t = (title or '').lower()
    # No extra 'laptop'/'tv' fallback is needed: every word such a fallback
    # would test ('laptop', 'notebook', 'ultrabook', 'tv', 'television') is
    # itself an entry of ELECTRONICS_TYPE and is caught here.
    return next((key for key in ELECTRONICS_TYPE if key in t), 'other')


def classify_brand(title: str):
    """Return the first ELECTRONICS_BRAND name found as a whole word in *title*.

    Titles naming Hewlett Packard (spaced, hyphenated or run together) map to
    'hp'. Returns 'unknown' when nothing matches; ``None`` is treated as ''.
    """
    t = (title or '').lower()
    for b in ELECTRONICS_BRAND:
        if re.search(rf'\b{re.escape(b)}\b', t):
            return b
    # '[\s-]*' also accepts the official hyphenated spelling "Hewlett-Packard".
    if re.search(r'\bhewlett[\s-]*packard\b', t):
        return 'hp'
    return 'unknown'

In [None]:
# Settings shared by every RapidAPI request. A missing key becomes '' and a
# missing host falls back to the Real-Time Amazon Data host; the base URL is
# always built from the same host that is sent in the header.
HEADERS = {
    'x-rapidapi-key': RAPID_KEY or '',
    'x-rapidapi-host': RAPID_HOST or 'real-time-amazon-data.p.rapidapi.com',
}
BASE_URL = f"https://{HEADERS['x-rapidapi-host']}"

class RapidApiError(Exception):
    """Raised when a RapidAPI call fails or its response cannot be used."""

from requests.exceptions import RequestException
from tenacity import RetryError

class _RetryableApiError(RapidApiError):
    """Transient RapidAPI failure (HTTP 408/429/5xx or unparsable JSON) worth retrying."""


@retry(reraise=True,
       retry=retry_if_exception_type((_RetryableApiError, RequestException)),
       wait=wait_exponential(multiplier=1, min=2, max=16),
       stop=stop_after_attempt(5))
def call_api(path: str, params: dict):
    """GET ``BASE_URL + path`` with the RapidAPI headers and return the decoded JSON.

    Transient failures -- network errors, HTTP 408/429/5xx and unparsable
    JSON -- are retried (up to 5 attempts, exponential back-off). Other HTTP
    errors such as 401/403/404 are raised immediately: retrying them cannot
    succeed and only burns API quota and back-off time, e.g. before a caller
    can fall back to an alternative endpoint path.

    Raises:
        RapidApiError: for any HTTP status >= 400 or invalid JSON (the
            retryable subclass surfaces once retries are exhausted).
        requests.RequestException: if a network error persists.
    """
    url = BASE_URL.rstrip('/') + path
    r = requests.get(url, headers=HEADERS, params=params, timeout=30)
    if r.status_code == 429:
        raise _RetryableApiError('429 Too Many Requests')
    if r.status_code == 408 or r.status_code >= 500:
        raise _RetryableApiError(f'HTTP {r.status_code}: {r.text[:200]}')
    if r.status_code >= 400:
        # Client errors are permanent for this request: fail fast.
        raise RapidApiError(f'HTTP {r.status_code}: {r.text[:200]}')
    try:
        return r.json()
    except ValueError as err:
        # A truncated/garbled body may succeed on a retry.
        raise _RetryableApiError('Invalid JSON in response') from err

def product_search(query: str, country='US', page=1):
    """Return the list of product dicts from one page of search results.

    The product list is taken from ``data['data']['products']``, from
    ``data['data']`` when that is itself a list, or from ``data['results']``.
    A top-level list payload is returned as-is; any other shape yields []
    instead of raising AttributeError.
    """
    params = {'query': query, 'country': country, 'page': page}
    data = call_api('/search', params)
    if isinstance(data, list):
        return data
    if not isinstance(data, dict):
        return []
    inner = data.get('data')
    if isinstance(inner, dict) and inner.get('products'):
        return inner['products']
    if isinstance(inner, list) and inner:
        return inner
    return data.get('results') or []

def product_offers(asin: str, country='US'):
    """Fetch the raw offers payload for one ASIN.

    Returns the response's 'data' member when the payload is a dict with a
    truthy 'data'; otherwise the payload itself (non-dict JSON is passed
    through unchanged rather than raising AttributeError).
    """
    params = {'asin': asin, 'country': country}
    data = call_api('/product-offers', params)
    if isinstance(data, dict):
        return data.get('data') or data
    return data

In [None]:
def product_reviews(asin: str, country='US'):
    """
    Fetch reviews for an ASIN, trying each known endpoint path in turn.

    Endpoint names differ by plan, so '/product-reviews' is tried first and
    '/reviews' second. Returns the response's 'data' member when the payload
    is a dict with a truthy 'data', otherwise the raw payload (it is trimmed
    later by trim_reviews).

    Only API/network failures (RapidApiError, RequestException) move on to
    the next path; other exceptions are genuine bugs and propagate. If every
    path fails, the last such error is re-raised.
    """
    paths = ['/product-reviews', '/reviews']
    last_err = None
    for p in paths:
        try:
            data = call_api(p, {'asin': asin, 'country': country})
        except (RapidApiError, RequestException) as e:
            last_err = e
            continue
        if isinstance(data, dict):
            return data.get('data') or data
        return data
    if last_err is not None:
        raise last_err

def trim_reviews(reviews_raw: dict, title_limit=8):
    """Return a compact summary of a reviews payload.

    Args:
        reviews_raw: a dict holding the review list under 'reviews' (or
            'data'), or the review list itself. Anything else counts as no
            reviews.
        title_limit: maximum number of titles to keep; 0 or less keeps none.

    Returns:
        ``{'count': number of entries in the review list (0 if none found),
        'titles': up to title_limit non-empty, stripped titles}``, where each
        title is read from 'title', 'review_title' or 'headline'.
    """
    items = []
    if isinstance(reviews_raw, dict):
        items = reviews_raw.get('reviews') or reviews_raw.get('data') or []
    elif isinstance(reviews_raw, list):
        items = reviews_raw
    if not isinstance(items, list):
        # e.g. a nested dict: iterating it would yield keys, not reviews.
        items = []

    titles = []
    for r in items:
        # Checked before appending so that title_limit=0 really keeps nothing.
        if len(titles) >= title_limit:
            break
        if not isinstance(r, dict):
            continue  # skip malformed entries instead of crashing on .get
        t = r.get('title') or r.get('review_title') or r.get('headline') or ''
        t = str(t).strip()
        if t:
            titles.append(t)
    return {
        'count': len(items),
        'titles': titles
    }


In [None]:
from pymongo import MongoClient
from pymongo.errors import PyMongoError

DB_NAME = 'Assignment3'
COLLECTION_NAME = 'product_catalog'

# MongoClient connects lazily; the create_index calls below are the first
# server round-trips, so reaching the final print means the server answered.
client = db = coll = None
if MONGODB_URI:
    try:
        # Give up after 10 s instead of pymongo's default 30 s server selection.
        client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=10_000)
        db = client[DB_NAME]
        coll = db[COLLECTION_NAME]
        coll.create_index('asin', unique=True)  # one document per product
        coll.create_index([('category.type', 1)])
        coll.create_index([('category.brand', 1)])
    except PyMongoError as err:
        # Leave coll as None so later cells take their "Mongo not connected"
        # path instead of this cell raising.
        print('[ERROR] Mongo setup failed:', err)
        if client is not None:
            client.close()
        client = db = coll = None
print('[INFO] Mongo connected?', (coll is not None))


[INFO] Mongo connected? True


In [None]:
def extract_core_fields(prod: dict):
    asin = prod.get('asin') or prod.get('ASIN')
    title = prod.get('product_title') or prod.get('title') or ''
    rating = prod.get('product_star_rating') or prod.get('rating')
    ratings_total = (prod.get('product_num_ratings') or prod.get('ratings_total')
                     or prod.get('reviews_count') or prod.get('review_count'))
    brand = classify_brand(title)
    typ = classify_type(title)
    return asin, title, rating, ratings_total, {'type': typ, 'brand': brand}

def trim_offers(offers_raw: dict, limit=3):
    """Reduce an offers payload to a summary plus its first *limit* offers.

    The offer list is read from 'sellers', 'offers' or 'product_offers'
    (the last is assumed to be the Real-Time Amazon Data key -- confirm
    against a live response). Non-dict entries are skipped and a negative
    *limit* keeps none. Non-dict payloads are returned unchanged.

    Returns:
        ``{'summary': {'buybox_price', 'total_offers'}, 'top_offers': [...]}``
        where each offer has 'price', 'currency', 'condition' and 'is_prime'.
    """
    if not isinstance(offers_raw, dict):
        return offers_raw

    def pick(d, *keys):
        # First value that is not None or ''. Unlike `a or b`, this keeps
        # False/0, so is_prime=False stays False instead of becoming None.
        for k in keys:
            v = d.get(k)
            if v is not None and v != '':
                return v
        return None

    sellers = (offers_raw.get('sellers') or offers_raw.get('offers')
               or offers_raw.get('product_offers') or [])
    if not isinstance(sellers, list):
        sellers = []

    valid = [s for s in sellers if isinstance(s, dict)]
    trimmed = [
        {
            'price': pick(s, 'price', 'price_raw', 'buybox_price', 'product_price'),
            'currency': pick(s, 'currency', 'currency_symbol'),
            'condition': pick(s, 'condition', 'product_condition'),
            'is_prime': pick(s, 'is_prime', 'prime'),
        }
        for s in valid[:max(limit, 0)]
    ]

    total = offers_raw.get('total_offers')
    if total is None:
        total = len(sellers)
    summary = {'buybox_price': offers_raw.get('buybox_price'),
               'total_offers': total}
    return {'summary': summary, 'top_offers': trimmed}

In [None]:
def _enrich(label, asin, fetch, trim, country):
    """Best-effort: return trim(fetch(asin, country=country)), or None after logging the failure."""
    try:
        return trim(fetch(asin, country=country))
    except Exception as e:  # one failing product must not abort the whole stream
        print(f"[INFO] {label} fail ASIN={asin}: {e}")
        return None


def stream_keywords(keywords, country='US', max_per_keyword=3, sleep_sec=1.8):
    """Search each keyword and upsert up to *max_per_keyword* enriched products into ``coll``.

    For every search hit that has an ASIN, offers and reviews are fetched
    (each best-effort; a failure leaves that field None), a document is built
    and upserted keyed by ``asin``. ``sleep_sec`` is slept after each product
    processed and after a failed search, to pace API usage.

    Once the run starts, the ``[DONE]`` summary is always printed -- also when
    the run is interrupted (e.g. KeyboardInterrupt in a notebook); the
    interrupt itself still propagates to the caller.
    """
    from datetime import timezone  # datetime.utcnow() is naive and deprecated

    if not RAPID_KEY:
        print('[ERROR] RAPID_KEY missing. Fix demo constants and rerun.')
        return
    if not RAPID_HOST:
        print('[ERROR] RAPID_HOST missing.')
        return
    if coll is None:
        print('[ERROR] Mongo not connected. Fix MONGODB_URI and rerun.')
        return

    total_seen, total_upserted = 0, 0
    try:
        for kw in keywords:
            print(f"[INFO] Searching '{kw}'...")
            try:
                products = product_search(kw, country=country, page=1)
            except Exception as e:
                print(f"[WARN] search fail '{kw}': {e}")
                time.sleep(sleep_sec)
                continue

            count = 0  # successful upserts for this keyword
            for prod in products:
                if count >= max_per_keyword:
                    break
                asin, title, rating, ratings_total, category = extract_core_fields(prod)
                if not asin:
                    continue  # nothing to key the document on; no API calls made

                offers_obj = _enrich('offers', asin, product_offers, trim_offers, country)
                reviews_obj = _enrich('reviews', asin, product_reviews, trim_reviews, country)

                doc = {
                    'asin': asin,
                    'product_title': title,
                    'rating': rating,
                    'ratings_total': ratings_total,
                    'category': category,
                    'offers': offers_obj,
                    'product_review': reviews_obj,
                    'fetched_at': datetime.now(timezone.utc),
                }

                try:
                    res = coll.update_one({'asin': asin}, {'$set': doc}, upsert=True)
                except Exception as e:
                    print(f"[ERROR] Mongo upsert ASIN={asin}: {e}")
                else:
                    if res.upserted_id or res.modified_count:
                        total_upserted += 1
                    total_seen += 1
                    count += 1
                    print(f"[INFO] Upserted ASIN={asin} | type={category['type']} | brand={category['brand']} | reviews={reviews_obj['count'] if reviews_obj else 0}")

                time.sleep(sleep_sec)
    finally:
        print(f"[DONE] total_seen={total_seen}, total_upserted={total_upserted}")


In [None]:
# Search terms to stream; the same phrases as the ELECTRONICS_TYPE list.
KEYWORDS = [
    'laptop',
    'notebook',
    'ultrabook',
    'gaming laptop',
    'tv',
    'television',
    'oled',
    'qled',
    '4k tv',
    'smart tv',
]
stream_keywords(
    KEYWORDS,
    country='US',
    max_per_keyword=5,
    sleep_sec=1.8,
)

[INFO] Searching 'laptop'...
[INFO] Upserted ASIN=B0FG4MGVJP | type=laptop | brand=hp | reviews=10
[INFO] Upserted ASIN=B0947BJ67M | type=laptop | brand=hp | reviews=10
[INFO] Upserted ASIN=B0DZDC3WW5 | type=laptop | brand=apple | reviews=10
[INFO] Upserted ASIN=B0FLXTK4HL | type=laptop | brand=hp | reviews=10
[INFO] Upserted ASIN=B0FLKNZJ1H | type=laptop | brand=hp | reviews=10
[INFO] Searching 'gaming laptop'...
[INFO] Upserted ASIN=B0DZZWMB2L | type=laptop | brand=asus | reviews=10
[INFO] Upserted ASIN=B0F6PLQ93N | type=laptop | brand=acer | reviews=9
[INFO] Upserted ASIN=B0FD3BL5QZ | type=laptop | brand=unknown | reviews=10
[INFO] Upserted ASIN=B0FCSC6STG | type=laptop | brand=unknown | reviews=10
[INFO] Upserted ASIN=B0DW1FVPK8 | type=laptop | brand=asus | reviews=10
[INFO] Searching 'ultrabook'...
[INFO] Upserted ASIN=B0FCYT7LYT | type=ultrabook | brand=hp | reviews=10
[INFO] Upserted ASIN=B0F22H6BL8 | type=laptop | brand=asus | reviews=10
[INFO] Upserted ASIN=B0DLHBYRPS | type=l

KeyboardInterrupt: 

In [None]:
# Sanity check: total document count plus a truncated JSON preview of up to
# five documents (the Mongo _id is excluded from the preview).
if coll is None:
    print('[SKIP] Mongo not connected.')
else:
    print('Docs in collection:', coll.count_documents({}))
    preview = coll.find({}, {'_id': 0}).limit(5)
    for doc in preview:
        snippet = json.dumps(doc, default=str)[:400]
        print(snippet + '...')