In [1]:
import os
import re
import pandas as pd
from tqdm import tqdm
from unidecode import unidecode

# Functions

In [2]:
def fix_gov_danf(df):
    return df.loc[0, 2]

def fix_emitente(df):
    return pd.DataFrame( 
                        df.loc[:, 1]    # Select data
                        .str.split("|") # Split IE
                        .explode()      # Explode IE into rows
                        .str.split(":") # Split key and value
                        .to_list()      # Convert to list
                        , columns=["key", "value"]) # Create columns and convert to pandas dataframe

def fix_destinatario(df):
    return pd.DataFrame(df.loc[:, 1]
                        .str.split(":", n=1)
                        .to_list()
                        , columns=["key", "value"]
                        )
    
def fix_items(df):
    df.columns = df.iloc[0]
    df.drop(0, inplace=True)
    df.reset_index(drop=True, inplace=True)
    df.drop(df.columns[:2], axis=1, inplace=True)
    return df
    
def fix_total(df):
    df.drop(0, axis=1, inplace=True)
    df.columns = ["key", "value"]
    df.reset_index(drop=True, inplace=True)
    return df

def add_id(df, cont):
    df["id_file"] = cont
    return df[["id_file"] + df.columns[:-1].tolist()]

def fix_footer(df):
    temp_df = df.split("||")
    temp_df = pd.Series(temp_df).str.split(":", n=1, expand=True)
    temp_df.columns = ["key", "value"]
    return temp_df

def fix_column_upper(df):
    """Remove accents and apply lower case on columns names, apply upper and strip to values in columns

    Args:
        df (pd.Dataframe): A dataframe with columns names and values
    """
    df.columns = ([ unidecode(ii)
                   .lower()
                   .replace(" ", "_")
                   .replace(".", "")
                   for ii in df.columns])
    
    for col in df.columns:
        if col == "key":
                df[col] = ( df[col]
                    .apply(lambda x: unidecode(x)
                           .replace("(R$)", "")
                           .replace("NFC-e", "")
                           .replace("NF-e", "")
                           .strip()
                           .lower()
                           .replace(" ", "_")
                           )
                    )    
        else:
            df[col] = ( df[col].apply(lambda x: str(x))
                    .str.upper()
                    .str.strip()
                    )
            


# Load Files

In [3]:
dfs_path = "Raw_Data/Raw_DFs/"
url_path = "Raw_Data/"

url_test = "urls" 
url_df = pd.read_csv(f"{url_path}{url_test}.csv").fillna("Failed")

mask = url_df.df_id == "Failed"
url_df = url_df[~mask]

In [4]:
# Drop duplicated rows based in columns df_id
url_df.drop_duplicates(subset="df_id", inplace=True)
url_df.reset_index(drop=True, inplace=True)
# Update file
# url_df.to_csv(f"{url_path}{url_test}.csv", index=False)


# Raw files

In [5]:
# Files List
files_list = os.listdir(dfs_path)
files_dict = {}

for file_path in files_list:
    # Create a dict with the name of file as key and the suffix of subfiles as a list
    if ".csv" in file_path:
        key, value = file_path.replace(".csv", "").rsplit("_", 1)
        if key in files_dict:
            concat_val = files_dict[key] + [value]
            files_dict[key] = concat_val
        else:
            files_dict[key] = [value]
            
# Show the total of files in url.csv and compare with the number of files in the folder
print(f"Total of files in url.csv: {url_df.shape[0]}\nTotal of files in folder: {len(files_dict.keys())}")

Total of files in url.csv: 810
Total of files in folder: 810


In [6]:
# Dfs to store the data
df_gov = pd.DataFrame(columns = ["id_file", "gov", "danfe", "url"])
df_emitente = pd.DataFrame(columns = ["id_file", "key", "value"])
df_destinatario = pd.DataFrame(columns = ["id_file", "key", "value"])
df_items = pd.DataFrame(columns = ["id_file", "Item", "Descrição", "Qtde.", "Unid.", "Vl. unid.", "Vl. total"])
df_pagamento = pd.DataFrame(columns = ["id_file", "key", "value"])
df_nfe_info = pd.DataFrame(columns=["id_file", "key", "value"])

In [7]:

for idx, (file_name, sub_files) in tqdm(enumerate(files_dict.items()), total=len(files_dict.items())):
      # Read all files
      sub_files = map(int, sub_files)
      df_list_raw = [pd.read_csv(f"{dfs_path}{file_name}_{item}.csv", header = None) for item in sub_files]
      
      # Retrieve url
      mask = url_df.df_id == file_name
      file_url = url_df.loc[mask, "url"].values[0]
      # Store data
      df_gov.loc[len(df_gov)]       = [file_name, fix_gov_danf(df_list_raw[0]), fix_gov_danf(df_list_raw[1]), file_url]
      df_emitente                   = pd.concat([df_emitente, add_id(fix_emitente(df_list_raw[2]), file_name)])
      df_destinatario               = pd.concat([df_destinatario, add_id(fix_destinatario(df_list_raw[3]), file_name)])
      df_items                      = pd.concat([df_items, add_id(fix_items(df_list_raw[4]), file_name)])
      df_pagamento                  = pd.concat([df_pagamento, add_id(fix_total(df_list_raw[5]), file_name)])
      df_nfe_info                   = pd.concat([df_nfe_info, add_id(fix_footer(url_df.loc[idx, "footer"]), file_name)])

100%|██████████| 810/810 [00:29<00:00, 27.01it/s]


