In [1]:
import ast
import os
from itertools import zip_longest
from pathlib import Path
from typing import List, Dict, Set

from colour import Color
from numpy import log as ln
from pydriller import RepositoryMining, ModificationType
from pyvis.network import Network
from typeguard import typechecked

@typechecked
def imports_from_file(file: Path) -> List[str]:
    ''' Use ast to extract all imports from file

        Returns a sorted list of unique dotted names: 'x.y' for `import x.y`
        and 'x.y.z' for `from x.y import z`. Imports nested inside functions
        or classes are included. `from . import z` (no module name) is
        skipped, and the leading dots of relative imports are not part of
        the returned names.

        If the file cannot be read or parsed, a message is printed and an
        empty list is returned.
    '''
    class ImportVisitor(ast.NodeVisitor):

        def __init__(self):
            self.imports = set()

        def visit_Import(self, import_node):
            self.imports.update(alias.name for alias in import_node.names)
            self.generic_visit(import_node)

        def visit_ImportFrom(self, import_from_node):
            module = import_from_node.module
            # module is None for `from . import name`; such imports are skipped
            if module is not None:
                self.imports.update(f'{module}.{alias.name}' for alias in import_from_node.names)
            self.generic_visit(import_from_node)

    try:
        # Pass the raw bytes: ast.parse then decodes them itself (UTF-8 by
        # default, or whatever a BOM / coding line declares), so the result
        # does not depend on the platform's default text encoding.
        # read_bytes also closes the file, which a bare open().read() did not.
        file_ast = ast.parse(file.read_bytes(), filename=str(file))
    except Exception:
        # Best effort: unreadable or unparsable files contribute no imports.
        # KeyboardInterrupt / SystemExit are deliberately not caught here.
        print(f"Error parsing {file}")
        return []
    visitor = ImportVisitor()
    visitor.visit(file_ast)
    return sorted(visitor.imports)

@typechecked
def is_module_in_package(module: str, package: str) -> bool:
    ''' Tell whether a dotted module name (e.g. 'x.y.z') lies inside a
        dotted package name (e.g. 'x.y')

        True when every dot-separated part of package equals the part of
        module at the same position. A package with more parts than the
        module never matches.
    '''
    package_parts = package.split('.')
    # Compare only the leading parts of the module; a module that is shorter
    # than the package yields a shorter prefix and therefore compares unequal
    module_prefix = module.split('.')[:len(package_parts)]
    return module_prefix == package_parts

# Sanity checks for is_module_in_package: a module belongs to its enclosing
# package, but neither to a sibling package nor to a package below itself
assert is_module_in_package('zeeguu_core.util.hash', 'zeeguu_core.util')
assert not is_module_in_package('zeeguu_core.util.hash', 'zeeguu_core.x')
assert not is_module_in_package('zeeguu_core.util', 'zeeguu_core.util.hash')

@typechecked
def import_info_from_file(file: Path, info: Dict[str, int]) -> None:
    ''' Add the imports found in file to the per-package counters in info

        info maps a package name to an import count and is updated in place.
        Only packages that are already keys of info are counted; imports of
        anything else are ignored.
    '''
    for imported in imports_from_file(file):
        # NOTE(review): an import is counted for every package that contains
        # it, so 'x.y.z' increments both 'x.y' and 'x' when both are keys -
        # confirm this is intended for nested packages
        containing = [package for package in info if is_module_in_package(imported, package)]
        for package in containing:
            info[package] += 1

@typechecked
def LOC(file: Path) -> int:
    ''' Return the number of Lines Of Code in a file

        Every physical line is counted, including blank and comment lines.
        TODO: discount comment lines?
    '''
    # Decode as UTF-8 and replace undecodable bytes: line breaks are plain
    # ASCII, so the count is unaffected while a file in another encoding no
    # longer raises UnicodeDecodeError. The with-block closes the handle.
    with open(file, encoding='utf-8', errors='replace') as source:
        return sum(1 for _ in source)

