Skip to content

Commit

Permalink
Simplify rotation tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan committed Oct 28, 2017
1 parent 7148e8a commit 1c7e9c9
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 64 deletions.
4 changes: 2 additions & 2 deletions tests/test_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
('{module}', lambda r: r == 'test_formatters'),
('{thread}', lambda r: re.fullmatch(r'\d+', r)),
('{thread.id}', lambda r: re.fullmatch(r'\d+', r)),
('{thread.name}', lambda r: r != ""),
('{thread.name}', lambda r: isinstance(r, str) and r != ""),
('{process}', lambda r: re.fullmatch(r'\d+', r)),
('{process.id}', lambda r: re.fullmatch(r'\d+', r)),
('{process.name}', lambda r: r != ""),
('{process.name}', lambda r: isinstance(r, str) and r != ""),
('%s {message} %d', lambda r: r == '%s Message %d'),
('{{a}} {message} {{1}}', lambda r: r == '{a} Message {1}'),
('天 {message} 天', lambda r: r == '天 Message 天'),
Expand Down
146 changes: 84 additions & 62 deletions tests/test_rotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
import os
import re

def sort_files_by_num(test_log, directory):
files = [f for f in directory.listdir() if re.fullmatch(re.escape(test_log) + '\.\d+', f.basename)]
return sorted(files, key=lambda f: int(f.basename.split('.')[-1]))

