# F1 Fantasy Program

## ReadMe

The goal of this program is to web-scrape and gather F1 data. The data is then saved, transformed, and analysed.

# Imports

#### General

In [5]:
import sys
sys.path.append('modules')

from datetime import datetime
import requests

#### FastF1

In [6]:
import fastf1_extract as ff1

#### Webscrape

In [7]:
import f1_webscrape_extract as f1ws

#### MySQL

In [8]:
import f1stats_database as f1db

### Ignore Warnings

In [9]:
import warnings
# Install an 'ignore' filter so that warnings of category FutureWarning are not shown.
warnings.filterwarnings('ignore', category=FutureWarning)

# Global Variables & Settings

### Loaded races logfile name

In [10]:
# Name of the text file that records which races have already been loaded.
# NOTE(review): log_loaded_race() and read_loaded_races_logfile() repeat this
# literal as their own default argument instead of reading this variable, so
# changing the value here alone does not change where they read or write.
loaded_races_logfile = 'loaded_races_logfile.txt'

# Save to CSV file

In [11]:
def save_to_csv(target_csv, data_to_load):
    """Write ``data_to_load`` to ``target_csv`` via the object's ``to_csv`` method.

    Args:
        target_csv: Destination handed unchanged to ``to_csv`` as its only
            argument.
        data_to_load: Any object exposing a ``to_csv`` method (presumably a
            pandas DataFrame -- confirm against callers).
    """
    # No extra options are passed; the append-style alternative that was left
    # as a note is: mode='a', header=False, index=False
    write_csv = data_to_load.to_csv
    write_csv(target_csv)

# Extract: FastF1

### Log Loaded Race

In [12]:
def log_loaded_race(race_number, loaded_races_logfile='loaded_races_logfile.txt'):
    """Append one ``timestamp,race_number`` line to the loaded-races logfile.

    Args:
        race_number: Identifier of the race that was loaded; it is written
            using its ``str()`` form.
        loaded_races_logfile: Path of the logfile. It is opened in append
            mode, so it is created if it does not exist yet.
    """
    # '%b' is the documented directive for the abbreviated month name. The
    # '%h' alias is platform-specific and not guaranteed to be available.
    timestamp_format = '%Y-%b-%d-%H:%M:%S'  # Year-Monthname-Day-Hour:Minute:Second
    timestamp = datetime.now().strftime(timestamp_format)
    with open(loaded_races_logfile, 'a') as log:
        log.write(f'{timestamp},{race_number}\n')

### Read logfile

In [13]:
def read_loaded_races_logfile(loaded_races_logfile='loaded_races_logfile.txt'):
    """Return the race numbers recorded in the loaded-races logfile.

    Each non-blank line is expected to look like ``timestamp,race_number``;
    the second comma-separated field is converted with ``int``.

    Args:
        loaded_races_logfile: Path of the logfile to read.

    Returns:
        A list of race numbers in file order. The list is empty when the
        logfile does not exist yet (nothing has been logged so far).

    Raises:
        IndexError: If a non-blank line contains no comma.
        ValueError: If the second field of a line is not an integer.
    """
    loaded_races = []
    try:
        with open(loaded_races_logfile, 'r') as log:
            for line in log:
                entry = line.strip()
                if not entry:
                    # Tolerate blank lines (e.g. a trailing newline from manual edits).
                    continue
                fields = entry.split(',')
                loaded_races.append(int(fields[1]))
    except FileNotFoundError:
        # The logfile is only created once the first race is logged, so a
        # missing file simply means no race has been loaded yet.
        return []

    return loaded_races

### Find missing races

In [14]:
def find_missing_races(races_list: list, season: int):
    """Return the race numbers of ``season`` that are not yet in ``races_list``.

    Candidate race numbers run from 1 up to and including the value returned
    by ``ff1.get_completed_events_count(season)``. Starting at 1 regardless of
    the contents of ``races_list`` means that gaps below the lowest logged
    race are reported as well, instead of being silently skipped.

    Args:
        races_list: Race numbers that have already been loaded; may be empty.
        season: Season passed through to ``ff1.get_completed_events_count``.

    Returns:
        The missing race numbers as a list in ascending order.
    """
    completed_count = ff1.get_completed_events_count(season)
    already_loaded = set(races_list)  # O(1) membership tests
    # Iterating the range (rather than a set difference) keeps the result in a
    # deterministic ascending order.
    return [race for race in range(1, completed_count + 1) if race not in already_loaded]

# Webscraping

In [15]:
# f1ws.TODO_SORT_INTO_FUNCTIONS()

# Transform: Extract select columns from Data, into CSV files

