# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [9]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5



You are already connected to a glueetl session 6d63a357-6829-45d9-8e99-6db65b5e2131.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 6d63a357-6829-45d9-8e99-6db65b5e2131.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session 6d63a357-6829-45d9-8e99-6db65b5e2131.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 6d63a357-6829-45d9-8e99-6db65b5e2131.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [130]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import pandas as pd
import requests
import json
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [11]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='f1_target_db', table_name='races')
dyf.printSchema()

root
|-- raceid: long
|-- round: long
|-- circuitid: long
|-- name: string
|-- date: string
|-- time: string


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [12]:
spark_df = dyf.toDF()
spark_df.show()

+------+-----+---------+--------------------+----------+--------+
|raceid|round|circuitid|                name|      date|    time|
+------+-----+---------+--------------------+----------+--------+
|     1|    1|        1|Australian Grand ...|2009-03-29|06:00:00|
|     2|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|
|     3|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|
|     4|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|
|     5|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00|
|     6|    6|        6|   Monaco Grand Prix|2009-05-24|12:00:00|
|     7|    7|        5|  Turkish Grand Prix|2009-06-07|12:00:00|
|     8|    8|        9|  British Grand Prix|2009-06-21|12:00:00|
|     9|    9|       20|   German Grand Prix|2009-07-12|12:00:00|
|    10|   10|       11|Hungarian Grand Prix|2009-07-26|12:00:00|
|    11|   11|       12| European Grand Prix|2009-08-23|12:00:00|
|    12|   12|       13|  Belgian Grand Prix|2009-08-30|12:00:00|
|    13|  

In [23]:
# Convert spark df to pandas
df = dyf.toDF().toPandas()
df['date'] = pd.to_datetime(df['date'])
df.head()

   raceid  round  circuitid                   name        date      time
0       1      1          1  Australian Grand Prix  2009-03-29  06:00:00
1       2      2          2   Malaysian Grand Prix  2009-04-05  09:00:00
2       3      3         17     Chinese Grand Prix  2009-04-19  07:00:00
3       4      4          3     Bahrain Grand Prix  2009-04-26  12:00:00
4       5      5          4     Spanish Grand Prix  2009-05-10  12:00:00


In [92]:
existing_rounds_2024 = df[df['date'].dt.year == 2024]['round'].unique()
list(existing_rounds_2024)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]


In [123]:
def get_2024_races_data(exclude_rounds = []):
    response = requests.get(url='https://ergast.com/api/f1/2024.json')
    races_2024_df = pd.DataFrame.from_dict(response.json()['MRData']['RaceTable']['Races'])
    races_2024_df['round'] = races_2024_df['round'].astype(int)
    races_2024_df.drop(columns=['url', 'season', 'FirstPractice', 'SecondPractice', 'ThirdPractice', 'Qualifying', 'Sprint', 'Circuit'], inplace=True)

    return races_2024_df[~races_2024_df['round'].isin(exclude_rounds)]




In [124]:
new_races = get_2024_races_data()




In [125]:
new_races_spark = spark.createDataFrame(new_races)
new_races_spark.show()

+-----+--------------------+----------+---------+
|round|            raceName|      date|     time|
+-----+--------------------+----------+---------+
|    1|  Bahrain Grand Prix|2024-03-02|15:00:00Z|
|    2|Saudi Arabian Gra...|2024-03-09|17:00:00Z|
|    3|Australian Grand ...|2024-03-24|04:00:00Z|
|    4| Japanese Grand Prix|2024-04-07|05:00:00Z|
|    5|  Chinese Grand Prix|2024-04-21|07:00:00Z|
|    6|    Miami Grand Prix|2024-05-05|20:00:00Z|
|    7|Emilia Romagna Gr...|2024-05-19|13:00:00Z|
|    8|   Monaco Grand Prix|2024-05-26|13:00:00Z|
|    9| Canadian Grand Prix|2024-06-09|18:00:00Z|
|   10|  Spanish Grand Prix|2024-06-23|13:00:00Z|
|   11| Austrian Grand Prix|2024-06-30|13:00:00Z|
|   12|  British Grand Prix|2024-07-07|14:00:00Z|
|   13|Hungarian Grand Prix|2024-07-21|13:00:00Z|
|   14|  Belgian Grand Prix|2024-07-28|13:00:00Z|
|   15|    Dutch Grand Prix|2024-08-25|13:00:00Z|
|   16|  Italian Grand Prix|2024-09-01|13:00:00Z|
|   17|Azerbaijan Grand ...|2024-09-15|11:00:00Z|


In [None]:
new_races_dynamic_frame = DynamicFrame.fromDF(new_races_spark, glueContext, 'new_races')
glueContext.write_dynamic_frame.from_options(
    frame=new_races_dynamic_frame,
    connection_type="s3",
    connection_options={"path": 's3://ie6750-f1-dwh/new_races/new_races.csv'},
    format="csv"
)

In [None]:
job.commit()