In [0]:
# Bronze layer: Volume path holding the source CSV files this notebook reads.
BRONZE_PATH = "/Volumes/project1/default/bronze/"

# Silver layer: Volume path where the cleaned customers CSV export is written.
# NOTE(review): both paths already end with "/", yet later cells build silver
# paths as f"{SILVER_PATH}/..." which yields a double slash — confirm this is intended.
SILVER_PATH = "/Volumes/project1/default/silver/"

print(f"Silver data path: {SILVER_PATH}")
print(f"Bronze path: {BRONZE_PATH}")

In [0]:
from pyspark.sql.functions import current_timestamp, lit

# Load the customers extract from the bronze layer. The first CSV row is
# treated as the header and Spark infers the column types.
# Despite its name, `customers_silver` holds the data as read from bronze;
# cleaning happens in a later cell.
customers_reader = spark.read.options(header="true", inferSchema="true")
customers_silver = customers_reader.csv(f"{BRONZE_PATH}customers_bronze.csv")

customers_silver.display()

In [0]:
# Print the inferred column types, then return the number of rows read from bronze.
customers_silver.printSchema()
customers_silver.count()

In [0]:
# Clean Customers

import pyspark.sql.functions as F
from pyspark.sql.functions import col, trim, lower, regexp_replace, when
from pyspark.sql.types import IntegerType

# Normalise the customer records:
#   * customer_id -> integer (values that cannot be cast become null)
#   * name        -> ASCII letters and spaces only, lower-cased and trimmed
#   * email       -> lower-cased and trimmed, with illegal characters removed
#   * empty/missing name or email -> placeholder value
#   * one row kept per customer_id
customers_silver_clean_name = (
    customers_silver
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
    .withColumn("name", trim(lower(regexp_replace(col("name"), "[^a-zA-Z ]", ""))))
    # Keep "_", "+" and "-" in addition to letters, digits, "@" and ".":
    # they are legal in e-mail addresses, and stripping them would silently
    # turn e.g. "john_doe@my-company.com" into a different address.
    .withColumn("email", trim(lower(regexp_replace(col("email"), "[^a-zA-Z0-9@._+-]", ""))))
    # Replace missing or empty names with a placeholder.
    .withColumn("name", when(col("name").isNull() | (col("name") == ""), "Guest").otherwise(col("name")))
    # Replace missing, empty, or "@"-less emails with a placeholder address.
    .withColumn("email", when(col("email").isNull() | (col("email") == "") | ~col("email").contains("@"),
                             "unknown@example.com").otherwise(col("email")))
    # Keep a single row per customer_id. Which duplicate survives is not
    # controlled here, and rows whose id is null are collapsed into one row.
    .dropDuplicates(["customer_id"])
)

customers_silver_clean_name.display()

In [0]:
# Row count after cleaning and de-duplicating on customer_id.
customers_silver_clean_name.count()


In [0]:
# Publish the cleaned customers as the Delta table project1.silver.customers.
# "overwrite" replaces the table contents on every run, and overwriteSchema
# lets the table schema change along with the data.
silver_customers_writer = (
    customers_silver_clean_name.write
    .format("delta")
    .mode("overwrite")
    .options(overwriteSchema="true")
)
silver_customers_writer.saveAsTable("project1.silver.customers")

In [0]:
# Export the cleaned customers as CSV into a temporary folder under SILVER_PATH.
# Spark writes one part file per partition. The following cells keep a single
# part file and delete the folder, so collapse to one partition first to make
# sure every row ends up in that one file.
(customers_silver_clean_name
    .coalesce(1)
    .write
    .mode("overwrite")          # replaces the temp folder on re-runs
    .option("header", "true")
    .csv(f"{SILVER_PATH}/customers_silver_tmp")
)

In [0]:
# List the temporary export folder once: keep the listing in `files` for the
# cells below, and show it.
files = dbutils.fs.ls(f"{SILVER_PATH}/customers_silver_tmp")
display(files)

