# ProcessBiFiles (Upload to Curated)

This notebook will read incoming files from BI Layer, apply data quality checks and move data into the curated layer.

## Import Libraries

In [25]:
import json # to parse JSON data
import requests # python library for HTTP requests
import logging # custom logging
from datetime import datetime # library for manipulating dates and times.

from pyspark.sql.functions import explode, arrays_zip, col, expr, when, md5, concat_ws # to process data in dataframes
import pyspark.sql.functions as F # functions to perform operations on dataframes
from pyspark.sql.types import * # all data types supported in PySpark SQL
from notebookutils import mssparkutils # perform common tasks in synapse

### Set Notebook Parameters

In [26]:
# --- storage account ---
storage_account = "spfmvpsynapsedev00"

# --- containers ---
upload_container = "spf-bi-upload"
metadata_container = "spf-bi-metadata"
curated_container = "spf-bi-curated"

# --- pipeline ---
pipeline_id = 'default_id'

# --- derived ADLS locations ---
# full path of the control table inside the metadata container
controlfile_name = "control_table.parquet"
control_file_path = "abfss://{0}@{1}.dfs.core.windows.net/{2}".format(
    metadata_container, storage_account, controlfile_name
)

# root of the curated container (no trailing slash)
adls_path = "abfss://{0}@{1}.dfs.core.windows.net".format(curated_container, storage_account)

# everything currently sitting in the upload container
upload_root = "abfss://{0}@{1}.dfs.core.windows.net/".format(upload_container, storage_account)
list_of_files = mssparkutils.fs.ls(upload_root)

## Configure Logging

In [27]:
# Log line layout: timestamp - logger name - level - message
FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt=FORMAT)

# Apply the layout to every handler already attached to the root logger
root_logger = logging.getLogger()
for handler in root_logger.handlers:
    handler.setFormatter(formatter)

# Dedicated logger named after the pipeline id, emitting everything from DEBUG upwards
syn_logger = logging.getLogger(pipeline_id)
syn_logger.setLevel(logging.DEBUG)

Use the following functions for logging:

- syn_logger.debug("customized debug message")
- syn_logger.info("customized info message")
- syn_logger.warning("customized warning message")
- syn_logger.error("customized error message")
- syn_logger.critical("customized critical message")

Create control table parquet file if it does not exist

In [28]:
# List everything currently stored in the metadata container
metadata_files = mssparkutils.fs.ls(f"abfss://{metadata_container}@{storage_account}.dfs.core.windows.net/")

# Schema of the control table that tracks each file on its way from raw to curated to RDV
controltable_schema = StructType([
    StructField("raw_filepath", StringType(), True),
    StructField("curated_filepath", StringType(), True),
    StructField("processed", BooleanType(), True),
    StructField("pipeline_id", StringType(), True),
    StructField("raw_curated_timestamp", TimestampType(), True),
    StructField("curated_rdv_timestamp", TimestampType(), True),
])

# Compare against the configured control file name (not a hard-coded literal) so that this
# check always looks for the same file that control_file_path points to
file_exists = any(file.name == controlfile_name for file in metadata_files)

if file_exists:
    syn_logger.info("Control file exists. Do not create control table")
else:
    syn_logger.info("COntrol file does not exist. Create empty control file with schema")
    # an empty dataframe is enough to persist the schema
    new_df = spark.createDataFrame(data = [], schema = controltable_schema)
    new_df.write.parquet(control_file_path)
    syn_logger.warning("Created control table parquet")
        

Get list of all control files in the upload container

In [29]:
def get_list_of_ctl_files(files=None):
    '''
    Returns the names of all control files in a file listing.

    Arguments:
        files: iterable of objects exposing a `name` attribute. When omitted,
               the notebook-level `list_of_files` (content of the upload
               container) is used.
    Return:
        list of file names that contain '.ctl'
    '''
    if files is None:
        files = list_of_files
    return [f.name for f in files if '.ctl' in f.name]

In [30]:
list_of_ctl_files = get_list_of_ctl_files()
ctl_file_count = len(list_of_ctl_files)

# report what was found before any processing starts
if ctl_file_count < 1:
    syn_logger.warning("No files in Upload conatiner to process")
