In [0]:
# Import session
from pyspark.sql import SparkSession

# Spark
spark = SparkSession.builder.appName('Audit_table').getOrCreate()

In [0]:
%fs
ls /user/hive/warehouse/a_target.db/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/a_target.db/employee/,employee/,0,0
dbfs:/user/hive/warehouse/a_target.db/product/,product/,0,0
dbfs:/user/hive/warehouse/a_target.db/student/,student/,0,0


In [0]:
dbutils.fs.rm('dbfs:/user/hive/warehouse/', recurse=True)

Out[265]: True

In [0]:
# Fetch the list of databases
databases = spark.sql("SHOW DATABASES").collect()

# Drop each database
for db in databases:
    db_name = db.databaseName
    print(db_name)
    if db_name != 'default':  # Skip 'default' database or any other system database you don't want to drop
        spark.sql(f"DROP DATABASE {db_name} CASCADE")

a_source
a_target
default


In [0]:
%sql
create database a_target;

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *
import uuid

# Create schema for streaming data
student_schema = StructType(
    [
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('dep', StringType(), True),
        ]
)

# Define data with UUIDs and current date
student_data = [
    (1, 'Aravindh', 'Mct'),
    (2, 'Vasanth', 'Mct'),
    (3, 'Jana', 'Mct'),    
    (4, 'Pavithran', 'Mct')
]

# Create DataFrame
df = spark.createDataFrame(data=student_data, schema=student_schema)

# Adding uuid
df = df.withColumn('flag', lit('Y')) \
    .withColumn('start_date', lit('1900-01-01').cast('date')) \
    .withColumn('end_date', lit('9999-12-31').cast('date')) \
    .withColumn('inserted_date', current_timestamp())\
    .withColumn('updated_date', current_timestamp()) \
    .withColumn('inserted_batch_id', lit(str(uuid.uuid4()))) \
    .withColumn('updated_batch_id', lit(str(uuid.uuid4()))) \
    .withColumn('file_name', lit('student'))

# Show DataFrame
df.show()

# Insert DataFrame into Delta table
df.write.format("delta").mode("overwrite").saveAsTable("a_target.student")

+---+---------+---+----+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+
| id|     name|dep|flag|start_date|  end_date|       inserted_date|        updated_date|   inserted_batch_id|    updated_batch_id|file_name|
+---+---------+---+----+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+
|  1| Aravindh|Mct|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|e83259c2-789e-4f0...|674000a8-26fd-47d...|  student|
|  2|  Vasanth|Mct|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|e83259c2-789e-4f0...|674000a8-26fd-47d...|  student|
|  3|     Jana|Mct|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|e83259c2-789e-4f0...|674000a8-26fd-47d...|  student|
|  4|Pavithran|Mct|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|e83259c2-789e-4f0...|674000a8-26fd-47d...|  student|
+---+--------

In [0]:
product_schema = StructType(
    [
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('category', StringType(), True)
    ]
)

product_data = [
    (1, 'Laptop', 'Electronics'),
    (2, 'Chair', 'Furniture'),
    (3, 'Desk', 'Furniture'),
    (4, 'Smartphone', 'Electronics'),
    (5, 'Monitor', 'Electronics')
]

# Create DataFrame
df = spark.createDataFrame(data=product_data, schema=product_schema)

# Adding uuid
df = df.withColumn('flag', lit('Y')) \
    .withColumn('start_date', lit('1900-01-01').cast('date')) \
    .withColumn('end_date', lit('9999-12-31').cast('date')) \
    .withColumn('inserted_date', current_timestamp()) \
    .withColumn('updated_date', current_timestamp()) \
    .withColumn('inserted_batch_id', lit(str(uuid.uuid4()))) \
    .withColumn('updated_batch_id', lit(str(uuid.uuid4()))) \
    .withColumn('file_name', lit('product'))

# Show DataFrame
df.show()

# Insert DataFrame into Delta table
df.write.format("delta").mode("overwrite").saveAsTable("a_target.product")

+---+----------+-----------+----+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+
| id|      name|   category|flag|start_date|  end_date|       inserted_date|        updated_date|   inserted_batch_id|    updated_batch_id|file_name|
+---+----------+-----------+----+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+
|  1|    Laptop|Electronics|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|5f473fb6-c660-42e...|d258c67c-b0c3-453...|  product|
|  2|     Chair|  Furniture|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|5f473fb6-c660-42e...|d258c67c-b0c3-453...|  product|
|  3|      Desk|  Furniture|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|5f473fb6-c660-42e...|d258c67c-b0c3-453...|  product|
|  4|Smartphone|Electronics|   Y|1900-01-01|9999-12-31|2024-08-27 06:33:...|2024-08-27 06:33:...|5f4

