In [0]:
# %run "/Workspace/Users/ruchika.b.mhetre@v4c.ai/vstone_project/vstone_databricks_pipeline/src/notebooks/00_Setup/project_config"

In [0]:
# Unity Catalog catalog and schema that hold this project's objects.
catalog_name = "vstone_project"
schema_name = "db_project"

# Root of the Unity Catalog volume containing the raw input files.
volume_path = f"/Volumes/{catalog_name}/{schema_name}/raw_data"

## Ingesting the last chunk of transactions data from the `chunk2_incremental` location

In [0]:
import dlt
from pyspark.sql.functions import col, expr, lit, current_timestamp

@dlt.table(
    name="bronze_transactions_incremental_dlt",
    comment="Individual Project: Incremental ingestion with unified date parsing and dynamic pathing"
)
def bronze_transactions():
    """Stream the incremental transactions chunk into the bronze layer with Auto Loader.

    Reads the header-row CSV files under ``<volume root>/chunks/chunk2_incremental``,
    keeps the key columns as strings, normalises ``date`` from any of three textual
    layouts into a DATE, null-safely casts the measures and appends audit columns.

    Returns:
        Streaming DataFrame materialised by DLT as ``bronze_transactions_incremental_dlt``.
    """
    # Resolve the volume root at run time from the pipeline setting
    # ``pipeline.volume_path``; fall back to the notebook-level ``volume_path`` so the
    # default path is defined in a single place.
    # The lookup stays inside the function: per the original author, doing it at
    # module level raised a 'None.get' Java error.
    v_path = spark.conf.get("pipeline.volume_path", volume_path)
    source_location = f"{v_path}/chunks/chunk2_incremental"

    # Read the stream with Auto Loader (cloudFiles).
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load(source_location)
        .select(
            # Primary key
            col("id").cast("string"),

            # Dimension keys (foreign keys)
            col("marka").cast("string"),
            col("model").cast("string"),
            col("currency").cast("string"),
            col("place").cast("string"),

            # Unified date parsing: try each known layout in turn. try_to_timestamp
            # returns NULL when a layout does not match (plain to_date raises instead
            # when ANSI mode is on), so coalesce can fall through to the next layout
            # and values matching none of them become NULL. The quoted 'T' and 'Z'
            # are matched as literals.
            expr("""
                coalesce(
                    to_date(try_to_timestamp(date, 'dd.MM.yyyy')),
                    to_date(try_to_timestamp(date, "yyyy-MM-dd'T'HH:mm:ss'Z'")),
                    to_date(try_to_timestamp(date, 'yyyy-MM-dd'))
                )
            """).alias("date"),

            # Measures: try_cast yields NULL for malformed values instead of failing.
            expr("try_cast(cost as double)").alias("cost"),
            expr("try_cast(year as int)").alias("year"),
            expr("try_cast(has_license as boolean)").alias("has_license"),
            expr("try_cast(power as int)").alias("power"),
            expr("try_cast(probeg as long)").alias("probeg"),

            # Metadata for auditing
            lit("chunk2_incremental").alias("source_file"),
            current_timestamp().alias("load_timestamp")
        )
    )

## Unifying `bronze_transactions` and `bronze_transactions_incremental_dlt` into a single table

In [0]:
@dlt.table(
    name="bronze_transactions_unified",
    comment="Union of manual ingestions and DLT incremental ingestion"
)
def bronze_transactions_unified():
    """Combine the manually ingested transactions with the DLT incremental feed.

    The manual table ``<catalog_name>.<schema_name>.bronze_transactions`` is read as a
    static (batch) source. The incremental rows come from the
    ``bronze_transactions_incremental_dlt`` table of this same pipeline, referenced via
    ``dlt.read`` so DLT tracks the dependency. Columns are matched by name; a column
    present on only one side is filled with NULL on the other.
    """
    manual_df = spark.read.table(f"{catalog_name}.{schema_name}.bronze_transactions")
    return manual_df.unionByName(
        dlt.read("bronze_transactions_incremental_dlt"),
        allowMissingColumns=True,
    )

## Ingesting photo CSV data

In [0]:
# Directory (inside the raw-data volume) holding the photo CSV files.
photo_source_location = f"{volume_path}/photo_data/"

