In [1]:
import os
from pyspark.sql import SparkSession
from generate_df import table_to_df
from IPython.display import display, HTML
from tabulate import tabulate
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from unidecode import unidecode

In [2]:
# Point this kernel at the local Spark and Java installations. The values are
# written unconditionally, so they replace whatever the environment already had.
os.environ.update({
    'SPARK_HOME': '/home/daiane/spark-3.5.1-bin-hadoop3/',
    'JAVA_HOME': '/usr/lib/jvm/java-1.17.0-openjdk-amd64/',
})

# Get the active Spark session, or create one named "Exemplo Spark".
spark = SparkSession.builder.appName("Exemplo Spark").getOrCreate()

# Leave the session as the last expression so the notebook renders its summary.
spark

24/06/19 17:37:50 WARN Utils: Your hostname, victor-Lenovo-ideapad-330-15IKB resolves to a loopback address: 127.0.1.1; using 10.0.0.126 instead (on interface wlp2s0)
24/06/19 17:37:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/19 17:37:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Concatenando tabelas e criando um DataFrame com todos os endereços


In [3]:
# Load the five source tables through the project helper `table_to_df`
# (called with the 'finance_raw_data' schema name, the table name and the session).
(
    df_agencias,
    df_bancos,
    df_cooperativas_credito,
    df_sociedades,
    df_address_adm_consorcio,
) = (
    table_to_df('finance_raw_data', nome_tabela, spark)
    for nome_tabela in (
        'agencias',
        'bancos',
        'cooperativas_credito',
        'sociedades',
        'administradoras_consorcio',
    )
)

# Target columns of dim_endereco: id, cnpj, address_type, registration_date, date_end,
# street, complement, number, neighborhood, city, postalcode, state.
# These are the source columns every table contributes.
colunas = ["cnpj", "data", "endereco","complemento", "bairro",  "municipio","cep", "uf"]


def _selecionar_com_tipo(origem, tipo_endereco):
    """Keep only `colunas` from `origem` and tag every row with `tipo_endereco`."""
    return origem.select(colunas).withColumn("address_type", F.lit(tipo_endereco))


# Branch rows are tagged "AGENCIA"; every other source is tagged "SEDE".
df_address_adm_consorcio_selected = _selecionar_com_tipo(df_address_adm_consorcio, "SEDE")
df_bancos_selected = _selecionar_com_tipo(df_bancos, "SEDE")
df_cooperativas_credito_selected = _selecionar_com_tipo(df_cooperativas_credito, "SEDE")
df_sociedades_selected = _selecionar_com_tipo(df_sociedades, "SEDE")
df_agencias_selected = _selecionar_com_tipo(df_agencias, "AGENCIA")

# Add the two columns that no source table provides, as typed NULL placeholders.
dataframes_selected = [
    df.withColumn("number", F.lit(None).cast("integer"))
      .withColumn("date_end", F.lit(None).cast("date"))
    for df in (
        df_address_adm_consorcio_selected,
        df_bancos_selected,
        df_cooperativas_credito_selected,
        df_sociedades_selected,
        df_agencias_selected,
    )
]

def unionAll(dataframes):
    """Fold `dataframes` into a single DataFrame with pairwise `unionAll`.

    Columns are matched by position and duplicate rows are kept. An empty
    sequence raises TypeError, because `reduce` is given no initial value.
    """
    return reduce(lambda acumulado, proximo: acumulado.unionAll(proximo), dataframes)


# One DataFrame holding the addresses of every source table.
df_enderecos = unionAll(dataframes_selected)

# Mark the result for caching; as the last expression it also renders the schema.
df_enderecos.persist()

# df_enderecos.count()



DataFrame[cnpj: string, data: string, endereco: string, complemento: string, bairro: string, municipio: string, cep: string, uf: string, address_type: string, number: int, date_end: date]

### Tratamento do logradouro e número

#### Padronização dos tipos de logradouros mais comuns


In [4]:
tipos_logradouros = ["AREA", "ACESSO", "ACAMPAMENTO", "AEROPORTO", "ALAMEDA", "AVENIDA", "BLOCO",
                     "CANAL", "CONDOMINIO", "DISTRITO", "ESTRADA", "RUA", "VIA", "TRAVESSA"]