In [0]:
employee_schema = StructType(
    [
        StructField('id', StringType(), True),
        StructField('name', StringType(), True),
        StructField('department', StringType(), True),
        StructField('salary', IntegerType(), True)
    ]
)

employee_data = [
    (1, "John Doe", "Engineering", 75000),
    (2, "Jane Smith", "Marketing", 65000),
    (3, "Sam Brown", "Sales", 55000),
    (4, "Emily Davis", "HR", 60000),
    (5, "Michael Johnson", "Finance", 70000)
]

# Create DataFrame
df = spark.createDataFrame(data=employee_data, schema=employee_schema)

# Adding uuid
df = df.withColumn('flag', lit('Y')) \
    .withColumn('start_date', lit('1900-01-01').cast('date')) \
    .withColumn('end_date', lit('9999-12-31').cast('date')) \
    .withColumn('inserted_date', current_timestamp()) \
    .withColumn('updated_date', current_timestamp()) \
    .withColumn('inserted_batch_id', lit(str(uuid.uuid4()))) \
    .withColumn('updated_batch_id', lit(str(uuid.uuid4()))) \
    .withColumn('file_name', lit('employee'))

# Show DataFrame
df.show()

# Insert DataFrame into Delta table
df.write.format("delta").mode("overwrite").saveAsTable("a_target.employee")

+---+---------------+-----------+------+----+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+
| id|           name| department|salary|flag|start_date|  end_date|       inserted_date|        updated_date|   inserted_batch_id|    updated_batch_id|file_name|
+---+---------------+-----------+------+----+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+
|  1|       John Doe|Engineering| 75000|   Y|1900-01-01|9999-12-31|2024-08-27 06:34:...|2024-08-27 06:34:...|f3dd1c2c-07e1-45e...|8b6705ce-6c68-4aa...| employee|
|  2|     Jane Smith|  Marketing| 65000|   Y|1900-01-01|9999-12-31|2024-08-27 06:34:...|2024-08-27 06:34:...|f3dd1c2c-07e1-45e...|8b6705ce-6c68-4aa...| employee|
|  3|      Sam Brown|      Sales| 55000|   Y|1900-01-01|9999-12-31|2024-08-27 06:34:...|2024-08-27 06:34:...|f3dd1c2c-07e1-45e...|8b6705ce-6c68-4aa...| employee|
|  4|    Emily Davis|       

# Create a Source Data

In [0]:
%sql
create database a_source

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# Create schema for streaming data
student_schema = StructType(
    [
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('dep', StringType(), True),
        StructField('last_modified', DateType(), True)
    ]
)

student_data = [
    (5, 'Nishanth', 'Mech', date(2024,8,14)),
    (6, 'Vikram', 'Mct', date(2024,8,14)),
    (3, 'Jana', 'Mech', date(2024,8,14)),
    (4, 'Pavithran', 'Cse', date(2024,8,14))
]

s_student = spark.createDataFrame(data = student_data, schema = student_schema)
s_student.display()

id,name,dep,last_modified
5,Nishanth,Mech,2024-08-14
6,Vikram,Mct,2024-08-14
3,Jana,Mech,2024-08-14
4,Pavithran,Cse,2024-08-14


In [0]:
s_student.write.partitionBy('last_modified').format('delta').mode('overwrite').saveAsTable('a_source.student')

In [0]:
product_schema = StructType(
    [
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('category', StringType(), True),
        StructField('last_modified', DateType(), True)
    ]
)

product_data = [
    (1, 'Bed', 'Furniture', date(2024,8,14)),
    (2, 'PS4', 'Electronics', date(2024,8,14)),
    (6, 'Fan', 'Electronics', date(2024,8,14))
]

s_product = spark.createDataFrame(data = product_data, schema = product_schema)
s_product.display()

id,name,category,last_modified
1,Bed,Furniture,2024-08-14
2,PS4,Electronics,2024-08-14
6,Fan,Electronics,2024-08-14


In [0]:
s_product.write.partitionBy('last_modified').format('delta').mode('overwrite').saveAsTable('a_source.product')

In [0]:
# Define schema
employee_schema = StructType(
    [
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('department', StringType(), True),
        StructField('salary', IntegerType(), True),
        StructField('last_modified', DateType(), True)
    ]
)

# Define data
employee_data = [
    (1, "Aravindh", "Jobless", 0, date(2014, 8, 14)),
    (1, "Aravindh", "Engineering", 75000, date(2024, 8, 14)),
    (6, "Pavithran", "Marketing", 65000, date(2024, 8, 14)),
    (7, "Samantha", "Sales", 55000, date(2024, 8, 14)),
    (1, "Aravindh", "IT", 75000, date(2024, 8, 15)),
    (8, "Bala", "Engineering", 75000, date(2024, 8, 15))
]

