In [1]:
# Importa bibliotecas
import pandas as pd
from datetime import date
import numpy as np
import pickle
import re
from itertools import product, combinations, permutations
from concat_files import concat_rrvs

In [2]:
pd.set_option('display.max_columns', None)

In [3]:
# Valida CNPJ 
def validar_cnpj(s: pd.Series): 
    digitos = s.str.extract(r'(\d)(\d).(\d)(\d)(\d).(\d)(\d)(\d)/(\d)(\d)(\d)(\d)-').astype(np.uint8,errors="ignore")
    dv = s.str.extract(r'-(\d)(\d)').astype(np.uint8,errors="ignore")
    dig_1 = np.array([6,7,8,9,2,3,4,5,6,7,8,9 ],dtype=np.uint8)
    mult1 = np.sum(np.multiply(digitos,dig_1),axis=1).astype(np.uint16)
    res1 = np.remainder(mult1,11).astype(np.uint8)
    res1[res1==10] = 0
    digitos =  pd.concat([digitos,res1],axis=1)
    dig_2 = np.array([5,6,7,8,9,2,3,4,5,6,7,8,9],dtype=np.uint8)
    mult2 = np.sum(np.multiply(digitos,dig_2),axis=1).astype(np.uint16)
    res2 = np.remainder(mult2,11).astype(np.uint8)
    res2[res2==10] = 0
    digitos =  pd.concat([digitos,res2],axis=1)
    return ((dv[0] - res1) == 0) & ((dv[1] - res2) == 0)

# Formata CNPJ
def formatar_cnpj(s: pd.Series, check_val =False,on_error = "pass"): 
    s= s.apply(str).str.strip() # Transforma CPJS em string
    s.replace(to_replace="\W", value=r"", regex=True,inplace=True)
    s = s.str.pad(14,"left","0")  # Faz padding left com zeros
    s.replace(to_replace="(\d{2})(\d{3})(\d{3})(\d{4})(\d{2})", value=r"\1.\2.\3/\4-\5", regex=True,inplace=True)
    
    if check_val or on_error == "raise":
        invalidos  = ~validar_cnpj(s)
        quant_errors = invalidos.count()
        print(f"{quant_errors} CNPJs inválidos:")
        display(s[invalidos])
    if on_error == "raise" and quant_errors > 0:
        raise 
    
    return s

# Normaliza nomes retirando caracteres não ascii
def normalizar_str(s: pd.Series,no_spaces = False):
    s = s.str.strip().str.upper()
    s  = s.str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8')
    s = s.replace(to_replace=r"\s{1,}", value=r"", regex=True)      
    return s


# Formata CEG
def formatar_ceg(s: pd.Series):
    s = s.str.strip().str.upper()
    s.replace(to_replace="\W", value=r"", regex=True,inplace=True)
    s = s.str.replace(r"(\w{3})(\w{2})(\w{2})(\d{6})(\d{1}).*",r"\1.\2.\3.\4-\5",regex=True)
    return s

# Extrai as seguintes informações a partir do CEG: "Geração","Fonte","UF","ID","DV"
def extrair_info_ceg(s: pd.Series):
    ceg_info = s.str.extract("(\w{3}).(\w{2}).(\w{2}).(\d{6})-(\d{1})")
    ceg_info.columns = ["Geração","Fonte","UF","ID","DV"]
    ceg_info["ID"] = ceg_info["ID"].astype(int) 
    return ceg_info



# Junta dicionário. Retorna erro caso haja valores distintos para uma mesma chave.
def merge_dict(dict_a,dict_b,stop_first=False):
    keys = list(dict.fromkeys((*dict_a,*dict_b)))
    erros = erros = {"dict_a":[],"dict_b":[]}
    index = []
    for key in keys:
        if (key in dict_a) and (key in dict_b):
            if dict_a[key] != dict_b[key]:
                if stop_first:
                    raise Exception(f" dict_a[{key}] = {dict_a[key]} and dict_b[{key}] = {dict_b[key]}")
                else:
                    erros["dict_a"].append(dict_a[key])
                    erros["dict_b"].append(dict_b[key])
                    index.append(key)
    if len(erros["dict_a"] + erros["dict_b"]) > 1:
        print("Diferenças encontradas:")
        display(pd.DataFrame(erros,index=index))
        raise Exception(f"Valores distintos para uma mesma chave.")
    return {**dict_a,**dict_b}


