In [18]:
import sys
sys.path.append('/home/armando/git/shorts/dimensional')

import pandas as pd
import numpy as np
from abc import ABC, abstractmethod
from lib.data_profiler.data_profiler import TableProfiler


In [26]:
class AbstractReadTable(ABC):

    @abstractmethod
    def read_table(self, table_name: str) -> pd.DataFrame:
        pass

class ReadTableDummy(AbstractReadTable):
    
    def read_table(self, table_name: str) -> pd.DataFrame:
        
        if table_name == 'dim_vendor':

            dim_vendor = pd.read_csv('/home/armando/git/shorts/dimensional/data/dim_vendor.csv')
            dim_vendor.loc[1, 'vendor_name'] = 'Libros Osom'
            dim_vendor['created_at'] = pd.to_datetime('2025-10-01 01:00:00')
            dim_vendor['updated_at'] = None
            dim_vendor['deleted_at'] = None
            dim_vendor['altered_at'] = pd.to_datetime('2025-10-01 01:00:00')

            return dim_vendor
        
        elif table_name == 'dim_vendor_dwh':

            dim_vendor_dwh = pd.read_csv('/home/armando/git/shorts/dimensional/data/dim_vendor.csv')
            dim_vendor_dwh['effective_start_date'] = pd.to_datetime('1900-01-01 00:00:00')
            dim_vendor_dwh['effective_end_date'] = pd.to_datetime('2100-01-01 00:00:00')
            dim_vendor_dwh['current_flag'] = 'Current'

            return dim_vendor_dwh
        
        elif table_name == 'dim_vendor_dwh_most_recent_surrogate_key':

            most_recent_surrogate_key = pd.read_csv('/home/armando/git/shorts/dimensional/data/dim_vendor.csv')
            most_recent_surrogate_key = most_recent_surrogate_key[['id_vendor', 'vendor_name']]
            return most_recent_surrogate_key
        
        elif table_name == 'fact_currency_rates':

            table = pd.read_csv('/home/armando/git/shorts/dimensional/data/fact_currency_rates.csv')
            return table

        else:
            raise ValueError(f"Table '{table_name}' not found.")

In [6]:
#SUBSISTEMA 01 - EXTRACCION - PERFILAMIENTO DE DATOS
data = pd.DataFrame({
    'user': ['Usuario 1', 'Usuario 2', 'Usuario 3', 'Usuario 4'],
    'age': [25.0, 30.0, 22.0, 35.0],
    'weight': [70.5, 80.2, 65.0, 90.3],
    'sex': ['M', 'F', 'F', 'M']
})

table_profiler = TableProfiler(data)
profile = table_profiler.profile()
print(profile)

{'user': {'unique_count': 4, 'unique_values': ['Usuario 1', 'Usuario 2', 'Usuario 3', 'Usuario 4']}, 'age': {'mean': np.float64(28.0), 'min': np.float64(22.0), 'max': np.float64(35.0)}, 'weight': {'mean': np.float64(76.5), 'min': np.float64(65.0), 'max': np.float64(90.3)}, 'sex': {'unique_count': 2, 'unique_values': ['M', 'F']}}


In [66]:
#SUBSISTEMA 02 - EXTRACCION - CAPTURA DE CAMBIO DE DATOS
class AbstractChangeDataCapture(ABC):

    @abstractmethod
    def detect_changes_by_altered_date(self, origin_table, final_table):
        pass

class ChangeDataCapture(AbstractChangeDataCapture):

    def detect_changes_by_altered_date(self, origin_table: pd.DataFrame, final_table: pd.DataFrame) -> pd.DataFrame:
        
        merged_table = origin_table[['identifier', 'created_at', 'updated_at', 'deleted_at', 'altered_at']].merge(final_table[['identifier', 'created_at', 'updated_at', 'deleted_at', 'altered_at']], on='identifier', suffixes=('_origin', '_final'), how='left')
        merged_table = merged_table.loc[merged_table['altered_at_origin'] != merged_table['altered_at_final']]

        events = []
        i = 1001
        
        for index, row in merged_table.iterrows():
            
            event = None
            event = {'event_id': f'E{str(i)}'}

            if row['created_at_final'] is pd.NaT:
                event['event'] = 'insert'

            elif row['deleted_at_origin'] is not pd.NaT and row['deleted_at_final'] is pd.NaT:
                event['event'] = 'delete'

            else:
                event['event'] = 'update'

            event['table'] = 'source_table'
            event['values'] = {}
            for column in origin_table.columns:
                event['values'][column] = origin_table.loc[index, column]

            events.append(event)
            i = i + 1
       
        return events
    
