In [0]:
%sql
-- Registry of schema versions for monitored tables. Each row is either the
-- initial registration of a table or one schema change found in its history.
-- NOTE(review): the catalog is hard-coded to ds_training_1 here, while the Python
-- cells build the registry name from the catalog_name widget - confirm they match.
CREATE TABLE IF NOT EXISTS ds_training_1.default.schema_registry (
    id BIGINT GENERATED ALWAYS AS IDENTITY,  -- Identity column; the notebook never supplies a value for it
    catalog_name VARCHAR(255),  -- Catalog that contains the tracked table
    schema_name VARCHAR(255),   -- Schema that contains the tracked table
    table_name VARCHAR(255),    -- Name of the tracked table
    schema_version INT,         -- 1 on registration, then incremented once per recorded change
    created_by VARCHAR(255),    -- table_owner (registration rows) or created_by (change rows) from information_schema.tables
    modified_by VARCHAR(255),   -- userName of the history operation; NULL on registration rows
    modified_timestamp TIMESTAMP, -- Timestamp of the history operation; NULL on registration rows
    schema_json STRING,         -- JSON object mapping column name to data type; NULL on change rows that are not the newest
    change_type VARCHAR(50),    -- 'ADD COLUMNS', 'DROP COLUMNS', 'RENAME COLUMN' or 'DATA TYPE CHANGE'; NULL on registration rows
    column_name VARCHAR(255),   -- Column(s) affected by the change; NULL on registration rows
    table_version INT,          -- Table version taken from DESCRIBE HISTORY
    table_version_timestamp TIMESTAMP,  -- Timestamp of that table version in DESCRIBE HISTORY
    status VARCHAR(50),         -- 'Active' or 'Inactive'
    check_timestamp TIMESTAMP,  -- datetime.utcnow() taken when the row was prepared by the notebook
    schema_change_alert_status VARCHAR(50),   -- This notebook writes 'Pending' on change rows and NULL on registration; other values are presumably set downstream - TODO confirm
    rollback_notification_status VARCHAR(50) -- This notebook writes 'NA' on change rows and NULL on registration; other values are presumably set downstream - TODO confirm

);


In [0]:
# Declare the "catalog_name" text widget (empty default value, label "catalog_name")
# so the catalog to monitor is supplied when the notebook runs.
dbutils.widgets.text("catalog_name", "", "catalog_name")


In [0]:
# Read the catalog chosen in the widget; the calls at the bottom of the notebook pass it on.
# NOTE(review): the widget default is "", which would yield an invalid table name
# such as ".default.schema_registry" - confirm a value is always supplied.
catalog_name = dbutils.widgets.get("catalog_name")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import json
from datetime import datetime

def check_schema_registry(catalog_name, schema_name, table_name):
    """Look up the registry rows that already exist for one table.

    Args:
        catalog_name: Catalog of the table; also the catalog holding the registry.
        schema_name: Schema of the table.
        table_name: Name of the table.

    Returns:
        The result of spark.sql for the lookup, selecting schema_version and
        table_version; it has no rows when the table is not registered.
    """
    return spark.sql(
        f"""
        SELECT schema_version, table_version 
        FROM {catalog_name}.default.schema_registry 
        WHERE catalog_name = '{catalog_name}' 
          AND schema_name = '{schema_name}' 
          AND table_name = '{table_name}'
    """
    )

def get_latest_table_version(catalog_name, schema_name, table_name):
    """Return the highest version in the table's DESCRIBE HISTORY and its timestamp.

    Args:
        catalog_name: Catalog of the table.
        schema_name: Schema of the table.
        table_name: Name of the table.

    Returns:
        A (version, timestamp) tuple for the history row with the maximum version.
    """
    history_df = spark.sql(f"DESCRIBE HISTORY {catalog_name}.{schema_name}.{table_name}")

    # First action: find the maximum version number.
    max_version_rows = history_df.agg({"version": "max"}).collect()
    newest_version = max_version_rows[0][0]

    # Second action: read the timestamp recorded for that version.
    newest_rows = history_df.filter(f"version = {newest_version}").select("timestamp").collect()
    newest_timestamp = newest_rows[0][0]

    return newest_version, newest_timestamp

