## Etapa 2
### Criar um novo banco de dados na camada Silver, tratar e limpar os dados

In [0]:
# Import libraries

import requests
import pandas as pd
import re

from pyspark.sql.types import StringType
from pyspark.sql.functions import (regexp_extract, col, regexp_replace, lit, when, concat_ws, expr,
                                size, trim, split, lpad, lower, coalesce, count, row_number, first,
                                monotonically_increasing_id, concat)

from pyspark.sql.window import Window

import shutil




In [0]:
# Defining the layer silver database, where transformed data will be stored

# Create new database with custom DBFS location
spark.sql("""
CREATE DATABASE IF NOT EXISTS silver_imoveis
LOCATION 'dbfs:/mvp/database/silver_imoveis'
""")

Out[2]: DataFrame[]

In [0]:
# Display databases

display(spark.sql("SHOW DATABASES"))

databaseName
bronze_aux
bronze_imoveis
default
ouro_imoveis
silver_aux
silver_imoveis


In [0]:
# Define a function to add an ID for each listed property

def adicionar_id_imovel(df, prefixo: str, nome_coluna="ID_imovel"):
    """
    Adiciona uma coluna de ID único para imóveis com padrão:
    XXYYZZZZ → XX=cidade, YY=origem, ZZZZ=número sequencial
    
    Parâmetros:
    - df: DataFrame PySpark de entrada
    - prefixo: string de 4 dígitos (ex: "1001" para SP/VivaReal)
    - nome_coluna: nome da coluna de ID que será criada (padrão = "ID_imovel")
    
    Retorna:
    - DataFrame com nova coluna de ID
    """
    # window function to generate sequencial number
    window_spec = Window.orderBy(monotonically_increasing_id())

    df = df.withColumn(
        nome_coluna,
        concat(
            lit(prefixo),
            lpad(row_number().over(window_spec).cast("string"), 4, "0")
        )
    )
    
    return df


In [0]:
# Dic with numerical codes to ID_Imovel
prefixos_id = {
    "df_VR_RJ": "2101",  # VivaReal - Rio de Janeiro
    "df_OLX_RJ": "2102", # OLX - Rio de Janeiro
    "df_ZAP_RJ": "2103",  # Zap - Rio de Janeiro
    "df_VR_SP": "1101",  # VivaReal - São Paulo
    "df_OLX_SP": "1102", # OLX - São Paulo
    "df_ZAP_SP": "1103",  # Zap - São Paulo
}

####Limpeza e Tratamento dos Dados
Nesta parte do projeto, irei pegar cada tabela e tratar strings, tipagem dos dados, dados duplicados e faltantes, esquema de colunas, etc. No final espero ter todas a tabelas uniformizadas e prontas pra passar para uma próxima camada chamada "Gold", aonde poderei realizar agregações, análises e gráficos.

####Viva Real RJ

In [0]:
# Perform cleaning and transformations using pyspark

df_VR_RJ = spark.table("bronze_imoveis.vivareal_rio")

display(df_VR_RJ.printSchema)
display(df_VR_RJ.sample(.005))


<bound method DataFrame.printSchema of DataFrame[link: string, titulo: string, endereco: string, bairro: string, cep: bigint, preco: bigint, condominio: double, iptu: double, metragem: bigint, dormitorios: bigint, banheiros: bigint, vagas: bigint]>

link,titulo,endereco,bairro,cep,preco,condominio,iptu,metragem,dormitorios,banheiros,vagas
/imovel/apartamento-2-quartos-lagoa-zona-sul-rio-de-janeiro-96m2-aluguel-RS7500-id-2778964233/,Imóvel para aluguel tem 96 metros quadrados com 2 quartos em Lagoa - Rio de Janeiro - RJ,"Avenida Epitácio Pessoa, 4362",Lagoa,22471004,7500,900.0,66.0,96,2,1,0
/imovel/apartamento-2-quartos-botafogo-zona-sul-rio-de-janeiro-70m2-aluguel-RS4500-id-2776607722/,EXCELENTE 2 QUARTOS COM DEPENDÊNCIA COMPLETA EM BOTAFOGO,"Rua Rodrigo de Brito,",Botafogo,22280100,4500,721.0,1.0,70,2,2,0
/imovel/apartamento-3-quartos-ipanema-zona-sul-rio-de-janeiro-com-garagem-148m2-aluguel-RS18000-id-2752473132/,APARTAMENTO - IPANEMA - 03 QUARTOS - 02 VAGAS,"Rua Prudente de Morais, 790",Ipanema,22420041,18000,3989.0,990.0,148,3,2,2
/imovel/apartamento-2-quartos-lagoa-zona-sul-rio-de-janeiro-com-garagem-105m2-aluguel-RS9500-id-2773156320/,"Apartamento com vista Jockey e Cristo, Junto ao Piraquê, varandão, 2 qtos(1 suite) 2 vagas de garag","Avenida Lineu de Paula Machado, 1005",Lagoa,22470040,9500,2000.0,450.0,105,2,3,2
/imovel/apartamento-3-quartos-ipanema-zona-sul-rio-de-janeiro-82m2-aluguel-RS5800-id-2793796630/,"apartamento terreo 3 quartos copanema, 1 suite, boa cozinha/área, sem vaga, proximo praia e metrô","Rua Joaquim Nabuco,",Ipanema,22080060,5800,1634.0,532.0,82,3,2,0
/imovel/apartamento-2-quartos-botafogo-zona-sul-rio-de-janeiro-com-garagem-67m2-aluguel-RS5500-id-2786256816/,"Apartamento para aluguel, com 67m² no Botafogo!","Rua São Clemente,",Botafogo,22260004,5500,1348.0,285.0,67,2,2,1


In [0]:
# Cast datatypes and save in another silver table

df_VR_RJ = df_VR_RJ.select(
    col("link").cast("string"),
    col("titulo").cast("string"),
    col("endereco").cast("string"),
    col("bairro").cast("string"),
    col("cep").cast("string"),              # CEP as string, some ceps starts with digit zero
    col("preco").cast("double"),
    col("condominio").cast("double"),
    col("iptu").cast("double"),
    col("metragem").cast("int"),
    col("dormitorios").cast("int"),
    col("banheiros").cast("int"),
    col("vagas").cast("int")
)


In [0]:
# Apply Id function

df_VR_rio = adicionar_id_imovel(df_VR_RJ, 2101, nome_coluna="ID_imovel")


In [0]:
# Map columns to schema
column_mapping = {
    'Id_imovel': 'ID_imovel',
    'link': 'Link',
    'titulo':  'Titulo',
    'endereco': 'Endereco',
    'preco': 'Preco',
    'condominio': 'Condominio',
    'iptu': 'IPTU',
    'metragem': 'Area', 
    'dormitorios': 'Dormitorios',
    'banheiros': 'Banheiros',
    'vagas': 'Garagem',
}

# Renaming columns in the DataFrame using Pyspark
for old_name, new_name in column_mapping.items():
    df_VR_rio = df_VR_rio.withColumnRenamed(old_name, new_name)

# Reorder the columns
esquema= ['ID_imovel', 'Link', 'Titulo', 'Endereco', 'Bairro', 'Cep', 'Preco', 'Condominio', 'Iptu', 'Area', 'Dormitorios', 'Banheiros', 'Garagem']
df_VR_rio = df_VR_rio.select(esquema)

In [0]:
display(df_VR_rio.sample(0.005))

ID_imovel,Link,Titulo,Endereco,Bairro,Cep,Preco,Condominio,Iptu,Area,Dormitorios,Banheiros,Garagem
21010007,/imovel/apartamento-3-quartos-ipanema-zona-sul-rio-de-janeiro-com-garagem-149m2-venda-RS3800000-id-2744523679/,Apartamento - Padrão / Residencial / Ipanema,"Rua Nascimento Silva,",Ipanema,22421023,20000.0,5500.0,1500.0,149,3,5,3
21010033,/imovel/apartamento-3-quartos-lagoa-zona-sul-rio-de-janeiro-com-garagem-135m2-aluguel-RS5000-id-2793188924/,Apartamento com 3 Quartos e Varanda na Lagoa.,"Rua Professor Saldanha, 154",Lagoa,22461220,5000.0,2281.0,559.0,135,3,2,1
21010383,/imovel/apartamento-4-quartos-ipanema-zona-sul-rio-de-janeiro-com-garagem-140m2-aluguel-RS9500-id-2785304526/,Excelente apartamento 4 quartos para alugar em Ipanema - 2 vagas,"Rua Gomes Carneiro,",Ipanema,22071110,9500.0,2200.0,900.0,140,4,3,2
21010603,/imovel/apartamento-3-quartos-ipanema-zona-sul-rio-de-janeiro-com-garagem-130m2-venda-RS2000000-id-2788904614/,Apartamento - Padrão / Residencial / Ipanema,"Rua Almirante Saddock de Sá, 144",Ipanema,22411040,8500.0,2633.0,599.0,130,3,3,1
21010677,/imovel/apartamento-3-quartos-ipanema-zona-sul-rio-de-janeiro-com-garagem-119m2-aluguel-RS11500-id-2774539142/,"Ipanema/Lagoa - Apartamento de 3 quartos com suíte, Infra completa e 2 vagas","Avenida Epitácio Pessoa, 2356",Ipanema,22411072,11500.0,2710.0,915.0,119,3,3,2
21011185,/imovel/apartamento-2-quartos-ipanema-zona-sul-rio-de-janeiro-com-garagem-70m2-aluguel-RS4950-id-2762241220/,"Apartamento 2 quartos, dep completas, vaga, portaria 24 horas, bicicletário, Barão com Teixeira","Rua Barão da Torre, 82",Ipanema,22411000,4950.0,1226.0,230.0,70,2,1,1
21011336,/imovel/apartamento-4-quartos-copacabana-zona-sul-rio-de-janeiro-com-garagem-360m2-venda-RS6000000-id-2512022780/,Cobertura para aluguel e venda com 360 metros quadrados com 4 quartos,"Avenida Atlântica,",Copacabana,22070001,15000.0,4700.0,,360,4,4,1


