Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] adding tasks/assignments dynamically #188

Merged
merged 17 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions mephisto/core/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,6 @@ def parse_and_launch_run(

blueprint = BlueprintClass(task_run, task_args)
initialization_data_array = blueprint.get_initialization_data()
# TODO(#99) extend
if not isinstance(initialization_data_array, list):
raise NotImplementedError(
"Non-list initialization data is not yet supported"
)

# Link the job together
job = self.supervisor.register_job(
Expand Down Expand Up @@ -294,6 +289,7 @@ def _track_and_kill_runs(self):
if task_run.get_is_completed():
self.supervisor.shutdown_job(tracked_run.job)
tracked_run.architect.shutdown()
tracked_run.task_launcher.shutdown()
del self._task_runs_tracked[task_run.db_id]
time.sleep(2)

Expand All @@ -302,6 +298,7 @@ def shutdown(self, skip_input=True):
self.is_shutdown = True
for tracked_run in self._task_runs_tracked.values():
logger.info("expiring units")
tracked_run.task_launcher.shutdown()
tracked_run.task_launcher.expire_units()
try:
remaining_runs = self._task_runs_tracked.values()
Expand Down
139 changes: 95 additions & 44 deletions mephisto/core/task_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,29 @@
AssignmentState,
)

from typing import Dict, Optional, List, Any, TYPE_CHECKING

from typing import Dict, Optional, List, Any, TYPE_CHECKING, Iterator
import os
import time
import enum

if TYPE_CHECKING:
from mephisto.data_model.task import TaskRun
from mephisto.data_model.database import MephistoDB

import threading
from mephisto.core.logger_core import get_logger
import types

logger = get_logger(name=__name__, verbose=True, level="info")
logger = get_logger(name=__name__, verbose=True, level="debug")

UNIT_GENERATOR_WAIT_SECONDS = 10
ASSIGNMENT_GENERATOR_WAIT_SECONDS = 0.5


class GeneratorType(enum.Enum):
    """How a TaskLauncher's assignment data is supplied."""
    NONE = 0  # static iterable: all assignment data is available up front
    UNIT = 1  # unit-level generation; not referenced in the visible code — TODO confirm usage
    ASSIGNMENT = 2  # assignment data arrives incrementally from a generator


class TaskLauncher:
Expand All @@ -45,63 +53,91 @@ def __init__(
self,
db: "MephistoDB",
task_run: "TaskRun",
assignment_data_list: List[InitializationData],
assignment_data_iterator: Iterator[InitializationData],
max_num_concurrent_units: int = 0,
):
"""Prepare the task launcher to get it ready to launch the assignments"""
self.db = db
self.task_run = task_run
self.assignment_data_list = assignment_data_list
self.assignment_data_iterable = assignment_data_iterator
self.assignments: List[Assignment] = []
self.units: List[Unit] = []
self.provider_type = task_run.get_provider().PROVIDER_TYPE
self.max_num_concurrent_units = max_num_concurrent_units
self.launched_units: Dict[str, Unit] = {}
self.unlaunched_units: Dict[str, Unit] = {}
self.keep_launching: bool = False
self.keep_launching_units: bool = False
self.finished_generators: bool = False
self.unlaunched_units_access_condition = threading.Condition()
if isinstance(self.assignment_data_iterable, types.GeneratorType):
self.generator_type = GeneratorType.ASSIGNMENT
else:
self.generator_type = GeneratorType.NONE
run_dir = task_run.get_run_dir()
os.makedirs(run_dir, exist_ok=True)

def create_assignments(self) -> None:
"""
Create an assignment and associated units for any data
currently in the assignment config
"""
logger.debug(f"type of assignment data: {type(self.assignment_data_iterable)}")
self.units_thread = None
self.assignments_thread = None

def _create_single_assignment(self, assignment_data) -> None:
""" Create a single assignment in the database using its read assignment_data """
task_run = self.task_run
task_config = task_run.get_task_config()
for data in self.assignment_data_list:
assignment_id = self.db.new_assignment(
assignment_id = self.db.new_assignment(
task_run.task_id,
task_run.db_id,
task_run.requester_id,
task_run.task_type,
task_run.provider_type,
task_run.sandbox,
)
assignment = Assignment(self.db, assignment_id)
assignment.write_assignment_data(assignment_data)
self.assignments.append(assignment)
unit_count = len(assignment_data["unit_data"])
for unit_idx in range(unit_count):
unit_id = self.db.new_unit(
task_run.task_id,
task_run.db_id,
task_run.requester_id,
task_run.task_type,
assignment_id,
unit_idx,
task_config.task_reward,
task_run.provider_type,
task_run.task_type,
task_run.sandbox,
)
assignment = Assignment(self.db, assignment_id)
assignment.write_assignment_data(data)
self.assignments.append(assignment)
unit_count = len(data["unit_data"])
for unit_idx in range(unit_count):
unit_id = self.db.new_unit(
task_run.task_id,
task_run.db_id,
task_run.requester_id,
assignment_id,
unit_idx,
task_config.task_reward,
task_run.provider_type,
task_run.task_type,
task_run.sandbox,
)
self.units.append(Unit(self.db, unit_id))
self.units.append(Unit(self.db, unit_id))
with self.unlaunched_units_access_condition:
self.unlaunched_units[unit_id] = Unit(self.db, unit_id)
self.keep_launching = True

def _try_generating_assignments(self) -> None:
    """Drain the assignment-data generator, creating one assignment per item.

    Runs on a background thread (started by create_assignments). Loops
    until the generator is exhausted or finished_generators is set
    externally (e.g. by expire_units), sleeping between items so
    assignment creation does not monopolize the database.
    """
    while not self.finished_generators:
        try:
            # Keep the try body minimal: only next() should be able to
            # raise StopIteration here. Wrapping the creation call too
            # would misread an internal StopIteration as exhaustion.
            data = next(self.assignment_data_iterable)
        except StopIteration:
            # Generator exhausted — mark done and exit immediately
            # rather than sleeping one more wait interval first.
            self.finished_generators = True
            break
        self._create_single_assignment(data)
        time.sleep(ASSIGNMENT_GENERATOR_WAIT_SECONDS)

def create_assignments(self) -> None:
    """Create assignments (and their units) from the configured data source.

    A static data source is consumed synchronously before returning; a
    generator-backed source is drained on a background thread so that
    launching can begin before all of the data has been produced.
    """
    self.keep_launching_units = True
    if self.generator_type != GeneratorType.NONE:
        # Generator-backed source: consume it asynchronously.
        self.assignments_thread = threading.Thread(
            target=self._try_generating_assignments, args=()
        )
        self.assignments_thread.start()
        return
    # Static source: create everything up front.
    for assignment_data in self.assignment_data_iterable:
        self._create_single_assignment(assignment_data)

def generate_units(self):
""" units generator which checks that only 'max_num_concurrent_units' running at the same time,
i.e. in the LAUNCHED or ASSIGNED states """
while self.keep_launching:
while self.keep_launching_units:
units_id_to_remove = []
for db_id, unit in self.launched_units.items():
status = unit.get_status()
Expand All @@ -114,6 +150,12 @@ def generate_units(self):
self.launched_units.pop(db_id)

num_avail_units = self.max_num_concurrent_units - len(self.launched_units)
num_avail_units = (
len(self.unlaunched_units)
if self.max_num_concurrent_units == 0
else num_avail_units
)

units_id_to_remove = []
for i, item in enumerate(self.unlaunched_units.items()):
db_id, unit = item
Expand All @@ -123,30 +165,33 @@ def generate_units(self):
yield unit
else:
break
for db_id in units_id_to_remove:
self.unlaunched_units.pop(db_id)
with self.unlaunched_units_access_condition:
for db_id in units_id_to_remove:
self.unlaunched_units.pop(db_id)

time.sleep(UNIT_GENERATOR_WAIT_SECONDS)
if not self.unlaunched_units:
break

def _launch_limited_units(self, url: str) -> None:
""" use units' generator to launch limited number of units according to (max_num_concurrent_units)"""
for unit in self.generate_units():
unit.launch(url)
while not self.finished_generators:
for unit in self.generate_units():
unit.launch(url)
if self.generator_type == GeneratorType.NONE:
break

def launch_units(self, url: str) -> None:
"""launch any units registered by this TaskLauncher"""
if self.max_num_concurrent_units > 0:
thread = threading.Thread(target=self._launch_limited_units, args=(url,))
thread.start()
else:
for db_id, unit in self.unlaunched_units.items():
unit.launch(url)
self.units_thread = threading.Thread(
target=self._launch_limited_units, args=(url,)
)
self.units_thread.start()

def expire_units(self) -> None:
"""Clean up all units on this TaskLauncher"""
self.keep_launching = False
self.keep_launching_units = False
self.finished_generators = True
for unit in self.units:
try:
unit.expire()
Expand All @@ -155,3 +200,9 @@ def expire_units(self) -> None:
f"Warning: failed to expire unit {unit.db_id}. Stated error: {e}",
exc_info=True,
)

def shutdown(self) -> None:
"""Clean up running threads for generating assignments and units"""
if self.assignments_thread is not None:
self.assignments_thread.join()
self.units_thread.join()
11 changes: 11 additions & 0 deletions mephisto/data_model/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,17 @@ def add_args_to_group(cls, group: "ArgumentGroup") -> None:
builder_group = group.add_argument_group("task_builder_args")
cls.TaskRunnerClass.add_args_to_group(runner_group)
cls.TaskBuilderClass.add_args_to_group(builder_group)

group.add_argument(
"--block-qualification",
dest="block_qualification",
help=(
"Name of qualification to use in order to soft block workers "
"from working on this task (or any task using this block "
"qualification name)."
),
required=False,
)
# group.description = 'For `Blueprint`, you can supply...'
# group.add_argument('--task-option', help='Lets you customize')
return
Expand Down
2 changes: 2 additions & 0 deletions mephisto/providers/mturk/mturk_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ def setup_resources_for_task_run(
"QualificationTypeId"
] = requester._create_new_mturk_qualification(qualification_name)

qualifications += task_args.get("mturk_specific_qualifications", [])

# Set up HIT type
client = self._get_client(requester._requester_name)
hit_type_id = create_hit_type(client, task_config, qualifications)
Expand Down
2 changes: 1 addition & 1 deletion mephisto/providers/mturk/mturk_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def create_hit_type(
"QualificationTypeId": MTURK_LOCALE_REQUIREMENT,
"Comparator": "In",
"LocaleValues": allowed_locales,
"RequiredToPreview": True,
"ActionsGuarded": "DiscoverPreviewAndAccept",
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self, task_run: "TaskRun", opts: Any):
jsonl_file = os.path.expanduser(opts["data_jsonl"])
with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
line = jsonl_fp.readline()
while (line):
while line:
j = json.loads(line)
self._initialization_data_dicts.append(j)
line = jsonl_fp.readline()
Expand Down
6 changes: 3 additions & 3 deletions mephisto/server/blueprints/acute_eval/acute_eval_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,19 +303,19 @@ def get_new_task_data(self, worker_id: str) -> List[PairingsDict]:
tasks_per_unit = self.opts["subtasks_per_unit"]
# first add onboarding tasks
task_data = self.get_onboarding_tasks(worker_id)
logger.debug("Onboarding task data gotten: ", len(task_data))
logger.debug(f"Onboarding task data gotten: {len(task_data)}")
if len(task_data) == tasks_per_unit:
return task_data

# poll the task queue for more tasks
task_data = self._poll_task_queue(worker_id, task_data)
logger.debug("Task queue data gotten: ", len(task_data))
logger.debug(f"Task queue data gotten: {len(task_data)}")
if len(task_data) == tasks_per_unit:
return task_data

# top up the task_data if we don't hit the desired tasks_per_unit
task_data = self._top_up_task_data(worker_id, task_data)
logger.debug("Topped off data gotten: ", len(task_data))
logger.debug(f"Topped off data gotten: {len(task_data)}")
return task_data

def requeue_task_data(self, worker_id: str, task_data: List[PairingsDict]):
Expand Down
6 changes: 2 additions & 4 deletions test/core/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from mephisto.core.operator import Operator
from mephisto.server.architects.mock_architect import MockArchitect

TIMEOUT_TIME = 10


class TestOperator(unittest.TestCase):
"""
Expand Down Expand Up @@ -123,7 +125,6 @@ def test_run_job_concurrent(self):

# Give up to 5 seconds for whole mock task to complete
start_time = time.time()
TIMEOUT_TIME = 3
while time.time() - start_time < TIMEOUT_TIME:
if len(self.operator.get_running_task_runs()) == 0:
break
Expand Down Expand Up @@ -195,7 +196,6 @@ def test_run_job_not_concurrent(self):

# Give up to 5 seconds for both tasks to complete
start_time = time.time()
TIMEOUT_TIME = 3
while time.time() - start_time < TIMEOUT_TIME:
if len(self.operator.get_running_task_runs()) == 0:
break
Expand Down Expand Up @@ -328,7 +328,6 @@ def test_run_jobs_with_restrictions(self):

# Give up to 5 seconds for whole mock task to complete
start_time = time.time()
TIMEOUT_TIME = 3
while time.time() - start_time < TIMEOUT_TIME:
if len(self.operator.get_running_task_runs()) == 0:
break
Expand Down Expand Up @@ -396,7 +395,6 @@ def test_run_jobs_with_restrictions(self):

# Ensure the task run completed and that all assignments are done
start_time = time.time()
TIMEOUT_TIME = 3
while time.time() - start_time < TIMEOUT_TIME:
if len(self.operator.get_running_task_runs()) == 0:
break
Expand Down
Loading