source_table = pd.DataFrame({'identifier': [1, 2, 3, 4]})
source_table['value'] = ['A', 'E', 'C', 'D']
source_table['created_at'] = pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-02-04'])
source_table['updated_at'] = pd.to_datetime([None, '2023-02-11', None, None])
source_table['deleted_at'] = pd.to_datetime([None, None, '2023-02-12', None])
source_table['altered_at'] = source_table[['created_at', 'updated_at', 'deleted_at']].max(axis=1)
source_table['change_reason'] = ['Created by user', 'Updated by user', 'Deleted by user', 'Created by user']

end_table = pd.DataFrame({'identifier': [1, 2, 3]})
end_table['value'] = ['A', 'B', 'C']
end_table['created_at'] = pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03'])
end_table['updated_at'] = pd.to_datetime([None, None, None])
end_table['deleted_at'] = pd.to_datetime([None, None, None])
end_table['altered_at'] = end_table[['created_at', 'updated_at', 'deleted_at']].max(axis=1)
end_table['change_reason'] = ['Created by user', 'Created by user', 'Created by user']

cdc = ChangeDataCapture()
events_cdc = cdc.detect_changes_by_altered_date(source_table, end_table)
for event in events_cdc:
    print(event)

{'event_id': 'E1001', 'event': 'update', 'table': 'source_table', 'values': {'identifier': np.int64(2), 'value': 'E', 'created_at': Timestamp('2023-01-02 00:00:00'), 'updated_at': Timestamp('2023-02-11 00:00:00'), 'deleted_at': NaT, 'altered_at': Timestamp('2023-02-11 00:00:00'), 'change_reason': 'Updated by user'}}
{'event_id': 'E1002', 'event': 'delete', 'table': 'source_table', 'values': {'identifier': np.int64(3), 'value': 'C', 'created_at': Timestamp('2023-01-03 00:00:00'), 'updated_at': NaT, 'deleted_at': Timestamp('2023-02-12 00:00:00'), 'altered_at': Timestamp('2023-02-12 00:00:00'), 'change_reason': 'Deleted by user'}}
{'event_id': 'E1003', 'event': 'insert', 'table': 'source_table', 'values': {'identifier': np.int64(4), 'value': 'D', 'created_at': Timestamp('2023-02-04 00:00:00'), 'updated_at': NaT, 'deleted_at': NaT, 'altered_at': Timestamp('2023-02-04 00:00:00'), 'change_reason': 'Created by user'}}


In [79]:
#SUBSISTEMA 03 - EXTRACCION - SISTEMA DE EXTRACCION DE DATOS

#EXTRCTORES DE FUENTES
class AbstractSourceExtractor(ABC):

    @abstractmethod
    def extract_data(self):
        pass

    @abstractmethod
    def parse_data(self):
        pass

class AbstractFileExtractor(AbstractSourceExtractor):

    @abstractmethod
    def extract_data(self):
        pass

    @abstractmethod
    def parse_data(self):
        pass

class AbstractDatabaseExtractor(AbstractSourceExtractor):

    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def extract_data(self):
        pass

    @abstractmethod
    def parse_data(self):
        pass

class GenericJsonExtractor(AbstractFileExtractor):

    def __init__(self):
        self.data = None

    def extract_data(self, file_path: str):
        print('Simulando la extracción de datos JSON desde:', file_path)
        self.data = {'titulo': 'Titulo', 'autor': 'Autor', 'año': 2024, 'genero': 'Ficción'}

    def parse_data(self) -> dict:
        return self.data
    
class GenericCsvExtractor(AbstractFileExtractor):

    def __init__(self):
        self.data = None

    def extract_data(self, file_path: str):
        print('Simulando la extracción de datos CSV desde:', file_path)
        self.data = pd.DataFrame({
            'nombre': ['Ana', 'Luis', 'Marta'],
            'edad': [28, 34, 22],
            'ciudad': ['Madrid', 'Barcelona', 'Valencia']
        })

    def parse_data(self) -> pd.DataFrame:
        return self.data
    