In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.vivareal_rio"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/vivareal_rio", recurse=True) 

# Write table in delta format
df_VR_rio.write.format("delta").mode("overwrite").option('mergeSchema', 'true').saveAsTable("silver_imoveis.vivareal_rio")


####Viva Real SP

In [0]:
# Dataset viva real sp 

df_VR_SP = spark.table("bronze_imoveis.vivareal_sp")

display(df_VR_SP.printSchema)
display(df_VR_SP.sample(.005))


<bound method DataFrame.printSchema of DataFrame[link: string, preco: string, endereco: string, title: string, area: string, iptu: array<string>, condominio: array<string>, dormitorios: string, banheiros: string, garagem: string]>

link,preco,endereco,title,area,iptu,condominio,dormitorios,banheiros,garagem
https://www.vivareal.com.br/imovel/apartamento-1-quartos-pinheiros-sao-paulo-30m2-aluguel-RS4500-id-2561864857/,R$ 4.500,Rua dos Pinheiros,"Pinheiros, São Paulo",30 m²,List(Cond. R$ 500),List(Cond. R$ 500),1,1,
https://www.vivareal.com.br/imovel/apartamento-3-quartos-perdizes-sao-paulo-com-garagem-221m2-aluguel-RS8500-id-2615549432/,R$ 8.500,Rua Monte Alegre,"Perdizes, São Paulo",221 m²,"List(Cond. R$ 4.000 , IPTU R$ 1.294)","List(Cond. R$ 4.000 , IPTU R$ 1.294)",3,7,2.0
https://www.vivareal.com.br/imovel/apartamento-1-quartos-barra-funda-zona-oeste-sao-paulo-28m2-aluguel-RS1800-id-2753549503/,R$ 1.800,Avenida Rudge,"Bom Retiro, São Paulo",28 m²,List(Cond. R$ 450),List(Cond. R$ 450),1,1,
https://www.vivareal.com.br/imovel/apartamento-2-quartos-perdizes-sao-paulo-com-garagem-91m2-aluguel-RS5199-id-2790774192/,R$ 5.199,Rua Monte Alegre,"Perdizes, São Paulo",91 m²,"List(Cond. R$ 1.850 , IPTU R$ 240)","List(Cond. R$ 1.850 , IPTU R$ 240)",2,2,1.0
https://www.vivareal.com.br/imovel/apartamento-1-quartos-jardim-paulista-sao-paulo-com-garagem-86m2-venda-RS3001600-id-2714056126/,R$ 16.294,Rua José Maria Lisboa,"Jardim Paulista, São Paulo",86 m²,"List(Cond. R$ 3.888 , IPTU R$ 341)","List(Cond. R$ 3.888 , IPTU R$ 341)",1,2,1.0


In [0]:
# Perform cleaning and transformations in dataframe viva real SP using Pyspark

# Column preco
df_VR_SP = df_VR_SP.withColumn('preco', regexp_extract(col('preco'), r'(\d[\d.]*)', 1))
df_VR_SP = df_VR_SP.withColumn('preco', regexp_replace(col('preco'), r'\.', '').cast('double'))

# Column preco_condominio
df_VR_SP = df_VR_SP.withColumn(
    "condominio",
    when((col("condominio").isNotNull()) & (size(col("condominio")) > 1), col("condominio")[0]).otherwise(None)) # extract 1 part of a list

df_VR_SP = df_VR_SP.withColumn(
    "condominio",
    regexp_extract(col("condominio"), r"(\d[\d\.]*)", 1)) # extract numerical part

df_VR_SP = df_VR_SP.withColumn(
    "condominio",
    regexp_replace(col("condominio"), r"\.", "").cast("double")) # replace separator and dtype) # replace separator and dtype

# Column iptu
df_VR_SP = df_VR_SP.withColumn(
    "iptu",
    when((col("iptu").isNotNull()) & (size(col("iptu")) > 1), col("iptu")[1]).otherwise(None)) # extract 2 part of a list

df_VR_SP = df_VR_SP.withColumn(
    "iptu",
    regexp_extract(col("iptu"), r"(\d[\d\.]*)", 1)) # extract numerical part

df_VR_SP = df_VR_SP.withColumn(
    "iptu",
    regexp_replace(col("iptu"), r"\.", "").cast("double")) # replace separator and dtype) # replace separator and dtype

# Column area
df_VR_SP = df_VR_SP.withColumn('area', regexp_extract(col('area'), r'(\d[\d]*)', 1))
df_VR_SP = df_VR_SP.withColumn('area', col('area').cast('int'))

# Column dormitorios, banheiros e garagem => change only dtypes
# There are some values in the columns with the following format "1-2", create a function to extract only first element 
# Apply function and transform dtypes
cols_to_transform = ['dormitorios', 'banheiros', 'garagem']

for col_name in cols_to_transform:
    df_VR_SP = df_VR_SP.withColumn(col_name, regexp_extract(col(col_name), r"(\d+)", 1).cast("int"))


#####Enrichment with cep info

In [0]:
# Open cep_sp dataset from silver table

df_cep_sp = spark.table("silver_aux.cep_sp")


In [0]:
# 1. Def function normalize: lower column names and remove blank spaces
def normalizar_col(df, col_name):
    return df.withColumn(col_name, trim(lower(col(col_name))))

# Generate bairro column using title string
df_VR_SP = df_VR_SP.withColumn("bairro", regexp_extract(col("title"), r"^([^,]+),", 1))

# Apply function
for col_name in ["endereco", "bairro"]:
    df_VR_SP = normalizar_col(df_VR_SP, col_name)

for col_name in ["Logradouro", "Bairro"]:
    df_cep_sp = normalizar_col(df_cep_sp, col_name)


In [0]:
# 2. Rename endereco column in database
df_VR_SP = df_VR_SP.withColumnRenamed("endereco", "Logradouro")
df_VR_SP = df_VR_SP.withColumnRenamed("bairro", "Bairro")

# 3. Convert CEP to string format (may contain 0 in thge beggining, not numerical format)
df_cep_sp = df_cep_sp.withColumn("Cep", col("Cep").cast(StringType()))

In [0]:
# 4. Deduplicate CEP database (choose only one CEP for each logradouro + bairro)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("Logradouro", "Bairro").orderBy("Cep")
df_cep_dedup = df_cep_sp.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("rn")


In [0]:
# 5. Main join using Logradouro + Bairro

df_join = df_VR_SP.join(
    df_cep_dedup.select("Logradouro", "Bairro", "Cep"),
    on=["Logradouro", "Bairro"],
    how="left"
).select(df_VR_SP["*"], df_cep_dedup["Cep"])  # avoid errors in column names


In [0]:
# 6. Filter not assigned CEP rows

df_sem_cep = df_join.filter(col("Cep").isNull())

display(df_join.filter(col("Cep").isNull()).count())

106

In [0]:
# 7. Deduplicate cep data using only bairro to perform second fallback join
df_cep_bairro = df_cep_dedup.dropDuplicates(["Bairro"]).select("Bairro", "Cep")

# Rename column cep to avoid errors
df_cep_bairro = df_cep_bairro.withColumnRenamed("Cep", "Cep_fallback")


In [0]:
# 8. Fallback join using Bairro
df_fallback = df_sem_cep.join(
    df_cep_bairro,
    on="Bairro",
    how="left")
    

In [0]:
# Replace null values in column cep with cep fallback, then remove cep_fallback column
# If column cep value equals to Null => replace with Cep_fallback, else keep original value

df_fallback = df_fallback.withColumn("Cep", coalesce("Cep", "Cep_fallback")).drop("Cep_fallback")


In [0]:
# 9. Replace null CEP with fallback

df_com_cep = df_join.filter(col("Cep").isNotNull())

df_apos_fallback = df_fallback.filter(col("Cep").isNotNull())


In [0]:
# 10. Perform final union (mkeep same schema/columns)

df_final = df_com_cep.unionByName(df_apos_fallback)


In [0]:

# 11. Check final results 
print("Total de imóveis:", df_final.count())
print("Imóveis sem CEP:", df_final.filter(col("Cep").isNull()).count())

# Count ok!

Total de imóveis: 1333
Imóveis sem CEP: 0


#####Save final dataset to table

In [0]:
# Apply Id function

df_final = adicionar_id_imovel(df_final, 1101, nome_coluna="ID_imovel")

In [0]:
# Map columns to schema
column_mapping = {
    'ID_imovel': 'ID_imovel',
    'link': 'Link',
    'title':  'Titulo',
    'Bairro': 'Bairro',
    'Cep': 'Cep',
    'Logradouro': 'Endereco',
    'preco': 'Preco',
    'condominio': 'Condominio',
    'iptu': 'IPTU',
    'area': 'Area', 
    'dormitorios': 'Dormitorios',
    'banheiros': 'Banheiros',
    'garagem': 'Garagem',
}

# Renaming columns in the DataFrame using Pyspark
for old_name, new_name in column_mapping.items():
    df_final = df_final.withColumnRenamed(old_name, new_name)

# Reorder the columns
esquema= ['ID_imovel', 'Link', 'Titulo', 'Endereco', 'Bairro', 'Cep', 'Preco', 'Condominio', 'Iptu', 'Area', 'Dormitorios', 'Banheiros', 'Garagem']

df_final = df_final.select(esquema)

In [0]:
display(df_final.sample(.01))