else:
    syn_logger.info("{0} Files found in upload conatiner: {1}".format(ctl_file_count, list_of_ctl_files))

## Fetch Schema for control and data files from EDC

**TODO**
- The control file schema is not yet available in EDC, so it is defined in this notebook based on the charset.
- Align with Luba/Aliaksei/Arindam to move the control file schema into EDC later.

In [31]:
class EDC:
    '''
    Class for the EDC objects: holds the EDC metadata of one feeder system.

    Attributes:
       url: url of the EDC endpoint called for this feeder system (None until the metadata is loaded).
       fs_id: name of the incoming feeder system.
       edc_metadata_df: dataframe to store metadata from EDC (None until the metadata is loaded).
    '''

    # dataframe column -> EDC fact attribute id the column value is taken from
    _FACT_ATTRIBUTES = {
        "Position": "com.infa.ldm.relational.Position",
        "ColumnName": "core.name",
        "Charset": "com.infa.ldm.relational.Code",
        "FieldSeparator": "com.infa.ldm.relational.FieldFormat",
        "DataType": "com.infa.ldm.relational.Datatype",
        "Length": "com.infa.ldm.relational.Length",
        "Scale": "com.infa.ldm.relational.Scale",
        "Nullable": "com.infa.ldm.relational.Nullable",
    }

    def __init__(self, fs_id):
        self.fs_id = fs_id
        self.url = None
        self.edc_metadata_df = None

    def load_data_from_edc(self):
        '''
        This function reads metadata from the EDC endpoint and stores it as dataframe
        in `self.edc_metadata_df`.

        Return:
            "continue" when the metadata was loaded,
            "abort" when the endpoint could not be reached or did not answer with HTTP 200.
        '''
        #TODO: based on fs_id fetch the corresponding EDC metadata
        syn_logger.info("Loading data from the EDC endpoint")

        # NOTE(review): the function key is hard-coded in the notebook and is also written to the
        # debug log below - it should come from a secret store instead.
        self.url = "https://spf-mvp-edc-connect.azurewebsites.net/api/fetch_metadata_from_edc?code=YpU8SZ6ftEsnB7g4qK46U1P_FQil0vXw5Eo2yLxFkuvHAzFuqJJjvg=="+"&fs_id="+self.fs_id
        syn_logger.debug("Trying to reach the EDC endpoint url: {0}".format(self.url))

        try:
            resp = requests.get(self.url)
        except requests.exceptions.RequestException as exc:
            # an unreachable endpoint is reported the same way as a failed response
            syn_logger.debug("Could not reach the EDC endpoint: {0}".format(exc))
            return "abort"

        # Compare the HTTP status code (not the Response object itself): anything other than
        # 200 means there is no metadata payload to parse
        if resp.status_code != 200:
            syn_logger.debug("Internal server error while fetching schema from EDC endpoint: {0}".format(resp))
            return "abort"

        syn_logger.debug("Received a response from the EDC endpoint: {0}".format(resp))

        # read the json body into a spark dataframe
        df = spark.read.json(sc.parallelize([resp.text]))

        # Explode the JSON into facts and id (one row per item)
        df = (df.withColumn('buffer', explode('items'))
                .withColumn('facts', expr("buffer.facts"))
                .withColumn('id', expr("buffer.id"))
                .drop(*['items','metadata','buffer'])
        )

        # Flatten the json attributes into dataframe columns: for each column take the value of
        # the first fact with the matching attribute id (null when the fact is absent)
        for column_name, attribute_id in self._FACT_ATTRIBUTES.items():
            df = df.withColumn(
                column_name,
                expr("filter(facts, x -> x.attributeId = '{0}')".format(attribute_id))[0]["value"]
            )

        # sort the df by Position
        df = df.orderBy(col('Position').cast("int").asc())

        # drop the facts and id columns now
        df = df.drop("facts", "id")

        # convert datatypes to lower case
        df = df.withColumn("DataType", F.lower(F.col("DataType")))

        # set the edc_metadata_df attribute
        self.edc_metadata_df = df

        syn_logger.info("EDC data is sucessfully loaded into dataframe")

        return "continue"

    def _file_level_value(self, column_name):
        '''
        Returns the value of `column_name` from the first row without a Position
        (the rows that carry file level information).
        '''
        file_level_df = self.edc_metadata_df.filter(F.col('Position').isNull())
        return file_level_df.select(column_name).collect()[0][0]

    def get_charset(self):
        '''
        Returns charset of the corresponding control file
        '''
        return self._file_level_value('Charset')

    def get_field_separator(self):
        '''
        Returns the Field Separator to read the incoming .dat file

        Raises:
            KeyError: when EDC delivers a separator name that has no mapping below
        '''
        # mapping from the EDC separator names to the characters the spark reader expects
        field_separator_mapping = {
            "TAB": '\t',
            "Space": ' ',
            "Pipe": '|',
            "Semi-Colon": ";"
        }

        return field_separator_mapping[self._file_level_value('FieldSeparator')]

    def get_scehma_for_data_file(self):
        '''
        This function returns schema for data file
        Arguments:
            self
        Return:
            dat_schema: Schema to read data file (every column is read as string)
        '''
        # remove the undefined positions (eg: first row)
        column_rows = self.edc_metadata_df.filter(F.col('Position').isNotNull()).collect()

        dat_schema = StructType([])
        for row in column_rows:
            dat_schema.add(row['ColumnName'], StringType())

        return dat_schema

    def get_schema_for_control_file(self):
        '''
        This function returns the schema to read the control file based on the charset
        delivered by EDC.
        Arguments:
            self
        Return:
            schema: StructType() containing schema to read control file,
                    or None when the charset is not one of the known ones
        '''
        char_set = self.get_charset()

        if char_set in ("ASCII", "EBCDIC", "CSV_NAT", "ISO 8859-2"):
            field_names = ["fs_id", "delivery", "delivery_number",
                           "previous_delivery_number", "sum_of_amount", "number_of_records"]
        elif char_set == "CSV_INT":
            # this layout carries an additional pfs_id column right after fs_id
            field_names = ["fs_id", "pfs_id", "delivery", "delivery_number",
                           "previous_delivery_number", "sum_of_amount", "number_of_records"]
        else:
            return None

        return StructType([StructField(name, StringType(), True) for name in field_names])

