Skip to content
This repository was archived by the owner on Apr 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions pyworkflow/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
from __future__ import print_function

from subprocess import Popen
from StringIO import StringIO

import pyworkflow.utils as pwutils
from pyworkflow.utils.process import killWithChilds
from pyworkflow.tests import *
from pyworkflow.utils import utils, prettyDict
from pyworkflow.utils import ProgressBar


class TestBibtex(BaseTest):
""" Some minor tests to the bibtexparser library. """

@classmethod
def setUpClass(cls):
setupTestOutput(cls)


def test_Parsing(self):
bibtex = """

Expand Down Expand Up @@ -61,7 +63,7 @@ def test_Parsing(self):
prettyDict(utils.parseBibTex(bibtex))


def wait (condition, timeout=30):
def wait(condition, timeout=30):
""" Wait until "condition" returns False or return after timeout (seconds) param"""
t0 = time.time()

Expand Down Expand Up @@ -103,22 +105,25 @@ def test_getListFromRangeString(self):
s2 = s.replace(',', ' ')
self.assertEqual(o, pwutils.getListFromRangeString(s2))

class TessProgressBar(unittest.TestCase):

class TestProgressBar(unittest.TestCase):

def caller(self, total, step, fmt, resultGold):
ti = time.time()
progressBar = ProgressBar(total=total, fmt=fmt, objectId=33)
result = StringIO()
pb = ProgressBar(total=total, fmt=fmt, output=result,
extraArgs={'objectId': 33})

result=''
pb.start()
for i in xrange(total):
if i%step==0:
result += progressBar(); sys.stdout.flush()
progressBar.current += step
result += progressBar.done(); sys.stdout.flush()
self.assertEqual(resultGold, result)
tf = time.time()
print ("%d iterations in %f sec"%(total, tf - ti))
if i % step == 0:
pb.update(i+1)
pb.finish()

self.assertEqual(resultGold, result.getvalue())
result.close()
tf = time.time()
print ("%d iterations in %f sec" % (total, tf - ti))

def test_dot(self):
total = 1000000
Expand All @@ -128,7 +133,6 @@ def test_dot(self):
self.caller(total=total, step=step,
fmt=ProgressBar.DOT, resultGold=resultGold)


def test_default(self):
total = 3
step = 1
Expand Down
166 changes: 109 additions & 57 deletions pyworkflow/utils/progressbar.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,133 @@
# **************************************************************************
# *
# * Authors: Roberto Marabini (roberto@cnb.csic.es) [1]
# * J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [2]
# *
# * [1] Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# * [2] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

from __future__ import print_function
import sys
import re

###############################################
# progressbar
# EXAMPLE:
#
# from time import sleep

# progress = ProgressBar(1000, fmt=ProgressBar.FULL)
# for x in xrange(progress.total):
# progress.current += 1
# print progress()
# sleep(0.1)
# print progress.done()


class ProgressBar(object):
"""Text progress bar class for Python.
""" Text progress bar class for Python.

A text progress bar is typically used to display the progress of a long
running operation, providing a visual cue that processing is underway.
"""

Example:
N = 1000
pb = ProgressBar(N, fmt=ProgressBar.FULL)
pb.start()
for x in xrange(N):
pb.update(x+1)
sleep(0.1)
pb.finish()
"""
DEFAULT = 'Progress: %(bar)s %(percent)3d%%'
FULL = '%(bar)s %(current)d/%(total)d (%(percent)3d%%) %(remaining)d to go'
OBJID = '%(bar)s %(current)d/%(total)d (%(percent)3d%%) (objectId=%(objectId)d)'
DOT = '.'

def __init__(self, total, width=40, fmt=DEFAULT, symbol='=',
output=sys.stderr, objectId=None):
# total = maximum number of character to be written per line. Usually the current terminal width
# width = progress bar width (without the percentange and number of iterations loop)
# predefined format string, so far DEFAULT, FULL, OBJID and DOT are defined.
# symbol: progress bar is made with this symbol
output=None, extraArgs=None):
"""
Create a new ProgressBar object.
:param total: The total amount that will be running the progress bar.
The value in the update() method can no go greater than this value.
:param width: progress bar width (without the percentage and number of
iterations loop)
:param fmt: predefined format string, so far DEFAULT, FULL, OBJID and
DOT are defined.
:param symbol: progress bar is made with this symbol
:param output:
:param extraArgs: Additional arguments that can be passed to be used
the fmt format. (e.g extraArgs={'objectId': 1} for fmt=OBJID
"""
if len(symbol) != 1:
raise Exception("Symbol should be only 1 character length. ")

assert len(symbol) == 1
self._total = total
self._width = width
self._symbol = symbol
self._output = output or sys.stdout
self._current = -1
self._extraArgs = extraArgs or {}
self._fmt = fmt
self._directPrint = fmt == self.DOT

self.total = total
self.width = width
self.symbol = symbol
self.output = output
self.objectId = objectId

if fmt == self.DOT:
self.directPrint = True
self.fnt = self.DOT
else:
self.directPrint = False
if not self._directPrint:
# This line computes the number of digits
# in total and rewrites the fmt string
# so if total = 100, %d is converted to %3d
self.fmt = re.sub(r'(?P<name>%\(.+?\))d',
r'\g<name>%dd' % len(str(total)), fmt)
self.current = 0
self._fmt = re.sub(r'(?P<name>%\(.+?\))d',
r'\g<name>%dd' % len(str(total)), fmt)

def __getStr(self):
""" Internal function to return the current string value.
It should be called after the value has being set.
"""
if self._directPrint: # print just a dot
return self._fmt if self._current else ''

percent = self._current / float(self._total)
size = int(self._width * percent)
remaining = self._total - self._current
bar = '[' + self._symbol * size + ' ' * (self._width - size) + ']'

args = {
'total': self._total,
'bar': bar,
'current': self._current,
'percent': percent * 100,
'remaining': remaining,
}
args.update(self._extraArgs)

return '\r' + self._fmt % args

def __call__(self):
# print just a dot
if self.directPrint:
return self.fnt
else: # print percentages
percent = self.current / float(self.total)
size = int(self.width * percent)
remaining = self.total - self.current
bar = '[' + self.symbol * size + ' ' * (self.width - size) + ']'
def start(self):
""" Print empty progress bar. """
self.update(0)

args = {
'total': self.total,
'bar': bar,
'current': self.current,
'percent': percent * 100,
'remaining': remaining,
'objectId': self.objectId
}
return('\r' + self.fmt % args)
def update(self, value):
"""
Update the current value and print the progress.
:param value: New value, should be greater than the previous
value and less or equal the total value/
:return:
"""
if value < 0 or value <= self._current or value > self._total:
raise Exception("Incorrect value provided. It should be greater "
"than previous value and between 0 and total. ")
self._current = value
self._output.write(self.__getStr())
self._output.flush()

def done(self):
# print last update with 100% complete message
self.current = self.total
return(self())
def finish(self):
""" Finalize the progress and
print last update with 100% complete message """
if self._current < self._total:
self.update(self._total)