In [1]:
import pandas as pd
from bs4 import BeautifulSoup
import bs4
import requests
import boto3
from botocore.exceptions import ClientError
from io import StringIO
import os
import html5lib
from datetime import datetime

In [2]:
# Report the versions of the scraping/parsing libraries in pip "name==version" form.
for _label, _module in (('pandas', pd), ('bs4', bs4), ('requests', requests), ('html5lib', html5lib)):
    print(f'{_label}=={_module.__version__}')

pandas==1.5.3
bs4==4.12.2
requests==2.29.0
html5lib==1.1


# 1- EXTRACT

In [4]:
# Race IDs per season, listed by hand (the IDs are not in a clear order on the site).
season_2017_id = ['972','974','975','976','977','979','978','980','981','982','973']
# NOTE(review): '984' is listed twice below -- probably a typo; confirm whether another ID was intended.
season_2018_id = ['983','984','984','985','986','987','988','989','990','991','992','993','994']
season_2019_id = ['1007','996','997','1008','999','1000','1001','1002','1003','1004','1005','1006']
season_2020_id = ['1012','1021','1015','1014','1022','1010','1016','1017','1023','1024','1025','1026']
season_2021_id = ['1027','1028','1029','1030','1031','1032','1033','1034']
season_2022_id = ['1035','1036','1037','1038','1039','1040','1041','1042','1049','1043','1044','1045','1046','1048']
season_2023_id = ['1050','1051','1052','1053','1055','1056']

# dict.fromkeys drops repeated IDs while keeping first-seen order, so no race
# is scraped (and later stored) twice.
all_race_ids = list(dict.fromkeys(
    season_2017_id
    + season_2018_id
    + season_2019_id
    + season_2020_id
    + season_2021_id
    + season_2022_id
    + season_2023_id
))

## 1.1- Web scraping

In [3]:
def extract_data(raceid, timeout=30):
    """
    Scrape the result tables of one event from the FIA Formula 2 website.

    Args:
        raceid (str): Race ID appended to the results URL. The IDs do not
            follow a clear order on the site.
        timeout (float): Seconds to wait for the HTTP response before
            giving up. Defaults to 30.

    Returns:
        pandas.DataFrame: All result tables of the page stacked into one
        frame (fresh 0..n-1 index). Each table keeps the columns parsed by
        ``pd.read_html`` except the combined position/driver column, plus
        'POS', 'CAR', 'PILOT NAME', 'TEAM', 'CIRCUIT', 'SCHEDULE' and 'TYPE'.
        An empty DataFrame is returned when the page contains no <table>.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If no response arrives within ``timeout``.
    """

    ENDPOINT = 'http://www.fiaformula2.com/Results?raceid='

    # Fail fast on network problems instead of hanging or parsing an error page.
    response = requests.get(ENDPOINT + raceid, timeout=timeout)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'html.parser')
    # Wrap the markup in StringIO: newer pandas deprecates passing literal HTML.
    raw = pd.read_html(StringIO(response.text))

    circuit = soup.find(class_='country-circuit').text  # Circuit information
    schedule = soup.find(class_='schedule').text  # Schedule information
    tables = soup.find_all('table')  # One <table> per session
    events = soup.find_all(class_='collapsible-header')  # Session headers

    # NOTE(review): assumes tables, events and raw are aligned by position -- confirm.
    frames = []
    for index_out, table in enumerate(tables):
        pos = table.find_all('div', class_='pos')  # Classified positions
        car_no = table.find_all('div', class_='car-no')  # Car numbers
        names = table.find_all('div', class_='driver-name')  # Driver/team cells

        event = events[index_out].find('span').text  # Session name

        rows = range(len(pos))

        # The combined column is replaced by the individually scraped fields below.
        frame = raw[index_out].drop('POSNumber / Driver and TeamNo / Driver', axis=1)
        frame['POS'] = [pos[i].text for i in rows]
        frame['CAR'] = [car_no[i].text for i in rows]
        frame['PILOT NAME'] = [
            names[i].find_all(class_='visible-desktop-up')[0].text for i in rows
        ]
        frame['TEAM'] = [names[i].find_all(class_='team-name')[0].text for i in rows]
        # Page-level values are the same for every row of the table.
        frame['CIRCUIT'] = circuit
        frame['SCHEDULE'] = schedule
        frame['TYPE'] = event

        frames.append(frame)

    if not frames:
        return pd.DataFrame()

    # Concatenate once instead of growing a DataFrame inside the loop.
    return pd.concat(frames, ignore_index=True)

## 1.2 Simple transformation

