In [0]:
from abc import ABC, abstractmethod
from lib.gateway.database import SparkSQLDatabaseGateway
from lib.interactor.governance import GovernanceInteractor
from lib.interactor.asset import AssetInteractor
from lib.interactor.surrogate_key import SurrogateKeyInteractor

import pandas as pd
import yaml
import json

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from delta.tables import DeltaTable

In [0]:
def propagate_raw_to_audit(target_layer, source_layer):
    """Fill missing column attributes of the audit layer from the raw layer.

    For every column in ``target_layer['schema']``:

    * ``rename_from`` is kept when declared, otherwise it is set to the
      column's own name.
    * Each attribute in ``data_type``, ``is_primary_key``, ``is_nullable``,
      ``is_partition``, ``is_pii`` and ``comment`` that the target column
      does not declare is copied from the source column named by
      ``rename_from``, when that source column declares it.

    A target column without a counterpart in the source schema (for example
    a column introduced by the audit layer itself) inherits nothing instead
    of raising ``KeyError``.

    Args:
        target_layer: Layer definition holding a ``'schema'`` mapping of
            column name -> attribute dict. It is modified in place.
        source_layer: Layer definition to inherit from. ``None`` or a layer
            without a ``'schema'`` entry is treated as an empty schema.

    Returns:
        The same ``target_layer`` object, after enrichment.
    """
    inherited_attributes = ('data_type', 'is_primary_key', 'is_nullable',
                            'is_partition', 'is_pii', 'comment')
    source_schema = (source_layer or {}).get('schema') or {}

    for column_name, column in target_layer['schema'].items():
        # Keep an explicit rename; otherwise the column maps onto itself.
        rename_from = column.setdefault('rename_from', column_name)

        # A column that does not exist upstream simply has nothing to inherit.
        source_column = source_schema.get(rename_from) or {}

        for attribute in inherited_attributes:
            if attribute not in column and attribute in source_column:
                column[attribute] = source_column[attribute]

    return target_layer

def propagate_audit_to_historic(target_layer, source_layer):
    """Fill missing column attributes of the historic layer from the audit layer.

    For every column in ``target_layer['schema']``:

    * ``rename_from`` is always overwritten with the column's own name, so
      columns are matched to the source layer by identical name.
    * Each attribute in ``data_type``, ``is_primary_key``, ``is_nullable``,
      ``is_partition``, ``is_pii`` and ``comment`` that the target column
      does not declare is copied from the same-named source column, when
      that source column declares it.

    A target column without a counterpart in the source schema (for example
    a column that only exists in the historic layer) inherits nothing
    instead of raising ``KeyError``.

    Args:
        target_layer: Layer definition holding a ``'schema'`` mapping of
            column name -> attribute dict. It is modified in place.
        source_layer: Layer definition to inherit from. ``None`` or a layer
            without a ``'schema'`` entry is treated as an empty schema.

    Returns:
        The same ``target_layer`` object, after enrichment.
    """
    inherited_attributes = ('data_type', 'is_primary_key', 'is_nullable',
                            'is_partition', 'is_pii', 'comment')
    source_schema = (source_layer or {}).get('schema') or {}

    for column_name, column in target_layer['schema'].items():
        # Any rename declared on the target is deliberately replaced here.
        column['rename_from'] = column_name

        # A column that does not exist upstream simply has nothing to inherit.
        source_column = source_schema.get(column_name) or {}

        for attribute in inherited_attributes:
            if attribute not in column and attribute in source_column:
                column[attribute] = source_column[attribute]

    return target_layer

In [0]:
# One gateway instance is shared by every interactor used in this notebook.
database_gateway = SparkSQLDatabaseGateway()
governance_interactor, asset_interactor, surrogate_key_interactor = [
    interactor_class(database_gateway=database_gateway)
    for interactor_class in (GovernanceInteractor, AssetInteractor, SurrogateKeyInteractor)
]

# Explicit schema for the per-column rows built during ingestion.
# Each entry is (field name, Spark type, nullable).
schema_table_detail = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ('table_id', IntegerType(), False),
        ('column_id', IntegerType(), False),
        ('column_name', StringType(), False),
        ('rename_from', StringType(), True),
        ('data_type', StringType(), False),
        ('ordinal_position', IntegerType(), False),
        ('is_primary_key', BooleanType(), False),
        ('is_nullable', BooleanType(), False),
        ('is_partition', BooleanType(), False),
        ('is_pii', BooleanType(), False),
        ('validations', StringType(), True),
        ('track_changes', BooleanType(), True),
        ('comment', StringType(), True),
    )
])

In [0]:
# Ingest the YAML asset definition of one plain table into the governance
# metadata tables: one row per layer in `tables`, one row per column in
# `tables_detail`.

# NOTE(review): user-specific absolute workspace path — consider making it configurable.
path_base = '/Workspace/Users/armando.n90@gmail.com/users_case/lakehouse/governance/metadata/assets/'
domain = 'domain'
environment = 'dev'
source = 'analytics'
plain_tables = ['visitas']
plain_table = plain_tables[0]  # only the first entry of plain_tables is processed

print('Processing plain table:', plain_table)
path = f'{path_base}/{domain}/{source}/{plain_table}.yml'

with open(path) as file:
    try:
        metadata = yaml.safe_load(file)
    except yaml.YAMLError as exc:
        # Show the parser message, then stop: continuing would only fail
        # later with a NameError on the undefined `metadata`.
        print(exc)
        raise

name = metadata['name']
declared_layers = metadata['layers']

