# Yelp Business Silver Layer Transformation

This notebook ingests the Yelp business data from the Bronze layer, parses the JSON records, validates them, deduplicates them, and transforms them into a Silver layer format using **PySpark RDDs**, then writes the result in Parquet format.

In [None]:
import json
import time
from datetime import datetime, timezone

import findspark

findspark.init()

# PySpark is treated as optional: the flag set here lets every later cell
# skip its Spark work instead of failing when the package is missing.
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.types import ( StringType, StructField,
                                   StructType)

    pyspark_available = True
except ImportError:
    print("PySpark not available. Install with: pip install pyspark")
    pyspark_available = False

# Initialize SparkSession and SparkContext
if pyspark_available:
    spark = (
        SparkSession.builder.appName("yelp_business_silver_transform")
        # Spark master URL this session connects to (hard-coded address)
        .master("spark://192.168.5.121:7077")
        .config("spark.sql.adaptive.enabled", "true")

        # Dynamic allocation is currently disabled; settings kept for reference
        # .config("spark.dynamicAllocation.enabled", "true")
        # .config("spark.dynamicAllocation.minExecutors", "3")
        # .config("spark.dynamicAllocation.maxExecutors", "16")

        # Executor settings
        .config("spark.executor.cores", "4")
        .config("spark.executor.memory", "6g")

        # Driver memory
        .config("spark.driver.memory", "2g")

        # Parallelism settings
        .config("spark.sql.shuffle.partitions", "24")
        .config("spark.default.parallelism", "24")
        
        # Reuses an already-running session if one exists, otherwise creates one
        .getOrCreate()
    )
    # SparkContext is needed for the RDD API used in the cells below
    sc = spark.sparkContext

    print("Spark session initialzed succesfully!")
    print(f"Spark version: {spark.version}")
    print(f"Spark UI available at: {sc.uiWebUrl}")
else:
    print("Skipping Spark tasks - Pyspark not available")

## Utility functions

