# DataQualityManager

Il DataQualityManager:
- carica ed interpreta le configurazioni a partire dal file YAML
- coordina il processo di applicazione delle aspettative di qualità dei dati ai DataFrame Spark
- interagisce con ExecutionStrategy, delegando l'esecuzione delle expectations a una strategia scelta (sequenziale, parallela o basata su micro-batch)
- aggrega i risultati: raccoglie ed elabora i risultati delle strategie di esecuzione, inclusa la gestione delle politiche di blocco e l'esecuzione delle procedure di fallback, come definito nella configurazione

In [6]:
import logging
import pprint
import re
from typing import Dict, Any
from typing import Tuple

import yaml
from azure.core.exceptions import ResourceNotFoundError
from azure.identity import DefaultAzureCredential
# from azure.keyvault.secrets import SecretClient
from pyspark.sql import DataFrame
#from manager.validators.spark_dataframe_validator import SparkDataFrameValidator
#from manager.strategies.execution_strategy import ExecutionStrategy

class DataQualityManager:
    """
    Load a YAML data-quality configuration and apply it to Spark DataFrames.

    The constructor loads the YAML file, overlays the per-control settings
    read from ``sql_config_df``, resolves ``${...}`` external references,
    builds the configured data sources and loads the expectations catalog.

    Attributes:
        config_path (str): Path to the YAML configuration file.
        sql_config_df: Object whose ``collect()`` yields the per-control override rows.
        config_data (Dict[str, Any]): Configuration after overrides and reference resolution.
        data_sources (dict): Data source objects keyed by their configured name.
        expectations_catalog (Dict[str, Any]): Catalog contents keyed by catalog name.
        external_references (dict): Resolvers for ``${<prefix>:<key>}`` placeholders.
        args (tuple): Extra positional arguments forwarded to collaborators.
        kwargs (dict): Extra keyword arguments forwarded to collaborators.
    """

    # Names of the YAML sections/keys read by this class.
    TAG_EXPECTATIONS: str = 'expectations'
    TAG_EXPECTATIONS_CATALOG: str = 'expectations_catalog'
    TAG_DATA_QUALITY_DIMENSIONS: str = 'data_quality_dimensions'
    TAG_BLOCKING_EXPECTATIONS_POLICY: str = 'blocking_expectations_policy'
    TAG_REFERENCE: str = 'reference'
    TAG_NAME: str = 'name'
    TAG_ERROR_HANDLING: str = 'error_handling' 
    TAG_FALLBACK_PROCEDURE: str = 'fallback_procedure'
    TAG_BLOCKING_EXPECTATIONS: str = 'blocking_expectations'
    # Per-policy flag, read with a default of False when the key is absent.
    TAG_HALT_IF_FAILED = 'execute_all_expectations_then_halt_if_failed'
    # Strategy requested from ExecutionStrategy when no blocking policy is configured.
    STRATEGY_TYPE: str = 'basic'  # Can be changed to 'blocking_first', 'ml_based'

    def __init__(self, config_path: str,sql_config_df, *args, **kwargs) -> None:
        """
        Initializes the DataQualityManager with the specified configuration file.
        
        Args:
            config_path (str): Path to the YAML configuration file.
        """
        self.args=args
        self.kwargs=kwargs

        self.external_references = {
            "keyargs": {
                "type": "keyargs",
                "logic": self.get_secret_from_key_args
            }
        }
        self.config_path = config_path
        self.sql_config_df=  sql_config_df
        self.config_data: Dict[str, Any] = self._load_and_resolve_yaml_config()
        self.data_sources = self._read_data_sources()  # Legge e memorizza i data source
        self.expectations_catalog: Dict[str, Any] = self._load_expectations_catalog()
        self.input_path_dynamic = input_path_dynamic
    def get_config_data(self) -> Dict[str, Any]:
        """Return the configuration dictionary stored in ``self.config_data``."""
        return self.config_data
    def _read_data_sources(self):
        """
        Legge tutti i data source definiti nel tag "data_sources" del file YAML.
        """
        data_sources = {}
        for data_source_name, data_source_config in self.config_data.get('data_sources', {}).items():
            data_source = DataSourceRegistry.get_data_source(data_source_config["type"], data_source_config)
            #print(f"data:sources.items(): {self.config_data.get('data_sources', {}).items():} , data_source_name:{data_source_name}, data_source_config:{data_source_config}, data_source:{data_source}")
            if data_source:
                data_sources[data_source_name] = data_source
            else:
                raise ValueError(f"Data source '{data_source_name}' not registered or not found.")
        return data_sources    
    

    def get_secret_from_key_args(self,key:str) -> str:
        """
        Retrieves a parameter from args.

        :param key: The name of the args key.
        :return: The value of the key.
        """
        return self.kwargs.get(key,None)
    

    def _load_yaml_config(self):
        """
        Read the file at ``self.config_path`` through Spark and parse it as YAML.

        The file is loaded with ``spark.read.text`` (one row per line, in the
        ``value`` column), the lines are joined back with newlines and the
        resulting text is parsed with ``yaml.safe_load``.

        ``spark`` is not defined in this class; it is looked up as a global
        (presumably the notebook's Spark session - confirm).

        Returns:
            The Python structure produced by ``yaml.safe_load``.
        """
        line_rows = spark.read.text(self.config_path).select("value").collect()

        # Each collected row holds exactly one field: the text of one line.
        document = '\n'.join(row[0] for row in line_rows)

        return yaml.safe_load(document)
    
    @staticmethod
    def _extract_data_source_info(path: str) -> Tuple[str, str]:
        match = re.match(r"\#\{data_sources:([^}]+)\}(.+)", path)
        if match:
            return match.groups()
        raise ValueError(f"Invalid data source path: {path}")


    
    import logging

    
    def _load_expectations_catalog(self):
        """
        Carica il catalogo delle aspettative utilizzando i data source letti precedentemente.
        Restituisce un dizionario in cui ogni chiave corrisponde a un'aspettativa e ogni valore
        è il contenuto letto dalla sorgente dati associata.

        Solleva:
            KeyError: Se TAG_EXPECTATIONS_CATALOG non è presente in config_data.
            ValueError: Se ci sono problemi con l'estrazione delle informazioni del data source.

        Returns:
            dict: Il catalogo delle aspettative.
        """
        logger = logging.getLogger(__name__)
        catalog = {}
        # Verifica che TAG_EXPECTATIONS_CATALOG sia presente in config_data
        if self.TAG_EXPECTATIONS_CATALOG not in self.config_data:
            logger.error(f'{self.TAG_EXPECTATIONS_CATALOG} not found in config_data')
            #raise KeyError(f'{self.TAG_EXPECTATIONS_CATALOG} not found in config_data')
            return catalog

        else:       
            for key, path in self.config_data.get(self.TAG_EXPECTATIONS_CATALOG, {}).items():
                try:
                    # Utilizza il metodo extract_data_source_info in DataSourceRegistry
                    data_source_type, path_suffix = DataSourceRegistry.extract_data_source_info(path)
                    data_source = self.data_sources.get(data_source_type)

                    if not data_source:
                        raise ValueError(f"Data source '{data_source_type}' not found in loaded data sources.")

                    # Utilizza il data source per leggere i dati relativi al catalogo delle aspettative
                    catalog_content = data_source.read_data(path_suffix)
                    if catalog_content is None:
                        raise ValueError("Catalog content is None")

                    catalog[key] = catalog_content

                except ValueError as ve:
                    logger.error(f"Error processing catalog entry '{key}': {ve}")
                    continue
                except Exception as e:
                    logger.error(f"Unexpected error processing catalog entry '{key}': {e}")
                    continue

            return catalog



        

    

    def _resolve_external_references(self, yaml_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Risolve i riferimenti esterni nel file YAML.

        Sample:
        
        data_sources:
            my_secure_blob:
                type: "azure-blob"
                container_name: "mycontainer"
                account_name: "${keyvault:azure_storage_account_name}"
                sas_token: "${keyvault:azure_storage_sas_token}"
                file_format: "parquet"
                options:
                option1: "value1"
                option2: "value2"

        :param yaml_data: Dizionario contenente i dati YAML.
        :return: Dizionario con i riferimenti esterni risolti.
        """

        regex = r"\$\{(?P<key>[^}]+)\}"

        def resolve_value(value: str) -> str:
            match = re.search(regex, value)
            if match:
                key = match.group("key")
                start, end = match.span()

                # Cerca la variabile esterna nel dizionario
                for external_key, external_info in self.external_references.items():
                    if key.startswith(external_key):
                        # Estrae il valore reale della chiave
                        actual_key = key[len(external_key) + 1:]
                        tmp_value = value[:start] + external_info["logic"](actual_key) + value[end:]  
                        return tmp_value
            return value

        def resolve_list(l: List[Any]) -> None:
            for index, item in enumerate(l):
                if isinstance(item, dict):
                    resolve_dict(item)
                elif isinstance(item, str):
                    l[index] = resolve_value(item)

        def resolve_dict(d: Dict[str, Any]) -> None:
            for k, v in d.items():
                if isinstance(v, dict):
                    resolve_dict(v)
                elif isinstance(v, str):
                    d[k] = resolve_value(v)
                elif isinstance(v, list):
                    resolve_list(v)

        resolve_dict(yaml_data)
        return yaml_data

    
    
    
  
     

    def _load_and_resolve_yaml_config(self) -> Dict[str, Any]:
        # Carica il contenuto YAML come prima
        yaml_data = self._load_yaml_config()
        #sql_csv_path="abfss://dev@cfoadaadslgen2dlcert.dfs.core.windows.net/landing/prj_idqs/dq_control_results_test/file-sql-test.csv"

        df_from_sql_table =  self.sql_config_df
        

        # Caricare l'YAML in un dizionario

        # Caricare il CSV in un DataFrame
        

        # Iterare sul DataFrame
        import pandas as pd  
        import yaml  
        
         
        
        from pyspark.sql import SparkSession  
          
        
  
        # UPDATE DELLO YAML CON CONFIGURAZIONE SQL
        for row in df_from_sql_table.collect():  
            row_dict = row.asDict()  
            #print(f"Processing row in CSV...")  
            # Trova la configurazione corrispondente nell'YAML  
            for dimension in yaml_data['data_quality_dimensions']: 
                #print(f"dimension:{yaml_data['data_quality_dimensions']}")
                for dq_rule in yaml_data['data_quality_dimensions'][dimension]:
                    #print(f"dq_rule:{dq_rule}")   
                    if dq_rule['name'] == row_dict['CONTROL_ID']:  
                        #print(f"Found matching rule {dq_rule['name']} in YAML...")  
                        # Aggiorna i campi
                        dq_rule['enabled'] = False
                        if row_dict['FLG_ENABLE_CONTROL'].lower() == 'y' or row_dict['FLG_ENABLE_CONTROL'].lower() == 's':  
                            #print("Setting rule as enabled...")  
                            dq_rule['enabled'] = True  
                        if row_dict['CONTROL_RESULTS'] == 'BLOCKING':  
                            #print("Rule is blocking, updating blocking expectations...")  
                            blocking_expectation = '#{data_quality_dimensions:'+ dimension +':' + row_dict['CONTROL_ID'] + '}'  
                            for policy in yaml_data['blocking_expectations_policy']:
                                #controllo che non ci sia già prima di aggiungerla alla lista  
                                if 'blocking_expectations' in policy and policy['blocking_expectations'] is not None:
                                    if blocking_expectation not in policy['blocking_expectations'] and policy['policy_name']==row_dict['CONTROL_RESULTS']:  
                                        #print(f"Adding {blocking_expectation} to blocking expectations for policy {policy['policy_name']}...")  
                                        policy['blocking_expectations'].append(blocking_expectation)  
                                    
        print("Done processing CSV.")
  
                

        self.yaml_data=yaml_data

        if self.external_references is not None:
            # Risolve i riferimenti esterni
            resolved_yaml_data = self._resolve_external_references(yaml_data)
            
            return resolved_yaml_data
        else:
            return yaml_data
    import logging

    

    def _merge_dimension_and_catalog_expectations(self, dimension_expectation: Dict[str, Any]) -> Dict[str, Any]:
        """
        Combines an expectation from the dimension with its definition in the catalog.

        This method merges an expectation defined in a data quality dimension
        with its corresponding definition in the expectations catalog. It allows for
        overriding default expectation settings with dimension-specific ones.

        :param dimension_expectation: The expectation information from the data quality dimension.
        :return: A combined expectation dictionary.
        """
        if self.TAG_REFERENCE in dimension_expectation:
            reference = dimension_expectation[self.TAG_REFERENCE]

            # Utilizza la costante TAG_EXPECTATIONS_CATALOG per la sintassi #{expectations_catalog:<NOME CATALOGO>:<EXPERTATION NAME>}
            #match = re.match(rf'#\{{{self.TAG_EXPECTATIONS_CATALOG}:([^:]+):([^}]+)\}}', reference)
            #match = re.match(rf'#{{{{self.TAG_EXPECTATIONS_CATALOG}}}}:([^:]+):([^}]+)', reference)
            #match = re.match(rf'#\{{self.TAG_EXPECTATIONS_CATALOG}}:([^:]+):([^}]+)', reference)
            #match = re.match(rf'#\{{{{self.TAG_EXPECTATIONS_CATALOG}}}:([^:]+):([^}]+)', reference)
            match = re.match(rf'#\{{{self.TAG_EXPECTATIONS_CATALOG}}}:([^:]+):([^}}]+)', reference)


            if match:
                catalog_name, expectation_name = match.groups()
                catalog_expectations = self.expectations_catalog.get(catalog_name, {}).get(TAG_EXPECTATIONS, [])
                catalog_expectation = next((exp for exp in catalog_expectations if exp[self.TAG_NAME] == expectation_name), None)

                if not catalog_expectation:
                    raise ValueError(f"Expectation {expectation_name} not found in catalog {catalog_name}.")

                combined_expectation = {**catalog_expectation, **dimension_expectation}
            else:
                raise ValueError(f"Invalid reference format in dimension_expectation: {reference}")
        else:
            combined_expectation = dimension_expectation

        return combined_expectation

    def _resolve_expectation_syntax(self, expectation_ref: str) -> Dict[str, Any]:
        """
        Resolves the syntax of an expectation reference from the YAML configuration.

        This method interprets the custom syntax used in the YAML file to reference expectations.
        The expectation references are expected to be in the format "#{...}".

        Args:
            expectation_ref (str): The expectation reference string in custom syntax.

        Returns:
            Dict[str, Any]: A dictionary representing the resolved expectation details.
        """

        # Regular expression to parse the custom syntax
        pattern = r'\#\{([^}]+)\}'
        match = re.match(pattern, expectation_ref)

        if match:
            ref_parts = match.group(1).split(':')
            
            if len(ref_parts) == 3 and ref_parts[0] == self.TAG_DATA_QUALITY_DIMENSIONS:
                dimension_name, expectation_name = ref_parts[1], ref_parts[2]
                dimension_expectations = self.config_data[self.TAG_DATA_QUALITY_DIMENSIONS].get(dimension_name, [])
                expectation = next((exp for exp in dimension_expectations if exp['name'] == expectation_name), None)

                if expectation:
                    return expectation
                else:
                    raise ValueError(f"Expectation {expectation_name} not found in dimension {dimension_name}.")
            else:
                raise ValueError(f"Invalid reference format: {expectation_ref}")
        else:
            raise ValueError(f"Expectation reference does not match expected pattern: {expectation_ref}")

        return {}

    def _get_blocking_expectations_info(self) -> Dict[str, Any]:
        """
        Extracts blocking expectations information from the YAML configuration file.

        This method parses the 'blocking_expectations_policy' section in the YAML file and
        retrieves details about each policy including its name, whether to stop on the first failure,
        error handling strategy, fallback procedure, and the list of expectations.

        :return: A dictionary where each key is a policy name and the value is another dictionary
                containing details of the policy and its expectations.
        """
        blocking_policies = self.config_data.get(self.TAG_BLOCKING_EXPECTATIONS_POLICY, [])
        blocking_expectations_info = {}
        #print(f"blocking_policies:{blocking_policies}")
        for policy in blocking_policies:
            policy_name = policy.get('policy_name')
            stop_on_first_failure = policy.get('stop_on_first_failure', False)
            error_handling_type = policy.get(self.TAG_ERROR_HANDLING)
            fallback_procedure_type = policy.get(self.TAG_FALLBACK_PROCEDURE)
            expectations_refs = policy.get(self.TAG_BLOCKING_EXPECTATIONS, [])
            execute_all_expectations_then_halt_if_failed  = policy.get(self.TAG_HALT_IF_FAILED, False)
            #print(f"policy:{policy}")

            # Crea istanze delle classi ErrorHandler e FallbackProcedure
            error_handling = ErrorHandler.get_instance(error_handling_type) if error_handling_type else None
            fallback_procedure = FallbackProcedure.get_instance(fallback_procedure_type) if fallback_procedure_type else None

            policy_info = {
                'policy_name': policy_name,
                'stop_on_first_failure': stop_on_first_failure,
                'execute_all_expectations_then_halt_if_failed':execute_all_expectations_then_halt_if_failed,
                'error_handling': error_handling,
                'fallback_procedure': fallback_procedure,
                'expectations': []
            }
            #if expectations_refs is not None:
            for exp_ref in expectations_refs:
                #print(f"DQMANAGER:_get_blocking_expectations_info expectations_refs:{expectations_refs}, exp_ref:{exp_ref} ")
                # Extract and combine expectations from the YAML configuration
                # Replace the following line with your existing logic to handle exp_ref
                expectation = self._resolve_expectation_syntax(exp_ref)
                
                if expectation:
                    # Is used to combine the blocking expectations specified in the blocking expectations policy with their definitions in the expectations catalogue. 
                    # This method is essential to ensure that the blocking expectations have all the necessary information before being applied.
                    combined_expectation = self._merge_dimension_and_catalog_expectations(expectation)
                    policy_info['expectations'].append(combined_expectation)
                else:
                    print(f"DqManager._get_blocking_expectations_info expectation not found:{expectation}")

            blocking_expectations_info[policy_name] = policy_info

        return blocking_expectations_info
    def _load_transformations(self):
        """Carica definizioni per "data_transformation_and_distribution" dal file YAML.
        """
        transformations = self.config_data.get('data_transformation_and_distribution', [])
        #print(transformations)
        if transformations == []:
            raise ValueError("Nessuna logica di traformazione inserita nello yaml, controllare presenza tag data_transformation_and_distribution")
        return transformations
    
    def apply_transformations(self, spark_df: DataFrame, *args, **kwargs) -> DataFrame: 
        # ... (Logica pre-esistente per gestione 'expectations') 
        
        # ... la logica della trasformazione potrebbe essere gestita dopo la logica di 'expectations' o in metodo separato
        transformations = self._load_transformations()
        df_list = []
        path_list = []
        for transformation_def in transformations:
            rule_name = transformation_def["rule_name"].lower()
            transformer_cls = FlowAdapterMeta.get_adapter(rule_name) 
            if not transformer_cls:
                raise ValueError(f"Transformer class for rule '{rule_name}' not found.")

            replaced_name = transformation_def.get("naming_convention",None)
            if(replaced_name):
                input_path_dynamic = self.input_path_dynamic.split("-")
                input_path_dynamic[0] = replaced_name
                self.input_path_dynamic = '-'.join(input_path_dynamic)
                self.kwargs['input_path_dynamic']=self.input_path_dynamic
            transformer = transformer_cls(data_sources=self.data_sources)  # Crea istanza del transformer di regola specifico
            df,path = transformer.process_data(df =spark_df, 
                                    column_selection=transformation_def['column_selection'],
                                    column_enrichment=transformation_def['column_enrichment'], 
                                    write_options = transformation_def.get('write_options', {}),
                                    external_data = transformation_def.get('external_data', {}),
                                    *self.args, 
                                    **self.kwargs
                                    )
                     
            # write_options
            df_list.append(df)
            path_list.append(path.split(".net")[1])
        return  df_list, path_list

    def apply_expectations(self, spark_df: DataFrame, *args, **kwargs) -> Tuple[Any, list, Any]:
        """
        Apply all enabled expectations to ``spark_df``.

        A ``BlockingFirstExecutionStrategy`` is used when at least one
        blocking policy is configured; otherwise the strategy named by
        ``STRATEGY_TYPE`` is requested from ``ExecutionStrategy``. Enabled
        expectations of every dimension are merged with their catalog
        definition and handed, grouped by ``Dimension`` instance, to the
        strategy together with a ``SparkDataFrameValidator`` wrapping the
        DataFrame.

        Args:
            spark_df: DataFrame to validate.
            *args: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            ``(validator, all_results_list, failed_controls)`` where
            ``all_results_list`` is a one-element list holding the results
            returned by the strategy.

        Raises:
            KeyError: If an expectation has no ``enabled`` key.
        """
        logger = logging.getLogger(__name__)

        blocking_expectations_info = self._get_blocking_expectations_info()
        if blocking_expectations_info:
            strategy = BlockingFirstExecutionStrategy(policy_info=blocking_expectations_info)
        else:
            strategy = ExecutionStrategy.get_instance(self.STRATEGY_TYPE)

        info_df = InfoDataframeDict(spark_df, {})
        # Built once: the earlier code created an identical second validator
        # and discarded the first.
        validator = SparkDataFrameValidator(info_df, data_sources=self.data_sources, args=self.args, kwargs=self.kwargs)

        # Maps each Dimension instance to its enabled, merged expectations.
        combined_expectations = {}
        for dimension_name, dimension_expectations in self.config_data[self.TAG_DATA_QUALITY_DIMENSIONS].items():
            dimension = Dimension.get_instance(dimension_name)
            combined_expectations[dimension] = []
            # A dimension declared without entries parses to None.
            for expectation in dimension_expectations or []:
                enabled = expectation['enabled']
                # Exact comparisons are kept on purpose: any value other than
                # True/False (e.g. the string "false") is skipped silently,
                # as before, instead of being judged by truthiness.
                if enabled == True:  # noqa: E712
                    combined_expectations[dimension].append(
                        self._merge_dimension_and_catalog_expectations(expectation)
                    )
                elif enabled == False:  # noqa: E712
                    logger.info("Expectation %s is disabled", expectation['name'])

        all_results, failed_controls = strategy.execute_expectations(
            dimension_expectations=combined_expectations,
            spark_data_frame_validator=validator,
        )

        return validator, [all_results], failed_controls