In [1]:
import random
import time
import numpy as np
import mariadb #connector for mariadb database
import sys
import networkx as nx
from getpass import getpass

In [2]:
random.seed(42) #for reproducible results
#Consecutive participations rules
MAX_CONSECUTIVE_EPISODES = 3
N_CRITICS_PER_EPISODE = 3
N_COOKS_PER_EPISODE = 10
N_CUISINES_PER_EPISODE = 10

N_EPISODES = 50
N_EPISODES_PER_SEASON = 10
N_SEASONS = N_EPISODES // N_EPISODES_PER_SEASON
VALID_SCORES = range(1, 6)

In [3]:
season_dates = ["2018/1/1", "2019/1/1", "2020/1/1", "2021/1/1", "2022/1/1", "2023/1/1"]

In [4]:
def str_time_prop(start, end, time_format, prop):
    """Get a time at a proportion of a range of two formatted times.

    start and end should be strings specifying times formatted in the
    given format (strftime-style), giving an interval [start, end].
    prop specifies how a proportion of the interval to be taken after
    start.  The returned time will be in the specified format.
    """

    stime = time.mktime(time.strptime(start, time_format))
    etime = time.mktime(time.strptime(end, time_format))

    ptime = stime + prop * (etime - stime)

    return time.strftime(time_format, time.localtime(ptime))

#USAGE: random_date("2018/1/1", "2023/1/1", random.random())
def random_date(start, end, prop):
    return str_time_prop(start, end, '%Y/%m/%d', prop)


In [5]:
# Function to generate random lorem ipsum string
def generate_lorem_ipsum():
    lorem_ipsum = ''.join(random.choices(string.ascii_letters + string.digits + ' ', k=50))
    return lorem_ipsum

# Function to generate image URL for ingredient
def generate_image_url(ingredient_name):
    return f'http://example.com/images/{ingredient_name.lower().replace(" ", "_")}.jpg'


In [6]:
#Find data that violate the rule: max 3 consecutive participations
def check_history(cur, table, column, current_episode):
    #Hold record for last 3 episodes
    check_recipes = []
    check_cuisines = []
    check_cooks = []
    check_critics = []

    banned_recipes = []
    banned_cuisines = []
    banned_cooks = []
    banned_critics = []
    
    #Get 3 last episodes
    cur.execute(f"SELECT Recipe_ID, Cook_ID, Critic_ID FROM {table} WHERE {column}='{current_episode-1}' OR {column}='{current_episode-2}' OR {column}='{current_episode-3}'")
    last3_ratings = cur.fetchall()

    #If first episode nothing is banned
    if not last3_ratings:
        return banned_recipes, banned_cuisines, banned_cooks, banned_critics
    
    for rating in last3_ratings:
        check_recipes.append(rating[0])
        check_cooks.append(rating[1])
        check_critics.append(rating[2])

    #Find corresponding cuisines for recipes
    for recipe in check_recipes:
        cur.execute(f"SELECT Cuisine_ID FROM Recipe WHERE Recipe_ID='{recipe}'")
        temp = cur.fetchall()
        check_cuisines.append(temp[0][0]) #query only returns one result, unique recipes.

    #All max values are determined on the assumption that a cuisine can only have one recipe per episode
    #and no two cooks prepare the same recipe.
    #If this isn't the case then we must shift the checks to each episode and aggregate instead of pooling them all together like above
    
    #Find recipes that break the rule: max 3 consecutive participations
    values, counts = np.unique(check_recipes, return_counts=True)
    for x in zip(values, counts):
        if x[1] >= N_CRITICS_PER_EPISODE * MAX_CONSECUTIVE_EPISODES:
            banned_recipes.append(x[0])

    #Find cuisines that break the rule: max 3 consecutive participations
    values, counts = np.unique(check_cuisines, return_counts=True)
    for x in zip(values, counts):
        if x[1] >= N_CRITICS_PER_EPISODE * MAX_CONSECUTIVE_EPISODES:
            banned_cuisines.append(x[0])

    #Find cooks that break the rule: max 3 consecutive participations
    values, counts = np.unique(check_cooks, return_counts=True)
    for x in zip(values, counts):
        if x[1] >= N_CRITICS_PER_EPISODE * MAX_CONSECUTIVE_EPISODES:
            banned_cooks.append(x[0])

    #Find critics that break the rule: max 3 consecutive participations
    values, counts = np.unique(check_critics, return_counts=True)
    for x in zip(values, counts):
        if x[1] >= N_COOKS_PER_EPISODE * MAX_CONSECUTIVE_EPISODES:
            cur.execute(f"SELECT Cook_ID FROM Critic WHERE Critic_ID='{x[0]}'")
            temp = cur.fetchall()
            banned_critics.append(temp[0][0]) #Remove Cook_ID from critic that violates rule

    return banned_recipes, banned_cuisines, banned_cooks, banned_critics