# (pattern, replacement) pairs expanding an abbreviated street type at the start
# of the address. Every abbreviation accepts an optional trailing dot; "PC." was
# previously the only dotted form left unexpanded.
abreviacoes_logradouro = [
    (r"^R\.? ", "RUA "),
    (r"^AV\.? ", "AVENIDA "),
    (r"^TV\.? ", "TRAVESSA "),
    (r"^PC\.? ", "PRACA "),
]

# Each pattern is anchored at the start, so a replacement is a no-op on addresses
# that do not begin with that abbreviation and no explicit `when` guard is needed.
# The expanded forms never match another pattern, so at most one rule fires per row.
# NULL addresses stay NULL.
endereco_padronizado = reduce(
    lambda coluna, regra: F.regexp_replace(coluna, regra[0], regra[1]),
    abreviacoes_logradouro,
    F.col("endereco"),
)

df_tipo_corrigido = df_enderecos.withColumn("endereco", endereco_padronizado)

df_tipo_corrigido.persist()


# table = tabulate(df_tipo_corrigido.collect(), headers=df_enderecos_corrigido.columns, tablefmt='html')

 #table = tabulate(amostra_df.collect(), headers=amostra_df.columns, tablefmt='html')


# display(HTML(table))

DataFrame[cnpj: string, data: string, endereco: string, complemento: string, bairro: string, municipio: string, cep: string, uf: string, address_type: string, number: int, date_end: date]

#### Tratamento do número

Identificando o número na coluna "endereco" e copiando-o para a coluna "number".


In [5]:
# amostra_df = df_tipo_corrigido.sample(False, 0.1)

# Generic case: the first run of digits, optionally followed by a dotted part ("1.234").
regex = r"(\d+(?:\.\d+)?)"

tipos_logradouros = ["AREA", "ACESSO", "ACAMPAMENTO", "AEROPORTO", "ALAMEDA", "AVENIDA", "BLOCO",
                     "CANAL", "CONDOMINIO", "DISTRITO", "ESTRADA", "RUA", "VIA", "TRAVESSA"]

col_endereco = F.col("endereco")

# Addresses whose digits belong to the street name and must NOT become a number:
# highways (BR / RODOVIA), "<street type> <digits>..." names, QUADRA without LOTE,
# and digit ranges such as "10-20". The stricter "<type> <digits><letters>$" test
# was a subset of the "<type> <digits>" test and is covered by it.
sem_numero = (
    col_endereco.like("%BR %")
    | col_endereco.like("%BR/%")
    | col_endereco.like("%RODOVIA%")
    | col_endereco.rlike(r"^\b(" + "|".join(tipos_logradouros) + r")\b \d+")
    | (col_endereco.like("%QUADRA%") & ~col_endereco.like("%LOTE%"))
    | col_endereco.rlike(r'\d+-\d+')
)

# NOTE(review): "%N." and "%Nº" have no trailing wildcard, so they only match
# addresses that END with "N." / "Nº" - confirm this is intended.
numero_extraido = (
    F.when(sem_numero, "")
     .when(
         col_endereco.like("%QUADRA%") & col_endereco.like("%LOTE%"),
         F.regexp_extract(col_endereco, r".LOTE (\d+)", 1),
     )
     .when(
         col_endereco.like("%N %") | col_endereco.like("%N.") | col_endereco.like("%Nº"),
         F.regexp_extract(col_endereco, r"N[ .º]?(\d+)", 1),
     )
     .otherwise(F.regexp_extract(col_endereco, regex, 0))
)

# Per-row pattern that locates the extracted number inside the address:
#   \Q...\E    treats the number literally ("." in "1.234" is no longer a wildcard);
#   (?<!\d) / (?!\d) stop it from matching digits inside a longer number;
#   ^(.*?)     anchors the match, so only the FIRST standalone occurrence is replaced.
# Before this change every occurrence was replaced, which left the marker text
# inside the part later moved to "complemento".
padrao_numero = F.concat(F.lit(r"(?s)^(.*?)(?<!\d)\Q"), F.col("number"), F.lit(r"\E(?!\d)"))

df_numero = df_tipo_corrigido.withColumn(
    "number", numero_extraido
).withColumn(
    "endereco",
    F.when(
        (F.col("number") != "") & ~F.col("endereco").rlike(r'\d+-\d+'),
        # "$1" puts back the text that preceded the number; the marker replaces the number.
        F.regexp_replace(F.col("endereco"), padrao_numero, "$1EXTRAIRAPARTIRDAQUI"),
    ).otherwise(F.col("endereco")),
)