@typechecked
def find_commit_counts(source_folder: Path) -> Dict[Path, int]:
    ''' Count the commits that touched each file of the repository in source_folder

        All commits are replayed from oldest to newest committer date:
            ADD     starts the file at 1
            RENAME  moves the count of the old path to the new path and adds 1
            DELETE  drops the file
            other   adds 1 to a file that is already being counted
        A modification that cannot be applied (e.g. a change to a file that
        is not being counted) is reported with print and skipped.

        Returns a dict from source_folder / <path inside repository> to the
        commit count. Deleted files are not part of the result. An empty
        history gives an empty dict.
    '''
    # Find commits
    all_commits = sorted(RepositoryMining(str(source_folder)).traverse_commits(),
                         key=lambda c: c.committer_date)

    if not all_commits:
        # Nothing to summarise; indexing the first/last commit below would fail
        print(f'Number of commits: {len(all_commits)}')
        return {}

    print(f'Oldest commit: {all_commits[0].committer_date}. Newest commit: {all_commits[-1].committer_date}. Days: {(all_commits[-1].committer_date - all_commits[0].committer_date).days} ')
    print(f'Number of commits: {len(all_commits)}')

    commit_counts: Dict[Path, int] = {}

    for commit in all_commits:
        for modification in commit.modifications:

            try:
                change_type = modification.change_type

                if change_type == ModificationType.RENAME:
                    new_path = source_folder / modification.new_path
                    old_path = source_folder / modification.old_path
                    # pop with a default: a rename of a file that is not being
                    # counted starts the new path at 1 instead of raising
                    commit_counts[new_path] = commit_counts.pop(old_path, 0) + 1

                elif change_type == ModificationType.DELETE:
                    commit_counts.pop(source_folder / modification.old_path, None)

                elif change_type == ModificationType.ADD:
                    commit_counts[source_folder / modification.new_path] = 1

                else: # modification to existing file
                    # Raises KeyError for a file that is not being counted;
                    # that is reported below and the modification is skipped
                    commit_counts[source_folder / modification.old_path] += 1
            except Exception as e:
                print(f"Exception {e} examing commit {str(commit)}")

    return commit_counts

@typechecked
def mapTo01(minValue: int, maxValue: int, mapValue: int) -> float:
    ''' Linearly map mapValue from the range [minValue, maxValue] to [0, 1]

        minValue maps to 0.0 and maxValue to 1.0. When the range is empty
        (minValue == maxValue) there is nothing to scale against and 0.0 is
        returned instead of dividing by zero.
    '''
    span = maxValue - minValue
    if span == 0:
        return 0.0
    return (mapValue - minValue) / span

@typechecked
def module_view_from_packages(packages: List[Path],
                              source_folder: Path,
                              commit_counts: Dict[Path, int],
                              network_size: str = '500px') -> list:
    ''' Build a network with one node per package and one edge per import dependency

        packages must be in order of most to least specific i.e. ['x', 'y.z', 'y'];
        a package located inside a package listed before it raises AssertionError.
        Every package must be located below source_folder. A .py file belongs
        to the first package in the list that contains it.

        commit_counts maps a file to its number of commits; files that are
        missing from it count as 0 commits.
        network_size is used as both height and width of the network.

        Node size is proportional to the package's share of the total LOC and
        node color runs from green (fewest commits of all packages) to red
        (most commits). An edge a -> b carries the number of imports that
        files in a make from package b.

        returns [Network, Dict[str, Dict]] where the dict has
        key 'package name' (path relative to source_folder, parts joined by '.') -> Dict of:
            'path' -> Path the package folder
            'files' -> List[Path] the files in package
            'import_info' : Dict[str, int] where str is package name and int is number of imports
            'LOC' : int sum of Lines Of Code in all files in package
            'commits' : int number of commits to files in package
    '''
    # Test package order is correct: no package may sit inside an earlier one.
    # Raised explicitly (not via assert) so the check also runs under python -O
    for position, package in enumerate(packages):
        for later in packages[position + 1:]:
            try:
                later.relative_to(package)
            except ValueError:
                continue
            raise AssertionError(f"Wrong package order of {later} and {package}")

    # The dict of infos for each package - we will add to the inner dict below.
    # Joining the path parts gives the same dotted name on every platform;
    # replacing '\\' only produced dotted names on Windows.
    package_infos: Dict[str, Dict] = {
        ('.'.join(p.relative_to(source_folder).parts) or '.'): {'path': p} for p in packages
    }

    # Add list of files to inner package_infos dict; a file already claimed by
    # an earlier (more specific) package is left out
    claimed_files = set()
    for info in package_infos.values():
        files = [f for f in info['path'].rglob('*.py') if f not in claimed_files]
        info['files'] = files
        claimed_files.update(files)
    del claimed_files

    # Test that no duplicate files exist
    all_files = set()
    for name, info in package_infos.items():
        for f in info['files']:
            assert f not in all_files, f'Duplicate file {f} in {name}'
            all_files.add(f)
    del all_files

    # Add import info to inner dict in package_infos
    for info in package_infos.values():
        import_info = {name: 0 for name in package_infos}
        for file in info['files']:
            import_info_from_file(file, import_info)
        info['import_info'] = import_info

    # Add LOC to inner dict in package_infos
    for info in package_infos.values():
        info['LOC'] = sum(LOC(f) for f in info['files'])
    total_loc = sum(info['LOC'] for info in package_infos.values())

    # Calculate commits per package and the real extremes over all packages.
    # (Starting the minimum at 0 would pin it there, since counts are never negative.)
    for info in package_infos.values():
        info['commits'] = sum(commit_counts.get(f, 0) for f in info['files'])
    commit_totals = [info['commits'] for info in package_infos.values()]
    min_commits: int = min(commit_totals, default=0)
    max_commits: int = max(commit_totals, default=0)

    # Create network
    net = Network(directed=True, notebook=True, height=network_size, width=network_size)
    net.set_edge_smooth('dynamic')
    colors = list(Color('green').range_to(Color('red'), 100))

    # Nodes
    scale_divisor = len(package_infos)**0.1 * total_loc
    # No packages or no lines of code: fall back to size 0 instead of dividing by zero
    size_scale = 100 / scale_divisor if scale_divisor else 0.0
    for name, info in package_infos.items():
        title: str = f'{name}. LOC: {str(info["LOC"])}. Commits: {str(info["commits"])}'
        # All packages equally active: nothing to rank, use the first color
        if max_commits == min_commits:
            heat = 0.0
        else:
            heat = mapTo01(min_commits, max_commits, info['commits'])
        color = colors[int(99 * heat)]
        net.add_node(name, size=size_scale * info['LOC'], title=title, label=name, color=str(color))

    # Edges
    for name, info in package_infos.items():
        for dependency, count in info['import_info'].items():
            if name != dependency and count > 0:
                # NOTE(review): a single import gives width ln(1) = 0 - confirm
                # how the renderer draws zero-width edges
                net.add_edge(name, dependency, title=str(count), width=float(ln(count)), arrowStrikethrough=False)

    return [net, package_infos]

