In [1]:
import polars as pl

In [2]:
queimadas_df = pl.read_parquet("data/queimadas-full.pqt.zstd")
municipios_df = pl.read_csv("data/municipios.csv", separator=";",encoding='iso-8859-1')
uf_df = pl.read_csv("data/uf.csv")

In [3]:
municipios_df.head()

CÓDIGO DO MUNICÍPIO - TOM,CÓDIGO DO MUNICÍPIO - IBGE,MUNICÍPIO - TOM,MUNICÍPIO - IBGE,UF
i64,i64,str,str,str
1,1100106,"""GUAJARÁ-MIRIM""","""Guajará-Mirim""","""RO"""
2,1100379,"""ALTO ALEGRE DOS PARECIS""","""Alto Alegre dos Parecis""","""RO"""
3,1100205,"""PORTO VELHO""","""Porto Velho""","""RO"""
4,1100452,"""BURITIS""","""Buritis""","""RO"""
5,1100122,"""JI-PARANÁ""","""Ji-Paraná""","""RO"""


In [4]:
uf_df.head()

id_uf,sigla,nome,regiao
i64,str,str,str
42,"""SC""","""Santa Catarina""","""Sul"""
41,"""PR""","""Paraná""","""Sul"""
43,"""RS""","""Rio Grande do Sul""","""Sul"""
11,"""RO""","""Rondônia""","""Norte"""
13,"""AM""","""Amazonas""","""Norte"""


In [5]:
queimadas_df = queimadas_df.filter(pl.col('bioma').is_not_null())
queimadas_df = queimadas_df.with_columns([pl.col('dias_sem_chuva').cast(pl.Int64)])
queimadas_df = queimadas_df.with_columns([pl.col('dias_sem_chuva').replace(-999,None)])
queimadas_df = queimadas_df.with_columns([pl.col('risco_fogo').replace(-999,None)])
queimadas_df

ano,mes,data_hora,bioma,sigla_uf,id_municipio,latitude,longitude,satelite,dias_sem_chuva,precipitacao,risco_fogo,potencia_radiativa_fogo
i64,i64,datetime[μs],str,str,str,f64,f64,str,i64,f64,f64,f64
2008,1,2008-01-01 15:54:00,"""Mata Atlântica""","""BA""","""2900801""",-17.406,-39.387,,,,,
2008,1,2008-01-01 15:55:00,"""Caatinga""","""AL""","""2702306""",-10.085,-36.342,,,,,
2008,1,2008-01-01 15:55:00,"""Caatinga""","""SE""","""2805604""",-9.931,-37.239,,,,,
2008,1,2008-01-01 15:55:00,"""Caatinga""","""SE""","""2807402""",-10.971,-38.002,,,,,
2008,1,2008-01-01 15:55:00,"""Caatinga""","""SE""","""2807402""",-10.969,-37.988,,,,,
…,…,…,…,…,…,…,…,…,…,…,…,…
2003,12,2003-12-31 16:33:00,"""Amazônia""","""MA""","""2109239""",-2.291,-45.786,,,,,
2003,12,2003-12-31 16:33:00,"""Amazônia""","""MA""","""2109239""",-2.289,-45.777,,,,,
2003,12,2003-12-31 16:33:00,"""Amazônia""","""MA""","""2109239""",-2.28,-45.778,,,,,
2003,12,2003-12-31 16:33:00,"""Amazônia""","""MA""","""2110039""",-2.613,-45.984,,,,,


In [6]:
# generate dim_horarios_queimada
# generate all combinations of hour and minutes in a day
minuto_list = 24*[list(range(0, 60))]
dim_horarios_full = pl.DataFrame({
    "hora": list(range(0, 24)),
    "minuto": minuto_list}).explode("minuto").with_row_index("id_horario").select([
        pl.col("id_horario").cast(pl.Int32),
        pl.col("hora").cast(pl.Int8),
        pl.col("minuto").cast(pl.Int8)
    ])
dim_horarios_full

