In [0]:
%run ../helpers/GdprUtils

In [0]:
# Session-level Spark / Delta settings used by every load below.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")      # skip unreadable files instead of failing the read
# Default table properties for Delta tables created in this session.
# The documented key carries the "spark." prefix; the un-prefixed key used previously is not
# a recognized Delta setting, so optimizeWrite/autoCompact were silently never enabled.
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
spark.conf.set("spark.sql.session.timeZone", "CET")                # session time zone for timestamp conversion/rendering
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500mb")    # join sides estimated under 500 MB may be broadcast

In [0]:
# Job parameter widget. `environment` itself is read in ../helpers/metastore_helpers
# (run in the next cell) — presumably from this widget; TODO confirm.
dbutils.widgets.text('environment', 'dev')

prefix = "raw"        # database-name prefix of the bronze (raw) layer
cluster = "bisnode"   # source system; also part of the database names
# Storage root of the fact tables: the first "_" in `cluster` becomes a path separator.
database = "/".join(cluster.split("_", 1)) + "/fact"

# Collectors filled by the per-table cells below:
pertitions_to_delete = set()   # table names whose bronze data should be deleted afterwards
tables_quality_check = set()   # silver tables to check for updated data

In [0]:
%run ../helpers/metastore_helpers

In [0]:
# Snapshot of the table names currently in the silver database (`<environment>_<cluster>`).
# The per-table cells compare `table_name.lower()` against it to decide whether silver must be seeded.
tables = sqlContext.tableNames(
    "{}_{}".format(environment, cluster)
)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import date, datetime

## VDBNBAL

In [0]:
# Incremental bronze -> silver load for VDBNBAL.
table_name = "VDBNBAL"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNBOA

In [0]:
# Incremental bronze -> silver load for VDBNBOA.
table_name = "VDBNBOA"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNBPR

In [0]:
# Incremental bronze -> silver load for VDBNBPR.
table_name = "VDBNBPR"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNBRA

In [0]:
# Incremental bronze -> silver load for VDBNBRA.
table_name = "VDBNBRA"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNEMP

In [0]:
# Incremental bronze -> silver load for VDBNEMP.
table_name = "VDBNEMP"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNFUN

In [0]:
# Incremental bronze -> silver load for VDBNFUN.
# (Previously this cell set table_name = "VDBNEMP" by copy-paste, reprocessing VDBNEMP and never loading VDBNFUN.)
table_name = "VDBNFUN"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNINC

In [0]:
# Incremental bronze -> silver load for VDBNINC.
table_name = "VDBNINC"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNKEF

In [0]:
# Incremental bronze -> silver load for VDBNKEF.
table_name = "VDBNKEF"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNLIN

In [0]:
# Incremental bronze -> silver load for VDBNLIN.
table_name = "VDBNLIN"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNLOB

In [0]:
# Incremental bronze -> silver load for VDBNLOB.
table_name = "VDBNLOB"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNMOS

In [0]:
# Incremental bronze -> silver load for VDBNMOS.
table_name = "VDBNMOS"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNORG

In [0]:
# Upsert-style load for VDBNORG: keep only the latest bronze version per ORG_REG_NUM and
# replace the matching silver rows with it; silver rows for other keys are kept.
table_name = "VDBNORG"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

# distinct() like every other cell: exact duplicate rows would otherwise tie under dense_rank
# below, both pass the rank == 1 filter, and the duplicate would be written to silver.
bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# Latest bronze row(s) per ORG_REG_NUM by DB_TIMESTAMP. dense_rank still keeps several rows for a
# key when *different* rows share its newest DB_TIMESTAMP.
window = Window.partitionBy("ORG_REG_NUM").orderBy(desc("DB_TIMESTAMP"))

raw_recent_relevant = (bronze_data.withColumn("rank", dense_rank().over(window))
                       .filter(col("rank") == 1)
                       .drop(col("rank")))

# Silver rows whose key has no new bronze version. A side estimated below
# spark.sql.autoBroadcastJoinThreshold may be broadcast; otherwise Spark typically uses a sort-merge join.
anti_join_data = silver_data.join(raw_recent_relevant, on="ORG_REG_NUM", how="anti")

# Every row gets this run's CETLoadDate.
new_silver_data = (raw_recent_relevant.unionByName(anti_join_data)
                   .selectExpr(cols)
                   .withColumn("CETLoadDate", current_timestamp()))
new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNRAH

In [0]:
# Incremental bronze -> silver load for VDBNRAH.
table_name = "VDBNRAH"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNRAT

In [0]:
# Incremental bronze -> silver load for VDBNRAT.
table_name = "VDBNRAT"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNRIB

In [0]:
# Incremental bronze -> silver load for VDBNRIB.
table_name = "VDBNRIB"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNRIH

In [0]:
# Incremental bronze -> silver load for VDBNRIH.
table_name = "VDBNRIH"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNROC

In [0]:
# Incremental bronze -> silver load for VDBNROC.
table_name = "VDBNROC"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNROP

In [0]:
# Incremental bronze -> silver load for VDBNROP.
table_name = "VDBNROP"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNSIC

In [0]:
# Incremental bronze -> silver load for VDBNSIC.
table_name = "VDBNSIC"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNSIG

