In [0]:
from pyspark.sql.functions import *

In [0]:
# Declare the notebook/job parameters as text widgets; each starts out as an empty string.
for _widget_name in ("catalog_name", "schema_name", "table_name"):
    dbutils.widgets.text(_widget_name, "")

In [0]:
# Read the notebook/job parameters declared by the text widgets.
catalog_name = dbutils.widgets.get("catalog_name")
schema_name = dbutils.widgets.get("schema_name")
table_name = dbutils.widgets.get("table_name")

# The widgets default to "", which would otherwise only surface later as a malformed
# table name ("..") or volume path ("/Volumes//bronze/..."); fail fast with a clear message.
_missing_widgets = [
    widget
    for widget, value in (
        ("catalog_name", catalog_name),
        ("schema_name", schema_name),
        ("table_name", table_name),
    )
    if not value.strip()
]
if _missing_widgets:
    raise ValueError(f"Required widget(s) not set: {', '.join(_missing_widgets)}")

In [0]:
# Three-part name (catalog.schema.table) of the target table.
table = f"{catalog_name}.{schema_name}.{table_name}"

# Per-table folder holding the Auto Loader schema and the streaming checkpoint.
_metadata_dir = f"/Volumes/{catalog_name}/bronze/metadata_table/{table_name}"
schemaLocation = f"{_metadata_dir}/schema"
checkpointLocation = f"{_metadata_dir}/checkpoint"

# Folder the raw CSV files for this table are read from.
csv_path = f"/Volumes/{catalog_name}/bronze/raw_data/{table_name}/"

In [0]:
# Incrementally pick up CSV files from csv_path with Auto Loader ("cloudFiles").
# The first row of each file is the header. The inferred schema is persisted under
# schemaLocation, and schemaEvolutionMode "none" keeps it fixed after inference
# (per the Auto Loader docs, new columns are ignored rather than evolving the schema).
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", schemaLocation)
    .option("header", "true")
    .option("cloudFiles.schemaEvolutionMode", "none")
    .load(csv_path)
)

In [0]:
# Stamp each ingested row with the load time, then append the stream to the target table.
bronze_df = df.withColumn("ETL_Date", current_timestamp())

bronze_query = (
    bronze_df.writeStream
    .option("checkpointLocation", checkpointLocation)
    .outputMode("append")
    # availableNow: process all data available at start, then stop by itself.
    .trigger(availableNow=True)
    .toTable(table)
)

# toTable() returns as soon as the query has started. Block until the available data has
# been committed, so later cells read the finished table. awaitTermination() also
# re-raises any failure of the stream here instead of letting it pass silently.
bronze_query.awaitTermination()


In [0]:
def _sql_string(value) -> str:
    """Render ``value`` as a single-quoted Spark SQL string literal.

    Backslashes and single quotes are escaped, so widget-derived names and paths
    cannot break out of the literal when spliced into the INSERT statement below.
    """
    text = str(value)
    return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"


# NOTE(review): spark.table(...).count() counts every row currently in the target table,
# not only the rows appended by this run, and input_records simply mirrors that total.
# TODO: source true per-batch input/output counts (e.g. from the streaming query's progress).
output_count = spark.table(table).count()
input_count = output_count

# Append one audit row describing this bronze load. INSERT ... SELECT without a column
# list maps columns by position, so the SELECT order must match pipeline_audit's schema.
# Counts are emitted as numeric literals, consistent with rejected_records.
spark.sql(f"""
INSERT INTO olist_db.platform_audit.pipeline_audit
SELECT
  'olist_pipeline'              AS pipeline_name,
  'bronze'                      AS layer,
  {_sql_string(table_name)}     AS table_name,

  uuid()                        AS batch_id,
  current_date()                AS run_date,
  current_timestamp()           AS run_timestamp,

  {_sql_string(csv_path)}       AS source_table,
  {_sql_string(table)}          AS target_table,

  {int(input_count)}            AS input_records,
  {int(output_count)}           AS output_records,
  0                             AS rejected_records,

  map()                         AS expectation_failures,

  'NONE'                        AS scd_type,
  'INCREMENTAL'                 AS load_type,

  'SUCCESS'                     AS status,
  NULL                          AS error_message,
  current_timestamp()           AS created_at
""")
