In [4]:
#Libriaries
import os 
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
import pandasgui as pg 
from sqlalchemy import create_engine
import urllib
import re
import pyodbc 
import pydash
import glob
import logging

#Define the connection details
server = 'jacobo-dev.database.windows.net'
port = '1433'
database = 'jacobo-dev-sqlserver-azure-001'
username = 'azure-admin'
password = 'ja-2023-un0ypzjo'
driver = '{ODBC Driver 18 for SQL Server}'

#Define the connection string
conn_str = f"DRIVER={driver};SERVER={server},{port};DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30"

#Test SQL DB Connection
try:
    conn = pyodbc.connect(conn_str)
    print("Connection successful!")
    conn.close()
except pyodbc.Error as e:
    print("Error connecting to database:", e)


#defined functions
def get_root(file_path):
    """Parses an XML file and returns its root element.
    
    Args:
        file_path (str): The path to the XML file.

    Returns:
        xml.etree.ElementTree.Element: The root element of the XML file.
    """
    xml_tree = ET.parse(file_path)
    return xml_tree.getroot()


def get_child_elements(xml_root):
    """Generates a list to store all child elements of a given xml element.
    
    Args:
        xml_root (xml.etree.ElementTree.Element): The root element of the XML file.

    Returns:
        list: List of all child elements.
    """
    elements = [child for child in xml_root]
    for child in xml_root:
        elements.extend(get_child_elements(child))
    # Append xml root to elements list
    elements.append(xml_root)
    return elements


def get_attribute_cfdi(elements, tag_contains, attribute):
    """Extracts the attributes from xml elements based on the tag and attribute provided, and get back unique values.
    
    Args:
        elements (list): List of XML elements.
        tag_contains (str): Tag name to search for.
        attribute (str): Attribute name to search for.

    Returns:
        str: Comma-separated string of attribute values.
    """
    attribute_values = set(element.attrib.get(attribute) 
                        for element in elements if tag_contains in element.tag 
                        and element.attrib.get(attribute) is not None)
    return ', '.join(attribute_values)


def get_attribute_cfdi_from_path(xml_path, tag_contains, attribute):
    """Extracts the attribute from the root xml based on the tag and attribute provided.
    
    Args:
        xml_path (str): Path to the XML file.
        tag_contains (str): Tag name to search for.
        attribute (str): Attribute name to search for.

    Returns:
        str: Comma-separated string of attribute values.
    """
    xml_root = get_root(xml_path)
    elements = get_child_elements(xml_root)
    elements.append(xml_root)
    return get_attribute_cfdi(elements, tag_contains, attribute)



def pascal_case(text):
    """Convert a string into PascalCase."""
    return ''.join(word.capitalize() for word in text.split())


def get_xml_metadata(elements):
    """Extracts the metadata from xml elements.
    
    Args:
        elements (list): List of XML elements.

    Returns:
        pandas.DataFrame: DataFrame containing XML metadata.
    """
    # Define a list to store row data and a dictionary for the attribute field counters
    data, field_counters = [], {}

    # Get UUID from 'TimbreFiscalDigital' element
    uuid = get_attribute_cfdi(elements, 'TimbreFiscalDigital', 'UUID')
    
    # Iterate over elements
    for element in elements:
        if element.attrib:
            cleaned_tag = pascal_case(re.sub(r"{.*?}", "", element.tag)).lower()
            file_name = os.path.basename(xml_file_path)
            file_path = xml_file_path

            # If this tag has not been seen before, initialize its counter
            if element.tag not in field_counters:
                field_counters[element.tag] = 1

            # Iterate over attributes of the element and add the data to the list
            for key, value in element.attrib.items():
                cleaned_key = pascal_case(re.sub(r"{.*?}", "", key)).lower()
                data.append({'field_number': field_counters[element.tag], 'file_path': file_path, 'file_name': file_name, 'cleaned_tag': cleaned_tag, 'tag': element.tag, 'key': key, 'cleaned_key': cleaned_key,'value': value, 'UUID': uuid})

            # Increment the counter for this attribute field
            field_counters[element.tag] += 1

    # Convert the list of dictionaries to a DataFrame and return
    return pd.DataFrame(data)


def get_unique_tags(elements):
    """Gets a list of unique tag names from xml elements.

    Args:
        elements (list): List of XML elements.

    Returns:
        list: List of unique tag names.
    """
    return list(set([element.tag for element in elements if element.attrib]))