ID_imovel,Link,Titulo,Endereco,Bairro,Cep,Preco,Condominio,Iptu,Area,Dormitorios,Banheiros,Garagem
11010168,https://www.vivareal.com.br/imovel/apartamento-1-quartos-agua-branca-sao-paulo-40m2-aluguel-RS2700-id-2694624380/,"Água Branca, São Paulo",rua carlos vicari,água branca,5033070,2700.0,200.0,100.0,40,1,1,
11010295,https://www.vivareal.com.br/imovel/apartamento-1-quartos-agua-branca-zona-oeste-sao-paulo-30m2-aluguel-RS2500-id-2698341789/,"Água Branca, São Paulo",rua guaicurus,água branca,5033000,2500.0,400.0,80.0,30,1,1,
11010314,https://www.vivareal.com.br/imovel/apartamento-3-quartos-sumarezinho-sao-paulo-com-garagem-110m2-aluguel-RS4800-id-2789516657/,"Sumarezinho, São Paulo",rua harmonia,sumarezinho,5435000,4800.0,2100.0,480.0,110,3,3,2.0
11010499,,"Jardim Europa, São Paulo",rua seridó,jardim europa,1455040,100000.0,6938.0,6354.0,412,3,5,5.0
11010772,https://www.vivareal.com.br/imovel/apartamento-1-quartos-pinheiros-zona-oeste-sao-paulo-37m2-aluguel-RS4099-id-2778442016/,"Pinheiros, São Paulo",rua capote valente,pinheiros,5409000,4099.0,749.0,151.0,37,1,1,
11011085,https://www.vivareal.com.br/imovel/apartamento-3-quartos-vila-leopoldina-sao-paulo-com-garagem-143m2-venda-RS2200000-id-2784244223/,"Vila Leopoldina, São Paulo",rua mergenthaler,vila leopoldina,5311030,12000.0,2200.0,1000.0,143,3,4,2.0
11011089,https://www.vivareal.com.br/imovel/apartamento-1-quartos-perdizes-zona-oeste-sao-paulo-com-garagem-41m2-aluguel-RS4400-id-2792084426/,"Perdizes, São Paulo",rua minerva,perdizes,5007030,4400.0,400.0,59.0,41,1,1,1.0
11011107,https://www.vivareal.com.br/imovel/apartamento-2-quartos-perdizes-sao-paulo-com-garagem-80m2-aluguel-RS7500-id-2790778319/,"Perdizes, São Paulo",rua monte alegre,perdizes,5014000,7500.0,2000.0,560.0,80,2,2,2.0
11011137,https://www.vivareal.com.br/imovel/apartamento-1-quartos-pinheiros-sao-paulo-com-garagem-40m2-aluguel-RS4855-id-2644263613/,"Pinheiros, São Paulo",rua padre carvalho,pinheiros,5427020,4855.0,1068.0,257.0,40,1,1,1.0


In [0]:
# Verify duplicates, if endereco e preco is the same, hence i can consider row as duplicated

df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1) \
    .orderBy(col("qtde").desc()) \
    .show(truncate=False)


+-----------------------------+-------+----+
|Endereco                     |Preco  |qtde|
+-----------------------------+-------+----+
|rua benedito branco de abreu |2100.0 |9   |
|rua minerva                  |4400.0 |7   |
|null                         |10000.0|7   |
|rua harmonia                 |3800.0 |5   |
|rua cândido fontoura         |1500.0 |5   |
|rua aimberê                  |8496.0 |5   |
|rua da consolação            |8100.0 |5   |
|rua ministro godói           |3000.0 |4   |
|rua alves guimarães          |4500.0 |4   |
|rua doutor augusto de miranda|1800.0 |4   |
|rua carlos weber             |8000.0 |4   |
|rua guaipá                   |3100.0 |4   |
|rua roma                     |3750.0 |4   |
|rua joão ramalho             |5496.0 |4   |
|rua doutor franco da rocha   |2000.0 |4   |
|rua benedito branco de abreu |1800.0 |4   |
|null                         |3000.0 |4   |
|rua mourato coelho           |6998.0 |3   |
|rua guaicurus                |2500.0 |3   |
|rua pais 

In [0]:
# Count duplicated rows
duplicates = df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1)

print("Total de combinações duplicadas:", duplicates.count())

Total de combinações duplicadas: 177


In [0]:
# Drop duplicates
df_final = df_final.dropDuplicates(["Endereco", "Preco"])

In [0]:
print(df_final.count()) # Finished with 1065 rows of data

1065


In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.vivareal_sp"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/vivareal_sp", recurse=True) 

# Write table in delta format
df_final.write.format("delta").mode("overwrite").option("mergeSchema", 'true').saveAsTable("silver_imoveis.vivareal_sp")


####OLX Rio

In [0]:
# Dataset OLX Rio 

df_OLX_RJ = spark.table("bronze_imoveis.olx_rio")

display(df_OLX_RJ.printSchema)
display(df_OLX_RJ.sample(.005))

<bound method DataFrame.printSchema of DataFrame[titulo: string, preco_aluguel: string, localizacao: string, bairro: string, area: string, iptu: string, preco_condominio: string, quartos: string, banheiros: string, garagem: string]>

titulo,preco_aluguel,localizacao,bairro,area,iptu,preco_condominio,quartos,banheiros,garagem
Cobertura - Triplex / Residencial / Ipanema,R$ 25.000,"Rio de Janeiro, Ipanema",Ipanema,500m²,R$ 2.850,R$ 9.500,5 ou mais,5 ou mais,4.0
"Apartamento para Locação em Rio De Janeiro, Botafogo, 4 dormitórios, 2 suítes, 3 banheiros",R$ 4.500,"Rio de Janeiro, Botafogo",Botafogo,200m²,R$ 436,R$ 2.050,4,3,0.0
357 m² Oportunidade. Amplo apartamento de frente com 2 vagas.,R$ 20.000,"Rio de Janeiro, Ipanema",Ipanema,357m²,R$ 18.590,R$ 2.800,4,4,2.0
"POSTO 6 DE COPACABANA, 3 QUARTOS , REFORMADO!",R$ 10.000,"Rio de Janeiro, Copacabana",Copacabana,76m²,R$ 336,R$ 1.500,3,3,
"Copacabana!!! 3 quartos com 3 vagas, próximo ao metrô e praia.",R$ 8.000,"Rio de Janeiro, Copacabana",Copacabana,212m²,R$ 937,R$ 3.198,3,4,3.0
,,,,,,,,,
"Apartamento venda ou locação Quartos em Botafogo, Rio de Janeiro",R$ 6.200,"Rio de Janeiro, Humaitá",Humaitá,105m²,R$ 5.272,R$ 1.100,3,3,2.0
Apartamento / Residencial / Leblon,R$ 11.000,"Rio de Janeiro, Leblon",Leblon,118m²,R$ 814,R$ 3.000,3,4,2.0
Casa Comercial,R$ 20.000,"Rio de Janeiro, Botafogo",Botafogo,457m²,R$ 47.870,R$ 0,5 ou mais,5 ou mais,3.0
EXCELENTE CASA NA LADEIRA DO ASCURRA - 04 QUARTOS - COSME VELHO,R$ 9.500,"Rio de Janeiro, Cosme Velho",Cosme Velho,254m²,R$ 324,R$ 0,4,3,1.0


In [0]:
# Count before drop
print(df_OLX_RJ.count())

4165


In [0]:
# Drop rows that has all null values 
df_OLX_RJ = df_OLX_RJ.dropna(subset=['titulo', 'localizacao'], how='all')

In [0]:
# Count after drop
print(df_OLX_RJ.count())

3654


In [0]:
# Cleaning and transforms using pyspark

# Column preco
df_OLX_RJ = df_OLX_RJ.withColumn('preco_aluguel', regexp_extract(col('preco_aluguel'), r'(\d[\d.]*)', 1))
df_OLX_RJ = df_OLX_RJ.withColumn('preco_aluguel', regexp_replace(col('preco_aluguel'), r'\.', '').cast('double'))

# Column preco_condominio
df_OLX_RJ = df_OLX_RJ.withColumn('preco_condominio', regexp_extract(col('preco_condominio'), r'(\d[\d.]*)', 1))
df_OLX_RJ = df_OLX_RJ.withColumn('preco_condominio', regexp_replace(col('preco_condominio'), r'\.', '').cast('double'))

# Column iptu
df_OLX_RJ = df_OLX_RJ.withColumn('iptu', regexp_extract(col('iptu'), r'(\d[\d.]*)', 1))
df_OLX_RJ = df_OLX_RJ.withColumn('iptu', regexp_replace(col('iptu'), r'\.', '').cast('double'))

# Column area
df_OLX_RJ = df_OLX_RJ.withColumn('area', regexp_extract(col('area'), r'(\d[\d]*)', 1))
df_OLX_RJ = df_OLX_RJ.withColumn('area', col('area').cast('int'))

# Column dormitorios, banheiros e garagem => change only dtypes
cols_to_transform = ['quartos', 'banheiros', 'garagem']

for col_name in cols_to_transform:
    df_OLX_RJ = df_OLX_RJ.withColumn(col_name, regexp_extract(col(col_name), r"(\d+)", 1).cast("int"))


In [0]:
# Create a new column called link to fit pre-defined esquema
df_OLX_RJ = df_OLX_RJ.withColumn('Link', lit(0).cast('string'))


#####Enrichment with cep info

In [0]:
# Open table cep_rio from silver layer
df_cep_rj = spark.table("silver_aux.cep_rio")

In [0]:
# 1. Defining a function to normalize strings: all column strings should be in lower case
def normalizar_col(df, col_name):
    return df.withColumn(col_name, trim(lower(col(col_name))))

# Apply function to normalize
for col_name in ["localizacao", "bairro"]:
    df_OLX_RJ = normalizar_col(df_OLX_RJ, col_name)

for col_name in ["Logradouro", "Bairro"]:
    df_cep_rj = normalizar_col(df_cep_rj, col_name)


In [0]:
# 2. Rename columns localizacao to Logradouro and bairro to Bairro
df_OLX_RJ = df_OLX_RJ.withColumnRenamed("localizacao", "Logradouro")
df_OLX_RJ = df_OLX_RJ.withColumnRenamed("bairro", "Bairro")

# 3. Convert cep to string 
df_cep_rj = df_cep_rj.withColumn("Cep", col("Cep").cast(StringType()))


