# ETL IRIS Coordenadas

## Funções de extração de dados dos arquivos .txt de entrada

In [1]:
def extract_lat_lng(file):
    
    f = open(file,'r')
    
    coord = []
    lat = None
    lng = None
    
    for line in f:
        
        if ("Latitude" in line) and lat == None:
            #Talvez essa solução não seja a mais segura (utilizar o S para dividir a linha)
            lat = line.split("S")[1].split("\n")[0]
        if ("Longitude" in line) and (lng == None):
            #Talvez essa solução não seja a mais segura (utilizar o W para dividir a linha)
            lng = line.split("W")[1].split("\n")[0]
            
            coord.append([lat,lng])
            
            lat = None
            lng = None

    print("--Finished Path:",file)
    
    return coord

In [2]:
def extract_data(files):
    print("Extraindo Dados...")
    #Entrada: Lista de caminhos de arquivos para leitura
    #Saída: Lista com todas coordenadas de todos os arquivos lidos
    
    #lança para o maior número de processos possíveis para se tornar mais eficiente para leitura de diversos arquivos 
    #***cada thread assume um arquivo***
    
    coordinates = []
    
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    coordinates_temp = pool.map(extract_lat_lng, files)
    
    for f in coordinates_temp:
        coordinates += f
        
    print("--Número de Coordendas total", len(coordinates))

    return coordinates
    

## Funçôes de Transformação das coordenadas extraídas

In [3]:
def transform_data(coordinates):    
    print("Transformando dados... \n--Aguarde, Requisições à API.")
    #Entrada: Lista com coordenadas agrupadas em listas [[coord1_lat,coord1_lng],[coord2_lat,coord2_lng]]
    #Sáida DataFrame pandas, com as informações obtidas de cada coordenada
    
    #Processo é paralelizada de acordo com as threads disponíveis, visando
    #obter melhor performance na obtenção das informações através da API
    
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    dict_temp = pool.map(get_location, coordinates)

    df_temp = pd.DataFrame(data=dict_temp)
    
    #Remove as linhas repetidas baseadas na latitudes e longitude, são removidas as linhas com menos valores
    df_temp['count'] = pd.isnull(df_temp).sum(1)
    df_temp = df_temp.sort_values(['count']).drop_duplicates(subset=['latitude','longitude'],keep='first').drop('count',1)
    df_temp.sort_index(inplace=True)
    
    #Remove as linhas as quais o subset é nulo.
    df_temp.dropna(subset=['pais','estado','cidade','bairro'], inplace=True)
    
    df_temp.set_index(['latitude','longitude'], inplace=True)
        
    return df_temp

### Função da API geopy para obtenção de informações das coordenas

In [4]:
import geopy
from geopy.exc import GeocoderTimedOut
from geopy.extra.rate_limiter import RateLimiter

#https://developer.mapquest.com/user/me/apps
#mapquest
CONSUMER_KEY = 'zddGiPAGZEBPTSKYphGuIhSixhyWTcuJ'

df_columns = ["latitude","longitude","rua","numero","bairro","cidade","cep","estado","pais"]
geopy_columns = ["road","house_number","suburb","city","postcode","state","country"]

def get_location(coord):
    
    dict_location = {}
    
    if (coord[0] != None) and (coord[1] != None):
        dict_location['latitude'] = coord[0].strip()
        dict_location['longitude'] = coord[1].strip()
    
        try:
            #https://developer.mapquest.com/user/me/apps
            geolocator = geopy.geocoders.OpenMapQuest(api_key=CONSUMER_KEY)
            RateLimiter(geolocator.reverse, min_delay_seconds=1)
            #geolocator = geopy.geocoders.GoogleV3(api_key = "") 
            location = geolocator.reverse(str(coord[0]) + ", " + str(coord[1]), timeout=10)

        except GeocoderTimedOut as e:
            print("Error: geocode failed on input %s with message %s"%(coord[0]+coord[1], e.message))

        for i in range(2,len(df_columns)):

            try:
                dict_location[df_columns[i]] = location.raw['address'][geopy_columns[i-2]]
            except:
                dict_location[df_columns[i]] = None
    
    return dict_location
       

## Funções do database (conexão, visualização, inserção)

In [5]:
def print_table_db(table):
    conn = get_connection_db()
    try:
        rows = conn.execute('SELECT * FROM {};'.format(table))
        for n,r in enumerate(rows):
            print(n,r)
    except:
        print("Tabela nao existe")

In [6]:
import psycopg2
from sqlalchemy import create_engine

def get_connection_db(host_ = 'localhost', dbname_ = 'testeiris', user_ = 'douglas', password_ = '123'):
    try:
        return create_engine(
            'postgresql+psycopg2://{}:{}@{}/{}'.format(user_,password_,host_,dbname_)
            )

    except Exception as e:
        print(e)
        return None