def find_diff(df,cols):
    mask_diff = ~pd.Series(index=rv_17.index,dtype=bool)
    for g1 , g2 in list(permutations(cols,2)):
        mask_diff |=  ((df[g1] != df[g2]) & (df[g1].notna() &  df[g2].notna()))
    return mask_diff

In [4]:
# Parâmetros de coluna
colunas_rv_dim = ['USINA'] # Colunas necessárias para criação da tabela dimensão
colunas_rv_fact = ['USINA',"DATA","RECEITA DE VENDA MENSAL (R$)"] # Colunas necessárias para criação da tabela fato


# Dicionários para renomear colunas
dict_colunas_rv_17 = {
                      "EMPREENDIMENTO":"USINA1",
                      'TOTAL DA RECEITA DE VENDA (R$)':"RECEITA DE VENDA MENSAL (R$)",}
 #                     "CNPJ VENDEDOR": "CNPJ VENDEDOR"}

In [5]:
rv_17 = pd.read_parquet("./Inputs/RV017.gzip")
rv_13 = pd.read_parquet("./Inputs/RV013.gzip")
relatorio_ceg_cnpj = pd.read_excel("./Inputs/RelatorioEmpreendimentoCEG_CNPJ.xlsx")[['NumCnpj','CodCEG','NomEmpreendimento']]
relatorio_ceg_cnpj.columns = ['CNPJ','CEG','USINA1'] # Renomeia colunas

rv_17.dropna(how="all",inplace = True) # Exclui linhas que possuam dados vazios
rv_13.dropna(how="all",inplace = True)
rv_17.rename(columns= dict_colunas_rv_17,inplace= True)
rv_13.rename(columns= dict_colunas_rv_17,inplace= True)

rv_17["CNPJ VENDEDOR"]= formatar_cnpj(rv_17["CNPJ VENDEDOR"])
rv_13["CNPJ VENDEDOR"]= formatar_cnpj(rv_17["CNPJ VENDEDOR"])
del rv_17["Unnamed: 0"]
del rv_13["Unnamed: 0"]

In [6]:
# Remove linhas com numero de CEG, CNPJ e nome de USINA sejam todos iguais
relatorio_ceg_cnpj = relatorio_ceg_cnpj.dropna(subset=["CEG","USINA1","CNPJ"],how="any")

# Formata CNPJ e CEG e extrai ID, e tipo de geração
relatorio_ceg_cnpj["CNPJ"] = formatar_cnpj(relatorio_ceg_cnpj["CNPJ"]) # Formata CNPJ
relatorio_ceg_cnpj['CEG'] = formatar_ceg(relatorio_ceg_cnpj["CEG"])  # Formata CEG
relatorio_ceg_cnpj["ID"] = extrair_info_ceg(relatorio_ceg_cnpj["CEG"])["ID"]
relatorio_ceg_cnpj["Geração"] = extrair_info_ceg(relatorio_ceg_cnpj["CEG"])["Geração"]
relatorio_ceg_cnpj_copy = relatorio_ceg_cnpj.copy()
relatorio_ceg_cnpj = relatorio_ceg_cnpj.drop_duplicates(subset=["ID","USINA1","CNPJ"])



relatorio_ceg_cnpj.USINA1 = normalizar_str(relatorio_ceg_cnpj.USINA1)
relatorio_ceg_cnpj.USINA1= relatorio_ceg_cnpj.USINA1.replace("(UHE|CGH|PCH|UTE|UTN|EOL|UFV|AHE)","",regex=True)

# Exemplo de uso desta expressão regular em: https://regex101.com/r/NYcNl2/1
padrao = r'\((?:ANTIGA)?(.*)\)'


relatorio_ceg_cnpj["USINA2"] = relatorio_ceg_cnpj.USINA1.str.extract(padrao)[0]
relatorio_ceg_cnpj.replace(padrao,"",regex=True,inplace=True)


