Imports

In [None]:
# Install the notebook's requirements with pip, using the same interpreter
# that runs this kernel (sys.executable).
import sys
!{sys.executable} -m pip install -r ./requirements.txt

In [None]:
from pyarn import lockfile
import json

def openFile(pathfile="./yarn.lock"):
    """Parse a yarn lock file and return its data.

    Args:
        pathfile: path of the lock file to read; defaults to ``./yarn.lock``
            so existing ``openFile()`` calls keep working.

    Returns:
        The ``data`` attribute of the parsed ``pyarn`` ``Lockfile``.
    """
    my_lockfile = lockfile.Lockfile.from_file(pathfile)
    return my_lockfile.data


def loadRequirements(pathfile='./package.json'):
    """Load the dependency sections of a ``package.json`` file.

    Args:
        pathfile: path of the manifest to read; defaults to
            ``./package.json`` so existing ``loadRequirements()`` calls keep
            working.

    Returns:
        A dict holding only the ``dependencies``, ``devDependencies`` and
        ``peerDependencies`` entries that exist in the manifest, in file
        order.
    """
    accepted_keys = ("dependencies", "devDependencies", "peerDependencies")
    # Read explicitly as UTF-8 instead of relying on the platform default.
    with open(pathfile, 'r', encoding='utf-8') as f:
        manifest = json.load(f)
    return {key: value for key, value in manifest.items() if key in accepted_keys}

# Parsed yarn.lock content, reused by every cell below.
data = openFile()
# Number of entries in the lock file.
print(len(data.keys()))
# Dependency sections of package.json, keyed by section name.
requirements = loadRequirements()

In [None]:
# Get the latest version for each package

import requests
import time
from tqdm import tqdm

def getCurrentVersion(data, name):
    """Return the ``latest`` dist-tag published for a lock-file entry.

    The metadata URL is derived from the entry's ``resolved`` tarball URL by
    keeping everything before ``/-/``.

    Args:
        data: lock-file mapping; ``data[name]`` is the entry to look up.
        name: key of the entry in ``data``.

    Returns:
        The latest version string, or ``''`` when it cannot be determined
        (no ``resolved`` URL, URL without ``/-/``, request failure,
        non-JSON answer, or no ``latest`` dist-tag).
    """
    import warnings

    resolved = data[name].get('resolved')
    # Without the '/-/' separator there is no metadata URL to derive, and
    # requesting the URL as-is would download the tarball itself.
    if not resolved or '/-/' not in resolved:
        return ''
    url = resolved.split('/-/')[0]
    try:
        # A timeout keeps one stalled request from hanging the whole run.
        resp = requests.get(url, timeout=10)
        package_data = json.loads(resp.content)
    except (requests.RequestException, ValueError) as error:
        # Best effort: one unreachable package must not abort the report.
        warnings.warn(f"could not fetch the latest version of {name}: {error}")
        return ''
    if not isinstance(package_data, dict):
        return ''
    dist_tags = package_data.get('dist-tags')
    if not isinstance(dist_tags, dict):
        return ''
    return dist_tags.get('latest', '')


