# AWS Credentials Required

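> To run this notebook, set AWS credentials in your environment before executing any cells. Without credentials (and with no `BUCKET_NAME` already saved in `.env`), the notebook falls back to a local-only mode that skips the S3 steps.
>
> - `AWS_ACCESS_KEY_ID`
> - `AWS_SECRET_ACCESS_KEY`
> - `AWS_SESSION_TOKEN` (only if using temporary credentials)
> - `AWS_REGION` (optional; used only if the boto3 session has no configured region; falls back to `us-east-1` if neither is set)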

> Tip: You can set these in your terminal session or in a `.env` file loaded by the notebook.
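A quick preflight check can confirm that the variables above are set before any AWS call. The sketch below is an illustrative helper, not part of the notebook: `check_aws_env` is a hypothetical name, and it inspects only environment variables. Credentials supplied another way (for example, a shared `~/.aws/credentials` profile or an instance role, both of which boto3 also reads) will be reported as missing here even though boto3 would find them.

```python
import os
from typing import Mapping, Optional


def check_aws_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Summarize which AWS settings are present in an environment mapping.

    Hypothetical helper: only environment variables are checked, not
    ~/.aws profiles or instance roles.
    """
    env = os.environ if env is None else env
    required = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
    missing = [name for name in required if not env.get(name)]
    return {
        "missing": missing,
        # A session token is needed only for temporary (STS) credentials
        "temporary": bool(env.get("AWS_SESSION_TOKEN")),
        # AWS_REGION if set, else the notebook's us-east-1 default
        "region": env.get("AWS_REGION") or "us-east-1",
    }


status = check_aws_env()
if status["missing"]:
    print("Not set in environment:", ", ".join(status["missing"]))
else:
    print("AWS env vars found; region:", status["region"])
```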

# Backblaze Data Pipeline - S3 Setup and Data Loading

This notebook sets up an S3 bucket, downloads Backblaze hard drive data and converts it to date-partitioned Parquet for efficient querying, and ingests Amazon Electronics reviews of hard drives for comparison with the Backblaze data.

## 1. Initialize S3 Bucket

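Reuses the bucket named by `BUCKET_NAME` in `.env` if one is set; otherwise creates a new, uniquely named bucket for raw and curated data and persists its name to `.env` for reuse across sessions. If no AWS credentials are found while creating the bucket, the notebook switches to local-only mode and works under `DATA_DIR` (default `/tmp/aai540`) instead.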
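The naming scheme in the cell below (`mlops-backblaze-<8 hex chars>-<region>`) has to satisfy S3's general-purpose bucket naming rules: 3 to 63 characters, only lowercase letters, digits, dots, and hyphens, and starting and ending with a letter or digit. The sketch below generates a name the same way and checks it against a simplified version of those rules. `make_bucket_name` and `is_valid_bucket_name` are hypothetical helpers, and the check deliberately omits some rules (for example, names formatted like IP addresses are also forbidden).

```python
import re
import uuid

# Simplified S3 bucket-name rule: 3-63 chars of [a-z0-9.-], starting and ending
# with a letter or digit (other rules, e.g. no IP-address-like names, are omitted)
BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def make_bucket_name(region: str, prefix: str = "mlops-backblaze") -> str:
    """Build a bucket name like the notebook does: prefix, random suffix, region."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}-{region}"


def is_valid_bucket_name(name: str) -> bool:
    """Check the simplified naming rule; adjacent dots are also rejected."""
    return bool(BUCKET_RE.match(name)) and ".." not in name


name = make_bucket_name("us-east-1")
print(name, is_valid_bucket_name(name))
```

With the longest standard region names (around 14 characters), the generated names stay well under the 63-character limit.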

In [11]:
import os
import uuid
import boto3
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv, set_key

ENV_PATH = ".env"

# Load .env if it exists (may provide BUCKET_NAME, DATA_DIR, and AWS credentials)
load_dotenv(ENV_PATH)

bucket = os.getenv("BUCKET_NAME")

# Region precedence: boto3 session config, then AWS_REGION, then us-east-1
session = boto3.session.Session()
region = session.region_name or os.getenv("AWS_REGION") or "us-east-1"
s3 = boto3.client("s3", region_name=region)

# Local working directories (the only storage used in local mode)
DATA_DIR = os.getenv("DATA_DIR", "/tmp/aai540")
LOCAL_ZIP_DIR = os.path.join(DATA_DIR, "backblaze_zips")
LOCAL_PARQUET_DIR = os.path.join(DATA_DIR, "backblaze_parquet")
os.makedirs(LOCAL_ZIP_DIR, exist_ok=True)
os.makedirs(LOCAL_PARQUET_DIR, exist_ok=True)

if bucket:
    # Reuse the bucket name persisted by a previous run
    print("Using existing bucket from .env:", bucket)
else:
    # Bucket names must be globally unique, so add a random suffix
    bucket = f"mlops-backblaze-{uuid.uuid4().hex[:8]}-{region}"
    print("Creating new bucket:", bucket)

    try:
        if region == "us-east-1":
            # us-east-1 does not accept an explicit LocationConstraint
            s3.create_bucket(Bucket=bucket)
        else:
            s3.create_bucket(
                Bucket=bucket,
                CreateBucketConfiguration={"LocationConstraint": region}
            )
    except NoCredentialsError:
        print("Error: AWS credentials not found.")
        print("Running in local-only mode. S3 steps will be skipped.")
        bucket = None

    if bucket is not None:
        # Persist the new bucket name to .env (create the file if needed)
        if not os.path.exists(ENV_PATH):
            open(ENV_PATH, "w").close()
        set_key(ENV_PATH, "BUCKET_NAME", bucket)

local_mode = bucket is None
print("Local mode:", local_mode)
if bucket:
    print("Bucket ready:", bucket)
else:
    print("Bucket not created due to missing credentials.")

# The review-ingestion and S3 read-back cells below run only when reload_s3 is True.
# Set RELOAD_S3 = True to re-run them; they are always skipped in local mode.
RELOAD_S3 = False
reload_s3 = RELOAD_S3 and not local_mode



Creating new bucket: mlops-backblaze-83d6dd0d-us-east-1
Error: AWS credentials not found.
Running in local-only mode. S3 steps will be skipped.
Local mode: True
Bucket not created due to missing credentials.


## 2. Create Folder Structure

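Creates a standard MLOps prefix layout in S3; in local mode, the cell only reports the local data directory. S3 has no real directories, so each prefix is created as a zero-byte marker object whose key ends in `/`, which the S3 console displays as a folder. The prefixes organize: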
- Raw data (Backblaze hard drive stats, reviews)
- Curated/processed data
- Feature stores
- Model artifacts
- Evaluation results
- Monitoring data
- Batch inference outputs
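In local mode, the cell below only prints `DATA_DIR`. If you want the same layout on disk, a minimal sketch is to mirror each prefix as a directory under the data directory. `mirror_prefixes` is a hypothetical helper; the prefix list is copied from the cell below.

```python
import os
import tempfile

# Same prefixes the S3 cell creates as zero-byte marker objects
PREFIXES = [
    "raw/backblaze/",
    "raw/reviews/",
    "curated/",
    "features/",
    "artifacts/models/",
    "artifacts/eval/",
    "artifacts/monitoring/",
    "inference/batch/",
]


def mirror_prefixes(root, prefixes=PREFIXES):
    """Create one local directory per S3 prefix under root; return the created paths."""
    paths = []
    for p in prefixes:
        path = os.path.join(root, *p.strip("/").split("/"))
        os.makedirs(path, exist_ok=True)  # idempotent, like re-putting a marker object
        paths.append(path)
    return paths


# Demo in a throwaway directory; in the notebook you would pass DATA_DIR instead
with tempfile.TemporaryDirectory() as tmp:
    for path in mirror_prefixes(tmp):
        print(os.path.relpath(path, tmp))
```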

In [8]:
if not local_mode:
    prefixes = [
        "raw/backblaze/",
        "raw/reviews/",
        "curated/",
        "features/",
        "artifacts/models/",
        "artifacts/eval/",
        "artifacts/monitoring/",
        "inference/batch/"
    ]

    for p in prefixes:
        s3.put_object(Bucket=bucket, Key=p, Body=b'')

    print("Folder layout created:")
    for p in prefixes:
        print(" -", p)
else:
    print("Local mode: using", DATA_DIR)

Local mode: using /tmp/aai540


## 3. Verify Folder Structure

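Lists the objects in the S3 bucket to confirm the prefix markers were created (a single `list_objects_v2` call returns at most 1,000 keys, which is enough here). In local mode, the cell prints the local ZIP and Parquet directories instead.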
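Rather than eyeballing the listing, the check can compare the returned keys against the expected prefixes. A minimal sketch, assuming `keys` is the list of `Key` values from the `list_objects_v2` response; `missing_prefixes` is a hypothetical helper and the sample keys are made up for illustration.

```python
def missing_prefixes(keys, expected):
    """Return expected prefix markers that are absent from keys, in expected order."""
    present = set(keys)
    return [p for p in expected if p not in present]


expected = ["raw/backblaze/", "raw/reviews/", "curated/"]
# Illustrative keys, as they might come from resp["Contents"][i]["Key"]
keys = ["curated/", "raw/backblaze/", "raw/backblaze/zips/data_Q1_2024.zip"]
print(missing_prefixes(keys, expected))  # → ['raw/reviews/']
```

This checks for the marker objects specifically: a prefix that contains objects but has no zero-byte marker would still be reported as missing.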

In [9]:
if not local_mode:
    resp = s3.list_objects_v2(Bucket=bucket)
    for obj in resp.get("Contents", []):
        print(obj["Key"])
else:
    print("Local mode: listing local directories")
    print("ZIP dir:", LOCAL_ZIP_DIR)
    print("Parquet dir:", LOCAL_PARQUET_DIR)

Local mode: listing local directories
ZIP dir: /tmp/aai540/backblaze_zips
Parquet dir: /tmp/aai540/backblaze_parquet


## 4. Download Backblaze Data to S3

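Scrapes the Backblaze hard drive test data page for quarterly ZIP files from 2024 onwards, then:
- Downloads the first (sample) ZIP to `LOCAL_ZIP_DIR`, reusing a cached copy if one already exists
- Uploads that ZIP to `raw/backblaze/zips/` in S3 when S3 is available
- Writes the first daily CSV in the ZIP to `engineered_data_sample.parquet` in `DATA_DIR` for downstream notebooks

Only the sample ZIP is downloaded here; step 7 converts whichever ZIPs are present under `raw/backblaze/zips/` in S3.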
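The scraping step boils down to a regex over the page HTML plus a year filter. One detail worth handling: the cell below sorts URLs as strings, which orders `data_Q1_2025` before `data_Q2_2024`. A minimal sketch that extracts the same URL pattern and sorts by (year, quarter) instead; `quarterly_zip_urls` is a hypothetical helper and the HTML snippet is made up for illustration.

```python
import re

ZIP_RE = re.compile(
    r"https://f001\.backblazeb2\.com/file/Backblaze-Hard-Drive-Data/"
    r"data_Q([1-4])_([0-9]{4})\.zip"
)


def quarterly_zip_urls(html, min_year=2024):
    """Return unique quarterly ZIP URLs from min_year onward, oldest first."""
    found = {}
    for m in ZIP_RE.finditer(html):
        quarter, year = int(m.group(1)), int(m.group(2))
        if year >= min_year:
            found[m.group(0)] = (year, quarter)
    # Sort chronologically by (year, quarter) rather than lexicographically
    return sorted(found, key=found.get)


base = "https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/"
html = " ".join(
    f'<a href="{base}data_{q}.zip">'
    for q in ["Q2_2024", "Q1_2025", "Q4_2023", "Q1_2024", "Q2_2024"]
)
for url in quarterly_zip_urls(html):
    print(url.rsplit("/", 1)[-1])
# → data_Q1_2024.zip, data_Q2_2024.zip, data_Q1_2025.zip (Q4_2023 is filtered out)
```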

In [13]:
import os
import re
import shutil
import zipfile
from io import BytesIO
from urllib.request import urlopen, Request
import pandas as pd

# Scrape Backblaze URLs with a browser-like User-Agent to avoid 403 Forbidden
BACKBLAZE_PAGE = "https://www.backblaze.com/cloud-storage/resources/hard-drive-test-data"
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
req = Request(BACKBLAZE_PAGE, headers={"User-Agent": USER_AGENT})
html = urlopen(req).read().decode("utf-8")
urls = sorted(set(re.findall(r"https://f001\.backblazeb2\.com/file/Backblaze-Hard-Drive-Data/data_Q[1-4]_[0-9]{4}\.zip", html)))
# Keep 2024 onwards (this string sort is not chronological: Q1_2025 sorts before Q2_2024)
urls = [u for u in urls if re.search(r"_202[4-9]\.zip", u)]

print("Found URLs:", len(urls))
print("First 5:", urls[:5])

if not urls:
    raise ValueError("No Backblaze URLs found.")

# Download the first ZIP locally for a sample parquet. Stream it to a ".part" file
# (instead of reading ~1 GB into memory) and rename it only once the download completes,
# so an interrupted download never leaves a truncated ZIP in the cache.
sample_url = urls[0]
fname = os.path.basename(sample_url)
local_zip = os.path.join(LOCAL_ZIP_DIR, fname)
if not os.path.exists(local_zip):
    print("Downloading:", sample_url)
    tmp_zip = local_zip + ".part"
    with urlopen(sample_url) as resp, open(tmp_zip, "wb") as f:
        shutil.copyfileobj(resp, f)
    os.replace(tmp_zip, local_zip)
else:
    print("Using cached ZIP:", local_zip)

# Optionally upload to S3 if available
if not local_mode:
    try:
        s3.upload_file(local_zip, bucket, f"raw/backblaze/zips/{fname}")
        print("Uploaded to S3:", f"s3://{bucket}/raw/backblaze/zips/{fname}")
    except Exception as e:
        print("S3 upload skipped:", e)

# Create a local sample parquet for downstream notebooks
with zipfile.ZipFile(local_zip, "r") as z:
    csv_files = [n for n in z.namelist() if n.endswith(".csv") and not os.path.basename(n).startswith("._")]
    if not csv_files:
        raise ValueError("No CSV files found in ZIP.")
    sample_csv = csv_files[0]
    with z.open(sample_csv) as f:
        df_sample = pd.read_csv(f, encoding="utf-8", encoding_errors="replace")

sample_parquet = os.path.join(DATA_DIR, "engineered_data_sample.parquet")
df_sample.to_parquet(sample_parquet, index=False)
print("Saved sample parquet:", sample_parquet)

Found URLs: 7
First 5: ['https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/data_Q1_2024.zip', 'https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/data_Q1_2025.zip', 'https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/data_Q2_2024.zip', 'https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/data_Q2_2025.zip', 'https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/data_Q3_2024.zip']
Downloading: https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/data_Q1_2024.zip
Saved sample parquet: /tmp/aai540/engineered_data_sample.parquet


## 5. List Downloaded ZIP Files

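Lists the ZIP files under `raw/backblaze/zips/` in S3 using the AWS CLI (which must be installed and on `PATH`), or, in local mode, up to 10 ZIP files in `LOCAL_ZIP_DIR`.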

In [None]:
if not local_mode:
    import subprocess
    result = subprocess.run(["aws", "s3", "ls", f"s3://{bucket}/raw/backblaze/zips/"], capture_output=True, text=True)
    print(result.stdout or result.stderr)
else:
    print("Local ZIP files:")
    for name in sorted(os.listdir(LOCAL_ZIP_DIR))[:10]:
        print(name)

2026-01-25 13:42:10 1020483699 data_Q1_2025.zip
2026-01-25 13:43:05 1067562257 data_Q2_2025.zip
2026-01-25 13:44:03 1111587745 data_Q3_2025.zip


## 6. List and Prepare ZIP Files for Processing

Imports the data-processing libraries and lists the ZIP files to convert to Parquet: those under `raw/backblaze/zips/` in S3, or those in `LOCAL_ZIP_DIR` in local mode. The S3 listing uses a `list_objects_v2` paginator because a single call returns at most 1,000 keys.
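Pagination in `list_objects_v2` works by passing the previous response's `NextContinuationToken` back as `ContinuationToken` until `IsTruncated` is false; boto3's paginator runs this loop for you. The sketch below spells the loop out against any callable with the same request and response shape, so it can be exercised without AWS. `iter_zip_keys` and `fake_list` are hypothetical; with a real client you would pass `functools.partial(s3.list_objects_v2, Bucket=bucket)`.

```python
def iter_zip_keys(list_objects, prefix):
    """Yield .zip keys under prefix, following continuation tokens page by page.

    list_objects is any callable accepting list_objects_v2-style keyword arguments
    (Prefix, ContinuationToken) and returning a dict with Contents, IsTruncated,
    and NextContinuationToken.
    """
    kwargs = {"Prefix": prefix}
    while True:
        page = list_objects(**kwargs)
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".zip"):
                yield obj["Key"]
        if not page.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


# Hypothetical stand-in for S3 that serves two keys per page
ALL_KEYS = [f"raw/backblaze/zips/data_Q{q}_2025.zip" for q in (1, 2, 3)] + ["raw/backblaze/zips/"]


def fake_list(Prefix, ContinuationToken=None, page_size=2):
    keys = sorted(k for k in ALL_KEYS if k.startswith(Prefix))
    start = int(ContinuationToken or 0)
    end = start + page_size
    page = {"Contents": [{"Key": k} for k in keys[start:end]], "IsTruncated": end < len(keys)}
    if page["IsTruncated"]:
        page["NextContinuationToken"] = str(end)
    return page


print(list(iter_zip_keys(fake_list, "raw/backblaze/zips/")))
```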

In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from io import BytesIO
import zipfile
import os

zip_s3_prefix = "raw/backblaze/zips/"
out_prefix = "curated/backblaze_parquet/"

def list_s3_keys(prefix):
    paginator = s3.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".zip"):
                keys.append(obj["Key"])
    return sorted(keys)

if local_mode:
    zip_keys = sorted(os.path.join(LOCAL_ZIP_DIR, f) for f in os.listdir(LOCAL_ZIP_DIR) if f.endswith(".zip"))
else:
    zip_keys = list_s3_keys(zip_s3_prefix)

print("ZIPs found:", len(zip_keys))
print(zip_keys[:5])


ZIPs found: 3
['raw/backblaze/zips/data_Q1_2025.zip', 'raw/backblaze/zips/data_Q2_2025.zip', 'raw/backblaze/zips/data_Q3_2025.zip']


## 7. Convert CSV to Partitioned Parquet

Processes each ZIP file by:
1. Downloading the ZIP from S3 into memory
2. Extracting CSV files from the ZIP
3. Parsing date information from CSV filenames (YYYY-MM-DD)
4. Converting CSV to Parquet format with Snappy compression
5. Uploading to S3 with Hive-style partitioning: `year=YYYY/month=MM/day=DD/`
6. Skipping files that already exist to enable resumable processing

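Engines such as Athena and Spark can use these partition keys to prune data, reading only the `year=/month=/day=` prefixes that match a date filter. In local mode this step is skipped, and only the sample Parquet from step 4 is available.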
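The key layout from step 5 of the list above can be factored into a small pure function, which also makes the inverse (recovering partition values from a key) easy to test. A minimal sketch that mirrors the cell below, assuming daily CSV names of the form `YYYY-MM-DD.csv` inside a ZIP named like `data_Q1_2025.zip`. `partition_key` and `parse_partitions` are hypothetical helpers, and the date validation via `date.fromisoformat` is an addition the notebook does not perform.

```python
import os
from datetime import date


def partition_key(zip_key, csv_name, out_prefix="curated/backblaze_parquet/"):
    """Build the Hive-style Parquet key for one daily CSV inside a quarterly ZIP."""
    date_str = os.path.basename(csv_name)[: -len(".csv")]
    day = date.fromisoformat(date_str)  # raises ValueError for names that are not dates
    zip_base = os.path.basename(zip_key)[: -len(".zip")]
    return (
        f"{out_prefix}year={day:%Y}/month={day:%m}/day={day:%d}/"
        f"{zip_base}_{day:%d}.parquet"
    )


def parse_partitions(key):
    """Recover {'year': ..., 'month': ..., 'day': ...} from the key=value path segments."""
    return dict(seg.split("=", 1) for seg in key.split("/") if "=" in seg)


key = partition_key("raw/backblaze/zips/data_Q1_2025.zip", "data_Q1_2025/2025-01-31.csv")
print(key)  # → curated/backblaze_parquet/year=2025/month=01/day=31/data_Q1_2025_31.parquet
print(parse_partitions(key))  # → {'year': '2025', 'month': '01', 'day': '31'}
```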

In [14]:
if local_mode:
    print("Local mode: sample parquet already saved to", os.path.join(DATA_DIR, "engineered_data_sample.parquet"))
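else:
    from botocore.exceptions import ClientError

    def upload_parquet_to_s3(df, s3_key):
        """Serialize df to Snappy-compressed Parquet in memory and upload it to s3_key."""
        table = pa.Table.from_pandas(df, preserve_index=False)
        buf = BytesIO()
        pq.write_table(table, buf, compression="snappy")
        buf.seek(0)
        s3.put_object(Bucket=bucket, Key=s3_key, Body=buf.getvalue())

    def check_s3_key_exists(s3_key):
        """Return True if s3_key exists; re-raise errors other than 'not found'."""
        try:
            s3.head_object(Bucket=bucket, Key=s3_key)
            return True
        except ClientError as e:
            # head_object reports a missing key as a 404 error code
            if e.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
                return False
            raise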

    for zip_key in zip_keys:
        print("Processing:", zip_key)
        zip_obj = s3.get_object(Bucket=bucket, Key=zip_key)["Body"].read()

        with zipfile.ZipFile(BytesIO(zip_obj), "r") as z:
            # Filter out macOS metadata files (._filename) and only get actual CSV files
            csv_files = [n for n in z.namelist() 
                        if n.endswith(".csv") and not os.path.basename(n).startswith("._")]

            for csv_name in csv_files:
                # csv_name is like 'data_Q1_2024/2024-01-01.csv'
                date_str = os.path.basename(csv_name).replace(".csv", "")
                yyyy, mm, dd = date_str.split("-")

                # Write to partitioned parquet key
                out_key = (
                    f"{out_prefix}year={yyyy}/month={mm}/day={dd}/"
                    f"{os.path.basename(zip_key).replace('.zip','')}_{dd}.parquet"
                )

                # Check if already extracted
                if check_s3_key_exists(out_key):
                    print(f"  Skip (already exists): {out_key}")
                    continue

                with z.open(csv_name) as f:
                    df = pd.read_csv(f, encoding='utf-8', encoding_errors='replace')

                upload_parquet_to_s3(df, out_key)
                print(f"  Uploaded: {out_key}")

    print("Done writing curated partitioned parquet to S3!")

Local mode: sample parquet already saved to /tmp/aai540/engineered_data_sample.parquet


In [15]:
# %pip install -q -U datasets pyarrow s3fs pandas

In [16]:
HF_DATASET = "McAuley-Lab/Amazon-Reviews-2023"

REVIEW_CONFIG = "raw_review_Electronics"
META_CONFIG   = "raw_meta_Electronics"

s3_reviews_out = f"s3://{bucket}/raw/reviews_2023_parquet/{REVIEW_CONFIG}/"
s3_meta_out    = f"s3://{bucket}/raw/reviews_2023_parquet/{META_CONFIG}/"

print(s3_reviews_out)
print(s3_meta_out)


s3://None/raw/reviews_2023_parquet/raw_review_Electronics/
s3://None/raw/reviews_2023_parquet/raw_meta_Electronics/


In [17]:
if reload_s3:
    from huggingface_hub import HfFileSystem
    from pathlib import Path
    import pandas as pd
    import pyarrow.parquet as pq
    import pyarrow as pa
    from io import BytesIO
    import json

    # Initialize HF filesystem
    hf_fs = HfFileSystem()

    dataset_name = HF_DATASET

    # Find Electronics review file
    print("Finding Electronics review file...")
    review_path = f"datasets/{dataset_name}/raw/review_categories"
    review_files = hf_fs.ls(review_path, detail=True)
    electronics_files = [f for f in review_files if 'Electronics' in f['name']]

    if not electronics_files:
        print("No Electronics files found")
    else:
        file_info = electronics_files[0]
        file_path = file_info['name']
        file_name = Path(file_path).name
        
        print(f"Found: {file_name}")
        print(f"Size: {file_info['size'] / 1024 / 1024:.2f} MB")
        
        # First pass (informational only): count non-empty rows in the JSONL file.
        # This adds a full extra read of the file before the real ingestion pass.
        print(f"\nCounting total rows in JSONL file...")
        total_lines = 0
        with hf_fs.open(file_path, 'r') as f:
            for line in f:
                if line.strip():
                    total_lines += 1
                    if total_lines % 500000 == 0:
                        print(f"  Counted {total_lines:,} rows...")
        
        print(f"Total rows in JSONL file: {total_lines:,}\n")
        
        # Second pass: Stream JSONL from HF, convert to Parquet, upload directly to S3
        # This avoids filling up local disk
        # Filter for hard drive manufacturers: Western Digital, Toshiba, Seagate, Hitachi
        print(f"Streaming JSONL -> Parquet -> S3 (filtering for hard drive brands)...")
        
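        # Define target brands for filtering (case-insensitive substring match, so short
        # tokens such as 'wd' can also match inside unrelated words; expect false positives)
        TARGET_BRANDS = ['western digital', 'wd', 'toshiba', 'seagate', 'hitachi', 'hgst', 'wdc']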
        
        def is_hard_drive_review(review_data):
            """Check if review is for Western Digital, Toshiba, Seagate, or Hitachi storage drives"""
            # Check in title, text, and parent_asin fields
            search_fields = []
            
            if 'title' in review_data and review_data['title']:
                search_fields.append(str(review_data['title']).lower())
            if 'text' in review_data and review_data['text']:
                search_fields.append(str(review_data['text']).lower())
            if 'parent_asin' in review_data:
                search_fields.append(str(review_data['parent_asin']).lower())
            
            # Combine all searchable text
            search_text = ' '.join(search_fields)
            
            # Check if any target brand appears in the text
            for brand in TARGET_BRANDS:
                if brand in search_text:
                    # Additional check for storage-related keywords
                    storage_keywords = ['hard drive', 'hdd', 'ssd', 'drive', 'storage', 'disk', 'external drive', 'internal drive']
                    if any(keyword in search_text for keyword in storage_keywords):
                        return True
            return False
        
        chunk_size = 200_000
        part_idx = 0
        buffer = []
        total_rows = 0
        filtered_rows = 0
        
        # Open the JSONL file from HF
        with hf_fs.open(file_path, 'r') as f:
            for line_num, line in enumerate(f):
                if line.strip():
                    review_data = json.loads(line)
                    total_rows += 1
                    
                    # Filter for hard drive brands
                    if is_hard_drive_review(review_data):
                        buffer.append(review_data)
                        filtered_rows += 1
                    
                    # When buffer reaches chunk_size, write to parquet and upload
                    if len(buffer) >= chunk_size:
                        df = pd.DataFrame(buffer)
                        table = pa.Table.from_pandas(df, preserve_index=False)
                        
                        # Write to memory buffer
                        buf = BytesIO()
                        pq.write_table(table, buf, compression="snappy")
                        buf.seek(0)
                        
                        # Upload directly to S3
                        s3_key = s3_reviews_out.replace(f"s3://{bucket}/", "") + f"part-{part_idx:06d}.parquet"
                        s3.put_object(Bucket=bucket, Key=s3_key, Body=buf.getvalue())
                        
                        print(f"  Part {part_idx}: {len(buffer):,} rows → s3://{bucket}/{s3_key}")
                        
                        # Clear buffer
                        buffer = []
                        part_idx += 1
                
                # Progress update every 100k lines
                if (line_num + 1) % 100000 == 0:
                    print(f"  Processed {line_num + 1:,} lines | Filtered: {filtered_rows:,}/{total_rows:,} ({100*filtered_rows/total_rows:.2f}%)")
        
        # Write remaining buffer
        if buffer:
            df = pd.DataFrame(buffer)
            table = pa.Table.from_pandas(df, preserve_index=False)
            
            buf = BytesIO()
            pq.write_table(table, buf, compression="snappy")
            buf.seek(0)
            
            s3_key = s3_reviews_out.replace(f"s3://{bucket}/", "") + f"part-{part_idx:06d}.parquet"
            s3.put_object(Bucket=bucket, Key=s3_key, Body=buf.getvalue())
            
            print(f"  Part {part_idx}: {len(buffer):,} rows → s3://{bucket}/{s3_key}")
            part_idx += 1
        
        # part_idx now equals the number of files written (0 if no reviews matched)
        pct = 100 * filtered_rows / total_rows if total_rows else 0.0
        print("\nComplete!")
        print(f"   Total reviews processed: {total_rows:,}")
        print(f"   Hard drive reviews (WD/Toshiba/Seagate/Hitachi): {filtered_rows:,} ({pct:.2f}%)")
        print(f"   Parquet files created: {part_idx}")
        print(f"   S3 location: {s3_reviews_out}")
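The ingestion cell above flushes its buffer every `chunk_size` filtered reviews and then once more at the end, which is where off-by-one part counts tend to creep in. A minimal sketch of the same flush-at-size-then-flush-the-tail pattern as a generator, so the caller simply counts what it writes. `batched_records` is a hypothetical helper (Python 3.12's `itertools.batched` does the same grouping for plain iterables), and the `keep` filter here is a stand-in for `is_hard_drive_review`.

```python
import json
from itertools import islice


def batched_records(lines, keep, size):
    """Parse JSONL lines, keep records where keep(record) is true, yield lists of up to size."""
    records = (json.loads(line) for line in lines if line.strip())
    kept = (r for r in records if keep(r))
    while True:
        batch = list(islice(kept, size))
        if not batch:
            return
        yield batch


# Illustrative JSONL input; in the notebook each batch would become one part-NNNNNN.parquet
lines = [json.dumps({"title": f"review {i}", "hdd": i % 2 == 0}) + "\n" for i in range(7)] + ["\n"]
parts = 0
for batch in batched_records(lines, keep=lambda r: r["hdd"], size=3):
    print(f"part-{parts:06d}: {len(batch)} rows")
    parts += 1
print("Parquet files created:", parts)
```

Because the generator yields the final partial batch like any other, there is no separate tail-flush branch, and the number of files written is simply the number of batches consumed.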


## Read Sample Backblaze Parquet File from S3

Reads the first Parquet file under `curated/backblaze_parquet/` in S3 into `df_backblaze`. This and the following cells run only when `reload_s3` is `True`.

In [18]:
if reload_s3:
    import pandas as pd
    import pyarrow.parquet as pq
    import pyarrow as pa
    from io import BytesIO

    # List files in the S3 path
    s3_path = "curated/backblaze_parquet/"
    response = s3.list_objects_v2(Bucket=bucket, Prefix=s3_path, MaxKeys=10)

    if 'Contents' in response:
        # Get the first parquet file
        parquet_files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.parquet')]
        
        if parquet_files:
            sample_file = parquet_files[0]
            print(f"Reading sample file: s3://{bucket}/{sample_file}")
            
            # Download and read the parquet file
            obj = s3.get_object(Bucket=bucket, Key=sample_file)
            buffer = BytesIO(obj['Body'].read())
            
            # Read into dataframe
            df_backblaze = pd.read_parquet(buffer)
            
            print(f"\nDataframe shape: {df_backblaze.shape}")
            print(f"\nColumns: {list(df_backblaze.columns)}")
            print(f"\nFirst few rows:")
            display(df_backblaze.head())
        else:
            print(f"No parquet files found in s3://{bucket}/{s3_path}")
    else:
        print(f"No objects found in s3://{bucket}/{s3_path}")

## Read Sample Reviews Parquet File from S3

Reads the first Parquet file of the filtered Electronics reviews under `raw/reviews_2023_parquet/raw_review_Electronics/` in S3 into `df_reviews`.

In [19]:
if reload_s3:
    # List files in the reviews S3 path
    s3_reviews_path = "raw/reviews_2023_parquet/raw_review_Electronics/"
    response_reviews = s3.list_objects_v2(Bucket=bucket, Prefix=s3_reviews_path, MaxKeys=10)

    if 'Contents' in response_reviews:
        # Get the first parquet file
        reviews_parquet_files = [obj['Key'] for obj in response_reviews['Contents'] if obj['Key'].endswith('.parquet')]
        
        if reviews_parquet_files:
            sample_reviews_file = reviews_parquet_files[0]
            print(f"Reading sample reviews file: s3://{bucket}/{sample_reviews_file}")
            
            # Download and read the parquet file
            obj_reviews = s3.get_object(Bucket=bucket, Key=sample_reviews_file)
            buffer_reviews = BytesIO(obj_reviews['Body'].read())
            
            # Read into dataframe
            df_reviews = pd.read_parquet(buffer_reviews)
            
            print(f"\nDataframe shape: {df_reviews.shape}")
            print(f"\nColumns: {list(df_reviews.columns)}")
            print(f"\nFirst few rows:")
            display(df_reviews.head())
        else:
            print(f"No parquet files found in s3://{bucket}/{s3_reviews_path}")
    else:
        print(f"No objects found in s3://{bucket}/{s3_reviews_path}")

In [20]:
if reload_s3:
    # Count unique ASIN values
    unique_asin_count = df_reviews['asin'].nunique()
    print(f"Unique ASIN values: {unique_asin_count:,}")

    # Also show unique parent_asin if it exists
    if 'parent_asin' in df_reviews.columns:
        unique_parent_asin_count = df_reviews['parent_asin'].nunique()
        print(f"Unique parent_asin values: {unique_parent_asin_count:,}")

## Examine Data for Joining

Check sample values from both datasets to understand how to create a join key.

In [21]:
if reload_s3:
    # Sample review titles and parent_asin to see product names
    print("Sample Reviews Data:")
    print(df_reviews[['parent_asin', 'title', 'text']].head(10))

    print("\n" + "="*80 + "\n")

    # Sample backblaze model names
    print("Sample Backblaze Model Data:")
    print(df_backblaze['model'].value_counts().head(20))

## Create Join Keys

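Extracts a normalized manufacturer label (and candidate model numbers) from review titles and text, and derives the same manufacturer labels from Backblaze model names, so that both datasets share a join key.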
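Matching on manufacturer alone is coarse; the `model_hints` column built in the cell below allows a finer, model-level match. A minimal sketch, assuming Backblaze model strings either carry a vendor prefix separated by a space (for example `TOSHIBA MG07ACA14TA`) or are bare Seagate `ST...` numbers. `split_bb_model` and `match_model_hints` are hypothetical helpers, and real model strings may need more cases.

```python
# Vendor prefixes mapped to the same labels the notebook uses
VENDOR_PREFIXES = {"TOSHIBA": "TOSHIBA", "WDC": "WDC", "HGST": "HGST", "HITACHI": "HGST", "SEAGATE": "SEAGATE"}


def split_bb_model(model):
    """Split a Backblaze model string into (manufacturer, bare model number)."""
    parts = str(model).upper().split()
    if len(parts) >= 2 and parts[0] in VENDOR_PREFIXES:
        return VENDOR_PREFIXES[parts[0]], parts[-1]
    if parts and parts[0].startswith("ST"):
        return "SEAGATE", parts[0]  # bare Seagate model numbers such as ST12000NM0008
    return None, (parts[-1] if parts else None)


def match_model_hints(hints, bb_models):
    """Return the Backblaze model strings whose bare model number appears in hints."""
    wanted = set(hints or [])
    return [m for m in bb_models if split_bb_model(m)[1] in wanted]


bb_models = ["TOSHIBA MG07ACA14TA", "ST12000NM0008", "WDC WUH721414ALE6L4"]
print([split_bb_model(m) for m in bb_models])
print(match_model_hints(["ST12000NM0008", "FOO123456"], bb_models))  # → ['ST12000NM0008']
```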

In [22]:
if reload_s3:
    import re

    def extract_manufacturer(text):
        """Extract a normalized manufacturer label from review text"""
        text_lower = str(text).lower()
        
        # Map various manufacturer names to a standard label. Word-boundary regexes avoid
        # matching 'wd' inside words like 'crowd', and only treat 'st' as Seagate when it
        # starts a model number such as 'st4000dm004'.
        if 'toshiba' in text_lower:
            return 'TOSHIBA'
        elif 'seagate' in text_lower or re.search(r'\bst\d{3,}', text_lower):
            return 'SEAGATE'
        elif 'western digital' in text_lower or re.search(r'\bwd', text_lower):
            return 'WDC'
        elif 'hitachi' in text_lower or 'hgst' in text_lower:
            return 'HGST'
        
        return None

    def extract_model_hints(text):
        """Extract candidate drive model numbers from review text"""
        text_upper = str(text).upper()
        
        # Model-number patterns; each requires a digit or a vendor prefix at a word
        # boundary, so ordinary upper-cased words (e.g. "GREAT PRODUCT") do not match
        patterns = [
            r'\b[A-Z]{2,}\d[A-Z0-9]{4,}\b',  # e.g., "MG08ACA16TA", "WUH722222ALE6L4"
            r'\bST\d{4,}[A-Z]{2}\d{3,}[A-Z]?',  # Seagate pattern
            r'\bWD[A-Z0-9]{6,}',  # WD pattern
            r'\bMG\d{2}[A-Z]{3}\d{2}[A-Z]{2,}',  # Toshiba pattern
        ]
        
        models = []
        for pattern in patterns:
            models.extend(re.findall(pattern, text_upper))
        
        # A model can match several patterns; drop duplicates while keeping order
        models = list(dict.fromkeys(models))
        return models if models else None

    # Add manufacturer and model hints to reviews dataframe
    df_reviews['manufacturer'] = df_reviews['title'].apply(extract_manufacturer)
    df_reviews['manufacturer'] = df_reviews['manufacturer'].fillna(
        df_reviews['text'].apply(extract_manufacturer)
    )

    df_reviews['model_hints'] = df_reviews['title'].apply(extract_model_hints)
    df_reviews['model_hints'] = df_reviews['model_hints'].fillna(
        df_reviews['text'].apply(extract_model_hints)
    )

    # Add manufacturer to backblaze data for easier joining
    def get_bb_manufacturer(model):
        """Extract a normalized manufacturer label from a Backblaze model name"""
        model = str(model).upper()  # tolerate NaN and mixed-case model names
        if model.startswith('TOSHIBA'):
            return 'TOSHIBA'
        elif model.startswith('ST') or model.startswith('SEAGATE'):
            return 'SEAGATE'
        elif model.startswith('WDC') or model.startswith('WD'):
            return 'WDC'
        elif model.startswith('HGST') or model.startswith('HITACHI'):
            return 'HGST'
        return None

    df_backblaze['manufacturer'] = df_backblaze['model'].apply(get_bb_manufacturer)

    print("Reviews with manufacturer extracted:")
    print(df_reviews[['parent_asin', 'title', 'manufacturer', 'model_hints']].head(10))
    print(f"\nReviews with manufacturer identified: {df_reviews['manufacturer'].notna().sum():,} / {len(df_reviews):,}")

    print("\n" + "="*80 + "\n")

    print("Backblaze with manufacturer extracted:")
    print(df_backblaze[['model', 'manufacturer']].head(10))
    print(f"\nManufacturer distribution in Backblaze:")
    print(df_backblaze['manufacturer'].value_counts())

## Join Strategy Summary

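Both datasets now have a `manufacturer` field (TOSHIBA, SEAGATE, WDC, HGST) that can serve as a join key:

- **Reviews**: `df_reviews['manufacturer']`, extracted from the review title/text
- **Backblaze**: `df_backblaze['manufacturer']`, extracted from the model name

Because `manufacturer` has only a handful of values, merging the raw rows is a many-to-many join that pairs every review with every drive-day record of the same manufacturer. Aggregate each side to one row per manufacturer (for example, review count and mean rating on one side, drive days and failures on the other) before joining to compare reviews against Backblaze failure data.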
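A minimal sketch of the aggregate-then-join approach with pandas. It assumes the Backblaze frame has the daily `failure` flag (1 on the day a drive fails) and the reviews frame has a numeric `rating` column; the toy frames and the `manufacturer_summary` helper are hypothetical. The annualized failure rate follows Backblaze's published definition: failures divided by (drive days / 365), times 100.

```python
import pandas as pd


def manufacturer_summary(df_backblaze, df_reviews):
    """Aggregate each dataset to one row per manufacturer, then inner-join the aggregates."""
    bb = (
        df_backblaze.dropna(subset=["manufacturer"])
        .groupby("manufacturer")
        .agg(drive_days=("failure", "count"), failures=("failure", "sum"))
        .reset_index()
    )
    # Annualized failure rate (%): failures / (drive_days / 365) * 100
    bb["afr_pct"] = bb["failures"] / (bb["drive_days"] / 365) * 100

    rv = (
        df_reviews.dropna(subset=["manufacturer"])
        .groupby("manufacturer")
        .agg(n_reviews=("rating", "count"), mean_rating=("rating", "mean"))
        .reset_index()
    )
    # One row per manufacturer on each side, so this join is one-to-one
    return bb.merge(rv, on="manufacturer", how="inner", validate="one_to_one")


# Toy data for illustration only
bb_toy = pd.DataFrame({"manufacturer": ["SEAGATE", "SEAGATE", "WDC", "WDC"], "failure": [1, 0, 0, 0]})
rv_toy = pd.DataFrame({"manufacturer": ["SEAGATE", "SEAGATE", "WDC", "TOSHIBA"], "rating": [1.0, 3.0, 5.0, 4.0]})
print(manufacturer_summary(bb_toy, rv_toy))
```

In this notebook `df_backblaze` holds a single day's file, so drive days equal the drive count and the AFR will be noisy; aggregating over many daily partitions gives more meaningful rates.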

In [23]:
if reload_s3:
    # Example join: Get reviews by manufacturer
    print("Join Example - Reviews by Manufacturer:")
    print("\nReviews distribution:")
    print(df_reviews['manufacturer'].value_counts())

    print("\n" + "="*80 + "\n")

    print("Backblaze distribution:")
    print(df_backblaze['manufacturer'].value_counts())

    # Joining raw rows on manufacturer is many-to-many and multiplies row counts;
    # aggregate each side per manufacturer first, then merge the aggregates, e.g.:
    # joined_df = reviews_agg.merge(backblaze_agg, on='manufacturer', how='inner')

In [24]:
if reload_s3:
    # Check a sample Backblaze parquet file to understand the schema
    import pandas as pd
    import pyarrow.parquet as pq
    from io import BytesIO

    # List some parquet files
    s3_path = "curated/backblaze_parquet/"
    response = s3.list_objects_v2(Bucket=bucket, Prefix=s3_path, MaxKeys=100)

    if 'Contents' in response:
        # Find the first actual parquet file
        parquet_files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.parquet')]
        
        if parquet_files:
            sample_file = parquet_files[0]
            print(f"Reading schema from: {sample_file}\n")
            
            # Download and read parquet file metadata
            obj = s3.get_object(Bucket=bucket, Key=sample_file)
            buffer = BytesIO(obj['Body'].read())
            
            # Read parquet schema
            parquet_file = pq.ParquetFile(buffer)
            arrow_schema = parquet_file.schema_arrow
            
            print(f"Number of columns: {len(arrow_schema.names)}\n")
            print("First 20 columns:")
            for i, name in enumerate(arrow_schema.names[:20], 1):
                field = arrow_schema.field(name)
                print(f"  {i}. {name} ({field.type})")
            
            remaining = len(arrow_schema.names) - 20
            if remaining > 0:
                print(f"\n... and {remaining} more columns")
        else:
            print(f"No parquet files found in s3://{bucket}/{s3_path}")
    else:
        print(f"No objects found in s3://{bucket}/{s3_path}")