In [0]:
# Incremental bronze -> silver load for VDBNSIG.
table_name = "VDBNSIG"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNVEC

In [0]:
# Incremental bronze -> silver load for VDBNVEC.
table_name = "VDBNVEC"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                   # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'    # bronze table (read only)
silver_path = f'{environment}_{cluster}.{table_name}'             # silver Delta table (read, then overwritten)

# Presumably registers the parquet data under {database}/{table_name} as the bronze table
# (helper defined elsewhere — confirm). `environment` comes from metastore_helpers.
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

# `tables` was listed before this cell; if silver is missing, seed it from bronze so it can be read below.
if table_name.lower() not in tables:
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols).distinct()

# DataFrame.union is UNION ALL: de-duplicate the merged result, otherwise rows present in both
# silver and bronze (e.g. every row right after silver was seeded above) are written twice.
# Every row gets this run's CETLoadDate.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)     # register for bronze deletion
tables_quality_check.add(silver_path)    # register for the data-updated check

## VDBNOWN_FI

In [0]:
# Refresh the silver Delta table: merge current silver rows with the rows landed in the
# raw (bronze) layer, de-duplicate, stamp the load time and overwrite the silver table.
table_name = "VDBNOWN_FI"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                                                     # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'                                      # bronze (raw) table name, read only
silver_path = f'{environment}_{cluster}.{table_name}'                                               # silver table name, read and overwritten

# Presumably registers the bronze table over the existing parquet data under `database`
# (the "environment" variable comes from metastore_helpers).
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

if table_name.lower() not in tables:                                                                # if silver does not exist yet, seed it from bronze
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols)

# union() is UNION ALL, so de-duplicate the combined rows; otherwise a freshly seeded silver
# table (or bronze rows already merged by a previous run) would be written twice.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)                                                                # raw partitions purged later (prod only)
tables_quality_check.add(silver_path)                                                               # silver tables checked for updates at the end

## DUN_BRADSTR_COM_CRM

In [0]:
# Refresh the silver Delta table: merge current silver rows with the rows landed in the
# raw (bronze) layer, de-duplicate, stamp the load time and overwrite the silver table.
table_name = "DUN_BRADSTR_COM_CRM"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                                                     # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'                                      # bronze (raw) table name, read only
silver_path = f'{environment}_{cluster}.{table_name}'                                               # silver table name, read and overwritten

# Presumably registers the bronze table over the existing parquet data under `database`
# (the "environment" variable comes from metastore_helpers).
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

if table_name.lower() not in tables:                                                                # if silver does not exist yet, seed it from bronze
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols)

# union() is UNION ALL, so de-duplicate the combined rows; otherwise a freshly seeded silver
# table (or bronze rows already merged by a previous run) would be written twice.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)                                                                # raw partitions purged later (prod only)
tables_quality_check.add(silver_path)                                                               # silver tables checked for updates at the end

## DUN_BRADSTREET_COM_CRM

In [0]:
# Refresh the silver Delta table: merge current silver rows with the rows landed in the
# raw (bronze) layer, de-duplicate, stamp the load time and overwrite the silver table.
table_name = "DUN_BRADSTREET_COM_CRM"
clb = ColumnListBuilder(table_name, True)
cols = clb.BuildColumnListCL1()                                                                     # column expressions selected from both layers

bronze_path = f'{prefix}_{environment}_{cluster}.{table_name}'                                      # bronze (raw) table name, read only
silver_path = f'{environment}_{cluster}.{table_name}'                                               # silver table name, read and overwritten

# Presumably registers the bronze table over the existing parquet data under `database`
# (the "environment" variable comes from metastore_helpers).
CreateTableOnExistingData('bronze', f'{database}/{table_name}', bronze_path, 'parquet')

bronze_data = spark.table(bronze_path).selectExpr(cols).distinct()

if table_name.lower() not in tables:                                                                # if silver does not exist yet, seed it from bronze
    bronze_data.write.option("overwriteSchema", "True").format("delta").mode("overwrite").saveAsTable(silver_path)
    print("Silver table does not exist, created a new one.")
silver_data = spark.table(silver_path).selectExpr(cols)

# union() is UNION ALL, so de-duplicate the combined rows; otherwise a freshly seeded silver
# table (or bronze rows already merged by a previous run) would be written twice.
new_silver_data = (silver_data.union(bronze_data)
                   .distinct()
                   .withColumn("CETLoadDate", current_timestamp()))

new_silver_data.write.option("overwriteSchema", "True").format('delta').mode('overwrite').saveAsTable(silver_path)

pertitions_to_delete.add(table_name)                                                                # raw partitions purged later (prod only)
tables_quality_check.add(silver_path)                                                               # silver tables checked for updates at the end

## REMOVE DATA FROM RAW LAYER

In [0]:
# Purge the raw-layer partitions of every table refreshed above.
# This only happens in production; other environments keep their raw data.
if environment == 'prod':
    for raw_table_name in pertitions_to_delete:
        delete_partitions(raw_table_name, database)             # helper from a %run notebook; removes this table's raw partitions

## Data Quality Checking

In [0]:
# Verify every silver table refreshed above actually received new data.
# data_updated comes from a %run helper and is expected to throw an error when a table was not updated.
for silver_table_name in tables_quality_check:
    data_updated(silver_table_name)