In [1]:
import numpy as np
import threading

class Experiment:
    def __init__(self, x):
        if x == True:
            self.vacuum = 1e-7
            self.quad_pulse = "CCC"
            self.quad_voltage = 13.2
            self.quad_spark = False
            self.quad_rf = True
            self.ctags = 310
            self.K1 = 7.39
            self.K2 = 7.78 
            self.K3 = 7.77
            self.kicker_status = True
            self.ps_feedback = True
            self.ps_feedback_hold= False
            self.collimators = "in"
            #self.pp = [-4000, 793, 543]
            #self.mini_scifi = "out"
            self.kicker_spark = False
            self.pot = 1e12
            self.trolley_in = False
            self.IBMS_signal_present = True
            self.tracker_data_ok = True
            self.magnet_current = 5173.5
            self.inflector_current = 2762.3
            self.permit = True
            self.beam_CDC = True
            self.m5_magnet = 789.3
        
    def getStatus(self, x):
        """Get the experiment's status.
        
           Returns:
               json: object representing the experiments status in key:value pairs
        """
        self.update(True)
        return {'vacuum':self.vacuum,
                'quad_pulse':self.quad_pulse,
                'quad_voltage':self.quad_voltage,
                'quad_spark':self.quad_spark,
                'quad_rf':self.quad_rf,
                'ctags':self.ctags,
                'kicker_1':self.K1,
                'kicker_2':self.K2,
                'kicker_3':self.K3,
                'kicker_status':self.kicker_status,
                'kicker_spark':self.kicker_spark,
                'ps_feedback':self.ps_feedback,
                'self.ps_feedback_hold':self.ps_feedback_hold,
                'collimators':self.collimators,
                'pot':self.pot,
                'IBMS_signal_present':self.IBMS_signal_present,
                #'tracker_data_ok':self.tracker_data_ok,
                'inflector_current':self.inflector_current,
                'magnet_current':self.magnet_current,
                'beam_permit':self.permit,
                'beam_CDC':self.beam_CDC,
                'm5_current':self.m5_magnet }

    def printStatus(self, x):
        return f'''
        vacuum: {self.vacuum}
        quad_pulse: {self.quad_pulse}
        quad_voltage: {self.quad_voltage}
        quad_spark: {self.quad_spark}
        quad_rf: {self.quad_rf}
        ctags: {self.ctags}
        kicker_1: {self.K1}
        kicker_2: {self.K2}
        kicker_3: {self.K3}
        kicker_status: {self.kicker_status}
        kicker_spark: {self.kicker_spark}
        ps_feedback: {self.ps_feedback}
        self.ps_feedback_hold: {self.ps_feedback_hold}
        collimators: {self.collimators}
        pot: {self.pot}
        IBMS_signal_present: {self.IBMS_signal_present}
        inflector_current:{self.inflector_current}
        magnet_current: {self.magnet_current}
        beam_permit: {self.permit}
        beam_CDC: {self.beam_CDC}
        m5_current: {self.m5_magnet}
        '''
        

    def update(self, x):
        """Update the internal logic of the toy experiment
        """
        ctags = 310
        if self.quad_pulse != "CCC" or\
           self.quad_spark or\
           not self.kicker_status or\
           self.kicker_spark or\
           (self.magnet_current < 5130.) or\
           (self.inflector_current < 2700.) or\
           not self.permit or\
           not self.beam_CDC:
            self.ctags = 0
        else:
           self.ctags = ctags * self.pot/1e12
           if self.K1 < 7 or\
              self.K2 < 7 or\
              self.K3 < 7 or\
              self.quad_voltage < 12.:
                self.ctags = 20

        if not self.permit or\
           not self.beam_CDC or\
           self.pot < 1e3:
            self.IBMS_signal_present = False
        else:
            self.IBMS_signal_present = True

        if self.collimators != "in": # reduce ctags if collimators are out
            self.ctags = self.ctags * 0.7 

        if self.vacuum > 5e-5:
            self.kicker_status = "stopped"
        if self.vacuum > 6e-5:
            self.quadSpark()
        
    def setKickers(self,k1,k2,k3,status):
        """
        Controlls the experiment's kickers.
        
        Args:
          k1: voltage of kicker 1 in kV
          k2: voltage of kicker 2 in kV
          k3: voltage of kicker 3 in kV
          status: if True the kickers pulse, if False kicker pulsing is inhebited. Setting status to true resets the kicker_spark flag.
         
        Returns:
          error: returns 0 if the setting was successful, any non 0 return corresponds to an error code and indicates not successful iperation. 
        """
        self.K1 = k1 if k1 < 8. else 8.
        self.K2 = k2 if k2 < 8. else 8.
        self.k3 = k3 if k3 < 8. else 8.
        self.kicker_status = status
        if self.kicker_status:
            self.kicker_spark = False
        return 0 if ((k1 < 8.) and (k2 < 8.) and (k3 < 8.)) else -1
    def setCollimator(self, status):
        """
        Insert or retract the collimators.
        
        Args:
            status: "in" or "out"
        
        Returns:
            ret: 0 if the operations is a sucess, -1 otherwise
        """
        self.collimators = status
        return 0
    def setQuads(self,voltage, status):
        """
        Set the quad voltage and pulsing status.
        
        Args:
            voltage: voltage in kV the quads are set to
            status: pulsing sstatus, one of "stopped", "1Hz", "5Hz", "CCC". Setting the status resets the quad spark flag.
            
        Returns:
            ret: 0 if the operations was a sucess, non-zero error code otherwise.
                 -1: too high voltage
                 -2: unknown status
        """
        if voltage > 14.:
            return -1
        self.quad_voltage = voltage
        if status in ["stopped","1Hz","5Hz","CCC"]:
            self.quad_pulse = status
        else:
            return -2
        self.quad_spark = False
        return 0


    def resetVacuum(self, x):
        print("resetVacuum")
        self.vacuum = 1e-7
    
    def simQuadSpark(self,time_for_vacuum_recovery=3):
        self.quad_spark = True
        self.quad_pulse = "stopped"
        #if the vaccum is ok, create a spike and reset after 10s
        if self.vacuum < 1e6:
            self.vacuum = 1e-6
            print("A")
            timer = threading.Timer(time_for_vacuum_recovery, self.resetVacuum)
            timer.start()

    def simKickerSpark(self):
        self.kicker_spark = True
        self.kicker_status = False

    def simBeamDrop(self):
        self.pot = 0.      

    def simTrolleyRun(self):
        self.quad_pulse = "stopped"
        self.quad_voltage = 0.
        self.quad_rf = False
        self.K1, self.K2, self.K3 = 0,0,0
        self.kicker_status = False
        self.collimators = "out"
        self.beam_CDC = False
        self.permit = False
        self.trolley_in = True
            

