In [1]:
from tabulate import tabulate
from typing import NamedTuple, Optional

import operator
import threading
import time
import toolz


class Data(NamedTuple):
  """One row of the demo listing: an entry name plus optional metadata.

  Only ``name`` is required. Every other field defaults to ``None`` so an
  entry with no known metadata can be written as ``Data("Favorites")``;
  passing all five values positionally keeps working unchanged.
  """
  name: str
  # Numeric size of the entry; None when unknown.
  size: Optional[float] = None
  # Outcome flag for the entry; None when unknown.
  state: Optional[bool] = None
  # Date text such as "06/06/2020"; None when unknown.
  date: Optional[str] = None
  # Time-of-day text such as "00:55"; None when unknown.
  time: Optional[str] = None


def generate_data():
  """Lazily yield the six hard-coded sample ``Data`` rows in a fixed order.

  The "Favorites" row has every field except the name set to ``None``.
  """
  sample_rows = (
    ("3D Objects", 35.1, True, "06/06/2020", "00:55"),
    (".pylint.d", 28.85, False, "03/17/2021", "23:46"),
    ("Favorites", None, None, None, None),
    ("temp", 19.64, True, "05/14/2020", "22:39"),
    (".gitconfig", 0.74, True, "02/28/2021", "22:55"),
    (".bashrc", 4.12, True, "02/28/2021", "21:55"),
  )
  for fields in sample_rows:
    yield Data(*fields)


@toolz.curry
def with_default(f, default, value, need_default=lambda v: v is None):
  """Return ``f(value)``, or ``default`` when ``need_default(value)`` is truthy.

  The function is curried, so it can be given ``f`` and ``default`` (and
  optionally a ``need_default`` keyword) first and ``value`` later.

  Args:
    f: Callable applied to ``value`` when no default is needed.
    default: Result returned instead of calling ``f``.
    value: The input under inspection.
    need_default: Predicate deciding whether ``default`` is used; by default
      it is true only when ``value`` is ``None``.
  """
  return default if need_default(value) else f(value)


@toolz.curry
def by_formatter(formatter, value):
  """Render ``value`` by calling ``formatter.format(value)``.

  Curried: ``by_formatter(template)`` gives a one-argument renderer.
  """
  rendered = formatter.format(value)
  return rendered


@toolz.curry
def by_table(table, value):
  """Return the result paired with the first predicate in ``table`` that accepts ``value``.

  Args:
    table: Mapping of predicate -> result; predicates are tried in the
      mapping's iteration order and evaluation stops at the first match.
    value: The input handed to each predicate.

  Raises:
    ValueError: If no predicate returns a truthy value.
  """
  no_match = object()  # sentinel, so a legitimately falsy/None result still counts
  matches = (result for predicate, result in table.items() if predicate(value))
  chosen = next(matches, no_match)
  if chosen is no_match:
    raise ValueError(f'cannot find a match in {table} for {value}')
  return chosen


@toolz.curry
def by_joiner(sep, segments):
  """Join ``segments`` into one string with ``sep`` between consecutive items.

  Curried: ``by_joiner(sep)`` gives a one-argument joiner.
  """
  joined = sep.join(segments)
  return joined


# Name column: the row's ``name`` attribute, unchanged.
format_name = operator.attrgetter('name')

# Absolute-size column: the row's ``size`` with one decimal place and an
# "MB" suffix; a missing size (None) is shown as '0.0MB'.
format_abs_size = toolz.compose(
  with_default(by_formatter('{:.1f}MB'), '0.0MB'),
  operator.attrgetter('size'),
)

# Size buckets: predicate -> label. by_table tries them in insertion order
# and returns the label of the first predicate that accepts the size.
relative_size_table = {
  lambda size: size < 10: 'Small',
  lambda size: 10 <= size < 20: 'Medium',
  lambda size: size >= 20: 'Big',
}

