Skip to content

Commit

Permalink
move conditions outside methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Koyaani committed Jul 4, 2022
1 parent 1042d09 commit 2638c31
Showing 1 changed file with 61 additions and 59 deletions.
120 changes: 61 additions & 59 deletions py3dtiles/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,14 @@ def convert(self):
if not self.zmq_manager.can_queue_more_jobs() or self.zmq_manager.socket.poll(timeout=0, flags=zmq.POLLIN):
at_least_one_job_ended = self.process_message()

self.send_pnts_to_write()
self.send_points_to_process(now)
self.send_file_to_read()
while self.state.pnts_to_writing and self.zmq_manager.can_queue_more_jobs():
self.send_pnts_to_write()

if self.zmq_manager.can_queue_more_jobs():
self.send_points_to_process(now)

while self.state.can_add_reading_jobs() and self.zmq_manager.can_queue_more_jobs():
self.send_file_to_read()

# if at this point we have no work in progress => we're done
if self.zmq_manager.are_all_processes_idle() and not self.zmq_manager.killing_processes:
Expand Down Expand Up @@ -636,67 +641,64 @@ def dispatch_processed_nodes(self, content):
self.state.waiting_writing_nodes.clear()

def send_pnts_to_write(self):
while self.state.pnts_to_writing and self.zmq_manager.can_queue_more_jobs():
node_name = self.state.pnts_to_writing.pop()
data = self.node_store.get(node_name)
if not data:
raise ValueError(f'{node_name} has no data')
node_name = self.state.pnts_to_writing.pop()
data = self.node_store.get(node_name)
if not data:
raise ValueError(f'{node_name} has no data')

self.zmq_manager.send_to_process([CommandType.WRITE_PNTS.value, node_name, data])
self.node_store.remove(node_name)
self.state.number_of_writing_jobs += 1
self.zmq_manager.send_to_process([CommandType.WRITE_PNTS.value, node_name, data])
self.node_store.remove(node_name)
self.state.number_of_writing_jobs += 1

def send_points_to_process(self, now):
if self.zmq_manager.can_queue_more_jobs():
potentials = sorted(
# a key (=task) can be in node_to_process and processing_nodes if the node isn't completely processed
[(k, v) for k, v in self.state.node_to_process.items() if k not in self.state.processing_nodes],
key=lambda f: -len(f[0]))

while self.zmq_manager.can_queue_more_jobs() and potentials:
target_count = 100_000
job_list = []
count = 0
idx = len(potentials) - 1
while count < target_count and idx >= 0:
name, (tasks, point_count) = potentials[idx]
count += point_count
job_list += [
name,
self.node_store.get(name),
struct.pack('>I', len(tasks)),
] + tasks
del potentials[idx]

del self.state.node_to_process[name]
self.state.processing_nodes[name] = (len(tasks), point_count, now)

if name in self.state.waiting_writing_nodes:
self.state.waiting_writing_nodes.pop(self.state.waiting_writing_nodes.index(name))
idx -= 1

if job_list:
self.zmq_manager.send_to_process([CommandType.PROCESS_JOBS.value] + job_list)
potentials = sorted(
# a key (=task) can be in node_to_process and processing_nodes if the node isn't completely processed
[(k, v) for k, v in self.state.node_to_process.items() if k not in self.state.processing_nodes],
key=lambda f: -len(f[0]))

while self.zmq_manager.can_queue_more_jobs() and potentials:
target_count = 100_000
job_list = []
count = 0
idx = len(potentials) - 1
while count < target_count and idx >= 0:
name, (tasks, point_count) = potentials[idx]
count += point_count
job_list += [
name,
self.node_store.get(name),
struct.pack('>I', len(tasks)),
] + tasks
del potentials[idx]

del self.state.node_to_process[name]
self.state.processing_nodes[name] = (len(tasks), point_count, now)

if name in self.state.waiting_writing_nodes:
self.state.waiting_writing_nodes.pop(self.state.waiting_writing_nodes.index(name))
idx -= 1

if job_list:
self.zmq_manager.send_to_process([CommandType.PROCESS_JOBS.value] + job_list)

def send_file_to_read(self):
while self.state.can_add_reading_jobs() and self.zmq_manager.can_queue_more_jobs():
if self.verbose >= 1:
print(f'Submit next portion {self.state.point_cloud_file_parts[-1]}')
file, portion = self.state.point_cloud_file_parts.pop()
self.state.points_in_progress += portion[1] - portion[0]

self.zmq_manager.send_to_process([CommandType.READ_FILE.value, pickle.dumps({
'filename': file,
'offset_scale': (
-self.avg_min,
self.root_scale,
self.rotation_matrix[:3, :3].T if self.rotation_matrix is not None else None,
self.infos['color_scale'].get(file) if self.infos['color_scale'] is not None else None,
),
'portion': portion,
})])

self.state.number_of_reading_jobs += 1
if self.verbose >= 1:
print(f'Submit next portion {self.state.point_cloud_file_parts[-1]}')
file, portion = self.state.point_cloud_file_parts.pop()
self.state.points_in_progress += portion[1] - portion[0]

self.zmq_manager.send_to_process([CommandType.READ_FILE.value, pickle.dumps({
'filename': file,
'offset_scale': (
-self.avg_min,
self.root_scale,
self.rotation_matrix[:3, :3].T if self.rotation_matrix is not None else None,
self.infos['color_scale'].get(file) if self.infos['color_scale'] is not None else None,
),
'portion': portion,
})])

self.state.number_of_reading_jobs += 1

def write_tileset(self):
# compute tile transform matrix
Expand Down

0 comments on commit 2638c31

Please sign in to comment.