# ONBOARD eDNE DATA

In this notebook we onboard `LOG_LOGRADOURO_SP.TXT` from the eDNE files stored here. **We only load `Delimitado/LOG_LOGRADOURO_SP.TXT` because it is later used to enrich `susep.geo_info` with imputed information for missing CEPs.**
- Original data: https://drive.google.com/file/d/1lcB3sDltsATSyUVIeWAlkU4xrhcWTEj9/view



**PENDING**: Move this step, along with the rest of the process, to Airflow so that the eDNE data is onboarded according to the following schema (as defined in the file `Delimitado/Leiautes_delimitador.doc`):

![eDNE data schema](images/edne_data_schema.png "eDNE data schema")

In [1]:
# Imports
import pandas as pd
import numpy as np
import psycopg2, os
from tqdm import tqdm

#!pip install pyshp
import shapefile

# Establish connection and create its cursor
try: 
    conn = psycopg2.connect(f"host={os.environ['AURORA_POSTGRES_HOST']} dbname={os.environ['AURORA_POSTGRES_DATABASE']} user={os.environ['AURORA_POSTGRES_USERNAME']} password={os.environ['AURORA_POSTGRES_PWD']}")
    cur = conn.cursor()
except psycopg2.Error as e: 
    print("Error: Could not make connection to the Postgres database")
    print(e)
    

---

## 1. Load the data

Define the data schema according to `Delimitado/Leiautes_delimitador.doc`

In [2]:
columns = {"LOG_NU".lower(): {'description': "chave do logradouro",
                              'data_type': "NUMBER(8)",
                              'postgres_data_type': "integer",
                              'df_data_type': int},
           
           "UFE_SG".lower(): {'description': "sigla da UF",
                              'data_type': "CHAR(2)",
                              'postgres_data_type': "varchar(2)",
                              'df_data_type': str},
           
           "LOC_NU".lower(): {'description': "chave da localidade",
                              'data_type': "NUMBER(8)",
                              'postgres_data_type': "integer",
                              'df_data_type': int},
           
           "BAI_NU_INI".lower(): {'description': "chave do bairro inicial do logradouro",
                                  'data_type': "NUMBER(8)",
                                  'postgres_data_type': "integer",
                                  'df_data_type': int},
           
           "BAI_NU_FIM".lower(): {'description': "chave do bairro final do logradouro (opcional)",
                                  'data_type': "NUMBER(8)",
                                  'postgres_data_type': "integer",
                                  'df_data_type': int},
           
           "LOG_NO".lower(): {'description': "nome do logradouro",
                              'data_type': "VARCHAR2(100)",
                              'postgres_data_type': "varchar(100)",
                              'df_data_type': str},
           
           "LOG_COMPLEMENTO".lower(): {'description': "complemento do logradouro (opcional)",
                                       'data_type': "VARCHAR2(100)",
                                       'postgres_data_type': "varchar(100)",
                                       'df_data_type': str},
           
           "CEP".lower(): {'description': "CEP do logradouro",
                           'data_type': "CHAR(8)",
                           'postgres_data_type': "varchar(8)",
                           'df_data_type': str},
           
           "TLO_TX".lower(): {'description': "tipo de logradouro",
                              'data_type': "VARCHAR2(36)",
                              'postgres_data_type': "varchar(36)",
                              'df_data_type': str},
           
           "LOG_STA_TLO".lower(): {'description': "indicador de utilização do tipo de logradouro (S ou N) (opcional)",
                                   'data_type': "CHAR(1)",
                                   'postgres_data_type': "varchar(1)",
                                   'df_data_type': str},
           
           "LOG_NO_ABREV".lower(): {'description': "abreviatura do nome do logradouro (opcional)",
                                    'data_type': "VARCHAR2(36)",
                                    'postgres_data_type': "varchar(36)",
                                    'df_data_type': str}}


Now load the data

In [3]:
# Load the data
with open('../eDNE_Basico_2103 2/Delimitado/LOG_LOGRADOURO_SP.TXT', 
          'r', 
          encoding='latin-1') as filein:
    data = pd.read_csv(filein, sep='@', header=None)
    
# Assign the column names
data.columns = list(columns.keys())
display(data.shape)
data.head()


(306598, 11)

|   | log_nu | ufe_sg | loc_nu | bai_nu_ini | bai_nu_fim | log_no | log_complemento | cep | tlo_tx | log_sta_tlo | log_no_abrev |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1001235 | SP | 8912 | 14716 | NaN | Octaviano de Arruda Campos | - de 960/961 ao fim | 14810227 | Avenida | S | Av Octaviano de A Campos |
| 1 | 1001236 | SP | 8912 | 14760 | NaN | José Salles Gadelha | - até 108/109 | 14807048 | Avenida | S | Av José S Gadelha |
| 2 | 1001237 | SP | 8912 | 14668 | NaN | José Salles Gadelha | - de 110/111 ao fim | 14807126 | Avenida | S | Av José S Gadelha |
| 3 | 1001239 | SP | 8912 | 55688 | NaN | Oswaldo Gonçalves de Jesus | NaN | 14805396 | Avenida | S | Av Oswaldo G de Jesus |
| 4 | 1001241 | SP | 8924 | 56583 | NaN | Pedro Eroles | - até km 34,999 | 7434090 | Rodovia | S | Rod Pedro Eroles |


Check NA's

In [4]:
data.isna().sum()

log_nu                  0
ufe_sg                  0
loc_nu                  0
bai_nu_ini              0
bai_nu_fim         306598
log_no                  1
log_complemento    263757
cep                     0
tlo_tx                  0
log_sta_tlo             0
log_no_abrev            0
dtype: int64

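Fill missing values with `-1` and enforce the data types defined in `columns`. Note that for the string columns (`log_no`, `log_complemento`) the sentinel becomes the string `"-1"` after the cast.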

In [5]:
# Fill missing values with the sentinel -1
for col in ['bai_nu_fim', 'log_no', 'log_complemento']:
    data[col] = data[col].fillna(-1)
    
# Enforce data types
for col in data.columns:
    data[col] = data[col].astype(columns[col]['df_data_type'])
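The `-1` sentinel keeps the casts simple, but it is also stored in the database as a real value. As an alternative sketch (not what this notebook does), missing values could be converted to `None`, which psycopg2 sends as SQL `NULL`. The helper name `to_db_params` is hypothetical, and this only applies to columns that are left uncast, since casting `NaN` to `int` fails.

```python
import math


def to_db_params(values):
    """Return a tuple where float NaN entries are replaced with None."""
    return tuple(
        None if isinstance(v, float) and math.isnan(v) else v
        for v in values
    )


# Example row with a missing bai_nu_fim (hypothetical subset of the columns)
row = (1001239, "SP", 8912, 55688, float("nan"), "Oswaldo Gonçalves de Jesus")
print(to_db_params(row))
```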

Left-pad the CEP field with zeros to length 8. The leading zero was lost because pandas parsed the column as an integer when loading the file.

In [6]:
# Length distribution before and after padding
display(data["cep"].str.len().value_counts())
data["cep"] = data["cep"].str.zfill(8)
display(data["cep"].str.len().value_counts())


8    196535
7    110063
Name: cep, dtype: int64

8    306598
Name: cep, dtype: int64
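The padding step assumes every CEP is a run of at most 8 digits. A minimal sketch of the same rule with a validation check is shown below; `normalize_cep` is a hypothetical helper, not part of the notebook, and could be applied with `data["cep"].map(normalize_cep)`.

```python
def normalize_cep(value):
    """Return the CEP as an 8-character, zero-padded string of digits."""
    digits = str(value).strip()
    # Reject anything that is not purely numeric or is longer than a CEP
    if not digits.isdigit() or len(digits) > 8:
        raise ValueError(f"Unexpected CEP value: {value!r}")
    return digits.zfill(8)


print(normalize_cep(7434090))     # CEP that lost its leading zero
print(normalize_cep("14810227"))  # CEP that is already 8 characters long
```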

Export the data to CSV and upload it to S3 so it can be imported with the copy command. The file was uploaded to `s3://postgres-staging-data/edne/eDNE_LOG_LOGRADOURO_SP_data.csv`

In [7]:
data.to_csv("eDNE_LOG_LOGRADOURO_SP_data.csv", index=False)
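The notebook does not show how the file reached S3. One possible way, assuming boto3 is installed and AWS credentials are configured, is sketched below. The helper `upload_csv` is hypothetical; it only wraps the S3 client's `upload_file` call, and the bucket and key are the ones mentioned above.

```python
def upload_csv(s3_client, local_path, bucket, key):
    """Upload a local file to s3://bucket/key and return the resulting URI."""
    s3_client.upload_file(local_path, bucket, key)
    return f"s3://{bucket}/{key}"


# Example usage (requires boto3 and valid AWS credentials):
# import boto3
# upload_csv(boto3.client("s3"),
#            "eDNE_LOG_LOGRADOURO_SP_data.csv",
#            "postgres-staging-data",
#            "edne/eDNE_LOG_LOGRADOURO_SP_data.csv")
```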

---

## 2. Data to Postgres

### 2.1. Create the table in Postgres

Create the schema if it doesn't exist

In [8]:
cur.execute("CREATE SCHEMA IF NOT EXISTS edne")
conn.commit()


Create the table if it doesn't exist, then truncate it so the records are inserted into an empty table

In [9]:
cur.execute(f"CREATE TABLE IF NOT EXISTS edne.log_logradouro_sp ({', '.join([k.lower()+' '+v['postgres_data_type'] for k,v in columns.items()])})")
conn.commit()

cur.execute("TRUNCATE TABLE edne.log_logradouro_sp")
conn.commit()
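The `CREATE TABLE` statement above is assembled inline from the schema dictionary. The same idea can be sketched as a small function that also checks the identifiers before interpolating them into SQL. `build_create_table` is a hypothetical helper, and the identifier pattern is an assumption (lowercase names only).

```python
import re

# Assumption: schema, table and column names are plain lowercase identifiers
_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def build_create_table(schema, table, column_types):
    """Build a CREATE TABLE IF NOT EXISTS statement from a {name: type} dict."""
    for name in (schema, table, *column_types):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Unsafe identifier: {name!r}")
    column_defs = ", ".join(
        f"{name} {sql_type}" for name, sql_type in column_types.items()
    )
    return f"CREATE TABLE IF NOT EXISTS {schema}.{table} ({column_defs})"


print(build_create_table("edne", "log_logradouro_sp",
                         {"log_nu": "integer", "cep": "varchar(8)"}))
```

With the notebook's dictionary, the third argument would be `{k: v['postgres_data_type'] for k, v in columns.items()}`.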


### 2.2. Populate the table

Load the data from S3 to the table.

NOTES:
- Run the statement below from the DBeaver editor; it won't work from this notebook
- Make sure to use freshly refreshed AWS credentials; the values in your env variables may have expired

In [10]:
# Create a custom copy statement for the file
statement = f"""
    SELECT aws_s3.table_import_from_s3(
    'edne.log_logradouro_sp',
    'log_nu,ufe_sg,loc_nu,bai_nu_ini,bai_nu_fim,log_no,log_complemento,cep,tlo_tx,log_sta_tlo,log_no_abrev',
    '(FORMAT CSV, HEADER true)',
    aws_commons.create_s3_uri(
        'postgres-staging-data',
        'edne/eDNE_LOG_LOGRADOURO_SP_data.csv',
        'us-east-1'
        ),
    aws_commons.create_aws_credentials(
        -- access key, secret key and session token; the token is read from the
        -- environment (assumes AWS_SESSION_TOKEN is set) instead of being hard-coded
        '{os.environ['AWS_ACCESS_KEY_ID']}',
        '{os.environ['AWS_SECRET_ACCESS_KEY']}',
        '{os.environ['AWS_SESSION_TOKEN']}')
    );
"""

**IF POSTGRES THROWS AN EXTENSION ERROR**: We can insert the records directly from the in-memory DataFrame, which is much slower

In [11]:
# Build the parameterized statement once; only the values change per row
insert_statement = f"INSERT INTO edne.log_logradouro_sp ({', '.join(data.columns)}) VALUES ({', '.join(['%s']*len(data.columns))})"

# Insert the records one by one and commit at the end
for i, record in tqdm(data.iterrows()):
    cur.execute(insert_statement, tuple(record))

conn.commit()



306598it [10:00:25,  8.51it/s]
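The row-by-row insert took about ten hours. A possible faster fallback, sketched here under the assumption that the database user is allowed to run `COPY ... FROM STDIN`, is to stream the exported CSV through psycopg2's `cursor.copy_expert`. The helper `copy_csv_to_table` is hypothetical and has not been run against this database.

```python
def copy_csv_to_table(cur, table, column_names, csv_file):
    """Stream a CSV file (with a header row) into `table` via COPY FROM STDIN."""
    sql = (
        f"COPY {table} ({', '.join(column_names)}) "
        "FROM STDIN WITH (FORMAT CSV, HEADER true)"
    )
    # copy_expert reads from the file object and sends the rows in one stream
    cur.copy_expert(sql, csv_file)
    return sql


# Example usage with the notebook's cursor and exported file:
# with open("eDNE_LOG_LOGRADOURO_SP_data.csv", encoding="utf-8") as f:
#     copy_csv_to_table(cur, "edne.log_logradouro_sp", list(data.columns), f)
# conn.commit()
```

This keeps the file on the client side, so it does not depend on the `aws_s3` extension being installed.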