In [None]:
def remove_df_db_dup(df,db_rows):
    print("--Removendo duplicados presentes no dataframe e database...")
    for i in db_rows:
        try:
            df = df.drop((i[0],i[1]))
        except Exception:
            pass

    return df

In [7]:
def append_df_in_db(df,conn):
    try:
        df = remove_df_db_dup(df,conn.execute("SELECT * FROM locations"))
        df.to_sql('locations', con = conn, if_exists = 'append', chunksize = 1, index=True)
        print("--Adicionado Novos Valores na Tabela.")
    except:
        try:
            df.to_sql('locations', con = conn, if_exists = 'append', chunksize = 1, index=True)
            conn.execute('ALTER TABLE locations ADD PRIMARY KEY (latitude, longitude);')
            print("--Primeira inserção de valores na tabela.")
        except Exception as e:
            print("--Exeception message:\n",e)

In [8]:
def load_data(df):
    print("Carregando dados no DB")
    #para utilização é necessário ter um DB.
    #os paramentros de conexão ao DB podem ser alterados
    #host_ = 'localhost', dbname_ = 'testeiris', user_ = 'douglas', password_ = '123'
    conn = get_connection_db()
    append_df_in_db(df, conn)
    conn.dispose()


## Main

In [9]:
import pandas as pd
import multiprocessing
import glob
import time

#Caminho onde se encontram os arquivos .txt contendo as informações de coordenadas
path_master='../ETL_Coordinates/Arquivos Extração'

files = [f for f in glob.glob(path_master + "**/*.txt", recursive=True)]

if len(files) > 0:
    print("\nProcesso de ETL iniciado:\n\n")
    files.sort()

    start = time.time()

    #O processo de extração tem como parametro uma lista de caminho de arquivos
    #a serem extraidas as coordenadas, retornando uma lista com todas as coordenadas
    #de todos os arquivos passados inicialmente (esse processo é feito paralelamente)
    coordinates = extract_data(files)

    #O processe de transformação dos dados tem como parametro a lista de coordenadas
    #as quais serão obtidas as informações como rua, bairro, etc. retornando um dataframe pandas
    df_locations = transform_data(coordinates)

    #O processo de carregamento é feito através da inserção do dataframe em uma tabela prédefinida
    #onde são adicionados os dados ao fim da tabela se ela já existe como também criando a tabela caso não.
    #Esse código pode ser executado diversas vezes com arquivos diferentes como uma única vez com todos os arquivos.
    load_data(df_locations)

    end = time.time()
    
    tempo_total = end - start
    if(float() > 60.0):
        print("\n\nTempo total decorrido:",tempo_total / 60,"m")
    else:
        print("\n\nTempo total decorrido:",tempo_total,"s")

else:
    print("Nenhum arquivo encontrado.")

    

Extraindo Dados
Finished Path: ../ETL_Coordinates/Arquivos Extração/data_points_20180102.txt
Número de Coordendas total 301
Transformando dados... 
Aguarde, Requisições à API.
Carregando dados no DB
Primeira inserção de valores na tabela.
Tempo total decorrido: 74.22877144813538 s


In [10]:
#comprovação da inserção de dados no database
print_table_db('locations')

0 ('-30.06355608', '-51.2331063', None, None, 'Praia de Belas', 'Porto Alegre', '90850-110', 'RS', 'Brasil')
1 ('-30.04208508', '-51.25745683', 'Rua Ilha Mauá', None, 'Arquipélago', 'Porto Alegre', '90010-120', 'RS', 'Brasil')
2 ('-30.03152967', '-51.20818753', 'Viaduto Engenheiro Ildo Meneghetti', None, 'Bom Fim', 'Porto Alegre', '90420-111', 'RS', 'Brasil')
3 ('-30.01863604', '-51.19509983', 'Avenida Nova York', '412', 'Auxiliadora', 'Porto Alegre', '90550-070', 'RS', 'Brasil')
4 ('-30.07117501', '-51.24203618', None, None, 'Praia de Belas', 'Porto Alegre', '90810-180', 'RS', 'Brasil')
5 ('-30.01766564', '-51.26309605', 'Rua Mexiana', None, 'Arquipélago', 'Porto Alegre', '90090-040', 'RS', 'Brasil')
6 ('-30.03953495', '-51.24244836', None, None, 'Praia de Belas', 'Porto Alegre', '90050-190', 'RS', 'Brasil')
7 ('-30.04776623', '-51.19082785', 'Rua Artigas', None, 'Petrópolis', 'Porto Alegre', '90670-160', 'RS', 'Brasil')
8 ('-30.05026633', '-51.23414341', None, None, 'Praia de Belas',