In [65]:
class Logbook:
    def __init__(self, x):
        self.entries = [
        {"date" : "2024-07-15", "info" : "blah" }, 
        {"date" : "2024-07-18", "info" : '''
        Several small leaks, and one pretty big one.
                In the Delivery Ring:
                Q118 has (at least) three leaks we found: one on the manifold where the orange hoses connect, another
                on the manifold where one of the small black hoses connects, and one on the orange hose where it
                connects to the copper sub-header.
                H607 has a single leak where the black hose connects to the magnet conductor. This is not new - it has
                been leaking here for a couple years now.
                The copper sub-header turn around hose behind Q620 is our big leak. Right where the rubber turn
                around hose is connected to the end of the header, it is leaking badly - several heavy drops per second.
                From the looks of the hose clamp around the copper, this was a temporary fix done sometime in the
                past and has started leaking again.
                The large black hose on upstream H519 is leaking at the connection to the conductor.
                H314 is leaking where the orange hose connects to the conductor on the upstream end.
                Q308 has a leak on the header valve behind the magnet.
                The header valve behind Q205 is dripping. Very hard to fix with the magnet in place. Should be fixed
                when we swap the magnet.
                In PreTarget:
                The ceiling header over HV102A is leaking from a capped connection point. This has been a problem
                for a couple of years and seems to be slowly getting worse.
                While walking through AP-0, we found the air filters for crate $71 and $72 completely clogged with dirt.
                Chris O has been notified.
        
        '''},
        {"date" : "2024-07-18", "info" : "Iouri informed Ops that there is a new version of the M31 program 'Muon Clock' listed as Z1 'Ivanov 0067.' Should an issue arise while using M31, he is instructing users to use Z1 instead." },
        {"date" : "2024-07-19", "info": "MI-62 SB LCW Chipmunk (RD1216) tripped on Missing Pulse at 13:57 yesterday. AP-30 D:H744  Chipmunk (RD2098) tripped on FSV at 01:54 this morning. Both rad detectors are in integrate mode, and both reset OK"},
        {"date" : "2024-07-19", "info" : "The AP0 dump alarms became unsnoozed. Jim and ops snooze them for two weeks. They are snoozed until 08-05-24 at 0830."},
        {"date" : "2024-07-20", "info" : "Most recent."},
        {"date" : "2024-07-05", "info" : "Not most recent."}    

        ]
    
    def getInfo(self, date):
        for entry in self.entries:
            if entry["date"] == date:
                return entry["info"]

    def getLatest(self, x):
        newList = sorted(self.entries, key=lambda d: d["date"])
        return newList[-1]["info"]

    def getEntry(self, keyword):
        string = ""
        for entry in self.entries:
            if keyword in entry["info"]:
                string += f"Entry {entry["date"]}: " + entry["info"] + "\n\n"

        return string

