# Carga Camada Bronze

In [0]:
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime, timedelta
from delta.tables import DeltaTable

In [0]:
# Reference date for the load. The bronze layer is always loaded with the
# previous calendar month relative to "now" (naive local time of the cluster).
current_date = datetime.now()

# Going back one day from the 1st of the current month lands on the LAST day
# of the previous month.
previous_month = current_date.replace(day=1) - timedelta(days=1)

# Identifiers of the target month, used below to build the landing path and
# the Delta delete filter:
#   year_month -> "YYYYMM"
#   year       -> "YYYY"
#   month      -> month number without zero padding (e.g. "5", not "05")
year_month = f"{previous_month:%Y%m}"
year = f"{previous_month:%Y}"
month = str(previous_month.month)

# First and last day of the previous month, both as "YYYYMMDD" strings.
begin_previous_month = f"{previous_month.replace(day=1):%Y%m%d}"
end_previous_month = f"{previous_month:%Y%m%d}"

for valor in (year_month, year, month):
    print(valor)

202305
2023
5


In [0]:
# Make the monthly load re-runnable: remove any rows already loaded for the
# target month before the fresh extract is appended further below.
delta_table_path = "/mnt/projeto_climatico/bronze/nasa_solar_flux_temperature"

# `month` is unpadded (e.g. "5") while MO is declared as a string column, so
# this predicate relies on Spark's implicit string/int comparison.
# NOTE(review): confirm MO values in the table are compared as intended.
condicao_exclusao = f"YEAR = {year} AND MO = {month}"

# On the very first run nothing exists at the path yet and DeltaTable.forPath
# would raise, aborting the notebook before the table is ever created by the
# write cell. In that case there is simply nothing to delete.
if DeltaTable.isDeltaTable(spark, delta_table_path):
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    delta_table.delete(condicao_exclusao)

In [0]:
# Landing CSV for the target month, partitioned by year:
#   .../nasa_solar_flux_temperature/<year>/nasa_solar_flux_temperature_<yyyymm>.csv
caminho_arquivo = (
    "dbfs:/mnt/projeto_climatico/landing/nasa_solar_flux_temperature/"
    f"{year}/nasa_solar_flux_temperature_{year_month}.csv"
)

rdd = spark.sparkContext.textFile(caminho_arquivo)

# Bring the file to the driver ONCE (it used to be collected twice). This
# assumes a monthly file is small enough to fit on the driver — confirm if
# landing files can grow large.
linhas_arquivo = rdd.collect()

# The file begins with a preamble that ends on a "-END HEADER-" line; the CSV
# column header is the first line after that marker.
marcador_fim_header = "-END HEADER-"
try:
    header_end_index = [linha.strip() for linha in linhas_arquivo].index(marcador_fim_header)
except ValueError:
    raise ValueError(
        f"Marker {marcador_fim_header!r} not found in {caminho_arquivo}"
    ) from None

linhas = linhas_arquivo[header_end_index + 1:]
if not linhas:
    raise ValueError(
        f"No column header after {marcador_fim_header!r} in {caminho_arquivo}"
    )

# Column header of the CSV section.
header = linhas[0]
colunas_arquivo = [nome.strip() for nome in header.split(',')]

# Data rows. Blank lines (e.g. a trailing empty line) are dropped: they would
# become 1-field tuples whose length does not match the schema.
linhas = [linha for linha in linhas[1:] if linha.strip()]

# Expected schema for this dataset; every column is ingested as a string.
schema = StructType([
    StructField("YEAR", StringType(), True),
    StructField("MO", StringType(), True),
    StructField("DY", StringType(), True),
    StructField("ALLSKY_SFC_SW_DWN", StringType(), True),
    StructField("CLRSKY_SFC_SW_DWN", StringType(), True),
    StructField("ALLSKY_KT", StringType(), True),
    StructField("ALLSKY_SFC_LW_DWN", StringType(), True),
    StructField("ALLSKY_SFC_PAR_TOT", StringType(), True),
    StructField("CLRSKY_SFC_PAR_TOT", StringType(), True),
    StructField("ALLSKY_SFC_UVA", StringType(), True),
    StructField("ALLSKY_SFC_UVB", StringType(), True),
    StructField("ALLSKY_SFC_UV_INDEX", StringType(), True),
    StructField("T2M", StringType(), True),
    StructField("T2MDEW", StringType(), True),
    StructField("T2MWET", StringType(), True),
    StructField("TS", StringType(), True),
    StructField("T2M_RANGE", StringType(), True),
    StructField("T2M_MAX", StringType(), True),
    StructField("T2M_MIN", StringType(), True)
])

# Keep the expected schema only when the file's columns match it name-for-name
# and in order. Comparing counts alone would silently put data under the wrong
# column names if the file had the same number of columns in a different
# order or set; in that case fall back to the file's own header.
if colunas_arquivo != schema.fieldNames():
    print(f"Column layout differs from the expected schema; using file header: {colunas_arquivo}")
    schema = StructType([StructField(nome, StringType(), True) for nome in colunas_arquivo])

# Split each data row on commas into a tuple positionally aligned with the schema.
rdd_temp = spark.sparkContext.parallelize(linhas).map(lambda linha: tuple(linha.split(',')))

df_temp = spark.createDataFrame(rdd_temp, schema)

# YEAR is cast to int; all other columns stay strings in the bronze layer.
df_temp = df_temp.withColumn("YEAR", col("YEAR").cast("int"))


In [0]:
# Append the parsed month to the bronze Delta table, registered in the
# metastore as bronze.nasa_solar_flux_temperature with its data stored at the
# mounted bronze path.
# NOTE(review): the delete of this month (earlier cell) and this append are two
# separate Delta commits; if this write fails, the month's previous rows stay
# deleted until the notebook is re-run. Consider a single overwrite using
# `replaceWhere` — TODO evaluate.
(
    df_temp.write
    .format("delta")
    .mode("append")
    .option("path", "/mnt/projeto_climatico/bronze/nasa_solar_flux_temperature")
    .saveAsTable("bronze.nasa_solar_flux_temperature")
)

In [0]:
%sql
-- Run Delta OPTIMIZE on the bronze table right after the monthly append
-- (presumably to compact the small files each load produces — confirm).
OPTIMIZE bronze.nasa_solar_flux_temperature