# Create DataFrame
s_employee = spark.createDataFrame(data=employee_data, schema=employee_schema)

# Show the DataFrame
s_employee.show()

+---+---------+-----------+------+-------------+
| id|     name| department|salary|last_modified|
+---+---------+-----------+------+-------------+
|  1| Aravindh|    Jobless|     0|   2014-08-14|
|  1| Aravindh|Engineering| 75000|   2024-08-14|
|  6|Pavithran|  Marketing| 65000|   2024-08-14|
|  7| Samantha|      Sales| 55000|   2024-08-14|
|  1| Aravindh|         IT| 75000|   2024-08-15|
|  8|     Bala|Engineering| 75000|   2024-08-15|
+---+---------+-----------+------+-------------+



In [0]:
s_employee.write.format('delta').mode('overwrite').partitionBy('last_modified').saveAsTable('a_source.employee')

In [0]:
# Define the target path where the tables are stored
t_path = '/user/hive/warehouse/a_target.db'

# List all directories (table) inside the path
t_tables = dbutils.fs.ls(t_path)

# Loop through each target tables
target_tbl = []
for table in t_tables:
    a = table.name.strip('/')
    target_tbl.append(a)

# Define the source path where the tables are stored
s_path = '/user/hive/warehouse/a_source.db'

# List all directories (table) inside the path
s_tables = dbutils.fs.ls(s_path)

# Loop through each source tables
source_tbl = []
for table in s_tables:
    a = table.name.strip('/')
    source_tbl.append(a)

print(f'The target tables are {target_tbl}')
print(f'The source tables are {source_tbl}')

The target tables are ['employee', 'product', 'student']
The source tables are ['employee', 'product', 'student']


In [0]:
older_date = datetime.now() - timedelta(days = 90)
older_date = older_date.strftime('%Y-%m-%d')
print(older_date)

2024-05-29