In [0]:
# 4. Deduplicate CEP dataset (pick one cep for each logradouro)

# Count how many times each cep appears in each neighborhood
df_cep_freq = df_cep_rj.groupBy("Bairro", "Cep") \
    .agg(count("*").alias("freq"))

window_spec = Window.partitionBy( "Bairro").orderBy(col("freq").desc(), col("Cep"))
df_cep_dedup = df_cep_freq.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("freq", "rn")


In [0]:
# 5. Main join by Bairro

df_join = df_OLX_RJ.join(
    df_cep_dedup.select("Bairro", "Cep"),
    on=["Bairro"],
    how="left"
).select(df_OLX_RJ["*"], df_cep_dedup["Cep"])  # Avoid errors


In [0]:
# 6. Identify rows without CEP

df_sem_cep = df_join.filter(col("Cep").isNull())

print(df_join.filter(col("Cep").isNull()).count())
print(df_join.count())

0
3654


In [0]:
# 7. Verify final results

print("Total de imóveis:", df_join.count())
print("Imóveis sem CEP:", df_sem_cep.filter(col("Cep").isNull()).count())

# Count ok!

Total de imóveis: 3654
Imóveis sem CEP: 0


In [0]:
display(df_join.show(5))

+--------------------+-------------+--------------------+---------------+----+------+----------------+-------+---------+-------+----+--------+
|              titulo|preco_aluguel|          Logradouro|         Bairro|area|  iptu|preco_condominio|quartos|banheiros|garagem|Link|     Cep|
+--------------------+-------------+--------------------+---------------+----+------+----------------+-------+---------+-------+----+--------+
|IPANEMA - FLAT - ...|      19000.0|rio de janeiro, i...|        ipanema|  85|1033.0|          5800.0|      2|        2|      1|   0|22071110|
|Casa de rua - Tri...|      20000.0|rio de janeiro, j...|jardim botânico| 600|2337.0|             0.0|      4|        1|   null|   0|22261070|
|Apartamento para ...|      38000.0|rio de janeiro, i...|        ipanema| 350|4400.0|          9700.0|      3|        5|      3|   0|22071110|
|Cobertura com 4 d...|      20000.0|rio de janeiro, urca|           urca| 370|1370.0|          3430.0|      4|        5|      2|   0|22290177|

In [0]:
df_final = df_join

#####Save final dataset to table

In [0]:
# Apply Id function

df_final = adicionar_id_imovel(df_final, 2102, nome_coluna="ID_imovel")

In [0]:
 # Map columns to schema
column_mapping = {
    'ID_imovel': 'ID_imovel',
    'link': 'Link',
    'titulo':  'Titulo',
    'Logradouro': 'Endereco',
    'preco_aluguel': 'Preco',
    'Cep': 'Cep',
    'bairro': 'Bairro',
    'preco_condominio': 'Condominio',
    'iptu': 'IPTU',
    'area': 'Area', 
    'quartos': 'Dormitorios',
    'banheiros': 'Banheiros',
    'garagem': 'Garagem',
}

# Renaming columns in the DataFrame using Pyspark
for old_name, new_name in column_mapping.items():
    df_final = df_final.withColumnRenamed(old_name, new_name)

# Reorder the columns
esquema= ['ID_imovel', 'Link', 'Titulo', 'Endereco', 'Bairro', 'Cep', 'Preco', 'Condominio', 'Iptu', 'Area', 'Dormitorios', 'Banheiros', 'Garagem']
df_final = df_final.select(esquema)


In [0]:
df_final.show(5)



+---------+----+--------------------+--------------------+---------------+--------+-------+----------+------+----+-----------+---------+-------+
|ID_imovel|Link|              Titulo|            Endereco|         Bairro|     Cep|  Preco|Condominio|  Iptu|Area|Dormitorios|Banheiros|Garagem|
+---------+----+--------------------+--------------------+---------------+--------+-------+----------+------+----+-----------+---------+-------+
| 21020001|   0|IPANEMA - FLAT - ...|rio de janeiro, i...|        ipanema|22071110|19000.0|    5800.0|1033.0|  85|          2|        2|      1|
| 21020002|   0|Casa de rua - Tri...|rio de janeiro, j...|jardim botânico|22261070|20000.0|       0.0|2337.0| 600|          4|        1|   null|
| 21020003|   0|Apartamento para ...|rio de janeiro, i...|        ipanema|22071110|38000.0|    9700.0|4400.0| 350|          3|        5|      3|
| 21020004|   0|Cobertura com 4 d...|rio de janeiro, urca|           urca|22290177|20000.0|    3430.0|1370.0| 370|          4|    

In [0]:
# Verify duplicates, if endereco e preco are the same, hence i can consider duplicated row

df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1) \
    .orderBy(col("qtde").desc()) \
    .show(truncate=False)

+--------------------------+-------+----+
|Endereco                  |Preco  |qtde|
+--------------------------+-------+----+
|rio de janeiro, ipanema   |25000.0|91  |
|rio de janeiro, copacabana|15000.0|44  |
|rio de janeiro, ipanema   |15000.0|41  |
|rio de janeiro, ipanema   |22000.0|40  |
|rio de janeiro, ipanema   |30000.0|36  |
|rio de janeiro, copacabana|3500.0 |32  |
|rio de janeiro, copacabana|6000.0 |31  |
|rio de janeiro, ipanema   |21000.0|31  |
|rio de janeiro, copacabana|3000.0 |31  |
|rio de janeiro, ipanema   |26000.0|30  |
|rio de janeiro, copacabana|5000.0 |29  |
|rio de janeiro, copacabana|10000.0|28  |
|rio de janeiro, ipanema   |40000.0|27  |
|rio de janeiro, copacabana|4500.0 |27  |
|rio de janeiro, copacabana|23000.0|27  |
|rio de janeiro, leblon    |15000.0|26  |
|rio de janeiro, leblon    |35000.0|24  |
|rio de janeiro, leblon    |13000.0|24  |
|rio de janeiro, copacabana|12000.0|22  |
|rio de janeiro, ipanema   |35000.0|22  |
+--------------------------+------

In [0]:
# Count duplicates
duplicates = df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1)

print("Total de combinações duplicadas:", duplicates.count())

Total de combinações duplicadas: 500


In [0]:
# Drop duplicates
df_final = df_final.dropDuplicates(["Endereco", "Preco"])

print(df_final.count()) # Finished with 1101 rows of data

1101


In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.olx_rio"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/olx_rio", recurse=True) 

# Write delta table in silver layer
df_final.write.format("delta").mode("overwrite").saveAsTable("silver_imoveis.olx_rio")


####OLX SP

In [0]:
# Open dataset

df_OLX_SP = spark.table("bronze_imoveis.olx_sp")

display(df_OLX_SP.printSchema)
display(df_OLX_SP.sample(.005))

<bound method DataFrame.printSchema of DataFrame[titulo: string, preco_aluguel: string, localizacao: string, area: string, iptu: string, preco_condominio: string, quartos: string, banheiros: string, garagem: double]>

titulo,preco_aluguel,localizacao,area,iptu,preco_condominio,quartos,banheiros,garagem
Viva bem na Vila Madalena,R$ 25.000,"São Paulo, Vila Madalena",336m²,R$ 2.608,R$ 0,4.0,5 ou mais,
"Apartamento com 1 Quarto para alugar, 25m² - Lapa",R$ 2.870,"São Paulo, Vila Romana",25m²,R$ 35,R$ 645,1.0,1,
,,,,,,,,
CASA PARA LOCAÇÃO NA ÁGUA BRANCA PRÓXIMO DA ESTAÇÃO DE TREM ÁGUA BRANCA,R$ 1.700,"São Paulo, Água Branca",65m²,R$ 131,R$ 0,1.0,1,
Apartamento muito bem localizado na rua Oscar Freire ao lado do Hospital Clínicas e metrô,R$ 4.800,"São Paulo, Pinheiros",110m²,R$ 3.648,R$ 1.750,3.0,2,
"Apartamento para Locação em São Paulo, AGUA BRANCA, 2 dormitórios, 1 suíte, 2 banheiros, 1",R$ 5.500,"São Paulo, Água Branca",62m²,R$ 200,R$ 800,2.0,2,
Imóvel para aluguel tem 115 metros quadrados com 2 quartos em Sumaré - São Paulo - SP,R$ 18.000,"São Paulo, Sumaré",115m²,R$ 450,R$ 3.000,2.0,3,
"VN Capote Valente disponível para locação contendo 78m², 01 dormitório e 02 vagas de garag",R$ 12.838,"São Paulo, Pinheiros",78m²,R$ 412,R$ 1.750,1.0,2,
"APARTAMENTO RESIDENCIAL em SÃO PAULO - SP, VILA ROMANA",R$ 1.400,"São Paulo, Vila Romana",15m²,R$ 0,R$ 100,1.0,1,
Imóvel para aluguel com 1 quarto em Pinheiros - São Paulo - SP,R$ 3.600,"São Paulo, Pinheiros",28m²,R$ 100,R$ 600,1.0,1,


In [0]:
# Count before dropping
print(f"Antes de remover nulos: {df_OLX_SP.count()}")

Antes de remover nulos: 5662


In [0]:
# Drop rows that has all null values 
df_OLX_SP = df_OLX_SP.dropna(subset=['titulo', 'localizacao'], how='all')

In [0]:
# Count after dropping
print(f"Após remover nulos: {df_OLX_SP.count()}")

Após remover nulos: 4969


In [0]:
# Cleaning and transforms using pyspark

# Column preco
df_OLX_SP = df_OLX_SP.withColumn('preco_aluguel', regexp_extract(col('preco_aluguel'), r'(\d[\d.]*)', 1))
df_OLX_SP = df_OLX_SP.withColumn('preco_aluguel', regexp_replace(col('preco_aluguel'), r'\.', '').cast('float'))

