In [None]:
pip install simpy



In [None]:
import random as r
import math as m
import pandas as pd
import simpy

# Ennables this file to write to files in Google Drive:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
RANDOM_SEED = 1 # Sets the random seed to a constant for dev. Remove this line to get random data.

TAKT_TIME = 18 # The frequency, in time units, that the simulation generates new units.

SIM_UNITS = 1000 # The number of units the simulation models.
SIM_TIME = TAKT_TIME * SIM_UNITS * 2 # Sets the simulation time to much longer than required to process the number of units defined in SIM_UNITS.

PARAMETER_COUNT = 10 # Defines the number of parametic measurements.

data = [] # Variable to hold log data.

################################################################################
# Class Definitions:
################################################################################

# Define the processes and their operators:
class Process(object):

  def __init__(self, env, PID, CT, parameter_type):
    self.env = env
    self.derived_HC = m.ceil(CT / TAKT_TIME) # Derive headcount from cycle time and takt time.
    self.HC = simpy.Resource(env, self.derived_HC) # Set HC.
    self.CT_perscribed = CT # Set prescribed CT in units of time.
    self.OP_busy_states = generate_OP_busy_states(self.derived_HC) #initialize all the busy states to false
    self.PID = PID # Set process ID in integer form for loops, etc.
    self.process_ID = "P%i" % PID # Sets process ID in string form for data logging.

    # CT variation factors:
    self.factor_trueCT = r.normalvariate(1, 0.05) # The different between theoretical and true CT, same for all OPs.
    self.factor_experience = generate_factor_experience(self.derived_HC, 1, 0.05) # Experience of OP, differs between OPs.
    # TrueCT, experience, and a failure injection factor, as well as an additional random factor, are multiplied by perscribed CT to result in actual CT.

    # Parametric data stuff:
    self.parameter_type = parameter_type # Indicates what type of parametric data is recorded at this staton (0: none, 1: discrete, 2: cumulative).
    self.parameter_values = generate_parameter_values(self.derived_HC, r.randint(1, 10), 0.05) # Generates set of parametric values based on number of operators at process.

    # Failure Injection stuff:
    self.gremlins = [] # Creates a varible to hold gremlins.
    self.symptoms = [] # Creates a varible to hold gremlins.

  def process(self, unit): # Models what happens to a unit when it is at a station.

    OP_index = 0 # Creates a varible to track which OP will process the unit.
    while OP_index < self.derived_HC: # Loop through the possible OP indices at the station to identify which OP is avalible (starting from the lowest index).
      if self.OP_busy_states[OP_index] == False: # Checks if the OP is available.
        self.OP_busy_states[OP_index] = True # Sets the OP to be busy.

        self.check_unit(unit, OP_index) # Runs the check unit method.

        if unit.stop == True: # Checks if the unit is flagged to fail at this station.
          self.OP_busy_states[OP_index] = False # Frees up the OP.
          break # Stops the processing sequence without waiting for the processing time and without logging the data.

        if (self.parameter_type != 0): # Checks if this station has a parametric measurement.
          self.make_measurement(unit, OP_index) # Calculates the value of the parametric measurement at this process.

        yield self.env.timeout(round(self.CT_perscribed * self.factor_trueCT * self.factor_experience[OP_index] * unit.symptom_CT_factor * r.normalvariate(1, 0.05))) # Creates a delay to model processing time.

        self.log_data(unit, OP_index) # Logs data.

        self.OP_busy_states[OP_index] = False # Frees up the OP.

        break # Ends processing of unit.

      OP_index += 1 # Increments OP index if the available OP was not identified.

  def check_unit(self, unit, OP_index): # Cycles through the gremlins and symptoms at the station, if any, and runs the applicable check methods.
    for gremlin in self.gremlins:
      gremlin.check(unit, OP_index)

    for symptom in self.symptoms:
      symptom.check(unit)

  def make_measurement(self, unit, OP_index): # Calculates the parametric unit for this unit at this process.
    unit.parametric_dataset[self.PID - 1] = calc_parametric_output(self.parameter_type, self.parameter_values[OP_index], unit)

  def log_data(self, unit, OP_index): # Logs data.
    OP_ID = "%s-%i" % (self.process_ID, OP_index + 1) # Creates OP ID string.

    print("%s,%s,%s,%s,%s,%s" % (unit.unit_ID, self.process_ID, OP_ID, str(self.env.now), str(unit.parametric_dataset[self.PID - 1]), str(unit.wounded_at_this_station)))
    # print("%s,%s,%s,%s,%s" % (unit.unit_ID, self.process_ID, OP_ID, str(self.env.now), str(unit.parametric_dataset[self.PID - 1])))
    data.append({"unit_id": unit.unit_ID, "process_id": self.process_ID, "operator_id": OP_ID, "timestamp": str(env.now), "measurement": str(unit.parametric_dataset[self.PID - 1]), "wounded": str(unit.wounded_at_this_station)})
    # data.append({"unit_id": unit.unit_ID, "process_id": self.process_ID, "operator_id": OP_ID, "timestamp": str(env.now), "measurement": str(unit.parametric_dataset[self.PID - 1])})

    unit.wounded_at_this_station = 0 # Resets flag indicating whether the unit was wounded by a gremlin at this station (for model training).


