In [378]:
!pip install simpy plotly



### Goals
- Randomize task durations
- Define the structure of user scenarios


In [379]:
import simpy
import datetime
import pandas as pd
import plotly.express as px
import logging
from enum import Enum

# new
import random
from itertools import repeat


In [380]:
class Metric(Enum):
    """Labels of the time-series metrics sampled during the simulation.

    Each member's ``value`` is the human-readable string stored in the
    ``Metric`` column of the collected DataFrames.
    """

    RW = "Requests Waiting"  # get requests queued on the shared resource
    BS = "Busy Slots"        # resource units currently taken
    AU = "Active Users"      # users created so far by the generator

In [381]:
log_filename = "logs-9.log"
mainLogger = logging.getLogger()  # root logger, shared by every cell below

# delay=True: the file is opened (and truncated, mode='w') on the first record,
# i.e. only after any previous handler on the same file has been closed below.
fhandler = logging.FileHandler(filename=log_filename, mode='w', delay=True)
# Re-running this cell would otherwise stack one more FileHandler per run and
# duplicate every log line: detach and close handlers already writing this file.
for old_handler in [h for h in mainLogger.handlers
                    if isinstance(h, logging.FileHandler)
                    and h.baseFilename == fhandler.baseFilename]:
    mainLogger.removeHandler(old_handler)
    old_handler.close()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fhandler.setFormatter(formatter)
mainLogger.addHandler(fhandler)
mainLogger.setLevel(logging.DEBUG)
mainLogger.debug("test")

In [382]:
class Scenario1:
    """Scenario "Sc1": one processing step followed by a verification step.

    ``tasks`` is consumed in order by ``User.run``; a task with a ``'Res'`` key
    acquires that many resource units and holds them for ``'Duration'`` ticks,
    a task without it simply lets ``'Duration'`` ticks pass.
    """

    def __init__(self):
        self.name = "Sc1"
        process_step = {'Name': "Process-1", 'Duration': 5, 'Res': 10}
        verify_step = {'Name': "Verify-1", 'Duration': 2}
        self.tasks = [process_step, verify_step]

class Scenario2:
    """Scenario "Sc2": two short process/verify rounds separated by a wait.

    Only the ``Process-2.x`` tasks carry ``'Res'`` and therefore acquire
    resource units; the other tasks only let time pass.
    """

    def __init__(self):
        self.name = "Sc2"
        self.tasks = [
            {'Name': "Process-2.1", 'Duration': 2, 'Res': 5},
            {'Name': "Verify-2.1", 'Duration': 1},
            {'Name': "Wait-2.1", 'Duration': 1},
            {'Name': "Process-2.2", 'Duration': 3, 'Res': 5},
            {'Name': "Verify-2.2", 'Duration': 1},
        ]
        
class Scenario3:
    """Scenario "Sc3": a heavy processing step (40 units for 8 ticks), then verification."""

    def __init__(self):
        self.name = "Sc3"
        self.tasks = [
            dict(Name="Process-1", Duration=8, Res=40),
            dict(Name="Verify-1", Duration=2),
        ]



In [383]:
class User:
    """A simulated user that replays its scenario's task list forever.

    Each full pass over ``scenario.tasks`` gets a new ``taskid``. Tasks that
    carry a ``'Res'`` key acquire that many units of ``world.res`` (through
    ``Job``) and are reported to ``world.user_monitor``. Other tasks are plain
    timeouts.
    """

    def __init__(self, id, scenario, world):
        self.id = id
        self.scenario = scenario
        self._world = world
        self.taskid = 0  # incremented once per pass over the scenario
        self.create()
        # Register run() as a SimPy process as soon as the user exists.
        self.action = self._world.env.process(self.run())

    def create(self):
        """Stamp the creation time, derive the display name and register with the monitor."""
        self.enteringAt = self._world.env.now
        self.name = "User-%03d" % self.id
        mainLogger.info(f"user created {self.name}")
        self._world.user_monitor.report_new_user(self)

    def run(self):
        """SimPy process body: execute the scenario's tasks in order, indefinitely."""
        env = self._world.env
        while True:
            self.taskid += 1
            for task in self.scenario.tasks:
                taskname = task['Name']
                task_duration = task['Duration']
                # Lazy logging args: safe for names containing '%', and free when DEBUG is off.
                mainLogger.debug("%s starts task %s at %d", self.name, taskname, env.now)

                if 'Res' in task:
                    self._world.user_monitor.report_start(
                        self.name, self.scenario, taskname, self.taskid)
                    # Wait for the sub-process (resource wait + processing) to finish.
                    yield env.process(self.process_task(task_duration, task['Res']))
                    self._world.user_monitor.report_stop(
                        self.name, self.scenario, taskname, self.taskid)
                else:
                    # Untracked task: just let simulated time pass.
                    yield env.timeout(task_duration)

                # Report the completion time, not the time the task started.
                mainLogger.debug("%s ends task %s at %d", self.name, taskname, env.now)

    def process_task(self, duration, amount):
        """Acquire ``amount`` resource units, hold them ``duration`` ticks, then release them."""
        with Job(self._world.res, amount) as req:
            yield req  # resumes once the resource could hand out `amount` units
            yield self._world.env.timeout(duration)
        mainLogger.debug("exiting process task at %d", self._world.env.now)