def create_dataframes(elements, tag_list, metadata_df):
    """Creates a dictionary of DataFrames for each unique XML tag.

    Args:
        elements (list): List of XML elements.
        tag_list (list): List of unique XML tags.
        metadata_df (pandas.DataFrame): DataFrame containing XML metadata.

    Returns:
        dict: Dictionary containing a DataFrame for each unique XML tag.
    """
    # Initialize an empty dictionary to hold the dataframes
    cfdi_df = {}

    # Define key fields
    uuid = get_attribute_cfdi(elements, 'TimbreFiscalDigital', 'UUID')
    emisor_rfc = get_attribute_cfdi(elements, 'Emisor', 'Rfc')
    receptor_rfc = get_attribute_cfdi(elements, 'Receptor', 'Rfc')
    comprobante_tipo = get_attribute_cfdi(elements, 'Comprobante', 'TipoDeComprobante')   
    
    for tag in tag_list:
        # Filter the DataFrame
        filtered_df = metadata_df[metadata_df['tag'] == tag]
        
        # Custom aggregation function to concatenate values into a list
        aggfunc = lambda x: list(x) if len(x) > 1 else np.max(x)
        
        # Create a pivot table
        pivot_table = pd.pivot_table(filtered_df, values='value', index=['field_number'], columns=['cleaned_key'], aggfunc=aggfunc)

        # Reset index and change the column names
        df = pivot_table.reset_index().rename_axis(None, axis=1)

        # Add the UUID and other key fields as new columns to the DataFrame
        df['uuid'] = uuid 
        df['emisor_rfc'] = emisor_rfc 
        df['receptor_rfc'] = receptor_rfc
        df['comprobante_tipo'] = comprobante_tipo

        # Add the DataFrame to the dictionary
        cfdi_df[tag] = df

    return cfdi_df


def create_db_engine(driver, server, port, database, username, password):
    """Create a connection engine for a database.

    Args:
        driver (str): The name of the ODBC driver for the database.
        server (str): The address of the server hosting the database.
        port (str): The port number to use to connect to the server.
        database (str): The name of the database.
        username (str): The username to use to connect to the database.
        password (str): The password to use to connect to the database.

    Returns:
        Engine: The SQLAlchemy engine object.
    """
    params = urllib.parse.quote_plus(
        f'DRIVER={driver};SERVER={server},{port};DATABASE={database};UID={username};PWD={password}'
    )
    engine = create_engine(f'mssql+pyodbc:///?odbc_connect={params}')
    return engine


def write_to_db(df_dict, engine):
    """Writes DataFrames to a SQL database.

    Args:
        df_dict (dict): Dictionary where the key is the tag name and the value is the DataFrame to write to the database.
        engine (sqlalchemy.engine.Engine): SQLAlchemy engine instance used to connect to the database.
    """
    for tag, df in df_dict.items():
        cleaned_tag = re.sub(r"{.*?}", "", tag)
        table_name = 'cfdi_' + cleaned_tag
        df.to_sql(table_name, engine, if_exists='append', index=False)


def get_xml_files(path, include_subdirectories=False):
    xml_files = []

    for root, dirs, files in os.walk(path):
        for file in files:
            if file.endswith('.xml'):
                xml_files.append(os.path.join(root, file))
        if not include_subdirectories:
            break  # prevent os.walk() from traversing subdirectories

    return xml_files


def get_child_elements_from_path(file_path):
    """Parse an XML file and return all child elements."""
    xml_tree = ET.parse(file_path)
    xml_root = xml_tree.getroot()
    elements = get_child_elements(xml_root)
    return elements



Connection successful!


