In [None]:
#Importando as bibliotecas necessárias
import os #uma biblioteca que nos permite interagir com o sistema operacional, como navegar pelos diretórios
import pandas as pd
import numpy as np
import re
from simpledbf import Dbf5 #Converte arquivo DBF para arquivo CSV
from dbfread import DBF
from openpyxl import load_workbook
import zipfile

In [None]:
#Funcao para percorrer todos arquivos em formato dbf nas pastas da rede do laboratorio
def encontrar_dbf(tabela, empresa, ano, cod_emp):
    ano = int(ano)
    arquivos_dbf = []
   

    if ano not in np.arange(2004,2009): #Os anos entre 2009 e 2017 foram armazenados em caminhos de estruturação distinta e não serão usados neste relatorio
        ano = str(ano)
        caminho_inicial = fr'\\PROJETOS2\CDs_{ano}\{empresa}\{cod_emp}'
        for root, dirs, files in os.walk(caminho_inicial): #Permite fazer uma busca recursiva no diretório e seus subdiretórios
            #A função os.walk retorna um gerador que produz tuplas contendo três elementos: o diretório atual, uma lista dos subdiretórios presentes nesse diretório e uma lista dos arquivos presentes nesse diretório.
            path_components = root.split(os.sep)
        
            #if cod_emp in path_components and empresa in path_components or cod_emp.strip('0') in path_components:
            if any( cod_emp[1:] in item or cod_emp in item for item in path_components) and any( empresa.lower() in item.lower() for item in path_components) and all('excluir' not in item.lower() for item in path_components) and all('lixo' not in item.lower() for item in path_components) and all( 'old' not in item.lower() for item in path_components):
                for file in files:
                    if file.lower().endswith(".dbf") and file.lower().startswith(f"{tabela.lower()}"): #Selecioanos os tipos de arquivos desejados
                        arquivos_dbf.append(os.path.join(root, file))

    if len(arquivos_dbf) == 0:    
       return [0] 
   
    return arquivos_dbf

In [None]:
#Funcao para realizar as contagens basicas dos arquivos em formato dbf encontrados na função anterior
def arq_dbf (tabelas, cod_emps, empresa, anos):
    
    linhas_resultados = [] # Armazenar resultados intermediários para concatenação final
    logs= []
    
    for tabela in tabelas:

        for ano in anos:

            for cod_emp in cod_emps:
                #Para cada tipo de tabela, ano e codigo da empresa buscamos os caminhos correspondentes 
                caminhos = encontrar_dbf(tabela, empresa, ano, cod_emp)
                
                #Caso exista mais de um envio para at ou sa, por exemplo
                for caminho in caminhos:

                    tipo = tabela[:2]
                    ramo = tabela[3:6]

                     # Se o arquivo não foi encontrado, passa para a próxima iteração
                    if caminho == 0:
                        #print(f"Diretório não encontrado para a empresa {empresa} de cod_emp {cod_emp} no ano {ano}.")
                        linhas_log = pd.DataFrame({'ANO': [ano], 'EMPRESA': [empresa], 'COD_EMP': [cod_emp], 'TIPO': [tipo], 'RAMO': [ramo],'QTDE_LINHAS_TOTAL_DBF': 0})
                        logs.append(linhas_log)

                    else:
                        #Se o arquivo foi encontrado, primeiro tenta converter o dbf para csv, caso não dê certo mostra uma mensagem
                        try:
                            table = DBF(caminho, load=False) 
                            qtde_linhas_total = len(table)
                            linhas = pd.DataFrame({'ANO': [ano], 'EMPRESA': [empresa], 'COD_EMP': [cod_emp], 'TIPO': [tipo], 'RAMO': [ramo],'QTDE_LINHAS_TOTAL_DBF':[qtde_linhas_total] })
                            linhas_resultados.append(linhas) 

                        except Exception as erro:
                            print(f"Erro ao processar o arquivo DBF para a empresa {empresa}, cod_emp {cod_emp}, ano {ano}: {erro}")

    if len(logs) == 0: #Caso todos os arquivos tenham sido encontrados mostrar apenas um dataFrame vazio
        resultado_log = pd.DataFrame()
    else:
        resultado_log = pd.concat(logs, ignore_index=True)    
        
    if len(linhas_resultados) == 0: #Caso nenhum arquivo tenha sido encontrado
        resultados_dbf = resultado_log
    else:
        resultados_dbf = pd.concat(linhas_resultados, ignore_index=True) #Vou guardar todos os DataFrames em uma lista e concatenar tudo ao final.
        resultados_dbf = resultados_dbf.groupby(['COD_EMP','ANO','EMPRESA','RAMO','TIPO'])['QTDE_LINHAS_TOTAL_DBF'].sum().reset_index()  
    
    return resultados_dbf, resultado_log

