# Preparando análises

## Separando em 4 quadrantes os oceanos que envolvem a América do Sul

In [None]:
# Outer latitude / longitude limits of the ocean area being analysed.
min_lat, max_lat = -34, 6
min_lon, max_lon = -180, -3.9

# Latitude and longitude at which that area is cut into four quadrants.
fronteira_lat, fronteira_lon = -20, -37.5

# Quadrant name -> {'lat': (lower, upper), 'lon': (lower, upper)}.
quadrante_mar_lat_lon = {
    nome: {'lat': faixa_lat, 'lon': faixa_lon}
    for nome, faixa_lat, faixa_lon in (
        ('mar_nordeste', (fronteira_lat, max_lat), (fronteira_lon, max_lon)),
        ('mar_suldeste', (min_lat, fronteira_lat), (fronteira_lon, max_lon)),
        ('mar_noroeste', (fronteira_lat, max_lat), (min_lon, fronteira_lon)),
        ('mar_sudoeste', (min_lat, fronteira_lat), (min_lon, fronteira_lon)),
    )
}

In [None]:
# Location of the bronze-layer sea-surface-temperature dataset (parquet files).
bronze_surftemp = '/mnt/usp/ProjetoIntegrador/bronze/surftemp-sst'

# Load the dataset as a Spark DataFrame.
mar_temp_df = spark.read.parquet(bronze_surftemp)

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as StringType

def get_mar_quadrante(lat, lon, quadrantes=None):
    """Return the name of the sea quadrant that contains the point (lat, lon).

    Each quadrant is described by a ``(lower, upper)`` pair for latitude and
    one for longitude. Both intervals are half-open: the lower bound is
    inclusive and the upper bound is exclusive.

    Args:
        lat: Latitude of the point, or ``None``.
        lon: Longitude of the point, or ``None``.
        quadrantes: Optional mapping ``name -> {'lat': (lo, hi), 'lon': (lo, hi)}``.
            When omitted, the module-level ``quadrante_mar_lat_lon`` is used.

    Returns:
        The name of the first quadrant whose bounds contain the point, or
        ``'demais'`` when no quadrant matches or a coordinate is missing.
    """
    # Null column values reach a Python UDF as None; comparing None with a
    # number raises TypeError, so treat such points as outside every quadrant.
    if lat is None or lon is None:
        return 'demais'

    if quadrantes is None:
        quadrantes = quadrante_mar_lat_lon

    for quadrante, ranges in quadrantes.items():
        (lat_min, lat_max), (lon_min, lon_max) = ranges['lat'], ranges['lon']
        if lat_min <= lat < lat_max and lon_min <= lon < lon_max:
            return quadrante
    return 'demais'

# Expose the quadrant classifier to Spark as a Python UDF.
get_mar_quadrante_udf = F.udf(get_mar_quadrante)

# One row per (time, year, month, day) and one column per sea quadrant holding
# the average of `analysed_sst` minus 273.15.
# NOTE(review): the 273.15 offset presumably converts Kelvin to Celsius -
# confirm the unit of `analysed_sst`.
mar_quadrante_temp_df = (
    mar_temp_df
    .withColumn("quadrante_mar", get_mar_quadrante_udf(F.col("lat"), F.col("lon")))
    .groupBy("time", "year", "month", "day")
    # Listing the pivot values explicitly spares Spark the extra pass that
    # discovers them and keeps the output schema independent of the data.
    # sorted() keeps the columns in alphabetical order. 'demais' is not in the
    # list, so no column is created for points outside the quadrants and
    # there is nothing to drop afterwards.
    .pivot("quadrante_mar", sorted(quadrante_mar_lat_lon))
    .agg((F.avg("analysed_sst") - F.lit(273.15)).alias("temp_mar_quadrante"))
)

In [None]:
# Render the pivoted per-quadrant table in the notebook for a visual check.
mar_quadrante_temp_df.display()