def versionnedPackages(data, requirements):
    """Collect installed, latest and requested versions of each requirement.

    Args:
        data: lock-file mapping. Each key is read as one or more
            comma-separated ``name@range`` aliases and each value must hold a
            ``'version'`` entry.
        requirements: mapping ``level -> {package name: requested range}``,
            as returned by ``loadRequirements``.

    Returns:
        ``{name: {'version': {level: installed version},
        'latest': latest version or '', 'target': {level: requested range}}}``.
        Requirements with no matching lock entry are left out.
    """
    # Regroup level -> name -> range into name -> level -> range.
    aggregated_names = {}
    for level, level_requirements in requirements.items():
        for req, wanted_range in level_requirements.items():
            aggregated_names.setdefault(req, {})[level] = wanted_range

    # Index every alias of every lock key. Exact lookups avoid false
    # positives such as "react@^16" matching "@types/react@^16".
    lock_key_by_alias = {}
    for lock_key in data:
        for alias in lock_key.split(','):
            lock_key_by_alias[alias.strip()] = lock_key

    packages = {}
    for package, targets in tqdm(aggregated_names.items()):
        print(package)
        installed = {}
        first_lock_key = None
        for level, wanted_range in targets.items():
            lock_key = lock_key_by_alias.get(f"{package}@{wanted_range}")
            if lock_key is None:
                continue
            if first_lock_key is None:
                first_lock_key = lock_key
            installed[level] = data[lock_key]['version']
        if first_lock_key is None:
            # Declared in package.json but absent from the lock file:
            # nothing to report, so skip it (and the network request).
            continue
        # Every installed version of a package should share one resolver,
        # so a single lookup is enough.
        latest = getCurrentVersion(data, first_lock_key)
        # Short pause between registry requests.
        time.sleep(0.1)
        packages[package] = {'version': installed, 'latest': latest, 'target': targets}

    return packages

# Calls getCurrentVersion (an HTTP request) while building the table, so this cell needs network access.
v_packages = versionnedPackages(data, requirements)

In [None]:
# Make an Excel file for each dependency section with its packages (target version, current version, latest version)
def saveExcel_versions(listRequirements, listName, packages):
    """Write ``./<listName>.xlsx`` with one row per package of a dependency level.

    Columns are ``package``, ``version``, ``latest`` and ``target``. The
    ``version`` cell is coloured by how far the installed version lags
    behind the latest one.

    Args:
        listRequirements: mapping whose keys are the package names to report.
        listName: dependency level; used both to pick the per-level values
            and as the output file name.
        packages: mapping ``name -> {'version', 'latest', 'target'}`` as
            built by ``versionnedPackages``.
    """
    import pandas as pd

    packages_needed = {
        name: details
        for name, details in packages.items()
        if name in listRequirements
    }

    table = {'package': list(packages_needed)}
    for attr in ('version', 'latest', 'target'):
        # 'version' and 'target' are per-level dicts: keep only this level.
        table[attr] = [
            _stepInLevel(details[attr], listName)
            for details in packages_needed.values()
        ]
    print('version', len(table['version']))
    print('latest', len(table['latest']))
    print('package', len(table['package']))

    df = pd.DataFrame(table)

    def colorVersions(row):
        color = _outdatedColor(row['version'], row['latest'])
        style = f'background-color:{color}' if color else ''
        # Style only the 'version' cell, whatever the column order.
        return [style if column == 'version' else '' for column in row.index]

    styled = df.style.apply(colorVersions, axis=1)
    styled.to_excel(f"./{listName}.xlsx")


def _stepInLevel(value, level):
    """Return ``value[level]`` when ``value`` is a per-level dict, else ``value``."""
    if isinstance(value, dict):
        # '' marks a package that has no entry for this level.
        return value.get(level, '')
    return value


def _numericVersion(version):
    """Return ``[major, minor, patch]`` as ints, or ``None`` if unparseable."""
    import re

    if not isinstance(version, str):
        return None
    numbers = []
    for part in version.split('.')[:3]:
        # Keep only the leading digits so "0-beta" still counts as 0.
        leading_digits = re.match(r'\d+', part.strip())
        if leading_digits is None:
            return None
        numbers.append(int(leading_digits.group()))
    # Pad short versions such as "1.2" so every component can be compared.
    return numbers + [0] * (3 - len(numbers))


