# 03_stage_unprocessed_raw.ipynb

## 1. Objective

This notebook implements the **staging layer** of the Raycon Google Shopping data pipeline.

Its purpose is to **incrementally process newly ingested raw API responses** and transform them into structured, analysis-ready staging tables.

Specifically, this pipeline:

- Identifies raw search records that have **not yet been staged**
- Flattens and normalizes semi-structured Google Shopping JSON responses
- Handles the presence or absence of different result sections (uncategorized, categorized, and inline results)
- Inserts the transformed data into the `raycon.stg_searches` and `raycon.stg_results` tables

The notebook is designed to be **repeatable and safe to re-run**, ensuring that previously staged data is not duplicated.

## 2. Setup and Configuration
This section loads required libraries and establishes a connection to the PostgreSQL database using environment variables.

### 2.1 Imports

In [8]:
import pandas as pd
pd.set_option("display.max_columns", None)
pd.set_option('future.no_silent_downcasting', True)
import json

from sqlalchemy import create_engine, text
import os
from dotenv import load_dotenv

### 2.2 Connect to Database

In [9]:
load_dotenv()

user = os.getenv('PGUSER')
password = os.getenv('PGPASSWORD')
pghost = os.getenv('PGHOST')
pgport = os.getenv('PGPORT')
pgdatabase = os.getenv('PGDATABASE')

engine = create_engine(
    f'postgresql+psycopg2://{user}:{password}@{pghost}:{pgport}/{pgdatabase}')

# Verify database connection by querying recent raw records
pd.read_sql("SELECT * FROM raycon.raw_google_shopping ORDER BY id DESC LIMIT 3", engine)

Unnamed: 0,id,pulled_at,keyword,page,response_json
0,98,2025-12-15 19:04:24.094346+00:00,bluetooth headphones,1,"{'filters': [{'type': 'Refine results', 'optio..."
1,97,2025-12-15 19:04:13.742109+00:00,wireless earbuds,1,"{'filters': [{'type': 'Refine results', 'optio..."
2,96,2025-12-15 19:04:01.556488+00:00,wireless headphones,1,"{'filters': [{'type': 'Refine results', 'optio..."
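
One caveat with interpolating credentials straight into the connection string: a password containing characters such as `@` or `/` would break URL parsing. A minimal sketch of percent-encoding the credentials with the standard library's `urllib.parse.quote_plus`; the helper name `build_pg_url` and the sample values are illustrative assumptions, not part of the pipeline:

```python
from urllib.parse import quote_plus


def build_pg_url(user, password, host, port, database):
    """Return a postgresql+psycopg2 URL with user and password percent-encoded."""
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Hypothetical credentials, chosen to include characters that need escaping
example_url = build_pg_url("analyst", "p@ss/word", "localhost", "5432", "raycon_db")
print(example_url)
```

The resulting string could be passed to `create_engine` in place of the f-string used in the cell above.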


## 3. Pull Raw JSON Records Not Yet Staged
This step identifies raw Google Shopping search records that have not yet been processed into the staging tables.
Only raw records whose `id` does not appear in `raycon.stg_searches` are selected, allowing the staging pipeline to run incrementally without duplicating previously staged data.

In [10]:
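# Query and load all raw rows that have not yet been staged.
# NOT EXISTS is used instead of NOT IN because NOT IN returns no rows at all
# if the subquery ever yields a NULL raw_id.
query = '''
SELECT r.id, r.pulled_at, r.keyword, r.page, r.response_json
FROM raycon.raw_google_shopping AS r
WHERE NOT EXISTS (
    SELECT 1 FROM raycon.stg_searches AS s WHERE s.raw_id = r.id
)
ORDER BY r.pulled_at;
'''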

with engine.connect() as conn:
    df_raw = pd.read_sql(query, conn)
df_raw.tail(3)

Unnamed: 0,id,pulled_at,keyword,page,response_json
4,96,2025-12-15 19:04:01.556488+00:00,wireless headphones,1,"{'filters': [{'type': 'Refine results', 'optio..."
5,97,2025-12-15 19:04:13.742109+00:00,wireless earbuds,1,"{'filters': [{'type': 'Refine results', 'optio..."
6,98,2025-12-15 19:04:24.094346+00:00,bluetooth headphones,1,"{'filters': [{'type': 'Refine results', 'optio..."
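
The incremental selection is an anti-join between the raw table and `stg_searches`. The sketch below reproduces the idea with the standard library's `sqlite3` as a stand-in for PostgreSQL, using heavily simplified, hypothetical versions of the two tables. It shows that a second run finds nothing left to stage, and that a `NOT EXISTS` form keeps working if a `raw_id` were ever NULL, whereas `NOT IN` would silently return no rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE raw_google_shopping (id INTEGER PRIMARY KEY, keyword TEXT);
    CREATE TABLE stg_searches (raw_id INTEGER);
    INSERT INTO raw_google_shopping (id, keyword) VALUES
        (1, 'wireless headphones'), (2, 'wireless earbuds'), (3, 'bluetooth headphones');
    INSERT INTO stg_searches (raw_id) VALUES (1);
""")

UNSTAGED_SQL = """
    SELECT r.id FROM raw_google_shopping AS r
    WHERE NOT EXISTS (SELECT 1 FROM stg_searches AS s WHERE s.raw_id = r.id)
    ORDER BY r.id
"""


def unstaged_ids(connection):
    """Return ids of raw rows that have no matching stg_searches row."""
    return [row[0] for row in connection.execute(UNSTAGED_SQL)]


first_run = unstaged_ids(conn)  # ids 2 and 3 are still unstaged

# Simulate staging: record the processed ids in stg_searches
conn.executemany("INSERT INTO stg_searches (raw_id) VALUES (?)", [(i,) for i in first_run])
second_run = unstaged_ids(conn)  # nothing left to stage

# NULL pitfall: add one new raw row and one stray NULL raw_id
conn.execute("INSERT INTO raw_google_shopping (id, keyword) VALUES (4, 'earbuds')")
conn.execute("INSERT INTO stg_searches (raw_id) VALUES (NULL)")
not_in_ids = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM raw_google_shopping "
        "WHERE id NOT IN (SELECT raw_id FROM stg_searches)"
    )
]
not_exists_ids = unstaged_ids(conn)

print(first_run, second_run, not_in_ids, not_exists_ids)
```

A `NOT NULL` constraint on `stg_searches.raw_id` would rule out the NULL case entirely; whether the real table has one is not shown in this notebook.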


## 4. Helper Functions

This section contains helper functions used to transform raw SerpAPI JSON
responses into staging-ready search- and product-level DataFrames.

The logic here is currently copied from `02_parse_raw` and defined inline to
support iterative development and debugging. These functions are written as
pure transformations (no database writes or side effects) and will be
extracted into reusable `.py` modules in a later refactor.

### 4.1 build_searches_for_keyword

In [11]:
def build_searches_for_keyword(keyword_raw):
    """
    Build the stg_searches record for a single raw search event.

    This function:
    - Takes one row of df_raw (a pd.Series)
    - Extracts identifiers (raw_id, pulled_at, keyword, page)
    - Extracts search parameters from response_json["search_parameters"]
    - Places everything into a clean, single-row DataFrame

    Returns:
        pd.DataFrame: one clean stg_searches row
    """
    # Pull identifiers & search metadata from the raw row
    raw_id, pulled_at, keyword, page = keyword_raw[['id', 'pulled_at', 'keyword', 'page']]

    # Extract the "search_parameters" object from the JSON
    params = keyword_raw["response_json"]["search_parameters"]

    # Guard against a missing "num": int(None) would raise a TypeError
    num = params.get("num")

    # Build the clean staging DataFrame
    search_df_clean = pd.DataFrame([{
        "raw_id": raw_id,
        "pulled_at": pulled_at,
        "keyword": keyword,
        "page": page,
        "location_used": params.get("location_used"),
        "location_requested": params.get("location_requested"),
        "gl": params.get("gl"),
        "hl": params.get("hl"),
        "device": params.get("device"),
        "num_results_requested": int(num) if num is not None else None,
        "engine": params.get("engine"),
        "google_domain": params.get("google_domain"),
    }])
    return search_df_clean

### 4.2 transform_results_df

In [12]:
def transform_results_df(
    df_in,
    *,
    raw_id,
    keyword,
    page,
    pulled_at,
    module_type,
    module_label,
    module_index):
    """
    Take a raw shopping-results DataFrame for a single module
    and return a cleaned, standardized results DataFrame.

    - Drops unused SerpAPI fields
    - Normalizes `multiple_sources` to boolean
    - Renames extracted price fields
    - Adds module + search context columns
    - Reorders columns to match `stg_results` schema
    """
    # Work on a copy to avoid mutating the original input
    transform_df = df_in.copy()

    # 1) Drop columns we decided not to stage
    transform_df = transform_df.drop(
        columns=['snippet', 'price', 'delivery', 'old_price', 'thumbnail',
                 'extensions', 'source_icon', 'product_link', 'serpapi_thumbnail',
                 'immersive_product_page_token', 'serpapi_immersive_product_api'],
        errors='ignore')

    # 2) Convert multiple_sources -> proper bool
    if 'multiple_sources' in transform_df:
        transform_df['multiple_sources'] = (
            transform_df['multiple_sources'].replace('True', True).fillna(False).astype(bool))
    else:
        transform_df['multiple_sources'] = False

    # 3) Create and nullify block_position for results which lack this field
    transform_df["block_position"] = transform_df.get("block_position", None)

    # 4) Rename fields to final names
    transform_df = transform_df.rename(columns={
        'position': 'position_in_module',
        'extracted_price': 'price',
        'extracted_old_price': 'old_price'
    })

    # 5) Attach module + search context
    transform_df = transform_df.assign(
        module_type=module_type,
        module_label=module_label,
        module_index=module_index,
        raw_id=raw_id,
        keyword=keyword,
        page=page,
        pulled_at=pulled_at
    )

    # 6) Reorder columns to match target table
    final_cols = [
        "raw_id",
        "keyword",
        "page",
        "pulled_at",
        "title",
        "product_id",
        "price",
        "old_price",
        "reviews",
        "rating",
        "source",
        "multiple_sources",
        "tag",
        "module_type",
        "module_label",
        "module_index",
        "block_position",
        "position_in_module"
    ]
    transform_df = transform_df.reindex(columns=final_cols)
    
    return transform_df
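
Step 2 of `transform_results_df` treats a missing `multiple_sources` as `False` and the string `'True'` as `True`. The same rule can be sketched on plain records. The helper `to_bool` and the sample records are hypothetical, and this variant is deliberately stricter than the cell above: it also maps any other string (including `'False'`) to `False`, whereas `astype(bool)` would turn any non-empty string into `True`.

```python
def to_bool(value):
    """Map True / 'True' to True; anything else (None, NaN, other strings) to False."""
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return value is True


# Hypothetical product records shaped like entries of `shopping_results`
records = [
    {"title": "A", "multiple_sources": True},
    {"title": "B"},                                    # field absent
    {"title": "C", "multiple_sources": "True"},        # string form
    {"title": "D", "multiple_sources": float("nan")},  # missing value after flattening
    {"title": "E", "multiple_sources": "False"},       # would be truthy under astype(bool)
]

flags = [to_bool(rec.get("multiple_sources")) for rec in records]
print(flags)
```

Inside the DataFrame-based function, a helper like this could be applied with `Series.map(to_bool)` if the stricter behavior is wanted.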

### 4.3 build_results_for_keyword

In [13]:
def build_results_for_keyword(keyword_raw):
    """
    Build a full results DataFrame for ONE raw keyword request row.

    Steps:
    - Pull raw_id / keyword / page / pulled_at from the raw row
    - Flatten `shopping_results` (all products)
    - Flatten each entry in `categorized_shopping_results`
    - Flatten `inline_shopping_results`
    - Run everything through `transform_results_df`
    - Union all non-empty modules into one DataFrame
    """
    # 1) Extract search metadata used for context columns
    raw_id, pulled_at, keyword, page = keyword_raw.loc[['id', 'pulled_at', 'keyword', 'page']]
    context = dict(raw_id=raw_id, pulled_at=pulled_at, keyword=keyword, page=page)

    # 2) Extract the JSON response for this keyword request
    keyword_json = keyword_raw.loc['response_json']

    # 3) Collect one cleaned DataFrame per non-empty module
    df_list = []

    # 4) (If they exist) Build uncategorized results
    uncategorized_df_raw = pd.DataFrame(keyword_json.get('shopping_results', []))
    if not uncategorized_df_raw.empty:
        df_list.append(transform_results_df(
            uncategorized_df_raw, **context,
            module_type='all_products', module_label='All products', module_index=99))

    # 5) (If they exist) Loop over categorized modules, flatten + transform each
    categories = keyword_json.get('categorized_shopping_results', [])
    for i, category in enumerate(categories, start=1):
        per_category_results_raw = pd.json_normalize(category.get('shopping_results', []))
        if not per_category_results_raw.empty:
            df_list.append(transform_results_df(
                per_category_results_raw, **context,
                module_type='categorized_products', module_label=category.get('title'),
                module_index=i))

    # 6) (If they exist) Build inline results
    inline_df_raw = pd.DataFrame(keyword_json.get('inline_shopping_results', []))
    if not inline_df_raw.empty:
        df_list.append(transform_results_df(
            inline_df_raw, **context,
            module_type='inline_products', module_label='Inline products', module_index=100))

    # 7) Return the union of all modules found; pd.concat raises on an empty list,
    #    so a search with no results returns an empty DataFrame instead
    if not df_list:
        return pd.DataFrame()
    return pd.concat(df_list, ignore_index=True)
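
How the three result sections map onto `module_type`, `module_label`, and `module_index` can be sketched without pandas. The generator `iter_modules` and the trimmed sample response below are hypothetical; the labels and index values (99 for all products, 1..n for categories, 100 for inline) mirror the function above.

```python
def iter_modules(response):
    """Yield (module_type, module_label, module_index, products) per non-empty section."""
    products = response.get("shopping_results", [])
    if products:
        yield ("all_products", "All products", 99, products)

    categories = response.get("categorized_shopping_results", [])
    for i, category in enumerate(categories, start=1):
        items = category.get("shopping_results", [])
        if items:
            yield ("categorized_products", category.get("title"), i, items)

    inline = response.get("inline_shopping_results", [])
    if inline:
        yield ("inline_products", "Inline products", 100, inline)


# Hypothetical, heavily trimmed response with no inline section at all
sample_response = {
    "shopping_results": [{"title": "Earbuds X", "position": 1}],
    "categorized_shopping_results": [
        {
            "title": "Popular products",
            "shopping_results": [{"title": "Earbuds Y", "position": 1}],
        },
    ],
}

modules = [
    (module_type, label, index, len(items))
    for module_type, label, index, items in iter_modules(sample_response)
]
print(modules)
```

Absent sections simply produce no module, which is the behavior the staging function relies on when a response lacks categorized or inline results.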

## 5. Build Staging DataFrames from Unprocessed Searches

Iterate over all raw search rows that have not yet been staged and transform
each into:

- One search-level staging record (`stg_searches`)
- A set of product-level staging records (`stg_results`)

Results are accumulated in memory and concatenated once per table to avoid
repeated database writes.

In [14]:
# Create lists to store the staged results of each raw record
search_rows = []
results_dfs = []

# Transform each raw record and append the result to its respective list
for _, raw_row in df_raw.iterrows():
    search_rows.append(build_searches_for_keyword(raw_row))
    results_dfs.append(build_results_for_keyword(raw_row))

# Union the per-record DataFrames into the final staged DataFrames.
# pd.concat raises ValueError on an empty list, so fall back to empty
# DataFrames when there is nothing new to stage (e.g. on a re-run).
stg_searches_df = pd.concat(search_rows, ignore_index=True) if search_rows else pd.DataFrame()
stg_results_df = pd.concat(results_dfs, ignore_index=True) if results_dfs else pd.DataFrame()

## 6. Persist Staged Data

Append the transformed search-level and product-level DataFrames to their
corresponding staging tables within a single database transaction to ensure
atomicity.

In [15]:
# Persist staged data atomically (either both tables succeed or both fail)
with engine.begin() as conn:
    stg_searches_df.to_sql(
        "stg_searches", conn, schema="raycon",
        if_exists="append", index=False)

    stg_results_df.to_sql(
        "stg_results", conn, schema="raycon",
        if_exists="append", index=False)
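
The all-or-nothing behavior of a single transaction can be illustrated with the standard library's `sqlite3`, used here only as a stand-in for PostgreSQL with simplified, hypothetical table definitions. When the second insert fails, the first is rolled back as well, so neither table is left with partial data:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_searches (raw_id INTEGER NOT NULL)")
conn.execute("CREATE TABLE stg_results (raw_id INTEGER NOT NULL, title TEXT NOT NULL)")
conn.commit()

try:
    # `with conn:` commits if the block succeeds and rolls back if it raises
    with conn:
        conn.execute("INSERT INTO stg_searches (raw_id) VALUES (?)", (99,))
        # Deliberately invalid row: title is NULL, so this insert fails
        conn.execute(
            "INSERT INTO stg_results (raw_id, title) VALUES (?, ?)", (99, None)
        )
except sqlite3.IntegrityError:
    failed = True
else:
    failed = False

searches_after = conn.execute("SELECT COUNT(*) FROM stg_searches").fetchone()[0]
results_after = conn.execute("SELECT COUNT(*) FROM stg_results").fetchone()[0]
print(failed, searches_after, results_after)
```

Because a failed run leaves `stg_searches` untouched, the affected raw records remain unstaged and are picked up again on the next run.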