In [None]:
#Contagem basica dos arquivos que estão em formato parquet
def arq_parquet (tabelas, empresa, anos, cod_emps):
    
    logs = []
    resultados_parquet = []

    for tabela in tabelas:
        
        for ano in anos:

            base = fr'\\PROJETOS\{ano}\{empresa.upper()}'
            tipo = tabela[:2]
            ramo = tabela[3:6]
            parquets_diretorio = os.path.join(base,tabela)

            #Procura os parquets

            if not os.path.exists(parquets_diretorio) and len(tabelas) == 1: #Caso em que só seja selecionado o ano em que não há parquets
                print(f"Não há totalizações da {tabela} para mostar para a {empresa} no ano de {ano}")

            if not os.path.exists(parquets_diretorio): 
                #print(f"Diretorio {parquets_diretorio} não encontrado.")
                linhas_log = pd.DataFrame({'ANO': [ano], 'EMPRESA': [empresa], 'COD_EMP': [cod_emps], 'TIPO': [tipo], 'RAMO': [ramo],'QTDE_LINHAS_TOTAL_PARQUET': 0})
                linhas_log = linhas_log.explode('COD_EMP', ignore_index=True) # Expande a coluna COD_EMP para que cada valor tenha uma linha separada
                logs.append(linhas_log)
                continue
            
            parquets = os.listdir(parquets_diretorio) #Lista todos os arquivos em parquet dentro da pasta de uma empresa dentro daquele ano para a tabela desejada

            for p in parquets:
                parquets_path = os.path.join(parquets_diretorio,p)
                aux = pd.read_parquet(f'{base}\\{tabela}\\{p}')

                #filtra o DataFrame para o cod_emp selecionado
                cod_emps = aux['COD_EMP'].unique()
                for cod_emp in cod_emps:
                    aux_cod_emp = aux[aux['COD_EMP']==cod_emp]
            
                    qtde_linhas_total = len(aux_cod_emp)

                    #Junta os resultados_parquet de todas as iterações
                    linhas =pd.DataFrame({'ANO': [ano], 'EMPRESA': [empresa], 'COD_EMP': [cod_emp], 'TIPO': [tipo], 'RAMO': [ramo],'QTDE_LINHAS_TOTAL_PARQUET':[qtde_linhas_total]})
                    resultados_parquet.append(linhas)
    
    if len(logs) == 0: #Caso todos os arquivos tenham sido encontrados
        resultado_log = pd.DataFrame()
    else:
        resultado_log = pd.concat(logs, ignore_index=True)         
    
    if len(resultados_parquet) == 0: #Caso nenhum arquivo tenha sido encontrado
        resultados_parquet = resultado_log
    else:
        resultados_parquet = pd.concat(resultados_parquet, ignore_index=True) #Vou guardar todos os DataFrames em uma lista e concatenar tudo ao final.
        
           
    return resultados_parquet , resultado_log

In [None]:
#Visualização geral das contagens 
def totaliza_uni (tabela, empresa, anos, cod_emps):
    #Talvez o for tabelas e for anos e for empresas devesse ser aqui 

    print(f"Executando totaliza com empresa={empresa}, tabela={tabela}, cod_emp={cod_emps} \n")

    juncoes = ['ANO','EMPRESA','COD_EMP','TIPO','RAMO']
    t1 = arq_dbf (tabela, cod_emps, empresa, anos)
    t2 = arq_parquet (tabela, empresa, anos, cod_emps)
    left = t1[0]
    right = t2[0]
    #Para evitar de armazenar tabela com linhas vazias, que é quando não tem registros para mostrar do segmento escolhido no argumento tabela
    if t1[0].equals(t1[1]) and t2[0].equals(t2[1]): 
        ramo = tabela[0][3:6] #Quando houver a necessidade de rodar mais de uma tabela ao mesmo tempo haverá necessidade de mudança aqui
        return f'A {empresa} não apresenta o segmento {ramo}'
    
    else:
        #juntando as totalizações de cada arquivo parquet
        right = right.groupby(['ANO','COD_EMP', 'EMPRESA','TIPO','RAMO'])[['QTDE_LINHAS_TOTAL_PARQUET']].sum().reset_index()
    
        #Fazendo um join com os DataFrames
        resultado = pd.merge(left, right, on= juncoes, how='left')
    
        return resultado 

In [None]:
#Inserir os dados para cada empresa
tabela = ['ATIVOS']
anos = np.arange(2017,2024)

guia = {'XXX':['00000'],'YYY':['11111']}


empresas = guia.keys()

planilha = fr"cole o path desejado para armazenar o resultado"

In [None]:
#Visualização em uma tabela externa para verificação da consistencia entre as transformações de formato dos arquivos 
import winsound