class PostgreSqlExtractor(AbstractDatabaseExtractor):

    def __init__(self):
        self.data = None

    def connect(self, connection_string):
        print('Simulando la conexión a PostgreSQL con:', connection_string)
        self.connection = True

    def extract_data(self, query: str):

        if not self.connection:
            raise Exception('No hay conexión a la base de datos.')
        
        print('Simulando la extracción de datos con la consulta:', query)
        self.data = pd.DataFrame({
            'producto': ['Producto A', 'Producto B', 'Producto C'],
            'precio': [10.5, 20.0, 15.75],
            'stock': [100, 150, 200]
        })

    def parse_data(self) -> pd.DataFrame:
        return self.data

#ESCRITORES DE ARCHIVOS
class AbstractFileWriter(ABC):

    @abstractmethod
    def write_file(self, data, file_path):
        pass

class JsonFileWriter(AbstractFileWriter):

    def write_file(self, data: dict, file_path: str):
        print('Simulando la escritura de datos JSON en:', file_path)
        print(data)

class CSVFileWriter(AbstractFileWriter):

    def write_file(self, data: pd.DataFrame, file_path: str):
        print('Simulando la escritura de datos CSV en:', file_path)
        print(data)

#CANALIZACIONES
class AbstractPipelineExtraction():

    @abstractmethod
    def execute(self, source_extractor: AbstractSourceExtractor, file_writer: AbstractFileWriter):
        pass

class PipelineFileExtraction(AbstractPipelineExtraction):

    def __init__(self, source_path: str, destination_path: str):
        self.source_path = source_path
        self.destination_path = destination_path
    
    def execute(self, source_extractor: AbstractFileExtractor, file_writer: AbstractFileWriter):
        source_extractor.extract_data(self.source_path)
        data = source_extractor.parse_data()
        file_writer.write_file(data, self.destination_path)

class PipelineDatabaseExtraction(AbstractPipelineExtraction):

    def __init__(self, connection_string: str, query: str, destination_path: str):
        self.connection_string = connection_string
        self.query = query
        self.destination_path = destination_path
    
    def execute(self, source_extractor: AbstractDatabaseExtractor, file_writer: AbstractFileWriter):
        source_extractor.connect(self.connection_string)
        source_extractor.extract_data(self.query)
        data = source_extractor.parse_data()
        file_writer.write_file(data, self.destination_path)


#EJECUCION DE CANALIZACIONES
json_extractor = GenericJsonExtractor()
csv_extractor = GenericCsvExtractor()
postgres_extractor = PostgreSqlExtractor()

json_writer = JsonFileWriter()
csv_writer = CSVFileWriter()

pipeline_01 = PipelineFileExtraction('/ruta/al/archivo.json', '/ruta/de/salida/archivo_pipeline_salida.json')
pipeline_01.execute(json_extractor, json_writer)

pipeline_02 = PipelineFileExtraction('/ruta/al/archivo.csv', '/ruta/de/salida/archivo_pipeline_salida.csv')
pipeline_02.execute(csv_extractor, csv_writer)

pipeline_03 = PipelineDatabaseExtraction('dbname=test user=postgres password=secret', 'SELECT * FROM productos;', '/ruta/de/salida/productos_pipeline_salida.csv')
pipeline_03.execute(postgres_extractor, csv_writer)

Simulando la extracción de datos JSON desde: /ruta/al/archivo.json
Simulando la escritura de datos JSON en: /ruta/de/salida/archivo_pipeline_salida.json
{'titulo': 'Titulo', 'autor': 'Autor', 'año': 2024, 'genero': 'Ficción'}
Simulando la extracción de datos CSV desde: /ruta/al/archivo.csv
Simulando la escritura de datos CSV en: /ruta/de/salida/archivo_pipeline_salida.csv
  nombre  edad     ciudad
0    Ana    28     Madrid
1   Luis    34  Barcelona
2  Marta    22   Valencia
Simulando la conexión a PostgreSQL con: dbname=test user=postgres password=secret
Simulando la extracción de datos con la consulta: SELECT * FROM productos;
Simulando la escritura de datos CSV en: /ruta/de/salida/productos_pipeline_salida.csv
     producto  precio  stock
