In [10]:
#!/usr/bin/env python3

# %env ROS_DISTRO=noetic
# %env ROS_PACKAGE_PATH=/ARChemist_ws/src/roslabware_msgs:$ROS_PACKAGE_PATH
# # export ROS_MASTER_IP=172.31.1.77
# # export ROS_MASTER_URI=http://$ROS_MASTER_IP:11311
# %env ROS_MASTER_URI=http://172.31.1.77:11311


import unittest

import time

from archemist.stations.apc_weighing_station.state import (
    APCWeighingStation, 
    APCOpenBalanceDoorOp, 
    APCCloseBalanceDoorOp,
    APCTareOp,
    APCWeighingOp,
    APCWeighResult
)

from archemist.core.state.material import Liquid

from archemist.stations.apc_weighing_station.handler import APCWeighingStationHandler
from archemist.core.state.batch import Batch
from archemist.core.state.lot import Lot
from archemist.core.util.enums import StationState, OpOutcome
from archemist.core.state.station_op_result import ProcessOpResult, MaterialOpResult
from archemist.core.persistence.models_proxy import ModelProxy, ListProxy, DictProxy
from archemist.stations.apc_weighing_station.process import APCWeighingProcess
from archemist.stations.apc_weighing_station.handler import SimAPCWeighingStationHandler

from archemist.core.state.robot_op import RobotTaskOp, RobotWaitOp
from archemist.core.state.batch import Batch
from archemist.core.state.lot import Lot
from archemist.core.util.enums import OpOutcome, ProcessStatus

from datetime import datetime

from mongoengine import connect

print("importing done")


class APCWeighingStationHandlerTest(unittest.TestCase):
    def setUp(self):
        self._db_name = 'archemist_test'
        self._client = connect(db=self._db_name, host='mongodb://localhost:27017', alias='archemist_state')

        self.station_doc = {
            'type': 'APCWeighingStation',
            'id': 35,
            'location': {'coordinates': [1,7], 'descriptor': "ApcWeighingStation"},
            'total_lot_capacity': 1,
            'handler': 'StationOpHandler',
            'properties': 
            {
                'funnel_storage_capacity': 3
            },
            'materials': None
        }

    def test_handler(self):
        station = APCWeighingStation.from_dict(self.station_doc)
        self.assertIsNotNone(station)
        self.assertEqual(station.state, StationState.INACTIVE)
            # construct lot
        batch = Batch.from_args(1)
        lot = Lot.from_args([batch])

        # add lot to station
        station.add_lot(lot)

        # construct handler
        handler = APCWeighingStationHandler(station)
        # initialise handler
        self.assertTrue(handler.initialise())

        # test IKAHeatStirBatchOp
        # construct op
        sample = lot.batches[0].samples[0]
        t_op = APCWeighingOp.from_args(target_sample=sample)
        # t_op = APCCloseBalanceDoorOp.from_args()

        self.assertIsNotNone(t_op.object_id)

        station.add_station_op(t_op)
        station.update_assigned_op()
        self.assertEqual(station.assigned_op, t_op)


        # execute op
        handler._seq_id = 40
        handler.execute_op()

        # wait for results
        handler.is_op_execution_complete()

        self.assertEqual(handler.is_op_execution_complete(), False)

        while True:
            if handler.is_op_execution_complete():
                print("got the completion msg, moving on")
                print("read mass is :", handler.read_weight , "g")
                break
            else:
                print("didn't get the completion msg, hence trying")
            time.sleep(2)

        self.assertEqual(handler.is_op_execution_complete(), True)

        outcome, op_results = handler.get_op_result()

        self.assertEqual(outcome, OpOutcome.SUCCEEDED)
        self.assertIsInstance(op_results[0], APCWeighResult)
        # self.assertEqual(op_results, None)



# if __name__ == '__main__':
#     unittest.main()


importing done


In [11]:
# Gather the test methods of the weighing handler test case into one suite
Balance_suite = unittest.defaultTestLoader.loadTestsFromTestCase(APCWeighingStationHandlerTest)
# Execute the suite; the returned result object becomes the cell output
unittest.TextTestRunner().run(Balance_suite)

[Batch-65b8d2c6448157c354cdb9ff]: (APCWeighingStation_35) stamp is added.
[APCWeighingStation_35]: Lot-65b8d2c6448157c354cdba01 is added for processing at slot 0
[APCWeighingStation_35]: <archemist.stations.apc_weighing_station.state.APCWeighingOp object at 0x7f12382392b0> is added to queued_op list
[INFO] [1706611400.248998]: Reading stable weight.
didn't get the completion msg, hence trying
didn't get the completion msg, hence trying


.

got the completion msg, moving on
read mass is : 0.05000000074505806 g



----------------------------------------------------------------------
Ran 1 test in 6.238s

OK


<unittest.runner.TextTestResult run=1 errors=0 failures=0>

In [12]:
#!/usr/bin/env python3