mask_usina2 = relatorio_ceg_cnpj.USINA2.notna()
relatorio_ceg_cnpj["USINA12"] = relatorio_ceg_cnpj.USINA1
relatorio_ceg_cnpj.loc[mask_usina2,"USINA12"] = relatorio_ceg_cnpj.USINA2
relatorio_ceg_cnpj = relatorio_ceg_cnpj.drop_duplicates(subset=["ID","USINA2","CNPJ"])
relatorio_ceg_cnpj = relatorio_ceg_cnpj.drop_duplicates(subset=["ID","USINA12","CNPJ"])
print(relatorio_ceg_cnpj.USINA1.count())

16303


In [7]:
relatorio_ceg_cnpj_rep = relatorio_ceg_cnpj.copy()
ceg_rep = relatorio_ceg_cnpj[relatorio_ceg_cnpj.duplicated(subset=["ID"],keep=False)].copy()
relatorio_ceg_cnpj.drop_duplicates(subset=["ID"],inplace=True,keep="first")
relatorio_ceg_cnpj = relatorio_ceg_cnpj[~(relatorio_ceg_cnpj.USINA1.duplicated(keep=False) | relatorio_ceg_cnpj.USINA12.duplicated(keep=False))].copy()
print(relatorio_ceg_cnpj.USINA1.count())



mask_cegs_repetidos_nomes_unicos = ceg_rep.ID.isin(relatorio_ceg_cnpj.ID)
relatorio_ceg_cnpj = pd.concat([relatorio_ceg_cnpj,ceg_rep[mask_cegs_repetidos_nomes_unicos]],ignore_index=True)
print(relatorio_ceg_cnpj.USINA1.count())

relatorio_ceg_cnpj.sort_values(by="ID",inplace=True)

15659
15925


In [8]:
rv_17["USINA"] = rv_17.USINA1
rv_17.USINA1 = normalizar_str(rv_17.USINA1)
rv_17["geracao1"] = rv_17.USINA1.str.extract("(UHE|CGH|PCH|UTE|UTN|EOL|UFV)",flags=re.IGNORECASE)
rv_17["geracao2"] = rv_17.VENDEDOR.str.extract("(UHE|CGH|PCH|UTE|UTN|EOL|UFV)",flags=re.IGNORECASE)
dict_confiabilidade_geracao = {1 : 3, 2 : 2}
rv_17.USINA1 = rv_17.USINA1.replace("(UHE|CGH|PCH|UTE|UTN|EOL|UFV|AHE)","",regex=True)
rv_17["USINA2"] = rv_17.USINA1.str.extract(padrao)[0]
rv_17.USINA1.replace(padrao,"",regex=True,inplace=True)
mask_usina2 = rv_17.USINA2.notna()
rv_17["USINA12"] = rv_17.USINA1
rv_17.loc[mask_usina2,"USINA12"] = rv_17.USINA2


#############################################################

rv_13["USINA"] = rv_13.USINA1
rv_13.USINA1 = normalizar_str(rv_13.USINA1)
rv_13.USINA1 = rv_13.USINA1.replace("(UHE|CGH|PCH|UTE|UTN|EOL|UFV|AHE)","",regex=True)
rv_13["USINA2"] = rv_13.USINA1.str.extract(padrao)[0]
rv_13.USINA1.replace(padrao,"",regex=True,inplace=True)

In [9]:
rv_17_copy = rv_17.copy()
print(rv_17.USINA1.count())
composite_primary_key = ["USINA1","CNPJ VENDEDOR","USINA2"]
rv_17.drop_duplicates(subset=composite_primary_key,inplace=True,keep="first")
print(rv_17.USINA1.count())

352204
614


In [10]:
mask_difere_geracao = find_diff(rv_17,[f"geracao{i}" for i in [1,2]])
rv_17[mask_difere_geracao]

Unnamed: 0,DATA,USINA1,COMPRADOR,CNPJ COMPRADOR,VENDEDOR,CNPJ VENDEDOR,RECEITA DE VENDA MENSAL (R$),USINA,geracao1,geracao2,USINA2,USINA12


In [11]:
def remove_dup(s : pd.Series):
    return s[s.apply(lambda x : len(x)) == 1]

