In [1]:
from os import PathLike
from pathlib import Path, PurePath
from typing import Union

In [2]:
PathType = Union[PathLike, str]

In [3]:
isinstance("foo", PathType)

True

In [4]:
isinstance(Path("foo"), PathType)

True

In [5]:
%%writefile local_file_metadata.py
"""
Object model for the metadata of local filesystem objects.
"""

from dataclasses import asdict, astuple, dataclass, fields
from datetime import datetime
from grp import getgrgid
from hashlib import md5
from os import PathLike
from pathlib import Path
from pwd import getpwuid
from queue import Empty
from stat import filemode
from typing import Iterator, Union


BLOCK_SIZE = 65536

PathType = Union[PathLike, str]


@dataclass
class LocalFileMetadata:
    path: Path
    mode: str
    num_links: int
    num_bytes: int
    user: str
    group: str
    mtime: datetime
    md5: str = None

    @classmethod
    def from_path(cls, p: Path):
        s = p.stat(follow_symlinks=False)
        result = cls(
            p,
            filemode(s.st_mode),
            s.st_nlink,
            s.st_size,
            getpwuid(s.st_uid).pw_name,
            getgrgid(s.st_gid).gr_name,
            datetime.fromtimestamp(s.st_mtime),
        )
        return result

    @property
    def type_code(self):
        return self.mode[0]

    @property
    def astuple(self):
        return astuple(self)

    @property
    def asdict(self):
        return asdict(self)

    def compute_md5(self) -> None:
        if not self.path.is_file():
            self.md5 = "NA"
            return
        h = md5()
        with self.path.open("rb") as fin:
            while data := fin.read(BLOCK_SIZE):
                h.update(data)
        self.md5 = h.hexdigest()


def scan(starting_dir: PathType) -> Iterator[LocalFileMetadata]:
    top = Path(starting_dir)
    for p in top.glob("**/*"):
        yield LocalFileMetadata.from_path(p)


Overwriting local_file_metadata.py


In [6]:
from local_file_metadata import LocalFileMetadata, scan

In [7]:
all_files = list(scan("/Users/hale/Documents/TextMate_Bundles/"))

In [8]:
len(all_files)

1253

In [9]:
all_files[:3]

[LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/.DS_Store'), mode='-rw-r--r--', num_links=1, num_bytes=12292, user='hale', group='staff', mtime=datetime.datetime(2016, 8, 22, 17, 6, 32), md5=None),
 LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/groovy.tmbundle'), mode='drwxr-xr-x', num_links=9, num_bytes=288, user='hale', group='staff', mtime=datetime.datetime(2009, 10, 6, 14, 47, 8), md5=None),
 LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/bad'), mode='drwxr-xr-x', num_links=4, num_bytes=128, user='hale', group='staff', mtime=datetime.datetime(2009, 10, 6, 14, 45, 44), md5=None)]

In [10]:
all_regular_files = [m for m in all_files if m.type_code == "-"]

In [11]:
len(all_regular_files)

934

In [12]:
for md in all_regular_files:
    md.compute_md5()

In [13]:
all_regular_files[-3:]

[LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/Graphviz.tmbundle/Syntaxes/.svn/format'), mode='-r--r--r--', num_links=1, num_bytes=2, user='hale', group='staff', mtime=datetime.datetime(2007, 8, 29, 9, 58, 58), md5='c30f7472766d25af1dc80b3ffc9a58c7'),
 LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/Graphviz.tmbundle/Syntaxes/.svn/all-wcprops'), mode='-r--r--r--', num_links=1, num_bytes=236, user='hale', group='staff', mtime=datetime.datetime(2007, 8, 29, 9, 58, 58), md5='0beac3861cd9354668f55aac6ad980a8'),
 LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/Graphviz.tmbundle/Syntaxes/.svn/text-base/DOT.plist.svn-base'), mode='-r--r--r--', num_links=1, num_bytes=3629, user='hale', group='staff', mtime=datetime.datetime(2007, 8, 29, 9, 58, 58), md5='19252177dfe15aea9b0625aa19b7ac41')]

In [14]:
m = all_regular_files[42]

In [15]:
m

LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/groovy.tmbundle/Snippets/assert(__).tmSnippet'), mode='-rw-r--r--', num_links=1, num_bytes=519, user='hale', group='staff', mtime=datetime.datetime(2009, 10, 6, 14, 47, 8), md5='48e72b7707b0fcdcfcd3b2d6c214caa8')

In [16]:
from collections import Counter

In [17]:
Counter(m.type_code for m in all_files)

Counter({'-': 934, 'd': 318, 'l': 1})

In [18]:
all_links = [m for m in all_files if m.type_code=="l"]

In [19]:
len(all_links)

1

In [20]:
all_links

[LocalFileMetadata(path=PosixPath('/Users/hale/Documents/TextMate_Bundles/active_bundles'), mode='lrwxr-xr-x', num_links=1, num_bytes=56, user='hale', group='staff', mtime=datetime.datetime(2007, 8, 29, 10, 8, 17), md5=None)]

In [21]:
m.astuple

(PosixPath('/Users/hale/Documents/TextMate_Bundles/groovy.tmbundle/Snippets/assert(__).tmSnippet'),
 '-rw-r--r--',
 1,
 519,
 'hale',
 'staff',
 datetime.datetime(2009, 10, 6, 14, 47, 8),
 '48e72b7707b0fcdcfcd3b2d6c214caa8')

In [22]:
m.asdict

{'path': PosixPath('/Users/hale/Documents/TextMate_Bundles/groovy.tmbundle/Snippets/assert(__).tmSnippet'),
 'mode': '-rw-r--r--',
 'num_links': 1,
 'num_bytes': 519,
 'user': 'hale',
 'group': 'staff',
 'mtime': datetime.datetime(2009, 10, 6, 14, 47, 8),
 'md5': '48e72b7707b0fcdcfcd3b2d6c214caa8'}

---

In [23]:
%%writefile mp_d.py
from multiprocessing import Process, Queue
from os import getpid
from queue import Empty
from sys import argv
from time import sleep

from local_file_metadata import LocalFileMetadata, scan


def main() -> None:
    output_path, *extra = argv
    start_path = "."
    if extra:
        assert len(extra) == 1, extra
        start_path, = extra
    run(start_path, output_path)

    
def run(start_path, output_path) -> None:
    print(getpid())
    engine = TaskEngine(run_worker, handle_results, 2)
    for i in range(10):
        engine.post_task(i)
    sleep(1)
    engine.start()
    for i in range(10, 20):
        engine.post_task(i)
    engine.finish()
    print("done")



def run_worker(worker_id, tasks: Queue, results: Queue) -> None:
    pid = getpid()
    print(pid, worker_id, "hello")
    while True:
        # print(pid, worker_id, "get")
        task_def = tasks.get()
        print(*task_def, pid, worker_id, "value")
        results.put((task_def + (pid, worker_id)))
        # print(pid, worker_id, "sleep")
        sleep(0.1)
        # print(pid, i, "wake")


WORK_END = "WORK END"  # sentinel object for task_defs Queue


def handle_results(worker_results: Queue, task_defs: Queue):
    print("handle_results")
    remaining_tasks = set()
    task_defs_closed = False
    while remaining_tasks or not task_defs_closed:
        if not task_defs_closed:
            for task_def in iterate_quick(task_defs):
                if not task_def == WORK_END:
                    assert isinstance(task_def, tuple), task_def
                    task_id = task_def[0]
                    remaining_tasks.add(task_id)
                else:
                    print("Got WORK_END")
                    task_defs_closed = True
        print("fetching results")
        for result in iterate_quick(worker_results):
            print("result", *result)
            task_id = result[0]
            assert task_id in remaining_tasks, (result, remaining_tasks)
            remaining_tasks.remove(task_id)
    assert not remaining_tasks, remaining_tasks


def iterate_quick(q: Queue):
    while True:
        try:
            yield q.get(True, 0.01)
        except Empty:
            return


class TaskEngine:
    def __init__(self, worker_function, results_handler, num_workers: int = 2):
        self.worker_tasks = Queue()
        self.worker_results = Queue()
        self.task_defs = Queue()
        self.workers = [
            Process(
                target=worker_function,
                args=(
                    worker_id,
                    self.worker_tasks,
                    self.worker_results,
                ),
            )
            for worker_id in range(num_workers)
        ]
        for worker in self.workers:
            worker.start()
        self.result_process = Process(
            target=results_handler, args=(self.worker_results, self.task_defs)
        )

    def post_task(self, *data):
        print("posting", *data)
        # remaining_tasks.add(data)
        self.worker_tasks.put(data)
        self.task_defs.put(data)

    def start(self):
        self.result_process.start()

    def finish(self):
        """Wait for self.result_process to exit, then kill all workers and exit."""
        self.task_defs.put(WORK_END)
        self.worker_tasks.close()
        self.task_defs.close()
        self.result_process.join()
        for worker in self.workers:
            print("terminating", worker.name)
            worker.terminate()


if __name__ == "__main__":
    main()


Overwriting mp_d.py


In [24]:
run mp_d.py

18100
posting 0
posting 1
posting 2
posting 3
posting 4
posting 5
posting 6
posting 7
posting 8
posting 9
posting 10
posting 11
posting 12
posting 13
posting 14
posting 15
posting 16
posting 17
posting 18
posting 19
handle_results
Got WORK_END
fetching results
result 0 19514 0
result 1 19515 1
result 2 19514 0
result 3 19515 1
result 4 19514 0
result 5 19515 1
result 6 19514 0
result 7 19515 1
result 8 19514 0
result 9 19515 1
result 10 19514 0
result 11 19515 1
fetching results
fetching results
result 12 19514 0
result 13 19515 1
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
result 14 19514 0
result 15 19515 1
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
fetching results
result 16 19514 0
result 17 19515 1
fetching results
fetching results
fetching results
fetching results
fetching results
