#### OLYMPICS-GPT : A Rule-Based Python Chatbot by BCS221093(Muhammad Awais)

> ##### The whole code can be found in the GitHub repo ([github.com/awais-124/Olympics-GPT](https://github.com/awais-124/Olympics-GPT.git)). This notebook does not contain the dataset and constants files; to run the code, get them from GitHub. Adding the whole dataset and the constants file to this notebook and printing them is not possible, because the dataset has more than 300,000 records.

In [1]:
import pandas as pd
import random
import string
import json

#### IMPORTING DATASETS

In [2]:
# Load the athlete biographies and the competition results into DataFrames.
# Both paths are relative to the current working directory.
bios_df = pd.read_csv('./cleaned_data/bios.csv')
results_df = pd.read_csv('./cleaned_data/results.csv')

##### FILE HANDLING FUNCTIONS

In [3]:
def add_key_value_to_file(file_path, key, value):
    """Insert or overwrite ``key`` in the JSON object stored at ``file_path``.

    The file is read with ``get_file_data``; if that fails (it returns -1 and
    has already printed the reason) nothing is written. Write errors are
    printed rather than raised, so callers never see an exception from here.

    Args:
        file_path: Path of an existing JSON file whose root is an object.
        key: Key to set in that object.
        value: JSON-serialisable value to store under ``key``.
    """
    data = get_file_data(file_path)
    if data == -1:
        return
    if not isinstance(data, dict):
        # A list/number/string root cannot take a key assignment.
        print("-" * 50)
        print(f"Error: Expected a JSON object in file at '{file_path}'.")
        print("-" * 50)
        return
    data[key] = value

    try:
        # Serialise BEFORE opening the file: mode 'w' truncates immediately,
        # so a value json cannot encode would otherwise wipe the stored data.
        serialized = json.dumps(data, indent=4)
        with open(file_path, 'w') as file:
            file.write(serialized)
    except Exception as e:
        print("-" * 50)
        print(f"Error writing to file: {str(e)}")
        print("-" * 50)

def get_file_data(file_path):
    """Parse the JSON file at ``file_path`` and return its content.

    Returns:
        The decoded JSON value on success, or the sentinel ``-1`` when the
        file is missing, is not valid JSON, or any other error occurs. In
        every failure case a message framed by dashed lines is printed.
    """
    problem = None
    try:
        with open(file_path, 'r') as handle:
            return json.load(handle)
    except FileNotFoundError:
        problem = f"Error: File not found at '{file_path}'."
    except json.JSONDecodeError:
        problem = f"Error: Invalid JSON format in file at '{file_path}'."
    except Exception as exc:
        problem = f"Error: {str(exc)}"

    # Only reached when one of the handlers above recorded a problem.
    divider = "-" * 50
    print(divider)
    print(problem)
    print(divider)
    return -1

##### ACTUATOR FUNCTIONS FOR FORMATTING OUTPUTS

In [4]:
from dataExtractorMethods import * 
from chatbotConstants import *

def show_athlete_stats(discipline):
    """Format the participant count and average height/weight for a sport.

    Args:
        discipline: Sport name passed straight to ``get_athlete_stats``.

    Returns:
        A one-line summary string, or a 50-underscore rule when
        ``get_athlete_stats`` reports failure by returning a bool.
    """
    avg_data = get_athlete_stats(discipline)
    if isinstance(avg_data, bool):
        return "_" * 50
    # Single quotes inside the f-string: reusing the enclosing double quote is
    # only legal from Python 3.12 on and is a SyntaxError on older versions.
    return (
        f"{avg_data['count']} players participated with average height = "
        f"{avg_data['avg_height']} and average weight = {avg_data['avg_weight']}"
    )

def show_athlete_medals(athlete_name):
    """List every medal won by an athlete, one medal per line.

    Args:
        athlete_name: Name passed straight to ``get_athlete_medals``.

    Returns:
        A header line followed by one sentence per medal, or a
        50-underscore rule when ``get_athlete_medals`` returns a bool.
    """
    all_medals = get_athlete_medals(athlete_name)
    if isinstance(all_medals, bool):
        return "_" * 50
    lines = [f"Following medals are won by {all_medals['name']}: "]
    for _, medal in all_medals["medals"].iterrows():
        lines.append(
            f"{medal['medal']} medal in {medal['discipline']} in the year "
            f"{int(medal['year'])} at {medal['event']}."
        )
    # Join with newlines: the sentences used to be glued together with no
    # separator, so several medals ran into one unreadable line.
    return "\n".join(lines)

