Skip to content

Commit

Permalink
compute resource tests
Browse files Browse the repository at this point in the history
  • Loading branch information
magland committed Oct 24, 2023
1 parent 8b800bc commit c69fab1
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 275 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ jobs:
run: cd python && pytest -m "not api" tests/ # make sure we are not depending on any of the additional packages in requirements.txt
- name: Install packages needed for api tests
run: pip install -r requirements.txt
- name: Install other packages needed for api tests
run: pip install httpx
- name: Run tests and collect coverage
run: cd python && pytest --cov protocaas --cov-report=xml --cov-report=term tests/
- uses: codecov/codecov-action@v3
Expand Down
13 changes: 9 additions & 4 deletions python/protocaas/api_helpers/clients/_get_mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
from .MockMongoClient import MockMongoClient


# Module-level test state: holds the MockMongoClient instance while mocking is
# enabled, or None when no mock client has been requested.
_globals = {
    'mock_mongo_client': None
}


def _set_use_mock_mongo_client(use_mock: bool) -> None: # For testing
    """Enable or disable the mock Mongo client (testing hook).

    Args:
        use_mock: When true, a new MockMongoClient is created and stored in
            ``_globals['mock_mongo_client']``; when false that entry is reset
            to None.

    The flag is also recorded on the current asyncio event loop as the
    ``_use_mock_mongo_client`` attribute.
    """
    loop = asyncio.get_event_loop()
    setattr(loop, '_use_mock_mongo_client', use_mock)
    _globals['mock_mongo_client'] = MockMongoClient() if use_mock else None # type: ignore

def _get_mongo_client() -> Union[AsyncIOMotorClient, MockMongoClient]:
# We want one async mongo client per event loop
Expand All @@ -16,8 +21,8 @@ def _get_mongo_client() -> Union[AsyncIOMotorClient, MockMongoClient]:
return loop._mongo_client # type: ignore

# If we're using a mock client, return it
if hasattr(loop, '_use_mock_mongo_client') and loop._use_mock_mongo_client: # type: ignore
client = MockMongoClient()
if _globals['mock_mongo_client']:
client = _globals['mock_mongo_client'] # type: ignore
else:
# Otherwise, create a new client and store it in the global variable
mongo_uri = get_settings().MONGO_URI
Expand Down
30 changes: 22 additions & 8 deletions python/protocaas/api_helpers/routers/compute_resource/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

router = APIRouter()

# Test-only switch: while it is on, the pubsub subscription endpoint in this
# router answers with a mock subscribe key instead of the configured one.
_globals = dict(_use_compute_resource_mock_pubsub=False)


def _set_use_compute_resource_mock_pubsub(use_mock_pubsub: bool):
    """Turn the mock pubsub subscription on or off (testing hook).

    Args:
        use_mock_pubsub: New value stored under the
            ``_use_compute_resource_mock_pubsub`` key of ``_globals``.
    """
    _globals.update(_use_compute_resource_mock_pubsub=use_mock_pubsub)

