In [0]:
# --- Bloco de Importações do Projeto ---

# Importação da Sessão Spark e Funções Nativas
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, when, split, make_date, year, 
    avg, percentile_approx, count, desc, format_number
)

# Importação de Módulos Específicos
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

## Camada Bronze: Carga dos Dados Brutos

Nesta etapa, realizamos a ingestão dos dados brutos a partir dos arquivos de origem localizados no Databricks Volumes. Para garantir a resiliência do nosso pipeline, aplicamos uma estratégia para lidar com dados malformados durante a leitura do arquivo CSV.

In [0]:
# --- Carga dos Dados (Bronze) ---

# Carregamos o CSV com a opção .option("mode", "DROPMALFORMED").
# Esta foi uma descoberta crucial da nossa investigação, pois o arquivo continha uma linha com mais colunas que o cabeçalho.
# Esta opção instrui o Spark a descartar silenciosamente qualquer linha estruturalmente corrompida, prevenindo erros na carga.
caminho_csv = "/Volumes/workspace/melbourne_housing_market/melbourne_housing_volume/MELBOURNE_HOUSE_PRICES_LESS.csv"
df_csv_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("mode", "DROPMALFORMED") \
    .load(caminho_csv)

# Carregamos o JSON que contém dados mais detalhados
caminho_json = "/Volumes/workspace/melbourne_housing_market/melbourne_housing_volume/Melbourne_housing_FULL.json"
df_json_bronze = spark.read.json(caminho_json, multiLine=True)

print("Dados brutos carregados com sucesso!")

Dados brutos carregados com sucesso!


## Camada Silver: Limpeza, Transformação e Unificação (ETL)

Esta é a seção principal do nosso pipeline de ETL. O objetivo é transformar os dados brutos em um único DataFrame limpo, consistente e pronto para a análise.

In [0]:
# --- Etapa 1: Padronização de Nomes (snake_case) ---
# Renomeamos manualmente as colunas para garantir um padrão consistente e corrigir erros de digitação (ex: Longtitude -> longitude).
df_csv_renamed = df_csv_bronze.withColumnRenamed("Suburb", "suburb") \
    .withColumnRenamed("Address", "address") \
    .withColumnRenamed("Rooms", "rooms") \
    .withColumnRenamed("Type", "type") \
    .withColumnRenamed("Price", "price") \
    .withColumnRenamed("Method", "method") \
    .withColumnRenamed("SellerG", "seller_g") \
    .withColumnRenamed("Date", "date") \
    .withColumnRenamed("Distance", "distance") \
    .withColumnRenamed("Postcode", "post_code") \
    .withColumnRenamed("Regionname", "region_name") \
    .withColumnRenamed("Propertycount", "property_count") \
    .withColumnRenamed("CouncilArea", "council_area")

df_json_renamed = df_json_bronze.withColumnRenamed("Suburb", "suburb") \
    .withColumnRenamed("Address", "address") \
    .withColumnRenamed("Rooms", "rooms") \
    .withColumnRenamed("Type", "type") \
    .withColumnRenamed("Price", "price") \
    .withColumnRenamed("Method", "method") \
    .withColumnRenamed("SellerG", "seller_g") \
    .withColumnRenamed("Date", "date") \
    .withColumnRenamed("Distance", "distance") \
    .withColumnRenamed("Postcode", "post_code") \
    .withColumnRenamed("Bedroom2", "bedroom_2") \
    .withColumnRenamed("Bathroom", "bathroom") \
    .withColumnRenamed("Car", "car") \
    .withColumnRenamed("Landsize", "land_size") \
    .withColumnRenamed("BuildingArea", "building_area") \
    .withColumnRenamed("YearBuilt", "year_built") \
    .withColumnRenamed("CouncilArea", "council_area") \
    .withColumnRenamed("Lattitude", "latitude") \
    .withColumnRenamed("Longtitude", "longitude") \
    .withColumnRenamed("Regionname", "region_name") \
    .withColumnRenamed("Propertycount", "property_count")

print("Nomes das colunas padronizados para snake_case.")