# %env ROS_DISTRO=noetic
# %env ROS_PACKAGE_PATH=/ARChemist_ws/src/roslabware_msgs:$ROS_PACKAGE_PATH
# # export ROS_MASTER_IP=172.31.1.77
# # export ROS_MASTER_URI=http://$ROS_MASTER_IP:11311
# %env ROS_MASTER_URI=http://172.31.1.77:11311


import unittest

import time

from archemist.stations.apc_weighing_station.state import (
    APCWeighingStation, 
    APCOpenBalanceDoorOp, 
    APCCloseBalanceDoorOp,
    APCTareOp,
    APCWeighingOp,
    APCWeighResult
)

from archemist.core.state.material import Liquid

from archemist.stations.apc_weighing_station.handler import APCWeighingStationHandler
from archemist.core.state.batch import Batch
from archemist.core.state.lot import Lot
from archemist.core.util.enums import StationState, OpOutcome
from archemist.core.state.station_op_result import ProcessOpResult, MaterialOpResult
from archemist.core.persistence.models_proxy import ModelProxy, ListProxy, DictProxy
from archemist.stations.apc_weighing_station.process import APCWeighingProcess
from archemist.stations.apc_weighing_station.handler import SimAPCWeighingStationHandler

from archemist.core.state.robot_op import RobotTaskOp, RobotWaitOp
from archemist.core.state.batch import Batch
from archemist.core.state.lot import Lot
from archemist.core.util.enums import OpOutcome, ProcessStatus

from datetime import datetime

from mongoengine import connect

print("importing done")


class APCWeighingStationDoorHandlerTest(unittest.TestCase):
    def setUp(self):
        self._db_name = 'archemist_test'
        self._client = connect(db=self._db_name, host='mongodb://localhost:27017', alias='archemist_state')

        self.station_doc = {
            'type': 'APCWeighingStation',
            'id': 35,
            'location': {'coordinates': [1,7], 'descriptor': "ApcWeighingStation"},
            'total_lot_capacity': 1,
            'handler': 'StationOpHandler',
            'properties': 
            {
                'funnel_storage_capacity': 3
            },
            'materials': None
        }

    def test_handler(self):
        station = APCWeighingStation.from_dict(self.station_doc)
        self.assertIsNotNone(station)
        self.assertEqual(station.state, StationState.INACTIVE)
            # construct lot
        batch = Batch.from_args(1)
        lot = Lot.from_args([batch])

        # add lot to station
        station.add_lot(lot)

        # construct handler
        handler = APCWeighingStationHandler(station)
        # initialise handler
        self.assertTrue(handler.initialise())

        # test IKAHeatStirBatchOp
        # construct op
        sample = lot.batches[0].samples[0]
        # t_op = APCWeighingOp.from_args(target_sample=sample)
        t_op = APCOpenBalanceDoorOp.from_args()
        # t_op = APCCloseBalanceDoorOp.from_args()

        self.assertIsNotNone(t_op.object_id)

        station.add_station_op(t_op)
        station.update_assigned_op()
        self.assertEqual(station.assigned_op, t_op)


        # execute op
        handler._seq_id = 2
        handler.execute_op()

        # wait for results
        handler.is_op_execution_complete()

        self.assertEqual(handler.is_op_execution_complete(), False)

        while True:
            if handler.is_op_execution_complete():
                print("got the completion msg, moving on")
                break
            else:
                print("didn't get the completion msg, hence trying")
            time.sleep(2)

        self.assertEqual(handler.is_op_execution_complete(), True)

        outcome, op_results = handler.get_op_result()

        self.assertEqual(outcome, OpOutcome.SUCCEEDED)
        # self.assertIsInstance(op_results[0], APCWeighResult)
        self.assertEqual(op_results, None)



# if __name__ == '__main__':
#     unittest.main()


importing done


In [13]:
# Gather the test methods of the balance-door handler test case into one suite;
# the suite is executed in a later cell.
Door_suite = unittest.defaultTestLoader.loadTestsFromTestCase(APCWeighingStationDoorHandlerTest)


In [14]:
# Run the door-handler suite with a default text runner. As the last expression
# of the cell, the returned result object is displayed as the cell output.
unittest.TextTestRunner().run(Door_suite)

[Batch-65b8d2ec448157c354cdba05]: (APCWeighingStation_35) stamp is added.
[APCWeighingStation_35]: Lot-65b8d2ec448157c354cdba07 is added for processing at slot 0
[APCWeighingStation_35]: <archemist.stations.apc_weighing_station.state.APCOpenBalanceDoorOp object at 0x7f123ba11760> is added to queued_op list
[INFO] [1706611438.923251]: Opening balance door.
didn't get the completion msg, hence trying
didn't get the completion msg, hence trying
didn't get the completion msg, hence trying
didn't get the completion msg, hence trying
didn't get the completion msg, hence trying


.

got the completion msg, moving on



----------------------------------------------------------------------
Ran 1 test in 12.169s

OK


<unittest.runner.TextTestResult run=1 errors=0 failures=0>