Skip to content

Commit

Permalink
update multiprocessed metrics to use queue over shared dict
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshCordova committed Jun 7, 2021
1 parent 5c778b0 commit 4c95c77
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/config/pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ ignore-comments=yes
ignore-docstrings=yes

# Ignore imports when computing similarities.
ignore-imports=no
ignore-imports=yes

# Minimum lines number of a similarity.
min-similarity-lines=4
Expand Down Expand Up @@ -275,7 +275,7 @@ indent-string=' '
max-line-length=120

# Maximum number of lines in a module.
max-module-lines=1024
max-module-lines=1048

# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
Expand Down
82 changes: 54 additions & 28 deletions src/core/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import copy
import os.path
import random

import re
import readline
import subprocess
Expand All @@ -13,10 +13,11 @@
from collections import defaultdict
from enum import Enum
from functools import partial
from multiprocessing import Manager, Pool
from os import getpid, listdir # pylint: disable=unused-import
from multiprocessing import Manager, Pool, Lock # pylint: disable=unused-import
from os import listdir
from pathlib import Path
from typing import Iterable, Optional, Union, cast
from queue import Queue
from typing import Iterable, Optional, Union, cast, Tuple, Callable

import numpy # type: ignore
from rich.console import Console
Expand Down Expand Up @@ -96,29 +97,40 @@ def multiprocess_import(shared_dict: dict[str, ControlFlowGraph], file: str) ->
shared_dict[filepath] = graph


def multiprocess_metrics(metrics_generator: metric.MetricAbstract,
def multiprocess_metrics(metrics_generators: dict[str, metric.MetricAbstract],
shared_dict: dict[tuple[str, str], Union[int, PathComplexityRes]],
graph: ControlFlowGraph) -> None:
queue: Queue[Tuple[ControlFlowGraph, str]],
lock: Callable[[], None],
process_count: int) -> None:
"""Handle the multiprocessing of metrics."""
try:
if metrics_generator.name() == "Lines of Code" and \
graph.metadata.language is not KnownExtensions.Python:
return
print(f"Starting thread {process_count}")
while True:
with lock: # type: ignore
if not queue.empty():
graph, generator_name = queue.get()
else:
break
metrics_generator = metrics_generators[generator_name]
timeout = 1200 if metrics_generator.name() == "Path Complexity" else 180
try:
if metrics_generator.name() == "Lines of Code" and \
graph.metadata.language is not KnownExtensions.Python:
continue

print(f"Getting {metrics_generator.name()} for graph {graph.name} on process {os.getpid()}.")
with Timeout(180, "Took too long!"):
result = metrics_generator.evaluate(graph)
with Timeout(timeout, "Took too long!"):
result = metrics_generator.evaluate(graph)

if graph.name is None:
raise ValueError("No Graph name.")
if graph.name is None:
raise ValueError("No Graph name.")

shared_dict[(graph.name, metrics_generator.name())] = result
except IndexError as err:
print(graph)
print(err)
except TimeoutError as err:
print(err, graph.name, metrics_generator.name())
shared_dict[(graph.name, metrics_generator.name())] = ("NA", "Timeout")
shared_dict[(graph.name, metrics_generator.name())] = result
except IndexError as err:
print(graph)
print(err)
except TimeoutError as err:
print(err, graph.name, metrics_generator.name())
shared_dict[(graph.name, metrics_generator.name())] = ("NA", "Timeout")
print(f"Thread {process_count} is done.")


class REPLOptions():
Expand Down Expand Up @@ -552,16 +564,30 @@ def do_list(self, flags: Options, list_typename: str) -> None:

def do_metrics_multithreaded(self, cfgs: list[ControlFlowGraph]) -> None:
"""Compute all of the metrics for some set of graphs using parallelization."""
pool = Pool(8)
random.shuffle(cfgs)
pool_size = 8
pool = Pool(pool_size)
manager = Manager()
graph_queue = manager.Queue()
lock = manager.Lock() # pylint: disable=no-member
cfgs = sorted(cfgs, key=lambda cfg: len(cfg.graph.vertices()), reverse=True)
results: defaultdict[str, list[tuple[str, MetricRes]]] = defaultdict(list)
shared_dict: dict[tuple[str, str], Union[int, PathComplexityRes]] = manager.dict()
async_results = []
# Queue up all of the cfgs / metrics to execute
for metrics_generator in self.controller.metrics_generators[::-1]:
async_res = pool.map_async(partial(multiprocess_metrics, metrics_generator, shared_dict), cfgs)
async_results.append(async_res)
list(map(lambda x: x.wait(), async_results))
for cfg in cfgs:
graph_queue.put((cfg, metrics_generator.name()))

generator_dict = {generator.name(): generator for generator in self.controller.metrics_generators}

func_to_execute = partial(
multiprocess_metrics,
generator_dict,
shared_dict,
graph_queue,
lock)
args = list(range(pool_size))

result = pool.map(func_to_execute, args, chunksize=1) # pylint: disable=unused-variable
for (name, metric_generator), res in shared_dict.items():
results[name].append((metric_generator, res))
self.data.metrics.update(results)
Expand Down

0 comments on commit 4c95c77

Please sign in to comment.