In [None]:
import os
import numpy as np
import pandas as pd
from pyspark.dbutils import DBUtils
from pyspark.sql.functions import lower, col
from pyspark.sql import functions as F
from pyspark.sql.functions import max
from pyspark.sql.functions import col, concat, lower, regexp_replace

In [None]:
# Caminho para o arquivo CSV
pasta = "List_cities_IBGE.csv"

# Leia o arquivo CSV como um DataFrame do Spark com separador ";" e encoding apropriado
df_ibge = spark.read.option("delimiter", ";") \
                          .option("encoding", "ISO-8859-1") \
                          .csv(pasta, header=True, inferSchema=True)

In [None]:
# Ajustando nomes de cidades
df_ibge = df_ibge.withColumn('cidade', F.lower('MUNICIPIO_IBGE'))
# Trocando sinais do texto
dict_replace_char = {'à':'a','á':'a','ã':'a','â':'a',
                     'è':'e','é':'e','ê':'e',
                     'ì':'i','í':'i',
                     'ò':'o','ó':'o','õ':'o','ô':'o',
                     'ù':'u','ú':'u','ç':'c'}
for key in dict_replace_char:
     df_ibge = df_ibge.withColumn('cidade',F.regexp_replace('cidade',key, dict_replace_char[key]))

# Adicione uma nova coluna concatenando as duas colunas existentes
df_ibge = df_ibge.withColumn("chave", 
                                    concat(col("UF"), col("cidade")))

# Deletando a coluna 'salary'
df_ibge = df_ibge.drop('MUNICIPIO_IBGE',"cidade","UF")

In [None]:
# Caminho para o arquivo CSV
pasta = "List_cities.csv"

# Leia o arquivo CSV como um DataFrame do Spark com separador ";" e encoding apropriado
df_listaCidades = spark.read.option("delimiter", ";") \
                          .option("encoding", "ISO-8859-1") \
                          .csv(pasta, header=True, inferSchema=True)
# Ajustando nomes de cidades
df_listaCidades = df_listaCidades.withColumn('cidade', F.lower('cidade'))
# Trocando sinais do texto
dict_replace_char = {'à':'a','á':'a','ã':'a','â':'a',
                     'è':'e','é':'e','ê':'e',
                     'ì':'i','í':'i',
                     'ò':'o','ó':'o','õ':'o','ô':'o',
                     'ù':'u','ú':'u','ç':'c'}
for key in dict_replace_char:
     df_listaCidades = df_listaCidades.withColumn('cidade',F.regexp_replace('cidade',key, dict_replace_char[key]))

# Adicione uma nova coluna concatenando as duas colunas existentes
df_listaCidades = df_listaCidades.withColumn("chave", 
                                    concat(col("UF"), col("cidade")))

In [None]:
# Realizando o merge/junção dos DataFrames baseado na coluna 'id'
df_cidades_hypera = df_listaCidades.join(df_ibge, on='chave', how='inner')
df_cidades_hypera = df_cidades_hypera.drop("chave")

novos_nomes = {col_name: col_name.strip() for col_name in df_cidades_hypera.columns}
df_cidades_hypera = df_cidades_hypera.select([col(col_name).alias(novos_nomes[col_name]) for col_name in df_cidades_hypera.columns])

# Renomear as colunas
df_cidades_hypera = df_cidades_hypera.withColumnRenamed("uf", "UF") \
                 .withColumnRenamed("CODIGO_MUNICIPIO_IBGE", "codigo")

# Reorganizar a ordem das colunas
df_cidades_hypera = df_cidades_hypera.select("cidade", "UF", "codigo","id_cidade")

In [None]:
# Caminho para o arquivo CSV
pasta = "City_Coordinates.txt"

# Leia o arquivo CSV como um DataFrame do Spark com separador ";" e encoding apropriado
coordendas_cidades = spark.read.option("delimiter", ";") \
                          .option("encoding", "ISO-8859-1") \
                          .csv(pasta, header=True, inferSchema=True)


# Renomear a coluna 'name' para 'nome_completo' e a coluna 'age' para 'idade'
coordendas_cidades = coordendas_cidades \
    .withColumnRenamed("GEOCODIGO_MUNICIPIO", "codigo")

# Obter os nomes das colunas e converter para minúsculas
novos_nomes_colunas = [coluna.lower() for coluna in coordendas_cidades.columns]

# Renomear as colunas para minúsculas
for coluna_original, coluna_nova in zip(coordendas_cidades.columns, novos_nomes_colunas):
    coordendas_cidades = coordendas_cidades.withColumnRenamed(coluna_original, coluna_nova)

coordendas_cidades.display()

codigo,nome_municipio,longitude,latitude
1100015,ALTA FLORESTA D'OESTE,-61.9998238962936,-11.9355403047646
1100023,ARIQUEMES,-63.0332692780484,-9.9084628665672
1100031,CABIXI,-60.5443135812009,-13.4997634596963
1100049,CACOAL,-61.4429442118224,-11.4338650286852
1100056,CEREJEIRAS,-60.8184261646815,-13.1950330320399
1100064,COLORADO DO OESTE,-60.5550674630789,-13.1305638414553
1100072,CORUMBIARA,-60.9487011706614,-12.9975202364834
1100080,COSTA MARQUES,-64.2316539391172,-12.4360138777142
1100098,ESPIGÃO D'OESTE,-61.0201731132847,-11.5285546333473
1100106,GUAJARÃ-MIRIM,-65.3239518196923,-10.7738837406705


In [None]:
# Realizando o merge/junção dos DataFrames baseado na coluna 'id'
df_cidades_final = df_cidades_hypera.join(coordendas_cidades, on='codigo', how='inner')
df_cidades_final = df_cidades_final.drop("nome_municipio")

In [None]:
table_name = "your_database_databrciks"
write_mode = "overwrite"

df_raw = df_cidades_final


num_data = df_raw.count()
if num_data >= 1:
    
    df_final = spark.table(table_name)
    types_delta = dict(df_final.dtypes)
    types_raw = dict(df_raw.dtypes)

    for column in types_delta:
        if types_delta[column] != types_raw[column]:
            df_raw = df_raw.withColumn(column, F.col(column).cast(types_delta[column]))

    df_raw.write.mode(write_mode).saveAsTable(table_name, format='delta', mode=write_mode)
    print(f"Adicioados {df_raw.count()} dados novos")

else:
    print("Não foram capturados novos dados, nenhum dados escrito na base")

Adicioados 100 dados novos