dict_geracao_cnpj_usina1 = remove_dup(relatorio_ceg_cnpj_rep.groupby(["Geração","CNPJ","USINA1"]).ID.apply(list))
dict_geracao_cnpj_usina2 = remove_dup(relatorio_ceg_cnpj_rep.groupby(["Geração","CNPJ","USINA2"]).ID.apply(list))


dict_cnpj_usina1 = remove_dup(relatorio_ceg_cnpj_rep.groupby(["CNPJ","USINA1"]).ID.apply(list))
dict_cnpj_usina2 = remove_dup(relatorio_ceg_cnpj_rep.groupby(["CNPJ","USINA2"]).ID.apply(list))

dict_geracao_usina1 = remove_dup(relatorio_ceg_cnpj_rep.groupby(["Geração","USINA1"]).ID.apply(list))
dict_geracao_usina2 = remove_dup(relatorio_ceg_cnpj_rep.groupby(["Geração","USINA2"]).ID.apply(list))



dict_confiabilidade_usina= {}


In [12]:
dict_usina1 = dict(zip(relatorio_ceg_cnpj[relatorio_ceg_cnpj.USINA1.notna()].USINA1,relatorio_ceg_cnpj[relatorio_ceg_cnpj.USINA1.notna()].ID))
dict_usina2 = dict(zip(relatorio_ceg_cnpj[relatorio_ceg_cnpj.USINA2.notna()].USINA2,relatorio_ceg_cnpj[relatorio_ceg_cnpj.USINA2.notna()].ID))

rv_17["usina011"] = rv_17.USINA1.map(dict_usina1)
rv_17["usina012"] = rv_17.USINA1.map(dict_usina2)

rv_17["usina021"] = rv_17.USINA2.map(dict_usina1)
rv_17["usina022"] = rv_17.USINA2.map(dict_usina2)


dict_confiabilidade_usina= {"usina011": 0,
                            "usina012": 0,
                            "usina021": 0,
                            "usina022": 0,}

In [13]:
#usina, geracao, metodo(exato ou n)
for index, row in rv_17.iterrows():
    for j in range(1,3): # usina1 ou usina2
        for i in range(1,3): # Loop para cada geração   
            if type(row[f"geracao{i}"]) == str:
                for k, dict_ in enumerate([dict_geracao_cnpj_usina1,dict_geracao_cnpj_usina2]):
                    try:
                        rv_17.loc[index,f"usina{j}_1_{k+1}_{i}"] = dict_[row[f"geracao{i}"]][row["CNPJ VENDEDOR"]][row[f"USINA{j}"]][0]
                        dict_confiabilidade_usina[f"usina{j}_1_{k+1}_{i}"] = dict_confiabilidade_geracao[i] + 10 - k
                    except KeyError as error:
                        pass
                for k, dict_ in enumerate([dict_geracao_usina1,dict_geracao_usina2]): 
                    try:
                        rv_17.loc[index,f"usina{j}_3_{k+1}_{i}"] = dict_[row[f"geracao{i}"]][row[f"USINA{j}"]][0]
                        dict_confiabilidade_usina[f"usina{j}_3_{k+1}_{i}"] = dict_confiabilidade_geracao[i] + 5 - k
                    except KeyError as error:
                        pass
    ########################################################################################################################################

        for k, dict_ in enumerate([dict_cnpj_usina1,dict_cnpj_usina2]):
            try:
                rv_17.loc[index,f"usina{4}_{j}_{k+1}_1"] = dict_[row["CNPJ VENDEDOR"]][row[f"USINA{j}"]][0]
                dict_confiabilidade_usina[f"usina{4}_{j}_{k+1}_1"] =  6 - k
            except KeyError:
                pass

lista_colunas_usinas = [col for col in rv_17.columns if col[0:5] =="usina"]
print(rv_17[find_diff(rv_17,lista_colunas_usinas)].USINA1.count())
print(rv_17[rv_17[lista_colunas_usinas].isna().all(axis=1)].USINA1.count())

0
40


In [14]:
rv_17["ID"] = np.nan
rv_17["Confiabilidade"] = 0
rv_17["Metodo"] = np.nan


