In [1]:
import pandas as pd
import numpy as np
import re
import pymysql
import sys
sys.path.append('C:\\Users\\Federico\\Desktop\\Broker\\RNS\\src')
import rns_functions as F

In [2]:
## Paths
# Monthly RNS dump to load. The six characters before the '.csv' extension
# are the period tag in YYYYMM form.
file_path = '../data/2019/registro-nacional-sociedades-201909.csv'

# First day of the month after the file's period; used downstream as the
# cutoff for dates that cannot belong to this dump.
periodo_archivo = file_path[-10:-4]
primer_dia_periodo = pd.to_datetime(periodo_archivo + '01')
file_next_date = primer_dia_periodo + pd.DateOffset(months=1)

In [3]:
## ****************** GENERAL VALIDATIONS ********************

## -------------------- HEADERS -----------------------

# Start a fresh log for this period ('w+' truncates any previous log).
# file_path[-10:-4] is the YYYYMM tag embedded in the file name.
with open(f'../logs/log_{file_path[-10:-4]}.txt', 'w+') as log:
    log.write(f'Cargando el archivo -> {file_path[13:]}\r\n')

# Expected column names, read from the first line of headers.csv. They are
# used as the column names when the data file is loaded without a header row.
with open('../data/headers.csv', 'r', encoding= 'utf-8-sig') as f:
    headers = f.readline().replace('\n','').split(',')

try:

    # NOTE(review): f_check_headers is defined in rns_functions; judging by the
    # handlers below it presumably raises Warning when the header row is
    # missing and ValueError when the fields differ from the expected ones.
    F.f_check_headers(file_path)

    rns = pd.read_csv(file_path, header= 0, dtype= 'string').drop(columns= 'numero_inscripcion')

    with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:
        log.write('Archivo cargado\r\n')

except Warning:

    # No header row: load every line as data and name the columns ourselves.
    rns = pd.read_csv(file_path, header= None, names= headers, dtype= 'string').drop(columns= 'numero_inscripcion')

    with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:
        log.write('Warning: Archivo cargado, no hay Headers\r\n')

except ValueError:

    with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:
        log.write('Error: Archivo no cargado, hay campos distintos a los esperados\r\n')

    # The file was not loaded, so `rns` does not exist (or, worse, still holds
    # a previous run's data). Stop here instead of letting later steps fail
    # with a confusing NameError or silently process stale data.
    raise

## ---------------- FILE COLUMNS ----------------------

# Processing

strings = ['razon_social', 'tipo_societario', 'dom_fiscal_provincia',
           'dom_fiscal_localidad', 'dom_fiscal_calle', 'dom_fiscal_piso',
           'dom_fiscal_departamento', 'dom_fiscal_estado_domicilio', 'dom_legal_provincia',
           'dom_legal_localidad', 'dom_legal_calle', 'dom_legal_piso', 'dom_legal_departamento',
           'dom_legal_estado_domicilio']

# Normalize every text column, one column at a time.
# NOTE(review): f_proceso_string lives in rns_functions; the original comment
# says it lower-cases the text - confirm what else it does.
rns.loc[:,strings] = rns[strings].apply(F.f_proceso_string)

# Parse the incorporation date and the last-update date from the first 10
# characters of the raw value (expected YYYY-MM-DD). Unparseable values become
# NaT, and so does any date on or after the first day of the month following
# the file's period.
for columna_origen, columna_fecha in [('fecha_contrato_social', 'fh_contrato_social'),
                                      ('fecha_actualizacion', 'fh_actualizacion')]:
    rns[columna_fecha] = pd.to_datetime(rns[columna_origen].str[:10], errors= 'coerce')
    # pd.NaT is the missing-value sentinel for datetime columns; the np.NaN
    # alias used previously no longer exists in NumPy 2.0.
    rns.loc[rns[columna_fecha] >= file_next_date, columna_fecha] = pd.NaT

# Postal codes and street numbers become nullable integers; anything that is
# not numeric turns into <NA>.
for columna_entera in ['dom_fiscal_cp', 'dom_legal_cp', 'dom_fiscal_numero', 'dom_legal_numero']:
    rns[columna_entera] = pd.to_numeric(rns[columna_entera], errors= 'coerce').astype('Int64')