In [0]:
# Date older than 90
older_date = datetime.now() - timedelta(days = 90)
older_date = older_date.strftime('%Y-%m-%d')
max_dates = {}
# Defining a function
def process_table(tbl: str, keys: dict):
    target = None
    try:
        tbl_path = f'dbfs:/user/hive/warehouse/a_source.db/{tbl}'

        # Getting all partitions inside the directory
        partitions = dbutils.fs.ls(tbl_path)

        # Initialize max_last_modified variable
        max_last_modified = None

        # Extracting the data from directory
        dates = []
        for data in partitions:
            files = data.name.strip('/')

            if 'last_modified=' in files:
                date = files.split('=')[1]
                dates.append(date)

        # Sort dates to ensure processing in the correct order
        dates.sort()

        # Process each date in the sorted list
        for date in dates:
            max_last_modified = dates[-1]  # Get the latest date from the sorted list
            stored_max_date = max_dates.get(tbl)

            '''# Set max_last_modified to the current date if it's not set
            if max_last_modified is None:
                max_last_modified = date  '''

            # If no stored date or the current max_last_modified is greater, process the table
            if not stored_max_date or (max_last_modified > stored_max_date):
                print(f"Processing table {tbl} with last_modified {stored_max_date}")
                max_dates[tbl] = date

                # Construct the full path from the specific partition
                path_date = f'{tbl_path}/last_modified={date}'

                # Load the source DataFrame
                source = spark.read.format('delta').load(path_date)

                # Load the corresponding target table 
                target = spark.read.format('delta').load(f'dbfs:/user/hive/warehouse/a_target.db/{tbl}')

                # Get the primary key for the current table
                primary_key_col = keys.get(tbl)

                # Perform the join operation using the dynamic primary key column
                join_df = source.join(
                    target,
                    (source[primary_key_col] == target[primary_key_col]) & (target['flag'] == 'Y'),
                    'leftouter'
                ).select(
                    source['*'],
                    *[target[col].alias(f't_{col}') for col in source.columns if col in target.columns]
                )

                join_df = join_df.drop('last_modified')

                # Get columns from the join_df 
                source_col = [col for col in join_df.columns if not col.startswith('t_')]
                target_col = [col for col in join_df.columns if col.startswith('t_')]

                # Construct the hash condition
                hash_condition = xxhash64(*[join_df[col] for col in source_col]) != xxhash64(*[join_df[col] for col in target_col])
                
                # Filter to get the rows where the hash values differ
                filter_df = join_df.filter(hash_condition)
                    
                # Add merge key
                merge_df = filter_df.withColumn('Merge_key', filter_df[primary_key_col])

                # Add dummy merge key
                dummy_df = filter_df.filter(f't_{primary_key_col} is not null').withColumn('Merge_key', lit('None'))

                # Union both merge and dummy keys
                scd_df = merge_df.union(dummy_df)
                    
                # Define start_date for new and existing records dynamically
                start_date_expr = when(col('Merge_key') == 'None', lit(current_date())).otherwise('1900-01-01')

                # Apply the start_date_expr to add the start_date column in scd_df
                scd_df = scd_df.withColumn('start_date', start_date_expr)

                target_table = DeltaTable.forPath(spark, f'dbfs:/user/hive/warehouse/a_target.db/{tbl}')
                insert_values = {col: f'source.{col}' for col in scd_df.columns if not col.startswith('t_') and col != 'Merge_key'}

                # Generate a single inserted_batch_id and updated_batch_id
                inserted_batch_id = str(uuid.uuid4())
                updated_batch_id = str(uuid.uuid4())

                insert_values.update({
                    'flag': lit('Y'),
                    'end_date': lit('9999-12-31'),
                    'inserted_date': current_timestamp(),
                    'updated_date': current_timestamp(),
                    'inserted_batch_id': lit(inserted_batch_id),
                    'updated_batch_id': lit(updated_batch_id),
                    'file_name': lit(tbl)
                })

                target_table.alias('target').merge(
                    source=scd_df.alias('source'),
                    condition=f"target.{primary_key_col} = source.Merge_Key and target.flag = 'Y'"
                ).whenMatchedUpdate(
                    set={
                        'flag': lit('N'),
                        'end_date': date_sub(current_date(), 1),
                        'inserted_date': current_timestamp(),
                        'updated_date': current_timestamp(),
                        'updated_batch_id': lit(inserted_batch_id),
                        'file_name': lit(tbl)
                    }
                ).whenNotMatchedInsert(
                    values=insert_values
                ).execute()

                # If successful, extract Delta operation metrics for audit logging
                delta_df = DeltaTable.forPath(spark, f'dbfs:/user/hive/warehouse/a_target.db/{tbl}')
                last_operation = delta_df.history(1)

                explode_df = last_operation.select(last_operation.version, last_operation.operation, explode(last_operation.operationMetrics))
                explode_df_sel = explode_df.select(explode_df.version, explode_df.operation, explode_df.key, explode_df.value.cast('int'))

                pivot = explode_df_sel.groupBy(['version', 'operation']).pivot('key').sum('value')

                # Extract the required metrics 
                num_op_rows = pivot.select(pivot['numOutputRows']).collect()[0][0]
                num_target_inserted = pivot.select(pivot['numTargetRowsInserted']).collect()[0][0]
                num_target_updated = pivot.select(pivot['numTargetRowsUpdated']).collect()[0][0]
                num_source_rows = pivot.select(pivot['numSourceRows']).collect()[0][0]
                version = pivot.select(pivot['version']).collect()[0][0]

                # Prepare the audit log record
                audit_log_record = spark.createDataFrame([
                        (
                            version,
                            'Audit_column',                                     # job_name
                            path_date,                                          # source_name
                            tbl,                                                # source_table_name
                            tbl,                                                # target_table_name
                            datetime.now(),                                     # load_date
                            'Completed',                                        # job_status
                            num_source_rows,                                    # source_received_count
                            num_target_inserted,                                # new_inserted_count
                            num_target_updated,                                 # new_updated_count
                            num_target_inserted + num_target_updated,           # total_inserted_count
                            num_target_updated,                                 # total_updated_count
                            num_source_rows - num_op_rows,                      # skip_count
                            tbl,                                                # filename_ts
                            tbl,                                                # source_file_name
                            str(uuid.uuid4())                                   # batch_id
                        )
                    ], [
                        'job_name', 'source_name', 'source_table_name', 'target_table_name', 'load_date', 'job_status',
                        'source_received_count', 'new_inserted_count', 'new_updated_count', 'total_inserted_count',
                        'total_updated_count', 'skip_count', 'filename_ts', 'source_file_name', 'batch_id'
                    ])

                # Insert the audit log record into the audit_log table
                audit_log_record.write.insertInto('audit_log')

                if date < older_date: 
                    # Construct the SQL command for deletion
                    delete_query = f"DELETE FROM a_source.{tbl} WHERE last_modified = '{date}'"
                    
                    # Execute the delete query
                    spark.sql(delete_query)
                    print(f'Deleted old file: {path_date}')

                else:
                    print('All files in the table are above 90 days')

            else:
                print(f"No new data to process for table {tbl}. Latest date {max_last_modified} is not greater than stored date {stored_max_date}.")

        else:
            print(f"No valid last_modified dates found for table {tbl}.")

        # Store the max last_modified date for the current table in the dictionary
        if max_last_modified:
            max_dates[tbl] = max_last_modified
            print(f'Max last_modified date for {tbl}: {max_last_modified}')

    except Exception as e:
        # Handle the exception and update audit log with 'Failed' status
        audit_log_record = spark.createDataFrame([
            (
                version,
                'Audit_column',                                     # job_name
                path_date if 'path_date' in locals() else '',       # source_name
                tbl,                                                # source_table_name
                tbl,                                                # target_table_name
                datetime.now(),                                     # load_date
                'Failed',                                           # job_status
                None,                                               # source_received_count
                None,                                               # new_inserted_count
                None,                                               # new_updated_count
                None,                                               # total_inserted_count
                None,                                               # total_updated_count
                None,                                               # skip_count
                date,                                               # filename_ts
                tbl,                                                # source_file_name
                str(uuid.uuid4())                                   # batch_id
            )
        ], [
            'job_name', 'source_name', 'source_table_name', 'target_table_name', 'load_date', 'job_status',
            'source_received_count', 'new_inserted_count', 'new_updated_count', 'total_inserted_count',
            'total_updated_count', 'skip_count', 'filename_ts', 'source_file_name', 'batch_id'
        ])
        audit_log_record.write.insertInto('audit_log')
        print(f"Error occurred while processing table {tbl}: {e}")

    finally:
        # Ensure target DataFrame is defined before displaying
        if target is not None:
            print(f'The final result of {tbl} table is:')
            df = target.orderBy(f'{primary_key_col}')
            display(df)