time,year,month,day,mar_nordeste,mar_noroeste,mar_sudoeste,mar_suldeste
2020-11-11,2020,11,11,298.92181912493504,298.1266710033132,293.88995257616494,294.00565618252267
2020-05-21,2020,5,21,300.8735381410489,300.305446251077,295.4228122929547,295.904345690312
2020-12-20,2020,12,20,299.12926503921005,298.4016230238594,295.6677429668224,296.8161030998035
2020-08-21,2020,8,21,298.0385594556565,298.16985515615045,292.8027449905719,293.3270638032835
2020-02-04,2020,2,4,301.2618590461764,300.08699331907,297.397837112816,298.29579140215503
2020-07-22,2020,7,22,298.78711100124576,298.78926009801273,293.2606779079258,293.747055127345
2020-02-01,2020,2,1,301.13271626032406,300.1204575227241,297.3347540584254,298.3962482984374
2020-09-04,2020,9,4,298.0847201324066,297.8746537965853,292.7940701860714,293.1753499709019
2020-11-18,2020,11,18,298.99621964461966,298.14811858115235,294.32597306031624,294.2251174584538
2020-10-31,2020,10,31,298.7929987969285,297.8198516193767,293.5145393084308,293.5551729272012


## Adicionando dados de temperatura por dia por cidade

In [None]:
# Bronze-layer location of the climate observations.
bronze_climatico = '/mnt/usp/ProjetoIntegrador/bronze/climaticos/'

# Location of the table used to map `codigo` to municipality and state names.
ibge_cidade_codigo = (
    "/mnt/historyzoneprod/Brazil/Manual/Sales/DigitalInsights/"
    "RevenueManagement/DataAcquisition/Ibm/LatLonCad"
)

In [None]:
def snake_case(s):
    """Convert a free-form name to an ASCII ``snake_case`` identifier.

    Steps, in order: hyphens become spaces; a space is inserted before runs
    of ASCII capitals and before capitalised words; the words are joined
    with ``_``; accents are transliterated to ASCII; the text is lower-cased;
    every character outside ``[a-z0-9_]`` is removed; runs of underscores
    are collapsed into a single one.

    Args:
        s: The text to normalise, or ``None``.

    Returns:
        The normalised string, or ``None`` when ``s`` is ``None``.
    """
    from re import sub
    import unicodedata

    try:
        from unidecode import unidecode
    except ImportError:
        # Standard-library fallback: decompose accented characters and drop
        # whatever is not ASCII. It matches unidecode for accented Latin
        # letters but simply discards characters with no decomposition.
        def unidecode(text):
            decomposed = unicodedata.normalize('NFKD', text)
            return decomposed.encode('ascii', 'ignore').decode('ascii')

    # Null column values reach a Python UDF as None; pass them through
    # instead of failing on `.replace`.
    if s is None:
        return None

    spaced = s.replace('-', ' ')
    spaced = sub('([A-Z]+)', r' \1', spaced)
    spaced = sub('([A-Z][a-z]+)', r' \1', spaced)
    ascii_lower = unidecode('_'.join(spaced.split())).lower()
    cleaned = sub('[^a-z0-9_]', '', ascii_lower)
    # Removing characters above can leave three or more adjacent underscores;
    # a single replace('__', '_') would leave '__' behind, so collapse any run.
    return sub('_+', '_', cleaned)
    
# Wrap snake_case as a Spark UDF so it can be applied to DataFrame columns.
# No return type is passed to F.udf here.
snake_case_udf = F.udf(snake_case)

In [None]:
# Climate observations enriched with normalised municipality and state names,
# matched through the `codigo` column (inner join).
climatico_df = (
    spark.read.format("parquet").load(bronze_climatico)
    .select("latitude", "longitude", "precip24Hour", "relativeHumidity", "temperature", "year", "month", "day", "codigo")
    .join(
        spark.read.format("parquet").load(ibge_cidade_codigo)
        .select("codigo", "municipio", "nome_uf")
        # Normalise the names on the lookup side, before the join, so the
        # Python UDF runs once per lookup row instead of once per joined
        # climate row. The result is the same.
        # NOTE(review): assumes the lookup has far fewer rows than the
        # climate data - confirm.
        .withColumn("municipio", snake_case_udf("municipio"))
        .withColumn("nome_uf", snake_case_udf("nome_uf")),
        "codigo",
    )
)

In [None]:
# Render the enriched climate table in the notebook for a visual check.
climatico_df.display()

