In [None]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession


# Como o notebook est√° em /Transformer e o na raiz
load_dotenv(os.path.join(os.path.dirname(os.getcwd()), '.env'))

# 2. CONFIGURAR CAMINHOS BASEADOS NA ESTRUTURA DE PASTAS
RAIZ_PROJETO = os.path.dirname(os.getcwd())
BASE_PATH = os.path.join(RAIZ_PROJETO, "Data Layer", "raw")

print(f"Procurando arquivos em: {BASE_PATH}")

# 3. DICION√ÅRIO E VALIDA√á√ÉO
arquivos = {
    "Listings": "dados_brutos_listings.csv",
    "Calendar": "dados_brutos_calendar.csv",
    "Reviews": "dados_brutos_reviews.csv"
}

for nome, arquivo in arquivos.items():
    if os.path.exists(os.path.join(BASE_PATH, arquivo)):
        print(f" Arquivo {nome} ENCONTRADO!")
    else:
        print(f" ERRO: Arquivo {nome} N√ÉO encontrado!")

# 4. INICIAR SPARK 
spark = SparkSession.builder \
    .appName("ETL_Austin_Airbnb") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()

# 5. EXTRA√á√ÉO
try:
    df_listings_raw = spark.read.csv(os.path.join(BASE_PATH, arquivos["Listings"]), header=True, inferSchema=True)
    df_calendar_raw = spark.read.csv(os.path.join(BASE_PATH, arquivos["Calendar"]), header=True, inferSchema=True)
    df_reviews_raw = spark.read.csv(os.path.join(BASE_PATH, arquivos["Reviews"]), header=True, inferSchema=True)
    print("\nSucesso! Os DataFrames da camada RAW foram carregados.")
except Exception as e:
    print(f"\nFalha na leitura: {e}")

üîç Procurando arquivos em: /home/mike/Downloads/SBD2-Austin-Airbnb/Data Layer/raw
 Arquivo Listings ENCONTRADO!
 Arquivo Calendar ENCONTRADO!
 Arquivo Reviews ENCONTRADO!

 Sucesso! Os DataFrames da camada RAW foram carregados.


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

print("Iniciando transforma√ß√£o completa dos dados...\n")

# ==================== 1. LIMPAR E TRANSFORMAR LISTINGS ====================
print("Processando LISTINGS...")