def get_table_owner(catalog_name, schema_name, table_name):
    """Read the table_owner of a table from information_schema.tables.

    Args:
        catalog_name: Catalog whose information_schema is queried.
        schema_name: Schema of the table.
        table_name: Name of the table.

    Returns:
        The table_owner value of the first matching row, or None when the
        query returns no rows.
    """
    owner_rows = spark.sql(f"""
        SELECT table_owner
        FROM {catalog_name}.information_schema.tables 
        WHERE table_schema = '{schema_name}' 
          AND table_name = '{table_name}'
    """).collect()

    if not owner_rows:
        return None
    return owner_rows[0]['table_owner']

def get_table_schema_json(catalog_name, schema_name, table_name):
    """Serialize a table's columns, read from information_schema.columns, as JSON.

    Args:
        catalog_name: Catalog whose information_schema is queried.
        schema_name: Schema of the table.
        table_name: Name of the table.

    Returns:
        A JSON string of an object that maps each column_name to its data_type.
    """
    column_rows = spark.sql(f"""
        SELECT column_name, data_type
        FROM {catalog_name}.information_schema.columns 
        WHERE table_schema = '{schema_name}' 
          AND table_name = '{table_name}'
    """).collect()

    column_types = {}
    for column_row in column_rows:
        column_types[column_row['column_name']] = column_row['data_type']
    return json.dumps(column_types)

def get_schema_registry_schema():
    """Build the StructType used for the DataFrame of a registration row.

    Returns:
        A StructType with the sixteen registry columns in table order (the
        identity column `id` is not part of it). Each field's nullability is
        the third item of its entry in the layout below.
    """
    # (field name, Spark type class, nullable)
    field_layout = [
        ("catalog_name", StringType, False),
        ("schema_name", StringType, False),
        ("table_name", StringType, False),
        ("schema_version", IntegerType, False),
        ("created_by", StringType, True),
        ("modified_by", StringType, True),
        ("modified_timestamp", TimestampType, True),
        ("schema_json", StringType, False),
        ("change_type", StringType, True),
        ("column_name", StringType, True),
        ("table_version", IntegerType, False),
        ("table_version_timestamp", TimestampType, True),
        ("status", StringType, False),
        ("check_timestamp", TimestampType, False),
        ("schema_change_alert_status", StringType, True),
        ("rollback_notification_status", StringType, True),
    ]
    return StructType([
        StructField(field_name, type_class(), is_nullable)
        for field_name, type_class, is_nullable in field_layout
    ])

def prepare_schema_registry_data(catalog_name, schema_name, table_name, schema_version, created_by, schema_json, 
                                 latest_table_version, table_version_timestamp, schema_change_alert_status=None, 
                                 rollback_notification_status=None):
    """Build the single-row payload that registers a table in the schema registry.

    Args:
        catalog_name: Catalog of the table.
        schema_name: Schema of the table.
        table_name: Name of the table.
        schema_version: Schema version to store for the row.
        created_by: Value stored in the created_by column.
        schema_json: Table schema as a JSON string.
        latest_table_version: Value stored in the table_version column.
        table_version_timestamp: Timestamp stored with that table version.
        schema_change_alert_status: Optional alert status (None by default).
        rollback_notification_status: Optional rollback status (None by default).

    Returns:
        A list containing one dict. The modification and change fields are None,
        status is "Active" and check_timestamp is datetime.utcnow().
    """
    registry_row = dict(
        catalog_name=catalog_name,
        schema_name=schema_name,
        table_name=table_name,
        schema_version=schema_version,
        created_by=created_by,
        # A registration row describes no modification, so these stay empty.
        modified_by=None,
        modified_timestamp=None,
        schema_json=schema_json,
        change_type=None,
        column_name=None,
        table_version=latest_table_version,
        table_version_timestamp=table_version_timestamp,
        status="Active",
        check_timestamp=datetime.utcnow(),
        schema_change_alert_status=schema_change_alert_status,
        rollback_notification_status=rollback_notification_status,
    )
    return [registry_row]