@pytest.mark.parametrize('name, should_rename', [
('test.log', True),
Expand Down Expand Up @@ -116,33 +113,6 @@ def test_time_rotation(monkeypatch, tmpdir, when, hours, logger):
assert file_3.read() == m2 + '\n' + m3 + '\n'
assert file_4.read() == m1 + '\n'

@pytest.mark.parametrize('backups', [0, 1, 9, 10, 11, 50, None])
def test_backups_count(tmpdir, logger, backups):
log_count = 10
files_checked = set()

file = tmpdir.join('test.log')
logger.log_to(file.realpath(), rotation=10, backups=backups, format='{message}')
nb_backups = float('inf') if backups is None else backups

logger.debug("1")
for i in range(2, log_count + 1):
logger.debug(str(i) * 10)

assert file.read() == '%s\n' % (str(log_count) * 10)
files_checked.add(file)

files = sort_files_by_num('test.log', tmpdir)

for i in range(min(nb_backups, log_count - 1)):
file = files[i]
j = log_count - i - 1
assert file.read() == '%s\n' % (str(j) * (1 if j == 1 else 10))
files_checked.add(file)

for f in tmpdir.listdir():
assert f in files_checked

@pytest.mark.parametrize('backups', ['1 hour', '1H', 'h', datetime.timedelta(hours=1), pendulum.Interval(hours=1.0)])
def test_backups_time(monkeypatch, tmpdir, logger, backups):
monkeypatch.setattr(loguru, 'now', lambda *a, **k: pendulum.now().add(hours=23))
Expand All @@ -160,47 +130,98 @@ def test_backups_time(monkeypatch, tmpdir, logger, backups):
assert file.read() == 'test\n'

@pytest.mark.parametrize('backups', [0, 1, 9, 10, 11, 50, None])
def test_backups_with_other_files(tmpdir, logger, backups):
def test_backups_count(tmpdir, logger, backups):
log_count = 10
previous_count = 10
files_checked = set()

# Not related files should not be deleted
others = ['test_.log', 'tes.log', 'test.out', 'test.logg', 'atest.log', 'test.log.nope', 'test']
for other in others:
tmpdir.join(other).write(other)
logger.log_to(tmpdir.join('test.log'), rotation=0, backups=backups, format='{message}')

file = tmpdir.join('test.log')
for i in range(log_count):
logger.debug(str(i))

if backups is None or backups >= log_count:
max_num = log_count
else:
max_num = backups

for i in range(max_num + 1):
if i == 0:
file_name = 'test.log'
else:
z = len(str(max_num))
file_name = 'test.log.%s' % str(i).zfill(z)

if i == log_count:
expected = ''
else:
expected = str(log_count - 1 - i) + '\n'

file = tmpdir.join(file_name)
assert file.read() == expected
files_checked.add(file)

for f in tmpdir.listdir():
assert f in files_checked

# Files from previous session sould be deleted if more than backups
for i in reversed(range(1, previous_count + 1)):
tmpdir.join('test.log.%d' % i).write('previous %d' % i)
file.write('previous 0')
@pytest.mark.parametrize('previous', [1, 5, 9, 20])
def test_backups_with_previous_files(tmpdir, logger, previous):
log_count = 5
backups = 9
files_checked = set()

logger.log_to(file.realpath(), rotation=0, backups=backups, format='{message}')
nb_backups = float('inf') if backups is None else backups
for i in reversed(range(previous)):
if i == 0:
file_name = 'test.log'
else:
z = len(str(previous))
file_name = 'test.log.%s' % str(i).zfill(z)
file = tmpdir.join(file_name)
file.write('previous %d' % i)

for i in range(1, log_count + 1):
logger.log_to(tmpdir.join('test.log'), rotation=0, backups=backups, format='{message}')

for i in range(log_count):
logger.debug(str(i))

assert file.read() == '%d\n' % log_count
files_checked.add(file)
for i in range(log_count):
file_name = 'test.log' + ('.%d' % i if i else '')
file = tmpdir.join(file_name)
expected = str(log_count - 1 - i) + '\n'
assert file.read() == expected
files_checked.add(file)

files = sort_files_by_num('test.log', tmpdir)
max_num = min(backups, log_count + previous - 1)

last = min(nb_backups, log_count - 1)
for i in range(last):
file = files[i]
assert file.read() == '%d\n' % (log_count - i - 1)
for i in range(log_count, max_num + 1):
j = i - log_count
expected = 'previous %d' % j
file_name = 'test.log.%d' % i
file = tmpdir.join('test.log.%d' % i)
assert file.read() == expected
files_checked.add(file)

if nb_backups >= log_count:
remaining_backups = nb_backups - log_count
remaining_backups = min(remaining_backups, previous_count)
for i in range(last, last + 1 + remaining_backups):
file = files[i]
assert file.read() == 'previous %d' % (i - last)
files_checked.add(file)
for f in tmpdir.listdir():
assert f in files_checked

def test_backups_with_other_files(tmpdir, logger):
log_count = 9
files_checked = set()

others = ['test_.log', 'tes.log', 'test.out', 'test.logg', 'atest.log', 'test.log.nope', 'test']
for other in others:
tmpdir.join(other).write(other)

logger.log_to(tmpdir.join('test.log'), rotation=0, backups=None, format='{message}')

for i in range(log_count):
logger.debug(str(i))

for i in range(log_count + 1):
file_name = 'test.log' + ('.%d' % i if i else '')
expected = (str(log_count - 1 - i) + '\n') if i != log_count else ''
file = tmpdir.join(file_name)
assert file.read() == expected
files_checked.add(file)

for other in others:
file = tmpdir.join(other)
Expand Down Expand Up @@ -231,10 +252,11 @@ def test_backups_zfill(tmpdir, logger):
logger.debug('0')
assert tmpdir.join('test.log.001').read() == 'c\n'

files = sort_files_by_num('test.log', tmpdir)
assert len(files) == 10 + 100 + 2 * 3
assert files[-1].basename == 'test.log.%d' % len(files)
assert files[-2].read() == 'a\n'
assert tmpdir.join('test.log.103').read() == 'b\n'
assert tmpdir.join('test.log.115').read() == 'a\n'
assert tmpdir.join('test.log.116').read() == ''
assert tmpdir.join('test.log.117').check(exists=0)


@pytest.mark.parametrize('mode', ['a', 'w'])
@pytest.mark.parametrize('backups', [10, None])
Expand Down

0 comments on commit 1c7e9c9

Please sign in to comment.