In [32]:
class Validator:
    '''
    Class for the Validator objects: technical validations of one data file against EDC.

    Attributes:
        isValid: None until a check has run; afterwards True only while every
                 check executed so far has passed.
        data_df: dataframe holding the content of the data file.
        edc_df: dataframe holding the EDC metadata of the feeder system.
    '''
    def __init__(self, data_df, edc_instance):
        self.isValid = None
        self.data_df = data_df
        self.edc_df = edc_instance.edc_metadata_df

    def _record_result(self, passed):
        '''
        Folds the outcome of one check into isValid and returns that single outcome.
        '''
        passed = bool(passed)
        # a failed check must never be masked by a later successful one
        self.isValid = passed if self.isValid is None else (self.isValid and passed)
        return passed

    def number_of_columns_check(self):
        '''
        Checks that the data file has as many columns as EDC defines positions.
        Return:
            True when the numbers match, otherwise False
        '''
        number_of_cols_from_edc = self.edc_df.filter(F.col('Position').isNotNull()).count()
        # use the dataframe handed to this instance, not a notebook global
        number_of_cols_in_data = len(self.data_df.columns)
        syn_logger.debug("Number of columns in data file {0}".format(number_of_cols_in_data))
        syn_logger.debug("Number of columns in EDC {0}".format(number_of_cols_from_edc))

        return self._record_result(number_of_cols_in_data == number_of_cols_from_edc)

    def data_length_check(self):
        '''
        Checks that no value is longer than the length EDC specifies for its column.
        Commas are removed from the values before measuring; null values always pass.
        Columns without a length in EDC are not checked.
        Return:
            True when all columns respect their length, otherwise False
        '''
        rejected_cols = []
        df = self.data_df

        # the total does not depend on the column, so count only once
        total_rows = df.count()

        # column name -> length from EDC, fetched in one go (first entry per name wins)
        edc_lengths = {}
        for row in self.edc_df.select("ColumnName", "Length").collect():
            edc_lengths.setdefault(row["ColumnName"], row["Length"])

        for column_name in df.columns:
            actual_length = edc_lengths.get(column_name)
            if actual_length:
                df_filtered = df.withColumn(column_name, F.regexp_replace(column_name, ',', '')).filter( (F.length(F.col(column_name))  <= actual_length) | (F.col(column_name).isNull()) )
                # any row dropped by the filter holds a value that is too long
                if total_rows != df_filtered.count():
                    rejected_cols.append(column_name)

        if len(rejected_cols) == 0:
            syn_logger.info("length of the columns match with EDC specified length")
        else:
            syn_logger.critical("length of the columns do NOT match with EDC specified length for the following columns: {0}".format(rejected_cols))

        return self._record_result(len(rejected_cols) == 0)

    def null_check(self):
        '''
        Checks that columns flagged as not nullable in EDC contain no null values.
        Return:
            True when every not-null column is free of nulls (or there is no such
            column), otherwise False
        '''
        not_null_rows = self.edc_df.filter(F.col('Nullable') == False).select('ColumnName').collect()
        syn_logger.info("Not Null columns from EDC: {0}".format(not_null_rows))

        violated_columns = []
        for row in not_null_rows:
            column_name = row['ColumnName']
            if self.data_df.filter(F.col(column_name).isNull()).count() >= 1:
                violated_columns.append(column_name)
                syn_logger.critical("Not Null constraint not met: {0}".format(column_name))
            else:
                syn_logger.info("Not Null constraint met: {0}".format(column_name))

        # the check fails as soon as one column violates the constraint,
        # not only when the last inspected column does
        return self._record_result(len(violated_columns) == 0)