for usina_num, confiabilidade in sorted(dict_confiabilidade_usina.items(), key=lambda item: item[1],reverse=True):
    for index, row in rv_17.iterrows():
        if pd.isna(row.ID)  and not  pd.isna(row[usina_num]):
            rv_17.loc[index,"ID"] = row[f"{usina_num}"]
            rv_17.loc[index,"Confiabilidade"] = confiabilidade
            rv_17.loc[index,"Metodo"] = usina_num

In [15]:
mask_export = rv_17.Metodo.isna() | (rv_17.Confiabilidade < 5)
rv_17_export = rv_17.loc[ mask_export,["USINA","CNPJ VENDEDOR"]]
rv_17_export.to_excel("./Intermediarios/rv_17_export.xlsx")
rv_17.loc[mask_export,"ID"] = pd.NA
rv_17.loc[mask_export,"Confiabilidade"] = 0

In [1]:
#raise

In [16]:
faltantes = pd.read_excel("./Intermediarios/rv_17_export_mod.xlsx",index_col="Unnamed: 0")[["USINA","CNPJ VENDEDOR","CEG"]]
faltantes.dropna(subset="CEG",inplace=True)
faltantes["CEG"] = formatar_ceg(faltantes.CEG)
faltantes["ID"] = extrair_info_ceg(faltantes["CEG"])["ID"]
faltantes = pd.merge(rv_17[["USINA","CNPJ VENDEDOR"]],faltantes[["USINA","CNPJ VENDEDOR","ID"]],on=["USINA","CNPJ VENDEDOR"],how="left")
faltantes.index = rv_17.index
rv_17.loc[rv_17.ID.isna(),"ID"] = faltantes

In [17]:
fact_cols = ["USINA","ID","DATA","RECEITA DE VENDA MENSAL (R$)","CNPJ VENDEDOR"]
rv_17_fact = pd.merge(rv_17_copy[["USINA1","USINA2","CNPJ VENDEDOR","DATA","RECEITA DE VENDA MENSAL (R$)"]],rv_17[["USINA1","USINA2","CNPJ VENDEDOR","ID","USINA"]],on=composite_primary_key,how="left",validate="many_to_one")[fact_cols]

In [18]:
rv_17_dim = rv_17_fact.drop_duplicates("ID")[["ID"]].dropna(subset=["ID"])

rv_17_dim = pd.merge(rv_17_dim,
        relatorio_ceg_cnpj_copy.drop_duplicates(subset="ID").rename(columns={"USINA1":"Usina Outorgas"}),
        on="ID",validate="one_to_one",suffixes=["","_rrv"])

In [19]:
# Arquivos com os RRVs
import glob
path_files = "./Inputs/RRVs/"
file_paths = glob.glob(f"{path_files}/*.xlsx")
colunas = ["Evento","Mês/Ano","CEG do Empreendimento","Sigla Parcela - Usina", "(S) TOT_RV_D p,t,l,e,m", 
"Perfil Agente - Vendedor","Perfil Agente - Comprador","CNPJ Agente - Comprador","CNPJ Agente - Vendedor"]
rrv = concat_rrvs(file_paths,colunas)
rrv["CNPJ Agente - Vendedor"] = formatar_cnpj(rrv["CNPJ Agente - Vendedor"])
rrv["(S) TOT_RV_D p,t,l,e,m"].fillna(0,inplace=True)

lista_colunas_match = ["Sigla Parcela - Usina","Perfil Agente - Vendedor","CNPJ Agente - Vendedor"]
dict_parcela_ceg =   rrv[rrv["CEG do Empreendimento"].notna()].drop_duplicates(subset=lista_colunas_match)[lista_colunas_match + ["CEG do Empreendimento"]].drop_duplicates(subset=lista_colunas_match)
cegs_identificados =  pd.merge(rrv[lista_colunas_match],dict_parcela_ceg,on=lista_colunas_match,how="left")["CEG do Empreendimento"]
rrv.loc[rrv["CEG do Empreendimento"].isna(),"CEG do Empreendimento"] = cegs_identificados