In [40]:
def get_xml_metadata_from_path(path, include_subdirectories=True):
    xml_files = get_xml_files(path, include_subdirectories)

    data = []
    log = []
    tag_list = set()  # change to a set to automatically eliminate duplicates
    key_fields_list = []  # initialize an empty list to store key fields
    elements_list = []   # initialize an empty list to store all elements

    for xml_file in xml_files:
        # Define a dictionary for the attribute field counters
        field_counters = {}

        #keyfields
        file_name = os.path.basename(xml_file)
        comprobante_tipo = get_attribute_cfdi_from_path(xml_file, 'Comprobante', 'TipoDeComprobante')
        uuid = get_attribute_cfdi_from_path(xml_file, 'TimbreFiscalDigital', 'UUID')
        emisor_rfc = get_attribute_cfdi_from_path(xml_file, 'Emisor', 'Rfc')
        receptor_rfc = get_attribute_cfdi_from_path(xml_file, 'Receptor', 'Rfc')

        # Pack the key fields into a dictionary and append it to the list
        key_fields_list.append({"file_name": file_name, "comprobante_tipo": comprobante_tipo, "uuid": uuid, "emisor_rfc": emisor_rfc, "receptor_rfc": receptor_rfc})

        # Get elements from the current xml file
        elements = get_child_elements_from_path(xml_file)
        elements_list.extend(elements)  # add elements to the elements_list

        # Iterate over elements
        for element in elements:
            if element.attrib:
                cleaned_tag = pascal_case(re.sub(r"{.*?}", "", element.tag)).lower()
                tag_list.add(cleaned_tag)  # add the cleaned_tag to the tag_list set

                # If this tag has not been seen before, initialize its counter
                if element.tag not in field_counters:
                    field_counters[element.tag] = 1

                # Iterate over attributes of the element and add the data to the list
                for key, value in element.attrib.items():
                    cleaned_key = pascal_case(re.sub(r"{.*?}", "", key)).lower()
                    data.append({'row': field_counters[element.tag], 'comprobante_tipo': comprobante_tipo, 'emisor_rfc': emisor_rfc, 'receptor_rfc': receptor_rfc, 'uuid': uuid, 'tag': element.tag, 'key': key,  'dataframe_name': cleaned_tag, 'field_name': cleaned_key,'field_value': value,'file_path': xml_file, 'file_name': file_name })

                # Increment the counter for this attribute field
                field_counters[element.tag] += 1

        #log - record log info as a dictionary for each xml file
        log.append({'comprobante_tipo': comprobante_tipo, 'emisor_rfc': emisor_rfc, 'receptor_rfc': receptor_rfc, 'file_name': file_name, 'uuid': uuid})

    # create DataFrames after finishing the loop
    metadata_df = pd.DataFrame(data)
    log_df = pd.DataFrame(log)  # create DataFrame from log list

    return metadata_df, log_df, list(tag_list), key_fields_list, elements_list, xml_files # return elements_list


# Function to write a DataFrame to a SQL table
def write_df_dictionary_to_sql(schema, df_dict, engine):
    for tag, df in df_dict.items():
        cleaned_tag = re.sub(r"{.*?}", "", tag)
        table_name = cleaned_tag
        db_schema = schema  # Specify the schema name
        df.to_sql(table_name, engine, if_exists='append', index=False, schema=db_schema)

#write_df_dictionary_to_sql('cfdi', cfdi_df, engine)


In [41]:
path = r"C:\Users\Roberto\OneDrive\cargoIabono\Proyectos y Desarrollos\P001_V001_CFDI-Reader\01_Inputs"

metadata_df, log_df, tag_list, key_fields_list, elements_list, xml_files = get_xml_metadata_from_path(path)


In [42]:
for xml_file in xml_files:
    print('f:', xml_file)
    for element in elements_list:
        print(' ----e:', element)
        #print(      '')
    print('___________|')

f: C:\Users\Roberto\OneDrive\cargoIabono\Proyectos y Desarrollos\P001_V001_CFDI-Reader\01_Inputs\Facturas\01DAD1D3-CA8F-4F0D-8CA7-E0347D111EC4.xml
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Emisor' at 0x000001F5E48D76F0>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Receptor' at 0x000001F5E48D76A0>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Conceptos' at 0x000001F5E48D6840>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Impuestos' at 0x000001F5E48D5E90>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Complemento' at 0x000001F5E48D6930>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Emisor' at 0x000001F5E48D76F0>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Receptor' at 0x000001F5E48D76A0>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Concepto' at 0x000001F5E48D6C50>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Impuestos' at 0x000001F5E48D6890>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Traslados' at 0x000001F5E48D6A70>
 ----e: <Element '{http://www.sat.gob.mx/cfd/3}Tra

In [8]:
log_df

