In [0]:
from pyspark.sql.functions import col, concat_ws, lower, regexp_replace, to_date, trim, when

# Read the `bronze.bronze_disasters` table into a DataFrame; every later cell starts from it.
df_disasters = spark.read.table("bronze.bronze_disasters")

In [0]:
# Names of every string-typed column in the source DataFrame.
# `typeName` is a method and must be called: comparing the method object itself
# to 'string' is always False, which leaves this list empty and silently skips
# every transformation below.
string_columns = [f.name for f in df_disasters.schema.fields if f.dataType.typeName() == 'string']

# Matches the C0 and C1 control-character ranges (including DEL).
CONTROL_CHARS_PATTERN = r'[\u0000-\u001F\u007F-\u009F]'

df_normalizado = df_disasters

# Normalize each string column in a single pass. Control characters are removed
# first so that trim() can then strip any spaces they were hiding at the edges
# of the value; lower() runs last.
for coluna in string_columns:
    df_normalizado = df_normalizado.withColumn(
        coluna,
        lower(trim(regexp_replace(col(coluna), CONTROL_CHARS_PATTERN, '')))
    )

# Preview a few rows to validate the transformations.
df_normalizado.limit(10).display()


In [0]:
from pyspark.sql import functions as F

# String tokens that stand for "no value"; they are replaced by real nulls.
NULL_TOKENS = ["null", "n/a", "na", "unknow", "unknown", ""]

# Columns grouped by the type they are converted to.
BOOLEAN_COLUMNS = ["ofda_response", "appeal", "declaration"]
INT_COLUMNS = [
    "year",
    "start_year", "start_month", "start_day",
    "end_year", "end_month", "end_day",
    "total_deaths", "no_injured", "no_affected", "no_homeless", "total_affected",
]
DOUBLE_COLUMNS = [
    "reconstruction_costs_000_us",
    "insured_damages_000_us",
    "total_damages_000_us",
]


def _to_boolean(column_name):
    """Map yes/no style tokens to True/False; any other value becomes null."""
    value = F.col(column_name)
    return (
        F.when(value.isin("yes", "y", "1", "true"), True)
         .when(value.isin("no", "n", "0", "false"), False)
         .otherwise(None)
    )


def _to_signed_coordinate(column_name, negative_marker):
    """Parse a coordinate string into a double.

    Everything except digits, '.' and '-' is stripped before a try_cast, so
    unparseable values become null instead of failing. When the raw text
    contains `negative_marker` (case-insensitive) the result is forced to be
    negative.
    """
    numeric = F.expr(
        f"try_cast(regexp_replace({column_name}, '[^0-9\\.\\-]', '') as double)"
    )
    has_marker = F.col(column_name).rlike(f"(?i){negative_marker}")
    # -abs() rather than plain negation: a value that already carries a minus
    # sign together with the marker must stay negative, not flip to positive.
    # A null input makes the condition null, so the otherwise branch returns
    # the (null) numeric value; no separate null guard is needed.
    return F.when(has_marker, -F.abs(numeric)).otherwise(numeric)


def _partial_date_to_timestamp(prefix):
    """Build a timestamp from `<prefix>_year`, `<prefix>_month`, `<prefix>_day`.

    Missing trailing parts are tolerated: year only or year-month still yield
    a timestamp. A missing year yields null.
    """
    year = F.col(f"{prefix}_year")
    month = F.col(f"{prefix}_month")
    day = F.col(f"{prefix}_day")
    # concat_ws skips nulls, so a row with a day but no month would otherwise
    # produce "yyyy-d" and the day would be parsed as the month. The day is
    # only used when the month is known.
    day_if_month_known = F.when(month.isNotNull(), day)
    return F.when(
        year.isNotNull(),
        F.to_timestamp(F.concat_ws("-", year, month, day_if_month_known)),
    )


# Replace null-like tokens with real nulls.
df_silver = df_normalizado.replace(NULL_TOKENS, None)

# Boolean flags.
for column_name in BOOLEAN_COLUMNS:
    df_silver = df_silver.withColumn(column_name, _to_boolean(column_name))

# Plain numeric casts (each column is replaced in place).
for column_name in INT_COLUMNS:
    df_silver = df_silver.withColumn(column_name, F.col(column_name).cast("int"))
for column_name in DOUBLE_COLUMNS:
    df_silver = df_silver.withColumn(column_name, F.col(column_name).cast("double"))

df_silver = (
    df_silver
    # Cleaned coordinates go into temporary columns so the raw text stays
    # available while they are computed.
    .withColumn("latitude_clean", _to_signed_coordinate("latitude", "s"))
    .withColumn("longitude_clean", _to_signed_coordinate("longitude", "w"))

    # Dates, built from the year/month/day columns cast to int above.
    .withColumn("start_date", _partial_date_to_timestamp("start"))
    .withColumn("end_date", _partial_date_to_timestamp("end"))

    # Standardize country and region capitalization.
    .withColumn("country", F.trim(F.initcap(F.col("country"))))
    .withColumn("region", F.trim(F.initcap(F.col("region"))))

    # Swap the raw latitude/longitude for the cleaned versions.
    .drop("latitude", "longitude")
    .withColumnRenamed("latitude_clean", "latitude")
    .withColumnRenamed("longitude_clean", "longitude")
)

df_silver.printSchema()


In [0]:
# Save df_silver to the `silver.silver_disasters` table in Delta format,
# using overwrite mode.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.silver_disasters")
)

