In [218]:
pip install pandas SQLAlchemy mysql-connector-python python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-1.2.1-py3-none-any.whl.metadata (25 kB)
Using cached python_dotenv-1.2.1-py3-none-any.whl (21 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.2.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
import pandas as pd
import json
from sqlalchemy import create_engine

from collections import namedtuple
import os
from dotenv import load_dotenv

In [21]:
def load_schema_from_db():

    # Removed Columns: IS_NULLABLE, COLUMN_TYPE
    query = """
    select COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
    from INFORMATION_SCHEMA.COLUMNS
    where table_name
    """
    DB_HOST = os.environ['DB_HOST']
    DB_PORT = os.environ['DB_PORT']
    DB_USER = os.environ["DB_USER"]
    DB_PASSW = os.environ['DB_PASSW']
    DB_NAME = os.environ['DB_NAME']
    DATABASE_URL = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSW}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    
    try:
        engine = create_engine(DATABASE_URL)
        print("Engine created...")
        
        src_col_schema_df = pd.read_sql(f"{query} = '{'transactions_source'}';",engine)
        dest_col_schema_df = pd.read_sql(f"{query} = '{'transactions_dw'}';",engine)
        print(f"Schema loaded.. \n{len(src_col_schema_df['COLUMN_NAME'][:5].values)} columns found\n{len(dest_col_schema_df['COLUMN_NAME'].values)} columns expected")  
        
    except Exception as e:
        print (f"Error: {e}")

In [22]:
def init():
    try:
        load_dotenv()
      
        load_schema_from_db()
     
    except Exception as e:
        print (f"Error: {e}")    
    

In [23]:
# Init DB Connection and Parameters, Load Schema

init()

Engine created...
Schema loaded.. 
5 columns found
22 columns expected


In [173]:

# validate the first 3 fields 
# Validation Criteria: 
    # 1. Identify New or Removed Fields 
    # 2. Data Type Matching
    # 3. Field Length Matching
    # 4. Column Order Matching


def find_columns_diff(src_schema_df, dest_schema_df):
    
    src_columns = set(src_schema_df['COLUMN_NAME'][:21].values)
    dest_columns = set(dest_schema_df['COLUMN_NAME'][5:].values)

    new_columns = src_columns - dest_columns
    removed_columns = dest_columns - src_columns

    # print(f" New Columns: { new_columns }")
    # print(f" Removed Columns: {removed_columns}")

    return (new_columns, removed_columns)

In [201]:

def validate_col_details(actual_col_schema_df, expected_col_schema_df):
    validation_result={}
    for index,row in actual_col_schema_df.iterrows():
        expected_col_def = expected_col_schema_df[expected_col_schema_df['COLUMN_NAME']==row['COLUMN_NAME']].iloc[0]
        validation_result[row['COLUMN_NAME']]=[]

        # Data Type Validation
        if row['DATA_TYPE'] != expected_col_def['DATA_TYPE']:
            
            
            validation_result[row['COLUMN_NAME']].extend([{
                'data_type': {
                    'expected': expected_col_def['DATA_TYPE'],
                    'actual': row['DATA_TYPE']
                }
            }])
            
        if row['ORDINAL_POSITION'] != expected_col_def['ORDINAL_POSITION'].item():
            validation_result[row['COLUMN_NAME']].extend([ {
                'ordinal_position': {
                    'expected': expected_col_def['ORDINAL_POSITION'].item(),
                    'actual': row['ORDINAL_POSITION']
                }
            }])
        if row['CHARACTER_MAXIMUM_LENGTH']!= expected_col_def['CHARACTER_MAXIMUM_LENGTH'].item():
            validation_result[row['COLUMN_NAME']].extend([ {
                'max_length': {
                    'expected': expected_col_def['ORDINAL_POSITION'].item(),
                    'actual': row['ORDINAL_POSITION']
                }
            }])
    return validation_result        
    

In [209]:
# Make sure to pass the source(actual) & destination(expected) schemas
# for testing I'm passing a filtered source schema having a varying set of cols that the expected schema
src_col_schema_df.sort_values(by="ORDINAL_POSITION")
dest_col_schema_df.sort_values(by="ORDINAL_POSITION")

# Find added & removed columns
new_cols, removed_cols = find_columns_diff(src_col_schema_df,dest_col_schema_df)

# Validate columns (actual vs expected) across column data type, length, order
cols_to_validate_df = src_col_schema_df[~src_col_schema_df['COLUMN_NAME'].isin(added_cols)]

validation_result = validate_col_details(cols_to_validate_df, dest_col_schema_df)


final_result = {
    "col_diff":{
            "new_columns": list(new_cols),
            "removed_columns": list(removed_cols)    
            },
    "val_details":validation_result
    
}

print(json.dumps(final_result, indent=4))

{
    "col_diff": {
        "new_columns": [
            "trans_num",
            "unix_time",
            "cc_num",
            "merchant",
            "trans_date_trans_time"
        ],
        "removed_columns": [
            "is_fraud"
        ]
    },
    "val_details": {
        "category": [
            {
                "data_type": {
                    "expected": "varchar",
                    "actual": "text"
                }
            },
            {
                "ordinal_position": {
                    "expected": 6,
                    "actual": 4
                }
            },
            {
                "max_length": {
                    "expected": 6,
                    "actual": 4
                }
            }
        ],
        "amt": [
            {
                "data_type": {
                    "expected": "decimal",
                    "actual": "double"
                }
            },
            {
                "ordinal_position": {
     