def populate_schema_registry(table_reference):
    """Register a table in the schema registry unless it is already present.

    Args:
        table_reference: Table name in the form "catalog.schema.table"; it is
            split on "." into exactly three parts.

    Returns:
        None. Prints whether the table was already present or was added.
    """
    catalog_name, schema_name, table_name = table_reference.split('.')

    # Nothing to do when the registry already holds a row for this table.
    existing_rows = check_schema_registry(catalog_name, schema_name, table_name)
    if existing_rows.count() > 0:
        print(f"Table {catalog_name}.{schema_name}.{table_name} is already present in schema_registry.")
        return

    # Gather what the registration row needs: current table version,
    # table owner and the column layout as JSON.
    latest_table_version, table_version_timestamp = get_latest_table_version(catalog_name, schema_name, table_name)
    created_by = get_table_owner(catalog_name, schema_name, table_name)
    schema_json = get_table_schema_json(catalog_name, schema_name, table_name)

    # A newly registered table always starts at schema version 1.
    registration_rows = prepare_schema_registry_data(
        catalog_name=catalog_name,
        schema_name=schema_name,
        table_name=table_name,
        schema_version=1,
        created_by=created_by,
        schema_json=schema_json,
        latest_table_version=latest_table_version,
        table_version_timestamp=table_version_timestamp
    )

    # Append the row to the registry table of the same catalog.
    registration_df = spark.createDataFrame(registration_rows, schema=get_schema_registry_schema())
    registration_df.write.mode("append").saveAsTable(f"{catalog_name}.default.schema_registry")
    print(f"Table {catalog_name}.{schema_name}.{table_name} successfully added to schema_registry.")


In [0]:
def populate_for_all_tables(
    catalog_name,
    created_by_filter='brindavivek.kotha@latentviewo365.onmicrosoft.com',
    table_name_pattern='%book%',
):
    """Register every monitored table of a catalog in the schema registry.

    Walks all schemas of the catalog except information_schema, selects the
    tables that match the monitoring filters and calls
    populate_schema_registry for each of them.

    Args:
        catalog_name: Catalog to scan.
        created_by_filter: Only tables whose created_by equals this value are
            monitored. Pass None to disable the filter. The default keeps the
            value that used to be hard-coded in the query.
            NOTE(review): a personal e-mail as default is probably not wanted
            long term - consider supplying it through a widget.
        table_name_pattern: SQL LIKE pattern the table name must match. Pass
            None to disable the filter. Defaults to '%book%'.

    Returns:
        None.
    """
    schemas = spark.sql(f"""
        SELECT schema_name
        FROM {catalog_name}.information_schema.schemata
        WHERE schema_name <> 'information_schema'
    """).collect()

    for row in schemas:
        schema_name = row['schema_name']

        # Build the WHERE clause from whichever monitoring filters are enabled.
        conditions = [f"table_schema = '{schema_name}'"]
        if created_by_filter is not None:
            conditions.append(f"created_by = '{created_by_filter}'")
        if table_name_pattern is not None:
            conditions.append(f"table_name LIKE '{table_name_pattern}'")
        where_clause = " AND ".join(conditions)

        tables = spark.sql(f"""
            SELECT table_name
            FROM {catalog_name}.information_schema.tables
            WHERE {where_clause}
        """).collect()

        for table_row in tables:
            table_name = table_row['table_name']
            populate_schema_registry(f"{catalog_name}.{schema_name}.{table_name}")

# Register every monitored table of the catalog chosen through the catalog_name widget.
# This call runs whenever the cell is executed; it is not just an example.
populate_for_all_tables(catalog_name)


Table ds_training_1.ds_bronze.book_xml_vol is already present in schema_registry.
Table ds_training_1.ds_gold.book_author is already present in schema_registry.
Table ds_training_1.ds_silver.book_parquet is already present in schema_registry.
Table ds_training_1.ds_silver.book is already present in schema_registry.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import json
from datetime import datetime

def get_latest_check_timestamp(catalog_name, table_name, schema_name=None):
    """Retrieve the latest check timestamp stored in the schema registry for a table.

    Args:
        catalog_name: Catalog that holds the registry table.
        table_name: Table whose registry rows are searched.
        schema_name: Optional schema of the table. When given, only rows of
            that schema are considered, so same-named tables in other schemas
            do not interfere. When omitted, rows are matched by table name
            only, as before.

    Returns:
        The greatest check_timestamp among the matching rows, or None when the
        registry has no row for the table.
    """
    schema_filter = f" AND schema_name = '{schema_name}'" if schema_name is not None else ""

    # Order by the column that is returned; modified_timestamp is NULL on
    # registration rows and is not the value this function reports.
    rows = spark.sql(f"""
        SELECT check_timestamp
        FROM {catalog_name}.default.schema_registry
        WHERE table_name = '{table_name}'{schema_filter}
        ORDER BY check_timestamp DESC
        LIMIT 1
    """).collect()

    # A single collect() answers both "is there a row" and "what is its value".
    return rows[0]['check_timestamp'] if rows else None