Unnamed: 0,comprobante_tipo,emisor_rfc,receptor_rfc,file_name,uuid
0,I,FIS780810KQ9,MES880922JQA,01DAD1D3-CA8F-4F0D-8CA7-E0347D111EC4.xml,01DAD1D3-CA8F-4F0D-8CA7-E0347D111EC4
1,I,FIS780810KQ9,TCA640201ER3,0A8BEAE0-F410-4A97-8860-7F5EBADC0D60.xml,0A8BEAE0-F410-4A97-8860-7F5EBADC0D60
2,I,FIS780810KQ9,KLM970416U59,0C58F816-7800-44E2-96FF-631A8E432C50.xml,0C58F816-7800-44E2-96FF-631A8E432C50
3,I,FIS780810KQ9,PSM101118J20,0D0E1EBB-003E-4F17-B2BE-0A5442819D1C.xml,0D0E1EBB-003E-4F17-B2BE-0A5442819D1C
4,P,FIS780810KQ9,FAN540305I15,0F23FE0D-8324-4BCE-AFAC-68CB67E89714.xml,0F23FE0D-8324-4BCE-AFAC-68CB67E89714
5,I,FIS780810KQ9,CAT051122H76,1F8FA056-C516-4349-8E87-DC2EC8F831D5.xml,1F8FA056-C516-4349-8E87-DC2EC8F831D5
6,N,EMS2103108P3,AOAR951019842,EMS2103108P3_Pago de nómina_20220815_N_AOAR951...,B7D29180-3BB4-904C-9DC5-8730016AA924


In [9]:
for key_fields in key_fields_list:
    print(key_fields['uuid'])

01DAD1D3-CA8F-4F0D-8CA7-E0347D111EC4
0A8BEAE0-F410-4A97-8860-7F5EBADC0D60
0C58F816-7800-44E2-96FF-631A8E432C50
0D0E1EBB-003E-4F17-B2BE-0A5442819D1C
0F23FE0D-8324-4BCE-AFAC-68CB67E89714
1F8FA056-C516-4349-8E87-DC2EC8F831D5
B7D29180-3BB4-904C-9DC5-8730016AA924


In [10]:
def create_dataframes3(metadata_df):
    # Set up logging configuration
    logging.basicConfig(filename='pivot_table_log.txt', level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')

    # Create an empty dictionary to hold the dataframes
    cfdi_df = {}

    # Unique tags for each comprobante_tipo
    unique_tags = metadata_df['dataframe_name'].unique()

    for tag in unique_tags:
        # Filter the DataFrame by tag
        tag_df = metadata_df[metadata_df['dataframe_name'] == tag]

        # Custom aggregation function to concatenate values into a list
        aggfunc = lambda x: list(x) if len(x) > 1 else np.max(x)

        logging.info(f'Creating pivot table for tag: {tag}')

        pivot_table = pd.pivot_table(
            tag_df,
            values='field_value',
            index=['uuid', 'comprobante_tipo', 'emisor_rfc', 'receptor_rfc', 'file_name', 'file_path', 'row'],
            columns=['field_name'],
            aggfunc=aggfunc
        ).rename(columns={'uuid': 'cfdi_key'})

        # Reset index and change the column names
        df = pivot_table.reset_index().rename_axis(None, axis=1)

        cfdi_df[tag] = df

    return cfdi_df

In [11]:
pg.show(metadata_df)

PandasGUI INFO — pandasgui.gui — Opening PandasGUI


<pandasgui.gui.PandasGui at 0x1f5bde508b0>

In [12]:
cfdi_df = create_dataframes3(metadata_df)

In [13]:
print(tag_list)
pg.show(cfdi_df['timbrefiscaldigital'])

PandasGUI INFO — pandasgui.gui — Opening PandasGUI


['impuestos', 'subsidioalempleo', 'emisor', 'comprobante', 'receptor', 'timbrefiscaldigital', 'nomina', 'deduccion', 'otropago', 'traslado', 'pago', 'doctorelacionado', 'percepcion', 'concepto', 'percepciones', 'deducciones', 'pagos']


<pandasgui.gui.PandasGui at 0x1f5bdf4c820>

In [27]:
# Create a connection engine for the SQL server
engine = create_db_engine(driver, server, port, database, username, password)


# Write each DataFrame in the dictionary to a separate SQL table
write_df_dictionary_to_sql('cfdi', cfdi_df, engine)

In [17]:
# Function to write a DataFrame to a SQL table
def write_to_db_descontinuada(df_dict, engine):
    for tag, df in df_dict.items():
        cleaned_tag = re.sub(r"{.*?}", "", tag)
        table_name = 'cfdi_' + cleaned_tag
        schema = 'cfdi'  # Specify the schema name
        full_table_name = f'{schema}.{table_name}'
        df.to_sql(full_table_name, engine, if_exists='append', index=False)