In [4]:
def transform_data(df):
    """
    Split the 'SCHEDULE' column of ``df`` into 'ROUND' and 'DATE', in place.

    'ROUND' is the text before the first '|' of 'SCHEDULE'. 'DATE' is taken
    from the second '|'-separated field: the piece after its first '-' is
    parsed with ``pd.to_datetime``. The 'SCHEDULE' column is removed.

    Args:
        df (pandas.DataFrame): Frame with a 'SCHEDULE' column; modified in place.

    Returns:
        None
    """

    def _round_label(schedule_text):
        return schedule_text.split('|')[0]

    def _last_day(schedule_text):
        return schedule_text.split('|')[1].split('-')[1]

    schedule = df['SCHEDULE']
    rounds = schedule.apply(_round_label)
    race_days = schedule.apply(_last_day)

    # The raw text is no longer needed once both parts are extracted.
    df.drop(columns='SCHEDULE', inplace=True)

    df['ROUND'] = rounds
    df['DATE'] = pd.to_datetime(race_days)

    return None

## 1.3 Test code

In [5]:
#for race_id in all_race_ids:
#    df = extract_data(race_id)
#    transform_data(df)
#    df.to_csv(f'DATA/id/{race_id}.csv',index=False)

# 2- TRANSFORM

## 2.1- Group

In [6]:
def reduce_data(race):
    """
    Normalise the 'TYPE' column of ``race`` and record the qualifying group.

    Adds a 'QUALI TYPE' column ('Group A', 'Group B' or 'Unique') and then
    renames the 'TYPE' values 'Qualifying Group A' / 'Qualifying Group B' to
    'Qualifying Session' and 'Sprint Race 1' to 'Sprint Race'.

    The function is idempotent: on a frame that already has a 'QUALI TYPE'
    column, rows whose 'TYPE' is no longer a group label keep their existing
    value, so a second call does not reset groups to 'Unique'.

    Args:
        race (pandas.DataFrame): Frame with a 'TYPE' column.

    Returns:
        None. The 'race' DataFrame is modified in place.
    """
    group_labels = {
        'Qualifying Group A': 'Group A',
        'Qualifying Group B': 'Group B',
    }

    def quali_type(event_type, previous):
        if event_type in group_labels:
            return group_labels[event_type]
        # Keep a label assigned by an earlier call; missing values are not str.
        if isinstance(previous, str):
            return previous
        return 'Unique'

    if 'QUALI TYPE' in race.columns:
        previous_values = race['QUALI TYPE'].tolist()
    else:
        previous_values = [None] * len(race)

    race['QUALI TYPE'] = [
        quali_type(event_type, previous)
        for event_type, previous in zip(race['TYPE'].tolist(), previous_values)
    ]

    # Assign the result back: replace(..., inplace=True) on a selected column
    # does not reliably update the parent frame (copy-on-write in newer pandas).
    race['TYPE'] = race['TYPE'].replace({
        'Qualifying Group B': 'Qualifying Session',
        'Qualifying Group A': 'Qualifying Session',
        'Sprint Race 1': 'Sprint Race',
    })

## 2.2 Format

In [7]:
def format_data(race):
    """
    Split the 'race' DataFrame into one frame per value of the 'TYPE' column.

    For each event type the rows with that 'TYPE' are selected and the
    columns that do not apply to it are dropped. Columns that are not
    present are skipped individually (the FIA has changed the published
    metrics over time), so one missing column no longer prevents the
    others from being dropped.

    Args:
        race (pandas.DataFrame): Frame with a 'TYPE' column. Not modified.

    Returns:
        dict: Maps 'Sprint Race', 'Feature Race', 'Qualifying Session',
        'Free Practice' and 'Sprint Race 2' to the matching DataFrame
        (empty when the event has no rows of that type).
    """

    # Columns to remove for each event type.
    db_drop = {
        'Sprint Race': ['LAP SET ON', 'QUALI TYPE'],
        'Feature Race': ['LAP SET ON', 'QUALI TYPE'],
        'Qualifying Session': ['BEST', 'LAP'],
        'Free Practice': ['BEST', 'LAP', 'QUALI TYPE'],
        'Sprint Race 2': ['LAP SET ON', 'QUALI TYPE'],
    }

    # errors='ignore' skips absent columns without a blanket except clause.
    return {
        key: race[race['TYPE'] == key].drop(columns=columns, errors='ignore')
        for key, columns in db_drop.items()
    }

## 2.3 Test code

In [None]:
import glob
from pathlib import Path

# Relative to the notebook's working directory, like the other 'DATA/...' paths
# in this notebook, so the cell does not depend on one user's machine.
path = 'DATA/id'

# Read every per-race CSV (recursively); sorted so the order is deterministic.
all_racing = [pd.read_csv(file) for file in sorted(Path(path).glob('**/*.csv'))]

len(all_racing)