Nomes das colunas padronizados para snake_case.


In [0]:
# --- Etapa 2: Harmonização e Unificação ---
# A causa raiz do nosso principal desafio de ETL foi a inconsistência de tipos entre os DataFrames.
# Para resolver, convertemos temporariamente TODAS as colunas para String ANTES da união.
# Isso garante que os schemas sejam idênticos, permitindo uma união segura e sem erros de cast.

df_csv_str = df_csv_renamed
for c in df_csv_str.columns: df_csv_str = df_csv_str.withColumn(c, col(c).cast("string"))

df_json_str = df_json_renamed
for c in df_json_str.columns: df_json_str = df_json_str.withColumn(c, col(c).cast("string"))

# Unificamos os DataFrames preservando todas as colunas de ambos
all_columns = sorted(list(set(df_csv_str.columns) | set(df_json_str.columns)))

def align_df_columns(df, all_cols):
    for col_name in all_cols:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df.select(all_cols)

df_csv_aligned = align_df_columns(df_csv_str, all_columns)
df_json_aligned = align_df_columns(df_json_str, all_columns)
df_silver_unificado = df_csv_aligned.unionByName(df_json_aligned)

print("Schemas harmonizados e DataFrames unificados com sucesso.")

Schemas harmonizados e DataFrames unificados com sucesso.


In [0]:
# --- Etapa 3: Tratamento Final (Pós-União) ---
# Com o DataFrame unificado, agora aplicamos as conversões e regras de negócio.

# 3.1 - Converte as colunas para seus tipos corretos (Data e Numérico)
df_tratado = df_silver_unificado.withColumn("date_final", make_date(
    when(col("date").contains('/'), split(col("date"), "/").getItem(2).cast("integer")).otherwise(split(col("date"), "-").getItem(0).cast("integer")),
    when(col("date").contains('/'), split(col("date"), "/").getItem(1).cast("integer")).otherwise(split(col("date"), "-").getItem(1).cast("integer")),
    when(col("date").contains('/'), split(col("date"), "/").getItem(0).cast("integer")).otherwise(split(col("date"), "-").getItem(2).cast("integer"))
)).drop("date").withColumnRenamed("date_final", "date")

numeric_cols = ["rooms", "price", "distance", "post_code", "bedroom_2", "bathroom", "car", "land_size", "building_area", "year_built", "property_count", "latitude", "longitude"]
for num_col in numeric_cols:
    df_tratado = df_tratado.withColumn(num_col, col(num_col).cast("double"))

# 3.2 - Limpeza, mapeamento e tratamento de nulos
df_tratado = df_tratado.dropna(subset=["date", "price"])
type_map = {"h": "House", "u": "Unit", "t": "Townhouse"}
df_tratado = df_tratado.replace(to_replace=type_map, subset=['type'])

window_suburb = Window.partitionBy("suburb")
median_year_built = percentile_approx("year_built", 0.5).over(window_suburb)
median_building_area = percentile_approx("building_area", 0.5).over(window_suburb)
df_tratado = df_tratado.withColumn("year_built", when(col("year_built").isNull(), median_year_built).otherwise(col("year_built")))
df_tratado = df_tratado.withColumn("building_area", when(col("building_area").isNull(), median_building_area).otherwise(col("building_area")))
df_tratado = df_tratado.dropna(subset=["year_built", "building_area"])

# 3.3 - Engenharia de Features e Seleção Final de Colunas
df_silver_final = df_tratado.withColumn("age_of_property", year(col("date")) - col("year_built")) \
    .select(
        "suburb", "address", "rooms", "type", "price", "method", "seller_g", "date",
        "distance", "post_code", "bedroom_2", "bathroom", "car", "land_size", "building_area",
        "year_built", "council_area", "region_name", "property_count", "age_of_property"
    )

# 3.4 - Ajuste fino dos tipos (Opcional, mas recomendado para clareza)
integer_cols = ["rooms", "post_code", "bedroom_2", "bathroom", "car", "year_built", "property_count", "age_of_property"]
for int_col in integer_cols:
    df_silver_final = df_silver_final.withColumn(int_col, col(int_col).cast(IntegerType()))

