<a href="https://colab.research.google.com/github/JJader/api-frontend/blob/feat%2Fcreate-celery-routes/notebook/enriquecimento.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Configurando o ambiente

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install -U pandas==1.5.3 &> /dev/null
!pip install -q findspark pyspark "mlflow==2.15.1" &> /dev/null

In [3]:
import os
import findspark

In [4]:
# instalar as dependências do spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# !wget  https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf /content/drive/MyDrive/Colab\ Notebooks/spark-3.1.2-bin-hadoop2.7.tgz

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
findspark.init()

In [5]:
!wget https://github.com/PicPay/case-machine-learning-engineer-pleno/raw/main/notebook/airports-database.zip
!unzip airports-database.zip -d airports-database

--2024-08-26 20:24:47--  https://github.com/PicPay/case-machine-learning-engineer-pleno/raw/main/notebook/airports-database.zip
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/PicPay/case-machine-learning-engineer-pleno/main/notebook/airports-database.zip [following]
--2024-08-26 20:24:48--  https://raw.githubusercontent.com/PicPay/case-machine-learning-engineer-pleno/main/notebook/airports-database.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10463368 (10.0M) [application/zip]
Saving to: ‘airports-database.zip’


2024-08-26 20:24:48 (64.4 MB/s) - ‘airports-database.zip’ saved [10463368/1

# Sessão

In [6]:

# iniciar uma sessão local
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, count, date_format, stddev, expr, first, to_date, udf, explode, lit, date_add
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType
import requests

#sc = SparkSession.builder.master('local[*]').getOrCreate()
spark = SparkSession.builder.appName("Introducao").getOrCreate()
spark

In [7]:
df = spark.read.csv("/content/airports-database/airports-database.csv", header=True)
df = df.withColumn("date", to_date(col("time_hour"), "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("day_of_week_name", date_format(col("date"), "EEEE"))

## Enriquecimento

### AirportDB API:

In [None]:
schema = StructType([
    StructField("latitude_deg", StringType(), True),
    StructField("longitude_deg", StringType(), True),
])

@udf(returnType=schema)
def fetch_data(airport_code: str):
    airportdb_key = "ae7555efbe721faff667b69c9031a059619294bbdec5489a16c65b5196dff4d4dfd7e1f58bb76e07a22128297ac96e1d" # em um sistema produtivo colocar como variável de ambiente ou secrets
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_key}"

    response = requests.get(url)
    result_json = response.json()
    latitude_deg = result_json.get("latitude_deg")
    longitude_deg = result_json.get("longitude_deg")
    return latitude_deg,longitude_deg

In [None]:
origin_df = df.select("origin").withColumnRenamed("origin", "airport")
dest_df = df.select("dest").withColumnRenamed("dest", "airport")

combined_df = origin_df.union(dest_df).distinct()
combined_list = (
     combined_df
    .rdd.map(lambda row : row[0])
    .collect()
)

In [None]:
response_df = combined_df.withColumn("response", fetch_data("airport"))
airport_location_df = response_df.select(*combined_df.columns,"response.*")

In [None]:
airport_location_df.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/content/drive/MyDrive/picpay/airport.csv")

+-------+------------------+------------------+
|airport|      latitude_deg|     longitude_deg|
+-------+------------------+------------------+
|    LGA|         40.777199|        -73.872597|
|    EWR|         40.692501|        -74.168701|
|    JFK|         40.639801|          -73.7789|
|    PSE|              null|              null|
|    MSY| 29.99340057373047|-90.25800323486328|
|    SNA|         33.675701|       -117.867996|
|    BUR|         34.197703|       -118.356378|
|    GRR|       42.88079834|      -85.52279663|
|    MYR|     33.6796989441|    -78.9282989502|
|    GSO|36.097801208496094|-79.93730163574219|
|    PVD|         41.732601|        -71.420403|
|    OAK|         37.721298|       -122.221001|
|    MSN|           43.1399|        -89.337502|
|    DCA|           38.8521|        -77.037697|
|    LEX|  38.0364990234375|-84.60590362548828|
|    ORF| 36.89459991455078|-76.20120239257812|
|    CRW| 38.37310028076172|-81.59320068359375|
|    SAV|       32.12760162|      -81.20

### Weatherbit API:

In [8]:
@udf(StringType())
def fetch_weatherbit_api(latitude: str, longitude: str, start_date, end_date ):
    url = "https://api.weatherbit.io/v2.0/history/daily"
    weatherbit_key = "1cfc8ea9e4084678b02062bbf2181923" # em um sistema produtivo colocar como variável de ambiente ou secrets

    params = {
      'lat': latitude,
      'lon': longitude,
      'start_date': start_date.strftime('%Y-%m-%d'),
      'end_date': end_date.strftime('%Y-%m-%d'),
      'key': weatherbit_key,
    }
    headers = {
      'Accept': 'application/json',
    }
    response = requests.get(url, params=params, headers=headers)
    data = response.json()
    return data["data"][0].get("wind_spd")

In [9]:
airport_location_df = spark.read.csv("/content/drive/MyDrive/picpay/airport.csv", header=True)

In [10]:
df_with_origin_coords = df.join(
    airport_location_df,
    airport_location_df.airport == df.origin,
).select(
    *df.columns,
    col('latitude_deg').alias('origin_latitude'),
    col('longitude_deg').alias('origin_longitude')
)


In [11]:
df_with_coords = df_with_origin_coords.join(
    airport_location_df,
    airport_location_df.airport == df_with_origin_coords.dest,
).select(
    *df_with_origin_coords.columns,
    col('latitude_deg').alias('dest_latitude'),
    col('longitude_deg').alias('dest_longitude')
)

df_with_coords.show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+----------+----------------+---------------+----------------+------------------+------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|      date|day_of_week_name|origin_latitude|origin_longitude|     dest_latitude|    dest_longitude|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+----------+----------------+---------------+----------------+------------------+------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR|

In [35]:
df_with_coords_date = df_with_coords.withColumn("date", to_date(col("time_hour"), "yyyy-MM-dd HH:mm:ss"))
df_with_coords_date = df_with_coords_date.withColumn("next_day", date_add("date", 1))

limit_df = df_with_coords_date.sort(col("arr_delay").desc()).limit(5)

In [36]:
response_origin_df = limit_df.withColumn(
    "origin_wind_spd",
    fetch_weatherbit_api("origin_latitude", "origin_longitude", "date", "next_day" )
)

In [38]:
response_df = response_origin_df.withColumn(
    "dest_wind_spd",
    fetch_weatherbit_api("dest_latitude", "dest_longitude", "date", "next_day" )
)

In [39]:
response_df.show()

+-----+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+----------+----------------+---------------+----------------+-------------+--------------+----------+---------------+-------------+
|   id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|      date|day_of_week_name|origin_latitude|origin_longitude|dest_latitude|dest_longitude|  next_day|origin_wind_spd|dest_wind_spd|
+-----+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+----------+----------------+---------------+----------------+-------------+--------------+----------+---------------+-------------+
| 6651|2013|    

In [None]:
df_with_coords_date.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/content/drive/MyDrive/picpay/df_with_date_coords.csv")