# Control behavior of each unit:
class Unit(object):

  def __init__(self, env, unit_ID, processes):
    self.env = env
    self.unit_ID = unit_ID # Sets unit ID.
    self.processes = processes # Gives the unit the set of processes on the assembly line.

    self.parameter_memory = 0.0 # Keeps track of the past value for cumulative parametric data.
    self.parametric_dataset = generate_parametric_dataset(PARAMETER_COUNT) # Creates variable to hold parametric data.

    self.wounds = generate_woundset(PARAMETER_COUNT) # Creates variable to keep track of wounds (for failure injection).
    self.wounded_at_this_station = 0 # Creates a variable to flag if the unit was wounded at this station (for model training).

    self.stop = False # Creates a variable to flag if the unit will fail at the current process.
    self.symptom_CT_factor = 1.0 # Creates a variable to store the factor by which a failure symptom will impact the cycle time at a certain process.
    self.symptom_measurement_factor = 1.0 # Creates a variable to store the factor by which a failure symptom will impact the parametric measurement at a certain process.

  def run(self): # Takes the unit through each process on the assembly line.
    i = 0 # Index for which process the unit is at.
    while i < 10: # Loops through each process (celing based on number of processes define at setup).
      with self.processes[i].HC.request() as request: # Requests an OP at the current process.
        yield request # Waits for an OP to become available.

        yield self.env.process(self.processes[i].process(self)) # Processes the unit.

      if self.stop == True: # Takes the unit off the line if it is flagged for failure.
        break

      i += 1 # Increments the index to go to the next process.


# Creates a gremlin to sit at a process and mark units for failure.
class Gremlin(object):
  def __init__(self, failure_type, root_cause, FR, fuse_length, processes):
    self.failure_type = failure_type # Sets the type of failure (0: process, 1: OP. Process affects all units at a process while OP only affects units for a certain OP at a process.)
    self.root_cause = root_cause # Sets the station that is the root cause of the failure.
    self.FR = FR # Sets the failure rate. (Decimal between 0 and 1.)
    self.fuse_length = fuse_length # Sets the fuse length which determines how long units marked for failure will stay on the line before dropping out.

    if failure_type == 1: # OP failure case
      self.OP_target_index = r.randint(0, processes[root_cause - 1].derived_HC - 1) # Picks an OP to be the failure root cause.

  def check(self, unit, OP_index): # Depending on the type of failure and failure rate, randomly marks unit for failure.
    if self.failure_type == 1: # OP failure case
      if OP_index == self.OP_target_index: # Checks whether the unit is at the bad OP.
        if (r.randint(1,100) <= (self.FR * 100)): # Randomly determines if the unit will fail, based on the failure rate.
          self.wound(unit) # Calls the wound method.
    else: # Process failure case
      if (r.randint(1,100) <= (self.FR * 100)): # Randomly determines if the unit will fail, based on the failure rate.
        self.wound(unit) # Calls the wound method.

  def wound(self, unit): # Marks the unit for failure.
    process_index = self.root_cause - 1 # Starts the loop at the root cause station.
    while (process_index < self.root_cause + self.fuse_length): # Loops through all stations from the root cause to the end of the fuse, where units drop off the line.
      unit.wounds[process_index] = 1 # Marks the unit for each station where it will experience symptoms.
      unit.wounded_at_this_station = 1 # Flags that the unit was wounded at this station (for model training).
      process_index += 1 # Increments the process index.