In [7]:
def maximum_bipartite_matching(left_set, right_set, edges):
    B = nx.Graph()
    B.add_nodes_from(list(left_set.values()), bipartite=0)
    B.add_nodes_from(list(right_set.values()), bipartite=1)
    B.add_edges_from(edges)

    matching = nx.bipartite.maximum_matching(B, top_nodes=list(left_set.values()))

    # Extract only the part of the matching that goes from left to right
    left_to_right_matching = [tuple((k,v)) for k, v in matching.items() if k in list(left_set.values())]

    return left_to_right_matching

In [8]:
#Checks last 3 episode for cuisines, recipes, cooks and critics that violate
#the rule: max 3 consecutive participations and removes them from the available choices.
#Then it randomly chooses some cuisines for the episode and using Hopcroft-Karp algorithm
#finds the edges so that each cuisine is assigned to exactly one cook that specializes in it.
#If there is no such solution the process restarts with different randomly chosen cuisines.

#Function doesn't check if there are enough data in database for lottery to succeed!!
#We assume the required tables have been populated
def episode_lottery(cur):
    #Episode lottery is always done prior to insertion in Episode table
    #Therefore take max value of Episode_Order and add 1
    cur.execute(f"SELECT MAX(Episode_ID) FROM Episode")
    current_episode = cur.fetchall()
    if current_episode[0][0] == None: #If this is the first episode
        current_episode = 1
    else:
        current_episode = current_episode[0][0] + 1

    banned_recipes, banned_cuisines, banned_cooks, banned_critics = check_history(cur, "Rating", "Episode_ID", current_episode)

    #Find all available recipes in database
    cur.execute(f"SELECT Recipe_ID, Cuisine_ID FROM Recipe")
    choose_recipes = cur.fetchall()
    choose_recipes = [recipe for recipe in choose_recipes if recipe[0] not in banned_recipes]
    
    #Find all available cuisines in database
    cur.execute(f"SELECT Cuisine_ID FROM Cuisine")
    choose_cuisines = cur.fetchall()
    choose_cuisines = [cuisine[0] for cuisine in choose_cuisines] #Convert to list of elements instead of list of tuples
    choose_cuisines = [cuisine for cuisine in choose_cuisines if cuisine not in banned_cuisines]
    
    #Find all available cooks in database
    cur.execute(f"SELECT Cook_ID FROM Cook")
    choose_cooks = cur.fetchall()
    choose_cooks = [cook[0] for cook in choose_cooks] #Convert to list of elements instead of list of tuples
    choose_cooks = [cook for cook in choose_cooks if cook not in banned_cooks]
    
    #Find all available critics in database based on Cook_ID
    #All cooks are possible critics
    cur.execute(f"SELECT Cook_ID FROM Cook")
    choose_critics = cur.fetchall()
    choose_critics = [critic[0] for critic in choose_critics] #Convert to list of elements instead of list of tuples
    choose_critics = [critic for critic in choose_critics if critic not in banned_critics]

    while True:
        #print("Loop!")
        episode_cuisines = random.sample(choose_cuisines, N_CUISINES_PER_EPISODE)
        episode_recipes = []
        episode_cooks = []
        left_set = {} #cuisines
        right_set = {} #cooks
        edges = []

        cook_counter = len(episode_cuisines)
        for cuisine_counter, cuisine in enumerate(episode_cuisines, 0):
            #Don't have to check if there is at least one available recipe because the cuisine rule would be violated
            #first, since each recipe belongs to exactly one cuisine
            #While loop should restart if the above assumption changes since there is a posibility that
            #nothing is returned by random choice if one cuisine only has one recipe
            episode_recipes.append(random.choice([recipe[0] for recipe in choose_recipes if recipe[1] == cuisine]))
            
            cur.execute(f"SELECT Cook_ID FROM Specializes WHERE Cuisine_ID='{cuisine}'")
            temp = cur.fetchall()
            temp = [x[0] for x in temp]
            temp = [x for x in temp if x not in banned_cooks]
            left_set[cuisine] = cuisine_counter
            for cook in temp:
                if cook not in right_set:
                    right_set[cook] = cook_counter
                    cook_counter = cook_counter + 1
                edges.append(tuple((left_set[cuisine], right_set[cook])))

        
        temp = maximum_bipartite_matching(left_set, right_set, edges)
        
        #Unmask cuisine-cook
        place1 = 0
        place2 = 0
        for x in temp:
            for cuisine, cuisine_mask in left_set.items():
                if x[0] == cuisine_mask:
                    place1 = cuisine
                    break;

            for cook, cook_mask in right_set.items():
                if x[1] == cook_mask:
                    place2 = cook
                    break;

            episode_cooks.append(tuple((place1, place2)))
            
        #If there aren't enough expected matchings, then there is no solution for the chosen cuisines
        #Restart loop and random pick again
        if len(episode_cooks) != N_CUISINES_PER_EPISODE:
            continue;
        else:
            break;

    #Data at same index location for cuisines, recipes, cooks are tied together
    episode_cooks = [x[1] for x in episode_cooks]

    #Remove chosen cooks from available critics
    choose_critics = [critic for critic in choose_critics if critic not in episode_cooks]
    episode_critics = random.sample(choose_critics, N_CRITICS_PER_EPISODE) #Cook_ID of chosen critics not Critic_ID!!!

    return episode_cuisines, episode_recipes, episode_cooks, episode_critics, current_episode