In [0]:
# Show every file and folder directly under the silver directory.
display(dbutils.fs.ls(SILVER_PATH))

In [0]:
# Locate the CSV part file inside the temp export folder listing (`files`).
# Exactly one is expected. With none there is nothing to publish, and with
# several, moving just one and deleting the folder would silently drop rows,
# so fail loudly in both cases instead of taking element [0].
csv_part_files = [f.path for f in files if f.name.endswith(".csv")]
if len(csv_part_files) != 1:
    raise RuntimeError(
        "Expected exactly one CSV part file in the temp export folder, "
        f"found {len(csv_part_files)}: {csv_part_files}"
    )
csv_file = csv_part_files[0]
display(csv_file)

In [0]:
# Destination of the single-file CSV export inside the silver directory.
final_path = f"{SILVER_PATH}/customers_silver.csv"

In [0]:
# Move the part file out of the temp folder, renaming it to final_path.
# NOTE(review): behaviour when final_path already exists from an earlier run
# is not visible here — confirm that re-running the notebook is safe.
dbutils.fs.mv(csv_file, final_path)

In [0]:
# Delete the temporary export folder together with everything still inside it.
dbutils.fs.rm(f"{SILVER_PATH}/customers_silver_tmp", recurse=True)

In [0]:
# List the silver directory again to check the result of the move and cleanup.
display(dbutils.fs.ls(SILVER_PATH))

In [0]:
from pyspark.sql.functions import current_timestamp, lit

# Load the incremental customers batch from the bronze layer. The first CSV
# row is treated as the header and Spark infers the column types.
customers_new_reader = spark.read.options(header="true", inferSchema="true")
customers_silver_new = customers_new_reader.csv(f"{BRONZE_PATH}customer_new.csv")

customers_silver_new.display()

In [0]:
# Normalise the incremental batch before appending, so every row in the silver
# table follows one format: integer customer_id, lower-cased/trimmed name and
# email, placeholder values for missing fields, one row per customer_id within
# the batch. Casting customer_id also keeps the appended column type stable
# regardless of what inferSchema guessed for this file.
# NOTE(review): assumes customer_new.csv has customer_id, name and email
# columns like the initial load — confirm against the file.
customers_silver_new_clean = (
    customers_silver_new
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
    .withColumn("name", trim(lower(regexp_replace(col("name"), "[^a-zA-Z ]", ""))))
    # "_", "+" and "-" are legal in e-mail addresses, so they are kept.
    .withColumn("email", trim(lower(regexp_replace(col("email"), "[^a-zA-Z0-9@._+-]", ""))))
    .withColumn("name", when(col("name").isNull() | (col("name") == ""), "Guest").otherwise(col("name")))
    .withColumn("email", when(col("email").isNull() | (col("email") == "") | ~col("email").contains("@"),
                             "unknown@example.com").otherwise(col("email")))
    .dropDuplicates(["customer_id"])
)

# Append to the Delta table project1.silver.customers. The overwriteSchema
# option is omitted because it only applies to overwrite mode.
# NOTE(review): customer_ids that already exist in the table are appended
# again as extra rows; use a MERGE if the batch can contain updates.
(customers_silver_new_clean
    .write
    .format("delta")
    .mode("append")
    .saveAsTable("project1.silver.customers")
)

In [0]:
# Compare the size of the incremental batch with the total row count of the
# silver customers table after the append.
print("customers_silver_new", customers_silver_new.count())
print("customers_silver", spark.table("project1.silver.customers").count())

In [None]:
# NOTE(review): `a` and `b` are not defined in any cell visible in this
# notebook, so this looks like a leftover scratch cell that would raise
# NameError on Restart & Run All — define them or delete the cell.
print(a+b)

In [None]:
# NOTE(review): looks like a leftover scratch cell unrelated to the customers
# pipeline — consider deleting it.
print(1+2)