# Validate the structure of the CUITs.
# NOTE(review): downstream code sums and negates this column, so
# f_cuit_validation is expected to return a boolean Series - confirm.
rns['cuit_valido'] = F.f_cuit_validation(rns['cuit'])

# NA validation: share of missing values per column, written to the log
# together with the row count and the CUIT validation totals.
columnas = [
    'razon_social', 'fh_contrato_social', 'tipo_societario', 'fh_actualizacion',
    'dom_fiscal_provincia', 'dom_fiscal_localidad', 'dom_fiscal_calle', 'dom_fiscal_numero',
    'dom_fiscal_piso', 'dom_fiscal_departamento', 'dom_fiscal_cp', 'dom_fiscal_estado_domicilio',
    'dom_legal_provincia', 'dom_legal_localidad', 'dom_legal_calle', 'dom_legal_numero',
    'dom_legal_piso', 'dom_legal_departamento', 'dom_legal_cp', 'dom_legal_estado_domicilio',
]
nulos = rns[columnas].isnull().mean()

with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:

    total_filas = rns.shape[0]
    log.write('NROWS: {}\r\n'.format(total_filas))

    cuits_validos = rns['cuit_valido'].sum()
    cuits_invalidos = (~rns['cuit_valido']).sum()
    cuits_nulos = rns['cuit'].isnull().sum()
    log.write('CUITS OK: {} ; CUITS No Validos: {} ; CUITS NAs: {}\r\n'.format(cuits_validos, cuits_invalidos, cuits_nulos))

    # Every row should carry a valid CUIT; flag the file otherwise.
    todos_validos = rns['cuit_valido'].sum() == total_filas
    log.write('NROWS = CUITS -> OK\r\n' if todos_validos else 'NROWS = CUITS -> WARNING!!\r\n')

    log.write('\r\n'*2)
    log.write('TABLA DE NAs:\r\n')

    # One line per column with its percentage of missing values.
    for nombre_columna, proporcion in nulos.items():
        log.write('\t {}: {:.2f}%\r\n'.format(nombre_columna, proporcion*100))

    log.write('\r\n'*2)
    
## ------------------ DROP NAs ------------------------

# These 3 fields must always have a value; rows missing any of them are dropped.
campos_obligatorios = ['cuit','razon_social','fh_contrato_social']
invalid_rows = rns[campos_obligatorios].isnull().any(axis= 1)

# Take an explicit copy of the surviving rows: new columns are assigned to
# `rns` further down, and doing that on a filtered slice raises pandas'
# SettingWithCopyWarning.
rns = rns.loc[~invalid_rows,:].copy()

# Record how many rows were discarded, in absolute terms and as a percentage.
with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:

    log.write('INVALID ROWS: {} ; as %: {:.2f}%\r\n'.format(invalid_rows.sum(),invalid_rows.mean()*100))
    

## ----------- REPLACE TEXT WITH CODES -------------

# Load the three lookup tables from the 'empresas' database as
# {description: code} dictionaries. Each fetched row is unpacked as
# (code, description), so the first two queries rely on the tables having
# exactly those two columns in that order.
mysql = F.f_make_mysql_connection('empresas')
with mysql.cursor() as cur:

    cur.execute('select * from look_provincia')
    look_provincia = dict((nombre, codigo) for codigo, nombre in cur.fetchall())

    cur.execute('select * from look_tipo_societario')
    look_tipo_societario = dict((nombre, codigo) for codigo, nombre in cur.fetchall())

    cur.execute('select cd_estado_domicilio, nb_estado_domicilio from look_estado_domicilio')
    look_estado_domicilio = dict((nombre, codigo) for codigo, nombre in cur.fetchall())

# (new code column, source text column, lookup table). Values that are not
# present in the lookup table are coded as -1.
codificaciones = [
    ('cd_provincia_dom_fiscal', 'dom_fiscal_provincia', look_provincia),
    ('cd_provincia_dom_legal', 'dom_legal_provincia', look_provincia),
    ('cd_estado_dom_fiscal', 'dom_fiscal_estado_domicilio', look_estado_domicilio),
    ('cd_estado_dom_legal', 'dom_legal_estado_domicilio', look_estado_domicilio),
    ('cd_tipo_societario', 'tipo_societario', look_tipo_societario),
]
for columna_codigo, columna_texto, tabla in codificaciones:
    rns[columna_codigo] = rns[columna_texto].apply(lambda valor: tabla.get(valor, -1))