# Column preco_condominio
df_OLX_SP = df_OLX_SP.withColumn('preco_condominio', regexp_extract(col('preco_condominio'), r'(\d[\d.]*)', 1))
df_OLX_SP = df_OLX_SP.withColumn('preco_condominio', regexp_replace(col('preco_condominio'), r'\.', '').cast('float'))

# Column iptu
df_OLX_SP = df_OLX_SP.withColumn('iptu', regexp_extract(col('iptu'), r'(\d[\d.]*)', 1))
df_OLX_SP = df_OLX_SP.withColumn('iptu', regexp_replace(col('iptu'), r'\.', '').cast('float'))

# Column area
df_OLX_SP = df_OLX_SP.withColumn('area', regexp_extract(col('area'), r'(\d[\d]*)', 1))
df_OLX_SP= df_OLX_SP.withColumn('area', col('area').cast('int'))

# Column dormitorios, banheiros e garagem => change only dtypes and handle Null values
df_OLX_SP = df_OLX_SP.withColumn('quartos', regexp_extract(col('quartos'), r'(\d[\d]*)', 1))
df_OLX_SP= df_OLX_SP.withColumn('quartos', col('quartos').cast('int'))

df_OLX_SP = df_OLX_SP.withColumn('banheiros', regexp_extract(col('banheiros'), r'(\d[\d]*)', 1))
df_OLX_SP = df_OLX_SP.withColumn('banheiros', col('banheiros').cast('int'))

df_OLX_SP = df_OLX_SP.withColumn('garagem', regexp_extract(col('garagem'), r'(\d[\d]*)', 1))
df_OLX_SP = df_OLX_SP.withColumn('garagem', col('garagem').cast('int'))


In [0]:
# Create a column named Bairro using second part of localizacao column string

df_OLX_SP = df_OLX_SP.withColumn('Bairro', trim(split(col('localizacao'), ",")[1]))


In [0]:
# Create a new column called link to fit pre-defined esquema
df_OLX_SP = df_OLX_SP.withColumn('Link', lit(0))


#####Enrichment with cep info

In [0]:
# Open table cep_sp from silver layer
df_cep_sp = spark.table("silver_aux.cep_sp")

In [0]:
# 1. Defining a function to normalize strings: all column strings should be in lower case
def normalizar_col(df, col_name):
    return df.withColumn(col_name, trim(lower(col(col_name))))

# Apply function to normalize
for col_name in ["localizacao", "bairro"]:
    df_OLX_SP = normalizar_col(df_OLX_SP, col_name)

for col_name in ["Logradouro", "Bairro"]:
    df_cep_sp = normalizar_col(df_cep_sp, col_name)


In [0]:
# 2. Rename columns localizacao to Logradouro and bairro to Bairro
df_OLX_SP = df_OLX_SP.withColumnRenamed("localizacao", "Logradouro")
df_OLX_SP = df_OLX_SP.withColumnRenamed("bairro", "Bairro")

# 3. Convert cep to string 
df_cep_sp = df_cep_sp.withColumn("Cep", col("Cep").cast(StringType()))


In [0]:
# 4. Deduplicate CEP dataset (pick one cep for each logradouro)

# Count how many times each cep appears in each neighborhood
df_cep_freq = df_cep_sp.groupBy("Bairro", "Cep") \
    .agg(count("*").alias("freq"))

window_spec = Window.partitionBy( "Bairro").orderBy(col("freq").desc(), col("Cep"))
df_cep_dedup = df_cep_freq.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("freq", "rn")

In [0]:
# 5. Main join by Bairro

df_join = df_OLX_SP.join(
    df_cep_dedup.select("Bairro", "Cep"),
    on=["Bairro"],
    how="left"
).select(df_OLX_SP["*"], df_cep_dedup["Cep"])  # Avoid errors


In [0]:
# 6. Identify rows without CEP

df_sem_cep = df_join.filter(col("Cep").isNull())

print(df_join.filter(col("Cep").isNull()).count())
print(df_join.count())

2
4969


In [0]:
# 7. Verify final results

print("Total de imóveis:", df_join.count())
print("Imóveis sem CEP:", df_join.filter(col("Cep").isNull()).count())

# Count ok!

Total de imóveis: 4969
Imóveis sem CEP: 2


In [0]:
df_final = df_join.filter(~col("Cep").isNull()) # ~ means negative condition


#####Save dataset OLX SP

In [0]:
# Apply Id function

df_final = adicionar_id_imovel(df_final, 1102, nome_coluna="ID_imovel")

In [0]:
# Map columns to schema
column_mapping = {
    'ID_imovel': 'ID_imovel',
    'Link': 'Link',
    'titulo':  'Titulo',
    'logradouro': 'Endereco',
    'Cep': 'Cep',
    'preco_aluguel': 'Preco',
    'preco_condominio': 'Condominio',
    'iptu': 'IPTU',
    'area': 'Area', 
    'quartos': 'Dormitorios',
    'banheiros': 'Banheiros',
    'garagem': 'Garagem',
}

# Renaming columns in the DataFrame using Pyspark
for old_name, new_name in column_mapping.items():
    df_final = df_final.withColumnRenamed(old_name, new_name)

# Reorder the columns
esquema= ['ID_imovel', 'Link', 'Titulo', 'Endereco', 'Bairro', 'Cep', 'Preco', 'Condominio', 'Iptu', 'Area', 'Dormitorios', 'Banheiros', 'Garagem']

df_final = df_final.select(esquema)


In [0]:
df_final.show(5)

+---------+----+--------------------+--------------------+------------+--------+-------+----------+------+----+-----------+---------+-------+
|ID_imovel|Link|              Titulo|            Endereco|      Bairro|     Cep|  Preco|Condominio|  Iptu|Area|Dormitorios|Banheiros|Garagem|
+---------+----+--------------------+--------------------+------------+--------+-------+----------+------+----+-----------+---------+-------+
| 11020001|   0|Casa Locação Alto...|são paulo, alto d...|alto da lapa|05059000|16096.0|       0.0|1300.0| 300|          3|        4|   null|
| 11020002|   0|Apartamento Locaç...|são paulo, alto d...|alto da lapa|05059000| 7299.0|     840.0| 290.0|  87|          2|        3|   null|
| 11020003|   0|Apartamento Locaç...|são paulo, alto d...|alto da lapa|05059000|11998.0|    3200.0|1800.0| 280|          4|        5|   null|
| 11020004|   0|Kitnet 30m² 1 Dor...|são paulo, alto d...|alto da lapa|05059000| 1670.0|       0.0|   0.0|  30|          1|        1|   null|
| 1102

In [0]:
# Verify duplicates, if endereco e preco are the same, hence i can consider duplicates

df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1) \
    .orderBy(col("qtde").desc()) \
    .show(truncate=False)
    

+-------------------------------+-------+----+
|Endereco                       |Preco  |qtde|
+-------------------------------+-------+----+
|são paulo, jardim das vertentes|2100.0 |53  |
|são paulo, jardim das vertentes|1800.0 |50  |
|são paulo, pinheiros           |5800.0 |34  |
|são paulo, vila pompéia        |5900.0 |31  |
|são paulo, pinheiros           |3500.0 |30  |
|são paulo, pinheiros           |4000.0 |29  |
|são paulo, pinheiros           |4500.0 |25  |
|são paulo, pinheiros           |7000.0 |25  |
|são paulo, pinheiros           |6500.0 |23  |
|são paulo, lapa                |3750.0 |23  |
|são paulo, perdizes            |3000.0 |22  |
|são paulo, perdizes            |4500.0 |21  |
|são paulo, pinheiros           |8000.0 |20  |
|são paulo, pinheiros           |2500.0 |20  |
|são paulo, pinheiros           |5500.0 |19  |
|são paulo, sumaré              |5000.0 |19  |
|são paulo, pinheiros           |10000.0|18  |
|são paulo, pinheiros           |2350.0 |18  |
|são paulo, b

In [0]:
# Count duplicates
duplicates = df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1)

print("Total de combinações duplicadas:", duplicates.count())

Total de combinações duplicadas: 898


In [0]:
# Drop duplicates
df_final = df_final.dropDuplicates(["Endereco", "Preco"])

print(df_final.count()) # Finished with 1960 rows of data

1960


In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.olx_sp"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/olx_sp", recurse=True) 

# Write delta table in silver layer
df_final.write.format("delta").mode("overwrite").saveAsTable("silver_imoveis.olx_sp")

####ZAP Rio

In [0]:
# Open dataframe from the bronze layer

df_ZAP_RJ = spark.table("bronze_imoveis.zap_rio")

display(df_ZAP_RJ.printSchema)
display(df_ZAP_RJ.sample(.005))

<bound method DataFrame.printSchema of DataFrame[link: string, preco: bigint, endereco: string, area: bigint, iptu: double, preco_condominio: double, quartos: string, banheiros: string, garagem: string]>

link,preco,endereco,area,iptu,preco_condominio,quartos,banheiros,garagem
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-lagoa-rio-de-janeiro-66m2-id-2779285837/,4500,Avenida Epitácio Pessoa,66,374.0,1222.0,2,1,1


In [0]:
# Extract titulo from the link string

df_ZAP_RJ = df_ZAP_RJ.withColumn(
    "Titulo",
    regexp_extract(
        expr("concat_ws('-', slice(split(link, '/'), 5, 1))"),
        r'^(.*?)-rio-de-janeiro',
        1
    )
)


#####Enrichment with cep info

In [0]:
# Open table cep_rio as pyspark dataset

df_cep_rj = spark.table("silver_aux.cep_rio")

In [0]:
# 1. Defining a function to normalize strings: all column strings should be in lower case
def normalizar_col(df, col_name):
    return df.withColumn(col_name, trim(lower(col(col_name))))

# Apply function to normalize
for col_name in ["Endereco"]:
    df_ZAP_RJ = normalizar_col(df_ZAP_RJ, col_name)

for col_name in ["logradouro"]:
    df_cep_rj = normalizar_col(df_cep_rj, col_name)


