In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types
import os
import requests
import zipfile 
import io


In [2]:
# Compatibility shim: pandas 2.0 removed DataFrame.iteritems, while older PySpark
# releases (presumably < 3.4) still call it when converting pandas DataFrames.
# Aliasing it to DataFrame.items keeps those code paths working.
pd.DataFrame.iteritems = pd.DataFrame.items

# Local Spark session using all available cores; reused if one already exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [3]:
# Explicit read schema for the INPE annual fire-focus ("focos") CSV files, so Spark
# does not infer column types. Columns are listed in file order; all are nullable.
# NOTE(review): 'bioma' is declared DoubleType. If the CSV stores biome *names*
# (text), Spark's default PERMISSIVE parsing mode would load every value as null;
# confirm against the source file before relying on this column.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('id_bdq', types.LongType()),
        ('foco_id', types.StringType()),
        ('lat', types.DoubleType()),
        ('lon', types.DoubleType()),
        ('data_pas', types.StringType()),
        ('pais', types.StringType()),
        ('estado', types.StringType()),
        ('municipio', types.StringType()),
        ('bioma', types.DoubleType()),
    )
])

In [None]:
# Years of INPE annual fire-focus files to process (range end is exclusive: 2009..2023).
years = range(2009, 2024)

# Root folder for the per-year Parquet output written by Spark.
OUTPUT_ROOT = 'magic-zoomcamp/focus'

# Mage pipeline-run trigger endpoints on the local Mage server. The last path
# segment is the trigger token, so these can be overridden via environment
# variables instead of editing (and committing) the notebook.
MAGE_UPLOAD_TRIGGER_URL = os.environ.get(
    'MAGE_UPLOAD_TRIGGER_URL',
    "http://localhost:6789/api/pipeline_schedules/1/pipeline_runs/8d564b53ffc34c72a19fddc9ba525e9e",
)
MAGE_BIGQUERY_TRIGGER_URL = os.environ.get(
    'MAGE_BIGQUERY_TRIGGER_URL',
    "http://localhost:6789/api/pipeline_schedules/2/pipeline_runs/8f6dc462027d432aa20cba31ad1ad43d",
)

# Per-request network timeout (seconds) so a stalled server cannot hang the notebook.
HTTP_TIMEOUT_SECONDS = 120


def _download_and_extract(year):
    """Download the INPE annual focos zip for `year` and extract it into the CWD.

    Returns the name of the first member of the archive (the CSV to read), or
    None if the download did not return HTTP 200 or the archive is empty.
    """
    data_url = f"https://dataserver-coids.inpe.br/queimadas/queimadas/focos/csv/anual/AMS_sat_ref/focos_ams_ref_{year}.zip"
    response = requests.get(data_url, timeout=HTTP_TIMEOUT_SECONDS)
    if response.status_code != 200:
        print(f"Failed to download the file. Status code {response.status_code}")
        return None
    with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_file:
        members = zip_file.namelist()
        if not members:
            print(f"Downloaded archive for {year} is empty.")
            return None
        csv_name = members[0]
        zip_file.extractall()
    print(f"Files extracted successfully. The extracted file is {csv_name}")
    return csv_name


def _trigger_pipeline(url, file_names, year):
    """POST one Mage pipeline-run request per Parquet part file for `year`.

    A failed trigger is reported but does not stop the remaining requests.
    """
    for file_name in file_names:
        print(file_name)
        payload = {
            "pipeline_run": {
                "variables": {
                    "filename": file_name,
                    "year": year
                }
            }
        }
        response = requests.post(url, json=payload, timeout=HTTP_TIMEOUT_SECONDS)
        if not response.ok:
            print(f"Pipeline trigger failed for {file_name}. Status code {response.status_code}")


# year -> extracted CSV file name, accumulated across the whole run.
focos_csv_years = {}

for year in years:
    # DOWNLOAD THE FILE FOR THE REQUESTED YEAR
    csv_name = _download_and_extract(year)
    if csv_name is None:
        # Nothing to read for this year; skip instead of failing on a missing file.
        continue
    focos_csv_years[year] = csv_name

    # READ THE FILE FOR THE REQUESTED YEAR WITH SPARK
    df = (
        spark.read
        .option("header", "true")
        .schema(schema)
        .csv(csv_name)
    )

    # DO THE PARTITIONS AND WRITE THEM IN THE FOLDER
    parquet_path = f'{OUTPUT_ROOT}/{year}/'
    df = df.repartition(6)
    df.write.mode('overwrite').parquet(parquet_path)

    # Only the Parquet part files; Spark also writes marker files such as _SUCCESS.
    parquet_files = [name for name in os.listdir(parquet_path) if name.endswith(".parquet")]

    # EXECUTE THE MAGE PIPELINE TO UPLOAD THE FILES TO BUCKETS
    _trigger_pipeline(MAGE_UPLOAD_TRIGGER_URL, parquet_files, year)

    # EXECUTE THE SECOND MAGE PIPELINE TO TRANSFER THEM FROM BUCKETS TO BIGQUERY
    # NOTE(review): nothing here waits for the upload runs to finish. If Mage
    # executes triggered runs asynchronously, this pipeline may start before the
    # files reach the bucket. Confirm, and poll run status if needed.
    _trigger_pipeline(MAGE_BIGQUERY_TRIGGER_URL, parquet_files, year)