In [8]:
temp_emitende = df_emitente.copy()
temp_destinatario = df_destinatario.copy()
temp_items = df_items.copy()
temp_pagamento = df_pagamento.copy()
temp_nfe = df_nfe_info.copy()

In [9]:
df_emitente = temp_emitende.copy()
df_destinatario = temp_destinatario.copy()
df_items = temp_items.copy()
df_pagamento = temp_pagamento.copy()
df_nfe_info = temp_nfe.copy()

# Process Dataframes

In [10]:
# Normalize columns upper case values in columns, lower case in columns names and remove accents
fix_column_upper(df_emitente)
fix_column_upper(df_destinatario)
fix_column_upper(df_items)
fix_column_upper(df_pagamento)
fix_column_upper(df_nfe_info)

# Reindex
df_emitente.reset_index(drop=True, inplace=True)
df_destinatario.reset_index(drop=True, inplace=True)
df_items.reset_index(drop=True, inplace=True)
df_pagamento.reset_index(drop=True, inplace=True)
df_nfe_info.reset_index(drop=True, inplace=True)

# Remove unecessary rows
mask = df_emitente.key == "emitente"
df_emitente = df_emitente[~mask]

mask = df_nfe_info.key == "sem_valor_fiscal"
df_nfe_info = df_nfe_info[~mask]

# Move the value of chave_de_acesso from column key to column value ( correct place )
mask = df_nfe_info.key.str.contains("chave")
target_index = df_nfe_info[mask].index.to_list()
for idx in target_index:
    df_nfe_info.loc[idx, "value"] = df_nfe_info.loc[idx+1, "key"]
    
# Remove the moved ( duplicated ) row
mask2 = df_nfe_info.key.isin(df_nfe_info[mask].value)
df_nfe_info = df_nfe_info[~mask2]

# Convert long to wide
df_emitente = df_emitente.pivot(index="id_file", columns="key", values="value").reset_index()
df_destinatario = df_destinatario.pivot(index="id_file", columns="key", values="value").reset_index()
df_pagamento = df_pagamento.pivot(index="id_file", columns="key", values="value").reset_index()
df_nfe_info = df_nfe_info.pivot(index="id_file", columns="key", values="value").reset_index()

# Rename columns
df_emitente.rename(columns={"endereco": "endereco_emitente"}, inplace=True)
df_destinatario.rename(columns={"endereco": "endereco_destinatario"}, inplace=True)

In [11]:
# Fix type
df_items[["qtde", "vl_unid", "vl_total"]] = df_items[["qtde", "vl_unid", "vl_total"]].astype(float)
df_pagamento[["valor_descontos", "valor_pago", "valor_total", "valor_total_dos_produtos"]] = df_pagamento[["valor_descontos", "valor_pago", "valor_total", "valor_total_dos_produtos"]].astype(float)

df_nfe_info["data_de_autorizacao"] = pd.to_datetime(df_nfe_info["data_de_autorizacao"], format="%d/%m/%Y %H:%M:%S")
df_nfe_info["data_de_cancelamento"] = pd.to_datetime(df_nfe_info["data_de_cancelamento"], format="%d/%m/%Y %H:%M:%S")
df_nfe_info["data_de_emissao"] = pd.to_datetime(df_nfe_info["data_de_emissao"], format="%d/%m/%Y %H:%M:%S")

# Fill nan

In [12]:
df_emitente.nome_fantasia.fillna(df_emitente.razao_social, inplace=True)
df_emitente.razao_social.fillna(df_emitente.nome_fantasia, inplace=True)
# Fix user names
mask = df_destinatario.endereco_destinatario.str.contains("caico", case=False, na=False)
df_destinatario.loc[mask, "nome"] = "IVANILSON ( CAICÓ )"
df_destinatario.loc[mask, "cpf"] = "503.185.514-20"

df_destinatario.cpf.fillna("000.000.000-00", inplace=True)
df_destinatario.endereco_destinatario.fillna("Sem Info.", inplace=True)
df_destinatario.nome.fillna("Sem Info.", inplace=True)

df_pagamento.forma_pagamento.fillna("Sem Info.", inplace=True)
df_pagamento.icms.fillna(0, inplace=True)
df_pagamento.valor_total.fillna(df_pagamento.valor_total_dos_produtos, inplace=True)
df_pagamento.valor_total_dos_produtos.fillna(df_pagamento.valor_total, inplace=True)


# Concat

In [13]:
df_nfe = pd.concat([df_gov.set_index("id_file"), df_emitente.set_index("id_file"), df_destinatario.set_index("id_file"), 
     df_pagamento.set_index("id_file"), df_nfe_info.set_index("id_file")], axis=1, join="inner")

In [14]:
df_nfe.isna().sum()

gov                            0
danfe                          0
url                            0
cnpj                           0
endereco_emitente              0
ie                             0
nome_fantasia                  0
razao_social                   0
cpf                            0
destinatario                   0
endereco_destinatario          0
nome                           0
forma_pagamento                0
icms                           0
valor_descontos                0
valor_pago                     0
valor_total                    0
valor_total_dos_produtos       0
chave_de_acesso                0
data_de_autorizacao            0
data_de_cancelamento         808
data_de_emissao                0
protocolo                      0
protocolo_de_cancelamento    808
situacao                       0
dtype: int64

# Save result

In [17]:
# df_nfe.to_csv("Raw_Data/Cleaned_DFs/df_nfe_info.csv", encoding="UTF-8", index=False)
# df_items.to_csv("Raw_Data/Cleaned_DFs/df_items.csv", encoding="UTF-8", index=False)