def get_max_schema_version(catalog_name, table_name, schema_name=None):
    """Get the maximum schema version stored in the schema registry for a table.

    Args:
        catalog_name: Catalog that holds the registry table.
        table_name: Table whose registry rows are searched.
        schema_name: Optional schema of the table. When given, only rows of
            that schema are considered. When omitted, rows are matched by
            table name only, as before.

    Returns:
        The highest schema_version among the matching rows, or None when there
        are none (MAX over no rows yields NULL).
    """
    schema_filter = f" AND schema_name = '{schema_name}'" if schema_name is not None else ""

    rows = spark.sql(f"""
        SELECT MAX(schema_version) AS max_version
        FROM {catalog_name}.default.schema_registry
        WHERE table_name = '{table_name}'{schema_filter}
    """).collect()

    # An aggregate without GROUP BY always yields one row, so no separate
    # count() is needed; the guard only protects against an empty result.
    return rows[0]['max_version'] if rows else None

def fetch_created_by(catalog_name, schema_name, table_name):
    """Retrieve the 'created_by' value of a table from information_schema.tables.

    Args:
        catalog_name: Catalog whose information_schema is queried.
        schema_name: Schema of the table.
        table_name: Name of the table.

    Returns:
        The created_by value of the first matching row, or the string
        "unknown" when the query returns no rows.
    """
    rows = spark.sql(f"""
        SELECT created_by
        FROM {catalog_name}.information_schema.tables
        WHERE table_schema = '{schema_name}' AND table_name = '{table_name}'
    """).collect()

    # Collect once: a separate count() would run the query a second time and
    # could disagree with the collected rows.
    return rows[0]['created_by'] if rows else "unknown"

def fetch_schema_json(catalog_name, schema_name, table_name):
    """Read a table's column layout from information_schema.columns.

    Despite the name, the result is a plain dict; callers serialize it with
    json.dumps themselves.

    Args:
        catalog_name: Catalog whose information_schema is queried.
        schema_name: Schema of the table.
        table_name: Name of the table.

    Returns:
        A dict that maps each COLUMN_NAME to its DATA_TYPE.
    """
    column_rows = spark.sql(f"""
        SELECT COLUMN_NAME, DATA_TYPE 
        FROM {catalog_name}.information_schema.columns 
        WHERE table_schema = '{schema_name}' AND table_name = '{table_name}'
    """).collect()

    column_types = {}
    for column_row in column_rows:
        column_types[column_row['COLUMN_NAME']] = column_row['DATA_TYPE']
    return column_types

def collect_operations_after_check_timestamp(describe_history_df, latest_check_timestamp):
    """Summarize the history operations that are newer than the last check.

    Args:
        describe_history_df: DataFrame of a DESCRIBE HISTORY query; its rows are
            read through collect() and must expose timestamp, operation,
            userName, operationParameters and version.
        latest_check_timestamp: Rows whose timestamp is less than or equal to
            this value are skipped. A falsy value (e.g. None) keeps every row.

    Returns:
        A list of dicts, in the order the rows were collected, with the keys
        operation, modified_by, add_column_names, drop_column_name,
        rename_column_name, timestamp and version.
    """
    collected = []

    for history_row in describe_history_df.collect():
        event_time = history_row['timestamp']
        if latest_check_timestamp and event_time <= latest_check_timestamp:
            # Already covered by an earlier check.
            continue

        op_name = history_row['operation']
        author = history_row['userName']

        # The parameters may arrive as a JSON string or as a ready-made mapping.
        raw_params = history_row['operationParameters']
        params = json.loads(raw_params) if isinstance(raw_params, str) else raw_params

        added_names, dropped_name, renamed_to = [], None, None

        if params is not None:
            if op_name == "ADD COLUMNS":
                if 'columns' in params:
                    added_names = [entry['column']['name'] for entry in json.loads(params['columns'])]
            elif op_name == "DROP COLUMNS":
                if 'columns' in params:
                    # Only the first dropped column is kept.
                    dropped_name = json.loads(params['columns'])[0]
            elif op_name == "RENAME COLUMN":
                if 'oldColumnPath' in params and 'newColumnPath' in params:
                    # The new name is what gets recorded.
                    renamed_to = params['newColumnPath']

        collected.append({
            "operation": op_name,
            "modified_by": author,
            "add_column_names": added_names,
            "drop_column_name": dropped_name,
            "rename_column_name": renamed_to,
            "timestamp": event_time,
            "version": history_row['version']
        })

    return collected