def _outdatedColor(version, latest):
    """Return the highlight colour for ``version`` against ``latest``.

    Thresholds are tried from loosest to strictest and the colour of the
    strictest one still satisfied is returned. ``None`` means either version
    could not be parsed, so no colour should be applied.
    """
    # (allowed lag per component "major.minor.patch", colour)
    thresholds = (
        ("1", "#F5C389"),
        ("0.3", "#F5E989"),
        ("0.1", "#BAF3DB"),
        ("0.0.0", "#89CCF5"),
    )
    current = _numericVersion(version)
    newest = _numericVersion(latest)
    if current is None or newest is None:
        return None
    color = '#FFD5D2'  # lag exceeds even the loosest threshold
    for condition, highlight in thresholds:
        allowed = [int(part) for part in condition.split('.')]
        if any(new - cur > limit for new, cur, limit in zip(newest, current, allowed)):
            break
        color = highlight
    return color

# One spreadsheet per dependency section found in package.json.
for depLevel in requirements.keys():
    print(depLevel)
    saveExcel_versions(requirements[depLevel], depLevel, v_packages)

Create connections between packages

In [None]:
def mergeVersion(str, data):
    """Return ``"<name>@<range>"`` for dependency ``str`` of package entry ``data``.

    The range is read from ``data['dependencies'][str]``.
    """
    wanted_range = data['dependencies'][str]
    return '@'.join((str, wanted_range))


def splitVersion(str):
    """Split ``"<name>@<range>"`` on its last ``@``.

    Returns ``[name, range]``, or the one-element list ``[str]`` when the
    text holds no ``@``.
    """
    head, separator, tail = str.rpartition('@')
    if not separator:
        return [str]
    return [head, tail]


def makeArray(value):
    """Return ``value`` unchanged if it is a list, else wrap it in a new list.

    ``isinstance`` is used so list subclasses are recognised as lists
    instead of being wrapped a second time.
    """
    return value if isinstance(value, list) else [value]

def addOptionalDependencies(package):
    """Fold ``optionalDependencies`` into ``dependencies``, in place.

    Optional entries win over regular ones with the same name. When the
    package has no ``dependencies`` yet, the optional mapping itself is
    stored under that key. The same ``package`` object is returned.
    """
    if 'optionalDependencies' not in package:
        return package
    optional = package['optionalDependencies']
    if 'dependencies' not in package:
        package['dependencies'] = optional
    else:
        package['dependencies'] = {**package['dependencies'], **optional}
    return package

def find_closest(name, packages):
    """Return the key of ``packages`` that best matches ``name``.

    Lookup order:
      1. ``name`` is itself a key;
      2. ``name`` equals one of the comma-separated aliases of a key;
      3. first key that merely contains ``name`` as a substring.

    Exact matches come first so that, for example, ``react@^16.0.0`` is not
    answered with ``@types/react@^16.0.0``.

    Raises:
        IndexError: no key contains ``name``.
    """
    if name in packages:
        return name
    substring_match = None
    for key in packages:
        if name in (alias.strip() for alias in key.split(',')):
            return key
        if substring_match is None and name in key:
            substring_match = key
    if substring_match is None:
        raise IndexError(f"no package matching {name!r}")
    return substring_match

In [None]:
# build a tree of the dependencies (split)

def detailledConnectPackages(data):
    """Build the split view: one node per ``name@range`` alias of the lock data.

    Args:
        data: lock-file mapping whose keys are comma-separated aliases.
            Entries are updated in place: optional dependencies are merged
            into ``'dependencies'``.

    Returns:
        ``(packages, connections)`` where ``packages`` maps each alias to
        ``{'isRoot': False, 'original_name': <lock key>}`` plus the entry's
        ``'version'`` and ``'dependencies'`` when present, and
        ``connections`` lists unique ``(alias, "dep@range")`` pairs.
    """
    packages = {}
    connections = []
    # Exact (origin, target) pairs already recorded. Checking origin and
    # target separately would drop edges whose two ends each appear in
    # some other edge.
    known_connections = set()
    for package in list(data.keys()):
        entry = addOptionalDependencies(data[package])
        data[package] = entry
        for name in (alias.strip() for alias in package.split(',')):
            packages[name] = {'isRoot': False, 'original_name': package}
            for dep in entry.get('dependencies', {}):
                connection = (name, mergeVersion(dep, entry))
                if connection not in known_connections:
                    known_connections.add(connection)
                    connections.append(connection)
            # fill attributes for package view
            for attribute in ('version', 'dependencies'):
                if attribute in entry:
                    packages[name][attribute] = entry[attribute]
    return packages, connections

