# Update circuits

This script searches for new circuits from the 'circuits.csv' file in the Minio object store. It compares them against circuits already listed in the database and inserts any new ones.

#### ToDo:
- Improve the matching logic; it is susceptible to false positives.
- Quarantine records that may be incorrect.
- Add a results reporting function.
- Add additional metadata.
- Make logic performance improvements.

In [1]:
import pandas as pd
import numpy as np
from io import BytesIO
from minio import Minio
from sqlalchemy import create_engine, text
from fuzzywuzzy import fuzz
from datetime import datetime
import re
import logging
import psycopg2
# from psycopg2.extras import execute_batch

In [2]:
# Configure root logging once for the whole notebook: INFO level, millisecond
# timestamps, and the emitting module/function in every record.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
# basicConfig() returns None, so its result is deliberately not bound to
# `logger`; the notebook-wide logger is obtained separately.
logger = logging.getLogger(__name__)

In [3]:
# Step 1: build the MinIO client used to fetch the raw circuits file.
# NOTE(review): endpoint and credentials are hard-coded — consider moving them
# to configuration.
try:
    logger.info("Trying MinIO Initilization.")
    minio_client = Minio(
        "minio:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )
    logger.info("MinIO Initialized .")
except Exception as e:
    logger.error("MinIO client initialisation error: %s", e)
    # Nothing below can work without a client, so stop here instead of
    # failing later with an unrelated NameError.
    raise

# Step 2: download 'circuits.csv' from the 'track.data-raw' bucket and load it
# into the DataFrame 'df_csv'.
try:
    response = minio_client.get_object("track.data-raw", "circuits.csv")
    try:
        data = BytesIO(response.read())
    finally:
        # Always hand the HTTP connection back, even when read() fails.
        response.close()
        response.release_conn()
    df_csv = pd.read_csv(data)
    logger.info("Circuits file with %s records downloaded and read into DataFrame 'df_csv'", len(df_csv))
except Exception as e:
    logger.error("Circuits file download/read error: %s", e)
    # Without 'df_csv' the rest of the notebook cannot run.
    raise
# Show every row when a DataFrame is displayed in the notebook.
pd.set_option('display.max_rows', None)

2023-10-02 08:00:42.434 INFO 1018797224 - <module>: Trying MinIO Initilization.
2023-10-02 08:00:42.439 INFO 1018797224 - <module>: MinIO Initialized .
2023-10-02 08:00:42.504 INFO 1018797224 - <module>: Circuits file with 77 records downloaded and read into DataFrame 'df_csv'


In [4]:
# Preview the downloaded circuits ordered by location, reference and name.
# The sorted result is only displayed; it is not assigned back to df_csv.
df_csv.sort_values(by=['location','circuitRef', 'name'])

Unnamed: 0,circuitId,circuitRef,name,location,country,lat,lng,alt,url
23,24,yas_marina,Yas Marina Circuit,Abu Dhabi,UAE,24.4672,54.6031,3,http://en.wikipedia.org/wiki/Yas_Marina_Circuit
28,29,adelaide,Adelaide Street Circuit,Adelaide,Australia,-34.9272,138.617,58,http://en.wikipedia.org/wiki/Adelaide_Street_C...
75,78,losail,Losail International Circuit,Al Daayen,Qatar,25.49,51.4542,\N,http://en.wikipedia.org/wiki/Losail_Internatio...
46,47,anderstorp,Scandinavian Raceway,Anderstorp,Sweden,57.2653,13.6042,153,http://en.wikipedia.org/wiki/Scandinavian_Raceway
68,69,americas,Circuit of the Americas,Austin,USA,30.1328,-97.6411,161,http://en.wikipedia.org/wiki/Circuit_of_the_Am...
71,73,baku,Baku City Circuit,Baku,Azerbaijan,40.3725,49.8533,-7,http://en.wikipedia.org/wiki/Baku_City_Circuit
48,49,montjuic,Montjuïc,Barcelona,Spain,41.3664,2.15167,79,http://en.wikipedia.org/wiki/Montju%C3%AFc_cir...
66,67,pedralbes,Circuit de Pedralbes,Barcelona,Spain,41.3903,2.11667,85,http://en.wikipedia.org/wiki/Pedralbes_Circuit
60,61,avus,AVUS,Berlin,Germany,52.4806,13.2514,53,http://en.wikipedia.org/wiki/AVUS
65,66,bremgarten,Circuit Bremgarten,Bern,Switzerland,46.9589,7.40194,551,http://en.wikipedia.org/wiki/Circuit_Bremgarten


In [5]:
# Initialize connection to the PostgreSQL database using SQLAlchemy
# NOTE(review): credentials are hard-coded in the URL — consider moving them
# to configuration.
try:
    logger.info("Trying database connection.")
    engine = create_engine('postgresql://admin:admin@pgdb/postgres')
    # create_engine() is lazy and does not open a connection by itself, so run
    # a trivial query to make sure the "successful" message below is true.
    with engine.connect() as connection:
        connection.execute(text("SELECT 1"))
    logger.info("Database connection successful.")
except Exception as e:
    logger.error("Error during database connection: %s", e)
    # Every later cell needs a working engine; fail here with the real cause.
    raise

2023-10-02 08:00:42.549 INFO 1615689807 - <module>: Trying database connection.
2023-10-02 08:00:42.608 INFO 1615689807 - <module>: Database connection successful.


In [6]:
# Load every circuit currently stored in the database into 'df_db'.
df_db = pd.read_sql(sql="SELECT * FROM race_data.circuits", con=engine)

In [7]:
# Preview the database circuits ordered by location, reference and name.
# The sorted result is only displayed; it is not assigned back to df_db.
df_db.sort_values(by=['location','circuit_reference', 'name'])

Unnamed: 0,circuit_id,circuit_reference,name,location,lat,lng
22,23,Abu Dhabi,Yas Marina Circuit,Abu Dhabi,24.47,54.603
19,20,Austin,Circuit of The Americas,Austin,30.133,-97.641
14,15,Baku,Baku City Circuit,Baku,40.372,49.853
18,19,Hanoi,Hanoi Street Circuit,Hanoi,21.021,105.843
12,13,Imola,Autodromo Internazionale Enzo e Dino Ferrari,Imola,44.343,11.716
21,22,Jeddah,Jeddah Street Circuit,Jeddah,21.527,39.187
23,24,Las Vegas,Las Vegas Street Circuit,Las Vegas,36.169,-115.136
15,16,Paul Ricard,Circuit Paul Ricard,Le Castellet,43.251,5.793
1,2,Melbourne,Melbourne Grand Prix Circuit,Melbourne,-37.8497,144.968
20,21,Mexico,Autódromo Hermanos Rodríguez,Mexico City,19.406,-99.09


In [8]:
# Define a threshold for the fuzz.ratio. This depends on how strict you want your matching to be.
# Scores at or above this value are treated as a match by the comparison loop
# further down (it tests `>= threshold`); raise it for stricter matching.
threshold = 80

# Logics

## Lookup Logic:
- Clean and normalise the data: convert accented (non-English) characters to their plain English equivalents, replace hyphens and underscores with a space, remove the remaining special characters, and strip surrounding whitespace.
- Alter the table and add a new column to hold created_date metadata.
- Keeping df_csv as the left frame and df_db as the right frame, perform a left join on location.

#### If records only in df_csv (left df):
- Initialise a blank df named 'df_insert' having the same schema as the target table
- Initialise a blank df named 'df_quarantine' having the same schema as the target table
- Perform a check to find potentially incorrect records and quarantine them
- Add all the quarantined records to 'df_quarantine'
- Insert the rest of the records in 'df_insert'

#### If records in both the df:
- Since one location can have more than one circuit, look for a fuzzy match on name, circuit_ref and coordinates. If no match is found, append the record to 'df_insert'.
- If a match is found, do not load the record.

In [9]:
# Imported here so this cell brings its own dependency into scope.
import unicodedata

try:
    logger.info("Cleansing and normalisation initiated.")

    def remove_special_chars(value):
        """Drop every character that is not an ASCII letter, digit or whitespace."""
        return re.sub(r'[^a-zA-Z0-9\s]', '', value)

    def clean_normalize(value):
        """Strip accents, turn '-' and '_' into spaces and trim the result.

        The text is canonically decomposed (NFD) and the combining marks are
        removed, so any accented letter — upper or lower case — is reduced
        to its base letter instead of being deleted by the later
        special-character filter.
        """
        decomposed = unicodedata.normalize('NFD', value)
        without_accents = ''.join(
            char for char in decomposed if not unicodedata.combining(char)
        )
        return without_accents.replace('-', ' ').replace('_', ' ').strip()

    # Text columns to clean in each DataFrame
    columns_to_clean_db = ['circuit_reference', 'name', 'location']
    columns_to_clean_csv = ['circuitRef', 'name', 'location']

    # Apply the same pipeline to the listed columns of both DataFrames.
    for frame, columns in ((df_db, columns_to_clean_db), (df_csv, columns_to_clean_csv)):
        for column in columns:
            frame[column] = (
                frame[column]
                .fillna('')      # missing values become blanks before cleaning
                .apply(str)
                .apply(clean_normalize)
                .apply(remove_special_chars)
            )
    logger.info("Cleansing and normalisation completed.")
except Exception as e:
    logger.error("Error %s occured during cleansing and standardisation", e)
    # Continuing with partially cleaned columns would skew the matching that
    # decides what gets inserted, so stop here.
    raise

2023-10-02 08:00:42.718 INFO 2151163984 - <module>: Cleansing and normalisation initiated.
2023-10-02 08:00:42.731 INFO 2151163984 - <module>: Cleansing and normalisation completed.


In [10]:
# 'alt' and 'url' are not used by the merge or the matching below, so remove
# them in place, then preview the cleansed CSV circuits in sorted order.
df_csv.drop(columns=['alt', 'url'], inplace=True)
df_csv.sort_values(by=['location', 'circuitRef', 'name'])

Unnamed: 0,circuitId,circuitRef,name,location,country,lat,lng
23,24,yas marina,Yas Marina Circuit,Abu Dhabi,UAE,24.4672,54.6031
28,29,adelaide,Adelaide Street Circuit,Adelaide,Australia,-34.9272,138.617
75,78,losail,Losail International Circuit,Al Daayen,Qatar,25.49,51.4542
46,47,anderstorp,Scandinavian Raceway,Anderstorp,Sweden,57.2653,13.6042
68,69,americas,Circuit of the Americas,Austin,USA,30.1328,-97.6411
71,73,baku,Baku City Circuit,Baku,Azerbaijan,40.3725,49.8533
48,49,montjuic,Montjuic,Barcelona,Spain,41.3664,2.15167
66,67,pedralbes,Circuit de Pedralbes,Barcelona,Spain,41.3903,2.11667
60,61,avus,AVUS,Berlin,Germany,52.4806,13.2514
65,66,bremgarten,Circuit Bremgarten,Bern,Switzerland,46.9589,7.40194


In [11]:
# Preview the cleansed database circuits ordered by location, reference and name.
df_db.sort_values(by=['location','circuit_reference', 'name'])

Unnamed: 0,circuit_id,circuit_reference,name,location,lat,lng
22,23,Abu Dhabi,Yas Marina Circuit,Abu Dhabi,24.47,54.603
19,20,Austin,Circuit of The Americas,Austin,30.133,-97.641
14,15,Baku,Baku City Circuit,Baku,40.372,49.853
18,19,Hanoi,Hanoi Street Circuit,Hanoi,21.021,105.843
12,13,Imola,Autodromo Internazionale Enzo e Dino Ferrari,Imola,44.343,11.716
21,22,Jeddah,Jeddah Street Circuit,Jeddah,21.527,39.187
23,24,Las Vegas,Las Vegas Street Circuit,Las Vegas,36.169,-115.136
15,16,Paul Ricard,Circuit Paul Ricard,Le Castellet,43.251,5.793
1,2,Melbourne,Melbourne Grand Prix Circuit,Melbourne,-37.8497,144.968
20,21,Mexico,Autodromo Hermanos Rodriguez,Mexico City,19.406,-99.09


Performing a left join on Location between the two data frames

- Here we will use location column to merge the dataframes


In [12]:
# Left-join the CSV circuits onto the database circuits by location. The
# '_merge' indicator records whether a CSV row's location exists in the
# database ('both') or only in the CSV ('left_only').
# A location that holds several database circuits makes the join repeat the
# CSV row once per database row. Only CSV-side columns are kept, so those
# repeats are identical rows; they are dropped so the same circuit is not
# processed (and later inserted) more than once.
merged_df_csv = (
    df_csv.merge(df_db, on='location', how='left', indicator=True)
    [['circuitId', 'circuitRef', 'name_x', 'location', 'country', 'lat_x', 'lng_x', '_merge']]
    .drop_duplicates()
    .sort_values(by=['location'])
)

# To get better performance, we can also set the index of both dataframes to location and reset the index later

In [13]:
# Inspect the column types of the merged frame before iterating over it.
merged_df_csv.dtypes 

circuitId        int64
circuitRef      object
name_x          object
location        object
country         object
lat_x          float64
lng_x          float64
_merge        category
dtype: object

#### Old data loader

Improvements made:
- Removed iterrows as it is very inefficient
- Iterating over a NumPy array instead, which is highly efficient
- Using data based on the merge performed earlier to filter data and improve performance
- Added logic to identify and quarantine potentially erroneous records
- Removed the logic that inserted one row at a time. The new logic creates a DataFrame of all the records to be inserted and then inserts the whole DataFrame into the table at once, because inserting a row on every iteration is very costly.

In [14]:
def result_report(insert_list: list, quarantine_lst: list) -> None:
    """Log how many circuits were selected for insertion and how many were quarantined.

    Args:
        insert_list: Records selected for insertion; only its length is used.
        quarantine_lst: Records held back for analysis; only its length is used.

    Returns:
        None. A failure while reporting is logged and not propagated.
    """
    try:
        logger.info("Report - Total # of new circuits inserted is: %d, Total # of circuits quarantined for analysis is: %d", len(insert_list), len(quarantine_lst))
    except Exception as e:
        # Reporting must not abort the load, so the failure is only logged.
        logger.error("Error in result_report function: %s", e)

In [15]:
# Classify every CSV circuit by comparing it with the database circuits:
#   * location only in the CSV ('left_only') but name or reference resembles
#     a database circuit -> quarantined as potentially incorrect;
#   * location present in both ('both') and the circuit resembles a database
#     circuit -> already known, nothing to do;
#   * anything else -> new circuit, queued for insertion.
# Results are collected in 'quarantine_lst' and 'insert_list'.
try:
    logger.info("Analysing and filtering datasets")
    quarantine_lst = []
    insert_list = []

    # The database side is the same for every CSV row, so build its comparison
    # keys (lower-cased text; coordinates rounded to 2 decimals and rendered
    # as text) once instead of once per CSV row.
    db_keys = [
        (
            row_db[1].lower(),                  # circuit_reference
            row_db[2].lower(),                  # name
            str(round(row_db[4], 2)).lower(),   # lat
            str(round(row_db[5], 2)).lower(),   # lng
        )
        for row_db in df_db.to_numpy()
    ]

    for row_csv in merged_df_csv.to_numpy():
        # Column order follows the selection made when 'merged_df_csv' was built.
        circuit_id, circuit_ref, circuit_name, location, _country, lat, lng, merge_state = row_csv

        csv_ref = circuit_ref.lower()
        csv_name = circuit_name.lower()
        csv_lat = str(round(lat, 2)).lower()
        csv_lng = str(round(lng, 2)).lower()

        match_found = False

        for db_ref, db_name, db_lat, db_lng in db_keys:
            circuit_ref_score = fuzz.ratio(csv_ref, db_ref)
            name_score = fuzz.ratio(csv_name, db_name)

            if merge_state == 'left_only':
                # The location is unknown to the database, yet the name (>= 95)
                # or the reference (>= threshold) resembles an existing
                # circuit: one location may hold several circuits, but the
                # same circuit should not appear under different locations.
                # NOTE(review): the coordinate scores cannot change this
                # outcome — the original test only used them together with the
                # reference score, which already suffices on its own.
                if name_score >= 95 or circuit_ref_score >= threshold:
                    csv_circuit_quarantine = {
                        'circuitId': circuit_id,
                        'circuitRef': circuit_ref,
                        'name': circuit_name,
                        'location': location,
                        'lat': lat,
                        'lng': lng,
                    }
                    logger.warning("Potentially incorrect record found, quarantining: %s", csv_circuit_quarantine)
                    quarantine_lst.append(csv_circuit_quarantine)
                    match_found = True
                    break

            elif merge_state == 'both':
                # NOTE(review): coordinates are compared as text, so values
                # that are numerically apart but share most digits can still
                # score high; the comparison also runs against every database
                # row, not only those at the same location — both look like
                # sources of the false positives listed in the ToDo.
                lat_score = fuzz.ratio(csv_lat, db_lat)
                lng_score = fuzz.ratio(csv_lng, db_lng)
                coordinates_and_ref_match = (
                    lat_score >= threshold
                    and lng_score >= threshold
                    and circuit_ref_score >= threshold
                )
                if coordinates_and_ref_match or name_score >= threshold:
                    # Already in the database: no action needed.
                    match_found = True
                    break

        if not match_found:
            csv_circuit = {
                'circuit_reference': circuit_ref,
                'name': circuit_name,
                'location': location,
                'lat': lat,
                'lng': lng,
            }
            insert_list.append(csv_circuit)

    result_report(insert_list, quarantine_lst)
except Exception as e:
    logger.error("Error in processing the data: %s", e)
    # A partially filled 'insert_list' must not reach the database load that
    # follows, so stop the run here.
    raise

2023-10-02 08:00:42.875 INFO 1410904026 - <module>: Analysing and filtering datasets
2023-10-02 08:00:42.903 INFO 3631209349 - result_report: Report - Total # of new circuits inserted is: 53, Total # of circuits quarantined for analysis is: 4


Converting the list of dictionaries into dataframes for efficient inserting into database and for better analysis of quarantined records

In [16]:
# Turn both result lists into DataFrames: one to load into the database and
# one for analysing the quarantined records.
df_insert, df_quarantine = (
    pd.DataFrame(records) for records in (insert_list, quarantine_lst)
)

Logic to fetch the last circuits id in the postgres table and then insert the unmatched records with incremented ids

In [17]:
# Append the new circuits to race_data.circuits through the shared SQLAlchemy
# 'engine'. to_sql() needs no separate connection or cursor, so none is
# opened here.
# NOTE(review): 'circuit_id' is not part of df_insert, so the table is
# presumably expected to generate it — confirm against the table definition.
table_name = 'circuits'

if df_insert.empty:
    # Nothing was classified as new; skip the database round trip.
    logger.info("No new circuits to insert into race_data.%s.", table_name)
else:
    df_insert.to_sql(table_name, engine, if_exists='append', index=False, schema='race_data')
    logger.info("Inserted %d new circuits into race_data.%s.", len(df_insert), table_name)

In [18]:
# Reload the circuits table after the insert and display the result.
df_db = pd.read_sql(sql="SELECT * FROM race_data.circuits", con=engine)
df_db

Unnamed: 0,circuit_id,circuit_reference,name,location,lat,lng
0,1,Montreal,Circuit Gilles-Villeneuve,Montreal,45.506,-73.525
1,2,Melbourne,Melbourne Grand Prix Circuit,Melbourne,-37.8497,144.968
2,3,Spielberg,Red Bull Ring,Spielberg,47.223,14.761
3,4,Silverstone,Silverstone Circuit,Silverstone,52.072,-1.017
4,5,Barcelona,Circuit de Barcelona-Catalunya,Montmelo,41.569,2.261
5,6,Spa,Circuit de Spa-Francorchamps,Spa Francorchamps,50.436,5.971
6,7,Monza,Autodromo Nazionale Monza,Monza,45.621,9.29
7,8,Sochi,Sochi Autodrom,Sochi,43.407,39.96
8,9,Nurburgring,Nürburgring,Nürburg,50.334,6.943
9,10,Portimao,Autódromo Internacional do Algarve,Portimão,37.232,-8.628


In [19]:
## Check for false positives

In [20]:
# Imported here so this cell brings its own dependency into scope.
from itertools import combinations

# Scores must be strictly above this value to count as similar.
duplicate_threshold = 80

duplicate_columns = ['index1', 'index2', 'circuit_reference1', 'circuit_reference2', 'name1', 'name2', 'location1', 'location2', 'score']


def _comparison_key(value):
    """Return the lower-cased text used for fuzzy comparison ('' for missing values)."""
    return '' if pd.isna(value) else str(value).lower()


# Read the compared fields out of the DataFrame once, keeping the original
# values for the report next to their comparison keys.
circuit_rows = [
    (
        circuit_id, reference, name, location,
        _comparison_key(reference), _comparison_key(name), _comparison_key(location),
    )
    for circuit_id, reference, name, location in df_db[
        ['circuit_id', 'circuit_reference', 'name', 'location']
    ].itertuples(index=False, name=None)
]

duplicate_rows = []
# Every unordered pair of circuits is compared exactly once.
for first, second in combinations(circuit_rows, 2):
    circuit_id1, circuit_reference1, name1, location1, ref_key1, name_key1, location_key1 = first
    circuit_id2, circuit_reference2, name2, location2, ref_key2, name_key2, location_key2 = second

    # Calculate the fuzzy match score for circuit_reference, name and location fields
    circuit_reference_score = fuzz.ratio(ref_key1, ref_key2)
    name_score = fuzz.ratio(name_key1, name_key2)
    location_score = fuzz.ratio(location_key1, location_key2)

    # A pair is a potential duplicate when the locations are similar AND
    # either the circuit references or the names are similar.
    similar_identity = (
        circuit_reference_score > duplicate_threshold
        or name_score > duplicate_threshold
    )
    if similar_identity and location_score > duplicate_threshold:
        duplicate_rows.append({
            'index1': circuit_id1,
            'index2': circuit_id2,
            'circuit_reference1': circuit_reference1,
            'circuit_reference2': circuit_reference2,
            'name1': name1,
            'name2': name2,
            'location1': location1,
            'location2': location2,
            'score': max(circuit_reference_score, name_score, location_score),
        })

# Build the result in one step; the columns are fixed even when no pair matched.
duplicates = pd.DataFrame(duplicate_rows, columns=duplicate_columns)

# Show the potential duplicates
duplicates

Unnamed: 0,index1,index2,circuit_reference1,circuit_reference2,name1,name2,location1,location2,score


                                                         # End of the notebook