def show_country_performance(noc, year=None):
    """Format a country's Silver/Gold/Bronze medal totals.

    Args:
        noc: Country identifier passed straight to ``get_country_performance``.
        year: Optional year filter, also passed through unchanged.

    Returns:
        Three lines with the Silver, Gold and Bronze counts (0 when a medal
        type is absent), or a 50-underscore rule when
        ``get_country_performance`` returns a bool.
    """
    medals_count = get_country_performance(noc, year)
    if isinstance(medals_count, bool):
        return "_" * 50
    # Single quotes inside the f-string keep this valid before Python 3.12.
    return (
        f"Silver medals = {medals_count.get('Silver', 0)}\n"
        f"Gold medals = {medals_count.get('Gold', 0)}\n"
        f"Bronze medals = {medals_count.get('Bronze', 0)}"
    )

def _ordinal(number):
    """Return ``number`` with its English ordinal suffix (1st, 2nd, 3rd, 11th)."""
    if 10 <= number % 100 <= 20:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(number % 10, "th")
    return f"{number}{suffix}"


def show_athlete_participations(athlete_name):
    """Describe every recorded participation of an athlete, one per line.

    Args:
        athlete_name: Name passed straight to ``get_athlete_participations``.

    Returns:
        One newline-terminated sentence per result row, or a 50-underscore
        rule when ``get_athlete_participations`` returns a bool.
    """
    parts = get_athlete_participations(athlete_name)
    if isinstance(parts, bool):
        return "_" * 50

    sentences = []
    for _, row in parts["part_of"].iterrows():
        try:
            place_text = f"finishing in {_ordinal(int(row['place']))} place"
        except (TypeError, ValueError):
            # A missing place (NaN/None) or a non-numeric value cannot be
            # converted with int(); report it instead of crashing.
            place_text = "with no recorded finishing place"

        medal = row['medal']
        if isinstance(medal, str) and medal.strip():
            medal_text = f" and winning a {medal} medal"
        else:
            # Rows without a medal hold NaN, which used to print "a nan medal".
            medal_text = " without winning a medal"

        sentences.append(
            f"In {int(row['year'])}, {parts['name']} competed in the "
            f"{row['discipline']} {row['event']} event, {place_text}{medal_text}.\n"
        )
    return "".join(sentences)

def show_athletes_by_sport(sport, year=None):
    """List the athletes found for a sport as a numbered, one-per-line string.

    Args:
        sport: Sport name passed straight to ``find_athletes_by_sport``.
        year: Optional year filter, also passed through unchanged.

    Returns:
        One newline-terminated line per athlete, numbered from 1, or a
        50-underscore rule when ``find_athletes_by_sport`` returns a bool.
    """
    athletes_by_sport = find_athletes_by_sport(sport, year)
    if isinstance(athletes_by_sport, bool):
        return "_" * 50
    # NOTE(review): ``index`` is the DataFrame row label, which is printed as
    # "Athlete id"; confirm it really matches the athlete_id column.
    # Single quotes inside the f-string keep this valid before Python 3.12.
    return "".join(
        f"{position}: Athlete id={index} -> {athlete['name']}  participated from {athlete['NOC']}.\n"
        for position, (index, athlete) in enumerate(athletes_by_sport.iterrows(), start=1)
    )

def show_medalists_by_year(year, discipline=None):
    """List the medal winners of a year as a numbered, one-per-line string.

    Args:
        year: Year passed straight to ``get_medalists_by_year``.
        discipline: Optional sport filter, also passed through unchanged.

    Returns:
        One newline-terminated line per medalist, numbered from 1, or a
        50-underscore rule when ``get_medalists_by_year`` returns a bool.
    """
    medalists = get_medalists_by_year(year, discipline)
    # This check used to read the loop variable ``medalist`` before it was
    # assigned, so every call raised UnboundLocalError.
    if isinstance(medalists, bool):
        return "_" * 50
    # Number from 1 with enumerate: the row index of the merged frame starts
    # at 0, which made the list begin with "0.".
    return "".join(
        f"{position}. {medalist['name']} from {medalist['NOC']} won a "
        f"{medalist['medal']} medal in {medalist['event']} ({medalist['discipline']}).\n"
        for position, (_, medalist) in enumerate(medalists.iterrows(), start=1)
    )