In [2]:
# Root folder of all source code analyzed. Set the ZEEGUU_API_REPO environment
# variable to point at a local clone of the repository; without it the path
# of the original checkout is used.
source_folder: Path = Path(os.environ.get('ZEEGUU_API_REPO', 'C:/Users/Carsten/source/repos/Zeeguu-API'))

# Find commit_counts - only do once per kernel session since it takes some time
if 'commit_counts' not in globals():
    commit_counts = find_commit_counts(source_folder)

# Generate module view using top level folders (sorted so the node order does
# not depend on the order in which the file system lists the folders)
packages: List[Path] = sorted(x for x in source_folder.iterdir() if x.is_dir() and not x.name.startswith('.'))
net, package_infos = module_view_from_packages(packages, source_folder, commit_counts)
net.show('module_view_simple.html')


Oldest commit: 2017-03-04 19:40:02+01:00. Newest commit: 2021-04-27 19:35:49+03:00. Days: 1514 
Number of commits: 876


In [3]:
# List the files of zeeguu_core that import something from zeeguu_api
# (a file is printed once for every such import it contains)
core_files = package_infos['zeeguu_core']['files']
for core_file in core_files:
    api_imports = [name for name in imports_from_file(core_file) if name.startswith('zeeguu_api')]
    for _ in api_imports:
        print(core_file)

C:\Users\Carsten\source\repos\Zeeguu-API\zeeguu_core\emailer\zeeguu_mailer.py


In [4]:
# Generate module view using the sub-folders of zeeguu_core together with the
# other top level folders. Hidden folders are skipped, and so is every top
# level folder whose name starts with 'zeeguu_core' (that prefix also matches
# names such as zeeguu_core_test).
packages: List[Path] = []
for folder in source_folder.iterdir():
    if not folder.is_dir():
        continue
    if folder.name.startswith(('.', 'zeeguu_core')):
        continue
    packages.append(folder)
for folder in (source_folder / 'zeeguu_core').iterdir():
    if folder.is_dir() and not folder.name.startswith('.'):
        packages.append(folder)

net, package_infos = module_view_from_packages(packages, source_folder, commit_counts, '1000px')
net.show('module_view_detailed.html')


In [5]:
# Generate module view for a hand-picked list of packages. A sub-package has
# to be listed before the package that contains it (zeeguu_core/model comes
# before zeeguu_core).
# Further candidates, to be placed ahead of 'zeeguu_core':
#   'zeeguu_core/language', 'zeeguu_core/word_scheduling'
package_names = [
    'tools',
    'zeeguu_api',
    'zeeguu_api_test',
    'zeeguu_core_test',
    'zeeguu_core/model',
    'zeeguu_core',
]
packages: List[Path] = [source_folder / name for name in package_names]

net, package_infos = module_view_from_packages(packages, source_folder, commit_counts, '800px')
# Optional while tuning the layout: net.show_buttons(filter_=['physics'])
net.barnes_hut(gravity=-2000, central_gravity=.3, spring_length=400, spring_strength=0.04, damping=0.09, overlap=1)
net.show('module_view.html')