def detect_data_type_changes(operations_after_check_timestamp):
    """Find column data type changes expressed as a four-operation sequence.

    A change is reported when four consecutive list entries have the operations
    RENAME COLUMN, DROP COLUMNS, UPDATE, ADD COLUMNS (in list order) and the
    dropped column name equals the new name of the renamed column.

    Args:
        operations_after_check_timestamp: List of operation dicts with at least
            the key 'operation'; the rename and drop entries are also read for
            'rename_column_name' and 'drop_column_name'.

    Returns:
        A list of dicts with 'change_column_name' and 'change_index', where
        change_index is the list position of the ADD COLUMNS entry. Empty when
        nothing matched. The outcome is also printed.
    """
    expected_pattern = ("RENAME COLUMN", "DROP COLUMNS", "UPDATE", "ADD COLUMNS")
    window = len(expected_pattern)
    detected = []

    # Slide a four-entry window over the operations.
    for start in range(len(operations_after_check_timestamp) - 3):
        observed = tuple(
            operations_after_check_timestamp[start + offset]['operation']
            for offset in range(window)
        )
        if observed != expected_pattern:
            continue

        renamed = operations_after_check_timestamp[start]
        dropped = operations_after_check_timestamp[start + 1]

        # The pattern only counts when the dropped column is the rename target.
        if dropped.get('drop_column_name') != renamed.get('rename_column_name'):
            continue

        detected.append({
            'change_column_name': renamed.get('rename_column_name'),
            'change_index': start + 3
        })

    if not detected:
        print("No data type changes detected.")
        return []

    print("Data type changes detected:")
    for found in detected:
        print(f"Column: '{found['change_column_name']}' at index {found['change_index']}")
    return detected

def define_schema():
    """Build the StructType used for the DataFrame of schema change rows.

    Returns:
        A StructType with the sixteen registry columns in table order (the
        identity column `id` is not part of it); every field is nullable.
    """
    # (field name, Spark type class) - all fields are created as nullable.
    field_layout = [
        ("catalog_name", StringType),
        ("schema_name", StringType),
        ("table_name", StringType),
        ("schema_version", IntegerType),
        ("created_by", StringType),
        ("modified_by", StringType),
        ("modified_timestamp", TimestampType),
        ("schema_json", StringType),
        ("change_type", StringType),
        ("column_name", StringType),
        ("table_version", IntegerType),
        ("table_version_timestamp", TimestampType),
        ("status", StringType),
        ("check_timestamp", TimestampType),
        ("schema_change_alert_status", StringType),
        ("rollback_notification_status", StringType),
    ]
    return StructType([
        StructField(field_name, type_class(), True)
        for field_name, type_class in field_layout
    ])

def create_schema_registry_entry(schema_registry_data, catalog_name, schema_name, table_name, schema_version, created_by, modified_by, modified_timestamp, schema_json, change_type, column_name, table_version, table_version_timestamp, status):
    """Append one schema change row to schema_registry_data.

    Args:
        schema_registry_data: List that receives the new row (mutated in place).
        catalog_name, schema_name, table_name: Identify the changed table.
        schema_version: Schema version assigned to this change.
        created_by: Value stored in the created_by column.
        modified_by: User who performed the change.
        modified_timestamp: When the change happened.
        schema_json: Schema as JSON string, or None.
        change_type: Kind of change being recorded.
        column_name: Column(s) affected by the change.
        table_version: Table version of the change.
        table_version_timestamp: Timestamp of that table version.
        status: Status stored for the row.

    Returns:
        None. The row also gets check_timestamp = datetime.utcnow(),
        schema_change_alert_status = "Pending" and
        rollback_notification_status = "NA".
    """
    new_entry = dict(
        catalog_name=catalog_name,
        schema_name=schema_name,
        table_name=table_name,
        schema_version=schema_version,
        created_by=created_by,
        modified_by=modified_by,
        modified_timestamp=modified_timestamp,
        schema_json=schema_json,
        change_type=change_type,
        column_name=column_name,
        table_version=table_version,
        table_version_timestamp=table_version_timestamp,
        status=status,
        check_timestamp=datetime.utcnow(),
        # Fixed defaults for every newly recorded change.
        schema_change_alert_status="Pending",
        rollback_notification_status="NA",
    )
    schema_registry_data.append(new_entry)