In [384]:
class Clock:
    """Maps simulation ticks to local wall-clock datetimes, one tick per minute.

    The origin (``base_epoch``, POSIX seconds) is the moment the clock is built.
    """

    def __init__(self):
        created_at = datetime.datetime.now()
        self.base_epoch = created_at.timestamp()
        mainLogger.info(f"Clock created - base {self.base_epoch}")

    def to_date(self, tick):
        """Return the local datetime that is ``tick`` minutes after ``base_epoch``."""
        seconds_per_tick = 60  # one simulated tick stands for one minute
        offset = tick * seconds_per_tick
        return datetime.datetime.fromtimestamp(self.base_epoch + offset)

In [385]:
class UsersMonitor:
    """Collects user registrations and start/stop events of resource-bound tasks.

    Events are stamped with the owning world's simulation time and clock.
    """

    # Columns shared by start and stop events; the join key used by collect().
    _KEYS = ['Username', 'Scenario', 'Task', 'TaskId']

    def __init__(self, world):
        self._world = world
        self.start_data = []  # one dict per report_start call
        self.stop_data = []   # one dict per report_stop call
        self.users = []       # every user ever registered

    def report_new_user(self, user):
        """Register a newly created user."""
        self.users.append(user)

    def _event(self, mark_col, date_col, username, scenario, taskname, taskid):
        """Build one event row stamped with the current simulation time."""
        mark = self._world.env.now
        return {
            mark_col: mark,
            # Use this monitor's own world, never a notebook-global `world`.
            date_col: self._world.clock.to_date(mark),
            'Username': username,
            'Scenario': scenario.name,
            'Task': taskname,
            'TaskId': taskid,
        }

    def report_start(self, username, scenario, taskname, taskid):
        """Record that ``username`` started ``taskname`` during scenario pass ``taskid``."""
        self.start_data.append(
            self._event('StartMark', 'Start', username, scenario, taskname, taskid))

    def report_stop(self, username, scenario, taskname, taskid):
        """Record that ``username`` finished ``taskname`` during scenario pass ``taskid``."""
        self.stop_data.append(
            self._event('FinishMark', 'Finish', username, scenario, taskname, taskid))

    def collect(self):
        """Join start and stop events into one row per task execution.

        Returns a DataFrame with columns ``StartMark``, ``Start``, the key
        columns, ``FinishMark``, ``Finish`` and ``Duration`` (ticks). Tasks
        still running when the simulation stopped get NaN finish values.
        Explicit columns keep the merge working when nothing was recorded yet.
        """
        df_start = pd.DataFrame(self.start_data, columns=['StartMark', 'Start'] + self._KEYS)
        df_stop = pd.DataFrame(self.stop_data, columns=['FinishMark', 'Finish'] + self._KEYS)
        df = pd.merge(df_start, df_stop, how='left', on=self._KEYS)
        df['Duration'] = df['FinishMark'] - df['StartMark']
        return df


