Arquitetura Proposta:

Processamento de Dados: Apache Spark no Databricks para processamento distribuído.
Banco de Dados Distribuído:  Delta Lake para garantir transações ACID e versionamento dos dados.
Ferramentas de Consulta e Visualização: Utilizar notebooks Databricks para consultas interativas e visualizações.

Particionamento por uf e ano_cadastro:
Particionamento por uf e ano_cadastro permite consultas eficientes por estado e ano, otimizando a distribuição dos dados e melhorando o desempenho das operações de leitura/escrita.

Esquema de Dados: uf (String): Unidade federativa.
municipio (String): Município.
codigo_ibge (Integer): Código IBGE do município.
area_do_imovel (Double): Área total do imóvel.
registro_car (String): Registro do CAR.
situacao_cadastro (String): Situação do cadastro.
condicao_cadastro (String): Condição do cadastro.
area_liquida (Double): Área líquida.
area_remanescente_vegetacao_nativa (Double): Área remanescente de vegetação nativa.
area_reserva_legal_proposta (Double): Área de reserva legal proposta.
area_preservacao_permanente (Double): Área de preservação permanente.
area_nao_classificada (Double): Área não classificada.
solicitacao_adesao_pra (Boolean): Solicitação de adesão ao PRA.
latitude (Double): Latitude.
longitude (Double): Longitude.
data_inscricao (Date): Data de inscrição.
data_alteracao_condicao_cadastro (Date): Data de alteração da condição de cadastro.
area_rural_consolidada (Double): Área rural consolidada.
area_servidao_admin (Double): Área de servidão administrativa.

###Criação da estrutura de diretórios

In [0]:
dbutils.fs.mkdirs('/FileStore/Ada/BigData/csv')
dbutils.fs.mkdirs('/FileStore/Ada/BigData/raw')
dbutils.fs.mkdirs('/FileStore/Ada/BigData/bronze')
dbutils.fs.mkdirs('/FileStore/Ada/BigData/silver')
dbutils.fs.mkdirs('/FileStore/Ada/BigData/gold')

Out[7]: True

###Importa arquivo .zip e descompacta

In [0]:
import os

zip_path = "dbfs:/FileStore/temas_ambientais.zip"
extracted_dir = "dbfs:/FileStore/"
local_zip_path = "/tmp/temas_ambientais.zip"
dbutils.fs.cp(zip_path, local_zip_path)
os.system(f'unzip {local_zip_path} -d {extracted_dir}')




In [0]:
import pandas as pd

caminho_arquivo_csv = "dbfs:/FileStore/temas_ambientais.csv"
df = pd.read_csv(caminho_arquivo_csv, sep=';')
print(df.head())


   uf              municipio  codigo_ibge  area_do_imovel  \
0  GO                Nazário      5214408        119.6326   
1  SC                Meleiro      4210803          7.5340   
2  GO              Nova Roma      5214903         19.4883   
3  GO  Santa Helena de Goiás      5219308         22.9340   
4  PR      Cornélio Procópio      4106407         10.9560   

                                  registro_car situacao_cadastro  \
0  GO-5214408-3AEF2043582E40238C0F84A553686CA7                AT   
1  SC-4210803-BC127B0EC8DB49AC9D46D723286241A2                AT   
2  GO-5214903-7F58049BD79046E9A904CC81C5AC177A                PE   
3  GO-5219308-6478196E75CF4F65800ACA0758575820                PE   
4  PR-4106407-0F06081500254BE3A479EE8EFFDD5319                AT   

                                   condicao_cadastro  area_liquida  \
0  Analisado com pendências, aguardando retificaç...      119.6326   
1  Aguardando análise, não passível de revisão de...        7.5340   
2  Analisado c

In [0]:
display(df)