In [None]:
# Apply reduce_data to every loaded race frame; each frame is modified in place.
for race in all_racing:
    reduce_data(race)

In [None]:
# Rows collected per event type across all loaded races.
frames_by_event = {
    'Sprint Race': [],
    'Feature Race': [],
    'Qualifying Session': [],
    'Free Practice': [],
    'Sprint Race 2': [],
}

for race in all_racing:
    # format_data splits an already loaded frame by event type. extract_data
    # is the web scraper and expects a race-id string, not a DataFrame.
    db = format_data(race)
    for key in db:
        frames_by_event[key].append(db[key])

# Concatenate once per event type instead of growing a DataFrame in the loop.
new_data_db = {
    key: pd.concat(frames) if frames else pd.DataFrame()
    for key, frames in frames_by_event.items()
}

### 2.3.1 Save locally

In [None]:
# Write one CSV per event type. The per-event tables are held in new_data_db;
# the name all_data is not defined anywhere in this notebook.
os.makedirs('DATA/event', exist_ok=True)  # to_csv does not create missing directories
for key,value in new_data_db.items():
    filename = key.replace(' ','-')
    value.to_csv(f'DATA/event/{filename}.csv',index=False)

# 3- LOAD

## 3.1 Read file

In [None]:
db_files = ['Sprint-Race','Feature-Race','Qualifying-Session','Free-Practice','Sprint-Race-2']
database_s3 = {}

# The S3 client has to exist before the first get_object call.
s3 = boto3.client('s3')

for file in db_files:
    key = f'{file}.csv'
    try:
        response = s3.get_object(Bucket='f2-events-db',Key=key)
    except ClientError as error:
        # get_object raises on a failed request (missing key, access denied...)
        # instead of returning a non-200 response, so report it here.
        code = error.response['Error']['Code']
        print(f"Status - {code}: unsuccessful S3 get_object response. Key: {key}")
        continue

    status = response['ResponseMetadata']['HTTPStatusCode']

    if status == 200:
        database_s3[file] = pd.read_csv(response['Body'])
        print(f'Status - {status}: successful S3 get_object response. Key: {key}')
    else:
        print(f"Status - {status}: unsuccessful S3 get_object response.")

In [None]:
client = boto3.client('s3')
bucket = 'f2-events-db'

# Append the new rows to each table fetched from S3 and upload the result
# under the same object key.
for object_stem, stored_frame in database_s3.items():
    event_type = object_stem.replace('-', ' ')
    merged = pd.concat([stored_frame, new_data_db[event_type]])

    client.put_object(
        Body=merged.to_csv(index=False),
        Bucket=bucket,
        Key=f'{object_stem}.csv',
    )
    print(f'successful S3 put_object response. Key: {object_stem}.csv')

# 4- Update locally

In [22]:
# Race ID (as a string) of the event to scrape and append to the stored tables.
new_race = '1057'

In [23]:
# Scrape the new event and derive ROUND/DATE from SCHEDULE, as was done for the
# per-race files in section 1.3, so its columns match the stored tables.
new_data = extract_data(new_race)
transform_data(new_data)

# Store the frame before reduce_data, like the other files in DATA/id.
# reduce_data runs once, in the cell that builds new_entry; calling it twice
# resets every 'QUALI TYPE' to 'Unique'.
# index=False keeps the row index out of the CSV.
new_data.to_csv(f'DATA/id/race_id_{new_race}.csv', index=False)

In [2]:
db_files = ['Sprint-Race','Feature-Race','Qualifying-Session','Free-Practice','Sprint-Race-2']

# Load the locally stored table of each event type, keyed by its file stem.
database_s3 = {stem: pd.read_csv(f'DATA/event/{stem}.csv') for stem in db_files}

In [25]:
# Normalise the TYPE labels in place, then split the new event into one frame
# per event type (a dict keyed by the TYPE value).
reduce_data(new_data)
new_entry = format_data(new_data)

In [34]:
# Append the new event's rows to each stored table and rewrite its CSV file.
for file_stem, stored_frame in database_s3.items():
    event_type = file_stem.replace('-', ' ')
    combined = pd.concat([stored_frame, new_entry[event_type]])
    combined.to_csv(f'DATA/event/{file_stem}.csv', index=False)

# 5 Concat all

In [11]:
# Build the combined table from the event files on disk. The frames held in
# database_s3 were loaded before the update step rewrote those files, so
# concatenating them directly would leave the newly added race out.
event_frames = [pd.read_csv(f'DATA/event/{name}.csv') for name in database_s3]

# Single concat instead of growing a DataFrame inside a loop.
df = pd.concat(event_frames, ignore_index=True) if event_frames else pd.DataFrame()

In [12]:
# Write the combined table to a single CSV, without the row index.
df.to_csv('DATA/full_data.csv',index=False)