In [0]:
# 2. Rename columns localizacao to Logradouro and bairro to Bairro
df_ZAP_RJ = df_ZAP_RJ.withColumnRenamed("Endereco", "Logradouro")

# 3. Convert cep to string 
df_cep_rj = df_cep_rj.withColumn("Cep", col("Cep").cast(StringType()))


In [0]:
# 4. Deduplicate CEP dataset (pick one cep for each logradouro)

# Count how many times each cep appears in each neighborhood
df_cep_freq = df_cep_rj.groupBy("Logradouro", "Cep") \
    .agg(count("*").alias("freq"),
         first("Bairro").alias("Bairro")) # keep bairro info

window_spec = Window.partitionBy( "Logradouro").orderBy(col("freq").desc(), col("Cep"))
df_cep_dedup = df_cep_freq.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("freq", "rn")


In [0]:
# 5. Main join by Logradouro

df_join = df_ZAP_RJ.join(
    df_cep_dedup.select("Logradouro", "Cep", "Bairro"),
    on=["Logradouro"],
    how="left"
).select(df_ZAP_RJ["*"], df_cep_dedup["Cep"].alias("Cep"), df_cep_dedup["Bairro"].alias("Bairro"))  # Avoid errors


In [0]:
# 6. Identify rows without CEP

df_sem_cep = df_join.filter(col("Cep").isNull())

print(df_join.filter(col("Cep").isNull()).count())
print(df_join.count()) 
# There are 46 rows withou cep information after join

46
387


In [0]:
# Extract bairros from link string if the string contains desired texts to fill df_sem_ep rows

bairros = ["copacabana", "ipanema", "leblon", "botafogo", "flamengo", "lapa", "gloria", "catete", "urca", "cosme-velho"]

df_sem_cep = df_sem_cep.withColumn("Bairro_join", lit(None))

# Search with contains
for bairro in bairros:
    df_sem_cep = df_sem_cep.withColumn(
        "Bairro_join",
        when(col("Bairro_join").isNull() & col("Link").contains(bairro), bairro).otherwise(col("Bairro_join")))
    

In [0]:
print(df_sem_cep.filter(col("Bairro_join").isNotNull()).count())
print(df_sem_cep.count())
# Count ok drop 2 rows 

44
46


In [0]:
df_sem_cep = df_sem_cep.filter(col("Bairro_join").isNotNull())
print(df_sem_cep.count())   # count ok                        

44


In [0]:
# Normalizar nomes dos bairros
df_cep_dedup_normalizado = df_cep_dedup.withColumn("Bairro_norm", lower(trim(col("Bairro"))))

# Escolher um CEP por bairro (por ordem alfabética ou qualquer outro critério)
f_cep_freq = df_cep_dedup.groupBy("Bairro", "Cep") \
    .agg(count("*").alias("freq"),
         first("Bairro").alias("Bairro")) # keep bairro info

window_spec = Window.partitionBy( "Bairro").orderBy(col("freq").desc(), col("Cep"))

df_cep_dedup_bairro = df_cep_freq.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("freq", "rn")


In [0]:
# Second join with sem cep dataframe to capture cep for this 44 rows

# Normalize column Bairro
df_cep_dedup_bairro_norm = df_cep_dedup_bairro.withColumn(
    "Bairro_normalizado",
    regexp_replace(lower(col("Bairro")), "[^a-z0-9]", ""))


# Perform join between datasets
df_sem_cep_enriched = df_sem_cep.join(
    df_cep_dedup_bairro_norm.select(
        col("Bairro_normalizado"),
        col("Cep").alias("Cep_enriched")
    ),
    df_sem_cep["Bairro_join"] == df_cep_dedup_bairro_norm["Bairro_normalizado"],
    how="left"
).drop(df_cep_dedup_bairro_norm["Bairro_normalizado"])


# Fill Cep values with cep from base_cep_dedup_bairro 
df_sem_cep_enriched = df_sem_cep_enriched.withColumn(
    "Cep",
    coalesce(col("Cep"), col("Cep_enriched"))
).drop("Cep_enriched")



In [0]:
df_sem_cep_enriched.show(5)
                      

+--------------------+-----+----------+----+------+----------------+-------+---------+-------+--------------------+--------+------+-----------+
|                link|preco|Logradouro|area|  iptu|preco_condominio|quartos|banheiros|garagem|              Titulo|     Cep|Bairro|Bairro_join|
+--------------------+-----+----------+----+------+----------------+-------+---------+-------+--------------------+--------+------+-----------+
|https://www.zapim...|10000|      null| 180| 719.0|          2100.0|      4|        3|      1|venda-apartamento...|22010000|  null| copacabana|
|https://www.zapim...|23000|      null| 240|1562.0|          3008.0|      4|        2|      2|aluguel-apartamen...|22071110|  null|    ipanema|
|https://www.zapim...|13000|      null| 421|1327.0|          4500.0|      4|        6|      1|venda-apartamento...|22231000|  null|   botafogo|
|https://www.zapim...| 5200|      null| 110| 541.0|          2200.0|      3|        2|      1|venda-apartamento...|22010000|  null| copa

In [0]:
print(df_sem_cep_enriched.count()) #count ok
print(df_sem_cep_enriched.filter(col("Cep").isNull()).count()) #count ok

44
0


In [0]:
print("df_join sem Nulls:", df_join.filter(~col('Cep').isNull()).count())  # Deve ser 341
print("df_sem_cep_enriched:", df_sem_cep_enriched.count()) # Deve ser 44

df_join sem Nulls: 341
df_sem_cep_enriched: 44


In [0]:
# Faz join entre df_join e df_sem_cep_enriched usando 'Link'
df_join_final = df_join.alias("base").join(
    df_sem_cep_enriched.select(
        "Link",
        col("Cep").alias("Cep_novo"),
        col("Bairro_join").alias("Bairro_novo")
    ).alias("enriq"),
    on="Link",
    how="left"
)

# Atualiza as colunas Cep e Bairro, mantendo os valores existentes se houver
df_join_final = df_join_final.withColumn(
    "Cep", coalesce(col("base.Cep"), col("Cep_novo"))
).withColumn(
    "Bairro", coalesce(col("base.Bairro"), col("Bairro_novo"))
).drop("Cep_novo", "Bairro_novo")


In [0]:
# Drop only rows without cep
df_join_final = df_join_final.filter((col("Cep").isNotNull()) & (trim(col("Cep")) != ""))


In [0]:
# 11. Verificar resultado final
print("Total de imóveis:", df_join_final.count())
print("Imóveis sem CEP:", df_join_final.filter(col("Cep").isNull()).count())

# count ok!

Total de imóveis: 413
Imóveis sem CEP: 0


In [0]:
df_join_final.show()

+--------------------+-----+--------------------+----+-------+----------------+-------+---------+-------+--------------------+--------+----------+
|                link|preco|          Logradouro|area|   iptu|preco_condominio|quartos|banheiros|garagem|              Titulo|     Cep|    Bairro|
+--------------------+-----+--------------------+----+-------+----------------+-------+---------+-------+--------------------+--------+----------+
|https://www.zapim...| 2500|rua dois de dezembro|  41|  104.0|           751.0|      1|        1|      0|aluguel-apartamen...|22220040|  Flamengo|
|https://www.zapim...| 3300| rua anita garibaldi|  90|  345.0|          1880.0|      3|        1|      0|venda-apartamento...|22041080|Copacabana|
|https://www.zapim...| 6490|avenida ataulfo d...|  73|  370.0|           970.0|      3|        1|   null|aluguel-apartamen...|22440032|    Leblon|
|https://www.zapim...| 7500|rua almirante gui...|  50|  366.0|          2396.0|      1|        1|      0|venda-flat-1-

In [0]:
df_final = df_join_final

#####Save dataset ZAP RJ

In [0]:
# Apply Id function

df_final = adicionar_id_imovel(df_final, 2103, nome_coluna="ID_imovel")

In [0]:
# Map columns to schema
column_mapping = {
    'ID_imovel': 'ID_imovel',
    'Link': 'Link',
    'Titulo':  'Titulo',
    'Logradouro': 'Endereco',
    'Bairro': 'Bairro',
    'Cep': 'Cep',
    'Preco': 'Preco',
    'preco_condominio': 'Condominio',
    'IPTU': 'IPTU',
    'Area': 'Area', 
    'quartos': 'Dormitorios',
    'Banheiros': 'Banheiros',
    'Garagem': 'Garagem',
}

# Renaming columns in the DataFrame using Pyspark
for old_name, new_name in column_mapping.items():
    df_final = df_final.withColumnRenamed(old_name, new_name)

# Reorder the columns
esquema= ['ID_imovel', 'Link', 'Titulo', 'Endereco', 'Bairro', 'Cep', 'Preco', 'Condominio', 'IPTU', 'Area', 'Dormitorios', 'Banheiros', 'Garagem']

df_final = df_final.select(esquema)


In [0]:
# Verify duplicates, if endereco e preco are the same, hence i can consider duplicates

df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1) \
    .orderBy(col("qtde").desc()) \
    .show(truncate=False)

+----------------------------+-----+----+
|Endereco                    |Preco|qtde|
+----------------------------+-----+----+
|null                        |23000|17  |
|null                        |13000|16  |
|avenida vieira souto        |40000|8   |
|null                        |20000|5   |
|avenida vieira souto        |35000|4   |
|rua das laranjeiras         |6500 |4   |
|avenida ataulfo de paiva    |9500 |4   |
|avenida general san martin  |18000|4   |
|null                        |15990|4   |
|rua professor arthur ramos  |11800|4   |
|rua aristides espinola      |8500 |4   |
|rua vinícius de moraes      |15500|3   |
|avenida bartolomeu mitre    |10500|3   |
|rua prudente de morais      |19900|3   |
|rua visconde de pirajá      |9200 |3   |
|rua raimundo correia        |4150 |3   |
|rua prudente de morais      |18000|3   |
|rua professor sabóia ribeiro|8500 |3   |
|avenida epitácio pessoa     |15000|3   |
|avenida epitácio pessoa     |5500 |3   |
+----------------------------+----