id_horario,hora,minuto
i32,i8,i8
0,0,0
1,0,1
2,0,2
3,0,3
4,0,4
…,…,…
1435,23,55
1436,23,56
1437,23,57
1438,23,58


In [7]:
# create dim_data_queimada
dim_data_queimada = (queimadas_df
    .select([pl.col("data_hora").alias("date_time_iso"),
             pl.col("data_hora").dt.day().alias("dia"), 
             pl.col("data_hora").dt.month().alias("mes"), 
             pl.col("data_hora").dt.year().alias("ano"),
             # generate semester column
             pl.col("data_hora").dt.month().map_elements(lambda x: 1 if x <= 6 else 2).alias("semestre"),
             # generate trimester column
             ((pl.col("data_hora").dt.month() - 1 )//3 + 1).alias("trimestre"),
             # generate week day column
             pl.col("data_hora").dt.weekday().alias("dia_semana"),
             # generate day of year column
             pl.col("data_hora").dt.ordinal_day().alias("dia_ano"),
             # generate is_weekend column, starting from saturday (6)
             pl.col("data_hora").dt.weekday().map_elements(lambda x: x >= 6).alias("is_weekend"),
             # generate week of year column
             pl.col("data_hora").dt.week().alias("semana_ano")
             ]).unique().sort("date_time_iso")).with_row_index("id_data")
print(len(dim_data_queimada))
dim_data_queimada

Expr.map_elements is significantly slower than the native expressions API.
Only use if you absolutely CANNOT implement your logic otherwise.
Replace this expression...
  - pl.col("data_hora").map_elements(lambda x: ...)
with this one instead:
  + pl.col("data_hora") >= 6

  pl.col("data_hora").dt.weekday().map_elements(lambda x: x >= 6).alias("is_weekend"),


1072406


id_data,date_time_iso,dia,mes,ano,semestre,trimestre,dia_semana,dia_ano,is_weekend,semana_ano
u32,datetime[μs],i8,i8,i32,i64,i8,i8,i16,bool,i8
0,2003-01-01 16:04:00,1,1,2003,1,1,3,1,false,1
1,2003-01-01 16:05:00,1,1,2003,1,1,3,1,false,1
2,2003-01-01 16:06:00,1,1,2003,1,1,3,1,false,1
3,2003-01-01 16:07:00,1,1,2003,1,1,3,1,false,1
4,2003-01-01 16:08:00,1,1,2003,1,1,3,1,false,1
…,…,…,…,…,…,…,…,…,…,…
1072401,2025-02-12 22:20:00,12,2,2025,1,1,3,43,false,7
1072402,2025-02-12 22:30:00,12,2,2025,1,1,3,43,false,7
1072403,2025-02-12 22:50:00,12,2,2025,1,1,3,43,false,7
1072404,2025-02-12 23:00:00,12,2,2025,1,1,3,43,false,7


In [8]:
# add season to dim_data_queimada using day and month
# AAAAAAAAAAAAAA there is no built-in season function in polars
# so we have to do it manually
# Summer - 1: Dec 21 - Mar 19
# Autumn - 2: Mar 20 - Jun 20
# Winter - 3: Jun 21 - Sep 21
# Spring - 4: Sep 22 - Dec 20
dim_data_queimada = dim_data_queimada.with_columns(
    pl.when(( (pl.col("mes") == 12) & (pl.col("dia") >= 21) ) | (pl.col("mes").is_in([1,2])) | ((pl.col("mes") == 3) & (pl.col("dia") < 20)))
    .then(1)
    .when(( (pl.col("mes") == 3) & (pl.col("dia") >= 20) ) | (pl.col("mes").is_in([4,5])) | ((pl.col("mes") == 6) & (pl.col("dia") < 21)))
    .then(2)
    .when(( (pl.col("mes") == 6) & (pl.col("dia") >= 21) ) | (pl.col("mes").is_in([7,8])) | ((pl.col("mes") == 9) & (pl.col("dia") < 22)))
    .then(3)
    .otherwise(4).alias("estacao")
)
dim_data_queimada.head()

id_data,date_time_iso,dia,mes,ano,semestre,trimestre,dia_semana,dia_ano,is_weekend,semana_ano,estacao
u32,datetime[μs],i8,i8,i32,i64,i8,i8,i16,bool,i8,i32
0,2003-01-01 16:04:00,1,1,2003,1,1,3,1,False,1,1
1,2003-01-01 16:05:00,1,1,2003,1,1,3,1,False,1,1
2,2003-01-01 16:06:00,1,1,2003,1,1,3,1,False,1,1
3,2003-01-01 16:07:00,1,1,2003,1,1,3,1,False,1,1
4,2003-01-01 16:08:00,1,1,2003,1,1,3,1,False,1,1


In [9]:
# generate dim_local_queimada
dim_local_queimada = (queimadas_df
    .select([pl.col("id_municipio").alias("id_municipio").cast(pl.Int32),
             pl.col("sigla_uf").alias("sigla_uf"),
             pl.col("bioma").alias("bioma"),
             pl.col("latitude").alias("latitude"),
             pl.col("longitude").alias("longitude"),]).unique().sort(["id_municipio", "sigla_uf"]))
dim_local_queimada.head()

id_municipio,sigla_uf,bioma,latitude,longitude
i32,str,str,f64,f64
1100015,"""RO""","""Amazônia""",-12.18125,-62.61745
1100015,"""RO""","""Amazônia""",-12.065,-61.996
1100015,"""RO""","""Amazônia""",-12.264,-62.219
1100015,"""RO""","""Amazônia""",-12.799,-62.517
1100015,"""RO""","""Amazônia""",-12.231,-62.13


In [10]:
# join with municipios to get municipio name
# join with uf to get uf name and regiao
dim_local_queimada = (dim_local_queimada
    .join(municipios_df.select([pl.col("MUNICÍPIO - IBGE"), pl.col('CÓDIGO DO MUNICÍPIO - IBGE')]), left_on="id_municipio", right_on="CÓDIGO DO MUNICÍPIO - IBGE", how="left")
    .join(uf_df.select([pl.col("sigla"), pl.col("nome"), pl.col("regiao")]), left_on="sigla_uf", right_on="sigla", how="left"))
dim_local_queimada = dim_local_queimada.select([
    pl.col("id_municipio"),
    pl.col("MUNICÍPIO - IBGE").alias("nome_municipio"),
    pl.col("sigla_uf"),
    pl.col("nome").alias("nome_uf"),
    pl.col("regiao").alias("regiao_uf"),
    pl.col("bioma"),
    pl.col("latitude"),
    pl.col("longitude"),
]).sort(["id_municipio", "sigla_uf"]).unique().with_row_index("id_local")
print(len(dim_local_queimada))
dim_local_queimada

14612826


id_local,id_municipio,nome_municipio,sigla_uf,nome_uf,regiao_uf,bioma,latitude,longitude
u32,i32,str,str,str,str,str,f64,f64
0,1500859,"""Anapu""","""PA""","""Pará""","""Norte""","""Amazônia""",-3.01851,-51.3448
1,1507300,"""São Félix do Xingu""","""PA""","""Pará""","""Norte""","""Amazônia""",-6.9085,-52.6711
2,2903201,"""Barreiras""","""BA""","""Bahia""","""Nordeste""","""Cerrado""",-11.92053,-45.09534
3,2102002,"""Bom Jardim""","""MA""","""Maranhão""","""Nordeste""","""Amazônia""",-3.84972,-46.60426
4,1301803,"""Ipixuna""","""AM""","""Amazonas""","""Norte""","""Amazônia""",-7.14025,-71.68433
…,…,…,…,…,…,…,…,…
14612821,1507300,"""São Félix do Xingu""","""PA""","""Pará""","""Norte""","""Amazônia""",-5.734,-51.283
14612822,1304104,"""Tapauá""","""AM""","""Amazonas""","""Norte""","""Amazônia""",-6.65837,-63.06742
14612823,1504703,"""Moju""","""PA""","""Pará""","""Norte""","""Amazônia""",-2.19439,-48.81997
14612824,1100205,"""Porto Velho""","""RO""","""Rondônia""","""Norte""","""Amazônia""",-8.71761,-62.54329


In [11]:
# generate fct_queimadas

# join by date_time
fct_queimadas = (queimadas_df.join(
    dim_data_queimada.select([pl.col("date_time_iso"), pl.col('id_data')]),
    left_on="data_hora", right_on="date_time_iso", how="left"
).with_columns([pl.col('id_municipio').cast(pl.Int32)])
# join by id_municipio, sigla_uf, latitude, longitude, and bioma
.join(
    dim_local_queimada.select([pl.col("id_local"), pl.col("id_municipio"), pl.col("sigla_uf"), pl.col("latitude"), pl.col("longitude"), pl.col("bioma")]),
    left_on=["id_municipio", "sigla_uf", "latitude", "longitude", "bioma"],
    right_on=["id_municipio", "sigla_uf", "latitude", "longitude", "bioma"],
    how="left"
)
# join dim horarios_queimada to get id_horario
.join(
    dim_horarios_full.select([pl.col("id_horario"), pl.col("hora"), pl.col("minuto")]),
    left_on=[pl.col("data_hora").dt.hour(), pl.col("data_hora").dt.minute()],
    right_on=["hora", "minuto"],
    how="left")
.select([
    pl.col("id_data"),
    pl.col("id_local"),
    pl.col("id_horario"),
    pl.col('precipitacao').alias('precipitacao'),
    pl.col('risco_fogo').alias('risco_fogo'),
    pl.col('potencia_radiativa_fogo').alias('potencia_radiativa_fogo'),
    pl.col('dias_sem_chuva').alias('dias_sem_chuva')
])).unique()
    
print(len(fct_queimadas))
fct_queimadas

17542892


id_data,id_local,id_horario,precipitacao,risco_fogo,potencia_radiativa_fogo,dias_sem_chuva
u32,u32,i32,f64,f64,f64,i64
341774,6469260,1027,0.0,0.57,8.0,4
1064663,4808639,948,0.0,1.0,2.4,
62419,2707158,1067,,,,
849405,463269,291,0.0,1.0,3.2,17
879866,12382949,282,0.0,1.0,1.9,119
…,…,…,…,…,…,…
881950,3372030,1035,0.0,1.0,167.4,95
10896,2086131,1019,,,,
851584,6586803,1022,0.0,1.0,44.6,99
700884,10389284,1042,0.0,0.82,35.9,57


In [12]:
# load to parquet
dim_horarios_full.write_parquet("data/dim_horarios_queimada.pqt.zstd", compression="zstd")
dim_local_queimada.write_parquet("data/dim_local_queimada.pqt.zstd", compression="zstd")
dim_data_queimada.write_parquet("data/dim_data.pqt.zstd", compression="zstd")
fct_queimadas.write_parquet("data/fct_queimadas.pqt.zstd", compression="zstd")

In [13]:
clima_df = pl.read_parquet('data/sisam-full.pqt.zstd',n_rows=5000)
clima_df = clima_df.filter(pl.col('sigla_uf').is_not_null() & pl.col('id_municipio').is_not_null() )
clima_df

ano,sigla_uf,id_municipio,data_hora,co_ppb,no2_ppb,o3_ppb,pm25_ugm3,so2_ugm3,precipitacao_dia,temperatura,umidade_relativa,vento_direcao,vento_velocidade
i64,str,str,datetime[μs],f64,f64,f64,f64,f64,f64,f64,f64,i64,f64
2008,"""AM""","""1300144""",2008-08-20 18:00:00,378.6,0.9,53.7,35.8,1.4,,37.2,23.0,121,1.8
2008,"""AM""","""1300144""",2008-08-16 18:00:00,315.0,1.0,54.2,57.1,1.2,,37.4,23.0,76,1.7
2008,"""AM""","""1300144""",2008-08-23 18:00:00,354.0,1.6,67.3,59.1,1.4,,36.9,23.0,78,0.9
2008,"""AM""","""1300144""",2008-07-26 18:00:00,152.0,0.5,29.2,3.9,0.4,,36.3,24.0,114,2.1
2008,"""AM""","""1300144""",2008-08-22 18:00:00,475.1,1.2,66.6,48.9,1.6,,36.4,24.0,134,1.3
…,…,…,…,…,…,…,…,…,…,…,…,…,…
2008,"""AL""","""2701001""",2008-12-30 18:00:00,133.0,0.7,38.3,15.5,0.8,,29.5,54.0,117,3.4
2008,"""AL""","""2701001""",2008-12-16 18:00:00,89.0,0.4,36.6,2.8,0.8,,30.4,54.0,107,2.6
2008,"""AL""","""2701100""",2008-02-06 18:00:00,72.0,0.2,30.3,2.2,0.4,,29.7,54.0,112,3.9
2008,"""AL""","""2701100""",2008-02-08 18:00:00,85.5,0.3,25.5,12.6,0.5,,30.6,54.0,125,3.8


In [14]:
# insert data_hora from clima_df into dim_data_queimada

dim_data_clima = (clima_df
    .select([pl.col("data_hora").alias("date_time_iso"),
             pl.col("data_hora").dt.day().alias("dia"), 
             pl.col("data_hora").dt.month().alias("mes"), 
             pl.col("data_hora").dt.year().alias("ano"),
             # generate semester column
             pl.col("data_hora").dt.month().map_elements(lambda x: 1 if x <= 6 else 2).alias("semestre"),
             # generate trimester column
             ((pl.col("data_hora").dt.month() - 1 )//3 + 1).alias("trimestre"),
             # generate week day column
             pl.col("data_hora").dt.weekday().alias("dia_semana"),
             # generate day of year column
             pl.col("data_hora").dt.ordinal_day().alias("dia_ano"),
             # generate is_weekend column, starting from saturday (6)
             pl.col("data_hora").dt.weekday().map_elements(lambda x: x >= 6).alias("is_weekend"),
             # generate week of year column
             pl.col("data_hora").dt.week().alias("semana_ano")
             ]).unique().sort("date_time_iso")).with_row_index("id_data")
dim_data_clima

id_data,date_time_iso,dia,mes,ano,semestre,trimestre,dia_semana,dia_ano,is_weekend,semana_ano
u32,datetime[μs],i8,i8,i32,i64,i8,i8,i16,bool,i8
0,2008-01-01 18:00:00,1,1,2008,1,1,2,1,false,1
1,2008-01-02 18:00:00,2,1,2008,1,1,3,2,false,1
2,2008-01-03 18:00:00,3,1,2008,1,1,4,3,false,1
3,2008-01-04 18:00:00,4,1,2008,1,1,5,4,false,1
4,2008-01-05 18:00:00,5,1,2008,1,1,6,5,true,1
…,…,…,…,…,…,…,…,…,…,…
331,2008-12-28 18:00:00,28,12,2008,2,4,7,363,true,52
332,2008-12-29 18:00:00,29,12,2008,2,4,1,364,false,1
333,2008-12-30 18:00:00,30,12,2008,2,4,2,365,false,1
334,2008-12-31 00:00:00,31,12,2008,2,4,3,366,false,1


In [15]:

dim_data_clima = dim_data_clima.with_columns(
    pl.when(( (pl.col("mes") == 12) & (pl.col("dia") >= 21) ) | (pl.col("mes").is_in([1,2])) | ((pl.col("mes") == 3) & (pl.col("dia") < 20)))
    .then(1)
    .when(( (pl.col("mes") == 3) & (pl.col("dia") >= 20) ) | (pl.col("mes").is_in([4,5])) | ((pl.col("mes") == 6) & (pl.col("dia") < 21)))
    .then(2)
    .when(( (pl.col("mes") == 6) & (pl.col("dia") >= 21) ) | (pl.col("mes").is_in([7,8])) | ((pl.col("mes") == 9) & (pl.col("dia") < 22)))
    .then(3)
    .otherwise(4).alias("estacao")
)
dim_data_clima.head(1000)

id_data,date_time_iso,dia,mes,ano,semestre,trimestre,dia_semana,dia_ano,is_weekend,semana_ano,estacao
u32,datetime[μs],i8,i8,i32,i64,i8,i8,i16,bool,i8,i32
0,2008-01-01 18:00:00,1,1,2008,1,1,2,1,false,1,1
1,2008-01-02 18:00:00,2,1,2008,1,1,3,2,false,1,1
2,2008-01-03 18:00:00,3,1,2008,1,1,4,3,false,1,1
3,2008-01-04 18:00:00,4,1,2008,1,1,5,4,false,1,1
4,2008-01-05 18:00:00,5,1,2008,1,1,6,5,true,1,1
…,…,…,…,…,…,…,…,…,…,…,…
331,2008-12-28 18:00:00,28,12,2008,2,4,7,363,true,52,1
332,2008-12-29 18:00:00,29,12,2008,2,4,1,364,false,1,1
333,2008-12-30 18:00:00,30,12,2008,2,4,2,365,false,1,1
334,2008-12-31 00:00:00,31,12,2008,2,4,3,366,false,1,1


In [16]:
# insert tuples from dim_data_clima into dim_data_queimada, avoiding duplicates and increasing id_data accordingly
max_id_data = dim_data_queimada.select(pl.col("id_data").max()).item()
dim_data_clima = dim_data_clima.with_columns(pl.col('id_data')+ max_id_data + 1)
print(max_id_data)
new_data = dim_data_clima.join(
    dim_data_queimada.select("date_time_iso"),
    on="date_time_iso",
    how="anti"
)
print(len(new_data))
dim_data = pl.concat([dim_data_queimada, new_data]).sort("id_data")
dim_data

1072405
320


id_data,date_time_iso,dia,mes,ano,semestre,trimestre,dia_semana,dia_ano,is_weekend,semana_ano,estacao
u32,datetime[μs],i8,i8,i32,i64,i8,i8,i16,bool,i8,i32
0,2003-01-01 16:04:00,1,1,2003,1,1,3,1,false,1,1
1,2003-01-01 16:05:00,1,1,2003,1,1,3,1,false,1,1
2,2003-01-01 16:06:00,1,1,2003,1,1,3,1,false,1,1
3,2003-01-01 16:07:00,1,1,2003,1,1,3,1,false,1,1
4,2003-01-01 16:08:00,1,1,2003,1,1,3,1,false,1,1
…,…,…,…,…,…,…,…,…,…,…,…
1072737,2008-12-28 18:00:00,28,12,2008,2,4,7,363,true,52,1
1072738,2008-12-29 18:00:00,29,12,2008,2,4,1,364,false,1,1
1072739,2008-12-30 18:00:00,30,12,2008,2,4,2,365,false,1,1
1072740,2008-12-31 00:00:00,31,12,2008,2,4,3,366,false,1,1


In [17]:
# generate dim_horario_clima

dim_horarios_clima = pl.DataFrame({
    "hora": list(range(0, 24)),}).with_row_index("id_horario")
dim_horarios_clima

id_horario,hora
u32,i64
0,0
1,1
2,2
3,3
4,4
…,…
19,19
20,20
21,21
22,22


In [18]:
# link dim_horario_clima to dim_horarios_full
dim_horarios_full = dim_horarios_full.join(
    dim_horarios_clima,
    on="hora",
    how="inner"
).rename({"id_horario_right": "id_horario_clima"})
dim_horarios_full

id_horario,hora,minuto,id_horario_clima
i32,i8,i8,u32
0,0,0,0
1,0,1,0
2,0,2,0
3,0,3,0
4,0,4,0
…,…,…,…
1435,23,55,23
1436,23,56,23
1437,23,57,23
1438,23,58,23


In [19]:
# generate dim_local_clima

dim_local_clima = (clima_df
    .select([pl.col("id_municipio").alias("id_municipio").cast(pl.Int32),
             pl.col("sigla_uf").alias("sigla_uf"),
             ]).unique().sort(["id_municipio", "sigla_uf"]))
# join with municipios to get municipio name
# join with uf to get uf name and regiao
dim_local_clima = (dim_local_clima
    .join(municipios_df.select([pl.col("MUNICÍPIO - IBGE"), pl.col('CÓDIGO DO MUNICÍPIO - IBGE')]), left_on="id_municipio", right_on="CÓDIGO DO MUNICÍPIO - IBGE", how="left")
    .join(uf_df.select([pl.col("sigla"), pl.col("nome"), pl.col("regiao")]), left_on="sigla_uf", right_on="sigla", how="left"))

dim_local_clima = dim_local_clima.select([
    pl.col("id_municipio"),
    pl.col("MUNICÍPIO - IBGE").alias("nome_municipio"),
    pl.col("sigla_uf"),
    pl.col("nome").alias("nome_uf"),
    pl.col("regiao").alias("regiao_uf")
]).sort(["id_municipio", "sigla_uf"]).unique().with_row_index("id_local")
dim_local_clima

id_local,id_municipio,nome_municipio,sigla_uf,nome_uf,regiao_uf
u32,i32,str,str,str,str
0,1303536,"""Presidente Figueiredo""","""AM""","""Amazonas""","""Norte"""
1,1200013,"""Acrelândia""","""AC""","""Acre""","""Norte"""
2,2700300,"""Arapiraca""","""AL""","""Alagoas""","""Nordeste"""
3,2701357,"""Campestre""","""AL""","""Alagoas""","""Nordeste"""
4,2701605,"""Canapi""","""AL""","""Alagoas""","""Nordeste"""
…,…,…,…,…,…
162,1300409,"""Barcelos""","""AM""","""Amazonas""","""Norte"""
163,1304203,"""Tefé""","""AM""","""Amazonas""","""Norte"""
164,2705507,"""Murici""","""AL""","""Alagoas""","""Nordeste"""
165,2703304,"""Inhapi""","""AL""","""Alagoas""","""Nordeste"""


In [20]:
# link dim_local_clima to dim_local_queimada on id_municipio and sigla_uf to get id_local_clima

print("Before join:", dim_local_queimada.shape)
dim_local_queimada = dim_local_queimada.join(
    dim_local_clima.select([pl.col("id_local").alias("id_local_clima"), pl.col("id_municipio"), pl.col("sigla_uf")]),
    on=["id_municipio", "sigla_uf"],
    how="left"
    )
print("After join:", dim_local_queimada.shape)
dim_local_queimada

Before join: (14612826, 9)
After join: (14612826, 10)


id_local,id_municipio,nome_municipio,sigla_uf,nome_uf,regiao_uf,bioma,latitude,longitude,id_local_clima
u32,i32,str,str,str,str,str,f64,f64,u32
0,1500859,"""Anapu""","""PA""","""Pará""","""Norte""","""Amazônia""",-3.01851,-51.3448,
1,1507300,"""São Félix do Xingu""","""PA""","""Pará""","""Norte""","""Amazônia""",-6.9085,-52.6711,
2,2903201,"""Barreiras""","""BA""","""Bahia""","""Nordeste""","""Cerrado""",-11.92053,-45.09534,
3,2102002,"""Bom Jardim""","""MA""","""Maranhão""","""Nordeste""","""Amazônia""",-3.84972,-46.60426,
4,1301803,"""Ipixuna""","""AM""","""Amazonas""","""Norte""","""Amazônia""",-7.14025,-71.68433,76
…,…,…,…,…,…,…,…,…,…
14612821,1507300,"""São Félix do Xingu""","""PA""","""Pará""","""Norte""","""Amazônia""",-5.734,-51.283,
14612822,1304104,"""Tapauá""","""AM""","""Amazonas""","""Norte""","""Amazônia""",-6.65837,-63.06742,102
14612823,1504703,"""Moju""","""PA""","""Pará""","""Norte""","""Amazônia""",-2.19439,-48.81997,
14612824,1100205,"""Porto Velho""","""RO""","""Rondônia""","""Norte""","""Amazônia""",-8.71761,-62.54329,


In [21]:
# create fct_clima

fct_clima = (clima_df.join(
    dim_data.select([pl.col("date_time_iso"), pl.col('id_data')]),
    left_on="data_hora", right_on="date_time_iso", how="left"
).with_columns([pl.col('id_municipio').cast(pl.Int32)])
# join by id_municipio, sigla_uf to get id_local_clima
.join(
    dim_local_clima.select([pl.col("id_local"), pl.col("id_municipio"), pl.col("sigla_uf")]),
    left_on=["id_municipio", "sigla_uf"],
    right_on=["id_municipio", "sigla_uf"],
    how="left"
)
# join dim_horarios_clima to get id_horario
.join(
    dim_horarios_clima.select([pl.col("id_horario"), pl.col("hora")]),
    left_on=[pl.col("data_hora").dt.hour()],
    right_on=["hora"],
    how="left"
)
.select([
    pl.col("id_data"),
    pl.col("id_local"),
    pl.col("id_horario"),
    pl.col('temperatura'),
    pl.col('umidade_relativa'),
    pl.col('vento_velocidade'),
    pl.col('vento_direcao'),
    pl.col('co_ppb'),
    pl.col('no2_ppb'),
    pl.col('o3_ppb'),
    pl.col('pm25_ugm3'),
    pl.col('so2_ugm3'),
    pl.col('precipitacao_dia')
])).unique()
print(len(fct_clima))
fct_clima

4952


id_data,id_local,id_horario,temperatura,umidade_relativa,vento_velocidade,vento_direcao,co_ppb,no2_ppb,o3_ppb,pm25_ugm3,so2_ugm3,precipitacao_dia
u32,u32,u32,f64,f64,f64,i64,f64,f64,f64,f64,f64,f64
1072445,165,18,33.3,42.0,4.2,139,76.2,0.1,29.0,7.2,0.2,
1072417,65,18,32.6,34.0,3.9,143,92.7,0.2,42.8,0.0,0.4,
1072704,101,18,32.7,44.0,2.7,68,116.7,0.2,37.9,5.9,0.4,
1072441,85,18,31.8,52.0,5.0,107,89.4,0.2,30.6,,0.4,
20498,74,18,31.1,46.0,5.4,116,82.9,0.2,33.4,1.1,0.4,
…,…,…,…,…,…,…,…,…,…,…,…,…
1072679,89,18,31.0,44.0,4.0,82,86.1,0.2,41.8,5.7,0.5,
1072707,164,18,31.2,50.0,3.1,61,147.2,0.4,48.4,2.4,1.0,
1072570,39,18,34.6,40.0,2.1,335,181.7,0.0,27.9,10.6,0.2,
1072536,67,18,33.6,40.0,2.9,108,101.2,0.0,15.3,1.6,0.1,


In [22]:
# load to parquet
dim_horarios_clima.write_parquet("data/dim_horarios_clima.pqt.zstd", compression="zstd")
dim_local_clima.write_parquet("data/dim_local_clima.pqt.zstd", compression="zstd")
dim_data.write_parquet("data/dim_data.pqt.zstd", compression="zstd")
fct_clima.write_parquet("data/fct_clima.pqt.zstd", compression="zstd")

dim_horarios_full.write_parquet("data/dim_horarios_queimada.pqt.zstd", compression="zstd")
dim_local_queimada.write_parquet("data/dim_local_queimada.pqt.zstd", compression="zstd")