print("DataFrame Silver finalizado e pronto para as análises!")
display(df_silver_final)

DataFrame Silver finalizado e pronto para as análises!


suburb,address,rooms,type,price,method,seller_g,date,distance,post_code,bedroom_2,bathroom,car,land_size,building_area,year_built,council_area,region_name,property_count,age_of_property
Albion,3/15 Drummartin St,3,Unit,515000.0,S,Douglas,2017-04-01,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,3A Kororoit St,4,House,717000.0,S,Bells,2017-04-01,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,2 Bazentin St,3,House,570000.0,S,Barry,2017-07-01,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,39 Sydney St,3,House,585000.0,S,hockingstuart,2018-06-02,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,58
Albion,3 Clairmont St,3,House,787000.0,S,hockingstuart,2017-06-03,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,40 Ridley St,5,House,905000.0,S,hockingstuart,2017-09-03,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,7 Dalworth St,2,House,765000.0,SA,Barry,2016-02-04,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,56
Albion,14 Delmont St,3,House,730000.0,S,hockingstuart,2017-03-04,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,12 Wyalong St,2,House,730000.0,S,Bells,2017-03-04,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,57
Albion,29 Norwood St,3,House,593000.0,SP,Sweeney,2016-06-04,10.5,3020,,,,,120.0,1960,Brimbank City Council,Western Metropolitan,2185.0,56


## Camada Gold: Análises e Insights

Com o DataFrame `df_silver_final` limpo e validado, realizamos as análises para responder às perguntas de negócio propostas.

In [0]:
# Pergunta de Negócio: O preço por metro quadrado para casas ('House') valorizou ou desvalorizou 
# entre o primeiro e o segundo semestre de 2017?

# Etapa 1: Filtra o DataFrame para conter apenas imóveis do tipo 'House' vendidos em 2017.
df_2017 = df_silver_final.filter((year(col("date")) == 2017) & (col("type") == "House"))

# Etapa 2: Adiciona 'semestre' e calcula 'preco_m2' com tratamento para evitar divisão por zero.
df_semestres = df_2017.withColumn("semestre", when(col("date").between("2017-01-01", "2017-06-30"), 1).otherwise(2)) \
                      .withColumn("preco_m2",
                                  # Usamos when() para verificar se o divisor é diferente de zero.
                                  when(col("building_area") != 0, col("price") / col("building_area"))
                                  .otherwise(None) # Se for zero, o resultado é NULO.
                                 )

# Etapa 3: Agrupa e calcula a média. A função avg() ignora automaticamente os valores nulos.
preco_medio_semestre = df_semestres.groupBy("semestre").agg(avg("preco_m2").alias("preco_medio_m2")).orderBy("semestre")

# Etapa 4: Formata a coluna numérica para uma exibição mais clara na tabela.
preco_medio_formatado = preco_medio_semestre.withColumn(
    "preco_medio_m2_formatado",
    format_number(col("preco_medio_m2"), 2)
)

print("--- Análise 1: Preço médio por m² por semestre em 2017 ---")
display(preco_medio_formatado.select("semestre", "preco_medio_m2_formatado"))

# Etapa 5: O cálculo da variação continua usando os valores numéricos originais.
s1_row = preco_medio_semestre.filter(col("semestre") == 1).first()
s2_row = preco_medio_semestre.filter(col("semestre") == 2).first()

print("\n--- Gráfico da Análise 1 ---")
display(preco_medio_semestre)

if s1_row and s2_row and s1_row['preco_medio_m2'] is not None and s2_row['preco_medio_m2'] is not None:
    s1_value = s1_row['preco_medio_m2']
    s2_value = s2_row['preco_medio_m2']
    if s1_value > 0:
      # A formatação aqui (f-string) já controla as casas decimais e adiciona separadores.
      print(f"\nPreço médio/m² no 1º semestre de 2017: ${s1_value:,.2f}")
      print(f"Preço médio/m² no 2º semestre de 2017: ${s2_value:,.2f}")
      variacao = ((s2_value - s1_value) / s1_value) * 100
      print(f"Variação Percentual: {variacao:.2f}%")
    else:
      print("\nNão foi possível calcular a variação pois o valor do primeiro semestre é zero.")
