Lista o que tem no container

In [0]:
%python
dbutils.fs.ls("abfss://imoveis@datalakepriscylla.dfs.core.windows.net/bronze")

Lê a estrutura do arquivo

In [0]:
path = "abfss://imoveis@datalakepriscylla.dfs.core.windows.net/bronze/dados_brutos_imoveis"
df = spark.read.format("delta").load(path)

In [0]:
display(df)

Parsear JSON (a coluna anuncio veio como um json inteiro em formato string)

In [0]:
from pyspark.sql.functions import col, from_json, expr, regexp_replace
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType,
    ArrayType
)

# 1) Schema do JSON em `anuncio` (string -> struct)
schema = StructType([
    StructField("andar", IntegerType(), True),
    StructField("area_total", ArrayType(StringType()), True),
    StructField("area_util",  ArrayType(StringType()), True),
    StructField("banheiros",  ArrayType(IntegerType()), True),
    StructField("caracteristicas", ArrayType(StringType()), True),
    StructField("endereco", StructType([
        StructField("bairro",    StringType(), True),
        StructField("cep",       StringType(), True),
        StructField("cidade",    StringType(), True),
        StructField("estado",    StringType(), True),
        StructField("latitude",  DoubleType(), True),
        StructField("longitude", DoubleType(), True),
        StructField("pais",      StringType(), True),
        StructField("rua",       StringType(), True),
        StructField("zona",      StringType(), True),
    ]), True),
    StructField("id", StringType(), True),  # id do anúncio (pode duplicar com a coluna df.id)
    StructField("quartos", ArrayType(IntegerType()), True),
    StructField("suites",  ArrayType(IntegerType()), True),
    StructField("tipo_anuncio", StringType(), True),
    StructField("tipo_unidade", StringType(), True),
    StructField("tipo_uso",     StringType(), True),
    StructField("vaga", ArrayType(IntegerType()), True),
    StructField("valores", ArrayType(StructType([
        StructField("condominio", StringType(), True),
        StructField("iptu",       StringType(), True),
        StructField("tipo",       StringType(), True),
        StructField("valor",      StringType(), True),
    ])), True),
])

# 2) Parse do JSON
df_parsed = df.withColumn("anuncio", col("anuncio"))

# 3) Flatten: seleciona campos e tira valores de arrays (pega o 1º elemento)
#    (em Spark, element_at usa índice 1-based)
df_flat = df_parsed.select(
    col("id").alias("row_id"),                  # id da sua linha (original)
    col("anuncio.id").alias("anuncio_id"),      # id do anúncio (do JSON)

    col("anuncio.andar").alias("andar"),

    expr("element_at(anuncio.area_total, 1)").alias("area_total_raw"),
    expr("element_at(anuncio.area_util,  1)").alias("area_util_raw"),

    expr("element_at(anuncio.banheiros, 1)").alias("banheiros"),
    expr("element_at(anuncio.quartos,   1)").alias("quartos"),
    expr("element_at(anuncio.suites,    1)").alias("suites"),
    expr("element_at(anuncio.vaga,      1)").alias("vaga"),

    col("anuncio.caracteristicas").alias("caracteristicas"),

    col("anuncio.tipo_anuncio").alias("tipo_anuncio"),
    col("anuncio.tipo_unidade").alias("tipo_unidade"),
    col("anuncio.tipo_uso").alias("tipo_uso"),

    col("anuncio.endereco.bairro").alias("bairro"),
    col("anuncio.endereco.cep").alias("cep"),
    col("anuncio.endereco.cidade").alias("cidade"),
    col("anuncio.endereco.estado").alias("estado"),
    col("anuncio.endereco.pais").alias("pais"),
    col("anuncio.endereco.rua").alias("rua"),
    col("anuncio.endereco.zona").alias("zona"),
    col("anuncio.endereco.latitude").alias("latitude"),
    col("anuncio.endereco.longitude").alias("longitude"),

    expr("element_at(anuncio.valores, 1).tipo").alias("preco_tipo"),
    expr("element_at(anuncio.valores, 1).valor").alias("preco_valor_raw"),
    expr("element_at(anuncio.valores, 1).condominio").alias("condominio_raw"),
    expr("element_at(anuncio.valores, 1).iptu").alias("iptu_raw"),
)

# 4) Casts limpos (se vierem números como string)
df_final = (
    df_flat
      .withColumn("area_total_m2", regexp_replace("area_total_raw", "[^0-9,.-]", "").cast("double"))
      .withColumn("area_util_m2",  regexp_replace("area_util_raw",  "[^0-9,.-]", "").cast("double"))
      .withColumn("preco_valor",   regexp_replace("preco_valor_raw","[^0-9,.-]", "").cast("double"))
      .withColumn("condominio",    regexp_replace("condominio_raw", "[^0-9,.-]", "").cast("double"))
      .withColumn("iptu",          regexp_replace("iptu_raw",       "[^0-9,.-]", "").cast("double"))
      .drop("area_total_raw","area_util_raw","preco_valor_raw","condominio_raw","iptu_raw")
)

display(df_final)


Exclusão de colunas

In [0]:
df_silver = df_final.drop("caracteristicas", "endereco", "row_id")
display(df_silver)

Salvar na camada silver

In [0]:
path = "abfss://imoveis@datalakepriscylla.dfs.core.windows.net/silver/dataset_imoveis"
df_silver.write.format("delta").mode("overwrite").save(path)