In [9]:
def insert_ratings(cur, ratings):
    cur.executemany("INSERT INTO Rating (Score, Recipe_ID, Cook_ID, Critic_ID, Episode_ID) VALUES (?, ?, ?, ?, ?)", ratings)

In [10]:
def find_winner(cur, episode):
    cur.execute(
        f"""
            WITH CookAverageScores AS (
            	SELECT Rating.Cook_ID, Cook_Rank_Comparison.Rank_Number, AVG(Score) AS Average_Score
            	FROM Rating JOIN Cook ON Cook.Cook_ID=Rating.Cook_ID JOIN Cook_Rank_Comparison ON Cook.Cook_Rank = Cook_Rank_Comparison.Cook_Rank 
            	WHERE Episode_ID={episode}
            	GROUP BY Rating.Cook_ID
            ),
            MaxAverage AS (
            	SELECT MAX(Average_Score) AS MaxAverageScore
            	FROM CookAverageScores
            )
            SELECT First_Name, Last_Name
            FROM CookAverageScores
            JOIN MaxAverage ON MaxAverage.MaxAverageScore = CookAverageScores.Average_Score
            JOIN Cook ON CookAverageScores.Cook_ID = Cook.Cook_ID
            JOIN Person ON Cook.Person_ID = Person.Person_ID
            ORDER BY CookAverageScores.Rank_Number ASC, RAND() LIMIT 1;
        """
    )
    winner = cur.fetchall()
    winner = winner[0][0] + " " + winner[0][1]
    return winner

