# 📦 RentSight: layered pipeline (Databricks / Spark)

These notebooks are **read-only examples** that reproduce the logic of your `run_pipeline.py` orchestrator, but in **PySpark**.

Layers:
- **Bronze**: CSV ingestion (every column read as string) and write to Parquet.
- **Silver**: column selection, safe casts, normalization/simulation of `room_type`, and write to Parquet.
- **Gold**: analytical aggregations and write of the final tables to Parquet.

✅ Tip: you can run each notebook on its own (each one reads the previous layer from its default path).


## 🥇 GOLD: analytical aggregations

Reproduces the `gold(df_silver)` function from your orchestrator:
- Price by neighbourhood / room type (count, avg, p50, p90)
- Availability by neighbourhood / room type (count, avg, p50)
- Reviews by neighbourhood (count, avg)
- Cost-benefit ranking (p50_price / (p50_availability_365 + 1))

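Written out for one (neighbourhood, room_type_simulated) group, the cost-benefit score is the following; the +1 avoids a division by zero when the median availability is 0.

$$
\text{cost\_benefit\_score} = \frac{\text{p50\_price}}{\text{p50\_availability\_365} + 1}
$$

With hypothetical medians, a group with p50_price = 300 and p50_availability_365 = 149 scores 300 / 150 = 2.0, while a group with p50_price = 200 and p50_availability_365 = 49 scores 200 / 50 = 4.0. An ascending sort therefore ranks the pricier group first, because its median availability is much higher.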

In [0]:
from pyspark.sql import functions as F

# Input: Silver layer; output: directory that receives the Gold tables (Unity Catalog volumes)
silver_in_path = '/Volumes/rentsight/silver/listings_silver_rj'
gold_out_dir = '/Volumes/rentsight/gold'

print('SILVER IN:', silver_in_path)
print('GOLD OUT DIR:', gold_out_dir)


In [0]:
# 1) Read the Silver layer
df = spark.read.parquet(silver_in_path)
print('Rows:', df.count())
display(df.limit(5))


In [0]:
# One Parquet directory per Gold table
GOLD_PRICE = f"{gold_out_dir}/gold_price_by_neighbourhood_roomtype"
GOLD_AVAIL = f"{gold_out_dir}/gold_availability_by_neighbourhood_roomtype"
GOLD_REVIEWS = f"{gold_out_dir}/gold_reviews_by_neighbourhood"
GOLD_COST_BENEFIT = f"{gold_out_dir}/gold_cost_benefit_ranking"

print(GOLD_PRICE)
print(GOLD_AVAIL)
print(GOLD_REVIEWS)
print(GOLD_COST_BENEFIT)


In [0]:
# 2) Price by neighbourhood / room type (rows without a price are excluded)
df_price = df.filter(F.col('price').isNotNull())

gold_price = (
    df_price
    .groupBy('neighbourhood', 'room_type_simulated')
    .agg(
        F.count('id').alias('listings_count'),
        F.avg('price').alias('avg_price'),
        # accuracy=10000 is Spark's default; 1.0/accuracy is the relative error of the approximation
        F.percentile_approx('price', 0.50, 10000).alias('p50_price'),
        F.percentile_approx('price', 0.90, 10000).alias('p90_price'),
    )
)

gold_price.write.mode('overwrite').parquet(GOLD_PRICE)
display(gold_price.orderBy('neighbourhood', 'room_type_simulated').limit(10))

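To make the semantics of the price cell concrete, here is a plain-Python sketch (no Spark needed) of what it computes per group. The helper names `nearest_rank` and `gold_price_sketch` and the rows are hypothetical. One assumption to flag: `nearest_rank` implements the exact nearest-rank percentile (the smallest value whose cumulative share reaches p); `percentile_approx` also returns a value taken from the column but only approximates this, so on large groups its result may differ slightly.

```python
import math
from collections import defaultdict
from statistics import mean


def nearest_rank(values, p):
    """Nearest-rank percentile: the smallest value whose cumulative share reaches p."""
    ordered = sorted(values)
    # 1-based rank ceil(p * n), clamped to 1 so that p = 0 returns the minimum
    rank = max(math.ceil(p * len(ordered)), 1)
    return ordered[rank - 1]


def gold_price_sketch(rows):
    """Group rows by (neighbourhood, room_type_simulated) and summarise price."""
    prices = defaultdict(list)
    counts = defaultdict(int)
    for row in rows:
        if row["price"] is None:  # mirrors filter(F.col('price').isNotNull())
            continue
        key = (row["neighbourhood"], row["room_type_simulated"])
        prices[key].append(row["price"])
        if row["id"] is not None:  # F.count('id') skips null ids
            counts[key] += 1
    return {
        key: {
            "listings_count": counts[key],
            "avg_price": mean(values),
            "p50_price": nearest_rank(values, 0.50),
            "p90_price": nearest_rank(values, 0.90),
        }
        for key, values in prices.items()
    }


# Hypothetical rows for a single group (illustrative values, not real listings)
rows = [
    {"id": i, "neighbourhood": "Copacabana", "room_type_simulated": "Entire home/apt", "price": p}
    for i, p in enumerate([100.0, 200.0, 300.0, 400.0, 500.0, None])
]
summary = gold_price_sketch(rows)
print(summary[("Copacabana", "Entire home/apt")])
```

With an even number of values, for example `[1, 2, 3, 4]`, `nearest_rank(..., 0.5)` returns 2, an actual data point, rather than an interpolated 2.5.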

In [0]:
# 3) Availability by neighbourhood / room type
# Cast once, then drop rows whose availability is missing or not castable,
# so listings_count, the average and the median all see the same rows
df_av = (
    df
    .withColumn('_availability_float', F.col('availability_365').cast('double'))
    .filter(F.col('_availability_float').isNotNull())
)

gold_avail = (
    df_av
    .groupBy('neighbourhood', 'room_type_simulated')
    .agg(
        F.count('id').alias('listings_count'),
        F.avg('_availability_float').alias('avg_availability_365'),
        F.percentile_approx('_availability_float', 0.50, 10000).alias('p50_availability_365'),
    )
)

gold_avail.write.mode('overwrite').parquet(GOLD_AVAIL)
display(gold_avail.orderBy('neighbourhood', 'room_type_simulated').limit(10))

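Whether the null filter runs before or after the cast to double matters only if `availability_365` can hold values that fail the cast (for example, if it were still a string column; Silver's safe casts should normally prevent this). The plain-Python sketch below uses hypothetical values and a hypothetical `safe_double` helper that mimics Spark's non-ANSI `cast`, which turns unparseable strings into null (with ANSI mode enabled, Spark raises an error instead). It shows how the filter order changes the number of rows that `F.count('id')` would see.

```python
def safe_double(value):
    """Mimic a non-ANSI Spark cast to double: unparseable values become None."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


# Hypothetical raw availability_365 values stored as strings (illustrative only)
raw = ["365", "0", "n/a", None, "120"]

# Filter isNotNull() on the raw column, then cast: "n/a" survives as None
filter_then_cast = [safe_double(v) for v in raw if v is not None]
# Cast first, then drop rows whose cast result is null
cast_then_filter = [x for x in (safe_double(v) for v in raw) if x is not None]

print(len(filter_then_cast), len(cast_then_filter))
```

`avg` and `percentile_approx` skip the leftover null either way, so only `listings_count` disagrees between the two orders.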

In [0]:
# 4) Reviews by neighbourhood (F.avg ignores nulls)
gold_reviews = (
    df
    .groupBy('neighbourhood')
    .agg(
        F.count('id').alias('listings_count'),
        F.avg(F.col('number_of_reviews').cast('double')).alias('avg_number_of_reviews'),
        F.avg('reviews_per_month').alias('avg_reviews_per_month'),
    )
)

gold_reviews.write.mode('overwrite').parquet(GOLD_REVIEWS)
display(gold_reviews.orderBy('neighbourhood').limit(10))

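In the reviews cell, `F.avg` skips nulls, so `avg_reviews_per_month` is averaged only over listings that have a value, while `listings_count` counts every listing with a non-null `id`. A plain-Python sketch with hypothetical values and a hypothetical helper `avg_ignoring_nulls`, assuming a null `reviews_per_month` marks a listing without reviews:

```python
from statistics import mean


def avg_ignoring_nulls(values):
    """Average that skips None, like Spark's avg(); returns None when no value is left."""
    present = [v for v in values if v is not None]
    return mean(present) if present else None


# Hypothetical reviews_per_month values for one neighbourhood (None = no value)
reviews_per_month = [2.0, None, 1.0, None]

listings_count = len(reviews_per_month)  # every listing counted (assuming non-null ids)
avg_spark_style = avg_ignoring_nulls(reviews_per_month)  # averaged over 2 listings
avg_nulls_as_zero = mean(0.0 if v is None else v for v in reviews_per_month)  # over 4

print(listings_count, avg_spark_style, avg_nulls_as_zero)
```

If missing values should count as zero instead, the Spark equivalent would be `F.avg(F.coalesce(F.col('reviews_per_month'), F.lit(0.0)))`; which reading is right depends on how the source fills that column.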

In [0]:
# 5) Cost-benefit ranking: median price / (median availability + 1), lowest score first
merged = (
    gold_price
    .join(
        gold_avail.select('neighbourhood', 'room_type_simulated', 'p50_availability_365'),
        on=['neighbourhood', 'room_type_simulated'],
        how='inner'
    )
    .withColumn('cost_benefit_score', F.col('p50_price') / (F.col('p50_availability_365') + F.lit(1.0)))
    .orderBy(F.col('cost_benefit_score').asc())
)

merged.write.mode('overwrite').parquet(GOLD_COST_BENEFIT)
display(merged.limit(20))

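The join-and-rank logic of the cost-benefit cell, sketched in plain Python with hypothetical medians and a hypothetical `cost_benefit_ranking` helper: `how='inner'` means a (neighbourhood, room_type_simulated) group that has a price summary but no availability summary is dropped from the ranking, and the ascending sort puts the lowest score first.

```python
def cost_benefit_ranking(price_rows, avail_rows):
    """Inner-join price and availability summaries, score each group, sort ascending."""
    p50_avail = {
        (r["neighbourhood"], r["room_type_simulated"]): r["p50_availability_365"]
        for r in avail_rows
    }
    ranked = []
    for r in price_rows:
        key = (r["neighbourhood"], r["room_type_simulated"])
        if key not in p50_avail:  # how='inner': unmatched groups are dropped
            continue
        # +1 avoids a division by zero when the median availability is 0
        score = r["p50_price"] / (p50_avail[key] + 1.0)
        ranked.append({**r, "p50_availability_365": p50_avail[key], "cost_benefit_score": score})
    return sorted(ranked, key=lambda row: row["cost_benefit_score"])


# Hypothetical group medians (illustrative values only)
price_rows = [
    {"neighbourhood": "Botafogo", "room_type_simulated": "Private room", "p50_price": 200.0},
    {"neighbourhood": "Leblon", "room_type_simulated": "Private room", "p50_price": 300.0},
    {"neighbourhood": "Lapa", "room_type_simulated": "Private room", "p50_price": 150.0},
]
avail_rows = [
    {"neighbourhood": "Botafogo", "room_type_simulated": "Private room", "p50_availability_365": 49.0},
    {"neighbourhood": "Leblon", "room_type_simulated": "Private room", "p50_availability_365": 149.0},
]

ranking = cost_benefit_ranking(price_rows, avail_rows)
for row in ranking:
    print(row["neighbourhood"], row["cost_benefit_score"])
```

Lapa has no availability summary, so it is absent from the result; a `how='left'` join would keep it with a null score instead.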

In [0]:
print('✅ Gold complete!')
print('GOLD DIR:', gold_out_dir)