# Relative-size column: bucket label for the row's ``size``; a missing size
# (None) is reported as 'Small'.
format_relative_size = toolz.compose(
  with_default(by_table(relative_size_table), 'Small'),
  operator.attrgetter('size'),
)

# State labels: predicate -> label, tried in insertion order by by_table.
# A truthy state maps to 'Success', a falsy one to 'Failure'
# (operator.not_ is the negation of the identity predicate).
state_table = {
  toolz.identity: 'Success',
  operator.not_: 'Failure',
}

# State column: label for the row's ``state``.
# NOTE(review): a missing state (None) is reported as 'Success' - confirm
# that this default is intended.
format_state = toolz.compose(
  with_default(
    by_table(state_table),
    'Success'
  ),
  operator.attrgetter('state')
)

# Date-time column: "<date> <time>" joined by a single space; if either
# part is missing (None) the whole cell becomes 'Unknown'.
format_date_time = toolz.compose(
  with_default(
    by_joiner(' '),
    'Unknown',
    need_default=lambda parts: any(part is None for part in parts),
  ),
  # attrgetter with two names returns the (date, time) tuple in one call.
  operator.attrgetter('date', 'time'),
)


In [12]:
def debug_print_thread(data, delay=1):
  """Return the calling thread's identifier as hex text after pausing.

  Args:
    data: The row being processed. It is accepted so the function can sit
      next to the other per-row formatters, but it is not otherwise used.
    delay: Seconds to sleep before returning. Defaults to 1, the value that
      used to be hard-coded; pass 0 to skip the pause.

  Returns:
    The current thread's ``ident`` formatted as ``'0x<hex>'``.
  """
  thread_id = f'0x{threading.current_thread().ident:x}'
  # Presumably stands in for slow per-row work, so that sequential and
  # pooled runs take visibly different time.
  time.sleep(delay)
  return thread_id
  

# Row builder: applies every column formatter to the same row and returns
# the results as one tuple, in the order listed here.
juxt_a_row = toolz.juxt([
  format_name,           # Name
  format_abs_size,       # Abs. Size
  format_relative_size,  # Rel. Size
  format_state,          # State
  format_date_time,      # Date Time
  debug_print_thread,    # Thread ID
])

In [13]:
# Column titles, in the same order as the formatters inside juxt_a_row.
headers = 'Name|Abs. Size|Rel. Size|State|Date Time|Thread ID'.split('|')

# print_table(rows): turn the rows into a list, render them with tabulate
# under the headers above, and print the result. compose applies its
# functions right to left, so `list` runs first and `print` last.
print_table = toolz.compose(
  print,
  toolz.partial(tabulate, headers=headers),
  list,
)

普通的map函数是阻塞、顺序处理：

In [14]:
# Baseline: build every row one after another on the calling thread.
print_table(juxt_a_row(entry) for entry in generate_data())

Name        Abs. Size    Rel. Size    State    Date Time         Thread ID
----------  -----------  -----------  -------  ----------------  --------------
3D Objects  35.1MB       Big          Success  06/06/2020 00:55  0x7f5661a0f780
.pylint.d   28.9MB       Big          Falure   03/17/2021 23:46  0x7f5661a0f780
Favorites   0.0MB        Small        Success  Unknown           0x7f5661a0f780
temp        19.6MB       Medium       Success  05/14/2020 22:39  0x7f5661a0f780
.gitconfig  0.7MB        Small        Success  02/28/2021 22:55  0x7f5661a0f780
.bashrc     4.1MB        Small        Success  02/28/2021 21:55  0x7f5661a0f780


