In [1]:
from functional import seq

In [2]:
from typing import Iterator, NamedTuple

Id = int
Ids = Iterator[Id]

TestReference = str
Line = int
Lines = Iterator[Line]

FilePath = str
FilePaths = Iterator[FilePath]

TestName = str
TestNames = Iterator[TestName]

class FileTableRow(NamedTuple):
    """One row of the `file` table, with fields in the column order of `SELECT * FROM file`."""
    file_id: Id  # numeric key of the file; LineTableRow.file_id is matched against it
    path: FilePath  # path of the file as stored in the table

class ContextTableRow(NamedTuple):
    """One row of the `context` table, with fields in the column order of `SELECT * FROM context`."""
    context_id: Id  # numeric key of the context; LineTableRow.context_id is matched against it
    context: TestReference  # context label (a test name in the sample data); can be an empty string

class LineTableRow(NamedTuple):
    """One row of the `line` table, with fields in the column order of `SELECT * FROM line`."""
    file_id: Id  # compared against FileTableRow.file_id
    context_id: Id  # compared against ContextTableRow.context_id
    lineno: Line  # line number within the file identified by file_id

In [3]:
# SQLite database file queried by the cells of this notebook.
db_path = '../tests/data/.coverage'
# Turn every result tuple of the `file` table into a FileTableRow.
file_table_rows = seq.sqlite3(db_path, 'SELECT * FROM file').map(FileTableRow._make)

In [4]:
type(file_table_rows)

functional.pipeline.Sequence

In [5]:
file_table_rows.to_list()

[FileTableRow(file_id=1, path='/home/fk/github/python-tia/tia/__init__.py'),
 FileTableRow(file_id=2, path='/home/fk/github/python-tia/tia/config.py'),
 FileTableRow(file_id=3, path='/home/fk/github/python-tia/tia/env.py')]

In [6]:
# filter files by id
file_table_rows.filter(lambda x: x.file_id == 1)

file_id,path
1,/home/fk/github/python-tia/tia/__init__.py


In [7]:
# filter files by name
file_table_rows.filter(lambda x: 'config.py' in x.path)

file_id,path
2,/home/fk/github/python-tia/tia/config.py


In [8]:
# Turn every result tuple of the `context` table into a ContextTableRow.
context_table_rows = seq.sqlite3(db_path, 'SELECT * FROM context').map(ContextTableRow._make)

In [9]:
context_table_rows.to_list()