In [0]:
# Count duplicates
duplicates = df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1)

print("Total de combinações duplicadas:", duplicates.count())

Total de combinações duplicadas: 44


In [0]:
df_final = df_final.dropDuplicates(["Endereco", "Preco"])

print(df_final.count()) # Finished with 303 rows of data

303


In [0]:
df_final.show(5)

+---------+--------------------+--------------------+--------+----------+--------+-----+----------+------+----+-----------+---------+-------+
|ID_imovel|                Link|              Titulo|Endereco|    Bairro|     Cep|Preco|Condominio|  IPTU|Area|Dormitorios|Banheiros|Garagem|
+---------+--------------------+--------------------+--------+----------+--------+-----+----------+------+----+-----------+---------+-------+
| 21030079|https://www.zapim...|aluguel-apartamen...|    null|copacabana|22010000|  300|      null|  null|  48|          1|        1|   null|
| 21030192|https://www.zapim...|aluguel-apartamen...|    null|copacabana|22010000|  350|     463.0|  90.0|  36|          1|        1|   null|
| 21030255|https://www.zapim...|aluguel-quitinete...|    null|copacabana|22010000| 1200|     780.0| 100.0|  25|          1|        1|      0|
| 21030044|https://www.zapim...|venda-apartamento...|    null|copacabana|22010000| 5200|    2200.0| 541.0| 110|          3|        2|      1|
| 2103

In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.zap_rio"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/zap_rio", recurse=True) 

df_final.write.format("delta")\
    .mode("overwrite")\
        .option("overwriteSchema", "true")\
            .saveAsTable("silver_imoveis.zap_rio")
            

####ZAP SP

In [0]:
%sql
-- Using SQL to create a copy in silver database before applying transformations

CREATE OR REPLACE TABLE silver_imoveis.zap_sp_sql AS
SELECT *
FROM bronze_imoveis.zap_sp;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Using describe to display data types and column names

DESCRIBE silver_imoveis.zap_sp_sql

col_name,data_type,comment
link,string,
preco,string,
endereco,string,
title,string,
area,string,
iptu,array,
preco_condominio,array,
quartos,string,
banheiros,string,
garagem,string,


In [0]:
%sql
SELECT * 
FROM silver_imoveis.zap_sp_sql
LIMIT 5

link,preco,endereco,title,area,iptu,preco_condominio,quartos,banheiros,garagem
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-3-quartos-mobiliado-pompeia-zona-oeste-sao-paulo-sp-70m2-id-2790787127/,R$ 4.496,Rua Venâncio Aires,"Vila Pompéia, São Paulo",70 m²,"List(Cond. R$ 1.236 , IPTU R$ 328)","List(Cond. R$ 1.236 , IPTU R$ 328)",3,2,2.0
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-com-academia-perdizes-zona-oeste-sao-paulo-sp-61m2-id-2783198119/,R$ 5.000,,"Sumaré, São Paulo",61 m²,"List(Cond. R$ 1.271 , IPTU R$ 425)","List(Cond. R$ 1.271 , IPTU R$ 425)",2,2,2.0
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-1-quarto-com-academia-pompeia-zona-oeste-sao-paulo-sp-25m2-id-2780587822/,R$ 3.499,Rua Venâncio Aires,"Vila Pompéia, São Paulo",25 m²,"List(Cond. R$ 525 , IPTU R$ 100)","List(Cond. R$ 525 , IPTU R$ 100)",1,1,
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-com-piscina-jaguare-sao-paulo-49m2-id-2792125040/,R$ 1.500,Rua Floresto Bandecchi,"Jaguaré, São Paulo",49 m²,"List(Cond. R$ 543 , IPTU R$ 55)","List(Cond. R$ 543 , IPTU R$ 55)",2,1,1.0
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-vila-madalena-sao-paulo-46m2-id-2792230750/,R$ 2.800,Rua Girassol,"Vila Madalena, São Paulo",46 m²,"List(Cond. R$ 831 , IPTU R$ 73)","List(Cond. R$ 831 , IPTU R$ 73)",2,1,1.0


In [0]:
%sql
-- Group all transformations inside a multiple CTE command and replace the table in silver layer

CREATE OR REPLACE TABLE silver_imoveis.zap_sp_sql AS
WITH 
extracted_condo AS (
  SELECT 
    *,
    CASE WHEN size (preco_condominio) > 0 THEN preco_condominio[0] ELSE NULL END 
    AS first_condo  -- 1st array element of condominio column
  FROM silver_imoveis.zap_sp_sql),

extracted_iptu AS (
  SELECT
  *, 
  CASE WHEN size (iptu) > 1 THEN iptu[1] ELSE NULL END 
  AS first_iptu -- Extract 2nd value of array of iptu column
FROM extracted_condo),

cleaned_data AS ( -- Formatting columns
  SELECT
    * EXCEPT (preco, area, preco_condominio, first_condo, iptu, first_iptu),
    CAST(REPLACE(REGEXP_EXTRACT(first_condo, '([0-9]+[\\.]*[0-9]*)', 1), '.', '') AS FLOAT) AS condominio_cleaned,

    CAST(REPLACE(REGEXP_EXTRACT(first_iptu, '([0-9]+[\\.]*[0-9]*)', 1), '.', '') AS FLOAT) AS iptu_cleaned,

    CAST(REPLACE(REGEXP_EXTRACT(preco, '([0-9]+[.]*[0-9]*)', 1), '.', '') AS FLOAT) AS preco_cleaned,

    CAST(REPLACE(REGEXP_EXTRACT(area, '(^[0-9]+)', 1), '.', '') AS INT) AS area_cleaned

FROM extracted_iptu)

-- renaming, reordering and changing data types 
SELECT 
  CAST(link AS STRING) AS Link,
  CAST(title AS STRING) AS Titulo,
  CAST(endereco AS STRING) AS Endereco,
  CAST(REGEXP_EXTRACT(title, '^([^,]+)', 1) AS STRING) AS Bairro,
  CAST('0' AS STRING) AS Cep,
  CAST(preco_cleaned AS FLOAT) AS Preco,
  CAST(condominio_cleaned AS FLOAT) AS Condominio,
  CAST(iptu_cleaned AS FLOAT) AS IPTU,
  CAST(area_cleaned AS INT) AS Area,
  CAST(quartos AS INT) AS Dormitorios,
  CAST(banheiros AS INT) AS Banheiros,
  CAST(garagem AS INT) AS Garagem
   
FROM cleaned_data;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * 
FROM silver_imoveis.zap_sp_sql
LIMIT 5

Link,Titulo,Endereco,Bairro,Cep,Preco,Condominio,IPTU,Area,Dormitorios,Banheiros,Garagem
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-3-quartos-mobiliado-pompeia-zona-oeste-sao-paulo-sp-70m2-id-2790787127/,"Vila Pompéia, São Paulo",Rua Venâncio Aires,Vila Pompéia,0,4496.0,1236.0,328.0,70,3,2,2.0
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-com-academia-perdizes-zona-oeste-sao-paulo-sp-61m2-id-2783198119/,"Sumaré, São Paulo",,Sumaré,0,5000.0,1271.0,425.0,61,2,2,2.0
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-1-quarto-com-academia-pompeia-zona-oeste-sao-paulo-sp-25m2-id-2780587822/,"Vila Pompéia, São Paulo",Rua Venâncio Aires,Vila Pompéia,0,3499.0,525.0,100.0,25,1,1,
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-com-piscina-jaguare-sao-paulo-49m2-id-2792125040/,"Jaguaré, São Paulo",Rua Floresto Bandecchi,Jaguaré,0,1500.0,543.0,55.0,49,2,1,1.0
https://www.zapimoveis.com.br/imovel/aluguel-apartamento-2-quartos-vila-madalena-sao-paulo-46m2-id-2792230750/,"Vila Madalena, São Paulo",Rua Girassol,Vila Madalena,0,2800.0,831.0,73.0,46,2,1,1.0


#####Enrichment with cep info

In [0]:
# Open table zap_sp_sql, that was previously treated (in sql) as pyspark dataset

df_ZAP_SP = spark.table("silver_imoveis.zap_sp_sql")

In [0]:
# Open table cep_sp as pyspark dataset

df_cep_sp = spark.table("silver_aux.cep_sp")

In [0]:
# 1. Defining a function to normalize strings: all column strings should be in lower case
def normalizar_col(df, col_name):
    return df.withColumn(col_name, trim(lower(col(col_name))))

# Apply function to normalize
for col_name in ["Endereco", "Bairro"]:
    df_ZAP_SP = normalizar_col(df_ZAP_SP, col_name)

for col_name in ["logradouro", "bairro"]:
    df_cep_sp = normalizar_col(df_cep_sp, col_name)


In [0]:
# 2. Rename columns localizacao to Logradouro
df_ZAP_SP = df_ZAP_SP.withColumnRenamed("Endereco", "Logradouro")

# 3. Convert cep to string 
df_cep_sp = df_cep_sp.withColumn("Cep", col("Cep").cast(StringType()))


In [0]:
# 4. Deduplicate CEP database (choose only one CEP for each logradouro + bairro)
window_spec = Window.partitionBy("Logradouro", "Bairro").orderBy("Cep")

df_cep_dedup = df_cep_sp.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("rn")

In [0]:
# 5. Main join using Logradouro + Bairro

df_join = df_ZAP_SP.join(
    df_cep_dedup.select(
        col("Logradouro"),
        col( "Bairro"),
        col("Cep").alias("Cep_join")),
    on=["Logradouro", "Bairro"],
    how="left"
).select(df_ZAP_SP["*"], col("Cep_join"))  # avoid errors in column names


In [0]:
display(df_join.show(5))