else:
    print("\nNão foi possível calcular a variação pois faltam dados de um dos semestres.")

--- Análise 1: Preço médio por m² por semestre em 2017 ---


semestre,preco_medio_m2_formatado
1,8578.78
2,8419.53



--- Gráfico da Análise 1 ---


Databricks visualization. Run in Databricks to view.

semestre,preco_medio_m2
1,8578.783841358327
2,8419.529402457985



Preço médio/m² no 1º semestre de 2017: $8,578.78
Preço médio/m² no 2º semestre de 2017: $8,419.53
Variação Percentual: -1.86%


In [0]:
# Pergunta de Negócio: Quais são os subúrbios mais caros (usando a mediana) 
# que possuem um mercado consolidado (com pelo menos 50 vendas)?

# Etapa 1: Agrupa por subúrbio, calcula a mediana e a contagem de imóveis.
df_gold_top5_suburbs = df_silver_final.groupBy("suburb") \
    .agg(
        percentile_approx("price", 0.5).alias("mediana_preco"),
        count("*").alias("qtde_imoveis")
    ) \
    .filter(col("qtde_imoveis") >= 50) \
    .orderBy(desc("mediana_preco")) \
    .limit(5)

# Etapa 2: Formata a coluna de mediana de preço para ter 0 casas decimais e separador de milhar.
df_top5_formatado = df_gold_top5_suburbs.withColumn(
    "mediana_preco_formatada",
    format_number(col("mediana_preco"), 0)
)

print("--- Análise 2: Top 5 subúrbios com maior mediana de preço (com no mínimo 50 imóveis na amostra) ---")
display(df_top5_formatado.select("suburb", "mediana_preco_formatada", "qtde_imoveis"))
print("\n--- Gráfico da Análise 2 ---")
display(df_gold_top5_suburbs)

--- Análise 2: Top 5 subúrbios com maior mediana de preço (com no mínimo 50 imóveis na amostra) ---


suburb,mediana_preco_formatada,qtde_imoveis
Canterbury,2200000,201
Malvern,2000000,275
Albert Park,1900000,221
Middle Park,1850000,118
Brighton,1826000,721



--- Gráfico da Análise 2 ---


Databricks visualization. Run in Databricks to view.

suburb,mediana_preco,qtde_imoveis
Canterbury,2200000.0,201
Malvern,2000000.0,275
Albert Park,1900000.0,221
Middle Park,1850000.0,118
Brighton,1826000.0,721


In [0]:
# Pergunta de Negócio (Adaptada): Como o preço médio dos imóveis se comportou ao longo dos anos 
# disponíveis no dataset? Houve uma tendência geral de alta ou de baixa no mercado?

# Etapa 1: Agrupa os dados pelo ano extraído da coluna 'date'.
df_gold_preco_tempo = df_silver_final.groupBy(year("date").alias("ano")) \
    .agg(avg("price").alias("preco_medio")) \
    .orderBy("ano")

# Etapa 2: Formata a coluna de preço médio para uma exibição mais clara na tabela.
df_preco_tempo_formatado = df_gold_preco_tempo.withColumn(
    "preco_medio_formatado",
    format_number(col("preco_medio"), 2)
)

print("--- Análise 3: Variação do Preço Médio dos Imóveis ao Longo do Tempo ---")
display(df_preco_tempo_formatado.select("ano", "preco_medio_formatado"))
print("\n--- Gráfico da Análise 3 ---")
display(df_gold_preco_tempo)

--- Análise 3: Variação do Preço Médio dos Imóveis ao Longo do Tempo ---


ano,preco_medio_formatado
2016,1004388.76
2017,1034251.51
2018,1001924.79



--- Gráfico da Análise 3 ---


Databricks visualization. Run in Databricks to view.

ano,preco_medio
2016,1004388.7570160028
2017,1034251.5060098856
2018,1001924.7929162636