def show_athlete_bio(athlete_name):
    """Render an athlete's biography record as a multi-line string.

    Returns a 50-underscore rule when ``get_athlete_bio`` signals failure by
    returning a bool; otherwise one line each for id, name, birth details,
    height, weight and date of death.
    """
    record = get_athlete_bio(athlete_name)
    if isinstance(record, bool):
        return "_" * 50
    bio_lines = [
        f"Athlete ID: {record['athlete_id']}",
        f"Name: {record['name']}",
        f"Date of Birth: {record['born_date']} (Born in {record['born_city']}, {record['born_region']}, {record['born_country']})",
        f"Height: {record['height_cm']} cm",
        f"Weight: {record['weight_kg']} kg",
        f"Date of Death: {record['died_date']}",
    ]
    return "\n".join(bio_lines)
    

def _first_mention(candidates, text_lower):
    """Return the first string in ``candidates`` whose lowercase form occurs in ``text_lower``, else None."""
    for candidate in candidates:
        # Skip non-string cells (e.g. NaN for missing values), which have no
        # .lower(), and empty strings, which would match any text.
        if isinstance(candidate, str) and candidate and candidate.lower() in text_lower:
            return candidate
    return None


def _first_year(question):
    """Return the lowest year in 1900-2023 whose digits appear in ``question``, else None."""
    for year in range(1900, 2024):
        if str(year) in question:
            return year
    return None


def interpret_question(question: str):
    """Match a user question to a data query and return the formatted answer.

    Matching is plain case-insensitive substring search. Rules are tried in
    this order, and the first one that produces an answer wins:

    1. average keywords -> stats for the mentioned sport ("Tennis" if none)
    2. medal keywords + a country -> that country's medal totals
    3. medal keywords + a year -> medalists of that year (optionally by sport)
    4. medal keywords + an athlete name -> that athlete's medals
    5. bio keywords + an athlete name -> that athlete's biography
    6. a sport -> athletes of that sport (optionally for a year)
    7. participation keywords + an athlete name -> that athlete's results

    Returns:
        The answer string, or ``default_response`` when nothing matches.
    """
    text = question.lower()
    # Computed once; the dataset scans below are the expensive part.
    sports = results_df["discipline"].unique()

    def mentions(keywords):
        return any(word in text for word in keywords)

    if mentions(("average", "avg", "mean", "averages")):
        sport = _first_mention(sports, text)
        return show_athlete_stats(sport if sport is not None else "Tennis")

    if mentions(("medals", "medal", "awards", "award", "trophies", "trophy")):
        # unique() keeps first-appearance order, so the winner is the same as
        # when scanning every row, without re-testing duplicates.
        country = _first_mention(bios_df["NOC"].unique(), text)
        if country is not None:
            return show_country_performance(country)

        # Look for the year itself rather than concatenating every digit in
        # the question ("top 3 medals in 2008" used to become 32008).
        year = _first_year(question)
        if year is not None:
            sport = _first_mention(sports, text)
            if sport is not None:
                return show_medalists_by_year(year, sport)
            return show_medalists_by_year(year)

        # Same case-insensitive test for detection and selection; the old
        # case-sensitive next(...) raised StopIteration on a case mismatch.
        name = _first_mention(bios_df["name"], text)
        if name is not None:
            return show_athlete_medals(name)

    if mentions(("bio", "information about", "personal information",
                 "personal info", "bio data", "biodata")):
        name = _first_mention(bios_df["name"], text)
        if name is not None:
            return show_athlete_bio(name)

    sport = _first_mention(sports, text)
    if sport is not None:
        year = _first_year(question)
        if year is not None:
            return show_athletes_by_sport(sport, year)
        return show_athletes_by_sport(sport)

    if mentions(("participated", "took part in", "participate", "was part of",
                 "participations", "included", "played in")):
        name = _first_mention(bios_df["name"], text)
        if name is not None:
            return show_athlete_participations(name)

    return default_response

#### DATA EXTRACTOR METHODS