df_numero.persist()


# df_numero_tratado_ = df_numero_tratado.filter(df_enderecos["endereco"].contains("EXTRAIRAPARTIRDAQUI-"))

# rows = df_numero_tratado.collect()

# columns = df_numero_tratado.columns

# table = tabulate(rows, headers=columns, tablefmt='html')

# display(HTML(table))


# Count all rows and the rows with a non-empty number in a single Spark job.
# `number != ""` evaluates to NULL for NULL numbers, so `when` yields NULL there
# and `count` skips those rows - the same rows the "empty" bucket covers.
contagens = df_numero.agg(
    F.count(F.lit(1)).alias("total"),
    F.count(F.when(F.col("number") != "", True)).alias("nao_vazios"),
).first()

total_count = contagens["total"]
non_empty_count = contagens["nao_vazios"]
# Every row is either non-empty or (NULL or ""), so the rest is the empty bucket.
empty_count = total_count - non_empty_count

# Guard against an empty DataFrame instead of raising ZeroDivisionError.
non_empty_percentage = (non_empty_count / total_count) * 100 if total_count else 0.0
empty_percentage = (empty_count / total_count) * 100 if total_count else 0.0


print(f"Registros não vazios: {non_empty_count} ({non_empty_percentage:.2f}%)")
print(f"Registros vazios: {empty_count} ({empty_percentage:.2f}%)")

24/06/19 17:38:57 WARN TaskSetManager: Stage 0 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.
24/06/19 17:39:00 WARN TaskSetManager: Stage 1 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.
24/06/19 17:39:02 WARN TaskSetManager: Stage 2 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.
24/06/19 17:39:04 WARN TaskSetManager: Stage 3 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.
24/06/19 17:39:05 WARN TaskSetManager: Stage 6 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.
24/06/19 17:39:06 WARN TaskSetManager: Stage 9 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.


Registros não vazios: 46521 (68.00%)
Registros vazios: 21893 (32.00%)


- Adicionando tudo o que vem após o número em uma coluna temporária, na qual os caracteres especiais são removidos e o texto é padronizado para ser concatenado com a coluna complemento
- Excluindo o texto "EXTRAIRAPARTIRDAQUI", usado para identificar o número no endereço
- Limpando caracteres especiais do final da string do campo endereco

In [6]:
# The marker written into "endereco" where the number was, plus everything after it.
pattern = "EXTRAIRAPARTIRDAQUI(.*)"

# Temporary column holding the text that followed the number.
df_pos_numero = df_numero.withColumn("apos_numero", F.regexp_extract("endereco", pattern, 1))

# Strip leading separators from that text. \p{L}/\p{N} are used instead of
# a-zA-Z0-9 so a leading accented letter is kept (accents are only removed in a
# later step). NULL is normalised to '' so the branches below are well defined.
apos_numero_limpo = F.coalesce(
    F.regexp_replace(F.col("apos_numero"), r'^[^\p{L}\p{N}]+', ''),
    F.lit(''),
)
tem_apos_numero = apos_numero_limpo != ''
sem_complemento = F.col("complemento").isNull() | (F.col("complemento") == '')

# Prepend the post-number text to the complement:
#   - nothing extracted       -> complement unchanged (NULL stays NULL);
#   - no existing complement  -> the extracted text alone (plain concat would give
#                                NULL, or leave a dangling " - ");
#   - both present            -> "<extracted> - <complement>".
df_pos_numero_tratado = df_pos_numero.withColumn(
    "complemento",
    F.when(~tem_apos_numero, F.col("complemento"))
     .when(sem_complemento, apos_numero_limpo)
     .otherwise(F.concat(apos_numero_limpo, F.lit(' - '), F.col("complemento"))),
)

# Drop the marker and everything after it from the street.
df_endereco_sem_numero = df_pos_numero_tratado.withColumn(
    "endereco", F.regexp_replace(F.col("endereco"), pattern, "")
)

# One trailing character that is not a letter, digit, underscore or whitespace.
# Java's \w is ASCII-only by default, so the former [^\w\s]$ also removed a
# trailing accented letter.
caractere_especial = r'[^\p{L}\p{N}_\s]$'

df_endereco_sem_especial = df_endereco_sem_numero.withColumn(
    "endereco", F.regexp_replace(F.trim(F.col("endereco")), caractere_especial, "")
)