with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:

    log.write('Reemplazo codigos de provincias, tipo societario y estado del domicilio -> OK\r\n')
    
## ------------------ FINAL TABLE ------------------------

# Rename the source columns to the names used by the destination table.
# Address columns follow one pattern for both address kinds:
#   dom_<kind>_<field>  ->  <prefix>_dom_<kind>
prefijo_por_campo = {
    'localidad': 'nb_localidad',
    'cp': 'cd_postal',
    'calle': 'nb_calle',
    'numero': 'nu_calle',
    'piso': 'tx_piso',
    'departamento': 'tx_depto',
}

nombres_finales = {'cuit': 'nu_cuit', 'razon_social': 'nb_razon_social'}
for tipo_domicilio in ('fiscal', 'legal'):
    for campo, prefijo in prefijo_por_campo.items():
        nombres_finales[f'dom_{tipo_domicilio}_{campo}'] = f'{prefijo}_dom_{tipo_domicilio}'

rns.rename(columns= nombres_finales, inplace= True)

with open(f'../logs/log_{file_path[-10:-4]}.txt', 'a') as log:

    log.write('Me quedo con la base final para actualizar\r\n')

In [20]:
# Columns of the final table, in insertion order: identification fields, the
# eight address fields for the fiscal address, the same eight for the legal
# address, and finally the two registry-validity dates.
_campos_domicilio = ['cd_provincia', 'nb_localidad', 'cd_postal', 'nb_calle',
                     'nu_calle', 'tx_piso', 'tx_depto', 'cd_estado']

variables_finales = (
    ['nu_cuit', 'nb_razon_social', 'cd_tipo_societario', 'fh_contrato_social', 'fh_actualizacion']
    + [f'{campo}_dom_fiscal' for campo in _campos_domicilio]
    + [f'{campo}_dom_legal' for campo in _campos_domicilio]
    + ['fh_inicio_registro', 'fh_fin_registro']
)

In [96]:
# Independent copy holding only the final columns that already exist in `rns`;
# the last two entries of variables_finales (the registry dates) are excluded.
columnas_existentes = variables_finales[:-2]
rns_final = rns.loc[:, columnas_existentes].copy()

In [97]:
# Validity window of the record: the same constant dates for every row.
rns_final['fh_inicio_registro'] = '1900-01-01'
rns_final['fh_fin_registro'] = '2100-12-31'

# Convert the two date columns to 'YYYY-MM-DD' strings (missing dates become
# <NA>). The columns are named explicitly and replaced by direct assignment:
# selecting them by dtype made the cell fail to be re-runnable (on a second
# run no datetime columns are left, so an empty frame was assigned), and
# assigning through .loc[:, cols] depends on pandas swapping the dtype of the
# existing datetime columns - NOTE(review): confirm on the pandas version in use.
for columna_fecha in ('fh_contrato_social', 'fh_actualizacion'):
    fechas = pd.to_datetime(rns_final[columna_fecha])
    rns_final[columna_fecha] = fechas.dt.date.astype('string')

In [98]:
%%time
with mysql.cursor() as cur:
    
    n = len(variables_finales)
    sql = 'insert into `registro_sociedades` (`{}`) values ({}%s)'.format('`,`'.join(variables_finales), '%s,'*(n-1))
    

    n = rns_final.shape[0]
    i = 0
    chunks = F.f_get_chunks(n)
    while chunks:
        chunk = chunks.pop(0)
        null_to_none = np.where(rns_final.iloc[i:chunk,:].isnull(),None,rns_final.iloc[i:chunk,:])
        list_input = null_to_none.tolist()
        
        cur.executemany(sql, list_input)
        mysql.commit()    
        
        print(f'Commited: {chunk}')
        i = chunk

mysql.close()

Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited: chunk
Commited