@dlt.table(
    name="bronze_photos",
    comment="Ingestion of transaction photo URLs"
)
def bronze_photos():
    """Stream photo CSV files into ``bronze_photos``, keeping only ``id`` and ``photo_url``.

    Any other column in the files (for example an exported row-index column) is dropped
    by the explicit projection. Both kept columns are cast to string.
    """
    photo_reader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
    )
    photos_raw = photo_reader.load(photo_source_location)
    return photos_raw.select(
        col("id").cast("string"),
        col("photo_url").cast("string"),
    )

## Ingesting text data

In [0]:
# Directory (inside the raw-data volume) holding the car-description CSV files (e.g. 1_text.csv).
text_source_location = f"{volume_path}/text_data/"

@dlt.table(
    name="bronze_texts",
    comment="Ingesting car descriptions with long ID"
)
def bronze_texts():
    """Stream car-description CSV files into ``bronze_texts``.

    Returns a streaming DataFrame with a BIGINT ``id`` (converted from float-formatted
    text such as ``"47937696.0"``) and the free-text ``text`` column as string.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        # Descriptions may span several lines inside quoted fields.
        .option("multiline", "true")
        # NOTE(review): Spark's CSV escape character defaults to backslash; if these
        # files escape embedded quotes by doubling them (RFC 4180 style), add
        # .option("escape", '"') — confirm against the source files.
        .load(text_source_location)
        .select(
            # "47937696.0" -> 47937696: go through double, then truncate to long.
            # try_cast yields NULL for a malformed id instead of failing the whole
            # stream under ANSI mode, matching the try_cast measures used for the
            # incremental transactions table.
            # (double is exact for integers up to 2**53, ample for these ids.)
            expr("try_cast(try_cast(id as double) as long)").alias("id"),
            col("text").cast("string")
        )
    )

## Ingesting geographic data

In [0]:
geo_source_location = f"{volume_path}/geo_data/"

@dlt.table(
    name="bronze_geographic",
    comment="Geographic mapping for city names and coordinates"
)
def bronze_geographic():
    """Stream the geographic reference CSV into ``bronze_geographic``.

    Renames the two city-name forms to English column names, casts the coordinates to
    DOUBLE and appends audit columns (a fixed source-file label and the load time).
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load(geo_source_location)
        .select(
            col("name_padesh").alias("city_name"),
            # Per the original author, 'greate_padesh' is the form that matches the
            # 'place' column of the transactions data.
            col("greate_padesh").alias("city_prepositional"),
            # try_cast: a malformed coordinate becomes NULL instead of failing the
            # stream under ANSI mode; explicit aliases keep the names lat / lon.
            expr("try_cast(lat as double)").alias("lat"),
            expr("try_cast(lon as double)").alias("lon"),
            lit("final_geografic.csv").alias("source_file"),
            current_timestamp().alias("load_timestamp")
        )
    )

## Ingesting catalogs data


In [0]:
catalog_source_location = f"{volume_path}/catalogs"

@dlt.table(
    name="bronze_catalogs",
    comment="Individual Project: Correcting Russian headers and semicolon delimiter"
)
def bronze_catalogs():
    """Stream the vehicle catalog CSV into ``bronze_catalogs`` with English column names.

    The source file is semicolon-delimited and its header row is in Russian. Each
    header is renamed to an English name (needed for the later star-schema joins), and
    two audit columns are appended.
    """
    # Russian source header -> English column name; output column order follows this dict.
    header_map = {
        "Марка": "marka",           # make / brand
        "Модель": "model",          # model
        "Поколение": "generation",  # generation
        "Комплектация": "version",  # trim / equipment level
        "Тип кузова": "body_type",  # body type
    }
    renamed_columns = [col(ru_name).alias(en_name) for ru_name, en_name in header_map.items()]
    audit_columns = [
        lit("catalogs.csv").alias("source_file"),
        current_timestamp().alias("load_timestamp"),
    ]

    # NOTE(review): no encoding option is set, so Spark's CSV default (UTF-8) applies;
    # confirm the file is not e.g. Windows-1251, or the Cyrillic headers won't match.
    catalogs_raw = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("delimiter", ";")  # the source file is semicolon-separated
        .load(catalog_source_location)
    )
    return catalogs_raw.select(*renamed_columns, *audit_columns)