df_listings = df_listings_raw.select(
    # IDs e Nomes
    F.expr("try_cast(id as int)").alias("listing_id"),
    F.col("name").cast("string").alias("listing_name"),
    
    # Tipos de Propriedade
    F.col("property_type").cast("string"),
    F.col("room_type").cast("string"),
    F.col("bed_type").cast("string"),
    
    # Capacidade
    F.expr("try_cast(accommodates as int)").alias("accommodates"),
    F.expr("try_cast(bathrooms as int)").alias("bathrooms"),
    F.expr("try_cast(bedrooms as int)").alias("bedrooms"),
    F.expr("try_cast(beds as int)").alias("beds"),
    
    # Localiza√ß√£o
    F.expr("try_cast(neighbourhood_cleansed as int)").alias("neighbourhood_cleansed"),
    F.col("city").cast("string"),
    F.col("state").cast("string"),
    F.expr("try_cast(zipcode as double)").alias("zipcode"),
    F.col("market").cast("string"),
    F.col("country_code").cast("string"),
    F.col("country").cast("string"),
    F.expr("try_cast(latitude as decimal(10,8))").alias("latitude"),
    F.expr("try_cast(longitude as decimal(11,8))").alias("longitude"),
    F.when(F.col("is_location_exact") == "t", True).otherwise(False).alias("is_location_exact"),
    
    # Pre√ßos (limpeza completa)
    F.expr("try_cast(regexp_replace(price, '[^0-9.]', '') as decimal(10,2))").alias("listing_price"),
    F.expr("try_cast(regexp_replace(security_deposit, '[^0-9.]', '') as decimal(10,2))").alias("security_deposit"),
    F.expr("try_cast(regexp_replace(cleaning_fee, '[^0-9.]', '') as decimal(10,2))").alias("cleaning_fee"),
    F.expr("try_cast(guests_included as int)").alias("guests_included"),
    F.expr("try_cast(regexp_replace(extra_people, '[^0-9.]', '') as decimal(10,2))").alias("extra_people"),
    
    # Pol√≠ticas
    F.expr("try_cast(minimum_nights as int)").alias("minimum_nights"),
    F.expr("try_cast(maximum_nights as int)").alias("maximum_nights"),
    F.when(F.col("instant_bookable") == "t", True).otherwise(False).alias("instant_bookable"),
    F.col("cancellation_policy").cast("string"),
    F.when(F.col("require_guest_profile_picture") == "t", True).otherwise(False).alias("require_guest_profile_picture"),
    F.when(F.col("require_guest_phone_verification") == "t", True).otherwise(False).alias("require_guest_phone_verification"),
    
    # Disponibilidade
    F.expr("try_cast(availability_30 as int)").alias("availability_30"),
    F.expr("try_cast(availability_60 as int)").alias("availability_60"),
    F.expr("try_cast(availability_90 as int)").alias("availability_90"),
    F.expr("try_cast(availability_365 as int)").alias("availability_365"),
    
    # Reviews
    F.expr("try_cast(number_of_reviews as int)").alias("number_of_reviews"),
    F.expr("try_cast(first_review as date)").alias("first_review"),
    F.expr("try_cast(last_review as date)").alias("last_review"),
    F.expr("try_cast(reviews_per_month as decimal(5,2))").alias("reviews_per_month"),
    F.expr("try_cast(review_scores_rating as decimal(4,2))").alias("review_scores_rating"),
    F.expr("try_cast(review_scores_accuracy as decimal(4,2))").alias("review_scores_accuracy"),
    F.expr("try_cast(review_scores_cleanliness as decimal(4,2))").alias("review_scores_cleanliness"),
    F.expr("try_cast(review_scores_checkin as decimal(4,2))").alias("review_scores_checkin"),
    F.expr("try_cast(review_scores_communication as decimal(4,2))").alias("review_scores_communication"),
    F.expr("try_cast(review_scores_location as decimal(4,2))").alias("review_scores_location"),
    F.expr("try_cast(review_scores_value as decimal(4,2))").alias("review_scores_value"),
    F.col("amenities").cast("string"),
    
    # Host
    F.expr("try_cast(host_id as int)").alias("host_id"),
    F.col("host_name").cast("string"),
    F.expr("try_cast(host_since as date)").alias("host_since"),
    F.col("host_location").cast("string"),
    F.col("host_response_time").cast("string"),
    F.col("host_response_rate").cast("string"),
    F.col("host_acceptance_rate").cast("string"),
    F.when(F.col("host_is_superhost") == "t", True).otherwise(False).alias("host_is_superhost"),
    F.col("host_neighbourhood").cast("string"),
    F.expr("try_cast(host_listings_count as int)").alias("host_listings_count"),
    F.expr("try_cast(host_total_listings_count as int)").alias("host_total_listings_count"),
    F.col("host_verifications").cast("string"),
    F.when(F.col("host_has_profile_pic") == "t", True).otherwise(False).alias("host_has_profile_pic"),
    F.when(F.col("host_identity_verified") == "t", True).otherwise(False).alias("host_identity_verified"),
    F.expr("try_cast(calculated_host_listings_count as int)").alias("calculated_host_listings_count")
).filter(F.col("listing_id").isNotNull())

print(f"   {df_listings.count():,} listings processados")

# ==================== 2. LIMPAR CALENDAR ====================
print("\nProcessando CALENDAR...")

df_calendar = df_calendar_raw.select(
    F.expr("try_cast(listing_id as int)").alias("listing_id"),
    F.expr("try_cast(date as date)").alias("calendar_date"),
    F.when(F.col("available") == "t", True).otherwise(False).alias("calendar_available"),
    F.expr("try_cast(regexp_replace(price, '[^0-9.]', '') as decimal(10,2))").alias("calendar_price")
).filter(F.col("listing_id").isNotNull() & F.col("calendar_date").isNotNull())

print(f"   {df_calendar.count():,} registros de calendar")

# ==================== 3. LIMPAR REVIEWS ====================
print("\nProcessando REVIEWS...")

df_reviews = df_reviews_raw.select(
    F.expr("try_cast(listing_id as int)").alias("listing_id"),
    F.expr("try_cast(id as int)").alias("review_id"),
    F.expr("try_cast(date as date)").alias("review_date"),
    F.expr("try_cast(reviewer_id as int)").alias("reviewer_id"),
    F.col("reviewer_name").cast("string")
).filter(F.col("listing_id").isNotNull() & F.col("review_id").isNotNull())

print(f"   {df_reviews.count():,} reviews")

# ==================== 4. CONSOLIDAR (ONE BIG TABLE) ====================
print("\nConsolidando dados...")

# INNER JOIN Calendar + Listings (s√≥ registros com listings v√°lidos)
# Isso garante que is_location_exact e outras colunas obrigat√≥rias n√£o sejam NULL
df_silver = df_calendar.join(df_listings, "listing_id", "inner")

# LEFT JOIN Reviews (para manter registros sem reviews)
df_silver = df_silver.join(df_reviews, ["listing_id"], "left")

print(f"   {df_silver.count():,} registros ap√≥s joins")

# ==================== 5. REMOVER OUTLIERS DE PRE√áO ====================
print("\nRemovendo outliers de CALENDAR_PRICE (IQR method)...")

# Filtrar outliers baseado no pre√ßo di√°rio do calendar, n√£o do listing
percentis = df_silver.approxQuantile("calendar_price", [0.25, 0.75], 0.01)
Q1, Q3 = percentis[0], percentis[1]
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR

df_silver = df_silver.filter(
    (F.col("calendar_price") >= lower) & 
    (F.col("calendar_price") <= upper) &
    (F.col("calendar_price") > 0)
)

print(f"   {df_silver.count():,} registros finais (limites: ${lower:.2f} - ${upper:.2f})")

# ==================== 6. CARREGAR NO POSTGRES ====================
print("\nCarregando no PostgreSQL...")

jdbc_url = "jdbc:postgresql://localhost:5433/austin_airbnb"
db_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

try:
    df_silver.write.jdbc(
        url=jdbc_url, 
        table="silver.one_big_table", 
        mode="append",
        properties=db_properties
    )
    print("\nSUCESSO! Banco de dados populado com dados completos!")
    print(f"   Total: {df_silver.count():,} registros")
    print(f"   Colunas: {len(df_silver.columns)}")
except Exception as e:
    print(f"\nErro na carga: {e}")

üîÑ Iniciando transforma√ß√£o completa dos dados...

üìã Processando LISTINGS...
   ‚úÖ 5,764 listings processados

üìÖ Processando CALENDAR...
   ‚úÖ 1,048,575 registros de calendar

‚≠ê Processando REVIEWS...
   ‚úÖ 62,976 reviews

üîó Consolidando dados...
   ‚úÖ 11,610,945 registros ap√≥s joins

üßπ Removendo outliers de CALENDAR_PRICE (IQR method)...


                                                                                

   ‚úÖ 7,932,919 registros finais (limites: $-76.00 - $356.00)

üíæ Carregando no PostgreSQL...


                                                                                


‚úÖ SUCESSO! Banco de dados populado com dados completos!
   üìä Total: 7,932,919 registros
   üìã Colunas: 68


In [24]:
import psycopg2

conn = psycopg2.connect(host="localhost", port="5433", database="austin_airbnb", user="postgres", password="postgres")
cur = conn.cursor()
cur.execute("SELECT count(*) FROM silver.one_big_table")
print(f"Total de registros no banco: {cur.fetchone()[0]:,}")
cur.close()
conn.close()

Total de registros no banco: 7,932,919


In [None]:
import psycopg2
from psycopg2.extras import RealDictCursor

try:
    # 1. Conectar ao banco
    conn = psycopg2.connect(
        host="localhost", 
        port="5433", 
        database="austin_airbnb", 
        user="postgres", 
        password="postgres"
    )
    
    # 2. Criar um cursor (usando RealDictCursor para ver os nomes das colunas como um dicion√°rio)
    cur = conn.cursor(cursor_factory=RealDictCursor)
    
    # 3. Executar a consulta para pegar 10 linhas
    query = "SELECT listing_id, calendar_date, listing_price, listing_name FROM silver.one_big_table LIMIT 10;"
    cur.execute(query)
    
    # 4. Recuperar os resultados
    rows = cur.fetchall()
    
    print(f"--- Exibindo {len(rows)} linhas da tabela silver.one_big_table ---\n")
    
    # 5. Iterar e imprimir de forma organizada
    for row in rows:
        print(f"ID: {row['listing_id']} | Data: {row['calendar_date']} | Pre√ßo: ${row['listing_price']} | Nome: {row['listing_name'][:30]}...")

except Exception as e:
    print(f"Erro ao consultar o banco: {e}")

finally:
    # 6. Fechar conex√£o sempre!
    if cur: cur.close()
    if conn: conn.close()

--- Exibindo 10 linhas da tabela silver.one_big_table ---

ID: 3862246 | Data: 2015-11-09 | Pre√ßo: $None | Nome: East Side Bungalow / Private S...
ID: 3862246 | Data: 2015-11-09 | Pre√ßo: $None | Nome: East Side Bungalow / Private S...
ID: 3862246 | Data: 2015-11-09 | Pre√ßo: $None | Nome: East Side Bungalow / Private S...
ID: 5785387 | Data: 2016-02-07 | Pre√ßo: $None | Nome: Cute Shabby Chic Room in E. Au...
ID: 3862246 | Data: 2015-11-09 | Pre√ßo: $None | Nome: East Side Bungalow / Private S...
ID: 5785387 | Data: 2016-02-07 | Pre√ßo: $None | Nome: Cute Shabby Chic Room in E. Au...
ID: 3862246 | Data: 2015-11-09 | Pre√ßo: $None | Nome: East Side Bungalow / Private S...
ID: 5785387 | Data: 2016-02-07 | Pre√ßo: $None | Nome: Cute Shabby Chic Room in E. Au...
ID: 3862246 | Data: 2015-11-09 | Pre√ßo: $None | Nome: East Side Bungalow / Private S...
ID: 5785387 | Data: 2016-02-08 | Pre√ßo: $None | Nome: Cute Shabby Chic Room in E. Au...