# Split view: nodes keyed by individual "name@range" alias, plus their edges.
packages, connections = detailledConnectPackages(data)
# Number of nodes in the split view.
print(len(list(packages.keys())))

In [None]:
# other version of the root (grouped)

# build a reverse tree of the dependencies:
# take each dependency of a package and record that package as a root of the dependency

def agglomerateAnyRootWithDetail(packages):
    """Build the grouped view: for each package name, the packages requiring it.

    Args:
        packages: lock-file mapping whose keys are comma-separated
            ``name@range`` aliases. Entries are updated in place: optional
            dependencies are merged into ``'dependencies'``.

    Returns:
        ``{package name: {'roots': {dependent name: [ranges]}}}``. Every
        visited package gets an entry, possibly with empty ``'roots'``.
    """
    agglomeratedPackages = {}

    def stepRoot(packages, name, depth, leafcrumb):
        allnames = name.split(',')
        # Must be a real list: it is reused for every dependency below, and
        # a lazy map would be empty after the first one.
        versions = [splitVersion(alias)[1] for alias in allnames]
        shortName = splitVersion(allnames[0])[0]

        if shortName not in agglomeratedPackages:
            agglomeratedPackages[shortName] = {'roots': {}}
        closest_package = find_closest(name, packages)
        packages[closest_package] = addOptionalDependencies(packages[closest_package])
        entry = packages[closest_package]
        # prevent infinite loops by keeping track of visited branches
        if depth >= 100 or 'dependencies' not in entry or closest_package in leafcrumb:
            return packages
        leafcrumb.add(closest_package)
        for dep in entry['dependencies'].keys():
            roots = agglomeratedPackages.setdefault(dep, {}).setdefault('roots', {})
            known_versions = roots.setdefault(shortName, [])
            # The same branch is reached from many starting packages;
            # record each range only once.
            for version in versions:
                if version not in known_versions:
                    known_versions.append(version)
            packages = stepRoot(packages, mergeVersion(dep, entry), depth + 1, leafcrumb)
        return packages

    for package in packages.keys():
        packages = stepRoot(packages, package, 0, set())
    return agglomeratedPackages

def getRootConnections(agglomeratedPackages):
    """Flatten the grouped view into a list of ``(root, package)`` pairs.

    One pair is produced for every key of each package's ``'roots'`` mapping,
    in iteration order.
    """
    return [
        (root, package)
        for package, details in agglomeratedPackages.items()
        for root in details['roots'].keys()
    ]

# Grouped view built from the raw lock data.
agglomerated = agglomerateAnyRootWithDetail(data)
# Number of entries in the grouped view.
print(len(agglomerated))

# (root, package) edges of the grouped view.
rootConnections = getRootConnections(agglomerated)

In [None]:
# build a reverse tree of the dependencies:
# take each dependency of a package and record that package among the dependency's roots

def addRootsOnBranches(packages, name, depth, leafcrumb):
    """Record ``name`` as a root of each of its dependencies, recursively.

    Args:
        packages: split view, keyed by ``name@range``; updated in place.
        name: key of the package whose dependencies are walked.
        depth: current recursion depth; the walk stops at 400.
        leafcrumb: list of keys already walked; extended in place.

    Returns:
        The same ``packages`` mapping.
    """
    # prevent infinite loops by keeping track of visited branches
    if depth >= 400 or 'dependencies' not in packages[name].keys() or name in leafcrumb:
        return packages
    leafcrumb.append(name)
    packages[name] = addOptionalDependencies(packages[name])
    for dep in packages[name]['dependencies'].keys():
        child = mergeVersion(dep, packages[name])
        child_roots = packages[child].setdefault('roots', [])
        if name not in child_roots:
            child_roots.append(name)
            child_roots.sort()
        packages = addRootsOnBranches(packages, child, depth + 1, leafcrumb)
    return packages