def mark_existing_records_inactive(catalog_name, table_name, schema_name=None):
    """Set the status of a table's 'Active' registry rows to 'Inactive'.

    Args:
        catalog_name: Catalog that holds the registry table.
        table_name: Table whose rows are updated.
        schema_name: Optional schema of the table. When given, only rows of
            that schema are updated, so a same-named table in another schema
            keeps its Active row. When omitted, rows are matched by table name
            only, as before.

    Returns:
        None. Runs an UPDATE statement against the registry table.
    """
    schema_filter = f" AND schema_name = '{schema_name}'" if schema_name is not None else ""

    spark.sql(f"""
        UPDATE {catalog_name}.default.schema_registry
        SET status = 'Inactive'
        WHERE table_name = '{table_name}' AND status = 'Active'{schema_filter}
    """)

def update_schema_registry_with_changes(table_reference):
    """Record the schema changes made to one table since its last registry check.

    Reads the table's DESCRIBE HISTORY, keeps the operations newer than the
    last check, and appends one registry row per ADD COLUMNS / DROP COLUMNS /
    RENAME COLUMN operation. A detected data type change (a four-operation
    sequence) is recorded as a single "DATA TYPE CHANGE" row. The newest
    recorded row becomes the only 'Active' one and carries the current schema
    as JSON; all older rows are 'Inactive'.

    Existing rows are only marked inactive when at least one new row is
    written. Otherwise, history that contains only non-schema operations
    (for example data writes) would leave the table without an Active row.

    NOTE: the registry lookups and the deactivation are keyed by table name
    only, because the helpers are called with (catalog_name, table_name).

    Args:
        table_reference: Table name in the form "catalog.schema.table".

    Returns:
        None. Appends rows to <catalog>.default.schema_registry and displays
        them; prints a message and returns early when nothing is to be recorded.
    """
    schema_change_operations = ("ADD COLUMNS", "DROP COLUMNS", "RENAME COLUMN")
    catalog_name, schema_name, table_name = table_reference.split('.')

    # Step 1: Last check time and the schema version to continue from
    latest_check_timestamp = get_latest_check_timestamp(catalog_name, table_name)
    max_version = get_max_schema_version(catalog_name, table_name)
    schema_version = max_version if max_version is not None else 0

    # Step 2: Operations newer than the last check (the list is processed from
    # its last element to its first, i.e. oldest first if history is newest first)
    describe_history_df = spark.sql(f"DESCRIBE HISTORY {catalog_name}.{schema_name}.{table_name}")
    operations_after_check_timestamp = collect_operations_after_check_timestamp(describe_history_df, latest_check_timestamp)

    if not operations_after_check_timestamp:
        print("No new operations to process.")
        return

    # Step 3: Data type changes, looked up by the list index they start at
    data_type_changes = detect_data_type_changes(operations_after_check_timestamp)
    changes_by_index = {change['change_index']: change for change in data_type_changes}

    # Step 4: Decide which operations produce a registry row
    planned_entries = []          # (operation index, matching data type change or None)
    skip_operations_until = -1    # -1 means "nothing to skip"
    for i in range(len(operations_after_check_timestamp) - 1, -1, -1):
        # The three operations that follow the start of a data type change
        # belong to that change and must not be recorded on their own.
        if skip_operations_until != -1 and i >= skip_operations_until:
            continue

        matching_change = changes_by_index.get(i)
        if matching_change:
            planned_entries.append((i, matching_change))
            skip_operations_until = i - 3
        elif operations_after_check_timestamp[i]['operation'] in schema_change_operations:
            planned_entries.append((i, None))

    # Nothing schema-related happened: leave the registry untouched so the
    # current Active row stays Active.
    if not planned_entries:
        print("No schema changes to record.")
        return

    # Step 5: Creator and current schema, needed only when rows are written
    created_by = fetch_created_by(catalog_name, schema_name, table_name)
    current_schema_json = json.dumps(fetch_schema_json(catalog_name, schema_name, table_name))

    # Step 6: Build the rows; only the newest recorded change is Active and
    # carries the current schema, even if later non-schema operations exist.
    schema_registry_data = []
    newest_position = len(planned_entries) - 1
    for position, (i, matching_change) in enumerate(planned_entries):
        operation_detail = operations_after_check_timestamp[i]
        is_newest = position == newest_position
        schema_version += 1

        if matching_change:
            change_type = "DATA TYPE CHANGE"
            column_name = matching_change['change_column_name']
        else:
            change_type = operation_detail['operation']
            # For ADD COLUMNS this is the list of added names, stored as its text form.
            column_name = operation_detail.get('rename_column_name') or (operation_detail.get('add_column_names') or operation_detail.get('drop_column_name'))

        create_schema_registry_entry(
            schema_registry_data,
            catalog_name,
            schema_name,
            table_name,
            schema_version,
            created_by,
            operation_detail['modified_by'],
            operation_detail['timestamp'],
            current_schema_json if is_newest else None,
            change_type,
            column_name,
            int(operation_detail['version']),
            operation_detail['timestamp'],
            'Active' if is_newest else 'Inactive'
        )

    # Step 7: Retire the previous Active row now that a replacement exists
    mark_existing_records_inactive(catalog_name, table_name)

    # Step 8: Append the new rows to the registry and show them
    updated_schema_registry_df = spark.createDataFrame(schema_registry_data, schema=define_schema())
    updated_schema_registry_df.write.format("delta").mode("append").saveAsTable(f"{catalog_name}.default.schema_registry")
    display(updated_schema_registry_df)

