In [9]:
import pandas as pd
import numpy as np
import seaborn as sns
import plotly.express as px
import random
import time
import random

import numpy as np
import cv2

In [None]:
# Task:
# Visualize the supermarket simulation (Markov chain) that was set up before:

# read in the tiles from a PNG file and draw the supermarket into a window
# run the supermarket simulation and transform its output into a graphic

In [2]:
# entry distribution: probabilities of where customers enter the store
prob_first = pd.read_csv('./data/prob_first.csv')
# Markov transition probabilities that have been calculated before,
# indexed by the 'before' column
prob = pd.read_csv('./data/prob.csv').set_index('before')

In [14]:
# Display the transition table: each row (index 'before') is the section a
# customer is currently in, each column a possible next section; a row's
# values are used as the weights for drawing the next section.
prob

Unnamed: 0_level_0,checkout,dairy,drinks,fruit,spices
before,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
checkout,0.0,0.0,0.0,0.0,0.0
dairy,0.392389,0.0,0.222318,0.189852,0.195442
drinks,0.538956,0.027256,0.0,0.217794,0.215994
fruit,0.500784,0.236966,0.136417,0.0,0.125833
spices,0.251672,0.323616,0.2728,0.151912,0.0


In [4]:
class Customer():
    """A single shopper whose path through the store is drawn from a Markov chain.

    Relies on two module-level DataFrames:
    `prob_first` (columns 'location' and 'prob') supplies the distribution of
    the entry section, and `prob` (indexed by the current section, one column
    per possible next section) supplies the transition weights.

    Attributes
    ----------
    section : list of str
        One-element list holding the current section (the shape returned by
        ``random.choices``); read it as ``section[0]``.
    sections : list of str
        Every section visited so far, in order.
    old_section : str or None
        Section occupied before the most recent move; None until the first
        call to ``compute``.
    in_shop : bool
        False once the customer has moved to 'checkout'.
    customer_no : int
        Identifier passed in by the caller.
    """

    def __init__(self, customer_no):
        """Place a new customer in a randomly drawn entry section."""
        self.section = random.choices(population=prob_first['location'], weights=prob_first['prob'])
        # Keep the history in its own list: binding it to the very same list
        # object as `section` would let a later in-place change of one
        # silently alter the other.
        self.sections = list(self.section)
        # Defined up front so the "has left" message can never hit an
        # AttributeError when no move has happened yet.
        self.old_section = None
        self.in_shop = True
        self.customer_no = customer_no

    def __repr__(self):
        if self.in_shop:
            return f"Customer {self.customer_no} is in section '{self.section[0]}'."
        else:
            return f"Customer {self.customer_no} has left the shop"

    def compute(self):
        """Advance the customer by one time step and return a status message.

        While in the shop, the next section is drawn using the row of `prob`
        for the current section as weights over `prob.columns`. Reaching
        'checkout' marks the customer as having left. Once the customer has
        left, further calls change nothing and only report the section the
        customer left from.
        """
        if self.in_shop:
            self.old_section = self.section[0]
            # `.loc` with a one-element list yields a one-row frame; row 0
            # holds the transition weights for the current section.
            weights = prob.loc[self.section].values[0]
            self.section = random.choices(population=prob.columns, weights=weights)
            self.sections.append(self.section[0])
            if self.section[0] == 'checkout':
                self.in_shop = False
            return f"Customer {self.customer_no} moved from '{self.old_section}' to '{self.section[0]}'."
        else:
            return f"Customer {self.customer_no} has left the shop from '{self.old_section}'."

In [5]:
class Supermarket():
    """Holds the customers currently in the store and advances them in lock-step.

    Attributes
    ----------
    customers : list
        Customer objects that are still in the shop.
    n_customers : int
        Length of `customers`, refreshed after every add/compute call.
    running_id : int
        Next customer number to hand out; only ever grows, so numbers are
        never reused.
    """

    def __init__(self):
        self.customers = []
        self.n_customers = len(self.customers)
        self.running_id = 0

    def __repr__(self):
        return f"Supermarket with {self.n_customers} customers."

    def add_customers(self, n=1):
        """Create `n` new customers with consecutive customer numbers.

        Raises
        ------
        ValueError
            If `n` is negative. Without this check a negative `n` would add
            nobody but still rewind `running_id`, so later customers would
            receive numbers that were already handed out.
        """
        if n < 0:
            raise ValueError(f"n must be non-negative, got {n}")
        for i in range(self.running_id, self.running_id + n):
            self.customers.append(Customer(i))
        self.n_customers = len(self.customers)
        self.running_id += n

    def compute_market(self):
        """Move every customer one step and drop those who have left.

        Returns
        -------
        list of [customer_no, section]
            One entry per customer that was in the shop at the start of the
            step, including customers who moved to 'checkout' during it.
        """
        state = []
        for customer in self.customers:
            customer.compute()
            state.append([customer.customer_no, customer.section[0]])
        # `in_shop` already says who left; filtering on it is a single pass
        # instead of a list-membership test per customer.
        self.customers = [customer for customer in self.customers if customer.in_shop]
        self.n_customers = len(self.customers)
        return state

