In [7]:
import sys
if "/mnt/c/Users/HP/Learning/DE_Development" not in sys.path:
    sys.path.append("/mnt/c/Users/HP/Learning/DE_Development")

In [49]:
from pyspark.sql import functions as F, types as T
from pyspark.sql import SparkSession
import yaml, os, sys
from src.utils.spark_utils import create_spark


    
config_path = '/mnt/c/Users/HP/Learning/DE_Development/config/pipeline_config_wsl.yaml'

with open(config_path, "r") as f:
    config_file = yaml.safe_load(f)

base_path = config_file["paths"]["base_path"]
print(base_path)
landing_batch = config_file["paths"]["landing"]["batch"].replace("${paths.base_path}", base_path)
landing_batch_product = os.path.join(landing_batch, "Products.csv")
print(landing_batch_product)
bronze_products_path = config_file["paths"]["bronze"]["products"].replace("${paths.base_path}", base_path)
print(bronze_products_path)

spark = create_spark(config_file["spark"]["app_name"], config_file["spark"]["configs"])



# Define schema matching your actual columns
products_schema = T.StructType([
    T.StructField("product_id", T.StringType(), True),
    T.StructField("product_name", T.StringType(), True),
    T.StructField("category", T.StringType(), True),
    T.StructField("sub_category", T.StringType(), True),
    T.StructField("brand", T.StringType(), True),
    T.StructField("unit_price", T.DoubleType(), True),
    T.StructField("launch_date", T.StringType(), True),  # Keep as string, convert later
    T.StructField("is_active", T.BooleanType(), True),
])


product_raw = (spark.read
    .option("header", True)
    .schema(products_schema)
    .csv(landing_batch_product))
product_raw.show(5)



product_bronze = (product_raw
                  .withColumn("ingestion_timestamp", F.current_timestamp())
                  .withColumn("source_file", F.input_file_name())
                  .withColumn("batch_date", F.current_date())
                  .withColumn("record_hash", F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in product_raw.columns]), 256)))

# Basic DQ: required fields not null, positive price
required_cols = [F.col("product_id").isNotNull(), F.col("product_name").isNotNull()]
dq_conditions = required_cols + [(F.col("unit_price").isNull()) | (F.col("unit_price") >= F.lit(0.0))]
dq_valid = (F.col("product_id").isNotNull() & 
            F.col("product_name").isNotNull() & 
            ((F.col("unit_price").isNull()) | (F.col("unit_price") >= F.lit(0.0))))

product_bronze = product_bronze.withColumn("dq_is_valid", dq_valid)


product_bronze.show(5, truncate = False)


(product_bronze
  .repartition(1)
  .write
  .mode("overwrite")
  .partitionBy("batch_date")
  .parquet(bronze_products_path))

# Read the written Parquet data back
bronze_products_read = spark.read.parquet(bronze_products_path)
print("=== BRONZE DATA READ BACK ===")
bronze_products_read.show(10, truncate=False)
print(f"Total rows in bronze: {bronze_products_read.count()}")


/mnt/c/Users/HP/Learning/DE_Development
/mnt/c/Users/HP/Learning/DE_Development/data/landing/batch/Products.csv
/mnt/c/Users/HP/Learning/DE_Development/data/bronze/products
+----------+--------------------+---------+------------+------------+----------+-----------+---------+
|product_id|        product_name| category|sub_category|       brand|unit_price|launch_date|is_active|
+----------+--------------------+---------+------------+------------+----------+-----------+---------+
|      P001|       Tata Tea Gold|Beverages|         Tea|    Tata Tea|     245.0| 15-01-2020|     true|
|      P002|   Tata Coffee Grand|Beverages|      Coffee| Tata Coffee|     185.0| 10-06-2019|     true|
|      P003|Tata Sampann Toor...|    Foods|      Pulses|Tata Sampann|     145.0| 20-03-2021|     true|
|      P004|      Tata Salt Lite|    Foods|        Salt|   Tata Salt|      28.0| 12-04-2018|     true|
|      P005|    Tetley Green Tea|Beverages|         Tea|      Tetley|     120.0| 05-11-2020|     true|
+--