codigo,latitude,longitude,precip24Hour,relativeHumidity,temperature,year,month,day,municipio,nome_uf
1200203,-7.626373626373635,-72.6855652660139,2.7,93.7,24.4,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,2.4,95.6,23.6,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,2.2,96.5,23.0,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,1.8,97.0,22.9,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,1.3,98.5,22.4,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,0.8,98.0,22.5,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,0.6,98.0,21.9,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,0.3,98.5,21.9,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,0.1,100.0,21.6,2023,2,21,cruzeiro_do_sul,acre
1200203,-7.626373626373635,-72.6855652660139,0.1,100.0,21.4,2023,2,21,cruzeiro_do_sul,acre


## Dados de produção e valor da produção por cidade

In [None]:
# Bronze-layer location of the PAM dataset (parquet files).
bronze_pam = '/mnt/usp/ProjetoIntegrador/bronze/pam/'

# Load the dataset as a Spark DataFrame.
pam_df = spark.read.parquet(bronze_pam)

In [None]:
from pyspark.sql.window import Window

# Rank the rows of each source file by harvested area, largest first.
_janela_por_arquivo = (
    Window
    .partitionBy("file_path")
    .orderBy(F.col("area_colhida_hectares").desc())
)

# One row per source file: the `regiao` of its top-ranked row, exposed as
# `nome_uf`.
# NOTE(review): this presumably relies on the state-total row having the
# largest harvested area in each file - confirm against the raw files.
_uf_por_arquivo = (
    pam_df
    .withColumn("marcador_estado", F.row_number().over(_janela_por_arquivo))
    .filter(F.col("marcador_estado") == 1)
    .select("file_path", F.col("regiao").alias("nome_uf"))
)

# Production figures, with `regiao` renamed to `municipio`.
_pam_por_regiao = pam_df.select(
    F.col("regiao").alias("municipio"),
    "area_colhida_hectares",
    "valor_da_producao_mil_reais",
    "tipo_lavoura",
    "year",
    "file_path",
)

# Attach the state name to every row of the same file, drop the join key and
# normalise both name columns to snake_case.
pam_df = (
    _pam_por_regiao
    .join(_uf_por_arquivo, "file_path")
    .drop("file_path")
    .withColumn("municipio", snake_case_udf("municipio"))
    .withColumn("nome_uf", snake_case_udf("nome_uf"))
)

In [None]:
# Render the production table in the notebook for a visual check.
pam_df.display()

municipio,area_colhida_hectares,valor_da_producao_mil_reais,tipo_lavoura,year,nome_uf
fonte_ibge_producao_agricola_municipal,,,lavouras_permanentes,2020,maranhao
sao_raimundo_das_mangabeiras,188.0,800.0,lavouras_permanentes,2020,maranhao
sao_felix_de_balsas,,,lavouras_permanentes,2020,maranhao
sao_domingos_do_azeitao,9.0,91.0,lavouras_permanentes,2020,maranhao
sambaiba,15.0,144.0,lavouras_permanentes,2020,maranhao
nova_colinas,23.0,101.0,lavouras_permanentes,2020,maranhao
loreto,16.0,150.0,lavouras_permanentes,2020,maranhao
fortaleza_dos_nogueiras,17.0,48.0,lavouras_permanentes,2020,maranhao
benedito_leite,6.0,67.0,lavouras_permanentes,2020,maranhao
chapadas_das_mangabeiras,274.0,1400.0,lavouras_permanentes,2020,maranhao


In [None]:
# Feature table: climate rows matched to production figures by municipality,
# state and year, then to the per-quadrant sea temperatures by calendar date.
df = climatico_df.join(
    pam_df,
    on=['municipio', 'nome_uf', 'year'],
    how='inner',
).join(
    mar_quadrante_temp_df,
    on=["year", "month", "day"],
    how='inner',
)

In [None]:
# Render the joined feature table in the notebook for a visual check.
df.display()

In [None]:
# Silver-layer destination of the feature table.
silver = '/mnt/usp/ProjetoIntegrador/silver/features/'

# Write the table as parquet, replacing whatever is already stored there.
df.write.mode("overwrite").parquet(silver)