In [6]:
class SupermarketMap:
    """Visualizes the supermarket background"""

    # layout character -> (row, col) of its tile inside the tile sheet
    TILE_POSITIONS = {
        "#": (0, 0),    # presumably a wall tile
        "F": (0, 4),    # fruits
        "D": (6, 12),   # dairy
        "L": (6, 13),   # drinks
        "S": (1, 3),    # spices
        "c": (7, 0),    # customer
        "G": (7, 3),    # presumably the gate/entrance -- TODO confirm
        "C": (2, 8),    # presumably the checkout counter -- TODO confirm
    }
    # tile used for every character not listed above (e.g. '.')
    DEFAULT_TILE_POSITION = (1, 2)

    def __init__(self, layout, tiles, tile_size=None):
        """
        layout    : a string with each character representing a tile,
                    rows separated by newlines
        tiles     : a numpy array containing all the tile images
        tile_size : edge length of one square tile in pixels; when omitted,
                    the module-level TILE_SIZE is used (it must then be
                    defined before the map is created)
        """
        self.tiles = tiles
        self.tile_size = TILE_SIZE if tile_size is None else tile_size
        # split the layout string into a two dimensional matrix; leading and
        # trailing newlines are dropped so they do not become empty tile rows
        self.contents = [list(row) for row in layout.strip("\n").split("\n")]
        # size the canvas for the widest row so that ragged layouts fit;
        # cells missing from shorter rows stay black
        self.ncols = max(len(row) for row in self.contents)
        self.nrows = len(self.contents)
        self.image = np.zeros(
            (self.nrows*self.tile_size, self.ncols*self.tile_size, 3), dtype=np.uint8
        )
        self.prepare_map()

    def extract_tile(self, row, col):
        """extract a tile array from the tiles image"""
        y = row*self.tile_size
        x = col*self.tile_size
        return self.tiles[y:y+self.tile_size, x:x+self.tile_size]

    def get_tile(self, char):
        """returns the array for a given tile character"""
        row, col = self.TILE_POSITIONS.get(char, self.DEFAULT_TILE_POSITION)
        return self.extract_tile(row, col)

    def prepare_map(self):
        """prepares the entire image as a big numpy array"""
        size = self.tile_size
        for row, line in enumerate(self.contents):
            for col, char in enumerate(line):
                y = row*size
                x = col*size
                self.image[y:y+size, x:x+size] = self.get_tile(char)

    def draw(self, frame):
        """
        draws the image into the top-left corner of a frame;
        the frame must be at least as large as the image
        """
        frame[0:self.image.shape[0], 0:self.image.shape[1]] = self.image

    def write_image(self, filename):
        """writes the image into a file"""
        cv2.imwrite(filename, self.image)


In [12]:
### run the simulation and visualization

# define time range of simulation (one step per minute);
# 'min' is used because the 'T' alias is deprecated in recent pandas
timestamps = pd.date_range(start='2022-12-22 07:00:00', end='2022-12-22 09:00:00', freq='1min')

# instantiate supermarket
s = Supermarket()

# window graphic array
background = np.zeros((550, 710, 3), np.uint8)

# read in tiles; cv2.imread reports failure by returning None, not by raising
tiles = cv2.imread("./data/tiles.png")
if tiles is None:
    raise FileNotFoundError("could not read tile sheet './data/tiles.png'")
TILE_SIZE = 32

# define market layout
MARKET = """
######################
##..................##
##..................##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
#S...#L...#D...#F...##
##..................##
##...C#...C#...C#...##
##...##...##...##...##
##..................##
###############GG#####
""".strip()

# where customers are drawn, as row/column indices into MARKET
AISLE_ROWS = range(3, 11)  # the rows that contain shelves
AISLE_COLUMNS = {'fruit': 18, 'dairy': 13, 'drinks': 8, 'spices': 3}
CHECKOUT_ROW = 12
CHECKOUT_COLUMNS = (3, 8, 13)

# how long each simulated minute stays on screen
FRAME_DELAY_MS = 1000

output = []


def _place_customers(layout, counts):
    """Return `layout` with a 'c' tile for each customer that can be shown.

    layout : the ASCII market, rows separated by newlines
    counts : dict mapping a location name to the number of customers there

    Customers in an aisle are stacked from the top shelf row downwards. At
    most one customer per shelf row is drawn, so a crowded aisle no longer
    spills over the floor, the checkout row or the bottom wall. Customers at
    the checkout fill the checkout positions from left to right.
    """
    rows = [list(line) for line in layout.split('\n')]
    for location, col in AISLE_COLUMNS.items():
        for row in AISLE_ROWS[:counts.get(location, 0)]:
            rows[row][col] = 'c'
    for col in CHECKOUT_COLUMNS[:counts.get('checkout', 0)]:
        rows[CHECKOUT_ROW][col] = 'c'
    # joined without a trailing newline so no empty row is appended
    return '\n'.join(''.join(row) for row in rows)


for timestamp in timestamps:
    # add a random no. of customers each minute
    s.add_customers(random.randint(0,3))
    # run simulation for one time step
    result = s.compute_market()
    # extract necessary data from results
    minute = pd.DataFrame([[timestamp,row[0],row[1]] for row in result], columns= ['timestamp','customer_id','location'])

    # how many customers are in each location?
    counts = minute['location'].value_counts().to_dict()

    # place the customers on the market tiles
    string = _place_customers(MARKET, counts)

    # draw market from ascii into graphic
    market = SupermarketMap(string, tiles)
    frame = background.copy()
    # draw graphic into window
    market.draw(frame)
    cv2.imshow("frame", frame)

    # waitKey is what lets the window process events and render the frame,
    # so it has to follow imshow; it also paces the animation, which keeps
    # the window responsive while waiting.
    # https://www.ascii-code.com/
    key = cv2.waitKey(FRAME_DELAY_MS) & 0xFF
    if key == ord('q'):
        break

# close the window both when the user quits and when the simulation ends
cv2.destroyAllWindows()
# NOTE(review): on some GUI backends the window only disappears after one
# more event-loop tick -- confirm whether this is needed on the target setup
cv2.waitKey(1)