In [33]:
class Conversion:
    '''
    Class for the Conversion objects: converts the string columns of a data file
    into the data types EDC defines for them.

    Attributes:
        convert: not used by the methods of this class (kept for compatibility).
        data_df: dataframe holding the content of the data file.
        edc_df: dataframe holding the EDC metadata of the feeder system.
    '''
    def __init__(self, data_df=None, edc_instance=None):
        '''
        Arguments:
            data_df: dataframe to convert. Falls back to the notebook-level
                     `data_df` when omitted.
            edc_instance: EDC instance providing `edc_metadata_df`. Falls back to
                          the notebook-level `edc_instance` when omitted.
        '''
        self.convert = None
        # keep `Conversion()` working for existing callers that rely on the notebook globals
        if data_df is None:
            data_df = globals()['data_df']
        if edc_instance is None:
            edc_instance = globals()['edc_instance']
        self.data_df = data_df
        self.edc_df = edc_instance.edc_metadata_df

    def date_conversion(self):
        '''
        Converts every column with EDC data type 'date' into a date column.
        A value is parsed as yyyyMMdd when characters 5-6 form a number <= 12,
        otherwise as ddMMyyyy when characters 3-4 do; anything else becomes null.
        Return:
            the converted dataframe
        '''
        date_rows = self.edc_df.filter(F.col('DataType') == 'date').select('ColumnName').collect()
        # TODO check len = 8
        for row in date_rows:
            column_name = row.ColumnName
            month_if_year_first = F.col(column_name).substr(5, 2).cast('int')
            month_if_day_first = F.col(column_name).substr(3, 2).cast('int')
            # 'MM' is month-of-year; lower case 'mm' would be read as minute-of-hour
            self.data_df = self.data_df.withColumn(
                column_name,
                when(month_if_year_first <= 12, F.to_date(col(column_name), "yyyyMMdd"))
                .when(month_if_day_first <= 12, F.to_date(col(column_name), "ddMMyyyy"))
            )
            syn_logger.debug("date converion for the column: {0}".format(column_name))

        return self.data_df

    def number_conversion(self):
        '''
        Converts every column with EDC data type 'number' into a decimal column,
        using Length as precision and Scale as scale. A decimal comma in the
        values is replaced by a decimal point first.
        Return:
            the converted dataframe
        '''
        number_rows = self.edc_df.filter(F.col('DataType') == 'number').collect()
        for row in number_rows:
            column_name = row.ColumnName
            # EDC may not deliver a Scale for whole numbers; treat it as no decimal places
            scale = int(row.Scale) if row.Scale is not None else 0
            decimal_type = DecimalType(precision = int(row.Length), scale = scale)
            self.data_df = self.data_df.withColumn(
                column_name,
                F.regexp_replace(F.col(column_name), ',', '.').cast(decimal_type)
            )
            syn_logger.debug("numeric converion for the column: {0}".format(column_name))

        return self.data_df