In [0]:
print(max_dates)

{}


# Audit Log Table

In [0]:
%sql
drop table audit_log

In [0]:
%sql
create table audit_log(
  version int,
  job_name string,
  source_name string,
  source_table_name string,
  target_table_name string,
  load_date timestamp,
  job_status string,
  source_received_count int,
  new_inserted_count int,
  new_updated_count int,
  total_inserted_count int,
  total_updated_count int,
  skip_count int,
  filename_ts string,
  source_file_name string,
  batch_id string
)

In [0]:
keys = {
    'employee' : 'id',
    'product' : 'id',
    'student' : 'id'
}

In [0]:
from delta.tables import DeltaTable
# Process each table in the source_tbl list
for tbl in source_tbl:
    process_table(tbl, keys)

Processing table employee with last_modified None
Deleted old file: dbfs:/user/hive/warehouse/a_source.db/employee/last_modified=2014-08-14
Processing table employee with last_modified 2014-08-14
All files in the table are above 90 days
Processing table employee with last_modified 2024-08-14
All files in the table are above 90 days
No valid last_modified dates found for table employee.
Max last_modified date for employee: 2024-08-15
The final result of employee table is:


id,name,department,salary,flag,start_date,end_date,inserted_date,updated_date,inserted_batch_id,updated_batch_id,file_name
1,Aravindh,IT,75000,Y,2024-08-27,9999-12-31,2024-08-27T06:35:08.623+0000,2024-08-27T06:35:08.623+0000,494d6856-4fda-48bd-9b51-17136cfb5e5d,669fcd56-bae3-4791-9764-2a29db3a33c0,employee
1,John Doe,Engineering,75000,N,1900-01-01,2024-08-26,2024-08-27T06:34:32.103+0000,2024-08-27T06:34:32.103+0000,f3dd1c2c-07e1-45ed-905f-d6e787c7a4aa,c1a978b0-fa2b-483d-97a5-d0daaf7a266b,employee
1,Aravindh,Engineering,75000,N,2024-08-27,2024-08-26,2024-08-27T06:35:08.623+0000,2024-08-27T06:35:08.623+0000,ab3de10e-bffd-4e85-bebc-776f277f8ccc,494d6856-4fda-48bd-9b51-17136cfb5e5d,employee
1,Aravindh,Jobless,0,N,2024-08-27,2024-08-26,2024-08-27T06:34:50.228+0000,2024-08-27T06:34:50.228+0000,c1a978b0-fa2b-483d-97a5-d0daaf7a266b,ab3de10e-bffd-4e85-bebc-776f277f8ccc,employee
2,Jane Smith,Marketing,65000,Y,1900-01-01,9999-12-31,2024-08-27T06:34:03.353+0000,2024-08-27T06:34:03.353+0000,f3dd1c2c-07e1-45ed-905f-d6e787c7a4aa,8b6705ce-6c68-4aac-94d6-6dd80665767a,employee
3,Sam Brown,Sales,55000,Y,1900-01-01,9999-12-31,2024-08-27T06:34:03.353+0000,2024-08-27T06:34:03.353+0000,f3dd1c2c-07e1-45ed-905f-d6e787c7a4aa,8b6705ce-6c68-4aac-94d6-6dd80665767a,employee
4,Emily Davis,HR,60000,Y,1900-01-01,9999-12-31,2024-08-27T06:34:03.353+0000,2024-08-27T06:34:03.353+0000,f3dd1c2c-07e1-45ed-905f-d6e787c7a4aa,8b6705ce-6c68-4aac-94d6-6dd80665767a,employee
5,Michael Johnson,Finance,70000,Y,1900-01-01,9999-12-31,2024-08-27T06:34:03.353+0000,2024-08-27T06:34:03.353+0000,f3dd1c2c-07e1-45ed-905f-d6e787c7a4aa,8b6705ce-6c68-4aac-94d6-6dd80665767a,employee
6,Pavithran,Marketing,65000,Y,1900-01-01,9999-12-31,2024-08-27T06:34:50.228+0000,2024-08-27T06:34:50.228+0000,ab3de10e-bffd-4e85-bebc-776f277f8ccc,19bce467-6c7e-412e-9eef-4c8d7c8fdc78,employee
7,Samantha,Sales,55000,Y,1900-01-01,9999-12-31,2024-08-27T06:34:50.228+0000,2024-08-27T06:34:50.228+0000,ab3de10e-bffd-4e85-bebc-776f277f8ccc,19bce467-6c7e-412e-9eef-4c8d7c8fdc78,employee