In [0]:
def update_all_tables_in_schema_registry(catalog_name):
    """Refresh the schema registry for every table it already tracks.

    Args:
        catalog_name: Catalog that holds the registry; it is also used as the
            catalog part of every table reference.

    Returns:
        None. Calls update_schema_registry_with_changes once per distinct
        (schema_name, table_name) pair found in the registry.
    """
    registered_tables = spark.sql(f"""
        SELECT DISTINCT schema_name, table_name 
        FROM {catalog_name}.default.schema_registry
    """)

    for registered in registered_tables.collect():
        # Rebuild the "catalog.schema.table" reference the update function expects.
        tracked_schema, tracked_table = registered['schema_name'], registered['table_name']
        update_schema_registry_with_changes(f"{catalog_name}.{tracked_schema}.{tracked_table}")

# Refresh the registry for every tracked table in the catalog chosen through the
# catalog_name widget (not necessarily 'ds_training_1').
update_all_tables_in_schema_registry(catalog_name)

No new operations to process.
No new operations to process.
No data type changes detected.


catalog_name,schema_name,table_name,schema_version,created_by,modified_by,modified_timestamp,schema_json,change_type,column_name,table_version,table_version_timestamp,status,check_timestamp,schema_change_alert_status,rollback_notification_status
ds_training_1,ds_silver,customer_silver_vishal,18,vishal.kokkula@latentviewo365.onmicrosoft.com,vishal.kokkula@latentviewo365.onmicrosoft.com,2024-10-23T10:40:30Z,,ADD COLUMNS,['new_1'],26,2024-10-23T10:40:30Z,Inactive,2024-10-23T10:44:14.541861Z,Pending,
ds_training_1,ds_silver,customer_silver_vishal,19,vishal.kokkula@latentviewo365.onmicrosoft.com,vishal.kokkula@latentviewo365.onmicrosoft.com,2024-10-23T10:40:37Z,,ADD COLUMNS,['new_2'],27,2024-10-23T10:40:37Z,Inactive,2024-10-23T10:44:14.541871Z,Pending,
ds_training_1,ds_silver,customer_silver_vishal,20,vishal.kokkula@latentviewo365.onmicrosoft.com,vishal.kokkula@latentviewo365.onmicrosoft.com,2024-10-23T10:41:26Z,"{""customer_id"": ""INT"", ""name"": ""STRING"", ""age"": ""INT"", ""gender"": ""STRING"", ""phone_number"": ""STRING"", ""email"": ""STRING"", ""account_id"": ""INT"", ""account_type"": ""STRING"", ""balance"": ""INT"", ""opened_date"": ""DATE"", ""status"": ""STRING"", ""business_date"": ""STRING"", ""test_col_3"": ""DATE"", ""test_column_18_10_24"": ""INT"", ""test_column_23_10"": ""INT"", ""new_1"": ""INT"", ""new_2"": ""INT""}",RENAME COLUMN,test_col_3,28,2024-10-23T10:41:26Z,Active,2024-10-23T10:44:14.541913Z,Pending,


No new operations to process.
No new operations to process.
No new operations to process.
No new operations to process.
No new operations to process.
No new operations to process.