faltantes = pd.read_excel("./Intermediarios/Faltantes.xlsx")[lista_colunas_match + ["CEG do Empreendimento"]]
cegs_identificados =  pd.merge(rrv[lista_colunas_match],faltantes,on=lista_colunas_match,how="left")["CEG do Empreendimento"]
rrv.loc[rrv["CEG do Empreendimento"].isna(),"CEG do Empreendimento"] = cegs_identificados

rrv.dropna(subset="CEG do Empreendimento",inplace=True)
rrv["ID"] = extrair_info_ceg(rrv["CEG do Empreendimento"])["ID"]
rrv["DATA"] = pd.to_datetime(rrv["Mês/Ano"],utc=False)
display(rrv.isna().any())


rrv = rrv[["DATA","Sigla Parcela - Usina","ID","(S) TOT_RV_D p,t,l,e,m"]]
#rrv= rrv.rename(columns= {"CEG do Empreendimento":"CEG","Sigla Parcela - Usina":"SiglaUsina","(S) TOT_RV_D p,t,l,e,m":"tot"})[["DATA","SiglaUsina","ID","tot"]]


Lendo arquivo: ./Inputs/RRVs/Parquets/RRV_final_2021.gzip
Lendo arquivo: ./Inputs/RRVs/Parquets/RRV_final_2022_05.gzip
Lendo arquivo: ./Inputs/RRVs/Parquets/RRV_final_2022_06.gzip


Evento                       False
Mês/Ano                      False
CEG do Empreendimento        False
Sigla Parcela - Usina        False
(S) TOT_RV_D p,t,l,e,m       False
Perfil Agente - Vendedor     False
Perfil Agente - Comprador    False
CNPJ Agente - Comprador      False
CNPJ Agente - Vendedor       False
ID                           False
DATA                         False
dtype: bool

In [20]:
rrv = pd.merge(rrv,
        relatorio_ceg_cnpj_copy.drop_duplicates(subset="ID")[["ID","USINA1"]],
        on="ID",validate="many_to_one",suffixes=["","_rrv"]).rename(columns={"USINA1":"Usina"})

In [21]:
rv_17_fact["RECEITA DE VENDA MENSAL (R$)"].fillna(0,inplace=True)
rrv_sum = rrv.groupby(["DATA","ID","Usina"])[["(S) TOT_RV_D p,t,l,e,m"]].sum().reset_index()
rv_17_fact_sum = rv_17_fact.groupby(["DATA","ID","USINA"])[["RECEITA DE VENDA MENSAL (R$)"]].sum().reset_index()

In [22]:
comp = pd.merge(rrv_sum,rv_17_fact_sum,on=["DATA","ID"],how="outer")
datas_comuns = comp[comp['(S) TOT_RV_D p,t,l,e,m'].notna() &  comp["RECEITA DE VENDA MENSAL (R$)"].notna()].DATA.unique()
comp = comp.loc[comp.DATA.isin(datas_comuns),:]

In [23]:
# Vê-se que a usina Energia Madeiras foi identificada como a usina Cisframa 
diff = (abs(comp["RECEITA DE VENDA MENSAL (R$)"] - comp["(S) TOT_RV_D p,t,l,e,m"])>100) | ( (comp["RECEITA DE VENDA MENSAL (R$)"] >10) & (comp["(S) TOT_RV_D p,t,l,e,m"].isna()) ) |  ( (comp["(S) TOT_RV_D p,t,l,e,m"] >10) & (comp["RECEITA DE VENDA MENSAL (R$)"].isna()) )

comp[diff].sort_values(by="DATA").drop_duplicates("ID")

Unnamed: 0,DATA,ID,Usina,"(S) TOT_RV_D p,t,l,e,m",USINA,RECEITA DE VENDA MENSAL (R$)
29,2021-01-01,27250.0,Aparecida Parte I,27534050.47,,
46,2021-01-01,28140.0,Energia Madeiras,306608.64,,
15680,2021-01-01,27978.0,,,CISFRAMA,306608.64
4043,2021-09-01,28538.0,Altos,1110750.46,,
4042,2021-09-01,28537.0,Campo Maior,1110750.46,,
4040,2021-09-01,28535.0,Marambaia,1110750.46,,
4034,2021-09-01,28529.0,Baturité,914765.16,,
4035,2021-09-01,28530.0,Crato,1110770.46,,
4036,2021-09-01,28531.0,Aracati,914765.13,,
4037,2021-09-01,28532.0,Iguatu,1197428.25,,


