No Databricks, temos a flexibilidade de utilizar várias linguagens de programação, como Python, Scala e R, para manipular e processar dados usando os recursos do Spark.

No projeto atual, a linguagem Scala foi escolhida devido à preferência da empresa contratante. No entanto, é importante ressaltar que todas as transformações realizadas em Scala podem ser replicadas no Pyspark que é uma biblioteca Python que fornece uma interface para programação com o Apache Spark.

Com base nisso, o desafio proposto é o seguinte:

- Crie um novo notebook no Databricks.
- Utilize o Pyspark para aplicar todas as transformações necessárias nos dados, seguindo as etapas dos notebooks "inbound_to_bronze" e "bronze_to_silver".
- Certifique-se de que todas as transformações estão corretamente implementadas e que os dados resultantes são consistentes.

Dessa forma, você terá a oportunidade de aprimorar suas habilidades em Python e utilizar o poderoso conjunto de recursos do Spark para processar os dados de forma eficiente.

Utilizando tanto a linguagem Scala com a API do Spark quanto o Pyspark, os códigos utilizados para aplicar as transformações vão ser basicamente os mesmos. O que vai mudar são apenas peculiaridades do Scala para o Python, como a necessidade de sempre declarar uma variável nova e utilizar o "val" para isso e também a importação de bibliotecas.

## Códigos do notebook inbound_to_bronze utilizando Python:

### Lendo os dados na camada inbound:

In [0]:
path = "/mnt/dados/inbound/dados_brutos_imoveis.json"
dados = spark.read.json(path)
display(dados)

### Removendo as colunas:

In [0]:
df_bronze = dados.drop("imagens", "usuario")
display(df_bronze)

### Criando a coluna de identificação:

In [0]:
from pyspark.sql.functions import col

df_bronze = df_bronze.withColumn("id", col("anuncio.id"))
display(df_bronze)

### Salvando na camada bronze:

In [0]:
path = "dbfs:/mnt/dados/bronze/dataset_imoveis_py"
df_bronze.write.format("delta").mode("overwrite").save(path)

## Códigos do notebook bronze_to_silver utilizando Python:

### Lendo os dados na camada bronze:

In [0]:
path = "dbfs:/mnt/dados/bronze/dataset_imoveis_py/"
df = spark.read.format("delta").load(path)
display(df)

### Transformando os campos do json em colunas:

In [0]:
display(df.select("anuncio.*"))

In [0]:
display(
  df.select("anuncio.*", "anuncio.endereco.*")
)

In [0]:
df_silver = df.select("anuncio.*", "anuncio.endereco.*")
display(df_silver)

### Removendo colunas:

In [0]:
df_silver = df_silver.drop("caracteristicas", "endereco")
display(df_silver)

### Salvando na camada silver:

In [0]:
path = "dbfs:/mnt/dados/silver/dataset_imoveis_py"
df_silver.write.format("delta").mode("overwrite").save(path)