# get apps
class GetAppsResponse(BaseModel):
apps: List[ProtocaasComputeResourceApp]
Expand Down Expand Up @@ -66,14 +72,22 @@ async def compute_resource_get_pubsub_subscription(
compute_resource = await fetch_compute_resource(compute_resource_id)
if compute_resource is None:
raise ComputeResourceNotFoundException(f"No compute resource with ID {compute_resource_id}")
VITE_PUBNUB_SUBSCRIBE_KEY = get_settings().PUBNUB_SUBSCRIBE_KEY
if VITE_PUBNUB_SUBSCRIBE_KEY is None:
raise KeyError('Environment variable not set: VITE_PUBNUB_SUBSCRIBE_KEY')
subscription = PubsubSubscription(
pubnubSubscribeKey=VITE_PUBNUB_SUBSCRIBE_KEY,
pubnubChannel=compute_resource_id,
pubnubUser=compute_resource_id
)
if _globals['_use_compute_resource_mock_pubsub']:
subscription = PubsubSubscription(
pubnubSubscribeKey='mock-subscribe-key',
pubnubChannel=compute_resource_id,
pubnubUser=compute_resource_id
)
return GetPubsubSubscriptionResponse(subscription=subscription, success=True)
else:
VITE_PUBNUB_SUBSCRIBE_KEY = get_settings().PUBNUB_SUBSCRIBE_KEY
if VITE_PUBNUB_SUBSCRIBE_KEY is None:
raise KeyError('Environment variable not set: VITE_PUBNUB_SUBSCRIBE_KEY (compute_resource)')
subscription = PubsubSubscription(
pubnubSubscribeKey=VITE_PUBNUB_SUBSCRIBE_KEY,
pubnubChannel=compute_resource_id,
pubnubUser=compute_resource_id
)
return GetPubsubSubscriptionResponse(subscription=subscription, success=True)
except Exception as e:
traceback.print_exc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

router = APIRouter()

# Test-only switch: while it is on, the GUI pubsub subscription endpoint in
# this router answers with a mock subscribe key instead of the configured one.
_globals = dict(_use_gui_mock_pubsub=False)


def _set_use_gui_mock_pubsub(use_mock_pubsub: bool):
    """Turn the GUI mock pubsub subscription on or off (testing hook).

    Args:
        use_mock_pubsub: New value stored under the ``_use_gui_mock_pubsub``
            key of ``_globals``.
    """
    _globals.update(_use_gui_mock_pubsub=use_mock_pubsub)

# get compute resource
class GetComputeResourceResponse(BaseModel):
computeResource: ProtocaasComputeResource
Expand Down Expand Up @@ -127,14 +133,21 @@ async def get_pubsub_subscription(compute_resource_id):
if compute_resource is None:
raise ComputeResourceNotFoundException(f"No compute resource with ID {compute_resource_id}")

VITE_PUBNUB_SUBSCRIBE_KEY = get_settings().PUBNUB_SUBSCRIBE_KEY
if VITE_PUBNUB_SUBSCRIBE_KEY is None:
raise KeyError('Environment variable not set: VITE_PUBNUB_SUBSCRIBE_KEY')
subscription = PubsubSubscription(
pubnubSubscribeKey=VITE_PUBNUB_SUBSCRIBE_KEY,
pubnubChannel=compute_resource_id,
pubnubUser=compute_resource_id
)
if _globals['_use_gui_mock_pubsub']:
subscription = PubsubSubscription(
pubnubSubscribeKey='mock-subscribe-key',
pubnubChannel=compute_resource_id,
pubnubUser=compute_resource_id
)
else:
VITE_PUBNUB_SUBSCRIBE_KEY = get_settings().PUBNUB_SUBSCRIBE_KEY
if VITE_PUBNUB_SUBSCRIBE_KEY is None:
raise KeyError('Environment variable not set: VITE_PUBNUB_SUBSCRIBE_KEY (gui)')
subscription = PubsubSubscription(
pubnubSubscribeKey=VITE_PUBNUB_SUBSCRIBE_KEY,
pubnubChannel=compute_resource_id,
pubnubUser=compute_resource_id
)
return GetPubsubSubscriptionResponse(subscription=subscription, success=True)
except Exception as e:
traceback.print_exc()
Expand Down
72 changes: 60 additions & 12 deletions python/protocaas/common/_api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@

protocaas_url = os.getenv('PROTOCAAS_URL', 'https://protocaas.vercel.app')

# Holds an optional in-process test client. While it is None the request
# helpers in this module go through `requests` against `protocaas_url`.
_globals = dict(test_client=None)


def _use_api_test_client(test_client):
    """Install (or clear) the test client used by the API request helpers.

    Args:
        test_client: Object stored under ``_globals['test_client']``; pass
            None to clear it again.
    """
    _globals.update(test_client=test_client)

def _compute_resource_get_api_request(*,
url_path: str,
compute_resource_id: str,
Expand All @@ -25,9 +31,16 @@ def _compute_resource_get_api_request(*,
if compute_resource_node_id is not None:
headers['compute-resource-node-id'] = compute_resource_node_id

url = f'{protocaas_url}{url_path}'
test_client = _globals['test_client']
if test_client is None:
url = f'{protocaas_url}{url_path}'
client = requests
else:
assert url_path.startswith('/api')
url = url_path
client = test_client
try:
resp = requests.get(url, headers=headers, timeout=60)
resp = client.get(url, headers=headers, timeout=60)
resp.raise_for_status()
except: # noqa E722
print(f'Error in compute resource get api request for {url}')
Expand All @@ -49,9 +62,16 @@ def _compute_resource_post_api_request(*,
'compute-resource-signature': signature
}

url = f'{protocaas_url}{url_path}'
test_client = _globals['test_client']
if test_client is None:
url = f'{protocaas_url}{url_path}'
client = requests
else:
assert url_path.startswith('/api')
url = url_path
client = test_client
try:
resp = requests.post(url, headers=headers, json=data, timeout=60)
resp = client.post(url, headers=headers, json=data, timeout=60)
resp.raise_for_status()
except: # noqa E722
print(f'Error in compute resource post api request for {url}')
Expand All @@ -73,9 +93,16 @@ def _compute_resource_put_api_request(*,
'compute-resource-signature': signature
}

url = f'{protocaas_url}{url_path}'
test_client = _globals['test_client']
if test_client is None:
url = f'{protocaas_url}{url_path}'
client = requests
else:
assert url_path.startswith('/api')
url = url_path
client = test_client
try:
resp = requests.put(url, headers=headers, json=data, timeout=60)
resp = client.put(url, headers=headers, json=data, timeout=60)
resp.raise_for_status()
except: # noqa E722
print(f'Error in compute resource put api request for {url}')
Expand All @@ -86,9 +113,16 @@ def _processor_get_api_request(*,
url_path: str,
headers: dict
):
url = f'{protocaas_url}{url_path}'
test_client = _globals['test_client']
if test_client is None:
url = f'{protocaas_url}{url_path}'
client = requests
else:
assert url_path.startswith('/api')
url = url_path
client = test_client
try:
resp = requests.get(url, headers=headers, timeout=60)
resp = client.get(url, headers=headers, timeout=60)
resp.raise_for_status()
except: # noqa E722
print(f'Error in processor get api request for {url}')
Expand All @@ -100,9 +134,16 @@ def _processor_put_api_request(*,
headers: dict,
data: dict
):
url = f'{protocaas_url}{url_path}'
test_client = _globals['test_client']
if test_client is None:
url = f'{protocaas_url}{url_path}'
client = requests
else:
assert url_path.startswith('/api')
url = url_path
client = test_client
try:
resp = requests.put(url, headers=headers, json=data, timeout=60)
resp = client.put(url, headers=headers, json=data, timeout=60)
resp.raise_for_status()
except: # noqa E722
print(f'Error in processor put api request for {url}')
Expand All @@ -112,9 +153,16 @@ def _processor_put_api_request(*,
def _client_get_api_request(*,
url_path: str
):
url = f'{protocaas_url}{url_path}'
test_client = _globals['test_client']
if test_client is None:
url = f'{protocaas_url}{url_path}'
client = requests
else:
assert url_path.startswith('/api')
url = url_path
client = test_client
try:
resp = requests.get(url, timeout=60)
resp = client.get(url, timeout=60)
resp.raise_for_status()
except: # noqa E722
print(f'Error in client get api request for {url}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import socket
import time
import yaml
from typing import Optional
from typing import Optional, Tuple
from ..common._crypto_keys import sign_message, generate_keypair


Expand All @@ -18,7 +18,7 @@
'BATCH_AWS_REGION'
]

def register_compute_resource(*, dir: str, compute_resource_id: Optional[str] = None, compute_resource_private_key: Optional[str] = None, node_name: Optional[str] = None):
def register_compute_resource(*, dir: str, compute_resource_id: Optional[str] = None, compute_resource_private_key: Optional[str] = None, node_name: Optional[str] = None) -> Tuple[str, str]:
"""Initialize a Protocaas compute resource node.
Args:
Expand Down Expand Up @@ -74,6 +74,10 @@ def register_compute_resource(*, dir: str, compute_resource_id: Optional[str] =
print(url)
print('')

assert compute_resource_id is not None
assert compute_resource_private_key is not None
return compute_resource_id, compute_resource_private_key

def _random_string(length: int) -> str:
import random
import string
Expand Down
34 changes: 23 additions & 11 deletions python/protocaas/compute_resource/start_compute_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,17 @@ def __init__(self):
compute_resource_node_name=self._node_name,
compute_resource_node_id=self._node_id
)
self._pubsub_client = PubsubClient(
pubnub_subscribe_key=pubsub_subscription['pubnubSubscribeKey'],
pubnub_channel=pubsub_subscription['pubnubChannel'],
pubnub_user=pubsub_subscription['pubnubUser'],
compute_resource_id=self._compute_resource_id
)
def start(self):
pubnub_subscribe_key = pubsub_subscription['pubnubSubscribeKey']
if pubnub_subscribe_key != 'mock-subscribe-key':
self._pubsub_client = PubsubClient(
pubnub_subscribe_key=pubnub_subscribe_key,
pubnub_channel=pubsub_subscription['pubnubChannel'],
pubnub_user=pubsub_subscription['pubnubUser'],
compute_resource_id=self._compute_resource_id
)
else:
self._pubsub_client = None
def start(self, *, timeout: Optional[float] = None): # timeout is used for testing
timer_handle_jobs = 0

# Start cleaning up old job directories
Expand All @@ -90,10 +94,11 @@ def start(self):
multiprocessing.Process(target=_cleanup_old_job_working_directories, args=(os.getcwd() + '/jobs',)).start()

print('Starting compute resource')
overall_timer = time.time()
while True:
elapsed_handle_jobs = time.time() - timer_handle_jobs
need_to_handle_jobs = elapsed_handle_jobs > 60 * 10 # normally we will get pubsub messages for updates, but if we don't, we should check every 10 minutes
messages = self._pubsub_client.take_messages()
messages = self._pubsub_client.take_messages() if self._pubsub_client is not None else []
for msg in messages:
if msg['type'] == 'newPendingJob':
need_to_handle_jobs = True
Expand All @@ -106,7 +111,14 @@ def start(self):
for slurm_job_handler in self._slurm_job_handlers_by_processor.values():
slurm_job_handler.do_work()

time.sleep(2)
overall_elapsed = time.time() - overall_timer
if timeout is not None and overall_elapsed > timeout:
print(f'Compute resource timed out after {timeout} seconds')
return
if overall_elapsed < 5:
time.sleep(0.01) # for the first few seconds we can sleep for a short time (useful for testing)
else:
time.sleep(2)
def _handle_jobs(self):
url_path = f'/api/compute_resource/compute_resources/{self._compute_resource_id}/unfinished_jobs'
if not self._compute_resource_id:
Expand Down Expand Up @@ -259,7 +271,7 @@ def _load_apps(*, compute_resource_id: str, compute_resource_private_key: str, c
ret.append(app)
return ret

def start_compute_resource(dir: str):
def start_compute_resource(dir: str, *, timeout: Optional[float] = None): # timeout is used for testing
config_fname = os.path.join(dir, '.protocaas-compute-resource-node.yaml')

if os.path.exists(config_fname):
Expand All @@ -272,7 +284,7 @@ def start_compute_resource(dir: str):
os.environ[k] = the_config[k]

daemon = Daemon()
daemon.start()
daemon.start(timeout=timeout)

def get_pubsub_subscription(*, compute_resource_id: str, compute_resource_private_key: str, compute_resource_node_name: Optional[str] = None, compute_resource_node_id: Optional[str] = None):
url_path = f'/api/compute_resource/compute_resources/{compute_resource_id}/pubsub_subscription'
Expand Down
Loading

0 comments on commit c69fab1

Please sign in to comment.