<a href="https://colab.research.google.com/github/TylerWichman/mgmt467-analytics-portfolio/blob/main/Labs/Unit2_Lab1_PromptPlusExamples_Colab_Kaggle_GCS_BQ_DQ.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# MGMT 467: Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you'll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup: What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2-3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [1]:
# # EXAMPLE (from LLM): Auth + Project/Region (commented; write your own cell using the prompt)
# # from google.colab import auth
# # auth.authenticate_user()
# #
# # import os
# # PROJECT_ID = input("Enter your GCP Project ID: ").strip()
# # REGION = "us-central1"  # keep consistent; change if instructed
# # os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
# # print("Project:", PROJECT_ID, "| Region:", REGION)
# #
# # # Set active project for gcloud/BigQuery CLI
# # !gcloud config set project $GOOGLE_CLOUD_PROJECT
# # !gcloud config get-value project
# # # Done: Auth + Project/Region set

In [2]:
from google.colab import auth
auth.authenticate_user()  # Opens a sign-in flow to authenticate with your Google account

# Prompt for your GCP Project ID and set a default region (editable)
PROJECT_ID = input("Enter your Google Cloud Project ID: ").strip()
REGION = "us-central1"  # Default region; change this if you're using another region

# Export environment variables for use by SDKs and notebooks
import os
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION

# Configure gcloud CLI to use your project
!gcloud config set project $GOOGLE_CLOUD_PROJECT

# Print confirmation
print(f"Project set to: {PROJECT_ID}")
print(f"Region set to: {REGION}")

Enter your Google Cloud Project ID: mgmt-labs-unit-two
Updated property [core/project].
Project set to: mgmt-labs-unit-two
Region set to: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don't?

In [3]:
!echo "Active Project: $(gcloud config get-value project)"
!echo "Region: $REGION"

Active Project: mgmt-labs-unit-two
Region: us-central1


Setting them once keeps the notebook consistent and reproducible. If they are not set, later commands and queries may fail or run against the wrong project or region.

## 1) Kaggle API: What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.
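
As a quick check on the `0600` requirement, the sketch below reads a file's permission bits from Python. It runs on a throwaway temp file rather than a real token. The helper name `is_owner_only` is made up for illustration, and the code assumes a POSIX system such as the Colab VM.

```python
import os
import stat
import tempfile

def is_owner_only(path: str) -> bool:
    """Return True if the file's permission bits are exactly 0600."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return mode == 0o600

# Demonstrate on a temporary stand-in file, never on a real credential.
with tempfile.TemporaryDirectory() as tmp:
    token_path = os.path.join(tmp, "kaggle.json")
    with open(token_path, "w") as f:
        f.write("{}")

    os.chmod(token_path, 0o644)  # readable by group and others: too permissive
    print("0644 owner-only?", is_owner_only(token_path))

    os.chmod(token_path, 0o600)  # owner read/write only
    print("0600 owner-only?", is_owner_only(token_path))
```

A check like this could run right after the upload cell so that a loosely permissioned token is caught before any Kaggle command is issued.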

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [4]:
# # EXAMPLE (from LLM): Kaggle setup (commented)
# # from google.colab import files
# # print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
# # uploaded = files.upload()
# #
# # import os
# # os.makedirs('/root/.kaggle', exist_ok=True)
# # with open('/root/.kaggle/kaggle.json', 'wb') as f:
# #     f.write(uploaded[list(uploaded.keys())[0]])
# # os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only
# #
# # !kaggle --version

In [5]:
from google.colab import files
import os

# Prompt user to upload kaggle.json (contains your Kaggle API key)
uploaded = files.upload()

# Create the hidden .kaggle directory and save credentials
os.makedirs(os.path.expanduser("~/.kaggle"), exist_ok=True)
with open(os.path.expanduser("~/.kaggle/kaggle.json"), "wb") as f:
    # Use the first uploaded file so a renamed upload (e.g. "kaggle (1).json") still works
    f.write(next(iter(uploaded.values())))

# Set strict file permissions for security (owner read/write only)
os.chmod(os.path.expanduser("~/.kaggle/kaggle.json"), 0o600)

# Verify installation by printing Kaggle CLI version
!kaggle --version

Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [6]:
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

Setting the file permissions to `0600` ensures that only the file's owner can read or write the API token. Without this, other users on the same machine could read and copy the token.

## 2) Download & unzip dataset: What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [7]:
# # EXAMPLE (from LLM): Download & unzip (commented)
# !mkdir -p /content/data/raw
# !kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
# !unzip -o /content/data/*.zip -d /content/data/raw
# # List CSV inventory
# !ls -lh /content/data/raw/*.csv

In [8]:
import os
import pandas as pd

# 1️⃣ Create directory structure
os.makedirs("/content/data/raw", exist_ok=True)  # Where extracted files will go
print("✅ Created directories: /content/data/raw")

# 2️⃣ Download dataset using Kaggle CLI (replace with your dataset name)
# Example: "kaggle datasets download -d zynicide/wine-reviews"
dataset = input("Enter Kaggle dataset identifier (e.g., zynicide/wine-reviews): ").strip()
!kaggle datasets download -d {dataset} -p /content/data --force

# 3️⃣ Unzip dataset into /content/data/raw (overwrite OK)
!unzip -o /content/data/*.zip -d /content/data/raw

# 4️⃣ List all CSV files and their sizes in a neat table
csv_files = []
# Named `filenames` (not `files`) to avoid shadowing the google.colab `files` module imported earlier
for root, _, filenames in os.walk("/content/data/raw"):
    for filename in filenames:
        if filename.endswith(".csv"):
            path = os.path.join(root, filename)
            size_mb = os.path.getsize(path) / (1024 * 1024)
            csv_files.append({"File": path.replace("/content/", ""), "Size (MB)": round(size_mb, 2)})

# Display summary table
if csv_files:
    df = pd.DataFrame(csv_files)
    display(df)
else:
    print("No CSV files found in /content/data/raw")


✅ Created directories: /content/data/raw
Enter Kaggle dataset identifier (e.g., zynicide/wine-reviews): sayeeduddin/netflix-2025user-behavior-dataset-210k-records
Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 776MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  


|   | File | Size (MB) |
|---|------|-----------|
| 0 | data/raw/movies.csv | 0.11 |
| 1 | data/raw/watch_history.csv | 8.84 |
| 2 | data/raw/search_logs.csv | 2.15 |
| 3 | data/raw/users.csv | 1.53 |
| 4 | data/raw/recommendation_logs.csv | 4.48 |
| 5 | data/raw/reviews.csv | 1.78 |


### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [9]:
import os, glob

csv_files = glob.glob("/content/data/raw/*.csv")

assert len(csv_files) == 6, f"❌ Expected 6 CSVs, found {len(csv_files)}"
print("✅ Found 6 CSV files:")
for f in csv_files:
    print(" -", os.path.basename(f))

✅ Found 6 CSV files:
 - movies.csv
 - watch_history.csv
 - search_logs.csv
 - users.csv
 - recommendation_logs.csv
 - reviews.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

It allows you to quickly detect missing, duplicate, or oversized files. It also guarantees that all inputs are known, verified, and consistent, preventing silent errors or incomplete analyses.
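
One way to make such an inventory auditable is to record a checksum next to each file's name and size, so a later run can tell whether an input changed. The sketch below is a minimal illustration using only the standard library. The function names `build_manifest` and `write_manifest` are hypothetical, and each file is read fully into memory, which is fine for files of this size but not for very large ones.

```python
import csv
import hashlib
from pathlib import Path

def build_manifest(raw_dir: str) -> list:
    """Return one record per CSV in raw_dir: file name, size in bytes, SHA-256."""
    records = []
    for path in sorted(Path(raw_dir).glob("*.csv")):
        data = path.read_bytes()
        records.append({
            "file": path.name,
            "bytes": len(data),
            "sha256": hashlib.sha256(data).hexdigest(),
        })
    return records

def write_manifest(records: list, out_path: str) -> None:
    """Write the manifest records to a CSV file with a header row."""
    with open(out_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["file", "bytes", "sha256"])
        writer.writeheader()
        writer.writerows(records)

# Example usage in this lab (paths as used above):
# write_manifest(build_manifest("/content/data/raw"), "/content/data/manifest.csv")
```

Comparing two manifests from different runs shows at a glance whether a file is missing, has changed size, or has different contents.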

## 3) Create GCS bucket & upload: What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.
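
Because the name has to be unique across all of GCS, a random suffix is the usual trick. The sketch below generates a candidate name and runs a basic format check before any `gcloud` call. Both function names are hypothetical, and the regular expression covers only a simplified subset of the naming rules (3 to 63 characters; lowercase letters, digits, hyphens, and underscores; starting and ending with a letter or digit). It ignores dotted names and reserved words, and actual uniqueness can only be confirmed by the create call itself.

```python
import re
import uuid

# Assumption: simplified subset of the GCS bucket naming rules (no dotted names).
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$")

def looks_like_valid_bucket_name(name: str) -> bool:
    """Cheap local format check; it does not prove the name is available."""
    return bool(_BUCKET_RE.match(name))

def make_bucket_name(prefix: str) -> str:
    """Append an 8-character random hex suffix to a lowercase prefix."""
    suffix = uuid.uuid4().hex[:8]
    return f"{prefix.lower()}-{suffix}"

candidate = make_bucket_name("mgmt467-netflix")
print(candidate, looks_like_valid_bucket_name(candidate))
```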

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [10]:
# # EXAMPLE (from LLM): GCS staging (commented)
# # import uuid, os
# # bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
# # os.environ["BUCKET_NAME"] = bucket_name
# # !gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION
# # !gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/
# # print("Bucket:", bucket_name)
# # # Verify contents
# # !gcloud storage ls gs://$BUCKET_NAME/netflix/

In [11]:
import os, random, string

# 1️⃣ Generate unique bucket name and set region (from earlier setup)
REGION = os.getenv("REGION", "us-central1")
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "")
suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
BUCKET_NAME = f"{PROJECT_ID}-netflix-{suffix}"
os.environ["BUCKET_NAME"] = BUCKET_NAME

# 2️⃣ Create the bucket in the specified region
!gcloud storage buckets create gs://$BUCKET_NAME --project=$PROJECT_ID --location=$REGION --quiet

# 3️⃣ Upload all CSVs to a "netflix" folder in the bucket
!gsutil -m cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# 4️⃣ Print confirmation and explain purpose
print(f"✅ Bucket created: gs://{BUCKET_NAME}")
print(f"🌎 Region: {REGION}")
print("\n💡 Staging data in Cloud Storage helps with reproducibility, scalability, and secure access across GCP services (e.g., BigQuery, Vertex AI).")

Creating gs://mgmt-labs-unit-two-netflix-cq25ge/...
Copying file:///content/data/raw/movies.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/users.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/watch_history.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/reviews.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/recommendation_logs.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/search_logs.csv [Content-Type=text/csv]...
\ [6/6 files][ 18.9 MiB/ 18.9 MiB] 100% Done                                    
Operation completed over 6 objects/18.9 MiB.                                     
✅ Bucket created: gs://mgmt-labs-unit-two-netflix-cq25ge
🌎 Region: us-central1

💡 Staging data in Cloud Storage helps with reproducibility, scalability, and secure access across GCP services (e.g., BigQuery, Vertex AI).


### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [12]:
!gsutil ls -l gs://$BUCKET_NAME/netflix/

    115942  2025-10-24T18:05:34Z  gs://mgmt-labs-unit-two-netflix-cq25ge/netflix/movies.csv
   4695557  2025-10-24T18:05:35Z  gs://mgmt-labs-unit-two-netflix-cq25ge/netflix/recommendation_logs.csv
   1861942  2025-10-24T18:05:35Z  gs://mgmt-labs-unit-two-netflix-cq25ge/netflix/reviews.csv
   2250902  2025-10-24T18:05:35Z  gs://mgmt-labs-unit-two-netflix-cq25ge/netflix/search_logs.csv
   1606820  2025-10-24T18:05:34Z  gs://mgmt-labs-unit-two-netflix-cq25ge/netflix/users.csv
   9269425  2025-10-24T18:05:35Z  gs://mgmt-labs-unit-two-netflix-cq25ge/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88 MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

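One benefit is scalability and reliability: files in GCS persist after the Colab runtime resets. Another is easier access control and reproducibility, because BigQuery and other GCP services can all load from the same bucket.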

## 4) BigQuery dataset & loads: What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we'll enforce schemas later).

In [13]:
import os

PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "")
BUCKET_NAME = os.getenv("BUCKET_NAME", "")
DATASET_ID = "netflix"

# 1️⃣ Create the dataset (ignore error if it already exists)
!bq --location=$REGION mk --dataset --description "Netflix staging dataset (autodetect schemas)" $PROJECT_ID:$DATASET_ID || echo "Dataset may already exist."

# 2️⃣ Load all CSVs from the bucket into separate tables (autodetect schema)
# Each table name is derived from the file name before ".csv"
# Note: `bq load` appends by default, so re-running this cell adds the same rows again;
# pass --replace if you need an idempotent reload.
import subprocess

csvs = !gsutil ls gs://$BUCKET_NAME/netflix/*.csv
for path in csvs:
    table_name = os.path.basename(path).replace(".csv", "")
    print(f"⏳ Loading {table_name} ...")
    subprocess.run([
        "bq", "load",
        "--autodetect",
        "--source_format=CSV",
        f"{DATASET_ID}.{table_name}",
        path
    ])
    print(f"✅ Loaded: {table_name}")

# 3️⃣ Confirm tables
!bq ls $PROJECT_ID:$DATASET_ID

BigQuery error in mk operation: Dataset 'mgmt-labs-unit-two:netflix' already
exists.
Dataset may already exist.
⏳ Loading movies ...
✅ Loaded: movies
⏳ Loading recommendation_logs ...
✅ Loaded: recommendation_logs
⏳ Loading reviews ...
✅ Loaded: reviews
⏳ Loading search_logs ...
✅ Loaded: search_logs
⏳ Loading users ...
✅ Loaded: users
⏳ Loading watch_history ...
✅ Loaded: watch_history
         tableId           Type    Labels   Time Partitioning   Clustered Fields  
 ------------------------ ------- -------- ------------------- ------------------ 
  activity_filled          TABLE                                                  
  activity_monthly         TABLE                                                  
  activity_roll3           TABLE                                                  
  calendar_months          TABLE                                                  
  churn_predictions_lite   TABLE                                                  
  fea

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [14]:
# # EXAMPLE (from LLM): BigQuery dataset (commented)
# # DATASET="netflix"
# # # Attempt to create; ignore if exists
# # !bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

In [15]:
import os, subprocess

PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "")
DATASET_ID = "netflix"
LOCATION = "US"

# Check if dataset exists
result = subprocess.run(
    ["bq", "ls", "--project_id", PROJECT_ID, DATASET_ID],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

# bq may report the error on stdout or stderr, so check both streams
if "Not found" in (result.stdout + result.stderr).decode():
    print(f"📦 Creating dataset '{PROJECT_ID}:{DATASET_ID}' in {LOCATION} ...")
    subprocess.run(["bq", "mk", "--dataset", f"--location={LOCATION}", f"{PROJECT_ID}:{DATASET_ID}"])
    print(f"✅ Dataset created: {PROJECT_ID}:{DATASET_ID}")
else:
    print(f"ℹ️ Dataset '{PROJECT_ID}:{DATASET_ID}' already exists :) - skipping creation.")

ℹ️ Dataset 'mgmt-labs-unit-two:netflix' already exists :) - skipping creation.


In [16]:
# # EXAMPLE (from LLM): Load tables (commented)
# # tables = {
# #   "users": "users.csv",
# #   "movies": "movies.csv",
# #   "watch_history": "watch_history.csv",
# #   "recommendation_logs": "recommendation_logs.csv",
# #   "search_logs": "search_logs.csv",
# #   "reviews": "reviews.csv",
# # }
# # import os
# # for tbl, fname in tables.items():
# #   src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
# #   print("Loading", tbl, "from", src)
# #   !bq load --skip_leading_rows=1 --autodetect --source_format=CSV $DATASET.$tbl $src
# #
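# # # Row counts
# # project = os.environ["GOOGLE_CLOUD_PROJECT"]
# # for tbl in tables.keys():
# #   # Single quotes keep the shell from treating the SQL backticks as command substitution
# #   !bq query --nouse_legacy_sql 'SELECT "{tbl}" AS table_name, COUNT(*) AS n FROM `{project}.netflix.{tbl}`'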

In [17]:
import subprocess, os

BUCKET_NAME = os.getenv("BUCKET_NAME", "")
DATASET_ID = "netflix"
TABLES = ["users", "movies", "watch_history", "recommendation_logs", "search_logs", "reviews"]

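# 1️⃣ Load each CSV to its respective BigQuery table
# Note: `bq load` appends by default, so every rerun of this cell adds the rows again;
# pass --replace for an idempotent reload.
for table in TABLES:
    print(f"⏳ Loading {table} ...")
    subprocess.run([
        "bq", "load",
        "--autodetect",
        "--skip_leading_rows=1",
        "--source_format=CSV",
        f"{DATASET_ID}.{table}",
        f"gs://{BUCKET_NAME}/netflix/{table}.csv"
    ])
    print(f"✅ Loaded: {table}")

# 2️⃣ Run quick row-count queries to verify data
# `rows` is a reserved keyword in BigQuery SQL, so the count is aliased as `row_count`.
# The CLI output is captured and printed so that it shows up in the notebook.
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "")
print("\n📊 Row counts:")
for table in TABLES:
    print(f"--- {table} ---")
    result = subprocess.run(
        ["bq", "query", "--use_legacy_sql=false",
         f"SELECT COUNT(*) AS row_count FROM `{PROJECT_ID}.{DATASET_ID}.{table}`"],
        capture_output=True, text=True,
    )
    print(result.stdout or result.stderr)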

⏳ Loading users ...
✅ Loaded: users
⏳ Loading movies ...
✅ Loaded: movies
⏳ Loading watch_history ...
✅ Loaded: watch_history
⏳ Loading recommendation_logs ...
✅ Loaded: recommendation_logs
⏳ Loading search_logs ...
✅ Loaded: search_logs
⏳ Loading reviews ...
✅ Loaded: reviews

📊 Row counts:
--- users ---
--- movies ---
--- watch_history ---
--- recommendation_logs ---
--- search_logs ---
--- reviews ---


### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [18]:
import os
import pandas as pd
from google.cloud import bigquery

# Ensure the project environment variable exists
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
if not project_id:
    raise ValueError("❌ GOOGLE_CLOUD_PROJECT environment variable not set. Run the GCP auth/setup cell first.")

dataset_id = "netflix"
tables = ['users', 'movies', 'watch_history', 'recommendation_logs', 'search_logs', 'reviews']

# Initialize BigQuery client
client = bigquery.Client(project=project_id)

results = []
for table_id in tables:
    query = f"""
    SELECT
      '{table_id}' as table_name,
      COUNT(*) as row_count
    FROM
      `{project_id}.{dataset_id}.{table_id}`;
    """
    query_job = client.query(query)
    row = list(query_job.result())[0]
    results.append({"table_name": row["table_name"], "row_count": row["row_count"]})

df = pd.DataFrame(results)

print(f"✅ Row counts for tables in {project_id}.{dataset_id}:")
display(df)

✅ Row counts for tables in mgmt-labs-unit-two.netflix:


|   | table_name | row_count |
|---|------------|-----------|
| 0 | users | 61800 |
| 1 | movies | 6240 |
| 2 | watch_history | 630000 |
| 3 | recommendation_logs | 312000 |
| 4 | search_logs | 159000 |
| 5 | reviews | 92700 |


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

Autodetect is acceptable for quick, ad-hoc exploration. Production datasets and recurring pipelines should enforce explicit schemas, which prevents type drift between loads and helps ensure data validity.
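
To make the autodetect versus explicit-schema choice concrete, here is a small sketch that builds the argument list for `bq load` either way. It also adds `--replace`, since `bq load` appends by default and a rerun would otherwise load the same rows again. The helper `bq_load_args`, the bucket name, and the three-column schema are illustrative assumptions; a real explicit schema has to list every column in the file, in order.

```python
def bq_load_args(dataset, table, uri, schema=None):
    """Build an argument list for `bq load` that overwrites the target table."""
    args = [
        "bq", "load",
        "--replace",              # overwrite instead of append, so reruns are idempotent
        "--source_format=CSV",
        "--skip_leading_rows=1",  # skip the header row
    ]
    if schema is None:
        args.append("--autodetect")
    args += [f"{dataset}.{table}", uri]
    if schema is not None:
        # Inline schema: comma-separated name:type pairs, passed after the source URI.
        args.append(",".join(f"{name}:{col_type}" for name, col_type in schema))
    return args

# Hypothetical, partial schema for illustration only.
users_schema = [("user_id", "STRING"), ("age", "FLOAT"), ("country", "STRING")]
print(bq_load_args("netflix", "users", "gs://my-bucket/netflix/users.csv"))
print(bq_load_args("netflix", "users", "gs://my-bucket/netflix/users.csv", users_schema))
```

The returned list can be handed to `subprocess.run(args, check=True)`, which also makes a failed load raise an error instead of passing silently.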

## 5) Data Quality (DQ): Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.
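
For the reproducibility point, one common way to get a deterministic key is to hash the fields that identify a row, so the same input always yields the same key regardless of load order. The sketch below is a plain-Python illustration; `row_key` is a hypothetical helper, and in BigQuery an expression built from `TO_HEX(SHA256(...))` over the concatenated fields could play a similar role.

```python
import hashlib

def row_key(*parts) -> str:
    """Hash identifying fields into a stable hex key.

    A unit-separator character between fields keeps ("ab", "") and ("a", "b")
    from colliding; None is treated as an empty string.
    """
    joined = "\x1f".join("" if p is None else str(p) for p in parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

# Example values taken from the duplicate report later in this notebook.
key = row_key("user_03310", "movie_0640", "2024-09-08", "Smart TV")
print(key)
```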


### 5.1 Missingness (users): What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.
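
The MAR check boils down to comparing the missing rate of one column across the groups of another. A minimal sketch on made-up rows, assuming each record is a dict and `None` marks a missing value (the function name and the toy data are illustrative, not taken from the dataset):

```python
from collections import defaultdict

def pct_missing_by_group(rows, group_col, target_col):
    """Return {group: percent of rows where target_col is None}."""
    totals = defaultdict(int)
    missing = defaultdict(int)
    for row in rows:
        group = row.get(group_col)
        totals[group] += 1
        if row.get(target_col) is None:
            missing[group] += 1
    return {g: round(100 * missing[g] / totals[g], 2) for g in totals}

# Toy data: age is missing more often for one country than the other.
toy_users = [
    {"country": "USA", "age": 34},
    {"country": "USA", "age": None},
    {"country": "USA", "age": 51},
    {"country": "USA", "age": 28},
    {"country": "Canada", "age": None},
    {"country": "Canada", "age": 45},
]
print(pct_missing_by_group(toy_users, "country", "age"))
```

If the rates differ sharply between groups, missingness is related to the grouping variable, which is consistent with MAR rather than MCAR.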

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [19]:
# EXAMPLE (from LLM): Missingness profile (commented)
# -- Users: % missing per column
# WITH base AS (
#   SELECT COUNT(*) n,
#          COUNTIF(region IS NULL) miss_region,
#          COUNTIF(plan_tier IS NULL) miss_plan,
#          COUNTIF(age_band IS NULL) miss_age
#   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# )
# SELECT n,
#        ROUND(100*miss_region/n,2) AS pct_missing_region,
#        ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
#        ROUND(100*miss_age/n,2)    AS pct_missing_age_band
# FROM base;

In [20]:
from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNT(*) AS total_rows,
  SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) AS missing_region,
  ROUND(SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_region,
  SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) AS missing_plan_tier,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_plan_tier,
  SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) AS missing_age_band,
  ROUND(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_age_band
FROM
  `{project_id}.netflix.users`
"""

df_missing_summary = client.query(query).to_dataframe()
print("📊 Missing counts and percentages for netflix.users:")
display(df_missing_summary)

📊 Missing counts and percentages for netflix.users:


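|   | total_rows | missing_region | pct_missing_region | missing_plan_tier | pct_missing_plan_tier | missing_age_band | pct_missing_age_band |
|---|------------|----------------|--------------------|-------------------|-----------------------|------------------|----------------------|
| 0 | 61800 | 0 | 0.0 | 0 | 0.0 | 7374 | 11.93 |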


In [21]:
# # EXAMPLE (from LLM): MAR by region (commented)
# # SELECT region,
# #        COUNT(*) AS n,
# #        ROUND(100*COUNTIF(plan_tier IS NULL)/COUNT(*),2) AS pct_missing_plan_tier
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # GROUP BY region
# # ORDER BY pct_missing_plan_tier DESC;

In [22]:
from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  country,
  COUNT(*) AS total_rows,
  SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) AS missing_plan_tier,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_plan_tier
FROM
  `{project_id}.netflix.users`
GROUP BY
  country
ORDER BY
  pct_missing_plan_tier DESC
"""

df_plan_tier_by_region = client.query(query).to_dataframe()
print("📊 % of missing plan_tier by region (descending):")
display(df_plan_tier_by_region)


📊 % of missing plan_tier by region (descending):


|   | country | total_rows | missing_plan_tier | pct_missing_plan_tier |
|---|---------|------------|-------------------|-----------------------|
| 0 | Canada | 18576 | 0 | 0.0 |
| 1 | USA | 43224 | 0 | 0.0 |


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [23]:
from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  ROUND(SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_region,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_plan_tier,
  ROUND(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_missing_age_band
FROM
  `{project_id}.netflix.users`
"""

df_verify = client.query(query).to_dataframe()
print("✅ Verification: Missingness percentages (rounded to 2 decimals):")
display(df_verify)

✅ Verification: Missingness percentages (rounded to 2 decimals):


|   | pct_missing_region | pct_missing_plan_tier | pct_missing_age_band |
|---|--------------------|-----------------------|----------------------|
| 0 | 0.0 | 0.0 | 11.93 |


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

The age band data is most commonly missing; the other two columns have no missing values. This is most likely MAR, as it is probably missing because of other factors such as privacy preferences or account type.

### 5.2 Duplicates (watch_history): What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).
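
A deterministic keep-one policy means that rerunning the dedup always picks the same survivor in each group. The sketch below mirrors that idea in plain Python on toy rows: prefer higher progress, then longer watch time, then the smallest `session_id` as a final tiebreak. The column names follow the ones used in this notebook's `watch_history` table; the function names and the sample rows are made up for illustration.

```python
def _desc(value):
    # Higher is better; missing values rank last, as NULLs do under ORDER BY ... DESC in BigQuery.
    return float("inf") if value is None else -value

def dedup_keep_best(rows):
    """Keep one row per (user_id, movie_id, watch_date, device_type) group."""
    best = {}
    for row in rows:
        key = (row["user_id"], row["movie_id"], row["watch_date"], row["device_type"])
        rank = (
            _desc(row["progress_percentage"]),
            _desc(row["watch_duration_minutes"]),
            row["session_id"],  # final tiebreak makes the choice fully deterministic
        )
        if key not in best or rank < best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]

def _toy(session_id, user_id, progress, minutes):
    return {"session_id": session_id, "user_id": user_id, "movie_id": "m1",
            "watch_date": "2024-01-01", "device_type": "Desktop",
            "progress_percentage": progress, "watch_duration_minutes": minutes}

toy_history = [
    _toy("s2", "u1", 50.0, 10.0),
    _toy("s1", "u1", 80.0, None),  # same progress as s3, but duration is missing
    _toy("s3", "u1", 80.0, 5.0),
    _toy("s4", "u2", 20.0, 3.0),
]
kept = dedup_keep_best(toy_history)
print(sorted(r["session_id"] for r in kept))
```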

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [24]:
# # EXAMPLE (from LLM): Detect duplicate groups (commented)
# # SELECT user_id, movie_id, event_ts, device_type, COUNT(*) AS dup_count
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history`
# # GROUP BY user_id, movie_id, event_ts, device_type
# # HAVING dup_count > 1
# # ORDER BY dup_count DESC
# # LIMIT 20;

In [36]:
# Goal: Find potential duplicate records in netflix.watch_history
# Duplicates are defined as rows sharing the same user_id, movie_id, watch_date, and device_type
# We count how many duplicates exist per group and list the top 20 by frequency

from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  user_id,
  movie_id,
  watch_date,
  device_type,
  COUNT(*) AS duplicate_count
FROM
  `netflix.watch_history`
GROUP BY
  user_id, movie_id, watch_date, device_type
HAVING
  COUNT(*) > 1  -- Only show actual duplicates
ORDER BY
  duplicate_count DESC
LIMIT 20;
"""

df_dupes = client.query(query).to_dataframe()
print("📊 Top 20 duplicate groups in netflix.watch_history:")
display(df_dupes)

📊 Top 20 duplicate groups in netflix.watch_history:


|   | user_id | movie_id | watch_date | device_type | duplicate_count |
|---|---------|----------|------------|-------------|-----------------|
| 0 | user_03310 | movie_0640 | 2024-09-08 | Smart TV | 24 |
| 1 | user_00391 | movie_0893 | 2024-08-26 | Laptop | 24 |
| 2 | user_01870 | movie_0844 | 2024-06-02 | Laptop | 18 |
| 3 | user_07594 | movie_0133 | 2025-03-24 | Laptop | 18 |
| 4 | user_01292 | movie_0231 | 2024-07-05 | Laptop | 18 |
| 5 | user_03021 | movie_0602 | 2025-02-23 | Laptop | 18 |
| 6 | user_09564 | movie_0552 | 2025-01-11 | Laptop | 18 |
| 7 | user_03348 | movie_0688 | 2024-01-22 | Desktop | 18 |
| 8 | user_02652 | movie_0352 | 2024-10-22 | Desktop | 18 |
| 9 | user_01182 | movie_0794 | 2025-07-03 | Desktop | 18 |


In [39]:
from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

# Step 1: Create the deduplicated table
query = """
CREATE OR REPLACE TABLE `netflix.watch_history_dedup` AS
WITH ranked AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC, session_id ASC
    ) AS rn
  FROM
    `netflix.watch_history`
)
SELECT *
FROM
  ranked
WHERE
  rn = 1;
"""
client.query(query).result()  # Wait for the job to finish before reporting success
print("✅ watch_history_dedup table created successfully.")

# Step 2: Now read from the table
fetch_query = "SELECT * FROM `netflix.watch_history_dedup` LIMIT 10"
watch_history_dedup = client.query(fetch_query).to_dataframe()
display(watch_history_dedup)

✅ watch_history_dedup table created successfully.


|   | session_id | user_id | movie_id | watch_date | device_type | watch_duration_minutes | progress_percentage | action | quality | location_country | is_download | user_rating | rn |
|---|------------|---------|----------|------------|-------------|------------------------|---------------------|--------|---------|------------------|-------------|-------------|----|
| 0 | session_057017 | user_02153 | movie_0095 | 2024-01-01 | Desktop | 135.3 | 0.6 | paused | HD | USA | False |  | 1 |
| 1 | session_075534 | user_05979 | movie_0348 | 2024-01-01 | Desktop | 14.3 | 77.6 | completed | HD | Canada | True |  | 1 |
| 2 | session_082202 | user_07089 | movie_0942 | 2024-01-01 | Desktop | 170.1 | 72.3 | stopped | SD | USA | False | 4.0 | 1 |
| 3 | session_080099 | user_09560 | movie_0935 | 2024-01-01 | Desktop |  | 88.7 | stopped | Ultra HD | USA | False | 3.0 | 1 |
| 4 | session_022372 | user_07665 | movie_0233 | 2024-01-01 | Desktop |  | 77.3 | started | SD | USA | False |  | 1 |
| 5 | session_017662 | user_07716 | movie_0280 | 2024-01-01 | Desktop | 63.6 | 34.2 | completed | HD | Canada | False |  | 1 |
| 6 | session_087787 | user_02013 | movie_0023 | 2024-01-01 | Desktop | 85.7 | 34.3 | paused | HD | Canada | False |  | 1 |
| 7 | session_029001 | user_01653 | movie_0155 | 2024-01-01 | Desktop | 22.2 | 74.0 | started | HD | Canada | False |  | 1 |
| 8 | session_015067 | user_00074 | movie_0149 | 2024-01-01 | Desktop | 4.1 | 16.2 | stopped | 4K | USA | True | 5.0 | 1 |
| 9 | session_038793 | user_08997 | movie_0563 | 2024-01-01 | Desktop | 82.7 | 45.5 | completed | HD | USA | False |  | 1 |


In [None]:
# # EXAMPLE (from LLM): Keep-one policy (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` AS
# # SELECT * EXCEPT(rk) FROM (
# #   SELECT h.*,
# #          ROW_NUMBER() OVER (
# #            PARTITION BY user_id, movie_id, event_ts, device_type
# #            ORDER BY progress_ratio DESC, minutes_watched DESC
# #          ) AS rk
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history` h
# # )
# # WHERE rk = 1;

### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [40]:
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = """
WITH
raw AS (
  SELECT COUNT(*) AS total_rows_raw FROM `netflix.watch_history`
),
dedup AS (
  SELECT COUNT(*) AS total_rows_dedup FROM `netflix.watch_history_dedup`
)
SELECT
  raw.total_rows_raw AS total_rows_raw,
  dedup.total_rows_dedup AS total_rows_dedup,
  (raw.total_rows_raw - dedup.total_rows_dedup) AS duplicates_removed,
  ROUND((raw.total_rows_raw - dedup.total_rows_dedup) / raw.total_rows_raw * 100, 2) AS pct_reduction
FROM
  raw, dedup;
"""
client.query(query).to_dataframe()

|   | total_rows_raw | total_rows_dedup | duplicates_removed | pct_reduction |
|---|----------------|------------------|--------------------|---------------|
| 0 | 630000 | 100000 | 530000 | 84.13 |


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Natural duplicates come from human error or from a genuinely repeated event that looks very similar. System-generated duplicates come from data collection or pipeline issues. Both corrupt KPIs by inflating metrics, and they add bias to ML labels.

### 5.3 Outliers (minutes_watched) - What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
# # EXAMPLE (from LLM) - IQR outlier rate (commented)
# # WITH dist AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(1)] AS q1,
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(3)] AS q3
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # bounds AS (
# #   SELECT q1, q3, (q3-q1) AS iqr,
# #          q1 - 1.5*(q3-q1) AS lo,
# #          q3 + 1.5*(q3-q1) AS hi
# #   FROM dist
# # )
# # SELECT
# #   COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi) AS outliers,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi)/COUNT(*),2) AS pct_outliers
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h
# # CROSS JOIN bounds b;

In [45]:
from google.cloud import bigquery
import os, pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
WITH stats AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100) AS q
  FROM `{project_id}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT
    q[OFFSET(25)] AS Q1,
    q[OFFSET(75)] AS Q3
  FROM stats
),
outliers AS (
  SELECT
    COUNTIF(watch_duration_minutes < Q1 - 1.5*(Q3 - Q1)
         OR watch_duration_minutes > Q3 + 1.5*(Q3 - Q1)) AS outlier_rows,
    COUNT(*) AS total_rows
  FROM `{project_id}.netflix.watch_history_dedup`, bounds
)
SELECT
  Q1,
  Q3,
  ROUND(Q3 - Q1, 2) AS IQR,
  ROUND((outlier_rows / total_rows) * 100, 2) AS pct_outliers
FROM
  bounds, outliers;
"""

df_iqr = client.query(query).to_dataframe()
print("📊 IQR bounds and % of outliers in minutes_watched:")
display(df_iqr)

📊 IQR bounds and % of outliers in minutes_watched:


Unnamed: 0,Q1,Q3,IQR,pct_outliers
0,29.1,82.4,53.3,3.53
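
The reported quartiles can be turned into explicit outlier fences with a few lines of arithmetic. The sketch below plugs in the Q1 and Q3 values from the output above and the usual 1.5 * IQR multiplier; `iqr_fences` is a hypothetical helper name, and the inputs are approximate because they come from `APPROX_QUANTILES`.

```python
def iqr_fences(q1, q3, k=1.5):
    """Return (lower, upper) outlier fences for the given quartiles."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Q1 and Q3 as reported by the IQR query output.
lo, hi = iqr_fences(29.1, 82.4)
print(f"lower fence: {lo:.2f}, upper fence: {hi:.2f}")
```

Because watch minutes cannot be negative, the lower fence (about -50.85) is never reached, so every row counted as an outlier sits above roughly 162.35 minutes.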


In [48]:
from google.cloud import bigquery
import os, pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

# Step 1: Compute P01 and P99 boundaries for watch_duration_minutes using a separate query
quantile_query = f"""
SELECT
  APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
  APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
FROM `{project_id}.netflix.watch_history_dedup`
"""
quantile_results = client.query(quantile_query).to_dataframe().iloc[0]
p01 = quantile_results['p01']
p99 = quantile_results['p99']

print(f"Computed P01: {p01}, P99: {p99}")

# Step 2: Create the capped table and summarize quantiles before vs after capping
# Now that we have the values, we can use them directly in the SQL query
query = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
SELECT
  *,
  CASE
    WHEN watch_duration_minutes < {p01} THEN {p01}
    WHEN watch_duration_minutes > {p99} THEN {p99}
    ELSE watch_duration_minutes
  END AS minutes_watched_capped
FROM `{project_id}.netflix.watch_history_dedup`;

-- Summarize quantiles before vs after
WITH before AS (
  SELECT
    'Before Capping' AS stage,
    MIN(watch_duration_minutes) AS min_val,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(25)] AS p25,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(50)] AS median,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(75)] AS p75,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99,
    MAX(watch_duration_minutes) AS max_val
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT
    'After Capping' AS stage,
    MIN(minutes_watched_capped) AS min_val,
    APPROX_QUANTILES(minutes_watched_capped, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(minutes_watched_capped, 100)[OFFSET(25)] AS p25,
    APPROX_QUANTILES(minutes_watched_capped, 100)[OFFSET(50)] AS median,
    APPROX_QUANTILES(minutes_watched_capped, 100)[OFFSET(75)] AS p75,
    APPROX_QUANTILES(minutes_watched_capped, 100)[OFFSET(99)] AS p99,
    MAX(minutes_watched_capped) AS max_val
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after
"""

df_quantiles = client.query(query).to_dataframe()
print("✅ watch_history_robust table created successfully and quantile summaries retrieved.")
display(df_quantiles)

Computed P01: 4.4, P99: 366.0
✅ watch_history_robust table created successfully and quantile summaries retrieved.


Unnamed: 0,stage,min_val,p01,p25,median,p75,p99,max_val
0,After Capping,4.4,4.5,29.1,51.2,82.4,358.1,366.0
1,Before Capping,0.2,4.4,29.1,51.2,82.4,366.0,799.3


In [None]:
# # EXAMPLE (from LLM) - Winsorize + quantiles (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust` AS
# # WITH q AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(1)]  AS p01,
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(99)] AS p99
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # )
# # SELECT
# #   h.*,
# #   GREATEST(q.p01, LEAST(q.p99, h.minutes_watched)) AS minutes_watched_capped
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h, q;
# #
# # -- Quantiles before vs after
# # WITH before AS (
# #   SELECT 'before' AS which, APPROX_QUANTILES(minutes_watched, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # after AS (
# #   SELECT 'after' AS which, APPROX_QUANTILES(minutes_watched_capped, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`
# # )
# # SELECT * FROM before UNION ALL SELECT * FROM after;

### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [50]:
from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  'Before Capping' AS stage,
  MIN(watch_duration_minutes) AS min_val,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_val,
  MAX(watch_duration_minutes) AS max_val
FROM
  `{project_id}.netflix.watch_history_dedup`
UNION ALL
SELECT
  'After Capping' AS stage,
  MIN(minutes_watched_capped) AS min_val,
  APPROX_QUANTILES(minutes_watched_capped, 2)[OFFSET(1)] AS median_val,
  MAX(minutes_watched_capped) AS max_val
FROM
  `{project_id}.netflix.watch_history_robust`;
"""

df_verify_capping = client.query(query).to_dataframe()
print("✅ Verification - Min, Median, Max before and after capping:")
display(df_verify_capping)

✅ Verification - Min, Median, Max before and after capping:


Unnamed: 0,stage,min_val,median_val,max_val
0,Before Capping,0.2,51.0,799.3
1,After Capping,4.4,51.4,366.0


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping can be harmful when the outliers are themselves meaningful data points, when the distribution is naturally heavy-tailed, or when feature scaling would be the more effective fix. Tree-based models are less sensitive to outliers because they split the data on thresholds rather than using the exact values.
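
To make the capping trade-off concrete, here is a minimal sketch of winsorizing in plain Python. It uses the reported P01/P99 bounds (4.4 and 366.0) and a small hypothetical sample that includes the reported minimum and maximum; `winsorize` is an illustrative helper, equivalent in spirit to `GREATEST(p01, LEAST(p99, x))`.

```python
from statistics import mean, median


def winsorize(values, lo, hi):
    """Clamp each value into the closed interval [lo, hi]."""
    return [max(lo, min(hi, v)) for v in values]


# Hypothetical sample; 0.2 and 799.3 are the reported min and max before capping.
sample = [0.2, 22.2, 51.2, 85.7, 799.3]
capped = winsorize(sample, lo=4.4, hi=366.0)

print("capped:", capped)
print("median before/after:", median(sample), median(capped))
print("mean before/after:", round(mean(sample), 2), round(mean(capped), 2))
```

In this sample the median is untouched while the mean drops sharply, which is the robustness gain. The cost is that the capped column no longer distinguishes a 400-minute session from an 800-minute one.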

### 5.4 Business anomaly flags - What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1-2 comments.


In [None]:
# # EXAMPLE (from LLM) - flag_binge (commented)
# # SELECT
# #   COUNTIF(minutes_watched > 8*60) AS sessions_over_8h,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(minutes_watched > 8*60)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`;

In [59]:
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

# Identify unusually long watch sessions (> 8 hours = 480 minutes)
query = f"""
-- Flag binge sessions (total duration over 8 hours)
-- NOTE: minutes_watched_capped is capped at P99; if P99 is below 480 this flag is always 0.
WITH flagged AS (
  SELECT
    *,
    CASE WHEN minutes_watched_capped > 480 THEN 1 ELSE 0 END AS flag_binge
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT
  COUNTIF(flag_binge = 1) AS binge_sessions,
  COUNT(*) AS total_sessions,
  ROUND(COUNTIF(flag_binge = 1) / COUNT(*) * 100, 2) AS pct_binge
FROM flagged;
"""
df_binge = client.query(query).to_dataframe()
print("🎬 Binge session summary (>8 hours):")
display(df_binge)

🎬 Binge session summary (>8 hours):


Unnamed: 0,binge_sessions,total_sessions,pct_binge
0,0,100000,0.0
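
One likely reading of the 0.0% result: the flag is computed on `minutes_watched_capped`, which was capped at the reported P99 of 366.0 minutes, so it can never exceed the 480-minute threshold. The sketch below illustrates this with the reported cap and the reported pre-capping maximum (799.3); `flag_binge` is a hypothetical helper, not code from the notebook.

```python
P99_CAP = 366.0            # reported P99 used as the upper cap
RAW_MAX = 799.3            # reported maximum before capping
BINGE_THRESHOLD_MIN = 8 * 60


def flag_binge(minutes):
    """True when a session is longer than 8 hours."""
    return minutes > BINGE_THRESHOLD_MIN


capped_max = min(RAW_MAX, P99_CAP)  # what the capped column holds for that row

print("flag on capped value:", flag_binge(capped_max))
print("flag on raw value:   ", flag_binge(RAW_MAX))
```

Computing the flag from the raw duration column and keeping the capped column only as a model input would let both signals coexist.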


In [None]:
# # EXAMPLE (from LLM) - flag_age_extreme (commented)
# # SELECT
# #   COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #           CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100) AS extreme_age_rows,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #                     CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`;

In [57]:
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

# Identify implausible ages parsed from 'age' column
query = f"""
-- Some datasets store age as text ranges; we parse numeric part if possible
WITH parsed AS (
  SELECT
    *,
    SAFE_CAST(REGEXP_EXTRACT(CAST(age AS STRING), r'\\d+') AS INT64) AS age_value
  FROM `{project_id}.netflix.users`
),
flagged AS (
  SELECT
    *,
    CASE WHEN age_value < 10 OR age_value > 100 THEN 1 ELSE 0 END AS flag_age_extreme
  FROM parsed
)
SELECT
  COUNTIF(flag_age_extreme = 1) AS extreme_age_users,
  COUNT(*) AS total_users,
  ROUND(COUNTIF(flag_age_extreme = 1) / COUNT(*) * 100, 2) AS pct_extreme_age
FROM flagged;
"""
df_age = client.query(query).to_dataframe()
print("👤 Extreme age summary (<10 or >100):")
display(df_age)

👤 Extreme age summary (<10 or >100):


Unnamed: 0,extreme_age_users,total_users,pct_extreme_age
0,1074,61800,1.74
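
The parsing rule in the query (take the first run of digits, then compare) can be checked locally before running it in BigQuery. This is a rough Python equivalent using `re`; `parse_age` and `flag_age_extreme` are hypothetical helpers, and the sample inputs are made up for illustration.

```python
import re


def parse_age(value):
    """Return the first run of digits as an int, or None if there is none."""
    match = re.search(r"\d+", str(value))
    return int(match.group()) if match else None


def flag_age_extreme(value):
    """True for parsed ages under 10 or over 100; unparseable values are not flagged."""
    age = parse_age(value)
    return age is not None and (age < 10 or age > 100)


# Made-up inputs covering a band, a float-like string, a missing value, and extremes.
for raw in ["18-24", "34.0", None, "105", "7"]:
    print(repr(raw), "->", parse_age(raw), flag_age_extreme(raw))
```

Two behaviors are worth noticing: a band such as `18-24` parses to its lower bound only, and missing ages are counted as not extreme, which matches the `ELSE 0` branch of the SQL `CASE`.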


In [58]:
from google.cloud import bigquery
import os
import pandas as pd

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

# Identify duration anomalies in movies (duration < 15 min or > 480 min)
query = f"""
-- Flag movies with unusually short or long durations
WITH flagged AS (
  SELECT
    *,
    CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END AS flag_duration_anomaly
  FROM `{project_id}.netflix.movies`
)
SELECT
  COUNTIF(flag_duration_anomaly = 1) AS duration_anomaly_titles,
  COUNT(*) AS total_titles,
  ROUND(COUNTIF(flag_duration_anomaly = 1) / COUNT(*) * 100, 2) AS pct_duration_anomaly
FROM flagged;
"""
df_duration = client.query(query).to_dataframe()
print("🎬 Duration anomaly summary (<15 min or >480 min):")
display(df_duration)

🎬 Duration anomaly summary (<15 min or >480 min):


Unnamed: 0,duration_anomaly_titles,total_titles,pct_duration_anomaly
0,138,6240,2.21


In [None]:
# # EXAMPLE (from LLM) - flag_duration_anomaly (commented)
# # SELECT
# #   COUNTIF(duration_min < 15) AS titles_under_15m,
# #   COUNTIF(duration_min > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

In [64]:
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

# Identify implausible movie durations (<15 min or >480 min)
query = f"""
-- Flag suspicious movie durations
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS duration_anomalies,
  COUNT(*) AS total_movies,
  ROUND(COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*) * 100, 2) AS pct_anomalies
FROM `{project_id}.netflix.movies`;
"""
df_duration = client.query(query).to_dataframe()
print("🎥 Movie duration anomaly summary (<15 or >480 minutes):")
display(df_duration)

🎥 Movie duration anomaly summary (<15 or >480 minutes):


Unnamed: 0,duration_anomalies,total_movies,pct_anomalies
0,138,6240,2.21


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [61]:
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
client = bigquery.Client(project=project_id)

query = f"""
WITH
  binge_summary AS (
    SELECT
      'flag_binge' AS flag_name,
      ROUND(COUNTIF(minutes_watched_capped > 480) / COUNT(*) * 100, 2) AS pct_of_rows
    FROM
      `{project_id}.netflix.watch_history_robust`
  ),
  age_summary AS (
    SELECT
      'flag_age_extreme' AS flag_name,
      ROUND(COUNTIF(SAFE_CAST(REGEXP_EXTRACT(CAST(age AS STRING), r'\\d+') AS INT64) < 10 OR SAFE_CAST(REGEXP_EXTRACT(CAST(age AS STRING), r'\\d+') AS INT64) > 100) / COUNT(*) * 100, 2) AS pct_of_rows
    FROM
      `{project_id}.netflix.users`
  ),
  duration_summary AS (
    SELECT
      'flag_duration_anomaly' AS flag_name,
      ROUND(COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*) * 100, 2) AS pct_of_rows
    FROM
      `{project_id}.netflix.movies`
  )
SELECT * FROM binge_summary
UNION ALL
SELECT * FROM age_summary
UNION ALL
SELECT * FROM duration_summary;
"""

df_anomaly_summary = client.query(query).to_dataframe()
print("✅ Verification - Anomaly Flag Summary:")
display(df_anomaly_summary)

✅ Verification - Anomaly Flag Summary:


Unnamed: 0,flag_name,pct_of_rows
0,flag_binge,0.0
1,flag_age_extreme,1.74
2,flag_duration_anomaly,2.21


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

The duration anomaly flag is the most common (2.21%), with the age flag second (1.74%). The binge flag occurs 0 times, but it is the one to keep as a feature: it helps show whether users are on the platform often and whether they are likely to renew their subscriptions.

# Reflection & Insights:
In this lab we loaded the Netflix dataset and ran several processes to ensure high-quality data for future machine learning use. The data underwent a comprehensive data-quality review that addressed missingness, outliers, and anomaly flags. The most pronounced data quality issues were the missing `age_band` values and the duration anomalies we found; both will need to be addressed in order to get reliable machine learning results. The processes we used to combat potential problems in the data were capping extremes, deduplication, imputation, and removing invalid durations. One candidate model feature is the binge flag we created, since it tracks a user's extreme engagement, which suggests they are unlikely to churn. Now that we have assessed and cleaned the data, it is more consistent and reliable. These improvements should also lead to more stable and interpretable churn predictions down the line.


## 6) Save & submit - What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today's row counts.
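
For the `.sql` export step, one possible approach is to collect the query strings in a dict and write them to a single file. The sketch below is only an illustration: `export_sql`, the file name `dq_queries.sql`, and the two demo queries are assumptions, and it writes to the system temp directory rather than a real repo path.

```python
import tempfile
from pathlib import Path


def export_sql(queries, path):
    """Write named queries to one .sql file, each preceded by a comment header."""
    parts = []
    for name, sql in queries.items():
        # Normalize so every statement ends with exactly one semicolon.
        body = sql.strip().rstrip(";")
        parts.append(f"-- {name}\n{body};\n")
    out = Path(path)
    out.write_text("\n".join(parts), encoding="utf-8")
    return out


# Demo queries (placeholders for the DQ queries written in this lab).
demo_queries = {
    "raw_count": "SELECT COUNT(*) FROM `netflix.watch_history`;",
    "dedup_count": "SELECT COUNT(*) FROM `netflix.watch_history_dedup`",
}

out_path = export_sql(demo_queries, Path(tempfile.gettempdir()) / "dq_queries.sql")
print("wrote", out_path)
```

Keeping one named entry per query makes the exported file easy to diff in the team GitHub repo.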


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
