In [1]:
import sys
import os
import inspect
from hashlib import md5
from pathlib import Path
from operator import methodcaller
from itertools import chain
import datetime as dt
import click
import attr
from tqdm import tqdm
import numpy as np
from toolz import curry
from functools import partial
import pandas as pd
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from pprint import pprint  #as pp

from IPython.core.display import HTML
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [2]:
import binascii
import hashlib
def hash_utf8(string):
    """Return the MD5 digest of ``string`` as a lowercase hex string.

    The text is encoded as UTF-8 before it is hashed.
    """
    encoded = string.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()

In [3]:
import logging as log
# Silence every record at CRITICAL (numeric value 50) and below.
log.disable(log.CRITICAL)

In [None]:
# Make the local "bllb" helper package importable. The relative path is
# resolved against the current working directory.
bllb_path = str(Path(r"../../../code/python/bllb").resolve())
sys.path.insert(0, bllb_path)
# NOTE(review): the star import hides where names come from; setup_logging
# (used below) presumably comes from bllb_logging - confirm.
from bllb_logging import *
from bllb import pp  #, hash_utf8

# Logging switches for this notebook.
LOG_ON = False
LOG_LEVEL = "WARNING"  #"DEBUG"
def start_log(enable=True, lvl='WARNING', std_lib=True):
    """Set up logging via ``setup_logging`` and return the logger it provides.

    All three arguments are passed straight through to ``setup_logging``;
    one info-level message is emitted on the new logger before returning.
    """
    log = setup_logging(enable, lvl, std_lib=std_lib)
    log.info('examinator logging started')
    return log
log_on = LOG_ON
log_level = LOG_LEVEL
# Rebinds the module-level name ``log`` to whatever start_log returns.
log = start_log(log_on, log_level, std_lib=True)

In [4]:
#cluster = LocalCluster(processes=True)
# Connect to a Dask scheduler at a hard-coded address.
# NOTE(review): the port looks like one assigned to an earlier session -
# confirm a scheduler is actually listening there before running this cell.
client = Client('127.0.0.1:38615')
# Bare expression: shown as the cell output (scheduler / worker summary).
client

0,1
Client  Scheduler: tcp://127.0.0.1:38615  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 16.78 GB


In [5]:
def md5_blocks(path, blocksize=1024 * 2048) -> str:
    """Return the MD5 hex digest of a file, reading it in fixed-size blocks.

    Args:
        path: File to hash (anything accepted by ``pathlib.Path``).
        blocksize: Number of bytes read per iteration (default 2 MiB).

    Returns:
        The hex digest string, or ``None`` when ``path`` is a directory or
        the file could not be read. Read failures are logged as warnings,
        never raised.
    """
    path = Path(path)
    if path.is_dir():
        # Use ``log`` like the rest of this function; the previous ``dbg``
        # helper only exists when the star-import cell has been executed.
        log.debug(f'Item is a directory and will not be hashed.  {str(path)}')
        return None
    try:
        hasher = md5()
        with path.open('rb') as file:
            # iter(callable, sentinel) stops at the first empty read (EOF).
            for block in iter(lambda: file.read(blocksize), b''):
                hasher.update(block)
        return hasher.hexdigest()
    except Exception as error:
        log.warning(
            f'Error trying to hash item: {str(path)}\nError:\n{error}')
        return None
def glob_paths(path):
    """Return an iterable of the paths found at or under ``path``.

    A directory yields every entry beneath it (recursively, through
    ``rglob('*')``); any other path yields only itself. The result is always
    iterable, so the function can be used with
    ``chain.from_iterable(map(glob_paths, basepaths))``.

    Returns:
        An iterable of ``Path`` objects. If the path cannot be inspected the
        error is logged as a warning and an empty list is returned.
    """
    try:
        path = Path(path)
        if path.is_dir():
            return path.rglob('*')
        # Wrap a single path so callers can iterate the result uniformly;
        # a bare Path is not iterable and made chain.from_iterable fail.
        return [path]
    except Exception as error:
        log.warning(error)
        return []