0  Producto A   10.50    100
1  Producto B   20.00    150
2  Producto C   15.75    200


In [86]:
#SUBSISTEMA 04 - FORMACION - LIMPIEZA DE DATOS

## SCREENS
## 1 - COLUMNAR - HECHO DE CONVERSIONES - VALOR NULO EN CAMPO OBLIGATORIO - ETIQUETAR
## 2 - COLUMNAR - HECHO DE CONVERSIONES - VALOR FUERA DE RANGO - ETIQUETAR
## 3 - ESTRUCTURAL - HECHO DE CONVERSIONES - DUPLICADO DE REGISTROS - DETENER
## 4 - REGLAS DE NEGOCIO - HECHO DE CONVERSIONES - INVERSOS INCONGRUENTES - DETENER

#COLUMNAS
#ESTRUCTURAS
#REGLAS DE NEGOCIO

class AbstractValidator(ABC):
   
    @abstractmethod
    def validate(self) -> bool:
        pass

class ValidatorValueOutOfBounds(AbstractValidator):

    def __init__(self, min_value, max_value):
        self.min_value = min_value
        self.max_value = max_value

    def validate(self, value):
        
        result = False
        if value < self.min_value or value > self.max_value:
            result = True

        return result

class ValidatorValueMissing(AbstractValidator):

    def __init__(self):
        pass

    def validate(value):
        
        result = False
        if pd.isnull(value):
            result = True

        return result
    
class ValidatorIsOutOfList(AbstractValidator):

    def __init__(self, valid_values):
        self.valid_values = valid_values

    def validate(self, value):
        
        result = False
        if value not in self.valid_values:
            result = True

        return result
    
class ValidatorHasIncongruentInverses(AbstractValidator):

    def __init__(self):
        pass

    def validate(self, value_a, value_b):
        
        result = False
        if 1 / value_a != value_b:
            result = True

        return result


    

class AbstractValidationScreen(ABC):

    error_event = None

    def __init__(self):
        self.error_event = {}

    @abstractmethod
    def validate(self):
        pass

#class ValidationScreenMisssingValue(AbstractValidationScreen):

    #def validate(self, df):


    #    return is_value_missing(value)


table_reader = ReadTableDummy()

fact_currency_rates = table_reader.read_table('fact_currency_rates')

validator_out_of_bounds = ValidatorValueOutOfBounds(17.0, 23.0)
fact_currency_rates['source_to_destination_rate__out_of_bounds'] = fact_currency_rates['source_to_destination_rate'].apply(lambda x: validator_out_of_bounds.validate(x))

display(fact_currency_rates.head(3))


#value_validator.is_value_missing(fact_currency_rates.loc[2, 'id_currency_source'])
#value_validator.is_value_out_of_bounds(fact_currency_rates.loc[2, 'source_to_destination_rate'], 0.5, 2.0)
#structural_validator = StructuralValidator()
#structural_validator.has_duplicate_records(fact_currency_rates)
#business_rule_validator = BusinessRuleValidator()
#business_rule_validator.has_incongruent_inverses(fact_currency_rates.loc[0, 'source_to_destination_rate'], fact_currency_rates.loc[0, 'destination_to_source_rate'])

Unnamed: 0,id_currency_rate,currency_rate_date,id_currency_source,id_currency_destinations,source_to_destination_rate,destination_to_source_rate,source_to_destination_rate__out_of_bounds
0,1,20250301,2,1,18.45,0.054201,False
1,2,20250301,3,1,20.01,0.049975,False
2,3,20250301,4,1,24.15,0.041408,True


In [32]:
#SUBSISTEMA 06 - FORMACION - ENSAMBLE DE DIMENSION DE AUDITORIA
class AbstractValueValidator(ABC):

    @abstractmethod
    def is_value_out_of_bounds(self, value, min_value, max_value):
        pass

    @abstractmethod
    def is_value_missing(self):
        pass

class ValueValidator(AbstractValueValidator):

    def is_value_out_of_bounds(self, value, min_value, max_value):
        return value < min_value or value > max_value

    def is_value_missing(self, value):
        return pd.isnull(value)



class AbstractAuditDimensionAssembler(ABC):

    @abstractmethod
    def set_etl_version(self, version):
        pass

    @abstractmethod
    def get_audit_id(self):
        pass
    
    @abstractmethod
    def audit_row(self):
        pass