In [386]:
class UsersGenerator:
    """SimPy process that creates one user per tick and samples the active-user count.

    Scenarios are assigned round-robin over a shuffled copy of
    ``world.scenarios``, so every entry of that list gets used. (Picking with
    ``random.choice`` could starve rare scenarios.)
    """

    def __init__(self, world, max_nb_users=10):
        self._world = world
        self._max_nb_users = max_nb_users
        mainLogger.info("creating user generator for %s users", self._max_nb_users)
        self.data = []          # one "Active Users" sample per tick
        self.active_users = []  # every user created so far (never removed)
        self.user_count = 0
        # Shuffle a private copy: the caller's scenario list must not be mutated.
        self._scenarios = list(self._world.scenarios)
        random.shuffle(self._scenarios)
        # Register run() as a SimPy process.
        self.action = world.env.process(self.run())

    def run(self):
        """Each tick: create one more user (until the cap is reached), then record a sample."""
        tick_duration = 1
        while True:
            if self.user_count < self._max_nb_users:
                self.create_user()
            self.report()
            yield self._world.env.timeout(tick_duration)

    def create_user(self):
        """Create the next user with the next scenario in round-robin order.

        Raises:
            ValueError: if the world has no scenarios to assign.
        """
        if not self._scenarios:
            raise ValueError("UsersGenerator needs at least one scenario")
        # user_count is still 0-based here; the first user is labelled -001.
        scenario = self._scenarios[self.user_count % len(self._scenarios)]
        self.user_count += 1
        user = User(self.user_count, scenario, self._world)
        self.active_users.append(user)
        mainLogger.debug("%d active users at %d", len(self.active_users), self._world.env.now)

    def report(self):
        """Append one "Active Users" row for the current tick."""
        mark = self._world.env.now
        self.data.append(
            dict(
                Mark=mark,
                Timestamp=self._world.clock.to_date(mark),
                Metric=Metric.AU.value,
                Value=len(self.active_users)
            )
        )

    def collect(self):
        """Return the recorded samples as a DataFrame."""
        return pd.DataFrame(self.data)


In [387]:
class Job:
    """Context manager that takes ``items`` units from a SimPy container and returns them.

    ``__enter__`` returns the container's get-event; the caller must ``yield``
    it to wait for the units. On exit the units are put back only if the get
    was actually triggered (the units were handed out). A get that is still
    pending, for example because the waiting process was interrupted, is
    cancelled instead. This way the container is never credited with units
    it did not give away.
    """

    def __init__(self, res, items=1):
        self.res = res
        self.items = items
        self._get = None  # get-event created by __enter__
        mainLogger.debug(f"creating job with amount {self.items}")

    def __enter__(self):
        mainLogger.debug("__enter__")
        self._get = self.res.get(self.items)
        return self._get

    def __exit__(self, exc_type, exc_val, exc_tb):
        mainLogger.debug("__exit__")
        # f-prefix was missing before, so the placeholders were logged literally.
        mainLogger.debug(f"exc_type {exc_type} exc_val {exc_val} exc_tb {exc_tb}")
        if self._get is not None:
            if self._get.triggered:
                self.res.put(self.items)
            else:
                self._get.cancel()
        return False  # never swallow exceptions raised inside the with-block


In [388]:
class SystemResource(simpy.resources.container.Container):
    """SimPy container modelling the shared compute capacity.

    ``level`` is the number of free units, so ``used()`` is the number of busy
    units. Before every get/put the container state is logged. The busy and
    waiting counts are appended to ``processing_data`` / ``waiting_data`` as
    ``(time, count)`` tuples.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        mainLogger.info(f"create resource with capacity {self.capacity}")
        self.processing_data = []  # (time, busy units) sampled before each get/put
        self.waiting_data = []     # (time, queued get requests) sampled before each get/put
        mainLogger.info(f"initial level {self.level}")

    def _record(self, action, amount):
        """Log the container state and snapshot busy/waiting counts before a get/put."""
        now = self._env.now
        # Lazy logging args: nothing is formatted when DEBUG is disabled.
        mainLogger.debug("received %s resource - amount %s at %d", action, amount, now)
        mainLogger.debug("level (available) %s at %d", self.level, now)
        mainLogger.debug("%s waiting at %d", len(self.get_queue), now)
        mainLogger.debug("%s processing at %d", self.used(), now)
        self.processing_data.append((now, self.used()))
        self.waiting_data.append((now, len(self.get_queue)))

    def get(self, amount, *args, **kwargs):
        """Request ``amount`` units; accepts ``amount`` positionally or as a keyword."""
        self._record("request", amount)
        return super().get(amount, *args, **kwargs)

    def put(self, amount, *args, **kwargs):
        """Return ``amount`` units; accepts ``amount`` positionally or as a keyword."""
        self._record("release", amount)
        return super().put(amount, *args, **kwargs)

    def used(self):
        """Units currently taken: capacity minus the free level."""
        return self.capacity - self.level

In [389]:
class SystemResource_old(simpy.Resource):
    """Slot-based variant built on ``simpy.Resource`` that logs each request/release.

    Not used by ``World``, which builds the container-based ``SystemResource``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        mainLogger.info(f"create resource with capacity {self.capacity}")

    def _trace(self, verb):
        """Emit a debug line '<verb> resource at <now>'."""
        mainLogger.debug("%s resource at %d" % (verb, self._env.now))

    def request(self, *args, **kwargs):
        """Log, then delegate to ``simpy.Resource.request``."""
        self._trace("request")
        return super().request(*args, **kwargs)

    def release(self, *args, **kwargs):
        """Log, then delegate to ``simpy.Resource.release``."""
        self._trace("release")
        return super().release(*args, **kwargs)