df_pos_numero_tratado = df_endereco_sem_especial.drop("apos_numero")

df_pos_numero_tratado.persist()

df_pos_numero_tratado.createOrReplaceTempView("view_temporariaa")

# 50-row preview of the treated columns, rendered as an HTML table string.
df = spark.sql("SELECT endereco, number, complemento, data FROM view_temporariaa").limit(50)

table = tabulate(df.collect(), headers=df.columns, tablefmt='html')

# display(HTML(table))

# df_pos_numero_tratado.printSchema()



24/06/19 17:39:12 WARN TaskSetManager: Stage 12 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

#### Removendo acentos


In [32]:
!pip install unidecode

Defaulting to user installation because normal site-packages is not writeable
Collecting unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 KB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m:01[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.8


In [8]:
def clean_accent_(texto):
    """Return `texto` transliterated to ASCII with `unidecode`.

    Any falsy input - None, but also the empty string - yields None.
    """
    return unidecode(texto) if texto else None

# The return type is given as the DDL string "string". `F.StringType` is not part
# of the pyspark.sql.functions API; it only resolved through an internal import.
clean_accent = F.udf(clean_accent_, "string")

# Strip accents and upper-case the free-text address columns, one at a time.
df_sem_acento = reduce(
    lambda df_parcial, coluna: df_parcial.withColumn(
        coluna, F.upper(clean_accent(F.col(coluna)))
    ),
    ["endereco", "complemento", "uf", "municipio"],
    df_pos_numero_tratado,
)

# df_sem_acento.show(20, truncate=False)

### Duplicação

In [30]:
all_columns = df_sem_acento.columns

# Rows are compared on every column except "data".
colunas_ = [coluna for coluna in all_columns if coluna != "data"]

# Keep one row per distinct combination of `colunas_`.
# NOTE(review): which "data" value survives for a group is not controlled here.
df_unique = df_sem_acento.dropDuplicates(subset=colunas_)

# Sanity check on the de-duplicated frame: no key combination may appear twice.
df_grouped = df_unique.groupBy(colunas_).count()

df_duplicates = df_grouped.filter(F.col("count") > 1)

# Run the count once; each .count() call is a separate Spark job.
total_duplicados = df_duplicates.count()

if total_duplicados > 0:
    print("Há linhas duplicadas no DataFrame.")
    print(total_duplicados)
else:
    print("Não há linhas duplicadas no DataFrame.")


24/06/19 18:31:09 WARN TaskSetManager: Stage 14 contains a task of very large size (1496 KiB). The maximum recommended task size is 1000 KiB.

Não há linhas duplicadas no DataFrame.


                                                                                

### Formatação da data

In [23]:
# Parse the string column "data" into a DateType column. No format is passed,
# so Spark's default date parsing applies.
df_data_formatada = df_unique.withColumn("data", F.to_date("data"))

# Roughly 10% sample without replacement, kept for manual inspection.
df_data_formatada_sample = df_data_formatada.sample(withReplacement=False, fraction=0.1)

# table = tabulate(df_data_formatada_sample.collect(), headers=df_data_formatada_sample.columns, tablefmt='html')

# display(HTML(table))



#### Renomear colunas

In [26]:
# Rename the Portuguese source columns to the dim_endereco names; cnpj,
# address_type, number and date_end keep their names. The output column order
# is the order listed here.

df_final = df_data_formatada.select(
    *[
        F.col(nome_origem).alias(nome_destino)
        for nome_origem, nome_destino in (
            ("data", "registration_date"),
            ("endereco", "street"),
            ("complemento", "complement"),
            ("bairro", "neighborhood"),
            ("municipio", "city"),
            ("cep", "postalcode"),
            ("uf", "state"),
        )
    ],
    "cnpj",
    "address_type",
    "number",
    "date_end",
)
# df_final.printSchema()

In [24]:
# jdbc_properties = {
#     "user": "seu_usuario",
#     "password": "sua_senha",
#     "driver": "org.postgresql.Driver"
# }

# # Escreve o DataFrame no PostgreSQL
# df.write.jdbc(url=jdbc_url, table="nome_da_tabela", mode="overwrite", properties=jdbc_properties)



root
 |-- cnpj: string (nullable = true)
 |-- data: date (nullable = true)
 |-- endereco: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- address_type: string (nullable = false)
 |-- number: string (nullable = true)
 |-- date_end: date (nullable = true)

