In [28]:
import os
from pyspark.sql import SparkSession

# 1. Ajustando o caminho para navegar entre as pastas
# os.path.dirname(os.getcwd()) -> Sobe para a raiz (SBD2-Austin-Airbnb)
# Depois entra em 'Data Layer' e depois em 'raw'
RAIZ_PROJETO = os.path.dirname(os.getcwd())
BASE_PATH = os.path.join(RAIZ_PROJETO, "Data Layer", "raw")

print(f"Procurando arquivos em: {BASE_PATH}")

# 2. Dicionário com os nomes exatos dos arquivos
arquivos = {
    "Listings": "dados_brutos_listings.csv",
    "Calendar": "dados_brutos_calendar.csv",
    "Reviews": "dados_brutos_reviews.csv"
}

# 3. Validação de existência
for nome, arquivo in arquivos.items():
    caminho_completo = os.path.join(BASE_PATH, arquivo)
    if os.path.exists(caminho_completo):
        print(f"✅ Arquivo {nome} ENCONTRADO!")
    else:
        print(f"❌ ERRO: Arquivo {nome} NÃO encontrado em: {caminho_completo}")

# 4. Iniciar Spark e ler os arquivos
spark = SparkSession.builder \
    .appName("ETL_Austin_Airbnb") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()

try:
    # Lendo usando o caminho correto montado acima
    df_listings_raw = spark.read.csv(os.path.join(BASE_PATH, arquivos["Listings"]), header=True, inferSchema=True)
    df_calendar_raw = spark.read.csv(os.path.join(BASE_PATH, arquivos["Calendar"]), header=True, inferSchema=True)
    df_reviews_raw = spark.read.csv(os.path.join(BASE_PATH, arquivos["Reviews"]), header=True, inferSchema=True)
    
    print("\n Sucesso! Os DataFrames da camada RAW foram carregados.")
except Exception as e:
    print(f"\n Falha na leitura: {e}")

Procurando arquivos em: /home/gandalfe/Documentos/sbd2/SBD2-Austin-Airbnb/Data Layer/raw
✅ Arquivo Listings ENCONTRADO!
✅ Arquivo Calendar ENCONTRADO!
✅ Arquivo Reviews ENCONTRADO!

 Sucesso! Os DataFrames da camada RAW foram carregados.


In [33]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# 1. Aplicando try_cast em massa para ignorar erros de deslocamento de coluna no CSV
df_listings = df_listings_raw.select(
    F.expr("try_cast(id as int)").alias("listing_id"),
    F.col("name").alias("listing_name"),
    "property_type", 
    "room_type", 
    "bed_type", 
    F.expr("try_cast(accommodates as int)").alias("accommodates"),
    F.expr("try_cast(bathrooms as double)").alias("bathrooms"),
    F.expr("try_cast(bedrooms as double)").alias("bedrooms"),
    F.expr("try_cast(beds as double)").alias("beds"),
    "neighbourhood_cleansed",
    # Limpeza de preço com try_cast
    F.expr("try_cast(regexp_replace(price, '[^0-9.]', '') as decimal(10,2))").alias("listing_price"),
    F.expr("try_cast(number_of_reviews as int)").alias("number_of_reviews"),
    F.expr("try_cast(first_review as date)").alias("first_review"),
    F.expr("try_cast(last_review as date)").alias("last_review"),
    F.expr("try_cast(host_id as int)").alias("host_id"),
    "host_name"
).fillna({
    "listing_price": 0.0, 
    "number_of_reviews": 0, 
    "accommodates": 0,
    "bedrooms": 0,
    "beds": 0,
    "bathrooms": 0
})

# 2. Garantir que listing_id não seja nulo (pois é PK no banco)
df_listings = df_listings.filter(F.col("listing_id").isNotNull())

# 3. Tratamento do Calendar (também com try_cast por segurança)
df_calendar = df_calendar_raw.select(
    F.expr("try_cast(listing_id as int)").alias("listing_id"),
    F.expr("try_cast(date as date)").alias("calendar_date"),
    F.when(F.col("available") == "t", True).otherwise(False).alias("calendar_available")
).filter(F.col("listing_id").isNotNull() & F.col("calendar_date").isNotNull())

# 4. Join Final
df_silver = df_calendar.join(df_listings, "listing_id", "inner")

# 5. Configuração JDBC e Gravação no Postgres
jdbc_url = "jdbc:postgresql://localhost:5433/austin_airbnb"
db_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

print(f"Enviando {df_silver.count():,} linhas para o banco...")

try:
    df_silver.write.jdbc(
        url=jdbc_url, 
        table="silver.one_big_table", 
        mode="append", # 'append' porque a tabela já existe via DDL
        properties=db_properties
    )
    print("SUCESSO! O banco de dados foi populado.")
except Exception as e:
    print(f"Erro na carga: {e}")

Enviando 1,033,610 linhas para o banco...


                                                                                

SUCESSO! O banco de dados foi populado.


In [34]:
import psycopg2

conn = psycopg2.connect(host="localhost", port="5433", database="austin_airbnb", user="postgres", password="postgres")
cur = conn.cursor()
cur.execute("SELECT count(*) FROM silver.one_big_table")
print(f"Total de registros no banco: {cur.fetchone()[0]:,}")
cur.close()
conn.close()

Total de registros no banco: 1,033,610


In [None]:
import psycopg2
conn = psycopg2.connect(host="localhost", port="5433", database="austin_airbnb", user="postgres", password="postgres")
cur = conn.cursor()
cur.execute("TRUNCATE TABLE silver.one_big_table;")
conn.commit()
cur.close()
conn.close()
print("Tabela limpa")

Tabela limpa! Pode rodar a carga do Spark agora.
