In [0]:
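# Master reset (disabled). Uncomment the three statements below to wipe all
# pipeline state:
#   - remove every file from the inbound volume
#   - drop the bronze and invalid tables
# NOTE: these statements use INBOUND_PATH, BRONZE_TABLE and INVALID_TABLE,
# which are defined in a later cell; run that cell first.
# dbutils.fs.rm(INBOUND_PATH, True)
# spark.sql(f"DROP TABLE IF EXISTS {BRONZE_TABLE}")
# spark.sql(f"DROP TABLE IF EXISTS {INVALID_TABLE}")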

In [0]:
# Idempotently create the Unity Catalog schema and the two volumes this
# pipeline uses (inbound landing zone and quarantine), in that order.
for _ddl in (
    "CREATE SCHEMA IF NOT EXISTS studio.`databricks-notes-001`",
    "CREATE VOLUME IF NOT EXISTS studio.`databricks-notes-001`.inbound",
    "CREATE VOLUME IF NOT EXISTS studio.`databricks-notes-001`.quarantine",
):
    spark.sql(_ddl)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

In [0]:
# Catalog and schema shared by every object this pipeline touches. The schema
# name contains a hyphen, so it must be backtick-quoted in SQL identifiers but
# appears unquoted in /Volumes file paths.
_CATALOG = "studio"
_SCHEMA = "databricks-notes-001"

INBOUND_PATH = f"/Volumes/{_CATALOG}/{_SCHEMA}/inbound"  # CSV landing directory
BRONZE_TABLE = f"{_CATALOG}.`{_SCHEMA}`.bronze"          # raw ingested rows
INVALID_TABLE = f"{_CATALOG}.`{_SCHEMA}`.invalid"        # rows failing validation

In [0]:

# Explicit read schema for the inbound trip CSVs, listed in file column order.
# All fields are nullable. The column names match the NYC TLC yellow-taxi trip
# layout (presumably the source; confirm).
_TRIP_COLUMNS = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", DoubleType()),
    ("pickup_longitude", DoubleType()),
    ("pickup_latitude", DoubleType()),
    ("RateCodeID", IntegerType()),
    ("store_and_fwd_flag", StringType()),
    ("dropoff_longitude", DoubleType()),
    ("dropoff_latitude", DoubleType()),
    ("payment_type", IntegerType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
]

schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in _TRIP_COLUMNS
])

In [0]:
# Lazily read every CSV under INBOUND_PATH with the explicit schema (first line
# of each file is a header), then add lineage columns: the source file path
# and the ingestion time.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv(INBOUND_PATH)
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestiontimestamp", F.current_timestamp())
)

In [0]:
# Tag every row with a validity flag and, when it fails, the first rule broken.
#
# A row is valid only when fare_amount, extra, mta_tax and tip_amount are all
# non-negative. (Previously the fare check was inverted to `< 0`, which marked
# every normal fare as invalid.)
#
# In Spark SQL a comparison with NULL yields NULL. A NULL in any checked column
# therefore keeps the combined condition from being TRUE, and the row falls
# through to `otherwise(False)`, i.e. it is treated as invalid.
# NOTE(review): such NULL-driven failures may end up with ValidationError = NULL,
# because the rules below only test for negative values. Consider adding
# explicit "<col> is null" messages.
df_validated = df.withColumn(
    "is_valid",
    when(
        (col("fare_amount") >= 0) &
        (col("extra") >= 0) &
        (col("mta_tax") >= 0) &
        (col("tip_amount") >= 0),
        True
    ).otherwise(False)
).withColumn(
    # The first matching rule wins. Each rule must be the exact negation of one
    # is_valid condition above, and its message must state that rule.
    # (The fare rule previously tested `> 0`, mislabelling positive fares.)
    "ValidationError",
    when(col("fare_amount") < 0, "fare_amount < 0")
    .when(col("extra") < 0, "extra < 0")
    .when(col("mta_tax") < 0, "mta_tax < 0")
    .when(col("tip_amount") < 0, "tip_amount < 0")
    .otherwise(None)
)


In [0]:

#  now delete the file as it was processed
dbutils.fs.rm(INBOUND_PATH, True)

In [0]:
df_valid = df_validated.filter(F.col("is_valid") == True)
df_invalid = df_validated.filter(F.col("is_valid") == False)

In [0]:
df.write.mode("overwrite").saveAsTable(BRONZE_TABLE)
df_invalid.write.mode("overwrite").saveAsTable(INVALID_TABLE)

In [0]:
dbutils.jobs.taskValues.set(
    key="has_invalid_data",
    value=True if df_invalid.count() > 0 else False
)