In [5]:
def get_athlete_medals(athlete_name):
    """Look up the medal-winning results of one athlete.

    The name is compared case-insensitively against ``bios_df['name']`` and
    the first matching row supplies the athlete id.

    Returns:
        ``{"name": athlete_name, "medals": DataFrame}`` with the year,
        discipline, event and medal columns, or ``False`` (after printing a
        message) when the athlete is unknown, has no medals, or any error
        occurs.
    """
    try:
        name_matches = bios_df['name'].str.lower() == athlete_name.lower()
        matched_rows = bios_df[name_matches]
        if matched_rows.empty:
            print(f"Error: Athlete '{athlete_name}' not found in database")
            return False

        first_id = matched_rows['athlete_id'].iloc[0]
        is_this_athlete = results_df['athlete_id'] == first_id
        has_medal = results_df['medal'].notna()
        won = results_df[is_this_athlete & has_medal]

        if won.empty:
            print(f"No medals found for athlete '{athlete_name}'")
            return False

        medal_columns = ['year', 'discipline', 'event', 'medal']
        return {"name": athlete_name, "medals": won[medal_columns]}

    except Exception as err:
        print(f"Error processing data for athlete '{athlete_name}': {str(err)}")
        return False

def get_country_performance(noc, year=None):
    """Count a country's medals by type, optionally for a single year.

    ``noc`` is looked up as a key in ``noc_mappings.json`` to obtain the code
    stored in ``results_df['noc']`` (presumably a country name mapped to its
    NOC code; verify against the mapping file).

    Returns:
        The ``value_counts()`` Series of the ``medal`` column for the matching
        rows, or ``False`` (after printing a message) when the country or year
        is unknown, no medals are found, or any error occurs.
    """
    try:
        with open('noc_mappings.json', 'r') as mapping_file:
            name_to_code = json.load(mapping_file)
        code = name_to_code.get(noc, "404")
        print(f"NOC CODE FOUND is {code} for {noc}")

        # "404" marks a name missing from the mapping; a mapped code may
        # still be absent from the results data.
        if code == "404" or code not in results_df['noc'].unique():
            print(f"Error: Country '{noc}' not found in database")
            return False

        selection = results_df['noc'] == code
        if year:
            if year not in results_df['year'].unique():
                print(f"Error: No data available for year {year}")
                return False
            selection = selection & (results_df['year'] == year)

        medal_rows = results_df[selection & results_df['medal'].notna()]
        if medal_rows.empty:
            scope = ' in ' + str(year) if year else ''
            print(f"No medals found for country '{noc}'{scope}")
            return False

        return medal_rows['medal'].value_counts()

    except Exception as err:
        print(f"Error processing data for country '{noc}': {str(err)}")
        return False

def get_athlete_participations(athlete_name):
    """Look up every result row recorded for one athlete.

    An exact name match is preferred; if there is none, the name is matched
    case-insensitively, the same way ``get_athlete_medals`` matches names.

    Returns:
        ``{"name": athlete_name, "part_of": DataFrame}`` with the year,
        discipline, event, place and medal columns, or ``False`` (after
        printing a message) when the athlete is unknown, has no results, or
        any error occurs.
    """
    try:
        athlete_data = bios_df[bios_df['name'] == athlete_name]
        if athlete_data.empty:
            # Fall back to a case-insensitive match so "usain bolt" is found
            # here just as it is by get_athlete_medals.
            athlete_data = bios_df[bios_df['name'].str.lower() == athlete_name.lower()]
        if athlete_data.empty:
            print(f"Error: Athlete '{athlete_name}' not found in database")
            return False

        athlete_id = athlete_data['athlete_id'].iloc[0]

        participations = results_df[results_df['athlete_id'] == athlete_id]
        if participations.empty:
            print(f"No participation records found for athlete '{athlete_name}'")
            return False

        return {
            "name": athlete_name,
            "part_of": participations[['year', 'discipline', 'event', 'place', 'medal']],
        }

    except Exception as e:
        print(f"Error processing participations for athlete '{athlete_name}': {str(e)}")
        return False

