Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging core #166

Merged
merged 8 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mephisto/client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


class Config:
ENV = "development"
6 changes: 6 additions & 0 deletions mephisto/core/local_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
from sqlite3 import Connection, Cursor
import threading

from mephisto.core.logger_core import get_logger

logger = get_logger(name=__name__, verbose=True, level="info")


def nonesafe_int(in_string: Optional[str]) -> Optional[int]:
"""Cast input to an int or None"""
Expand Down Expand Up @@ -257,6 +261,8 @@ def init_tables(self) -> None:
"""
# TODO(#93) maybe raise flag when the schema of existing tables isn't what we expect
# it to be?
# "How to know that schema changes?"
# logger.warning("some message")
JackUrb marked this conversation as resolved.
Show resolved Hide resolved
with self.table_access_condition:
conn = self._get_connection()
conn.execute("PRAGMA foreign_keys = 1")
Expand Down
49 changes: 49 additions & 0 deletions mephisto/core/logger_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging

loggers = {}


def get_logger(
name: str, verbose: bool = True, log_file: str = None, level: str = "info"
) -> logging.Logger:
"""
Gets the logger corresponds to each module
Parameters:
name (string): the module name (__name__).
verbose (bool): INFO level activated if True.
log_file (string): path for saving logs locally.
level (string): logging level. Values options: [info, debug, warning, error, critical].