In [34]:
class ErrorHandler(Exception):
    '''
    Class for the Error Handling.

    Attributes:
        filename - filename which caused the error
        message - explanation of the error (None when no explanation was given)
    '''
    def __init__(self, filename, *args):
        self.filename = filename
        # the first extra positional argument, if any, is the explanation
        self.message = args[0] if args else None
        super().__init__(self.message)

    def __str__(self):
        # __str__ must always return a string, also when no explanation was given
        if self.message:
            return str(self.message)
        return "Error while processing file: {0}".format(self.filename)

    def move_file_to_failed_container(self):
        '''
        Placeholder: currently does nothing and returns None.
        '''
        return None


In [35]:
# registry of EDC instances, keyed by the lower-cased feeder system id
edc_instances_per_fs_id = {}

def create_edc_instances():
    '''
    Registers one EDC instance per feeder system found in `list_of_ctl_files`.
    The feeder system id is the part of the control file name before the first '_'.
    '''
    for ctl_file in list_of_ctl_files:
        fs_key = ctl_file[:ctl_file.index('_')].lower()
        if fs_key in edc_instances_per_fs_id:
            # this feeder system already has its instance
            continue
        edc_instances_per_fs_id[fs_key] = EDC(fs_key)

In [36]:
# mapping feeder system id -> feeder system family, read from the metadata container
map_2fs_family_df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('delimiter', ',')
    .option('inferSchema', True)
    .load(f"abfss://{metadata_container}@{storage_account}.dfs.core.windows.net/map_2fs_family.csv")
)

In [37]:
# mapping used to look up the mandt value, read from the metadata container
map_2mandt_df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('delimiter', ',')
    .option('inferSchema', True)
    .load(f"abfss://{metadata_container}@{storage_account}.dfs.core.windows.net/map_2mandt.csv")
)

In [38]:
create_edc_instances()
result = []

# temporary location used while the control table is being rewritten
control_file_tmp_path = f"abfss://{metadata_container}@{storage_account}.dfs.core.windows.net/control_table_new.parquet"