In [11]:
def fill_episode(cur, dummy_episode_release_date=None, dummy_flag=False):
    cuisines, recipes, cooks, critics, episode = episode_lottery(cur)

    if not dummy_flag:
        print("Ask application for episode release date")
    else:
        episode_release_date = dummy_episode_release_date

    #First insert episode without winner to satisfy referencial integrity
    cur.execute("INSERT INTO Episode (Episode_ID, Episode_Release_Date, Winner, Image, Image_Desc) VALUES (?, ?, ?, ?, ?)", (episode, episode_release_date, "NULL", generate_image_url("episode" + str(episode)), generate_lorem_ipsum()))
    
    cur = conn.cursor() #Refresh cursor
    involved = [tuple((episode, cuisine)) for cuisine in cuisines] #insert into database after episode
    cur.executemany("INSERT INTO Involved (Episode_ID, Cuisine_ID) VALUES (?, ?)", involved)

    assign = list(zip(recipes, cooks))
    #Each time the INSERT faces no error the value of Critic_ID increases because it is AUTO_INCREMENT.
    cur.executemany("INSERT INTO Assign (Recipe_ID, Cook_ID) VALUES (?, ?) ON DUPLICATE KEY UPDATE Recipe_ID=Recipe_ID, Cook_ID=Cook_ID", assign)
    cur.executemany("INSERT INTO Critic (Cook_ID) VALUES (?) ON DUPLICATE KEY UPDATE Cook_ID=Cook_ID", [tuple((critic,)) for critic in critics])

    cur = conn.cursor() #Refresh cursor
    cur.execute("SELECT Critic_ID FROM Critic WHERE Cook_ID IN ({0})".format(", ".join("?" for _ in critics)), critics)
    critics = cur.fetchall()
    critics = [x[0] for x in critics]
    
    ratings = []
    for recipe, cook in zip(recipes, cooks):
        for critic in critics: #Critic_ID not Cook_ID!
            ratings.append([recipe, cook, critic, episode])

 
    #--TODO!!!! Ask somehow the user for the scores after providing them the ratings and complete them
    #at the application level
    if not dummy_flag:
        print("Do something")
    #Create dummy data
    else:
        ratings = [tuple([random.choice(VALID_SCORES)] + rating) for rating in ratings]

    insert_ratings(cur, ratings)
    
    cur = conn.cursor() #Refresh cursor
    winner = find_winner(cur, episode)

    #Now insert winner to complete episode record
    cur.execute(f"INSERT INTO Episode (Episode_ID, Episode_Release_Date, Winner) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE Winner=?", (episode, episode_release_date, winner, winner))

In [12]:
def create_admins(cur):
    cur.execute(
        f"""
            SELECT Person.Username, Person.Password
            FROM Admin
            JOIN Person ON Admin.Person_ID = Person.Person_ID
        """
    )
    admin_data = cur.fetchall()
    for admin in admin_data:
        full_username = f"'{admin[0]}'@'localhost'"
        password = f"'{admin[1]}'"
        query1 = f"CREATE OR REPLACE USER {full_username} IDENTIFIED BY {password};"
        query2 = f"GRANT Admin TO {full_username};"
        query3 = f"SET DEFAULT ROLE Admin FOR {full_username};"
        cur.execute(query1)
        cur.execute(query2)
        cur.execute(query3)

In [13]:
def create_cooks(cur):
    cur.execute(
        f"""
            SELECT Person.Username, Person.Password
            FROM Cook
            JOIN Person ON Cook.Person_ID = Person.Person_ID
        """
    )
    cook_data = cur.fetchall()
    for cook in cook_data:
        full_username = f"'{cook[0]}'@'localhost'"
        password = f"'{cook[1]}'"
        query1 = f"CREATE OR REPLACE USER {full_username} IDENTIFIED BY {password};"
        query2 = f"GRANT Cook TO {full_username};"
        query3 = f"SET DEFAULT ROLE Cook FOR {full_username};"
        cur.execute(query1)
        cur.execute(query2)
        cur.execute(query3)