Returns:
logger (logging.Logger): the corresponding logger to the given module name.
"""

global loggers
if loggers.get(name):
return loggers.get(name)
else:
logger = logging.getLogger(name)

level_dict = {
"info": logging.INFO,
"debug": logging.DEBUG,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}

logger.setLevel(logging.INFO if verbose else logging.DEBUG)
logger.setLevel(level_dict[level.lower()])
if log_file is None:
handler = logging.StreamHandler()
else:
handler = logging.RotatingFileHandler(log_file)
formatter = logging.Formatter(
"[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)5s - %(message)s",
"%m-%d %H:%M:%S",
)

handler.setFormatter(formatter)
logger.addHandler(handler)
loggers[name] = logger
return logger
44 changes: 27 additions & 17 deletions mephisto/core/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
get_architect_from_type,
)

from mephisto.core.logger_core import get_logger

logger = get_logger(name=__name__, verbose=True, level="info")

if TYPE_CHECKING:
from mephisto.data_model.agent import Agent
Expand Down Expand Up @@ -172,18 +175,18 @@ def parse_and_launch_run(
# Find an existing task or create a new one
task_name = task_args.get("task_name")
if task_name is None:
# TODO warn that the task is being launched with the blueprint type
# as the task name
task_name = type_args.blueprint_type
logger.warning(
f"Task is using the default blueprint name {task_name} as a name, as no task_name is provided"
)
tasks = self.db.find_tasks(task_name=task_name)
task_id = None
if len(tasks) == 0:
task_id = self.db.new_task(task_name, type_args.blueprint_type)
else:
task_id = tasks[0].db_id

# TODO(#93) logging
print(f"Creating a task run under task name: {task_name}")
logger.info(f"Creating a task run under task name: {task_name}")

# Create a new task run
new_run_id = self.db.new_task_run(
Expand Down Expand Up @@ -254,12 +257,16 @@ def parse_and_launch_run(
if self.supervisor.sending_thread is None:
self.supervisor.launch_sending_thread()
except (KeyboardInterrupt, Exception) as e:
# TODO(#93) logging
print("Encountered error while launching run, shutting down")
logger.error(
"Encountered error while launching run, shutting down", exc_info=True
)
try:
architect.shutdown()
except (KeyboardInterrupt, Exception) as architect_exception:
print(f"Could not shut down architect: {architect_exception}")
logger.exception(
f"Could not shut down architect: {architect_exception}",
exc_info=True,
)
raise e

launcher = TaskLauncher(self.db, task_run, initialization_data_array)
Expand Down Expand Up @@ -291,10 +298,10 @@ def _track_and_kill_runs(self):
time.sleep(2)

def shutdown(self, skip_input=True):
print("operator shutting down") # TODO(#93) logger
logger.info("operator shutting down")
self.is_shutdown = True
for tracked_run in self._task_runs_tracked.values():
print("expring units") # TODO(#93) logger
logger.info("expiring units")
tracked_run.task_launcher.expire_units()
try:
remaining_runs = self._task_runs_tracked.values()
Expand All @@ -306,19 +313,20 @@ def shutdown(self, skip_input=True):
else:
next_runs.append(tracked_run)
if len(next_runs) > 0:
# TODO(#93) logger
print(
logger.info(
f"Waiting on {len(remaining_runs)} task runs, Ctrl-C ONCE to FORCE QUIT"
)
time.sleep(30)
remaining_runs = next_runs
except Exception as e:
logger.exception(
f"Encountered problem during shutting down {e}", exc_info=True
)
import traceback

traceback.print_exc()
except (KeyboardInterrupt, SystemExit) as e:
# TODO(#93) logger
print(
logger.info(
"Skipping waiting for outstanding task completions, shutting down servers now!"
)
for tracked_run in remaining_runs:
Expand All @@ -339,15 +347,15 @@ def parse_and_launch_run_wrapper(
try:
return self.parse_and_launch_run(arg_list=arg_list, extra_args=extra_args)
except (KeyboardInterrupt, Exception) as e:
# TODO(#93)
print("Ran into error while launching run: ")
logger.error("Ran into error while launching run: ", exc_info=True)
traceback.print_exc()
return None

def print_run_details(self):
"""Print details about running tasks"""
# TODO(#93) parse these tasks and get the full details
JackUrb marked this conversation as resolved.
Show resolved Hide resolved
print(f"Operator running {self.get_running_task_runs()}")
for task in self.get_running_task_runs():
logger.info(f"Operator running task ID = {task}")

def wait_for_runs_then_shutdown(
self, skip_input=False, log_rate: Optional[int] = None
Expand Down Expand Up @@ -385,6 +393,8 @@ def wait_for_runs_then_shutdown(

traceback.print_exc()
except (KeyboardInterrupt, SystemExit) as e:
print("Cleaning up after keyboard interrupt, please wait!")
logger.exception(
"Cleaning up after keyboard interrupt, please wait!", exc_info=True
)
finally:
self.shutdown()
36 changes: 16 additions & 20 deletions mephisto/core/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# LICENSE file in the root directory of this source tree.


import logging
import threading
from queue import PriorityQueue, Empty
import time
Expand Down Expand Up @@ -43,6 +42,10 @@
from mephisto.data_model.crowd_provider import CrowdProvider
from mephisto.data_model.architect import Architect

from mephisto.core.logger_core import get_logger

logger = get_logger(name=__name__, verbose=True, level="info")

# This class manages communications between the server
# and workers, ensures that their status is properly tracked,
# and also provides some helping utility functions for
Expand Down Expand Up @@ -110,7 +113,7 @@ def _on_channel_open(self, channel_id: str):

def _on_catastrophic_disconnect(self, channel_id):
# TODO(#102) Catastrophic disconnect needs to trigger cleanup
print(f"Channel {channel_id} called on_catastrophic_disconnect")
logger.error(f"Channel {channel_id} called on_catastrophic_disconnect")

def _on_channel_message(self, channel_id: str, packet: Packet):
"""Incoming message handler defers to the internal handler"""
Expand All @@ -119,10 +122,10 @@ def _on_channel_message(self, channel_id: str, packet: Packet):
self._on_message(packet, channel_info)
except Exception as e:
# TODO(#93) better error handling about failed messages
import traceback

traceback.print_exc()
print(repr(e))
logger.exception(
f"Channel {channel_id} encountered error on packet {packet}",
exc_info=True,
)
raise

def register_job(
Expand Down Expand Up @@ -199,7 +202,7 @@ def shutdown(self):
self.sending_thread.join()

def _send_alive(self, channel_info: ChannelInfo) -> bool:
print("sending alive")
logger.info("Sending alive")
return channel_info.channel.send(
Packet(
packet_type=PACKET_TYPE_ALIVE,
Expand Down Expand Up @@ -337,10 +340,7 @@ def _launch_and_run_assignment(
agent.act()
agent.mark_done()
except Exception as e:
import traceback

traceback.print_exc()
# TODO(#93) handle runtime exceptions for assignments
logger.exception(f"Cleaning up assignment: {e}", exc_info=True)
task_runner.cleanup_assignment(assignment)
finally:
task_run = task_runner.task_run
Expand All @@ -366,10 +366,7 @@ def _launch_and_run_unit(
agent.act()
agent.mark_done()
except Exception as e:
import traceback

traceback.print_exc()
# TODO(#93) handle runtime exceptions for assignments
logger.exception(f"Cleaning up unit: {e}", exc_info=True)
task_runner.cleanup_unit(unit)
finally:
task_runner.task_run.clear_reservation(unit)
Expand Down Expand Up @@ -650,7 +647,7 @@ def _try_send_agent_messages(self, agent_info: AgentInfo):
curr_obs = agent.pending_observations.pop(0)
did_send = channel_info.channel.send(curr_obs)
if not did_send:
print(f"Failed to send packet {curr_obs} to {channel_info}")
logger.error(f"Failed to send packet {curr_obs} to {channel_info}")
agent.pending_observations.insert(0, curr_obs)
return # something up with the channel, try later

Expand All @@ -661,7 +658,7 @@ def _send_message_queue(self) -> None:
channel = self.channels[curr_obs.receiver_id].channel
did_send = channel.send(curr_obs)
if not did_send:
print(
logger.error(
f"Failed to send packet {curr_obs} to server {curr_obs.receiver_id}"
)
self.message_queue.insert(0, curr_obs)
Expand Down Expand Up @@ -724,8 +721,7 @@ def _handle_updated_agent_status(self, status_map: Dict[str, str]):
"""
for agent_id, status in status_map.items():
if status not in AgentState.valid():
# TODO(#93) update with logging
print(f"Invalid status for agent {agent_id}: {status}")
logger.warning(f"Invalid status for agent {agent_id}: {status}")
continue
if agent_id not in self.agents:
# no longer tracking agent
Expand All @@ -740,7 +736,7 @@ def _handle_updated_agent_status(self, status_map: Dict[str, str]):
continue
if status != db_status:
if db_status in AgentState.complete():
print(
logger.info(
f"Got updated status {status} when already final: {agent.db_status}"
)
continue
Expand Down
10 changes: 8 additions & 2 deletions mephisto/core/task_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from mephisto.data_model.task import TaskRun
from mephisto.data_model.database import MephistoDB

from mephisto.core.logger_core import get_logger

logger = get_logger(name=__name__, verbose=True, level="info")


class TaskLauncher:
"""
Expand Down Expand Up @@ -90,5 +94,7 @@ def expire_units(self) -> None:
try:
unit.expire()
except Exception as e:
# TODO(#93) logger
print(f"Warning: failed to expire unit {unit.db_id}. Stated error: {e}")
logger.exception(
f"Warning: failed to expire unit {unit.db_id}. Stated error: {e}",
exc_info=True,
)
22 changes: 12 additions & 10 deletions mephisto/data_model/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from mephisto.data_model.blueprint import AgentState
from mephisto.core.registry import get_crowd_provider_from_type
from typing import Any, List, Optional, Mapping, Tuple, Dict, Type, Tuple, TYPE_CHECKING
from mephisto.core.logger_core import get_logger

logger = get_logger(name=__name__, verbose=True, level="info")


if TYPE_CHECKING:
from mephisto.data_model.database import MephistoDB
Expand Down Expand Up @@ -147,11 +151,10 @@ def revoke_qualification(self, qualification_name) -> bool:
self.revoke_crowd_qualification(qualification_name)
return True
except Exception as e:
# TODO(#93) logging
import traceback

traceback.print_exc()
print(f"Found error while trying to revoke qualification: {repr(e)}")
logger.exception(
f"Found error while trying to revoke qualification: {repr(e)}",
exc_info=True,
)
return False
return True

Expand All @@ -176,11 +179,10 @@ def grant_qualification(
self.grant_crowd_qualification(qualification_name, value)
return True
except Exception as e:
# TODO(#93) logging
import traceback

traceback.print_exc()
print(f"Found error while trying to grant qualification: {repr(e)}")
logger.exception(
f"Found error while trying to grant qualification: {repr(e)}",
exc_info=True,
)
return False

# Children classes can implement the following methods
Expand Down
8 changes: 8 additions & 0 deletions mephisto/providers/mock/mock_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
from mephisto.data_model.assignment import Assignment
from mephisto.providers.mock.mock_datastore import MockDatastore

from mephisto.core.logger_core import get_logger

logger = get_logger(name=__name__, verbose=True, level="info")


class MockUnit(Unit):
"""
Expand Down Expand Up @@ -43,6 +47,10 @@ def launch(self, task_url: str) -> None:
f"Mock task launched: localhost:{port} for preview, "
f"localhost:{port}/?worker_id=x&assignment_id={self.db_id} for task"
)
logger.info(
salelkafrawy marked this conversation as resolved.
Show resolved Hide resolved
f"Mock task launched: localhost:{port} for preview, "
f"localhost:{port}/?worker_id=x&assignment_id={self.db_id} for task"
)

return None

Expand Down
Loading