In [390]:
class SystemMonitoringAgent:
    """SimPy process that samples the shared resource once per tick.

    Each sample appends two rows to ``data``: busy units (``Metric.BS``)
    followed by queued get requests (``Metric.RW``).
    """

    def __init__(self, world):
        self._world = world
        mainLogger.info("creating agent")
        self.data = []
        # Register run() as a SimPy process.
        self.action = world.env.process(self.run())

    def _sample(self):
        """Append the busy-slots row and the requests-waiting row for the current tick."""
        world = self._world
        mark = world.env.now
        occupied_slots = world.res.used()
        requests_waiting = len(world.res.get_queue)

        mainLogger.debug(f"level {world.res.level} at %d" % mark)
        mainLogger.debug(f"{occupied_slots} occupied slots at %d" % mark)
        mainLogger.debug(f"{requests_waiting} requests waiting at %d" % mark)

        stamp = world.clock.to_date(mark)
        for metric, value in ((Metric.BS, occupied_slots), (Metric.RW, requests_waiting)):
            self.data.append(dict(Mark=mark, Timestamp=stamp, Metric=metric.value, Value=value))

    def run(self):
        """Sample, then sleep for one tick; repeat forever."""
        tick_duration = 1
        while True:
            self._sample()
            yield self._world.env.timeout(tick_duration)

    def collect(self):
        """Return all samples as a DataFrame."""
        return pd.DataFrame(self.data)


In [391]:
class World:
    """Wires together the SimPy environment, clock, shared resource, monitors and agents.

    Args:
        scenarios: scenario objects handed to the users generator.
        nb_users: maximum number of users the generator creates.
        resource_capacity: size of the shared container, which starts full.
    """

    def __init__(self, scenarios, nb_users=20, resource_capacity=5):
        mainLogger.info(f"creating simulation")
        self.scenarios = scenarios
        env = simpy.Environment()
        self.env = env
        self.clock = Clock()
        # init == capacity: every unit is free at time 0.
        self.res = SystemResource(env, capacity=resource_capacity, init=resource_capacity)
        # The monitor must exist before users are created, since they report to it.
        self.user_monitor = UsersMonitor(self)
        self.user_gen = UsersGenerator(self, max_nb_users=nb_users)
        self.res_agent = SystemMonitoringAgent(self)

    def start(self, sim_duration = 20):
        """Run the simulation until tick ``sim_duration``."""
        mainLogger.info(f"starting simulation")
        run_until = sim_duration
        self.env.run(until=run_until)

## Configuration

In [392]:
mainLogger.setLevel(logging.INFO)
#mainLogger.setLevel(logging.DEBUG)

# Fix the RNG seed so the scenario shuffle in UsersGenerator (and therefore
# every chart below) is reproducible across Restart & Run All.
SEED = 42
random.seed(SEED)

# Scenario mix: users are assigned round-robin over this (shuffled) list,
# so the repeat counts set the 8:20:2 proportion of Sc1:Sc2:Sc3.
scenarios = []
scenarios.extend(repeat(Scenario1(), 8))
scenarios.extend(repeat(Scenario2(), 20))
scenarios.extend(repeat(Scenario3(), 2))

world = World(scenarios,
              nb_users = 60,
              resource_capacity = 80 ) # 240 half a cluster

# debug
#world = World(scenarios, nb_users = 1, resource_capacity = 5 )


In [393]:
SIM_DURATION = 180  # ticks; Clock renders one tick as one minute
world.start(sim_duration=SIM_DURATION)
# debug: world.start(sim_duration=20)

# Time Series

In [394]:
# Two rows per tick (Busy Slots, Requests Waiting); show only the first few.
df_system = world.res_agent.collect()
df_system.head(10)