# Symptoms change the behavior of units at processes.
class Symptom(object):
  def __init__(self, process_index, CT_impact_probability, measurement_impact_probability):
    self.process_index = process_index # Sets the process that this symptom is assigned to.
    self.CT_impact_probability = CT_impact_probability # Sets the probability that this symptom will impact cycle time.
    self.measurement_impact_probability = measurement_impact_probability # Sets the probability that this symptom will impact parametric measurements.

    self.symptom_CT_factor = 1.0 # Sets the factor by which the CT will be impacted by this symptom.
    self.symptom_measurement_factor = 1.0 # Sets the factor by which the measurement will be impacted by this symptom.
    self.straw = False # Sets whether this is the process where failure units drop off the line.


  def generate_symptom_factors(self, target_process):

    if target_process.parameter_type != 0: # Only attempts to change measurement factor if it is a process where a measurement occurs.
      if (r.randint(1,100) <= (self.measurement_impact_probability * 100)): # Determines if this symptom affects parametric measurements, based on the probability.
        self.symptom_measurement_factor *= r.normalvariate(1, 0.5) # Changes the measurement factor.

    if (r.randint(1,100) <= (self.CT_impact_probability * 100)): # Determines if this symptom affects cycle time, based on the probability.
      self.symptom_CT_factor *= r.normalvariate(1, 0.5) # Changes the CT factor.

  def check(self, unit):
    if unit.wounds[self.process_index] == 1: # Checks if the unit has been flagged by the gremlin.
      self.affect_unit(unit) # Calls the affect unit method.

  def affect_unit(self, unit):
    unit.symptom_CT_factor = self.symptom_CT_factor # Changes the CT factor for the unit.
    unit.symptom_measurement_factor = self.symptom_measurement_factor # Changes the parametric measurement factor for the unit.
    if self.straw == True: # Checks if this process is where units will drop off the line.
      unit.stop = True # Marks units to be dropped off the assembly line.


################################################################################
# Function Definitions:
################################################################################


# Generate a list of experience factors (for CT variation):
def generate_factor_experience(HC, mu, sigma):
  factor_experince = []

  i = 0
  while i < HC:
    factor_experince.append(r.normalvariate(mu, sigma))
    i += 1

  return factor_experince


# Generate a list of OP busy states:
def generate_OP_busy_states(HC):
  OP_busy_states = []

  i = 0
  while i < HC:
    OP_busy_states.append(False)
    i += 1

  return OP_busy_states


# Generate a list of parametric measurement values for a process:
def generate_parameter_values(HC, mu, sigma):
  parameter_values = []

  i = 0
  while i < HC:
    parameter_values.append(r.normalvariate(mu, sigma))
    i += 1

  return parameter_values


# Calculates the parametric measurement for a process:
def calc_parametric_output(parameter_type, parameter_value, unit):
  if parameter_type == 0:
    return None
  elif parameter_type == 1: # discrete case
    return parameter_value
  else: # continuous case
    unit.parameter_memory += parameter_value
    return unit.parameter_memory


# Generate a list to store parametric measurements for units:
def generate_parametric_dataset(parameter_count):

  parametric_dataset = []

  i = 0
  while i < parameter_count:
    parametric_dataset.append(None)
    i += 1

  return parametric_dataset


# Generate a list to store wounds for units (for failure injection):
def generate_woundset(n):

  woundset = []

  i = 0
  while i < n:
    woundset.append(None)
    i += 1

  return woundset


# Generates a gremlin and sypmtoms based on input parameters:
def gremlin_spawner(failure_type, root_cause, FR, fuse_length, CT_impact_probability, measurement_impact_probability, processes):
  # failure_type: 0 for process failure (can affect all units for a certain process), 1 for OP failure (can only affect units of a certain operator at a certain process)
  # root_cause:  the source of the failures
  # FR: the failure rate
  # fuse_length: how many stations before the affected units start dropping out

  # Checks to make sure the specs are not out of bounds (this should never run and intentionally causes a runtime error):
  if root_cause + fuse_length > 10:
    runtime_error = 1/0

  new_gremlin = Gremlin(failure_type, root_cause, FR, fuse_length, processes) # Creates a new gremlin object
  processes[root_cause - 1].gremlins.append(new_gremlin) # Assigns the gremlin to the root cause process.

  i = root_cause - 1 # Sets the index to the root cause process.
  while (i < root_cause + fuse_length): # Loops through the stations from the root cause to the end of the fuse where units drop off the line.

    new_symptom = Symptom(i, CT_impact_probability, measurement_impact_probability) # Creates a new symptom object.

    new_symptom.generate_symptom_factors(processes[i]) # Generates sypmtom factors for the new symptom.

    if (i == root_cause + fuse_length - 1): # Checks if the index represents the process where problem will manifest and cause failures
      new_symptom.straw = True # Marks the symptom object to cause objects to fail (straw that broke the camel's back)

    processes[i].symptoms.append(new_symptom) # Assigns the symptoms to the process.

    i += 1 # Increments to go to the next process.