+--------------------+--------------------+--------------------+-------------+---+------+----------+-----+----+-----------+---------+-------+--------+
|                Link|              Titulo|          Logradouro|       Bairro|Cep| Preco|Condominio| IPTU|Area|Dormitorios|Banheiros|Garagem|Cep_join|
+--------------------+--------------------+--------------------+-------------+---+------+----------+-----+----+-----------+---------+-------+--------+
|https://www.zapim...|Vila Pompéia, São...|  rua venâncio aires| vila pompéia|  0|4496.0|    1236.0|328.0|  70|          3|        2|      2|05024030|
|https://www.zapim...|   Sumaré, São Paulo|                null|       sumaré|  0|5000.0|    1271.0|425.0|  61|          2|        2|      2|    null|
|https://www.zapim...|Vila Pompéia, São...|  rua venâncio aires| vila pompéia|  0|3499.0|     525.0|100.0|  25|          1|        1|   null|05024030|
|https://www.zapim...|  Jaguaré, São Paulo|rua floresto band...|      jaguaré|  0|1500.0|     

In [0]:
# 6. Filter not assigned CEP rows

df_sem_cep = df_join.filter(col("Cep_join").isNull())
print(df_sem_cep.count())

print(df_join.count())
# 128 rows were not populated with ceps let's try a fallback join

128
1500


In [0]:
# 7. Deduplicate cep data using only bairro to perform second fallback join
df_cep_bairro = df_cep_dedup.dropDuplicates(["Bairro"]).select("Bairro", "Cep")

# Rename column cep to avoid errors
df_cep_bairro = df_cep_bairro.withColumnRenamed("Cep", "Cep_fallback")

In [0]:
# 8. Fallback join using only Bairro

df_fallback = df_sem_cep.join(
    df_cep_bairro,
    on="Bairro",
    how="left")


In [0]:
print(df_fallback.filter(col("Cep_fallback").isNull()).count())
print(df_fallback.count())
# count ok!

0
128


In [0]:
# Replace null values in column cep with cep fallback, then remove cep_fallback column

# drop column cep from the original df
df_fallback = df_fallback.drop("Cep")

df_fallback = df_fallback.withColumn(
    "Cep_join",
    when((col("Cep_join").isNull()) | (col("Cep_fallback") == "0"), col("Cep_fallback"))
    .otherwise(col("Cep_join"))
).drop("Cep_fallback")


In [0]:
# 9. Replace null CEP with fallback

df_com_cep = df_join.filter(col("Cep_join").isNotNull())

print(df_com_cep.count())


1372


In [0]:
# 10. Perform final union (keep same schema/columns)
df_com_cep = df_com_cep.drop("Cep")
df_fallback = df_fallback.drop("Cep")

df_final = df_com_cep.unionByName(df_fallback)

In [0]:
# Rename column cep join to Cep

df_final = df_final.withColumnRenamed("Cep_join", "Cep")

In [0]:
# 11. Check final results 

print("Total de imóveis:", df_final.count())
print("Imóveis sem CEP:", df_final.filter(col("Cep").isNull()).count())

# Count ok!

Total de imóveis: 1500
Imóveis sem CEP: 0


In [0]:
print(df_final.show(5))

+--------------------+--------------------+----------------+---------------+------+----------+------+----+-----------+---------+-------+--------+
|                Link|              Titulo|      Logradouro|         Bairro| Preco|Condominio|  IPTU|Area|Dormitorios|Banheiros|Garagem|     Cep|
+--------------------+--------------------+----------------+---------------+------+----------+------+----+-----------+---------+-------+--------+
|https://www.zapim...|Jardim Paulista, ...|alameda campinas|jardim paulista|4000.0|    1637.0|2326.0|  90|          2|        2|      1|01404000|
|https://www.zapim...|Jardim Paulista, ...|alameda campinas|jardim paulista|4000.0|       1.0|   1.0|  30|          1|        1|      1|01404000|
|https://www.zapim...|Jardim Paulista, ...|alameda campinas|jardim paulista|2320.0|    1200.0| 280.0|  33|          1|        1|      1|01404000|
|                null|Jardim Paulista, ...|alameda campinas|jardim paulista|4200.0|      null|  null|  30|          1|      

#####Save dataset ZAP SP

In [0]:
# Apply Id function

df_final = adicionar_id_imovel(df_final, 1103, nome_coluna="ID_imovel")

In [0]:
# Map columns to schema
column_mapping = {
    'ID_imovel': 'ID_imovel',
    'Link': 'Link',
    'titulo':  'Titulo',
    'Logradouro': 'Endereco',
    'bairro': 'Bairro',
    'Cep': 'Cep',
    'Preco_': 'Preco',
    'Condominio': 'Condominio',
    'IPTU': 'IPTU',
    'Area': 'Area', 
    'Dormitorios': 'Dormitorios',
    'Banheiros': 'Banheiros',
    'Garagem': 'Garagem',
}

# Renaming columns in the DataFrame using Pyspark
for old_name, new_name in column_mapping.items():
    df_final = df_final.withColumnRenamed(old_name, new_name)

# Reorder the columns
esquema= ['ID_imovel', 'Link', 'Titulo', 'Endereco', 'Bairro', 'Cep', 'Preco', 'Condominio', 'Iptu', 'Area', 'Dormitorios', 'Banheiros', 'Garagem']
df_final = df_final.select(esquema)


In [0]:
df_final.show(5)

+---------+--------------------+--------------------+----------------+---------------+--------+------+----------+------+----+-----------+---------+-------+
|ID_imovel|                Link|              Titulo|        Endereco|         Bairro|     Cep| Preco|Condominio|  Iptu|Area|Dormitorios|Banheiros|Garagem|
+---------+--------------------+--------------------+----------------+---------------+--------+------+----------+------+----+-----------+---------+-------+
| 11030001|https://www.zapim...|Jardim Paulista, ...|alameda campinas|jardim paulista|01404000|4000.0|    1637.0|2326.0|  90|          2|        2|      1|
| 11030002|https://www.zapim...|Jardim Paulista, ...|alameda campinas|jardim paulista|01404000|4000.0|       1.0|   1.0|  30|          1|        1|      1|
| 11030003|https://www.zapim...|Jardim Paulista, ...|alameda campinas|jardim paulista|01404000|2320.0|    1200.0| 280.0|  33|          1|        1|      1|
| 11030004|                null|Jardim Paulista, ...|alameda cam

In [0]:
# Verify duplicates, if endereco e preco are the same, hence i can consider duplicates

df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1) \
    .orderBy(col("qtde").desc()) \
    .show(truncate=False)

+-------------------------------------+-------+----+
|Endereco                             |Preco  |qtde|
+-------------------------------------+-------+----+
|rua benedito branco de abreu         |1800.0 |7   |
|rua da consolação                    |8100.0 |6   |
|rua harmonia                         |3800.0 |5   |
|rua benedito branco de abreu         |2100.0 |5   |
|rua joseph nigri                     |5500.0 |5   |
|null                                 |3000.0 |5   |
|rua joão miguel jarra                |2600.0 |4   |
|rua alves guimarães                  |4500.0 |4   |
|rua francisco leitão                 |10800.0|4   |
|rua federação paulista de futebol    |2700.0 |4   |
|rua doutor virgílio de carvalho pinto|5999.0 |4   |
|rua apinajés                         |4996.0 |4   |
|rua diana                            |2999.0 |4   |
|null                                 |3500.0 |4   |
|rua sebastião gil                    |2800.0 |3   |
|rua fernão dias                      |3800.0 

In [0]:
duplicates = df_final.groupBy("Endereco", "Preco") \
    .agg(count("*").alias("qtde")) \
    .filter(col("qtde") > 1)

print("Total de combinações duplicadas:", duplicates.count())

Total de combinações duplicadas: 200


In [0]:
df_final = df_final.dropDuplicates(["Endereco", "Preco"])

print(df_final.count()) # Finished with 1223 rows of data

1223


In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.zap_sp"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/zap_sp", recurse=True) 

df_final.write.format("delta")\
    .mode("overwrite")\
        .option("overwriteSchema", "true")\
            .saveAsTable("silver_imoveis.zap_sp")

## Etapa 3 Unificar as tabelas por cidade

####Imóveis Rio

In [0]:
# Create new tables in the silver layer for rio data

# Open 3 already treated datasets for rio from the silver layer

df_VR_RJ = spark.table("silver_imoveis.vivareal_rio")

df_OLX_RJ = spark.table("silver_imoveis.olx_rio")

df_ZAP_RJ = spark.table("silver_imoveis.zap_rio")


In [0]:
# Perform Union

df_RJ_All = df_ZAP_RJ.unionByName(df_OLX_RJ).unionByName(df_VR_RJ)


In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.imoveis_rio"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/imoveis_rio", recurse=True) 

df_RJ_All.write.format("delta")\
    .mode("overwrite")\
        .option("overwriteSchema", "true")\
            .saveAsTable("silver_imoveis.imoveis_rio")

####Imóveis São Paulo

In [0]:
# Create new tables in the silver layer for rio data

# Open 3 already treated datasets for rio from the silver layer

df_VR_SP = spark.table("silver_imoveis.vivareal_sp")

df_OLX_SP = spark.table("silver_imoveis.olx_sp")

df_ZAP_SP = spark.table("silver_imoveis.zap_sp")

In [0]:
# Perform Union

df_SP_All = df_ZAP_SP.unionByName(df_OLX_SP).unionByName(df_VR_SP)

In [0]:
# Saving delta table in silver_imoveis database

tabela = "silver_imoveis.imoveis_sp"

if spark._jsparkSession.catalog().tableExists(tabela):
    spark.sql(f"DROP TABLE {tabela}")

# Delete metadata after dropping table(if it exists)
dbutils.fs.rm("dbfs:/user/hive/warehouse/silver_imoveis.db/imoveis_sp", recurse=True) 

df_SP_All.write.format("delta")\
    .mode("overwrite")\
        .option("overwriteSchema", "true")\
            .saveAsTable("silver_imoveis.imoveis_sp")