# In [None]:  (notebook cell marker left over from export)
# data_ingest_and_preparation.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

# ==================== CONFIGURATION ====================
# Name of the source table; main() passes it to ingest_data().
DELTA_TABLE_NAME = "house_price_delta"
# Name of the destination table; prepare_data() overwrites it with the scaled output.
OUTPUT_DELTA_TABLE = "house_price_scaled_delta"

# ==================== FUNCTIONS ====================

def initialize_spark():
    """Return a SparkSession, reusing a pre-existing global one when present.

    If this module's globals contain a ``spark`` name bound to a
    SparkSession, that object is returned unchanged. Otherwise a session is
    obtained through ``SparkSession.builder.getOrCreate()``.

    Returns:
        The SparkSession, or None when obtaining one raised an exception
        (the error is printed instead of being propagated).
    """
    # On Databricks a SparkSession is already exposed through the 'spark' variable.
    existing = globals().get('spark')
    if existing is not None and isinstance(existing, SparkSession):
        print("✅ Using existing Databricks SparkSession")
        return existing

    try:
        builder = SparkSession.builder.appName("DataIngestAndPreparation")
        session = builder.getOrCreate()
        print("✅ SparkSession initialized successfully")
        return session
    except Exception as err:
        print(f"❌ SparkSession initialization failed: {err}")
        return None

def ingest_data(spark, table_name):
    """Read the feature and label columns from a table as doubles.

    Loads ``table_name``, casts the five hard-coded feature columns and the
    ``price`` column to double, and keeps only those columns, exposing
    ``price`` under the name ``label``. The row count and schema are printed.

    Args:
        spark: SparkSession used for the read; may be None.
        table_name: Name of the table to load.

    Returns:
        ``(df, feature_cols)`` on success. ``(None, None)`` when ``spark`` is
        None or when any step raises; the error is printed, not propagated.
    """
    failure = (None, None)

    if spark is None:
        print("❌ SparkSession उपलब्ध नहीं है। डेटा इंगेस्ट नहीं किया जा सकता।")
        return failure

    print(f"📥 Loading data from Delta table '{table_name}' ...")

    feature_cols = ["sq_feet", "num_bedrooms", "num_bathrooms", "year_built", "location_score"]
    label_col = "price"

    try:
        source_df = spark.read.format("delta").table(table_name)

        # The scaler needs numeric input, so every selected column is cast
        # to double; the label is renamed in the same projection.
        projection = [col(name).cast("double").alias(name) for name in feature_cols]
        projection.append(col(label_col).cast("double").alias("label"))
        df = source_df.select(*projection)

        row_total = df.count()
        print(f"✅ Data successfully ingested. Total rows: {row_total}")
        df.printSchema()
        return df, feature_cols
    except Exception as err:
        print(f"❌ Data ingestion failed: {err}")
        return failure

