create the class ZmqManager
Koyaani committed Jun 13, 2022
1 parent: dbb32f0 · commit: a6795f1
Showing 1 changed file with 78 additions and 60 deletions.
138 changes: 78 additions & 60 deletions py3dtiles/convert.py
@@ -206,18 +206,61 @@ def execute_process_jobs(self, content):
)


def zmq_send_to_process(idle_clients, socket, message): # to put in class
if not idle_clients:
raise ValueError("idle_clients is empty")
socket.send_multipart([idle_clients.pop(), pickle.dumps(time.time())] + message)
# Manager
class ZmqManager:
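    """Own the ZMQ context, the ROUTER socket and the pool of worker
    processes, and keep track of which workers are currently idle."""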
def __init__(self, number_of_jobs, process_args):
self.context = zmq.Context()

self.number_of_jobs = number_of_jobs

self.socket = self.context.socket(zmq.ROUTER)
self.socket.bind(IPC_URI)

self.processes = [
multiprocessing.Process(target=zmq_process, args=process_args)
for _ in range(number_of_jobs)
]
        for p in self.processes:
            p.start()

self.activities = [p.pid for p in self.processes]
self.idle_clients = []

self.processes_killed = -1
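        # processes_killed stays at -1 until kill_all_processes resets it to 0;
        # each HALTED reply then increments it up to number_of_jobs.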
self.time_waiting_an_idle_process = 0

def send_to_process(self, message):
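        # ROUTER framing: the first frame picks the recipient worker; a pickled
        # wall-clock send time travels with every job.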
if not self.idle_clients:
raise ValueError("idle_clients is empty")
self.socket.send_multipart([self.idle_clients.pop(), pickle.dumps(time.time())] + message)

def send_to_all_process(self, message):
if not self.idle_clients:
raise ValueError("idle_clients is empty")
for client in self.idle_clients:
self.socket.send_multipart([client, pickle.dumps(time.time())] + message)
self.idle_clients.clear()

def can_queue_more_jobs(self):
return len(self.idle_clients) != 0

def add_idle_client(self, client_id):
if client_id in self.idle_clients:
raise ValueError(f"The client id {client_id} is already in idle_clients")
self.idle_clients.append(client_id)

def are_all_processes_idle(self):
return len(self.idle_clients) == self.number_of_jobs

def are_all_processes_killed(self):
return self.processes_killed == self.number_of_jobs

def zmq_send_to_all_process(idle_clients, socket, message):
if not idle_clients:
raise ValueError("idle_clients is empty")
for client in idle_clients:
socket.send_multipart([client, pickle.dumps(time.time())] + message)
idle_clients.clear()
def kill_all_processes(self):
self.send_to_all_process([CommandType.SHUTDOWN.value])
self.processes_killed = 0

def terminate_all_processes(self):
for p in self.processes:
p.terminate()


def is_ancestor(name, ancestor):
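
For readers new to the underlying pattern: ZmqManager wraps the classic ROUTER/DEALER idle-worker scheme, in which workers announce themselves as idle and the manager addresses a reply to a specific worker via its identity frame. A minimal, self-contained sketch of that scheme (the endpoint and the message bytes are illustrative, not taken from py3dtiles):

import multiprocessing
import pickle
import time

import zmq

URI = "ipc:///tmp/zmq_demo"  # illustrative endpoint; the real code uses IPC_URI

def worker(uri):
    socket = zmq.Context().socket(zmq.DEALER)
    socket.connect(uri)
    socket.send_multipart([b"idle"])  # announce availability, like ResponseType.IDLE
    while True:
        frames = socket.recv_multipart()  # [pickled send time, command, ...]
        if frames[1] == b"shutdown":
            socket.send_multipart([b"halted"])  # like ResponseType.HALTED
            return
        socket.send_multipart([b"idle"])

if __name__ == "__main__":
    socket = zmq.Context().socket(zmq.ROUTER)
    socket.bind(URI)
    workers = [multiprocessing.Process(target=worker, args=(URI,)) for _ in range(2)]
    for p in workers:
        p.start()
    halted = 0
    while halted < len(workers):
        client_id, reply = socket.recv_multipart()  # ROUTER prepends the sender identity
        if reply == b"idle":
            # same framing as ZmqManager.send_to_process
            socket.send_multipart([client_id, pickle.dumps(time.time()), b"shutdown"])
        elif reply == b"halted":
            halted += 1
    for p in workers:
        p.join()
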
@@ -250,10 +293,6 @@ def add_tasks_to_process(state, name, task, point_count):
tasks_to_process[name] = (tasks, count + point_count)


def can_queue_more_jobs(idles):
return idles


class State:
def __init__(self, pointcloud_file_portions):
self.reader = Reader(input=pointcloud_file_portions, active=[])
@@ -440,55 +479,36 @@ def convert(files,

max_splitting_jobs_count = max(1, jobs // 2)

# zmq setup
context = zmq.Context()

zmq_skt = context.socket(zmq.ROUTER)
zmq_skt.bind(IPC_URI)

zmq_idle_clients = []

state = State(infos['portions'])

zmq_processes_killed = -1

zmq_processes = [multiprocessing.Process(
target=zmq_process,
args=(
graph, transformer, octree_metadata, outfolder, rgb, verbose)) for _ in range(jobs)]

for p in zmq_processes:
p.start()
activities = [p.pid for p in zmq_processes]

time_waiting_an_idle_process = 0
# zmq setup
zmq_manager = ZmqManager(jobs, (graph, transformer, octree_metadata, outfolder, rgb, verbose))

while True:
# state.print_debug()
now = time.time() - startup
at_least_one_job_ended = False

all_processes_busy = not can_queue_more_jobs(zmq_idle_clients)
while all_processes_busy or zmq_skt.poll(timeout=0, flags=zmq.POLLIN):
all_processes_busy = not zmq_manager.can_queue_more_jobs()
while all_processes_busy or zmq_manager.socket.poll(timeout=0, flags=zmq.POLLIN):
# Blocking read but it's fine because either all our child processes are busy
# or we know that there's something to read (zmq.POLLIN)
start = time.time()
message = zmq_skt.recv_multipart()
message = zmq_manager.socket.recv_multipart()

client_id = message[0]
result = message[1:]
return_type = result[0]

if return_type == ResponseType.IDLE.value:
assert client_id not in zmq_idle_clients
zmq_idle_clients += [client_id]
zmq_manager.add_idle_client(client_id)

if all_processes_busy:
time_waiting_an_idle_process += time.time() - start
zmq_manager.time_waiting_an_idle_process += time.time() - start
all_processes_busy = False

elif return_type == ResponseType.HALTED.value:
zmq_processes_killed += 1
zmq_manager.processes_killed += 1
all_processes_busy = False

elif return_type == ResponseType.READ.value:
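
A note on this loop's condition: socket.poll(timeout=0, flags=zmq.POLLIN) is a non-blocking readiness check, so the blocking recv_multipart() runs only when either every worker is busy (a reply is then guaranteed to arrive eventually) or a message is already waiting. The same idiom, condensed into a standalone helper (hypothetical, not repository code):

import zmq

def drain_replies(socket, handle, all_busy):
    # Block only while every worker is busy; otherwise stop as soon as
    # poll(timeout=0) reports nothing ready to read.
    while all_busy or socket.poll(timeout=0, flags=zmq.POLLIN):
        frames = socket.recv_multipart()  # safe to block: see the loop condition
        all_busy = handle(frames)  # handler reports whether all workers are still busy
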
@@ -552,22 +572,22 @@ def convert(files,
else:
raise NotImplementedError(f"The command {return_type} is not implemented")

while state.to_pnts.input and can_queue_more_jobs(zmq_idle_clients):
while state.to_pnts.input and zmq_manager.can_queue_more_jobs():
node_name = state.to_pnts.input.pop()
datas = node_store.get(node_name)
if not datas:
raise ValueError(f'{node_name} has no data')

zmq_send_to_process(zmq_idle_clients, zmq_skt, [CommandType.WRITE_PNTS.value, node_name, datas])
zmq_manager.send_to_process([CommandType.WRITE_PNTS.value, node_name, datas])
node_store.remove(node_name)
state.to_pnts.active.append(node_name)

if can_queue_more_jobs(zmq_idle_clients):
if zmq_manager.can_queue_more_jobs():
potential = sorted(
[(k, v) for k, v in state.node_process.input.items() if k not in state.node_process.active],
key=lambda f: -len(f[0]))

while can_queue_more_jobs(zmq_idle_clients) and potential:
while zmq_manager.can_queue_more_jobs() and potential:
target_count = 100000
job_list = []
count = 0
@@ -589,19 +609,19 @@ def convert(files,
idx -= 1

if job_list:
zmq_send_to_process(zmq_idle_clients, zmq_skt, [CommandType.PROCESS_JOBS.value] + job_list)
zmq_manager.send_to_process([CommandType.PROCESS_JOBS.value] + job_list)

while (state.reader.input
and (points_in_progress < 60000000 or not state.reader.active)
and (points_in_progress < 60_000_000 or not state.reader.active)
and len(state.reader.active) < max_splitting_jobs_count
and can_queue_more_jobs(zmq_idle_clients)):
and zmq_manager.can_queue_more_jobs()):
if verbose >= 1:
print('Submit next portion {}'.format(state.reader.input[-1]))
_id = 'root_{}'.format(len(state.reader.input)).encode('ascii')
file, portion = state.reader.input.pop()
points_in_progress += portion[1] - portion[0]

zmq_send_to_process(zmq_idle_clients, zmq_skt, [CommandType.READ_FILE.value, pickle.dumps({
zmq_manager.send_to_process([CommandType.READ_FILE.value, pickle.dumps({
'filename': file,
'offset_scale': (
-avg_min,
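
On the worker side, each submission arrives as the multipart frames assembled above, minus the identity frame that the ROUTER socket consumes. A hedged sketch of the decoding, with the frame layout inferred from send_to_process (decode_job itself is a hypothetical helper):

import pickle

def decode_job(frames):
    # Frame layout assumed from ZmqManager.send_to_process:
    # [pickled send time, command byte, optional pickled payload]
    sent_at = pickle.loads(frames[0])
    command = frames[1]  # e.g. CommandType.READ_FILE.value
    payload = pickle.loads(frames[2]) if len(frames) > 2 else None
    return sent_at, command, payload
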
@@ -616,10 +636,9 @@ def convert(files,
state.reader.active.append(_id)

# if at this point we have no work in progress => we're done
if len(zmq_idle_clients) == jobs or zmq_processes_killed == jobs:
if zmq_processes_killed < 0:
zmq_send_to_all_process(zmq_idle_clients, zmq_skt, [CommandType.SHUTDOWN.value])
zmq_processes_killed = 0
if zmq_manager.are_all_processes_idle() or zmq_manager.are_all_processes_killed():
if zmq_manager.processes_killed < 0:
zmq_manager.kill_all_processes()
else:
if points_in_pnts != infos['point_count']:
raise ValueError("!!! Invalid point count in the written .pnts"
@@ -638,8 +657,7 @@ def convert(files,
points_in_pnts,
round(time.time() - startup, 1)))

for p in zmq_processes:
p.terminate()
zmq_manager.terminate_all_processes()
break

if at_least_one_job_ended:
@@ -665,7 +683,7 @@ def convert(files,
print('{} % points in {} sec [{} tasks, {} nodes, {} wip]'.format(
round(100 * processed_points / infos['point_count'], 2),
round(now, 1),
jobs - len(zmq_idle_clients),
jobs - len(zmq_manager.idle_clients),
len(state.node_process.active),
points_in_progress))
elif verbose >= 0:
@@ -680,7 +698,7 @@ def convert(files,
node_store.control_memory_usage(cache_size, verbose)

if verbose >= 1:
print('destroy', round(time_waiting_an_idle_process, 2))
print('destroy', round(zmq_manager.time_waiting_an_idle_process, 2))

if graph:
progression_log.close()
@@ -690,10 +708,10 @@ def convert(files,
import pygal

dateline = pygal.XY(x_label_rotation=25, secondary_range=(0, 100))
for pid in activities:
for pid in zmq_manager.activities:
activity = []
filename = 'activity.{}.csv'.format(pid)
i = len(activities) - activities.index(pid) - 1
i = len(zmq_manager.activities) - zmq_manager.activities.index(pid) - 1
# activities.index(pid) =
with open(filename, 'r') as f:
content = f.read().split('\n')
@@ -721,7 +739,7 @@ def convert(files,

dateline.render_to_file('activity.svg')

context.destroy()
zmq_manager.context.destroy()


def init_parser(subparser, str2bool):
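
Taken together, the convert() changes reduce the ZMQ bookkeeping to a single lifecycle. A hedged outline using only the methods introduced above; the elided body stands for the dispatch and reply handling shown in the hunks:

def run_conversion_loop(zmq_manager):
    # Outline of the new control flow in convert(), not repository code.
    while True:
        ...  # dispatch with zmq_manager.send_to_process, drain replies from zmq_manager.socket
        if zmq_manager.are_all_processes_idle() or zmq_manager.are_all_processes_killed():
            if zmq_manager.processes_killed < 0:
                zmq_manager.kill_all_processes()  # first pass: broadcast CommandType.SHUTDOWN once
            else:
                zmq_manager.terminate_all_processes()
                break
    zmq_manager.context.destroy()
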