Unnamed: 0,Mark,Metric,Timestamp,Value
0,0,Busy Slots,2021-09-12 12:32:58.604829,0
1,0,Requests Waiting,2021-09-12 12:32:58.604829,0
2,1,Busy Slots,2021-09-12 12:33:58.604829,10
3,1,Requests Waiting,2021-09-12 12:33:58.604829,0
4,2,Busy Slots,2021-09-12 12:34:58.604829,10
5,2,Requests Waiting,2021-09-12 12:34:58.604829,0
6,3,Busy Slots,2021-09-12 12:35:58.604829,10
7,3,Requests Waiting,2021-09-12 12:35:58.604829,0
8,4,Busy Slots,2021-09-12 12:36:58.604829,10
9,4,Requests Waiting,2021-09-12 12:36:58.604829,0


## Occupied slots

In [395]:
df_busy = df_system[df_system.Metric == Metric.BS.value]
fig = px.bar(df_busy, x='Timestamp', y='Value',
             title='Busy slots over time',
             labels={'Value': 'Busy slots', 'Timestamp': 'Simulated time'})
fig.show()

## Requests waiting

In [396]:
df_waiting = df_system[df_system.Metric == Metric.RW.value]
fig = px.bar(df_waiting, x='Timestamp', y='Value',
             title='Requests waiting for the resource over time',
             labels={'Value': 'Requests waiting', 'Timestamp': 'Simulated time'})
fig.show()

## Active Users

In [397]:
# One "Active Users" sample per tick, recorded by the users generator.
df_users_active = world.user_gen.collect()

In [398]:
fig = px.bar(df_users_active, x='Timestamp', y='Value',
             title='Active users over time',
             labels={'Value': 'Active users', 'Timestamp': 'Simulated time'})
fig.show()

# Users activity

In [399]:
# debug
#df_start = pd.DataFrame(world.user_monitor.start_data)
#display(df_start)

In [400]:
# debug
#df_stop = pd.DataFrame(world.user_monitor.stop_data)
#display(df_stop)

In [401]:
# debug
#df_stop.shape

## Users chronogram

In [402]:
# One row per tracked (resource-consuming) task execution: start joined with stop.
df_users = world.user_monitor.collect()

In [403]:
fig = px.timeline(df_users, x_start="Start", x_end="Finish", y="Username", color="Scenario",
                  title="Resource-consuming tasks per user")
# Categorical y-axis sorted in descending order, intended to list users top-down from User-001.
fig.update_yaxes(categoryorder='category descending')
fig.show()

## Users activity distribution

In [404]:
duration_summary = df_users.loc[:, ["Duration"]].describe()
duration_summary

Unnamed: 0,Duration
count,346.0
mean,22.127168
std,10.633881
min,2.0
25%,13.0
50%,26.0
75%,31.0
max,35.0


In [405]:
# Duration = FinishMark - StartMark, so it includes time spent waiting for the resource.
fig = px.histogram(df_users, x="Duration", nbins=20,
                   title="Distribution of tracked task durations (wait + processing)")
fig.show()

In [406]:
# Observation: requests start waiting at roughly 8 active users.
# Observation: waiting requests roughly match those being processed at about 13 active users (TODO: confirm reading).

# Logs

In [407]:
# Show only the tail of the log: at DEBUG level the full file is very long.
LOG_TAIL_LINES = 50
with open(log_filename) as log:
    print("".join(log.readlines()[-LOG_TAIL_LINES:]))

2021-09-12 12:32:58,269 - root - DEBUG - test
2021-09-12 12:32:58,337 - root - DEBUG - __exit__
2021-09-12 12:32:58,338 - root - DEBUG - exc_type {exc_type} exc_val {exc_val} exc_tb {exc_tb}
2021-09-12 12:32:58,338 - root - DEBUG - received release resource - amount 40 at 180
2021-09-12 12:32:58,338 - root - DEBUG - level (available) 35 at 180
2021-09-12 12:32:58,338 - root - DEBUG - 0 waiting at 180
2021-09-12 12:32:58,339 - root - DEBUG - 205 processing at 180
2021-09-12 12:32:58,339 - root - DEBUG - __exit__
2021-09-12 12:32:58,339 - root - DEBUG - exc_type {exc_type} exc_val {exc_val} exc_tb {exc_tb}
2021-09-12 12:32:58,340 - root - DEBUG - received release resource - amount 10 at 180
2021-09-12 12:32:58,340 - root - DEBUG - level (available) 75 at 180
2021-09-12 12:32:58,340 - root - DEBUG - 0 waiting at 180
2021-09-12 12:32:58,341 - root - DEBUG - 165 processing at 180
2021-09-12 12:32:58,341 - root - DEBUG - __exit__
2021-09-12 12:32:58,341 - root - DEBUG - exc_type {exc_type} e