In [16]:
def extract_data_into_csv(season=2023, session_type='R'):
    """Save the results of every not-yet-loaded race of ``season`` to CSV files.

    The races already recorded in the loaded-races logfile are read, the
    missing ones are determined with ``find_missing_races``, and for each of
    them the session is fetched through ``ff1.get_session_data``, its
    ``results`` are written to ``races_csv/<gp>_<date>.csv`` and the race is
    appended to the logfile.

    Args:
        season: Season forwarded to ``find_missing_races`` and
            ``ff1.get_session_data``.
        session_type: Session identifier forwarded to
            ``ff1.get_session_data`` (default ``'R'``).
    """
    import os  # local import: only needed here, keeps the file's import cell untouched

    races_csv_folder = 'races_csv/'

    loaded_races = read_loaded_races_logfile()
    missing_races = find_missing_races(loaded_races, season)

    if missing_races:
        # Writing into a folder that does not exist fails, so create it on demand.
        os.makedirs(races_csv_folder, exist_ok=True)

    for event_num in missing_races:
        # TODO only relevant columns must be saved into the csv files
        session = ff1.get_session_data(season, event_num, session_type)
        csv_name = f'{session.event.gp!s}_{session.event.date!s}.csv'
        save_to_csv(os.path.join(races_csv_folder, csv_name), session.results)
        log_loaded_race(event_num)  # maybe move this to after the data is actually loaded into the database

# MySQL Database f1stats

In [17]:
# Connect to the MySQL database when this cell runs and keep the returned
# database object in the module-level name `mydb`.
# execute_db_insert_functions() below relies on this name for cursor(),
# commit() and rollback().
mydb = f1db.connect_to_database()



# Execute Functions
def execute_db_insert_functions():
    """Run the database insert functions as one transaction on ``mydb``.

    A cursor is opened for the duration of the call. The work is committed at
    the end; if anything raises, the error is printed, the transaction is
    rolled back and the exception is re-raised to the caller.
    """
    with mydb.cursor() as cursor:
        try:
            # Call insert functions here

            mydb.commit()
        except Exception as exc:
            failure_message = f"An error occurred: {exc}"
            print(failure_message)
            mydb.rollback()
            raise



def get_standings(season=2021, timeout=10):
    """Fetch the standings from the F1 Fantasy API.

    Args:
        season: Season year inserted into the endpoint URL (default 2021).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely.

    Returns:
        The value stored under the ``'standings'`` key of the JSON response,
        or ``None`` when the request fails, the status code is not 200, or the
        response body does not have the expected shape. A message is printed
        in each failure case.
    """
    # Define the API endpoint URL
    url = f"https://fantasy.formula1.com/api/f1/{season}/standings"

    try:
        # Make a GET request to the API endpoint
        response = requests.get(url, timeout=timeout)

        # Guard clause: anything other than 200 is reported and abandoned
        if response.status_code != 200:
            print(f"Failed to retrieve data from the API. Response code: {response.status_code}")
            return None

        # Parse the JSON data from the response and pick out the standings
        data = response.json()
        return data['standings']
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # RequestException: network/HTTP failures (including timeouts);
        # ValueError: body is not valid JSON;
        # KeyError/TypeError: JSON does not contain a 'standings' mapping entry.
        print(f"An error occurred while connecting to the API: {e}")
        return None

# ETL Pipeline

In [18]:
# from datetime import timedelta
# from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
# from airflow.utils.dates import days_ago

# default_args = {
#     'owner': 'airflow',
#     'depends_on_past': False,
#     'email_on_failure': False,
#     'email_on_retry': False,
#     'retries': 1,
#     'retry_delay': timedelta(minutes=5),
# }

# dag = DAG(
#     'f1_stats_etl',
#     default_args=default_args,
#     description='F1 Stats ETL Pipeline',
#     schedule_interval=timedelta(days=1),
#     start_date=days_ago(1),
#     catchup=False,
# )

# def extract_data():
#     extract_fastf1_data()
#     extract_selenium_data()

# def transform_and_load_data():
#     transformed_data = transform_data()
#     load_data_to_mysql(transformed_data)

# extract_data_task = PythonOperator(
#     task_id='extract_data',
#     python_callable=extract_data,
#     dag=dag,
# )

# transform_and_load_data_task = PythonOperator(
#     task_id='transform_and_load_data',
#     python_callable=transform_and_load_data,
#     dag=dag,
# )

# extract_data_task >> transform_and_load_data_task



In [19]:
# create a fibonacci counter
def fibonacci_counter(n):
    """Return the ``n``-th Fibonacci number (``F(0) = 0``, ``F(1) = 1``).

    The value is computed iteratively in ``n`` steps, so large ``n`` neither
    takes exponential time nor hits the recursion limit.

    Args:
        n: Non-negative integer index into the Fibonacci sequence.

    Returns:
        The Fibonacci number at index ``n``.

    Raises:
        ValueError: If ``n`` is negative.
    """
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")

    previous, current = 0, 1
    for _ in range(n):
        # Slide the (F(k), F(k+1)) window one step forward.
        previous, current = current, previous + current
    return previous

In [20]:
# Quick manual check of fibonacci_counter: index 6 prints 8.
print(fibonacci_counter(6))



8