# Setup the simulation:

def setup(env, takt_time):

  # Initialize the processes: (env, process id, CT, parameter type[0:none, 1:discrete, 2:cumulative])
  processes = []
  processes.append(Process(env, 1, 42, 1))
  processes.append(Process(env, 2, 53, 2))
  processes.append(Process(env, 3, 88, 0))
  processes.append(Process(env, 4, 45, 0))
  processes.append(Process(env, 5, 73, 1))
  processes.append(Process(env, 6, 108, 1))
  processes.append(Process(env, 7, 86, 0))
  processes.append(Process(env, 8, 31, 2))
  processes.append(Process(env, 9, 94, 0))
  processes.append(Process(env, 10, 98, 1))

  # Spawn Gremlins (inject failures):
  gremlin_spawner(0, 6, 0.50, 3, 1.00, 1.00, processes) # Creates an process-type failure at process 6 with 50% Failure Rate that manifests at process 9, that has a 100% chance to impact CT, and 100% chance to impact a measurement.
  # gremlin_spawner(1, 3, 1.00, 2, 0.60, 0.90, processes) # Creates an OPs-type failure at process 3 with 100% Failure Rate that manifests at process 5, that has a 60% chance to impact CT, and 90% chance to impact a measurement.

  # Spawn units throught the simulation:
  i = 0
  while i < SIM_UNITS:
    i += 1
    new_unit = Unit(env, 'Unit %d' % i, processes) # Creates a new unit.
    env.process(new_unit.run()) # Calls the run method for the unit.
    yield env.timeout(takt_time) # Waits to space units out on the line.


print('Start Data Synthesis')

r.seed(RANDOM_SEED)  # Set the random seed.

# Create an environment and start the setup process:
env = simpy.Environment()
env.process(setup(env, TAKT_TIME))

# Runs the simulation:
env.run(until=SIM_TIME)

print('End Data Synthesis')

# Saves data to a csv:
df = pd.DataFrame(data)
# df.to_csv("/content/drive/MyDrive/Data Mining Project/Notebooks/data.csv")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Unit 473,P2,P2-1,8688,2.035120646881631,0
Unit 460,P5,P5-2,8691,3.9698577293958057,0
Unit 451,P7,P7-1,8693,None,0
Unit 455,P6,P6-3,8696,4.892144164124808,1
Unit 469,P3,P3-3,8698,None,0
Unit 475,P2,P2-3,8702,1.9606895096908905,0
Unit 462,P5,P5-4,8705,4.0577190846058535,0
Unit 466,P4,P4-2,8705,None,0
Unit 482,P1,P1-2,8706,1.0633234517059733,0
Unit 474,P2,P2-2,8707,1.948473969973142,0
Unit 377,P8,P8-2,8711,10.985164630547093,0
Unit 375,P8,P8-1,8713,10.96066407155643,0
Unit 467,P4,P4-3,8716,None,0
Unit 471,P3,P3-4,8721,None,0
Unit 453,P7,P7-2,8721,None,0
Unit 483,P1,P1-3,8724,0.9500131862917971,0
Unit 468,P4,P4-1,8728,None,0
Unit 464,P5,P5-5,8731,4.030327718373632,0
Unit 461,P5,P5-1,8732,4.025847089966985,0
Unit 484,P1,P1-1,8733,1.0228956463972365,0
Unit 454,P6,P6-6,8734,5.052844671705214,1
Unit 470,P3,P3-5,8734,None,0
Unit 370,P10,P10-2,8735,9.962389387788699,0
Unit 450,P7,P7-4,8735,None,0
Unit 476,P2,P2-1,8743,2.03512064688