Skip to content

Commit

Permalink
Merge pull request #3 from n1k0/skip
Browse files Browse the repository at this point in the history
Add history and skip support, thanks @n1k0
  • Loading branch information
Lothiraldan committed Oct 6, 2012
2 parents 7d6f4b6 + 8acfbb7 commit 768ff39
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 47 deletions.
13 changes: 13 additions & 0 deletions README.rst
Expand Up @@ -41,6 +41,19 @@ If you try to get another task, you will get the same one.
$> onetask get
Empty task list.

Sometimes you may want to skip this horrible task you *know* you can't do right now, but want to keep in the list:

$> onetask skip
Switched from "task2" to "task3", good luck.

Last, you probably feel like contemplating accomplished work:

$> onetask history

That can give some nice output like the one below:

![](http://cl.ly/image/3Y3c0w071y14/Capture%20d%E2%80%99%C3%A9cran%202012-10-06%20%C3%A0%2022.38.36.png)

Tests
-----

Expand Down
83 changes: 60 additions & 23 deletions onetask/collection.py
@@ -1,16 +1,21 @@
import json
import os
import time

from collections import deque
import utils

from collections import deque, namedtuple
from random import shuffle


class TaskError(RuntimeError):
pass


class TaskCollection(deque):
class TaskCollection(object):
"""Tasks collection object."""
tasks = deque()
archive = deque()

def __init__(self, db_path, stdout=None):
assert os.path.exists(db_path)
Expand All @@ -20,11 +25,12 @@ def __init__(self, db_path, stdout=None):
self.stdout = stdout
# load tasks
try:
tasks_list = json.load(open(self.db_path, 'r'))
tasks_data = json.load(open(self.db_path, 'r'))
except (IOError, ValueError,), err:
raise TaskError(u"Unable to load tasks from db %s: %s"
% (self.db_path, err,))
super(TaskCollection, self).__init__(tasks_list)
self.tasks = deque(tasks_data.get('tasks', []))
self.archive = deque(tasks_data.get('archive', []))

@classmethod
def load(cls, db_path, **kwargs):
Expand All @@ -47,57 +53,88 @@ def create_db(cls, db_path):
assert not os.path.exists(db_path)
try:
db_file = open(db_path, 'w')
db_file.write('[]')
db_file.write(json.dumps(dict(tasks=[], archive=[]), indent=4))
db_file.close()
except IOError, err:
raise TaskError(u"Unable to create tasks database at %s: %s"
% (db_path, err))

def add(self, task):
def add(self, title, created=None):
"Adds a new task to the collection while keeping current active one."
if task in self:
raise TaskError(u'Task "%s" already exists.' % task)
if len(self) > 0:
if title in [t['title'] for t in self.tasks]:
raise TaskError(u'Task "%s" already exists.' % title)
task = dict(title=title, created=created or time.time())
if len(self.tasks) > 0:
# pop current active task
active = self.popleft()
active = self.tasks.popleft()
# shuffle rest
shuffle(self)
shuffle(self.tasks)
# add new task
self.append(task)
self.tasks.append(task)
# restore active tasks
self.appendleft(active)
self.tasks.appendleft(active)
else:
self.append(task)
self.tasks.append(task)
self.update_db()
self.notify(u'Task "%s" added' % task)
self.notify(u'Task "%s" added' % title)

def done(self):
def done(self, closed=None):
"Marks current active task as done."
try:
task = self.popleft()
task = self.tasks.popleft()
except IndexError:
raise TaskError(u"Empty task list.")
shuffle(self)
shuffle(self.tasks)
task['closed'] = closed or time.time()
task['duration'] = task['closed'] - task['created']
self.archive.appendleft(task)
self.update_db()
self.notify(u'Task "%s" marked as done' % task)
self.notify(u'Task "%s" marked as done. Completion occured in %s.'
% (task['title'], utils.format_duration(task['duration']),))

def get(self):
"Retrieves current active task."
if len(self) == 0:
if len(self.tasks) == 0:
raise TaskError(u"No tasks.")
self.notify(self[0])
return self[0]
title = self.tasks[0]['title']
self.notify(title)
return title

def history(self):
"Generates a tasks completion report."
Row = namedtuple('row', ['Created', 'Closed', 'Duration', 'Task'])
rows = []
for task in self.archive:
rows.append(Row(Created=utils.format_timestamp(task['created']),
Closed=utils.format_timestamp(task['closed']),
Duration=utils.format_duration(task['duration']),
Task=task['title']))
self.notify(utils.pprinttable(rows))

def notify(self, message):
"Writes a message to stdout interface, if any has been provided."
if self.stdout is not None:
self.stdout.write("%s\n" % message)

def skip(self):
"Skips current active task and pull another one."
if len(self.tasks) == 0:
raise TaskError(u"No active task.")
if len(self.tasks) == 1:
raise TaskError(u"Only one task is available. Go shopping.")
old = self.tasks[0]['title']
new = list(self.tasks)[1:2][0]['title']
self.tasks.rotate(-1)
self.update_db()
self.notify(u'Switched from "%s" to "%s", good luck.' % (old, new))

def update_db(self):
"Updates the task db with current data."
try:
db_file = open(self.db_path, 'w')
db_file.write(json.dumps(list(self)))
db_file.write(json.dumps(dict(tasks=list(self.tasks),
archive=list(self.archive)),
indent=4))
db_file.close()
except IOError, err:
raise TaskError(u"Unable to save tasks database, sorry: %s" % err)
10 changes: 8 additions & 2 deletions onetask/onetask
Expand Up @@ -5,8 +5,8 @@ import sys
import argparse
import unittest

from onetask.collection import TaskCollection, TaskError
from onetask.tests import TaskCollectionTest
from collection import TaskCollection, TaskError
from tests import TaskCollectionTest


TASKS_DATABASE = os.path.expanduser('~/.OneTask.json')
Expand All @@ -28,9 +28,15 @@ def get_args(args_list):
get_subparser = subparsers.add_parser('get',
help='display currently active task')
get_subparser.set_defaults(func=tasks.get)
history_subparser = subparsers.add_parser('history',
help='display tasks completion history')
history_subparser.set_defaults(func=tasks.history)
done_subparser = subparsers.add_parser('done',
help='mark active task as done')
done_subparser.set_defaults(func=tasks.done)
skip_subparser = subparsers.add_parser('skip',
help='skip current active task')
skip_subparser.set_defaults(func=tasks.skip)
test_subparser = subparsers.add_parser('test',
help='launch test suite')
test_subparser.set_defaults(func=test)
Expand Down
66 changes: 44 additions & 22 deletions onetask/tests.py
Expand Up @@ -8,51 +8,73 @@


class TaskCollectionTest(unittest.TestCase):
def _load(self, tasks):
def _load(self, **kwargs):
temp = tempfile.NamedTemporaryFile(prefix='onetasktest', suffix='.json')
temp.write(json.dumps(tasks))
temp.write(json.dumps(dict(**kwargs)))
temp.read()
return TaskCollection.load(temp.name)

def test_load(self):
tasks = self._load(["task1", "task2"])
self.assertEquals(len(tasks), 2)
self.assertEquals(tasks[0], 'task1')
self.assertEquals(tasks[1], 'task2')
tasks = self._load(tasks=[{"title": "task1"}, {"title": "task2"}])
self.assertEquals(len(tasks.tasks), 2)
self.assertEquals(tasks.tasks[0]['title'], 'task1')
self.assertEquals(tasks.tasks[1]['title'], 'task2')

def test_add(self):
tasks = self._load([])
tasks = self._load(tasks=[])
tasks.add('task1')
self.assertEquals(len(tasks), 1)
self.assertEquals(tasks[0], 'task1')
self.assertEquals(len(tasks.tasks), 1)
self.assertEquals(tasks.tasks[0]['title'], 'task1')
tasks.add('task2')
self.assertEquals(len(tasks), 2)
self.assertEquals(tasks[0], 'task1')
self.assertEquals(len(tasks.tasks), 2)
self.assertEquals(tasks.tasks[0]['title'], 'task1')
tasks.add('task3')
self.assertEquals(len(tasks), 3)
self.assertEquals(tasks[0], 'task1')
self.assertEquals(len(tasks.tasks), 3)
self.assertEquals(tasks.tasks[0]['title'], 'task1')

def test_get(self):
tasks = self._load(["task1"])
tasks = self._load(tasks=[{"title": "task1", "created": 1000}])
for x in xrange(2, 100):
tasks.add('task%d' % x)
self.assertEqual(len(tasks), x)
self.assertEqual(len(tasks.tasks), x)
self.assertEquals(tasks.get(), 'task1')
tasks.done()
self.assertEqual(len(tasks), x - 1)
tasks.done(closed=3000)
self.assertEqual(len(tasks.tasks), x - 1)
self.assertNotEquals(tasks.get(), 'task1')
self.assertEquals(tasks.archive[0]['title'], 'task1')
self.assertEquals(tasks.archive[0]['duration'], 2000)

def test_done(self):
tasks = self._load([])
tasks = self._load(tasks=[])
tasks.add('task1')
self.assertEquals(tasks.get(), 'task1')
self.assertEquals(len(tasks), 1)
self.assertEquals(len(tasks.tasks), 1)
tasks.add('task2')
self.assertEquals(tasks.get(), 'task1')
self.assertEquals(len(tasks), 2)
self.assertEquals(len(tasks.tasks), 2)
self.assertEquals(len(tasks.archive), 0)
tasks.done()
self.assertEquals(len(tasks.tasks), 1)
self.assertEquals(tasks.tasks[0]['title'], 'task2')
self.assertEquals(len(tasks.archive), 1)
self.assertEquals(tasks.archive[0]['title'], 'task1')
tasks.done()
self.assertEquals(len(tasks), 1)
self.assertEquals(tasks[0], 'task2')
self.assertEquals(len(tasks.tasks), 0)
self.assertEquals(len(tasks.archive), 2)
self.assertEquals(tasks.archive[0]['title'], 'task2')
self.assertEquals(tasks.archive[1]['title'], 'task1')

def test_skip(self):
tasks = self._load(tasks=[{"title": "task1"},
{"title": "task2"},
{"title": "task3"}])
self.assertEquals(tasks.get(), 'task1')
tasks.skip()
self.assertEquals(tasks.get(), 'task2')
tasks.skip()
self.assertEquals(tasks.get(), 'task3')
tasks.skip()
self.assertEquals(tasks.get(), 'task1')


if __name__ == '__main__':
Expand Down
64 changes: 64 additions & 0 deletions onetask/utils.py
@@ -0,0 +1,64 @@
import datetime


def format_timestamp(ts, date_format="%Y-%m-%d"):
"Formats a timestamp to a human readable date."
return datetime.datetime.fromtimestamp(ts).strftime(date_format)


def format_duration(seconds):
"Formats a duration expressed in seconds to a human readable string."
if seconds < 60:
return "%d seconds" % seconds
if seconds < 120:
return "about a minute"
if seconds < 3600:
return "%d minutes" % (seconds / 60)
if seconds < 7200:
return "about one hour"
if seconds < 86400:
return "%d hours" % (seconds / 3600)
days = seconds / 86400
if days <= 1:
return "about one day"
if days < 7:
return "%d days" % days
if days < 31:
return "%d weeks" % (days / 7)
if days < 365:
return "%d months" % (days / 30)
return "%d years" % (days / 365)


def pprinttable(rows):
"Returns an ascii table from data."
lines = []
if len(rows) > 1:
headers = rows[0]._fields
lens = []
for i in range(len(rows[0])):
lens.append(len(max([x[i] for x in rows] + [headers[i]],
key=lambda x: len(str(x)))))
formats = []
hformats = []
for i in range(len(rows[0])):
if isinstance(rows[0][i], int):
formats.append("%%%dd" % lens[i])
else:
formats.append("%%-%ds" % lens[i])
hformats.append("%%-%ds" % lens[i])
pattern = " | ".join(formats)
hpattern = " | ".join(hformats)
separator = "-+-".join(['-' * n for n in lens])
lines.append(separator)
lines.append(hpattern % tuple(headers))
lines.append(separator)
for line in rows:
lines.append(pattern % tuple(line))
lines.append(separator)
elif len(rows) == 1:
row = rows[0]
hwidth = len(max(row._fields, key=lambda x: len(x)))
for i in range(len(row)):
lines.append(u"%*s = %s" % (hwidth, row._fields[i], row[i]))
return "\n".join(lines)

0 comments on commit 768ff39

Please sign in to comment.