Processing table product with last_modified None
All files in the table are above 90 days
No valid last_modified dates found for table product.
Max last_modified date for product: 2024-08-14
The final result of product table is:


id,name,category,flag,start_date,end_date,inserted_date,updated_date,inserted_batch_id,updated_batch_id,file_name
1,Bed,Furniture,Y,2024-08-27,9999-12-31,2024-08-27T06:35:29.184+0000,2024-08-27T06:35:29.184+0000,a6b7f0fb-ff42-4963-a2b0-353ebc986221,5b10551b-b276-43d3-83f2-a9f1dd9baf37,product
1,Laptop,Electronics,N,1900-01-01,2024-08-26,2024-08-27T06:35:29.184+0000,2024-08-27T06:35:29.184+0000,5f473fb6-c660-42e8-8b2b-9d780f15272d,a6b7f0fb-ff42-4963-a2b0-353ebc986221,product
2,PS4,Electronics,Y,2024-08-27,9999-12-31,2024-08-27T06:35:29.184+0000,2024-08-27T06:35:29.184+0000,a6b7f0fb-ff42-4963-a2b0-353ebc986221,5b10551b-b276-43d3-83f2-a9f1dd9baf37,product
2,Chair,Furniture,N,1900-01-01,2024-08-26,2024-08-27T06:35:29.184+0000,2024-08-27T06:35:29.184+0000,5f473fb6-c660-42e8-8b2b-9d780f15272d,a6b7f0fb-ff42-4963-a2b0-353ebc986221,product
3,Desk,Furniture,Y,1900-01-01,9999-12-31,2024-08-27T06:33:59.536+0000,2024-08-27T06:33:59.536+0000,5f473fb6-c660-42e8-8b2b-9d780f15272d,d258c67c-b0c3-453b-b92e-f947eb93199b,product
4,Smartphone,Electronics,Y,1900-01-01,9999-12-31,2024-08-27T06:33:59.536+0000,2024-08-27T06:33:59.536+0000,5f473fb6-c660-42e8-8b2b-9d780f15272d,d258c67c-b0c3-453b-b92e-f947eb93199b,product
5,Monitor,Electronics,Y,1900-01-01,9999-12-31,2024-08-27T06:33:59.536+0000,2024-08-27T06:33:59.536+0000,5f473fb6-c660-42e8-8b2b-9d780f15272d,d258c67c-b0c3-453b-b92e-f947eb93199b,product
6,Fan,Electronics,Y,1900-01-01,9999-12-31,2024-08-27T06:35:29.184+0000,2024-08-27T06:35:29.184+0000,a6b7f0fb-ff42-4963-a2b0-353ebc986221,5b10551b-b276-43d3-83f2-a9f1dd9baf37,product


Processing table student with last_modified None
All files in the table are above 90 days
No valid last_modified dates found for table student.
Max last_modified date for student: 2024-08-14
The final result of student table is:


id,name,dep,flag,start_date,end_date,inserted_date,updated_date,inserted_batch_id,updated_batch_id,file_name
1,Aravindh,Mct,Y,1900-01-01,9999-12-31,2024-08-27T06:33:55.908+0000,2024-08-27T06:33:55.908+0000,e83259c2-789e-4f03-ae21-017c98c333da,674000a8-26fd-47dc-b5e1-c6bd24aeffe6,student
2,Vasanth,Mct,Y,1900-01-01,9999-12-31,2024-08-27T06:33:55.908+0000,2024-08-27T06:33:55.908+0000,e83259c2-789e-4f03-ae21-017c98c333da,674000a8-26fd-47dc-b5e1-c6bd24aeffe6,student
3,Jana,Mct,N,1900-01-01,2024-08-26,2024-08-27T06:35:48.183+0000,2024-08-27T06:35:48.183+0000,e83259c2-789e-4f03-ae21-017c98c333da,b6326898-02d0-4a05-bec8-3eb5d019c5f4,student
3,Jana,Mech,Y,2024-08-27,9999-12-31,2024-08-27T06:35:48.183+0000,2024-08-27T06:35:48.183+0000,b6326898-02d0-4a05-bec8-3eb5d019c5f4,c175b87b-c167-4526-b4ed-b2f56a38298f,student
4,Pavithran,Cse,Y,2024-08-27,9999-12-31,2024-08-27T06:35:48.183+0000,2024-08-27T06:35:48.183+0000,b6326898-02d0-4a05-bec8-3eb5d019c5f4,c175b87b-c167-4526-b4ed-b2f56a38298f,student
4,Pavithran,Mct,N,1900-01-01,2024-08-26,2024-08-27T06:35:48.183+0000,2024-08-27T06:35:48.183+0000,e83259c2-789e-4f03-ae21-017c98c333da,b6326898-02d0-4a05-bec8-3eb5d019c5f4,student
5,Nishanth,Mech,Y,1900-01-01,9999-12-31,2024-08-27T06:35:48.183+0000,2024-08-27T06:35:48.183+0000,b6326898-02d0-4a05-bec8-3eb5d019c5f4,c175b87b-c167-4526-b4ed-b2f56a38298f,student
6,Vikram,Mct,Y,1900-01-01,9999-12-31,2024-08-27T06:35:48.183+0000,2024-08-27T06:35:48.183+0000,b6326898-02d0-4a05-bec8-3eb5d019c5f4,c175b87b-c167-4526-b4ed-b2f56a38298f,student


In [0]:
%sql
select * from a_source.employee

id,name,department,salary,last_modified
1,Aravindh,Engineering,75000,2024-08-14
6,Pavithran,Marketing,65000,2024-08-14
8,Bala,Engineering,75000,2024-08-15
7,Samantha,Sales,55000,2024-08-14
1,Aravindh,IT,75000,2024-08-15


In [0]:
%sql 
select * from audit_log

version,job_name,source_name,source_table_name,target_table_name,load_date,job_status,source_received_count,new_inserted_count,new_updated_count,total_inserted_count,total_updated_count,skip_count,filename_ts,source_file_name,batch_id
1,Audit_column,dbfs:/user/hive/warehouse/a_source.db/employee/last_modified=2014-08-14,employee,employee,2024-08-27T06:34:44.743+0000,Completed,2,1,1,2,1,0,employee,employee,df81ce90-ef1f-4d62-9fbf-18c11f9ea5be
2,Audit_column,dbfs:/user/hive/warehouse/a_source.db/employee/last_modified=2024-08-14,employee,employee,2024-08-27T06:35:04.159+0000,Completed,4,3,1,4,1,0,employee,employee,65060d14-24ff-4179-9281-fd3f90ed23ea
3,Audit_column,dbfs:/user/hive/warehouse/a_source.db/employee/last_modified=2024-08-15,employee,employee,2024-08-27T06:35:23.105+0000,Completed,3,2,1,3,1,0,employee,employee,6ec3ab0a-1efb-40a8-94b1-1afd5e6e4b4b
1,Audit_column,dbfs:/user/hive/warehouse/a_source.db/product/last_modified=2024-08-14,product,product,2024-08-27T06:35:42.335+0000,Completed,5,3,2,5,2,0,product,product,5f947bc5-5df0-4b46-a72e-d777c68eb2ca
1,Audit_column,dbfs:/user/hive/warehouse/a_source.db/student/last_modified=2024-08-14,student,student,2024-08-27T06:36:01.298+0000,Completed,6,4,2,6,2,0,student,student,a4951ccc-e4ff-4ebd-b8de-5ec2f8f424bb


# Learning 

In [0]:
for tbl in target_tbl:
    delta_df = DeltaTable.forPath(spark, f'dbfs:/user/hive/warehouse/a_target.db/{tbl}')
    last_operation = delta_df.history(1)

    explode_df = last_operation.select(last_operation.operation, last_operation.version, explode(last_operation.operationMetrics))

    explode_df_sel = explode_df.select(explode_df.operation, explode_df.version,explode_df.key, explode_df.value.cast('int'))

    pivot = explode_df_sel.groupBy(['operation', 'version']).pivot('key').sum('value')
    pivot.display()

operation,version,executionTimeMs,numOutputRows,numSourceRows,numTargetBytesAdded,numTargetBytesRemoved,numTargetChangeFilesAdded,numTargetDeletionVectorsAdded,numTargetDeletionVectorsRemoved,numTargetFilesAdded,numTargetFilesRemoved,numTargetRowsCopied,numTargetRowsDeleted,numTargetRowsInserted,numTargetRowsMatchedDeleted,numTargetRowsMatchedUpdated,numTargetRowsNotMatchedBySourceDeleted,numTargetRowsNotMatchedBySourceUpdated,numTargetRowsUpdated,rewriteTimeMs,scanTimeMs
MERGE,3,6321,3,3,11660,3918,0,0,0,3,1,0,0,2,0,1,0,0,1,1599,1715