def prepare_data(spark, df, feature_cols):
    """Standardize the feature columns and save the result as a Delta table.

    Steps: fill missing values with 0 (``df.na.fill(0)``), assemble
    ``feature_cols`` into a vector, apply ``StandardScaler`` with both mean
    centering and unit-std scaling, unpack the scaled vector into one
    ``<name>_scaled`` column per feature, and overwrite the table named by
    ``OUTPUT_DELTA_TABLE`` with those columns plus ``label``.

    Args:
        spark: SparkSession; only checked for None here.
        df: Input DataFrame containing ``feature_cols`` and a ``label`` column.
        feature_cols: Names of the columns to scale, in vector order.

    Returns:
        The scaled DataFrame, or None when the inputs are missing, the
        pipeline fails, or the save fails (errors are printed).
    """
    if df is None or spark is None:
        print("❌ Invalid SparkSession or DataFrame. Cannot proceed with scaling.")
        return None

    print("\n⚙️ Feature scaling started...")

    # Missing / NaN values are replaced with 0 before assembling the vector.
    print("🧹 Checking and filling missing values...")
    filled_df = df.na.fill(0)

    # Two-stage pipeline: pack the features into a vector, then standardize it.
    stages = [
        VectorAssembler(inputCols=feature_cols, outputCol="features_vector"),
        StandardScaler(
            inputCol="features_vector",
            outputCol="scaled_features",
            withMean=True,
            withStd=True,
        ),
    ]
    scaling_pipeline = Pipeline(stages=stages)

    try:
        print("🔄 Fitting scaling pipeline...")
        fitted = scaling_pipeline.fit(filled_df)
        transformed = fitted.transform(filled_df)
        print("✅ Pipeline fitted and transformed successfully")
    except Exception as err:
        print(f"❌ Scaling pipeline failed: {err}")
        import traceback
        traceback.print_exc()
        return None

    # Turn the scaled vector into an array so each element can be pulled out
    # into its own column.
    from pyspark.ml.functions import vector_to_array
    expanded = transformed.withColumn(
        "scaled_array", vector_to_array(col("scaled_features"))
    )

    scaled_names = [f"{name}_scaled" for name in feature_cols]
    for position, scaled_name in enumerate(scaled_names):
        expanded = expanded.withColumn(scaled_name, col("scaled_array")[position])

    # Keep only the per-feature scaled columns and the label.
    scaled_final_df = expanded.select(*scaled_names, "label")

    print("✅ Feature scaling completed successfully.")
    scaled_final_df.show(5)

    # Persist the result, replacing any existing contents of the output table.
    try:
        print(f"💾 Saving scaled data to Delta table '{OUTPUT_DELTA_TABLE}'...")
        writer = scaled_final_df.write.format("delta").mode("overwrite")
        writer.saveAsTable(OUTPUT_DELTA_TABLE)
        print(f"✅ Scaled data successfully saved to Delta table '{OUTPUT_DELTA_TABLE}'")
    except Exception as err:
        print(f"❌ Failed to save scaled data to Delta: {err}")
        return None

    return scaled_final_df

# ==================== EXECUTION ====================
def main():
    """Run the pipeline end to end: get a session, ingest, scale, and save.

    Each stage signals failure by returning None; this function turns that
    into a raised ``Exception`` so the caller sees the run as failed.

    Raises:
        Exception: If the SparkSession cannot be obtained, ingestion fails,
            or data preparation fails.
    """
    banner = "=" * 70

    print(banner)
    print("🚀 DATA INGESTION AND PREPARATION PIPELINE")
    print(banner)

    # Reuses a global 'spark' session when initialize_spark() finds one.
    spark_session = initialize_spark()
    if spark_session is None:
        print("❌ Cannot proceed without SparkSession")
        raise Exception("SparkSession initialization failed")

    # Stage 1: load the source table.
    ingested_df, feature_cols = ingest_data(spark_session, DELTA_TABLE_NAME)
    if ingested_df is None or feature_cols is None:
        print("❌ Data ingestion failed. Pipeline cannot continue.")
        raise Exception("Data ingestion failed")

    # Stage 2: scale the features and write the output table.
    scaled_df = prepare_data(spark_session, ingested_df, feature_cols)
    if scaled_df is None:
        print("❌ Data preparation failed. Pipeline cannot continue.")
        raise Exception("Data preparation failed")

    print("\n" + banner)
    print("🎉 DATA PIPELINE COMPLETED SUCCESSFULLY")
    print(banner)
    print(f"✅ Scaled data available in: {OUTPUT_DELTA_TABLE}")
    record_total = scaled_df.count()
    print(f"✅ Total records processed: {record_total}")
    print(banner)

if __name__ == "__main__":
    # Script entry point: run the pipeline, print a failure banner with the
    # traceback if anything raises, then re-raise so the run still fails.
    try:
        main()
    except Exception as err:
        import traceback

        print(f"\n{'=' * 70}")
        print("❌ PIPELINE FAILED")
        print("=" * 70)
        print(f"Error: {err}")
        traceback.print_exc()
        raise  # Propagate so the Databricks job is marked as failed