# Fill each package's 'roots' list with the packages that depend on it.
for package in packages:
    packages = addRootsOnBranches(packages, package, 0, [])
# A package that never received a 'roots' list is flagged as a root.
for package in packages:
    if ('roots' not in packages[package].keys()):
        packages[package]['isRoot'] = True

In [None]:
def highlightRepetitions(packages, requirements):
    """Print the requirements that already appear as a dependency elsewhere.

    For every section of ``requirements`` the section name is printed, then
    one line per required package whose ``packages`` entry has a non-empty
    ``'roots'`` collection.
    """
    for part in requirements:
        print(part)
        for package in requirements[part]:
            if package not in packages:
                continue
            details = packages[package]
            if 'roots' not in details or len(details['roots']) == 0:
                continue
            print(f"\t {package} ({requirements[part][package]}) already imported in {packages[package]['roots']}")

# Report direct requirements that the grouped view also lists as a dependency of another package.
highlightRepetitions(agglomerated, requirements)

In [None]:
def buildTree(packages):
    """Return the ``original_name`` of every root package, without duplicates.

    Order follows the iteration order of ``packages``.
    """
    root_names = []
    for details in packages.values():
        if not details['isRoot']:
            continue
        original_name = details['original_name']
        if original_name in root_names:
            continue
        root_names.append(original_name)
    return root_names

In [None]:
def list_all_requirements(requirements):
    """Map each requirement to the first section that declares it.

    Returns:
        ``(by_name, by_name_and_range)``: the first maps ``name`` and the
        second ``"name@range"`` to the first section of ``requirements``
        (in iteration order) containing it.
    """
    level_by_name = {}
    level_by_full_name = {}
    for level, level_requirements in requirements.items():
        for req, wanted_range in level_requirements.items():
            # setdefault keeps the first level seen for each key.
            level_by_full_name.setdefault('@'.join((req, wanted_range)), level)
            level_by_name.setdefault(req, level)
    return level_by_name, level_by_full_name
# Section lookups keyed by package name and by "name@range".
all_requirements, all_requirements_version = list_all_requirements(requirements)

In [None]:
# run a Jaal window to display the graph

def makeJaalGraph(packages, edgeConnections, all_req):
    """Plot the dependency graph with Jaal.

    Args:
        packages: mapping ``node id -> details``. ``details['roots']`` and
            ``details['version']`` are used when present.
        edgeConnections: iterable of ``(from, to)`` pairs.
        all_req: mapping looked up with each node's full name to fill the
            ``is_root`` node attribute (``''`` when absent).
    """
    from jaal import Jaal
    import pandas as pd

    # Materialise the edges once: both columns are real lists, and a
    # one-shot iterable can still feed the two of them.
    edges = list(edgeConnections)
    edges_raw = {
        'from': [edge[0] for edge in edges],
        'to': [edge[1] for edge in edges],
    }

    # attempt to use attributes for Jaal filtering/coloring
    def full_name_package(key):
        if 'version' in packages[key]:
            return key + '@' + packages[key]['version']
        return key

    nodes_raw = {
        'is_root': [all_req.get(full_name_package(key), '') for key in packages],
        # Packages with no 'roots' entry count as having zero roots.
        'number_roots': [len(details.get('roots', ())) for details in packages.values()],
        'id': list(packages.keys()),
    }
    Jaal(pd.DataFrame(edges_raw), pd.DataFrame(nodes_raw)).plot(directed=True)

# Grouped view: nodes from `agglomerated`, edges from `rootConnections`.
makeJaalGraph(agglomerated, rootConnections, all_requirements)

# Split view (alternative): nodes from `packages`, edges from `connections`; uncomment to use.
#makeJaalGraph(packages, connections, all_requirements)