# read the ctl and data files and then validate the schema
for ctl_file_name in list_of_ctl_files:
    # the feeder system name is the part of the file name before the first '_'
    fs_name = ctl_file_name[:ctl_file_name.index('_')]
    syn_logger.info("Begin processing the file: {0}".format(ctl_file_name))

    # get the corresponding edc instance
    edc_instance = edc_instances_per_fs_id[fs_name.lower()]
    syn_logger.debug("Create edc instance for the file: {0}".format(ctl_file_name))

    # per-file state: reset it so that nothing leaks in from the previous file
    delivery_date = None
    field_separator = None
    relative_file_path = None

    # load edc metadata from the EDC REST end point
    edc_resp = edc_instance.load_data_from_edc()
    if edc_resp == "abort":
        syn_logger.debug("Metdata not found in edc for: {0}".format(ctl_file_name))
        # delivery date and field separator are not known at this point
        result.append({"filename": ctl_file_name, "delivery_date": delivery_date, "field_separtor": field_separator, "is_valid": False})
        syn_logger.debug("Rejecting the file: {0}".format(ctl_file_name))
        # reject only this file; the remaining files are still processed
        continue

    syn_logger.debug("Metdata succesfully loaded from edc for: {0}".format(ctl_file_name))

    # get Field Separator
    field_separator = edc_instance.get_field_separator()
    syn_logger.debug("Field separator for the file: {0} is: {1}".format(ctl_file_name, field_separator))

    # get ctl file schema
    ctl_schema = edc_instance.get_schema_for_control_file()
    syn_logger.debug("Control file schema loaded for: {0}".format(ctl_file_name))

    # read control file
    control_df = spark.read.load(f"abfss://{upload_container}@{storage_account}.dfs.core.windows.net/{ctl_file_name}",
                                    format='csv',header=False, schema = ctl_schema, delimiter = field_separator)

    syn_logger.debug("Loaded the control file: {0}".format(ctl_file_name))

    # fetch the first control row once instead of collecting the file per value
    control_row = control_df.collect()[0]
    if ctl_schema is not None:
        # read by name: in the CSV_INT layout 'delivery' is not the second column
        feeder_system_id = control_row["fs_id"]
        delivery_date = control_row["delivery"]
    else:
        # no schema known for this charset: fall back to the column positions
        feeder_system_id = control_row[0]
        delivery_date = control_row[1]

    syn_logger.debug("Feeder systme ID and Delivery Data: {0} is: {1}".format(feeder_system_id, delivery_date))

    # get schema for dat file
    dat_schema = edc_instance.get_scehma_for_data_file()

    # read data file
    data_file_name = ctl_file_name.replace('.ctl', '.dat')
    syn_logger.debug("Loaded data file schema for the file: {0}".format(data_file_name))

    data_df = spark.read.load(f"abfss://{upload_container}@{storage_account}.dfs.core.windows.net/{data_file_name}",
                                format = 'csv', header = False, schema = dat_schema, delimiter = field_separator )

    syn_logger.debug("Laoded the data file: {0}".format(data_file_name))

    # create an instance for validating the file structure
    validator_instace = Validator(data_df, edc_instance)
    syn_logger.debug("Create validator instance for the file: {0}".format(data_file_name))

    syn_logger.debug("Technical validations begin: {0}".format(data_file_name))
    # perform technical validations
    number_of_columns_check = validator_instace.number_of_columns_check()
    if number_of_columns_check:
        syn_logger.debug("number of columns match: {0}".format(data_file_name))
    else:
        syn_logger.critical("number of columns DO NOT match: {0}".format(data_file_name))

    data_length_check = validator_instace.data_length_check()
    if data_length_check:
        syn_logger.debug("lenght check satisfied: {0}".format(data_file_name))
    else:
        syn_logger.critical("length check failed: {0}".format(data_file_name))

    null_check = validator_instace.null_check()
    if null_check:
        syn_logger.debug("Null check success: {0}".format(data_file_name))
    else:
        syn_logger.critical("Null check failed: {0}".format(data_file_name))

    # the file is valid only when every single check passed, not just the last one
    file_is_valid = bool(number_of_columns_check and data_length_check and null_check)

    # create a conversion instance and perform necessary conversions
    conversion_instace = Conversion()
    data_df = conversion_instace.date_conversion()
    syn_logger.debug("Date columns are converted: {0}".format(data_file_name))

    data_df = conversion_instace.number_conversion()
    syn_logger.debug("Numeric columns are converted: {0}".format(data_file_name))

    # '#' is used as wildcard for the mandt lookup below
    fs_mandt_value = '#'
    fs_mandt2_value = '#'

    if 'fs_mandt' in data_df.columns:
        fs_mandt_value = data_df.select("fs_mandt").distinct().collect()[0][0]

    if 'fs_mandt2' in data_df.columns:
        fs_mandt2_value = data_df.select("fs_mandt2").distinct().collect()[0][0]

    syn_logger.debug("fs_mandt2_value: {0}".format(fs_mandt2_value))

    fs_family = None
    fs_family_row = map_2fs_family_df.select('fs_family').filter(map_2fs_family_df.fs_id == feeder_system_id).first()
    if fs_family_row is not None:
        fs_family = fs_family_row[0]

    syn_logger.debug("fs_family: {0}".format(fs_family))

    # keep the full mapping untouched: narrowing map_2mandt_df itself would leave only the
    # rows of the first file's family for every following file
    mandt_candidates_df = map_2mandt_df.filter(map_2mandt_df.fs_family == fs_family)

    mandt = None

    #fs_mandt pfs_id fs_mandt2
    valuesToCompare = [fs_mandt2_value, feeder_system_id, fs_mandt_value]

    # try the exact combination first, then replace one more value by the wildcard per attempt
    # NOTE(review): the all-wildcard combination ['#', '#', '#'] is never tried - confirm this is intended
    for i in range(len(valuesToCompare)):
        mandt_row = mandt_candidates_df.select('mandt').filter(
            (mandt_candidates_df.fs_mandt2 == valuesToCompare[0])
            & (mandt_candidates_df.pfs_id == valuesToCompare[1])
            & (mandt_candidates_df.fs_mandt == valuesToCompare[2])
        ).first()
        if mandt_row is not None:
            mandt = mandt_row[0]
            syn_logger.debug("found mandt {}".format(mandt))
            break
        valuesToCompare[i] = '#'

    # add business Id column. This is temporary solution until we know about the Business Id In RDV
    col_list = list(data_df.columns)
    data_df = data_df.withColumn("financial_transaction_bus_id", md5(concat_ws("", *col_list)))

    syn_logger.debug("Added Business Id column {0}".format(data_file_name))

    dt = datetime.strptime(delivery_date, '%Y%m%d')

    syn_logger.debug("Check if the data file passed all technical validations{0}".format(data_file_name))

    if file_is_valid:
        syn_logger.debug("File {0} passed all the technical validations".format(data_file_name))
        syn_logger.debug("Moving file {0} to curated container".format(data_file_name))

        # curated layout: /<mandt>/<feeder system>/<year>/<month>/<day>/<file>.parquet
        parquet_file_name = data_file_name.replace('.dat', '.parquet')
        relative_file_path = '/' + str(mandt) + '/' + fs_name +'/' + str(dt.year) + '/' + str(dt.month) + '/' + str(dt.day) +  '/' + parquet_file_name
        file_path = adls_path + relative_file_path
        data_df.write.mode("overwrite").parquet(file_path)

        syn_logger.debug("Parquet file succesfully written {0}".format(file_path))

        # append the control table with files that are being moved to curated container;
        # rejected files have no curated path and are therefore not recorded
        info = [ctl_file_name, relative_file_path, False, pipeline_id, datetime.now(), datetime.now() ]

        # load control table parquet file
        controltable_df = spark.read.load(control_file_path, format='parquet', schema = controltable_schema)
        syn_logger.debug("Control file loaded")

        if controltable_df.filter(controltable_df.raw_filepath == ctl_file_name).count() == 0:
            syn_logger.info("No file exists in in control table. therfore, appending the new control file info {0}".format(ctl_file_name))
            temp_df = spark.createDataFrame(data = [info], schema = controltable_schema)
            controltable_df = controltable_df.union(temp_df)
            # the control table cannot be overwritten while it is being read, so the new
            # content goes to a temporary location first and is copied back afterwards;
            # 'overwrite' also clears a temporary table left behind by a failed run
            controltable_df.write.mode("overwrite").parquet(control_file_tmp_path)
            mssparkutils.fs.rm(control_file_path, True)
            controltable_df_temp = spark.read.load(control_file_tmp_path, format='parquet', schema = controltable_schema)
            controltable_df_temp.write.parquet(control_file_path)
            mssparkutils.fs.rm(control_file_tmp_path, True)
            syn_logger.debug("Updated the control file with file: {0}".format(ctl_file_name))
        else:
            syn_logger.debug("Filename is not appended to control file. File has already been processed {0}".format(ctl_file_name))
    else:
        syn_logger.debug("File {0} did NOT pass all the technical validations".format(data_file_name))
        syn_logger.debug("Move file {0} to reject container".format(data_file_name))
        syn_logger.debug("DO NOT write Parquet file to curated container {0}".format(data_file_name))

    result.append({"filename": ctl_file_name, "delivery_date": delivery_date, "field_separtor": field_separator, "is_valid": file_is_valid})

In [39]:
# mssparkutils is already imported in the imports cell at the top of the notebook;
# repeating the import here is harmless
from notebookutils import mssparkutils
# End the notebook run and pass the per-file results collected in `result` as exit value
# NOTE(review): presumably the exit value is expected to be a string - confirm how the
# calling pipeline consumes this list of dicts
mssparkutils.notebook.exit(result)