Replace usage of arrow datetime objects in favor of pure datetime ones
Note that the humanize library is now used in the cli pretty printing
function (in place of the arrow humanize feature).

As a result, displayed output from some cli commands may slightly differ.

Closes T2835.
douardda committed Dec 3, 2020
1 parent 1b390a7 commit 3c87075
Showing 11 changed files with 68 additions and 85 deletions.
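
The commit message above boils down to one shared helper: throughout the diff the code now imports utcnow from swh.scheduler.utils, and judging by the local helper removed from pika_listener.py further down, that shared helper is presumably just a timezone-aware wrapper around datetime.datetime.now. A minimal sketch under that assumption (not part of the diff itself):

# Sketch of the shared utcnow() helper this diff switches to; the actual body
# in swh.scheduler.utils may differ, but the local helper removed from
# pika_listener.py below was defined exactly like this.
import datetime

def utcnow() -> datetime.datetime:
    # Return the current time as a timezone-aware UTC datetime.
    return datetime.datetime.now(tz=datetime.timezone.utc)

It can stand in for arrow.utcnow() wherever only timedelta arithmetic and isoformat() output are needed.
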
3 changes: 3 additions & 0 deletions mypy.ini
@@ -14,6 +14,9 @@ ignore_missing_imports = True
[mypy-elasticsearch.*]
ignore_missing_imports = True

[mypy-humanize.*]
ignore_missing_imports = True

[mypy-kombu.*]
ignore_missing_imports = True

2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,13 +2,13 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html

arrow
attrs
attrs-strict
celery >= 4.3
Click
elasticsearch > 5.4
flask
humanize
pika >= 1.1.0
psycopg2
pyyaml
8 changes: 1 addition & 7 deletions swh/scheduler/backend.py
@@ -8,14 +8,13 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from uuid import UUID

from arrow import Arrow, utcnow
import attr
from psycopg2.extensions import AsIs
import psycopg2.extras
import psycopg2.pool

from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
from swh.scheduler.utils import utcnow

from .exc import StaleData
from .model import (
@@ -28,12 +27,7 @@
logger = logging.getLogger(__name__)


def adapt_arrow(arrow):
return AsIs("'%s'::timestamptz" % arrow.isoformat())


psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
psycopg2.extras.register_uuid()


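
The deleted adapt_arrow adapter existed only so psycopg2 could send Arrow objects to PostgreSQL; plain datetime.datetime parameters are adapted to timestamps by psycopg2 out of the box, so nothing needs to be registered for them. A rough, self-contained illustration (the DSN is a placeholder, not the scheduler's real configuration):

# Why the custom adapter can go away: psycopg2 natively adapts
# datetime.datetime parameters, whereas Arrow objects needed adapt_arrow.
import datetime

import psycopg2

conn = psycopg2.connect("dbname=softwareheritage-scheduler")  # placeholder DSN
with conn.cursor() as cur:
    cur.execute(
        "SELECT %s::timestamptz",
        (datetime.datetime.now(tz=datetime.timezone.utc),),
    )
    print(cur.fetchone()[0])
conn.close()
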
2 changes: 1 addition & 1 deletion swh/scheduler/celery_backend/listener.py
@@ -8,13 +8,13 @@
import sys
import time

from arrow import utcnow
import celery
from celery.events import EventReceiver
import click
from kombu import Queue

from swh.core.statsd import statsd
from swh.scheduler.utils import utcnow


class ReliableEventReceiver(EventReceiver):
6 changes: 1 addition & 5 deletions swh/scheduler/celery_backend/pika_listener.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import json
import logging
import sys
@@ -12,14 +11,11 @@

from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
from swh.scheduler.utils import utcnow

logger = logging.getLogger(__name__)


def utcnow():
return datetime.datetime.now(tz=datetime.timezone.utc)


def get_listener(broker_url, queue_name, scheduler_backend):
connection = pika.BlockingConnection(pika.URLParameters(broker_url))
channel = connection.channel()
7 changes: 4 additions & 3 deletions swh/scheduler/celery_backend/runner.py
@@ -5,13 +5,14 @@

import logging

import arrow
from kombu.utils.uuid import uuid

from swh.core.statsd import statsd
from swh.scheduler import compute_nb_tasks_from, get_scheduler
from swh.scheduler.utils import utcnow

logger = logging.getLogger(__name__)

# Max batch size for tasks
MAX_NUM_TASKS = 10000

@@ -29,7 +30,7 @@ def run_ready_tasks(backend, app):
{
'task': the scheduler's task id,
'backend_id': Celery's task id,
'scheduler': arrow.utcnow()
'scheduler': utcnow()
}
The result can be used to block-wait for the tasks' results::
@@ -98,7 +99,7 @@ def run_ready_tasks(backend, app):
data = {
"task": task["id"],
"backend_id": backend_id,
"scheduled": arrow.utcnow(),
"scheduled": utcnow(),
}

backend_tasks.append(data)
55 changes: 19 additions & 36 deletions swh/scheduler/cli/task.py
@@ -20,34 +20,18 @@


locale.setlocale(locale.LC_ALL, "")
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]


class DateTimeType(click.ParamType):
name = "time and date"

def convert(self, value, param, ctx):
import arrow

if not isinstance(value, arrow.Arrow):
value = arrow.get(value)

return value


DATETIME = DateTimeType()
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
DATETIME = click.DateTime()


def format_dict(d):
"""Recursively format date objects in the dict passed as argument"""
import datetime

import arrow

ret = {}
for k, v in d.items():
if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
v = arrow.get(v).format()
if isinstance(v, (datetime.date, datetime.datetime)):
v = v.isoformat()
elif isinstance(v, dict):
v = format_dict(v)
ret[k] = v
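
In the hunk above, the arrow-based DateTimeType is replaced by click's built-in click.DateTime parameter type, and format_dict now relies on isoformat() instead of arrow formatting. A minimal sketch of how the new parameter type behaves (the command and option below are illustrative, not part of the scheduler CLI):

# Toy example of click.DateTime, used here in place of the removed
# DateTimeType; this is a sketch, not scheduler code.
import click

@click.command()
@click.option("--next-run", type=click.DateTime(), default=None)
def demo(next_run):
    # click.DateTime parses strings such as "2019-02-21 13:52:35" into a
    # naive datetime.datetime, which is presumably why the doctest output
    # below no longer carries a +00:00 offset.
    click.echo(repr(next_run))

if __name__ == "__main__":
    demo()
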
@@ -96,7 +80,7 @@ def pretty_print_task(task, full=False):
... }
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
Next run: ... (2019-02-21 13:52:35+00:00)
Next run: ... (2019-02-21T13:52:35.407818)
Interval: 1:00:00
Type: test_task
Policy: oneshot
@@ -110,7 +94,7 @@ def pretty_print_task(task, full=False):
<BLANKLINE>
>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
Next run: ... (2019-02-21 13:52:35+00:00)
Next run: ... (2019-02-21T13:52:35.407818)
Interval: 1:00:00
Type: test_task
Policy: oneshot
@@ -125,13 +109,13 @@
key2: 42
<BLANKLINE>
"""
import arrow
import humanize

next_run = arrow.get(task["next_run"])
next_run = task["next_run"]
lines = [
"%s %s\n" % (click.style("Task", bold=True), task["id"]),
click.style(" Next run: ", bold=True),
"%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE), next_run.format()),
"%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
"\n",
click.style(" Interval: ", bold=True),
str(task["current_interval"]),
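
For the human-readable half of the "Next run:" line, arrow's humanize() is replaced with the humanize package. Note that the diff uses humanize.naturaldate, a calendar-date description rather than a relative delta; that is one reason displayed output may differ slightly, as the commit message warns. A small sketch (exact wording depends on the current date and locale):

# Sketch of the humanize calls replacing arrow's humanize(); the outputs in
# the comments are approximate, not guaranteed.
import datetime

import humanize

next_run = datetime.datetime(2019, 2, 21, 13, 52, 35)
print(humanize.naturaldate(next_run))  # date-style, e.g. "Feb 21 2019"
print(humanize.naturaltime(next_run))  # relative style, e.g. "... ago" (not used here)
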
@@ -213,10 +197,10 @@ def schedule_tasks(ctx, columns, delimiter, file):
import csv
import json

import arrow
from swh.scheduler.utils import utcnow

tasks = []
now = arrow.utcnow()
now = utcnow()
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
@@ -230,7 +214,7 @@ def schedule_tasks(ctx, columns, delimiter, file):
"args": args,
"kwargs": kwargs,
}
task["next_run"] = DATETIME.convert(task.get("next_run", now), None, None)
task["next_run"] = task.get("next_run", now)
tasks.append(task)

created = scheduler.create_tasks(tasks)
@@ -273,23 +257,23 @@ def schedule_task(ctx, type, options, policy, priority, next_run):
Note: if the priority is not given, the task won't have the priority set,
which is considered as the lowest priority level.
"""
import arrow
from swh.scheduler.utils import utcnow

from .utils import parse_options

scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")

now = arrow.utcnow()
now = utcnow()

(args, kw) = parse_options(options)
task = {
"type": type,
"policy": policy,
"priority": priority,
"arguments": {"args": args, "kwargs": kw,},
"next_run": DATETIME.convert(next_run or now, None, None),
"next_run": next_run or now,
}
created = scheduler.create_tasks([task])

@@ -587,13 +571,13 @@ def respawn_tasks(ctx, task_ids, next_run):
swh-scheduler task respawn 1 3 12
"""
import arrow
from swh.scheduler.utils import utcnow

scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
if next_run is None:
next_run = arrow.utcnow()
next_run = utcnow()
output = []

scheduler.set_status_tasks(
@@ -678,10 +662,9 @@ def archive_tasks(
"""
from itertools import groupby

import arrow

from swh.core.utils import grouper
from swh.scheduler.backend_es import ElasticSearchBackend
from swh.scheduler.utils import utcnow

config = ctx.obj["config"]
scheduler = ctx.obj["scheduler"]
@@ -699,7 +682,7 @@ def archive_tasks(
logger.info("**NO CLEANUP**")

es_storage = ElasticSearchBackend(**config)
now = arrow.utcnow()
now = utcnow()

# Default to archive tasks from a rolling month starting the week
# prior to the current one
4 changes: 2 additions & 2 deletions swh/scheduler/tests/es/test_cli_task.py
@@ -8,11 +8,11 @@
import random
import uuid

import arrow
from click.testing import CliRunner
import pytest

from swh.scheduler.cli import cli
from swh.scheduler.utils import utcnow

from ..common import TASK_TYPES, TEMPLATES, tasks_from_template

@@ -28,7 +28,7 @@ def test_cli_archive_tasks(swh_sched, swh_sched_config_file):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)

next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1)
next_run_start = utcnow() - datetime.timedelta(days=1)

recurring = tasks_from_template(template_git, next_run_start, 100)
oneshots = tasks_from_template(