In [6]:
def get_stat(path, opt_md5=True, opt_pid=False) -> dict:
    """Collect filesystem metadata for ``path`` into one flat dict.

    The dict is filled in this order (later keys overwrite earlier ones):

    * members of ``path.lstat()`` whose name has no leading underscore and
      which are not builtin functions/methods (``st_size``, ``st_mtime``, ...);
    * attributes of the ``Path`` object whose name contains no ``__`` and
      whose ``str()`` contains no ``<`` (``name``, ``suffix``, ``parent``, ...);
    * the result of each ``is_*`` method that can be called without
      arguments, except ``is_mount``;
    * ``path``, ``path_hash`` (MD5 of the path string) and ``f_atime`` /
      ``f_ctime`` / ``f_mtime`` as ``datetime`` objects;
    * ``md5`` (content hash) for non-directories when ``opt_md5`` is true.

    Args:
        path: Path to inspect.
        opt_md5: Hash the file content with ``md5_blocks``.
        opt_pid: Log the id of the process doing the work at debug level.

    Returns:
        The metadata dict, or ``None`` if anything failed (the error is
        logged as a warning rather than raised).
    """
    log.debug(path)
    try:
        path = Path(path)
        info = {
            name: value
            for name, value in inspect.getmembers(path.lstat())
            if not name.startswith('_') and not inspect.isbuiltin(value)
        }
        path_members = inspect.getmembers(path)
        info.update({
            name: value
            for name, value in path_members
            if '__' not in name and '<' not in str(value)
        })
        for name, member in path_members:
            if not name.startswith('is_') or name == 'is_mount':
                continue
            try:
                info[name] = member()
            except TypeError as error:
                # Some predicates need an argument (e.g. is_relative_to on
                # Python 3.9+). Skip them instead of aborting the whole
                # record, which is what a blanket no-argument call did.
                log.debug(f'Skipping predicate {name}: {error}')
        info['path'] = path
        info['path_hash'] = hash_utf8(str(path))
        info['f_atime'] = dt.datetime.fromtimestamp(info['st_atime'])
        info['f_ctime'] = dt.datetime.fromtimestamp(info['st_ctime'])
        info['f_mtime'] = dt.datetime.fromtimestamp(info['st_mtime'])
        if opt_md5:
            if not path.is_dir():
                try:
                    info['md5'] = md5_blocks(path)
                except Exception:
                    # Narrowed from a bare ``except`` so KeyboardInterrupt
                    # and SystemExit are no longer swallowed.
                    log.warning(f'Could not hash item: {str(path)}')
            else:
                log.debug(f'Item is a directory and will not be hashed.  {str(path)}'
                    )
        if opt_pid:
            log.debug(f"working using OS pid: {os.getpid()}, opt_pid: {opt_pid}")
        return info
    except Exception as error:
        log.warning(error)

In [7]:
get_stat('.')