引入[多线程map](https://github.com/python/cpython/blob/ce4d25f3cd0a1c6e65b64015140fb5e1397c8ac5/Lib/multiprocessing/pool.py#L468)，可以快速形成并发操作：

In [15]:
from multiprocess.pool import ThreadPool

In [16]:
# Build the rows on a pool of four worker threads; pool.map hands back the
# finished rows as a list, which is printed once the pool has been closed.
with ThreadPool(4) as pool:
  table = pool.map(juxt_a_row, generate_data())
print_table(table)

Name        Abs. Size    Rel. Size    State    Date Time         Thread ID
----------  -----------  -----------  -------  ----------------  --------------
3D Objects  35.1MB       Big          Success  06/06/2020 00:55  0x7f563de7e700
.pylint.d   28.9MB       Big          Falure   03/17/2021 23:46  0x7f563ac33700
Favorites   0.0MB        Small        Success  Unknown           0x7f563826e700
temp        19.6MB       Medium       Success  05/14/2020 22:39  0x7f5637a6d700
.gitconfig  0.7MB        Small        Success  02/28/2021 22:55  0x7f563de7e700
.bashrc     4.1MB        Small        Success  02/28/2021 21:55  0x7f563826e700


`pool.map`在分发任务之前会先把没有长度信息的可迭代对象（例如生成器）整个转换成list，也就是一次性预取所有任务。需要防止预取的话可以使用`pool.imap`或提前进行partition。

In [17]:
# Feed the pool at most four rows at a time: each chunk is mapped and
# printed as its own table before the next chunk is requested.
with ThreadPool(4) as pool:
  for chunk in toolz.partition_all(4, generate_data()):
    table = pool.map(juxt_a_row, chunk)
    print_table(table)

Name        Abs. Size    Rel. Size    State    Date Time         Thread ID
----------  -----------  -----------  -------  ----------------  --------------
3D Objects  35.1MB       Big          Success  06/06/2020 00:55  0x7f563de7e700
.pylint.d   28.9MB       Big          Falure   03/17/2021 23:46  0x7f563722c700
Favorites   0.0MB        Small        Success  Unknown           0x7f563826e700
temp        19.6MB       Medium       Success  05/14/2020 22:39  0x7f5637a6d700
Name        Abs. Size    Rel. Size    State    Date Time         Thread ID
----------  -----------  -----------  -------  ----------------  --------------
.gitconfig  0.7MB        Small        Success  02/28/2021 22:55  0x7f563de7e700
.bashrc     4.1MB        Small        Success  02/28/2021 21:55  0x7f563826e700


In [18]:
# Peek at the first chunk produced by partition_all (a tuple of up to four rows).
toolz.first(toolz.partition_all(4, generate_data()))

(Data(name='3D Objects', size=35.1, state=True, date='06/06/2020', time='00:55'),
 Data(name='.pylint.d', size=28.85, state=False, date='03/17/2021', time='23:46'),
 Data(name='Favorites', size=None, state=None, date=None, time=None),
 Data(name='temp', size=19.64, state=True, date='05/14/2020', time='22:39'))

In [21]:
with ThreadPool(processes=4) as pool:
  # The chunked flow as a single composed callable: map one chunk on the
  # pool (right-most step), then print the resulting rows as a table.
  async_formatter = toolz.compose(
    print_table,
    toolz.partial(pool.map, juxt_a_row),
  )
  # Run it once per chunk of up to four rows; the return values (None from
  # print) are not needed, so a plain loop replaces building a list.
  for chunk in toolz.partition_all(4, generate_data()):
    async_formatter(chunk)

Name        Abs. Size    Rel. Size    State    Date Time         Thread ID
----------  -----------  -----------  -------  ----------------  --------------
3D Objects  35.1MB       Big          Success  06/06/2020 00:55  0x7f563ac33700
.pylint.d   28.9MB       Big          Falure   03/17/2021 23:46  0x7f563de7e700
Favorites   0.0MB        Small        Success  Unknown           0x7f5638a6f700
temp        19.6MB       Medium       Success  05/14/2020 22:39  0x7f563722c700
Name        Abs. Size    Rel. Size    State    Date Time         Thread ID
----------  -----------  -----------  -------  ----------------  --------------
.gitconfig  0.7MB        Small        Success  02/28/2021 22:55  0x7f563ac33700
.bashrc     4.1MB        Small        Success  02/28/2021 21:55  0x7f563722c700