operation,version,executionTimeMs,numOutputRows,numSourceRows,numTargetBytesAdded,numTargetBytesRemoved,numTargetChangeFilesAdded,numTargetDeletionVectorsAdded,numTargetDeletionVectorsRemoved,numTargetFilesAdded,numTargetFilesRemoved,numTargetRowsCopied,numTargetRowsDeleted,numTargetRowsInserted,numTargetRowsMatchedDeleted,numTargetRowsMatchedUpdated,numTargetRowsNotMatchedBySourceDeleted,numTargetRowsNotMatchedBySourceUpdated,numTargetRowsUpdated,rewriteTimeMs,scanTimeMs
MERGE,1,6043,5,5,18265,7325,0,0,0,5,2,0,0,3,0,2,0,0,2,1475,1655


operation,version,executionTimeMs,numOutputRows,numSourceRows,numTargetBytesAdded,numTargetBytesRemoved,numTargetChangeFilesAdded,numTargetDeletionVectorsAdded,numTargetDeletionVectorsRemoved,numTargetFilesAdded,numTargetFilesRemoved,numTargetRowsCopied,numTargetRowsDeleted,numTargetRowsInserted,numTargetRowsMatchedDeleted,numTargetRowsMatchedUpdated,numTargetRowsNotMatchedBySourceDeleted,numTargetRowsNotMatchedBySourceUpdated,numTargetRowsUpdated,rewriteTimeMs,scanTimeMs
MERGE,1,6111,6,6,21654,7211,0,0,0,6,2,0,0,4,0,2,0,0,2,1618,1679


In [0]:
delta_df = DeltaTable.forPath(spark, f'dbfs:/user/hive/warehouse/a_target.db/{tbl}')
last_operation = delta_df.history(1)
last_operation.display()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2024-08-23T09:52:50.000+0000,4792984848781430,arvndh001@outlook.com,MERGE,"Map(predicate -> [""((id#33232 = cast(Merge_Key#33191 as int)) AND (flag#33235 = Y))""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2168272299856041),0823-083153-73wr5gxw,3,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 0, numTargetBytesAdded -> 0, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 5119, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 1275, numTargetRowsUpdated -> 0, numOutputRows -> 0, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 0, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 993)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
explode_df = last_operation.select(last_operation.operation, explode(last_operation.operationMetrics))
explode_df.display()

explode_df_sel = explode_df.select(explode_df.operation, explode_df.key, explode_df.value.cast('int'))
explode_df_sel.display()

operation,key,value
MERGE,numTargetRowsCopied,0
MERGE,numTargetRowsDeleted,0
MERGE,numTargetFilesAdded,0
MERGE,numTargetBytesAdded,0
MERGE,numTargetBytesRemoved,0
MERGE,numTargetDeletionVectorsAdded,0
MERGE,numTargetRowsMatchedUpdated,0
MERGE,executionTimeMs,5119
MERGE,numTargetRowsInserted,0
MERGE,numTargetRowsMatchedDeleted,0


operation,key,value
MERGE,numTargetRowsCopied,0
MERGE,numTargetRowsDeleted,0
MERGE,numTargetFilesAdded,0
MERGE,numTargetBytesAdded,0
MERGE,numTargetBytesRemoved,0
MERGE,numTargetDeletionVectorsAdded,0
MERGE,numTargetRowsMatchedUpdated,0
MERGE,executionTimeMs,5119
MERGE,numTargetRowsInserted,0
MERGE,numTargetRowsMatchedDeleted,0


In [0]:
pivot = explode_df_sel.groupBy('operation').pivot('key').sum('value')
pivot.display()

operation,executionTimeMs,numOutputRows,numSourceRows,numTargetBytesAdded,numTargetBytesRemoved,numTargetChangeFilesAdded,numTargetDeletionVectorsAdded,numTargetDeletionVectorsRemoved,numTargetFilesAdded,numTargetFilesRemoved,numTargetRowsCopied,numTargetRowsDeleted,numTargetRowsInserted,numTargetRowsMatchedDeleted,numTargetRowsMatchedUpdated,numTargetRowsNotMatchedBySourceDeleted,numTargetRowsNotMatchedBySourceUpdated,numTargetRowsUpdated,rewriteTimeMs,scanTimeMs
MERGE,5119,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,993,1275


In [0]:
dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()

Out[41]: 'arvndh001@outlook.com'

In [0]:
dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()

Out[33]: '/Task/Audit_column'