uf,municipio,codigo_ibge,area_do_imovel,registro_car,situacao_cadastro,condicao_cadastro,area_liquida,area_remanescente_vegetacao_nativa,area_reserva_legal_proposta,area_preservacao_permanente,area_nao_classificada,solicitacao_adesao_pra,latitude,longitude,data_inscricao,data_alteracao_condicao_cadastro,area_rural_consolidada,area_servidao_administrativa,tipo_imovel_rural,modulos_fiscais,area_uso_restrito,area_reserva_legal_averbada,area_reserva_legal_aprovada_nao_averbada,area_pousio,data_ultima_retificacao
GO,Nazário,5214408,119.6326,GO-5214408-3AEF2043582E40238C0F84A553686CA7,AT,"Analisado com pendências, aguardando retificação e/ou apresentação de documentos",119.6326,6.43202795367569,6.432,5.29751084282892,0.001022956103809,Sim,-16.5923058689987,-49.9019017039191,2014-05-07 16:01:44.305,,112.301149046683,0.0,IRU,5.4378,0.0,0.0,0.0,0.0,2014-05-07 16:01:44.305
SC,Meleiro,4210803,7.534,SC-4210803-BC127B0EC8DB49AC9D46D723286241A2,AT,"Aguardando análise, não passível de revisão de dados",7.534,5.51785410336982,0.0,0.0,0.0015934931798488,Nao,-28.7930798512303,-49.6472023744097,2014-05-07 16:02:02.915,,2.01445162492469,0.0,IRU,0.4186,0.0,1.506,0.0,0.0,2014-05-07 16:02:02.915
GO,Nova Roma,5214903,19.4883,GO-5214903-7F58049BD79046E9A904CC81C5AC177A,PE,"Analisado com pendências, aguardando retificação e/ou apresentação de documentos",19.36,19.4882633569717,3.872,0.0,0.0,Nao,-13.6370551503248,-47.0339670619739,2014-05-07 16:06:15.777,,0.0,0.0,IRU,0.2784,0.0,0.0,0.0,0.0,2014-05-07 16:06:15.777
GO,Santa Helena de Goiás,5219308,22.934,GO-5219308-6478196E75CF4F65800ACA0758575820,PE,"Analisado com pendências, aguardando retificação",22.88,1.5021496193707,1.5028,0.568927694143718,18.2335997514784,Sim,-17.8327877579625,-50.6015145422339,2014-05-07 17:49:36.938,,3.08289042473435,0.0,IRU,1.1467,0.0,0.0,0.0,0.0,2014-05-07 17:49:36.938
PR,Cornélio Procópio,4106407,10.956,PR-4106407-0F06081500254BE3A479EE8EFFDD5319,AT,Em análise,10.956,0.0,0.0,0.0,0.161657419734154,Nao,-23.1841101335722,-50.6715645967419,2014-05-07 17:52:55.333,,10.7942669028953,0.0,IRU,0.6087,0.0,0.0,0.0,0.0,2014-05-07 17:52:55.333
GO,Buriti de Goiás,5203939,37.3628,GO-5203939-24B66714683B47D1A96ED4AB5541A78D,AT,"Analisado com pendências, aguardando retificação e/ou apresentação de documentos",37.271,0.0,7.4542,3.48182904076294,36.6495720619986,Sim,-16.2164807816082,-50.4953581696022,2014-05-07 18:10:09.195,,0.0,0.0,IRU,1.6983,0.0,0.0,0.0,0.0,2014-05-07 18:10:09.195
GO,Planaltina,5217609,97.9881,GO-5217609-5AF3970F46CD4FF29FFDAC79F72610C1,PE,"Analisado com pendências, aguardando retificação e/ou apresentação de documentos",98.1345,0.0,21.8673,2.12858352269074,97.6714521959102,Nao,-15.1179720733421,-47.8383739460241,2014-05-08 00:02:44.538,,0.0,0.0,IRU,2.7997,0.0,0.0,0.0,0.0,2014-05-08 00:02:44.538
PR,Jandaia do Sul,4112108,6.1533,PR-4112108-9A8173321C6C49A8B4D12A5379ACD56B,AT,"Aguardando análise, não passível de revisão de dados",6.1533,0.0,0.0,0.0,6.14899874393451,Sim,-23.6224664264801,-51.6578666515836,2014-05-08 08:00:36.973,,0.0,0.0,IRU,0.3846,0.0,0.0,0.0,0.0,2014-05-08 08:00:36.973
PR,Jandaia do Sul,4112108,4.9027,PR-4112108-A5B59AF15D6642E4BCFBEEA253497168,AT,"Aguardando análise, não passível de revisão de dados",4.9027,0.0,0.0,0.0,4.89928986130661,Sim,-23.6224240639671,-51.6578657246737,2014-05-08 08:03:25.423,,0.0,0.0,IRU,0.3064,0.0,0.0,0.0,0.0,2014-05-08 08:03:25.423
GO,Formosa,5208004,5.4097,GO-5208004-4F872D13349C49BE8C9120FF56C7434F,PE,"Analisado com pendências, aguardando retificação e/ou apresentação de documentos",5.4139,0.0,1.1026,2.27859310705466,1.21704276768816,Sim,-15.5759058160859,-47.347320962841,2014-05-08 08:36:37.958,,3.76410267729312,0.0,IRU,0.1352,0.0,0.0,0.0,1.21609669322065,2014-05-08 08:36:37.958


###Criação dos schemas