class AuditDimensionAssembler(AbstractAuditDimensionAssembler):

    def __init__(self, fact_table: str, value_validator: AbstractValueValidator):

        self.value_validator = value_validator
        self.fact_table = fact_table
        self.etl_version = None

    def set_etl_version(self, version):
        self.etl_version = version

    def audit_row(self, row: dict):

        has_row_missing_values = 'Falso'
        has_row_value_out_of_bounds = 'Falso'
        if self.fact_table == 'fact_currency_rates':

            for column in ['source_to_destination_rate', 'destination_to_source_rate']:
                if self.value_validator.is_value_missing(row[column]):
                    self.has_row_missing_values = 'Verdadero'

            if self.value_validator.is_value_out_of_bounds(row['source_to_destination_rate'], 0.0, 23.0):
                has_row_value_out_of_bounds = 'Verdadero'
            if self.value_validator.is_value_out_of_bounds(row['destination_to_source_rate'], 0.0, 23.0):
                has_row_value_out_of_bounds = 'Verdadero'

        result = {}
        result['etl_version'] = self.etl_version
        result['missing_indicator'] = has_row_missing_values
        result['out_of_bounds_indicator'] = has_row_value_out_of_bounds
        result['inputed_indicator'] = 'Falso'

        quality_approved = 'Verdadero'
        if has_row_missing_values == 'Verdadero' or has_row_value_out_of_bounds == 'Verdadero':
            quality_approved = 'Falso'

        result['quality_approved'] = quality_approved

        return result
    
    def get_audit_id(self, result):
        if result['quality_approved'] == 'Verdadero':
            return 1
        else:
            return 2
        


In [33]:
value_validator = ValueValidator()
table_reader = ReadTableDummy()

fact_currency_rates = table_reader.read_table('fact_currency_rates')
display(fact_currency_rates.head(3))
audit_dim_assembler = AuditDimensionAssembler('fact_currency_rates', value_validator)
audit_dim_assembler.set_etl_version('v0.0.1')

for index, row in fact_currency_rates.iterrows():
    audit_info = audit_dim_assembler.audit_row(row)
    audit_surrogated_key = audit_dim_assembler.get_audit_id(audit_info)
    print(audit_info)
    print('Llave surrogada de DIM audit: ', audit_surrogated_key)
    print('---')



Unnamed: 0,id_currency_rate,currency_rate_date,id_currency_source,id_currency_destinations,source_to_destination_rate,destination_to_source_rate
0,1,20250301,2,1,18.45,0.054201
1,2,20250301,3,1,20.01,0.049975
2,3,20250301,4,1,24.15,0.041408