{'n_fields': 19,
 'n_sequence_fields': 10,
 'n_unnamed_fields': 3,
 'st_atime': 1552428817.488937,
 'st_atime_ns': 1552428817488936900,
 'st_blksize': 16384,
 'st_blocks': 8,
 'st_ctime': 1552428817.488937,
 'st_ctime_ns': 1552428817488936900,
 'st_dev': 107,
 'st_gid': 0,
 'st_ino': 35488986,
 'st_mode': 16895,
 'st_mtime': 1552428817.488937,
 'st_mtime_ns': 1552428817488936900,
 'st_nlink': 2,
 'st_rdev': 0,
 'st_size': 4096,
 'st_uid': 0,
 '_closed': False,
 '_cparts': [],
 '_drv': '',
 '_parts': [],
 '_root': '',
 '_str': '.',
 'anchor': '',
 'drive': '',
 'name': '',
 'parent': PosixPath('.'),
 'parts': (),
 'root': '',
 'stem': '',
 'suffix': '',
 'suffixes': [],
 'is_absolute': False,
 'is_block_device': False,
 'is_char_device': False,
 'is_dir': True,
 'is_fifo': False,
 'is_file': False,
 'is_reserved': False,
 'is_socket': False,
 'is_symlink': False,
 'path': PosixPath('.'),
 'path_hash': '5058f1af8388633f609cadb75a75dc9d',
 'f_atime': datetime.datetime(2019, 3, 12, 22, 13,

In [8]:
def flatten(lists):
    """Return a new flat list with every nested ``list`` expanded in order.

    Only ``list`` instances are descended into; tuples, strings and other
    values are kept as single items. The input is not modified.

    Written as a plain loop because ``reduce`` is not a builtin in Python 3
    and is not imported by the notebook's import cell; this also avoids
    re-copying the accumulated list on every step.
    """
    flat = []
    for item in lists:
        if isinstance(item, list):
            flat.extend(flatten(item))
        else:
            flat.append(item)
    return flat

In [None]:
%%time
# Walk every base path, stat each entry through the Dask client and collect
# the resulting records into a DataFrame.
basepaths = ['..']
opt_md5=False
#def proc_paths(basepaths, opt_md5=True):
"""proc_paths uses Dask client to map path_stat over basepaths."""
# Chain the per-basepath results of glob_paths into one stream of paths.
paths = chain.from_iterable(map(glob_paths, basepaths))
pstat = partial(get_stat, opt_md5=opt_md5, opt_pid=True)
# Submit get_stat once per path.
results = client.map(pstat, paths)
# Resolve each submitted task to its value (a dict, or None if get_stat failed).
data = [_.result() for _ in results]
# NOTE(review): the frame is built locally, wrapped as a Dask DataFrame and
# immediately computed back - the round trip looks redundant; confirm.
ddf = dd.from_pandas(pd.DataFrame(data), npartitions=4)
df = ddf.compute()
#df['idx'] = df.index
#df['path_hash'] = df.path.map(str).map(hash_utf8)
#times = df.loc[:, ['idx', 'path', 'f_ctime', 'f_mtime', 'f_atime']].melt(id_vars=['idx', 'path'])

In [397]:
# Root of the tree used by the traversal experiment in this cell.
path = Path('..')
basepaths = [str(path)]
def proc_item(path):
    """List directory ``path`` as a nested structure.

    The result holds the string form of every direct child, followed by one
    nested list (built the same way) for each child that is a directory.
    """
    child_names = [str(child) for child in path.iterdir()]
    subtrees = [proc_item(child) for child in path.iterdir() if child.is_dir()]
    return child_names + subtrees
# Sanity check of the recursive listing against rglob. In the recorded run
# the nested result has 34 top-level items and flattens to 507, the same
# count rglob reports.
result = proc_item(path)
print(len([*path.rglob('*')]))
print(len(result))
# Shallow copy; the length is unchanged.
results = [*result]
print(len(results))
final = flatten(results)
print(len(final))

507
34
34
507


In [402]:
def flatten(lists):
    """Return the paths of all non-directory entries reachable from ``lists``.

    ``lists`` is an iterable of ``Path`` objects (for example
    ``Path('.').iterdir()``). Directories are descended into recursively and
    do not appear in the result themselves; every other entry is returned as
    a string.

    Written as a plain loop because ``reduce`` is not a builtin in Python 3
    and is not imported by the notebook's import cell; this also avoids
    re-copying the accumulated list on every step.
    """
    flat = []
    for entry in lists:
        if entry.is_dir():
            flat.extend(flatten(entry.iterdir()))
        else:
            flat.append(str(entry))
    return flat
flatten(Path('.').iterdir())

['.ipynb_checkpoints/cli-checkpoint.py',
 '.ipynb_checkpoints/cli2-checkpoint.py',
 '.ipynb_checkpoints/examinator-checkpoint.py',
 '.ipynb_checkpoints/get_file_info-checkpoint.ipynb',
 '.ipynb_checkpoints/get_file_info-checkpoint.py',
 '.ipynb_checkpoints/get_file_info2-checkpoint.py',
 '.ipynb_checkpoints/scratch-checkpoint.ipynb',
 '.ipynb_checkpoints/script-checkpoint.py',
 '.ipynb_checkpoints/test_joblib-checkpoint.py',
 '.ipynb_checkpoints/Untitled-checkpoint.ipynb',
 '.ipynb_checkpoints/untitled-checkpoint.py',
 'cli.py',
 'cli2.py',
 'dask-worker-space/global.lock',
 'dask-worker-space/purge.lock',
 'dask-worker-space/worker-in4yz8hl.dirlock',
 'dask-worker-space/worker-j3n9yjz3.dirlock',
 'dask-worker-space/worker-vcemojda.dirlock',
 'daskerator.py',
 'examinator.py',
 'examinator2.py',
 'get_file_info.ipynb',
 'get_file_info.py',
 'get_file_info2.py',
 'get_file_info3.py',
 'scratch/.ipynb_checkpoints/asyncio_example-checkpoint.py',
 'scratch/.ipynb_checkpoints/ex_async-check

In [432]:
def is_iter(item):
    """Return True when ``item`` is a directory, i.e. has children to walk."""
    return item.is_dir()

def rfunc(res, x):
    """Reducer step: return ``res`` extended by ``x``.

    A directory contributes its flattened contents; anything else
    contributes its own string form.
    """
    return res + (flatten(x.iterdir()) if is_iter(x) else [str(x)])

def flatten(iterator):
    """Fold an iterable of ``Path`` objects into a flat list of path strings.

    Directories are expanded recursively and are not listed themselves.
    """
    # ``reduce`` is not a builtin in Python 3 and is not imported by the
    # notebook's import cell, so bring it into scope here.
    from functools import reduce
    return reduce(rfunc, iterator, [])
flatten(Path('.').iterdir())

['.ipynb_checkpoints/cli-checkpoint.py',
 '.ipynb_checkpoints/cli2-checkpoint.py',
 '.ipynb_checkpoints/examinator-checkpoint.py',
 '.ipynb_checkpoints/get_file_info-checkpoint.ipynb',
 '.ipynb_checkpoints/get_file_info-checkpoint.py',
 '.ipynb_checkpoints/get_file_info2-checkpoint.py',
 '.ipynb_checkpoints/scratch-checkpoint.ipynb',
 '.ipynb_checkpoints/script-checkpoint.py',
 '.ipynb_checkpoints/test_joblib-checkpoint.py',
 '.ipynb_checkpoints/Untitled-checkpoint.ipynb',
 '.ipynb_checkpoints/untitled-checkpoint.py',
 'cli.py',
 'cli2.py',
 'dask-worker-space/global.lock',
 'dask-worker-space/purge.lock',
 'dask-worker-space/worker-1ewi8jgm.dirlock',
 'dask-worker-space/worker-6iukt8s7.dirlock',
 'dask-worker-space/worker-7_r9up0f.dirlock',
 'daskerator.py',
 'examinator.py',
 'examinator2.py',
 'get_file_info.ipynb',
 'get_file_info.py',
 'get_file_info2.py',
 'get_file_info3.py',
 'scratch/.ipynb_checkpoints/asyncio_example-checkpoint.py',
 'scratch/.ipynb_checkpoints/ex_async-check

In [467]:
import operator
def is_iter(item):
    """Return True when ``item`` is a directory."""
    return item.is_dir()

def get_kids(parent):
    """Return an iterator over the direct children of ``parent``."""
    return parent.iterdir()

def get_val(item):
    """Return the flattened contents of a directory, or ``[item]`` otherwise."""
    if is_iter(item):
        return flatten(get_kids(item))
    return [item]

def flatten(iterator):
    """Concatenate ``get_val`` of every entry of ``iterator`` into one list.

    Entries stay ``Path`` objects; directories are replaced by their
    recursively flattened contents.
    """
    collected = []
    for entry in iterator:
        collected = collected + get_val(entry)
    return collected
[*flatten(Path('.').iterdir())]

[PosixPath('.ipynb_checkpoints/cli-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/cli2-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/examinator-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/get_file_info-checkpoint.ipynb'),
 PosixPath('.ipynb_checkpoints/get_file_info-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/get_file_info2-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/scratch-checkpoint.ipynb'),
 PosixPath('.ipynb_checkpoints/script-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/test_joblib-checkpoint.py'),
 PosixPath('.ipynb_checkpoints/Untitled-checkpoint.ipynb'),
 PosixPath('.ipynb_checkpoints/untitled-checkpoint.py'),
 PosixPath('cli.py'),
 PosixPath('cli2.py'),
 PosixPath('dask-worker-space/global.lock'),
 PosixPath('dask-worker-space/purge.lock'),
 PosixPath('dask-worker-space/worker-1ewi8jgm.dirlock'),
 PosixPath('dask-worker-space/worker-6iukt8s7.dirlock'),
 PosixPath('dask-worker-space/worker-7_r9up0f.dirlock'),
 PosixPath('daskerator.py'),
 PosixPath('examinato

In [9]:
def get_dir(d):
    """Return the direct children of directory ``d`` as path strings.

    Returns ``None`` when ``d`` is not a directory.
    """
    directory = Path(d)
    if not directory.is_dir():
        return None
    return list(map(str, directory.iterdir()))
get_dir('.')

['.ipynb_checkpoints',
 'cli.py',
 'cli2.py',
 'dask-worker-space',
 'daskerator.py',
 'examinator.py',
 'examinator2.py',
 'get_file_info.ipynb',
 'get_file_info.py',
 'get_file_info2.py',
 'get_file_info3.py',
 'scratch',
 'script.py',
 'test_joblib.py',
 'Untitled.ipynb',
 'untitled.py',
 '__init__.py',
 '__pycache__']

In [10]:
from queue import Queue
from threading import Thread

def multiplex(n, q, **kwargs):
    """ Fan one queue out into ``n`` queues that each receive every item

    Keyword arguments are forwarded to each output ``Queue``. A daemon
    thread reads from ``q`` forever and copies every item to all outputs.

    >>> q1, q2, q3 = multiplex(3, in_q)
    """
    fanout = [Queue(**kwargs) for _ in range(n)]

    def broadcast():
        while True:
            item = q.get()
            for target in fanout:
                target.put(item)

    Thread(target=broadcast, daemon=True).start()
    return fanout

def push(in_q, out_q):
    """Forward every item taken from ``in_q`` to ``out_q``.

    Loops forever and never returns, so it is meant to run in its own thread.
    """
    while True:
        out_q.put(in_q.get())

def merge(*in_qs, **kwargs):
    """ Funnel several queues into a single output queue

    Keyword arguments are forwarded to the output ``Queue``. One daemon
    thread per input queue runs ``push`` to move its items across.

    >>> out_q = merge(q1, q2, q3)
    """
    merged = Queue(**kwargs)
    for source in in_qs:
        Thread(target=push, args=(source, merged), daemon=True).start()
    return merged

In [11]:
%%time

from queue import Queue
from threading import Thread
from time import sleep

# Wire up a pipeline of queues: paths put on ``q`` are duplicated into two
# streams, one mapped through get_dir and one through get_stat.
# NOTE(review): this passes Queue objects to client.scatter / client.map /
# client.gather and later calls .qsize() on every result, so it presumably
# relies on queue support in the installed dask.distributed - confirm.
q = Queue()
remote_q = client.scatter(q)
q1, q2 = multiplex(2, remote_q)
list_q = client.map(get_dir, q1)
l_q = client.gather(list_q)

opt_md5 = True

pstat = partial(get_stat, opt_md5=opt_md5, opt_pid=False)
q3 = client.map(pstat, q2)
result_q = client.gather(q3)

# Every stage of the pipeline, kept together so a driver can poll their sizes.
qs = [q, remote_q, q1, q2, list_q, l_q, q3, result_q]

def load_dir(from_q, to_q, stop):
    """Unpack lists arriving on ``from_q`` and put their items on ``to_q``.

    Values taken from ``from_q`` that are not lists are discarded. The loop
    runs while there is idle budget left and ``stop()`` is false, or while
    ``from_q`` still reports queued items. Each empty poll costs one unit of
    budget and sleeps 0.1 s; each item taken restores one unit, capped at
    the starting budget of 300.
    """
    max_idle = 300
    idle_budget = max_idle
    while (idle_budget and not stop()) or from_q.qsize():
        if not from_q.qsize():
            idle_budget -= 1
            sleep(.1)
            continue
        batch = from_q.get()
        if isinstance(batch, list):
            for entry in batch:
                to_q.put(entry)
        idle_budget = min(idle_budget + 1, max_idle)

def unloadq(q, stop, limit=2000, rest=.1, check=100):
    """Drain ``q`` into a list and return it.

    The loop runs while there is idle budget left and ``stop()`` is false,
    or while ``q`` still reports queued items. Each empty poll costs one
    unit of budget and sleeps ``rest`` seconds; each item taken restores one
    unit, capped at ``limit``. Progress is printed every ``check`` loop
    iterations and whenever the remaining budget is a multiple of ``check``
    after an empty poll.
    """
    collected = []
    idle_budget = limit
    iterations = 0
    while (idle_budget and not stop()) or q.qsize():
        iterations += 1
        if not iterations % check:
            print(idle_budget, iterations, len(collected))
        if not q.qsize():
            idle_budget -= 1
            if not idle_budget % check:
                print(idle_budget)
            sleep(rest)
            continue
        collected.append(q.get())
        idle_budget = min(idle_budget + 1, limit)
    return collected

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 5.37 ms


In [13]:
%%time
#load_thread = Thread(target=load_dir, args=(l_q, q,), daemon = True)
#load_thread.start()
from concurrent.futures import ThreadPoolExecutor
# Drive the queue pipeline: seed it with the base paths, wait until every
# queue has been idle for a while, then collect the results into a DataFrame.
basepaths = ['.']
with ThreadPoolExecutor() as t:
    stop_threads = False
    # ``stop`` reads the flag each time it is called, so setting
    # stop_threads below is seen by both submitted tasks.
    stop = lambda: stop_threads
    # Moves the items of each list arriving on l_q back onto q.
    # NOTE(review): the returned future is discarded, so an exception raised
    # inside load_dir would go unnoticed - confirm that is acceptable.
    t.submit(load_dir, l_q, q, stop)
    # List comprehension used only for its side effect: seed q with the
    # absolute form of every base path.
    [q.put(str(Path(path).resolve())) for path in basepaths]
    results_future = t.submit(unloadq, result_q, stop, limit=300)
    # Idle countdown: every poll that finds all queues empty decrements i,
    # every busy poll restores one (capped at ilimit); stop when i reaches 0.
    ilimit = 10
    i = ilimit
    while i:
        alive = sum([_q.qsize() for _q in qs])
        if alive:
            i = min(i+1, ilimit)
            print(alive, i)
        else:
            i -= 1
            print(f'i: {i}')
        sleep(.1)
    # Tell both tasks to finish once their queues are empty.
    stop_threads = True
    #results_list = unloadq(result_q, limit=300)
    results_list = results_future.result()
    results = pd.DataFrame(results_list)
    # NOTE(review): DataFrame.info() prints its report itself and returns
    # None, so wrapping it in print() also prints "None".
    print(results.info())
    print(results.sample(5))
#t.shutdown(False)
#del(load_thread)

i: 9
i: 8
i: 7
34 8
34 9
33 10
48 10
49 10
46 10
45 10
47 10
43 10
39 10
31 10
43 10
41 10
37 10
52 10
51 10
45 10
38 10
32 10
27 10
30 10
29 10
24 10
15 10
6 10
i: 9
i: 8
i: 7
i: 6
i: 5
i: 4
i: 3
i: 2
i: 1
i: 0
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 54 entries, 0 to 53
Data columns (total 49 columns):
_closed              54 non-null bool
_cparts              54 non-null object
_drv                 54 non-null object
_parts               54 non-null object
_root                54 non-null object
_str                 54 non-null object
anchor               54 non-null object
drive                54 non-null object
f_atime              54 non-null datetime64[ns]
f_ctime              54 non-null datetime64[ns]
f_mtime              54 non-null datetime64[ns]
is_absolute          54 non-null bool
is_block_device      54 non-null bool
is_char_device       54 non-null bool
is_dir               54 non-null bool
is_fifo              54 non-null bool
is_file              54 non-null 