In [0]:
%sql
create schema raw;
create schema bronze;
create schema silver;
create schema gold;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-97822715669457>:10[0m
[1;32m      8[0m     display(df)
[1;32m      9[0m     [38;5;28;01mreturn[39;00m df
[0;32m---> 10[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m     11[0m [38;5;28;01mfinally[39;00m:
[1;32m     12[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-97822715669457>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mY3JlYXR

### Criação da tabela na camada raw

In [0]:
%sql
use raw;

CREATE TABLE IF NOT EXISTS temas_ambientais(
  uf STRING
  , municipio STRING
  , codigo_ibge STRING
  , area_do_imovel STRING
  , registro_car STRING
  , situacao_cadastro STRING
  , condicao_cadastro STRING
  , area_liquida STRING
  , area_remanescente_vegetacao_nativa STRING
  , area_reserva_legal_proposta STRING
  , area_preservacao_permanente STRING
  , area_nao_classificada STRING
  , solicitacao_adesao_pra STRING
  , latitude STRING
  , longitude STRING
  , data_inscricao STRING
  , data_alteracao_condicao_cadastro STRING
  , area_rural_consolidada STRING
  , area_servidao_administrativa STRING
  , tipo_imovel_rural STRING
  , modulos_fiscais STRING
  , area_uso_restrito STRING
  , area_reserva_legal_averbada STRING
  , area_reserva_legal_aprovada_nao_averbada STRING
  , area_pousio STRING
  , data_ultima_retificacao STRING
  , created_at STRING
  , source_file STRING
)
LOCATION '/FileStore/Ada/BigData/raw'
partitioned by (uf);

### Criação da tabela na camada bronze

In [0]:
%sql
use bronze;

CREATE TABLE IF NOT EXISTS temas_ambientais
(
  uf STRING
  , municipio STRING
  , codigo_ibge DOUBLE
  , area_do_imovel DOUBLE
  , registro_car STRING
  , situacao_cadastro STRING
  , condicao_cadastro STRING
  , area_liquida DOUBLE
  , area_remanescente_vegetacao_nativa DOUBLE
  , area_reserva_legal_proposta DOUBLE
  , area_preservacao_permanente DOUBLE
  , area_nao_classificada DOUBLE
  , solicitacao_adesao_pra STRING
  , latitude DOUBLE
  , longitude DOUBLE
  , data_inscricao DATE
  , data_alteracao_condicao_cadastro DATE
  , area_rural_consolidada DOUBLE
  , area_servidao_administrativa DOUBLE
  , tipo_imovel_rural STRING
  , modulos_fiscais DOUBLE
  , area_uso_restrito DOUBLE
  , area_reserva_legal_averbada DOUBLE
  , area_reserva_legal_aprovada_nao_averbada DOUBLE
  , area_pousio DOUBLE
  , data_ultima_retificacao DATE
  , created_at timestamp
  , source_file STRING
)
COMMENT "Tabela da camada BRONZE com os dados dos temas ambientais"
LOCATION '/FileStore/Ada/BigData/bronze'
partitioned by (uf);

### Criação da tabela na camada silver

In [0]:
%sql
use silver;

CREATE TABLE IF NOT EXISTS temas_ambientais
(
  uf STRING
  , municipio STRING
  , codigo_ibge DOUBLE
  , area_do_imovel DOUBLE
  , registro_car STRING
  , situacao_cadastro STRING
  , condicao_cadastro STRING
  , area_liquida DOUBLE
  , area_remanescente_vegetacao_nativa DOUBLE
  , area_reserva_legal_proposta DOUBLE
  , area_preservacao_permanente DOUBLE
  , area_nao_classificada DOUBLE
  , solicitacao_adesao_pra STRING
  , latitude DOUBLE
  , longitude DOUBLE
  , data_inscricao DATE
  , data_alteracao_condicao_cadastro DATE
  , area_rural_consolidada DOUBLE
  , area_servidao_administrativa DOUBLE
  , tipo_imovel_rural STRING
  , modulos_fiscais DOUBLE
  , area_uso_restrito DOUBLE
  , area_reserva_legal_averbada DOUBLE
  , area_reserva_legal_aprovada_nao_averbada DOUBLE
  , area_pousio DOUBLE
  , data_ultima_retificacao DATE
  , created_at timestamp
  , source_file STRING
)
COMMENT "Tabela da camada SILVER com os dados dos temas ambientais"
LOCATION '/FileStore/Ada/BigData/silver'
partitioned by (uf);

### Ingestão dos dados na tabela raw


In [0]:
dbutils.fs.ls("FileStore/Ada/BigData/csv/")

Out[25]: [FileInfo(path='dbfs:/FileStore/Ada/BigData/csv/temas_ambientais-1.zip', name='temas_ambientais-1.zip', size=726909246, modificationTime=1716378304000),
 FileInfo(path='dbfs:/FileStore/Ada/BigData/csv/temas_ambientais.zip', name='temas_ambientais.zip', size=726909246, modificationTime=1716376959000)]

### Delete das tabelas

In [0]:
%sql
drop table if exists missing_zone.temas_ambientais;
drop table if exists silver_zone.temas_ambientais;
drop table if exists bronze_zone.temas_ambientais;
drop table if exists raw_zone.temas_ambientais;

###Delete dos schemas

In [0]:
%sql
drop schema if exists missing_zone;
drop schema if exists silver_zone;
drop schema if exists bronze_zone;
drop schema if exists raw_zone;

###Converter o DataFrame para PySpark

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Cadastro Ambiental Rural").getOrCreate()

if isinstance(df, pd.DataFrame):
    df = spark.createDataFrame(df)

df.show(5)
df.printSchema()


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1971494851123052>:5[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m [38;5;28;01mimport[39;00m SparkSession
[1;32m      3[0m spark [38;5;241m=[39m SparkSession[38;5;241m.[39mbuilder[38;5;241m.[39mappName([38;5;124m"[39m[38;5;124mCadastro Ambiental Rural[39m[38;5;124m"[39m)[38;5;241m.[39mgetOrCreate()
[0;32m----> 5[0m [38;5;28;01mif[39;00m [38;5;28misinstance[39m(df, pd[38;5;241m.[39mDataFrame):
[1;32m      6[0m     df [38;5;241m=[39m spark[38;5;241m.[39mcreateDataFrame(df)
[1;32m      8[0m df[38;5;241m.[39mshow([38;5;241m5[39m)

[0;31mNameError[0m: name 'df' is not defined

###Particionar e Salvar os Dados no Delta Lake

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year

# otimizar
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = df.withColumn("ano_cadastro", year("data_inscricao"))


df.cache()
df = df.repartition(200)

df.write.format("delta").mode("overwrite").partitionBy("uf", "ano_cadastro").save("/mnt/delta/cadastro_ambiental_rural")


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1971494851123056>:8[0m
[1;32m      5[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.sql.shuffle.partitions[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m200[39m[38;5;124m"[39m)
[1;32m      6[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.sql.execution.arrow.enabled[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)
[0;32m----> 8[0m df [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mano_cadastro[39m[38;5;124m"[39m, year([38;5;124m"[39m[38;5;124mdata_inscricao[39m[38;5;124m"[39m))
[1;32m     11[0m df[38;5;241m.[39mcache()
[1;32m     12[0m df [38;5;241m=[39m df[38;5;241m.[39mrepartition([38;5;241m200[39m)

[0;31mNameError[0m: name '

%md
###Tratamento dos schemas

In [0]:
# Carregar os dados brutos no esquema raw
df.write.format("delta").mode("overwrite").saveAsTable("raw.cadastro_ambiental_rural")

# Limpar os dados e armazenar no esquema bronze
df_bronze = spark.sql("""
SELECT DISTINCT *
FROM raw.cadastro_ambiental_rural
""")
df_bronze.write.format("delta").mode("overwrite").saveAsTable("bronze.cadastro_ambiental_rural")





[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1971494851123072>:2[0m
[1;32m      1[0m [38;5;66;03m# Carregar os dados brutos no esquema raw[39;00m
[0;32m----> 2[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124mraw.cadastro_ambiental_rural[39m[38;5;124m"[39m)
[1;32m      4[0m [38;5;66;03m# Limpar os dados e armazenar no esquema bronze[39;00m
[1;32m      5[0m df_bronze [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      6[0m [38;5;124mSELECT DISTINCT *[39m
[1;32m      7[0m [38;5;124mFROM raw.cadastro_ambiental_rural[39m
[1;32m      8[0m [38;5;124m"""[39m)

[0;31mNameError[0m: name 'df' is not de

In [0]:
#silver
from pyspark.sql.functions import year

# Adicionar uma coluna com o ano de cadastro, extraído de data_inscricao
df_silver = df_bronze.withColumn("ano_cadastro", year("data_inscricao"))

# Reparticionar o DataFrame
df_silver_repartitioned = df_silver.repartition(100, "uf", "ano_cadastro")

# Salvar o DataFrame particionado por uf e ano_cadastro no Delta Lake
df_silver_repartitioned.write.format("delta").mode("overwrite").partitionBy("uf", "ano_cadastro").saveAsTable("silver.cadastro_ambiental_rural")


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-939068778938099>:5[0m
[1;32m      2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m year
[1;32m      4[0m [38;5;66;03m# Adicionar uma coluna com o ano de cadastro, extraído de data_inscricao[39;00m
[0;32m----> 5[0m df_silver [38;5;241m=[39m df_bronze[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mano_cadastro[39m[38;5;124m"[39m, year([38;5;124m"[39m[38;5;124mdata_inscricao[39m[38;5;124m"[39m))

[0;31mNameError[0m: name 'df_bronze' is not defined

###Soma de área (em hectares) para propriedades no MS e MT:

In [0]:
result_ms_mt = spark.sql("""
SELECT uf, SUM(area_do_imovel) as soma_area
FROM silver.cadastro_ambiental_rural
WHERE uf IN ('MS', 'MT')
GROUP BY uf
ORDER BY soma_area DESC
""")
result_ms_mt.write.format("delta").mode("overwrite").saveAsTable("gold.soma_area_ms_mt")



[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1971494851123057>:1[0m
[0;32m----> 1[0m result_ms_mt [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      2[0m [38;5;124mSELECT uf, SUM(area_do_imovel) as soma_area[39m
[1;32m      3[0m [38;5;124mFROM silver.cadastro_ambiental_rural[39m
[1;32m      4[0m [38;5;124mWHERE uf IN ([39m[38;5;124m'[39m[38;5;124mMS[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mMT[39m[38;5;124m'[39m[38;5;124m)[39m
[1;32m      5[0m [38;5;124mGROUP BY uf[39m
[1;32m      6[0m [38;5;124mORDER BY soma_area DESC[39m
[1;32m      7[0m [38;5;124m"""[39m)
[1;32m      8[0m result_ms_mt[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[3

###Filtrar propriedades da região sudeste:

In [0]:
result_sudeste = spark.sql("""
SELECT *
FROM silver.cadastro_ambiental_rural
WHERE uf IN ('SP', 'RJ', 'MG', 'ES')
""")
result_sudeste.write.format("delta").mode("overwrite").saveAsTable("gold.propriedades_sudeste")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1971494851123058>:1[0m
[0;32m----> 1[0m result_sudeste [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      2[0m [38;5;124mSELECT *[39m
[1;32m      3[0m [38;5;124mFROM silver.cadastro_ambiental_rural[39m
[1;32m      4[0m [38;5;124mWHERE uf IN ([39m[38;5;124m'[39m[38;5;124mSP[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mRJ[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mMG[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mES[39m[38;5;124m'[39m[38;5;124m)[39m
[1;32m      5[0m [38;5;124m"""[39m)
[1;32m      6[0m result_sudeste[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m

###Quantidade de propriedades cadastradas por ano:

In [0]:
propriedades_por_ano = spark.sql("""
SELECT ano_cadastro, COUNT(*) as quantidade
FROM silver.cadastro_ambiental_rural
GROUP BY ano_cadastro
ORDER BY ano_cadastro
""")
propriedades_por_ano.write.format("delta").mode("overwrite").saveAsTable("gold.propriedades_por_ano")


###Percentual médio de área remanescente de vegetação nativa em comparação à área total da propriedade:

In [0]:
percentual_vegetacao_nativa = spark.sql("""
SELECT AVG(area_remanescente_vegetacao_nativa / area_do_imovel * 100) as percentual_medio
FROM silver.cadastro_ambiental_rural
""")
percentual_vegetacao_nativa.write.format("delta").mode("overwrite").saveAsTable("gold.percentual_vegetacao_nativa")


###Contagem de propriedades rurais por estado


In [0]:
contagem_propriedades_estado = spark.sql("""
SELECT uf, COUNT(*) as quantidade
FROM silver.cadastro_ambiental_rural
GROUP BY uf
""")
contagem_propriedades_estado.write.format("delta").mode("overwrite").saveAsTable("gold.contagem_propriedades_estado")


###Média de área entre todas as propriedades e contagem das propriedades acima da média por estado

In [0]:
media_area = spark.sql("""
SELECT AVG(area_do_imovel) as media_area
FROM silver.cadastro_ambiental_rural
""").collect()[0]['media_area']


In [0]:
propriedades_acima_media = spark.sql(f"""
SELECT uf, COUNT(*) as quantidade
FROM silver.cadastro_ambiental_rural
WHERE area_do_imovel > {media_area}
GROUP BY uf
""")
propriedades_acima_media.write.format("delta").mode("overwrite").saveAsTable("gold.propriedades_acima_media")