In [24]:
ids_ruins = comp[diff].ID.unique()
rv_17_dim.loc[rv_17_dim.ID.isin(ids_ruins)].head(5)

Unnamed: 0,ID,CNPJ,CEG,Usina Outorgas,Geração
10,28538.0,04.735.623/0001-63,UTE.PE.PI.028538-2,Altos,UTE
13,27250.0,00.357.038/0001-16,UTE.GN.AM.027250-7,Aparecida Parte I,UTE
14,28531.0,04.735.629/0001-30,UTE.PE.CE.028531-5,Aracati,UTE
70,28529.0,04.735.629/0001-30,UTE.PE.CE.028529-3,Baturité,UTE
113,28537.0,04.735.623/0001-63,UTE.PE.PI.028537-4,Campo Maior,UTE


In [34]:
# Exclui casos que a USINA e o ID são idênticos assim como CNPJ e ID
rv_17_teste = rv_17.drop_duplicates(subset=["USINA1","ID"],keep=False).drop_duplicates(subset=["CNPJ VENDEDOR","ID"],keep=False)

In [35]:
# Vê-se que o único caso que existe IDs iguais para USINA1 e CNPJ diferentes, trata-se da mesma usina. Logo, USINA1 será usada como chave primária
rv_17_teste[rv_17_teste.ID.duplicated(keep=False)].head(20).sort_values(by="USINA1")[["USINA1","ID","CNPJ VENDEDOR"]]

Unnamed: 0,USINA1,ID,CNPJ VENDEDOR
232721,SALTOPILAO-CAMARGOCORREA,28564.0,11.792.578/0001-44
232841,SALTOPILAO-DME,28564.0,03.966.583/0001-06


In [36]:
rv_13_fact = pd.merge(rv_13[["DATA","RECEITA DE VENDA MENSAL (R$)","USINA"] + composite_primary_key],
        rv_17[["ID"] + composite_primary_key].drop_duplicates(subset="USINA1"),
        on = "USINA1",
        how="left",
        suffixes=["","_x"],
        validate="many_to_one")[fact_cols]

In [37]:
# Casos não identificados no RV_13
rv_13_fact[rv_13_fact.ID.isna()].drop_duplicates(subset="ID")

Unnamed: 0,USINA,ID,DATA,RECEITA DE VENDA MENSAL (R$),CNPJ VENDEDOR
1345,SAO DOMINGOS,,2020-01-01,70.95,10.618.009/0001-14


In [38]:
# Casos não identificados no RV_17
rv_17_fact[rv_17_fact.ID.isna()].drop_duplicates(subset="ID")

Unnamed: 0,USINA,ID,DATA,RECEITA DE VENDA MENSAL (R$),CNPJ VENDEDOR
5275,APARECIDA,,2020-01-01,28026171.77,17.957.780/0001-65


In [40]:
dict_cols_export = {"USINA":"Sigla Parcela - Usina","DATA":"Data",'RECEITA DE VENDA MENSAL (R$)':"(S) TOT_RV_D p,t,l,e,m",'CNPJ VENDEDOR':"CNPJ Agente - Vendedor"}

In [41]:
rv_13_fact = rv_13_fact.rename(columns=dict_cols_export)
rv_17_fact = rv_17_fact.rename(columns=dict_cols_export)

In [44]:
rrv_nao_ident = rv_17_fact[(rv_17_fact["(S) TOT_RV_D p,t,l,e,m"]> 0) & rv_17_fact.ID.isna()].copy()

In [47]:
rv_13_fact.dropna(subset="ID",inplace=True)
rv_17_fact.dropna(subset="ID",inplace=True)

In [49]:
rv_13_fact.to_parquet("./Intermediarios/RV_13_2020.gzip")
rv_17_fact[rv_17_fact.Data < "2021-01-01"].to_parquet("./Intermediarios/RV_17_2020.gzip")
rrv_nao_ident[rrv_nao_ident.Data < "2021-01-01"].to_parquet("./Intermediarios/rrv_nao_ident_2020.gzip")