In [None]:
def parse_json_safe(json_str: str) -> dict:
    """
    Safely parse a JSON string and add ingestion metadata.

    A line only counts as valid when it decodes to a JSON object (a ``dict``).
    Lines that are not valid JSON, that decode to another JSON type (array,
    number, string, ``null``), or that are not text at all are returned as
    ``parse_error`` records instead of raising, so a single bad line cannot
    fail the whole Spark task.

    Args:
        json_str (str): The JSON string to parse.
    Returns:
        dict: For valid input, the parsed object extended with the keys
              ``_ingestion_date``, ``_ingestion_timestamp``, ``_source`` and
              ``_status`` (= ``"valid"``). Otherwise a dictionary with
              ``_raw_data``, ``_ingestion_timestamp``, ``_source``,
              ``_status`` (= ``"parse_error"``) and ``_error_msg``.
    """
    try:
        data = json.loads(json_str)
    except (json.JSONDecodeError, TypeError) as e:
        # JSONDecodeError: malformed JSON; TypeError: input is not str/bytes.
        error_msg = str(e)
    else:
        if isinstance(data, dict):
            # Add ingestion metadata
            data["_ingestion_date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            data["_ingestion_timestamp"] = time.time()
            data["_source"] = "yelp_dataset"
            data["_status"] = "valid"
            return data

        # Syntactically valid JSON, but metadata keys can only be attached to
        # an object, so treat any other top-level type as a parse error.
        error_msg = f"Expected a JSON object, got {type(data).__name__}"

    return {
        "_raw_data": json_str,
        "_ingestion_timestamp": time.time(),
        "_source": "yelp_dataset",
        "_status": "parse_error",
        "_error_msg": error_msg,
    }

In [None]:
def is_business_valid(business: dict) -> bool:
    """
    Check that a business record carries every mandatory field.

    A record passes only when ``business_id``, ``name`` and ``categories``
    are all present, are strings, and contain something other than
    whitespace.

    Args:
        business (dict): The business data to validate.
    Returns:
        bool: True if the business data is valid, False otherwise.
    """
    mandatory = ("business_id", "name", "categories")

    # Presence is checked for every field before any value is inspected.
    if any(key not in business for key in mandatory):
        return False

    for key in mandatory:
        value = business[key]
        if not isinstance(value, str):
            return False
        if value.strip() == "":
            return False

    return True

In [None]:
def transform_business_silver(business: dict) -> dict:
    """
    Transform business data to silver schema.

    The comma-separated ``categories`` string is split into a list of trimmed,
    non-empty category names. Whenever no usable category remains (the field
    is missing, ``None``, not a string, blank, or made up only of separators
    such as ``" , "``), the list falls back to ``["Unknown"]`` so the result
    never carries an empty category list.

    Args:
        business (dict): The business data to transform. Must contain
            ``business_id`` and ``name``; ``categories`` is optional.
    Returns:
        dict: The transformed business data with the keys ``business_id``,
              ``name``, ``categories`` (list of str) and ``ingest_date``
              (current UTC date as ``YYYY-MM-DD``).
    Raises:
        KeyError: If ``business_id`` or ``name`` is missing.
    """
    raw_cat = business.get("categories")

    categories = []
    if isinstance(raw_cat, str):
        # Strip once per piece and drop pieces that are empty after trimming.
        stripped = (cat.strip() for cat in raw_cat.split(","))
        categories = [cat for cat in stripped if cat]

    if not categories:
        categories = ["Unknown"]

    return {
        "business_id": business["business_id"],
        "name": business["name"],
        "categories": categories,
        "ingest_date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
    }

## Quick sanity check

In [None]:
# Shell escape: count the lines of the raw bronze business file.
!wc -l /data/bronze/yelp/raw/2025-11-13/yelp_academic_dataset_business.json

In [None]:
# Shell escape: print the first three lines of the raw bronze business file.
!head -n 3 /data/bronze/yelp/raw/2025-11-13/yelp_academic_dataset_business.json

## Load bronze data as RDD

In [None]:
# Location of the raw bronze file (local filesystem URI, fixed snapshot date).
raw_path = "file:///data/bronze/yelp/raw/2025-11-13/yelp_academic_dataset_business.json"
if pyspark_available:
    # Read the file as an RDD of text lines, then parse every line;
    # parse_json_safe tags each result with a "_status" field.
    business_raw_rdd = sc.textFile(raw_path)
    business_parsed_rdd = business_raw_rdd.map(parse_json_safe)
    # NOTE(review): the parsed RDD is not cached here, so each action below
    # presumably re-reads and re-parses the file — confirm if that matters.
    print("Parsed record count:", business_parsed_rdd.count())
    print("Parsed sample line:", business_parsed_rdd.take(1))

## Filter parsable business records

In [None]:
if pyspark_available:
    # Split the parsed records by the "_status" tag set in parse_json_safe.
    business_valid_json_rdd = business_parsed_rdd.filter(lambda d: d["_status"] == "valid")
    business_invalid_json_rdd = business_parsed_rdd.filter(
        lambda d: d["_status"] == "parse_error"
    )

    total_count = business_parsed_rdd.count()
    invalid_count = business_invalid_json_rdd.count()
    # Guard the ratio: an empty input file would otherwise raise
    # ZeroDivisionError and abort the cell before the summary is printed.
    invalid_pct = invalid_count / total_count * 100 if total_count else 0.0
    print(
        f"Malformed records: {invalid_count}/{total_count} ({invalid_pct:.2f}%)"
    )
    print(f"Valid records: {business_valid_json_rdd.count()}")

## Filter valid business

In [None]:
if pyspark_available:
    # Keep only the records that pass the field checks in is_business_valid.
    business_valid_rdd = business_valid_json_rdd.filter(is_business_valid)

    print(f"Valid business records: {business_valid_rdd.count()}")

## Deduplicate business by `business_id`

In [None]:
if pyspark_available:
    # Pair each record with its business_id, collapse every group of
    # duplicates to a single record (the left-hand one of each merge is
    # kept), then discard the key again.
    business_deduped_rdd = (
        business_valid_rdd.keyBy(lambda record: record["business_id"])
        .reduceByKey(lambda kept, _duplicate: kept)
        .values()
    )

    print("After deduplication:", business_deduped_rdd.count())

## Apply silver transformation

In [None]:
if pyspark_available:
    # Map every deduplicated record to the reduced dict built by
    # transform_business_silver.
    business_silver_rdd = business_deduped_rdd.map(transform_business_silver)
    print("Transformed business record count:", business_silver_rdd.count())
    print("Sample transformed business record:", business_silver_rdd.take(1))

## Convert RDD to DataFrame

In [None]:
if pyspark_available:
    # Imported locally so this cell does not depend on the import list of
    # the session-setup cell.
    from pyspark.sql.types import ArrayType

    # Explicit schema for the silver records; every column is non-nullable.
    business_silver_schema = StructType(
        [
            StructField("business_id", StringType(), False),
            StructField("name", StringType(), False),
            # transform_business_silver emits a list of category names, so
            # the column is an array of strings rather than a single string.
            StructField("categories", ArrayType(StringType(), False), False),
            StructField("ingest_date", StringType(), False),
        ]
    )

    business_silver_df = spark.createDataFrame(business_silver_rdd, schema=business_silver_schema)
    business_silver_df.printSchema()
    business_silver_df.show(5, truncate=False)

## Write silver data to Parquet

In [None]:
if pyspark_available:
    # Target directory of the silver business table (local filesystem URI).
    business_silver_path = "file:///data/silver/yelp/business/"
    # One sub-directory per ingest_date value.
    # NOTE(review): with mode("overwrite") on a partitioned path, partitions
    # written by earlier runs are presumably replaced as well unless dynamic
    # partition overwrite is configured — confirm this is intended.
    business_silver_df.write.mode("overwrite").partitionBy("ingest_date").parquet(
        business_silver_path
    )
    # This notebook writes business data, so the message names it accordingly.
    print(f"Business silver data written to: {business_silver_path}")

## Cleanup

In [None]:
if pyspark_available:
    # Shut the session down once the notebook is finished with it.
    spark.stop()
    print("Spark session stopped.")