def find_athletes_by_sport(sport, year=None):
    """Find the athletes with results in a sport, optionally for one year.

    Returns:
        The ``name`` and ``NOC`` columns of the ``bios_df`` rows whose
        athlete_id appears in the matching results, or ``False`` (after
        printing a message) when the sport or year is unknown, nobody
        matches, or any error occurs.
    """
    try:
        known_sports = results_df['discipline'].unique()
        if sport not in known_sports:
            print(f"Error: Sport '{sport}' not found in database")
            return False

        selection = results_df['discipline'] == sport
        if year:
            if year not in results_df['year'].unique():
                print(f"Error: No data available for year {year}")
                return False
            selection = selection & (results_df['year'] == year)

        ids = results_df.loc[selection, 'athlete_id'].unique()
        if len(ids) < 1:
            scope = ' in ' + str(year) if year else ''
            print(f"No athletes found for sport '{sport}'{scope}")
            return False

        competitors = bios_df['athlete_id'].isin(ids)
        return bios_df.loc[competitors, ['name', 'NOC']]

    except Exception as err:
        print(f"Error processing athletes for sport '{sport}': {str(err)}")
        return False

def get_medalists_by_year(year, discipline=None):
    """Find the medal-winning results of a year, optionally for one sport.

    Returns:
        A DataFrame joining the discipline, event, medal and athlete_id
        columns of the matching results with the athlete_id, name and NOC
        columns of ``bios_df`` on ``athlete_id``, or ``False`` (after printing
        a message) when the year or sport is unknown, nothing matches, or any
        error occurs.
    """
    try:
        if year not in results_df['year'].unique():
            print(f"Error: No data available for year {year}")
            return False

        won_that_year = (results_df['year'] == year) & results_df['medal'].notna()

        if discipline:
            if discipline not in results_df['discipline'].unique():
                print(f"Error: Sport '{discipline}' not found in database")
                return False
            won_that_year = won_that_year & (results_df['discipline'] == discipline)

        winners = results_df[won_that_year]
        if winners.empty:
            scope = ' in ' + discipline if discipline else ''
            print(f"No medalists found for year {year}{scope}")
            return False

        result_columns = winners[['discipline', 'event', 'medal', 'athlete_id']]
        bio_columns = bios_df[['athlete_id', 'name', 'NOC']]
        return result_columns.merge(bio_columns, on='athlete_id')

    except Exception as err:
        print(f"Error processing medalists for year {year}: {str(err)}")
        return False

def get_athlete_bio(athlete_name):
    """Return the biography row of one athlete.

    An exact name match is preferred; if there is none, the name is matched
    case-insensitively, the same way ``get_athlete_medals`` matches names.

    Returns:
        The first matching ``bios_df`` row as a Series, or ``False`` (after
        printing a message) when no athlete matches or any error occurs.
    """
    try:
        athlete_data = bios_df[bios_df['name'] == athlete_name]
        if athlete_data.empty:
            # Fall back to a case-insensitive match so differently-cased
            # names are found here just as they are by get_athlete_medals.
            athlete_data = bios_df[bios_df['name'].str.lower() == athlete_name.lower()]

        if athlete_data.empty:
            print(f"Error: Athlete '{athlete_name}' not found in database")
            return False

        return athlete_data.iloc[0]

    except Exception as e:
        print(f"Error retrieving bio for athlete '{athlete_name}': {str(e)}")
        return False

def get_athlete_stats(discipline):
    """Compute average height and weight of the athletes in one sport.

    Returns:
        A dict with ``avg_height`` and ``avg_weight`` (each rounded to two
        decimals, or ``None`` when that average is NaN) and ``count`` (number
        of matching ``bios_df`` rows), or ``False`` (after printing a message)
        when the sport is unknown, has no athletes, has neither average, or
        any error occurs.
    """
    try:
        if discipline not in results_df['discipline'].unique():
            print(f"Error: Sport '{discipline}' not found in database")
            return False

        in_discipline = results_df['discipline'] == discipline
        ids = results_df.loc[in_discipline, 'athlete_id'].unique()
        roster = bios_df[bios_df['athlete_id'].isin(ids)]

        if roster.empty:
            print(f"No athletes found for sport '{discipline}'")
            return False

        # mean() yields NaN when a column holds no usable values, so each
        # average is checked before it is reported.
        mean_height = round(roster['height_cm'].mean(), 2)
        mean_weight = round(roster['weight_kg'].mean(), 2)
        height_missing = pd.isna(mean_height)
        weight_missing = pd.isna(mean_weight)

        if height_missing and weight_missing:
            print(f"No height/weight data available for athletes in '{discipline}'")
            return False

        return {
            'avg_height': None if height_missing else mean_height,
            'avg_weight': None if weight_missing else mean_weight,
            'count': len(roster),
        }

    except Exception as err:
        print(f"Error processing stats for sport '{discipline}': {str(err)}")
        return False