for empresa in empresas:
    print(empresa)
    cod_emps = guia[empresa]
    #resultados_dbf = arq_dbf(tabela, cod_emps,empresa, anos)[1]
    #resultados_parquet = arq_parquet(tabela, empresa, anos,cod_emps)[1]

    #Criando uma planilha para armazenar todas as totalizações básicas e inserir a informação de percentual de dados faltantes
    df_novo = totaliza_uni (tabela, empresa, anos, cod_emps)
    if type(df_novo) == str:
        continue
     
    df_novo.fillna(0) #Tratando os valores nulos 
    df_novo['percentual']= df_novo['QTDE_LINHAS_TOTAL_PARQUET'] / df_novo['QTDE_LINHAS_TOTAL_DBF']
    #Reorganizando as colunas do df para corresponder com as colunas da planilha
    df_novo = df_novo[['ANO','EMPRESA','COD_EMP','TIPO','RAMO','QTDE_LINHAS_TOTAL_DBF','QTDE_LINHAS_TOTAL_PARQUET','percentual']]

    #Abrindo o arquivo para realizar modificações
    with pd.ExcelWriter(planilha, mode='a', engine='openpyxl',if_sheet_exists='overlay') as writer: #Só isso não carrega as planilhas existentes, modo = a significa anexar
        
        #Caso vc rode esse codigo e depois apague o conteudo das celulas no arquivo, na hora da contagem elas são consideradas como preenchidas, por isso é necessario excluir a linha 
        df_novo.to_excel(writer, sheet_name='Página1', index=False, 
                                startrow=writer.sheets['Página1'].max_row, startcol = 0, header=False)

# Emitir som após a execução
winsound.Beep(1000, 500)  # Frequência 1000 Hz, duração de 500 ms

#Referências de estudo:
#https://www.datacamp.com/tutorial/python-excel-tutorial#rdl
#https://xlsxwriter.readthedocs.io/working_with_pandas.html
#https://www.tutorialspoint.com/how-to-get-the-maximum-number-of-occupied-rows-and-columns-in-a-worksheet-in-selenium-with-python

In [None]:
#EXTRA

#Poderia adicionar tambem a contagem basica dos arquivos que estão em formato csv. Que foi uma conversão intermediária dos arquivos entre dbf e csv
def arq_csv(tabela, empresa, anos, cod_emp):
    
    tipo = tabela[:2]
    ramo = tabela[3:6]
    resultado = []
    resultado_c_erro = []
    for ano in anos:
        
        caminho_inicial = fr'\\PROJETOS\CDs_{ano}\{empresa}\{cod_emp}'
    

        for root, dirs, files in os.walk(caminho_inicial): #Permite fazer uma busca recursiva no diretório e seus subdiretórios
            #A função os.walk retorna um gerador que produz tuplas contendo três elementos: o diretório atual, uma lista dos subdiretórios presentes nesse diretório e uma lista dos arquivos presentes nesse diretório.
            print(f'Rodando para a {empresa} no ano de {ano} procurando arquivos {tabela}.csv')
            print(f"Explorando diretório: {root} \n")

            for file in files:
    
                if  file.lower() == f"{tabela.lower()}.zip":
                    zip_path = os.path.join(root,file)

                    # Abrir o arquivo ZIP
                    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                        arquivo_zip = zip_ref.namelist()

                        try:
                            with zip_ref.open(arquivo_zip[0]) as arquivo_csv: #Sempre só vai ter um arquivo no zip
                                # Lê o CSV diretamente no pandas
                                df = pd.read_csv(arquivo_csv)
                                qtde_linhas_total = df.shape[0]
                                linhas =pd.DataFrame({'ANO': [ano], 'EMPRESA': [empresa], 'COD_EMP': [cod_emp], 'TIPO': [tipo], 'RAMO': [ramo],'QTDE_LINHAS_TOTAL_CSV':[qtde_linhas_total]})

                                if len(files) > 1: #Condicional para realizar agrupamento só para os casos em que haja mais de um arquivo para um conjunto específico de tabela,empresa,ano e cod_emp
                                    print(f'Feito agrupamento para a {empresa} no ano {ano}')
                                    linhas = linhas.groupby(['ANO','EMPRESA','COD_EMP','TIPO','RAMO'])['QTDE_LINHAS_TOTAL_CSV'].sum().reset_index()  

                                else:
                                    linhas

                                resultado.append(linhas)

                        except Exception as e:
                        # Caso dê erro, adiciona à lista de arquivos com erro
                            print(f"Erro ao ler o arquivo {zip_path}: {e}")
                            resultado_c_erro.append(zip_path)
            

    if len(resultado) == 0:    
        return f'Não há {tabela}.csv para {empresa} no {ano}' 
    
    resultado = pd.concat(resultado,ignore_index=True)      
    resultado = resultado.groupby(['ANO','EMPRESA','COD_EMP','TIPO','RAMO'])['QTDE_LINHAS_TOTAL_CSV'].sum().reset_index()  

    return resultado , resultado_c_erro