# Layers that are not declared stay None and are skipped further down.
raw_layer = declared_layers.get('raw')
audit_layer = declared_layers.get('audit')
historic_layer = declared_layers.get('historic')

# Tag every declared layer with its own name so the loop below can branch on it.
for layer_key, declared_layer in (('raw', raw_layer), ('audit', audit_layer), ('historic', historic_layer)):
    if declared_layer is not None:
        declared_layer['layer'] = layer_key

# Column attributes flow raw -> audit -> historic. A missing upstream layer is
# treated as an empty schema so the downstream layer still gets `rename_from`.
if audit_layer is not None:
    audit_layer = propagate_raw_to_audit(audit_layer, raw_layer or {'schema': {}})
if historic_layer is not None:
    historic_layer = propagate_audit_to_historic(historic_layer, audit_layer or {'schema': {}})

# Per layer: (etl module suffix, schema name, quality, table type).
layer_settings = {
    'raw': ('__load', f'bronze_{source}', 'bronze', 'table'),
    'audit': ('__audit', f'silver_{source}', 'silver', 'table'),
    'historic': ('__hist', f'silver_{source}', 'silver', 'table'),
}
catalog_name = f'{domain}_{environment}'

for layer in (raw_layer, audit_layer, historic_layer):

    if layer is None:
        continue  # layer not declared in the metadata file

    layer_name = layer['layer']
    print('Processing layer: ' + layer_name)

    table_name = layer_name + '_' + name
    print('Ingesting metadata table: ' + table_name)

    etl_suffix, schema_name, quality, table_type = layer_settings[layer_name]
    etl_module = name + etl_suffix

    catalog_id = governance_interactor.get_catalog_id(catalog_name)
    schema_id = governance_interactor.get_schema_id(catalog_id, schema_name)

    # NOTE(review): the key is assigned with a fixed version of 1 while the
    # stored row uses layer['version'] — confirm this is intended.
    base_dict = {'schema_id': schema_id, 'table_name': table_name, 'version': 1}
    table_id = surrogate_key_interactor.assign_surrogate_key(catalog_name='governance_prod', schema_name='metadata',
                                                             table_name='tables', base_values=base_dict,
                                                             surrogate_column='table_id')

    # Insertion order of this dict defines the column order of the DataFrame.
    table_row = {
        'table_id': table_id,
        'schema_id': schema_id,
        'table_name': table_name,
        'etl_module': etl_module,
        'write_mode': layer['write_mode'],
        'quality': quality,
        'table_type': table_type,
        'version': layer['version'],
        'current_flag': True,
        'valid_from': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
        'valid_to': '2200-01-01 00:00:00',
        'description': layer['description'],
        'owner': layer['owner'],
        'retention_policy': layer['retention_policy'],
    }

    dataframe_ingestion = spark.createDataFrame([tuple(table_row.values())], list(table_row))
    asset_interactor.merge_dataframe(dataframe_ingestion, catalog_name='governance_prod', schema_name='metadata',
                                     table_name='tables', match_columns=['table_id'])

    # ordinal_position is the 1-based position of the column in the YAML schema.
    for ordinal_position, (column_name, column) in enumerate(layer['schema'].items(), start=1):

        base_dict = {'table_id': table_id, 'column_name': column_name}
        print('Ingesting metadata table details for column: ' + column_name)

        column_id = surrogate_key_interactor.assign_surrogate_key(catalog_name='governance_prod',
                                                                  schema_name='metadata',
                                                                  table_name='tables_detail',
                                                                  base_values=base_dict,
                                                                  surrogate_column='column_id')

        # Validations are stored as a JSON string; absent means NULL.
        validations = json.dumps(column['validations']) if 'validations' in column else None

        # data_type, is_nullable, is_partition and comment are mandatory;
        # the remaining attributes fall back to False / None when absent.
        ingestion = [(table_id, column_id, column_name, column.get('rename_from'), column['data_type'],
                      ordinal_position, column.get('is_primary_key', False), column['is_nullable'],
                      column['is_partition'], column.get('is_pii', False), validations,
                      column.get('track_changes'), column['comment'])]

        dataframe_ingestion = spark.createDataFrame(ingestion, schema_table_detail)

        # Rows are merged one at a time.
        # NOTE(review): assign_surrogate_key presumably reads keys already
        # stored in the table — confirm before batching these merges.
        asset_interactor.merge_dataframe(dataframe_ingestion, catalog_name='governance_prod', schema_name='metadata',
                                         table_name='tables_detail', match_columns=['column_id'])

    print('')

In [0]:
# Inspect the table-level metadata (up to 100 rows).
test = spark.sql("select * from governance_prod.metadata.tables")
test.show(100)

In [0]:
# Inspect the column-level metadata stored for table_id 1 (up to 100 rows).
test = spark.sql("select * from governance_prod.metadata.tables_detail WHERE table_id = 1")
test.show(100)

In [0]:
# Inspect the column-level metadata stored for table_id 2 (up to 100 rows).
test = spark.sql("select * from governance_prod.metadata.tables_detail WHERE table_id = 2")
test.show(100)

In [0]:
# Inspect the column-level metadata stored for table_id 3 (up to 100 rows).
test = spark.sql("select * from governance_prod.metadata.tables_detail WHERE table_id = 3")
test.show(100)

In [0]:
# Manual reset (destructive): uncomment to delete every row from the governance metadata tables.
#spark.sql('DELETE FROM governance_prod.metadata.tables')
#spark.sql('DELETE FROM governance_prod.metadata.tables_detail')