{'etl_version': 'v0.0.1', 'missing_indicator': 'Falso', 'out_of_bounds_indicator': 'Falso', 'inputed_indicator': 'Falso', 'quality_approved': 'Verdadero'}
Llave surrogada de DIM audit:  1
---
{'etl_version': 'v0.0.1', 'missing_indicator': 'Falso', 'out_of_bounds_indicator': 'Falso', 'inputed_indicator': 'Falso', 'quality_approved': 'Verdadero'}
Llave surrogada de DIM audit:  1
---
{'etl_version': 'v0.0.1', 'missing_indicator': 'Falso', 'out_of_bounds_indicator': 'Verdadero', 'inputed_indicator': 'Falso', 'quality_approved': 'Falso'}
Llave surrogada de DIM audit:  2
---
{'etl_version': 'v0.0.1', 'missing_indicator': 'Falso', 'out_of_bounds_indicator': 'Falso', 'inputed_indicator': 'Falso', 'quality_approved': 'Verdadero'}
Llave surrogada de DIM audit:  1
---
{'etl_version': 'v0.0.1', 'missing_indicator': 'Falso', 'out_of_bounds_indicator': 'Falso', 'inputed_indicator': 'Falso', 'quality_approved': 'Verdadero'}
Llave surrogada de DIM audit:  1
---
{'etl_version': 'v0.0.1', 'missing_indic

In [45]:
#SUBSISTEMA 09 - ENTREGA - GESTOR DE CAMBIOS LENTOS EN DIMENSIONES
class AbstractSlowlyChangingDimensionHandler(ABC):

    @abstractmethod
    def handle_scd(self, events: list) -> pd.DataFrame:
        pass

class SlowlyChangingDimensionHandler(AbstractSlowlyChangingDimensionHandler):

    dimension = None
    source_table = None
    dwh_table = None
    surrogate_key_table = None

    new_dwh_table = None
    new_surrogate_key_table = None

    def __init__(self, dimension):
        self.dimension = dimension
        self.surrogate_key_generator = SurrogateKeyGenerator()

    def read_tables(self):
        table_reader = ReadTableDummy()

        if self.dimension == 'dim_vendor':

            self.source_table = table_reader.read_table('dim_vendor')
            display(self.source_table.head(5))

            self.dwh_table = table_reader.read_table('dim_vendor_dwh')
            display(self.dwh_table.head(5))

            self.surrogate_key_table = table_reader.read_table('dim_vendor_dwh_most_recent_surrogate_key')
            display(self.surrogate_key_table.head(5))

    def handle_scd(self, events: list) -> None:

        for event in events:
            if event['event'] == 'insert':
                
                new_id = self.surrogate_key_generator.generate_surrogate_key('dim_vendor')

                new_record = pd.DataFrame({
                    'id_vendor': [new_id],
                    'vendor_name': [event['values']['vendor_name']],
                    'vendor_city': [event['values']['vendor_city']],
                    'vendor_status': [event['values']['vendor_status']],
                    'effective_start_date': [pd.to_datetime('1990-01-01 00:00:00')],
                    'effective_end_date': [pd.to_datetime('2100-01-01 00:00:00')],
                    'current_flag': ['Current'],
                    'change_reason': [None]
                })

                self.dwh_table = pd.concat([self.dwh_table, new_record], ignore_index=True)

            elif event['event'] == 'update':

                natural_key = event['values']['vendor_name']
                index = self.surrogate_key_table.loc[self.surrogate_key_table['vendor_name'] == natural_key].index
                current_surrogate_key = self.surrogate_key_table.loc[index, 'id_vendor'].values[0]
                index = self.dwh_table.loc[self.dwh_table['id_vendor'] == current_surrogate_key].index

                if event['event_id'] in ['E003', 'E004']:
                    
                    self.dwh_table.loc[index, 'effective_end_date'] = pd.to_datetime('2025-10-01 01:00:00')
                    self.dwh_table.loc[index, 'current_flag'] = 'Expired'

                    #new_id = self.skg.generate_surrogate_key('dim_vendor')
                    #PARA IMPLEMENTARLO BIEN SE TIENE QUE GUARDAR LA LLAMADA DE INSERT
                    max_id = self.dwh_table['id_vendor'].max()
                    new_id = max_id + 1
                    new_record = pd.DataFrame({
                        'id_vendor': [new_id],
                        'vendor_name': [event['values']['vendor_name']],
                        'vendor_city': [event['values']['vendor_city']],
                        'vendor_status': [event['values']['vendor_status']],
                        'effective_start_date': [pd.to_datetime('2025-10-01 01:00:00')],
                        'effective_end_date': [pd.to_datetime('2100-01-01 00:00:00')],
                        'current_flag': ['Current'],
                        'change_reason': ['A|B']
                    })

                    self.dwh_table = pd.concat([self.dwh_table, new_record], ignore_index=True)

                else:
                    
                    self.dwh_table.loc[index, 'vendor_city'] = event['values']['vendor_city']
                    self.dwh_table.loc[index, 'vendor_status'] = event['values']['vendor_status']
        
        self.new_dwh_table = self.dwh_table
        self.new_surrogate_key_table = self.dwh_table.loc[self.dwh_table['current_flag'] == 'Current']
        self.new_surrogate_key_table = self.new_surrogate_key_table[['id_vendor', 'vendor_name']]

In [46]:
events = [{'event_id': 'E001', 'event': 'update', 'table': 'dim_vendor', 
           'values': {'vendor_name': 'Librosom', 'vendor_city': 'Monterrey',
                      'vendor_status': 'Activo'}}, 
          {'event_id': 'E002', 'event': 'insert', 'table': 'dim_vendor', 
           'values': {'vendor_name': 'Libreria Central', 'vendor_city': 'Guadalajara',
                      'vendor_status': 'Activo'}},
          {'event_id': 'E003', 'event': 'update', 'table': 'dim_vendor', 
           'values': {'vendor_name': 'Editores Unidos', 'vendor_city': 'Tijuana',
                      'vendor_status': 'Activo'}},
          {'event_id': 'E004', 'event': 'update', 'table': 'dim_vendor', 
           'values': {'vendor_name': 'Casa Bajio', 'vendor_city': 'León',
                      'vendor_status': 'Activo'}}
         ]

scd_handler = SlowlyChangingDimensionHandler('dim_vendor')
scd_handler.read_tables()
scd_handler.handle_scd(events)

display(scd_handler.new_dwh_table.head(10))
display(scd_handler.new_surrogate_key_table.head(10))

Unnamed: 0,id_vendor,vendor_name,vendor_city,vendor_status,created_at,updated_at,deleted_at,altered_at
0,1,Casa Bajio,Ciudad de México,Activo,2025-10-01 01:00:00,,,2025-10-01 01:00:00
1,2,Libros Osom,Guadalajara,Activo,2025-10-01 01:00:00,,,2025-10-01 01:00:00
2,3,Librosom,Monterrey,Inactivo,2025-10-01 01:00:00,,,2025-10-01 01:00:00


Unnamed: 0,id_vendor,vendor_name,vendor_city,vendor_status,effective_start_date,effective_end_date,current_flag
0,1,Casa Bajio,Ciudad de México,Activo,1900-01-01,2100-01-01,Current
1,2,Editores Unidos,Guadalajara,Activo,1900-01-01,2100-01-01,Current
2,3,Librosom,Monterrey,Inactivo,1900-01-01,2100-01-01,Current


Unnamed: 0,id_vendor,vendor_name
0,1,Casa Bajio
1,2,Editores Unidos
2,3,Librosom


Nueva llave surrogada:  4


Unnamed: 0,id_vendor,vendor_name,vendor_city,vendor_status,effective_start_date,effective_end_date,current_flag,change_reason
0,1,Casa Bajio,Ciudad de México,Activo,1900-01-01 00:00:00,2025-10-01 01:00:00,Expired,
1,2,Editores Unidos,Guadalajara,Activo,1900-01-01 00:00:00,2025-10-01 01:00:00,Expired,
2,3,Librosom,Monterrey,Activo,1900-01-01 00:00:00,2100-01-01 00:00:00,Current,
3,4,Libreria Central,Guadalajara,Activo,1990-01-01 00:00:00,2100-01-01 00:00:00,Current,
4,5,Editores Unidos,Tijuana,Activo,2025-10-01 01:00:00,2100-01-01 00:00:00,Current,A|B
5,6,Casa Bajio,León,Activo,2025-10-01 01:00:00,2100-01-01 00:00:00,Current,A|B


Unnamed: 0,id_vendor,vendor_name
2,3,Librosom
3,4,Libreria Central
4,5,Editores Unidos
5,6,Casa Bajio


In [39]:
#SUBSISTEMA 10 - ENTREGA - GENERADOR DE LLAVES SURROGADAS
class AbstractSurrogateKeyGenerator(ABC):

    @abstractmethod
    def generate_surrogate_key(self, dimensional_table: str) -> int:
        pass

class SurrogateKeyGenerator(AbstractSurrogateKeyGenerator):

    def generate_surrogate_key(self, dimensional_table: str) -> int:

        surrogated_id_column = None
        if dimensional_table == 'dim_vendor':
            surrogated_id_column = 'id_vendor'

        table_reader = ReadTableDummy()
        dim_table = table_reader.read_table(dimensional_table)
        max_id = dim_table[surrogated_id_column].max()
        new_id = max_id + 1
        print('Nueva llave surrogada: ', new_id)

        return new_id

In [40]:
skg = SurrogateKeyGenerator()
new_sk = skg.generate_surrogate_key('dim_vendor')


Nueva llave surrogada:  4


In [None]:
#SUBSISTEMA 11 - ENTREGA - GESTOR DE JERARQUIAS
class AbstractHierarchyManager(ABC):

    @abstractmethod
    def validate_uniqueness_on_hierarchy(self, source_table, columns):
        pass

    @abstractmethod
    def validate_consistency_on_hierarchy(self, source_table, columns):
        pass

    @abstractmethod
    def update_bridge_table(self, specifications):
        pass