### OTHER REQUIRED FILES

- `noc_mappings.json` 
- `learning_data.json`
- `chatbotConstants.py`

> These can be found on Github at [Olympics-GPT --- Github-repo](https://github.com/awais-124/Olympics-GPT.git)

### MAIN CHATBOT FILE

In [6]:
from actuators import interpret_question
from chatbotConstants import *
from fileHandling import *

def remove_punctuation(user_string):
    """Return ``user_string`` without any character from ``string.punctuation``."""
    unwanted = set(string.punctuation)
    return "".join(char for char in user_string if char not in unwanted)

def show_sample_questions():
    """Build a numbered list of up to five distinct sample questions.

    Prints a 50-dash rule as a side effect, then returns the list text
    followed by a blank line and another 50-dash rule.
    """
    print("-" * 50)
    pool = list(sample_questions)
    # One sample of distinct items. The old per-line random.sample(..., 1)
    # returned a one-element list, so each line printed as "['question']",
    # and separate draws could repeat the same question.
    picks = random.sample(pool, min(5, len(pool)))
    output = "You can ask similar questions like these: \n"
    output += "".join(
        f"{number}. {sample}\n" for number, sample in enumerate(picks, start=1)
    )
    return output + '\n' + ("-" * 50)

def end_conversation():
    """Print a randomly chosen farewell message framed by 50-dash rules."""
    rule = "-" * 50
    print(rule)
    farewell = random.choice(farewell_messages)
    print("Olympics-GPT: ", farewell)
    print(rule)
    
def start_conversation():
    """Print the opening banner: a random greeting plus usage instructions."""
    rule = "-" * 50
    print(rule)
    print("OlympicGPT Here: " + random.choice(greetings))
    closing_lines = (
        "You can ask questions about Olympic history, athletes, \nand medals, countries participated, average stats etc.",
        "Type 'quit', 'exit', or 'goodbye' to end the conversation.",
        rule,
    )
    for line in closing_lines:
        print(line)

def main():
    """Run the interactive chat loop until the user asks to leave.

    For each query (punctuation stripped) the answer is taken from, in order:
    the learned answers in ``./learning_data.json``, a canned greeting, a fun
    fact, the chatbot introduction, the sample-question list, and finally
    ``interpret_question``. Only answers produced by ``interpret_question``
    that differ from ``default_response`` are saved back to the learning file.

    The loop ends on an exit word, on end-of-input / Ctrl-C, or when the
    learning file cannot be read as a JSON object (``get_file_data`` has
    already printed the reason in that case).
    """
    def mentions(keywords, text_lower):
        return any(word.lower() in text_lower for word in keywords)

    start_conversation()

    while True:
        print('-' * 60)
        try:
            user_query = input("\nYou: ")
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C: say goodbye instead of a traceback.
            print()
            end_conversation()
            return
        user_query = remove_punctuation(user_query)

        # Also accept exit words typed with different case or stray spaces.
        if user_query in exit_words or user_query.strip().lower() in exit_words:
            end_conversation()
            return

        # Read the learning file once and validate it BEFORE the membership
        # test: get_file_data returns -1 on failure and "x in -1" raises
        # TypeError (the old -1 check looked at the wrong variable).
        learned = get_file_data('./learning_data.json')
        if not isinstance(learned, dict):
            return

        query_lower = user_query.lower()
        should_save = False

        if user_query in learned:
            chatbot_response = learned[user_query]
        elif mentions(greeting_keywords, query_lower):
            chatbot_response = random.choice(greeting_responses)
        elif mentions(fun_fact_keywords, query_lower):
            # Not saved, like the other canned replies: caching one random
            # fact would make the same query return that fact forever.
            chatbot_response = random.choice(fun_facts)
        elif mentions(self_keywords, query_lower):
            chatbot_response = chatbot_intro
        elif mentions(sample_keywords, query_lower):
            chatbot_response = show_sample_questions()
        else:
            chatbot_response = interpret_question(user_query)
            should_save = True

        print(f"Olympics-GPT: {chatbot_response}")

        if should_save and chatbot_response != default_response:
            add_key_value_to_file('./learning_data.json', user_query, chatbot_response)
            

In [None]:
# Start the interactive chat only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()

#### THE END