[ContextTableRow(context_id=1, context=''),
 ContextTableRow(context_id=2, context='test_reading_existing_valid_config_file_returns_string'),
 ContextTableRow(context_id=3, context='test_reading_existing_invalid_config_file_raises_error'),
 ContextTableRow(context_id=4, context='test_reading_non_existing_config_file_raises_exception'),
 ContextTableRow(context_id=5, context='test_read_valid_parent_key_config'),
 ContextTableRow(context_id=6, context='test_read_valid_explicit_full_blown_pipelines_config'),
 ContextTableRow(context_id=7, context='test_read_valid_implicit_full_blown_pipelines_config'),
 ContextTableRow(context_id=8, context='test_read_valid_single_pipeline_with_dirs_only_config'),
 ContextTableRow(context_id=9, context='test_read_valid_single_pipeline_with_files_only_config'),
 ContextTableRow(context_id=10, context='test_read_invalid_pipelines_config'),
 ContextTableRow(context_id=11, context='test_is_some_ci'),
 ContextTableRow(context_id=12, context='test_is_no_ci'),
 

In [10]:
# filter contexts by id
context_table_rows.filter(lambda x: x.context_id == 4)

context_id,context
4,test_reading_non_existing_config_file_raises_exception


In [11]:
# filter contexts by name
context_table_rows.filter(lambda x: 'test_reading_existing' in x.context)

context_id,context
2,test_reading_existing_valid_config_file_returns_string
3,test_reading_existing_invalid_config_file_raises_error


In [12]:
# Turn every result tuple of the `line` table into a LineTableRow.
line_table_rows = seq.sqlite3(db_path, 'SELECT * FROM line').map(LineTableRow._make)

In [13]:
type(line_table_rows)

functional.pipeline.Sequence

In [14]:
line_table_rows.to_list()

[LineTableRow(file_id=1, context_id=1, lineno=1),
 LineTableRow(file_id=2, context_id=1, lineno=1),
 LineTableRow(file_id=2, context_id=1, lineno=2),
 LineTableRow(file_id=2, context_id=1, lineno=3),
 LineTableRow(file_id=2, context_id=1, lineno=5),
 LineTableRow(file_id=2, context_id=1, lineno=8),
 LineTableRow(file_id=2, context_id=1, lineno=9),
 LineTableRow(file_id=2, context_id=1, lineno=12),
 LineTableRow(file_id=2, context_id=1, lineno=25),
 LineTableRow(file_id=2, context_id=1, lineno=30),
 LineTableRow(file_id=3, context_id=1, lineno=7),
 LineTableRow(file_id=3, context_id=1, lineno=10),
 LineTableRow(file_id=3, context_id=1, lineno=20),
 LineTableRow(file_id=3, context_id=1, lineno=27),
 LineTableRow(file_id=3, context_id=1, lineno=34),
 LineTableRow(file_id=3, context_id=1, lineno=41),
 LineTableRow(file_id=3, context_id=1, lineno=48),
 LineTableRow(file_id=3, context_id=1, lineno=55),
 LineTableRow(file_id=3, context_id=1, lineno=62),
 LineTableRow(file_id=3, context_id=1, 

In [15]:
# filter lines by context id
line_table_rows.filter(lambda x: x.context_id == 1)

file_id,context_id,lineno
1,1,1
2,1,1
2,1,2
2,1,3
2,1,5
2,1,8
2,1,9
2,1,12
2,1,25
2,1,30


In [16]:
Id = int
Ids = Iterator[Id]

FilePath = str
FilePaths = Iterator[FilePath]

TestName = str
TestNames = Iterator[TestName]


class CoverageMapSingle(NamedTuple):
    """
    File based coverage map entry for a single test.

    Maps one test name to the (potentially multiple) production code files
    it covers. The mapping is per file; it is not line based and does not
    resolve individual production code functions.
    """
    test: TestName  # name of the test this entry describes
    production_code: FilePaths  # covered production code file paths (get_coverage_map stores a set here)


CoverageMap = Iterator[CoverageMapSingle]
# File based coverage map (not line based) of test names to potentially multiple production code files per test name.


class ImpactMapSingle(NamedTuple):
    """
    File based impact map entry for a single production code file.

    Maps one production code file to the (potentially multiple) test names
    that are impacted by it. The mapping is per file; it is not line based
    and does not resolve individual test code functions.
    """
    production_code: FilePath  # production code file this entry describes
    tests: TestNames  # names of the impacted tests (get_impact_map stores a set here)


ImpactMap = Iterator[ImpactMapSingle]

In [17]:
from typing import NamedTuple

FilePath = str

Line = int
Lines = Iterator[Line]

class FileCoverage(NamedTuple):
    """
    File based line coverage: one production code file and its covered lines.
    """
    production_code: FilePath  # production code file this entry describes
    covered_lines: Lines  # covered line numbers (get_covered_lines stores a lazy generator here)

FileCoverages = Iterator[FileCoverage]

In [18]:
def get_covered_lines(file_table_rows, line_table_rows, file_paths) -> FileCoverages:
    """
    Yield the covered line numbers of every requested file.

    For every file_path:
        file_path -> file_table -> file_id -> line_table -> covered_lines

    Args:
        file_table_rows: sequence of FileTableRow offering ``filter`` and ``head``.
        line_table_rows: sequence of LineTableRow offering ``filter``.
        file_paths: iterable of (partial) file paths. A path selects the first
            file table row whose ``path`` contains it as a substring.

    Yields:
        FileCoverage(file_path, covered_lines). ``covered_lines`` is a lazy
        generator of line numbers in line table order: the numbers are
        unsorted and a number shows up once per matching line table row.

    NOTE(review): ``head()`` presumably raises when no file table row matches
    ``file_path`` -- confirm and decide whether such paths should be skipped.
    """
    for file_path in file_paths:
        # fuzzy search like filtering (substring match); resolved right away by head()
        matching_files = file_table_rows.filter(lambda row: file_path in row.path)
        file_id = matching_files.head().file_id
        # covered_lines is consumed lazily by the caller, possibly after this
        # loop has already moved on to the next file. Binding file_id as a
        # default argument freezes the value for this iteration; a plain
        # closure would silently filter by the id of a later file.
        covered_rows = line_table_rows.filter(lambda row, file_id=file_id: row.file_id == file_id)
        covered_lines = (row.lineno for row in covered_rows)
        yield FileCoverage(file_path, covered_lines)

In [19]:
path_name_iterator = iter(['tia/__init__.py', 'tia/config.py', 'tia/env.py'])
file_coverages = get_covered_lines(file_table_rows, line_table_rows, path_name_iterator)

In [20]:
type(file_coverages)

generator

In [21]:
# visualize get_covered_lines output
import pprint  # imported once per cell run instead of on every loop iteration

for fc in file_coverages:
    pprint.pprint(fc.production_code)
    # covered_lines is a one-shot generator: it is exhausted after this loop
    for cl in fc.covered_lines:
        pprint.pprint(cl)

'tia/__init__.py'
1
'tia/config.py'
1
2
3
5
8
9
12
25
30
17
18
20
22
17
18
20
21
17
18
19
26
27
26
27
31
32
33
34
35
36
37
38
39
40
41
42
44
45
46
47
51
52
53
26
27
31
32
33
34
35
36
37
38
39
40
41
42
44
45
46
47
51
52
53
26
27
31
32
33
34
35
36
37
38
39
40
41
42
44
45
46
47
51
52
53
26
27
31
32
33
34
35
36
37
38
39
40
41
42
44
45
46
47
51
52
53
26
27
31
32
33
34
35
36
37
38
39
40
41
42
44
45
46
47
51
52
54
55
'tia/env.py'
7
10
20
27
34
41
48
55
62
69
76
83
90
12
23
24
91
92
15
94
30
31
37
38
44
45
51
52
13
58
59
65
66
72
73
79
80
14
86
87
12
23
24
91
94
30
31
37
38
44
45
51
52
13
58
59
65
66
72
73
79
80
14
86
87
17


In [22]:
def get_impact_map(file_table_rows, line_table_rows, context_table_rows, file_paths) -> ImpactMap:
    """
    Yield, for every requested production code file, the tests that cover it.

    For every file_path:
        file_path -> file_table_rows -> file_id -> line_table_row -> tests

    Args:
        file_table_rows: sequence of FileTableRow offering ``filter`` and ``head``.
        line_table_rows: sequence of LineTableRow offering ``filter``.
        context_table_rows: sequence of ContextTableRow offering ``filter`` and ``head``.
        file_paths: iterable of (partial) file paths. A path selects the first
            file table row whose ``path`` contains it as a substring.

    Yields:
        ImpactMapSingle(file_path, tests) where ``tests`` is a set of context
        names; the contexts "" and "testsfailed" are left out.

    NOTE(review): ``head()`` presumably raises when no file table row matches
    ``file_path`` -- confirm and decide whether such paths should be skipped.
    """
    # workaround for coveragepy 5.02a empty and irrelevant context entries
    ignored_contexts = ("", "testsfailed")

    for file_path in file_paths:
        # TODO: make search strict and fuzzy search in separate function (SRP)
        matching_files = file_table_rows.filter(lambda row: file_path in row.path)  # fuzzy search like filtering
        file_id = matching_files.head().file_id
        impacted_rows = line_table_rows.filter(lambda row: row.file_id == file_id)
        # Many line rows share one context; collapse the ids first so the
        # context table is searched once per context instead of once per line.
        test_ids = {row.context_id for row in impacted_rows}
        tests = set()
        for test_id in test_ids:
            context_rows = context_table_rows.filter(lambda row: row.context_id == test_id)
            test = context_rows.head().context
            # Compare by value: an identity check against a string literal only
            # works by accident, and `or not "testsfailed"` never excluded anything.
            if test not in ignored_contexts:
                tests.add(test)
        yield ImpactMapSingle(file_path, tests)

In [23]:
changed_production_code = ['tia/config.py', 'tia/env.py']
changed_production_code_iterator = iter(changed_production_code)

In [24]:
impacted_test_code = get_impact_map(file_table_rows, line_table_rows, context_table_rows, changed_production_code_iterator)

In [25]:
# visualize get_impact_map output
import pprint  # imported once per cell run instead of on every loop iteration

# impacted_test_code is a one-shot generator: it is exhausted after this loop
for itc in impacted_test_code:
    pprint.pprint('prod code: ' + itc.production_code)
    for test in itc.tests:
        pprint.pprint('test: ' + test)

'prod code: tia/config.py'
'test: test_read_valid_explicit_full_blown_pipelines_config'
'test: test_read_valid_single_pipeline_with_files_only_config'
'test: test_read_valid_single_pipeline_with_dirs_only_config'
'test: test_read_valid_implicit_full_blown_pipelines_config'
'test: test_reading_existing_invalid_config_file_raises_error'
'test: test_read_invalid_pipelines_config'
'test: test_reading_non_existing_config_file_raises_exception'
'test: test_reading_existing_valid_config_file_returns_string'
'test: test_read_valid_parent_key_config'
'prod code: tia/env.py'
'test: test_is_some_ci'
'test: test_is_no_ci'


In [26]:
def get_coverage_map(context_table_rows, line_table_rows, file_table_rows, tests) -> CoverageMap:
    """
    Yield, for every requested test, the production code files it covers.

    For every test:
        test aka context -> context_table_rows -> context_id -> line_table_row -> file_paths

    Args:
        context_table_rows: sequence of ContextTableRow offering ``filter`` and ``head``.
        line_table_rows: sequence of LineTableRow offering ``filter``.
        file_table_rows: sequence of FileTableRow offering ``filter`` and ``head``.
        tests: iterable of (partial) test names. A name selects the first
            context table row whose ``context`` contains it as a substring.

    Yields:
        CoverageMapSingle(test_name, file_paths) where ``test_name`` is the
        name as passed in and ``file_paths`` is a set of paths as stored in
        the file table.

    NOTE(review): ``head()`` presumably raises when no context table row
    matches ``test_name`` -- confirm and decide whether such names should be skipped.
    """
    for test_name in tests:
        # TODO: make search strict and fuzzy search in separate function (SRP)
        matching_contexts = context_table_rows.filter(lambda row: test_name in row.context)  # fuzzy search like filtering
        test_id = matching_contexts.head().context_id
        covered_rows = line_table_rows.filter(lambda row: row.context_id == test_id)
        # Many line rows share one file; collapse the ids first so the file
        # table is searched once per file instead of once per covered line.
        file_ids = {row.file_id for row in covered_rows}
        file_paths = set()
        for file_id in file_ids:
            file_rows = file_table_rows.filter(lambda row: row.file_id == file_id)
            file_paths.add(file_rows.head().path)
        yield CoverageMapSingle(test_name, file_paths)

In [27]:
changed_test_code = ['test_read_valid_parent_key_config', 'test_read_invalid_pipelines_config', 'test_is_no_ci']

In [28]:
import pprint  # imported once per cell run instead of on every loop iteration

impacted_production_code = get_coverage_map(context_table_rows, line_table_rows, file_table_rows, changed_test_code)

# visualize get_coverage_map output
for ipc in impacted_production_code:
    pprint.pprint('test: ' + ipc.test)
    for pc in ipc.production_code:
        pprint.pprint('production code: ' + pc)

'test: test_read_valid_parent_key_config'
'production code: /home/fk/github/python-tia/tia/config.py'
'test: test_read_invalid_pipelines_config'
'production code: /home/fk/github/python-tia/tia/config.py'
'test: test_is_no_ci'
'production code: /home/fk/github/python-tia/tia/env.py'
