Skip to content

Commit

Permalink
create the class Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Koyaani committed Jun 13, 2022
1 parent 53d6687 commit dbb32f0
Showing 1 changed file with 98 additions and 84 deletions.
182 changes: 98 additions & 84 deletions py3dtiles/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,96 +99,114 @@ def make_rotation_matrix(z1, z2):
vector_product(v0, v1))


def zmq_process(activity_graph, srs_out_wkt, srs_in_wkt, node_store, octree_metadata, folder, write_rgb, verbosity):
if srs_out_wkt:
crs_out = CRS(srs_out_wkt)
crs_in = CRS(srs_in_wkt)
transformer = Transformer.from_crs(crs_in, crs_out)
else:
transformer = None

context = zmq.Context()

# Socket to receive messages on
skt = context.socket(zmq.DEALER)
skt.connect(IPC_URI)
# Worker part
def zmq_process(*args):
process = Worker(*args)
process.run()


class Worker:
def __init__(self, activity_graph, transformer, octree_metadata, folder, write_rgb, verbosity):
self.activity_graph = activity_graph
self.transformer = transformer
self.octree_metadata = octree_metadata
self.folder = folder
self.write_rgb = write_rgb
self.verbosity = verbosity

# Socket to receive messages on
self.context = zmq.Context()
self.skt = self.context.socket(zmq.DEALER)

def run(self):
self.skt.connect(IPC_URI)

startup_time = time.time()
idle_time = 0

if self.activity_graph:
activity = open('activity.{}.csv'.format(os.getpid()), 'w')

# notify we're ready
self.skt.send_multipart([ResponseType.IDLE.value])

while True:
before = time.time() - startup_time
self.skt.poll()
after = time.time() - startup_time

idle_time += after - before

message = self.skt.recv_multipart()
content = message[1:]
command = content[0]

delta = time.time() - pickle.loads(message[0])
if delta > 0.01 and self.verbosity >= 1:
print('{} / {} : Delta time: {}'.format(os.getpid(), round(after, 2), round(delta, 3)))

if command == CommandType.READ_FILE.value:
self.execute_read_file(content)
command_type = 1
elif command == CommandType.PROCESS_JOBS.value:
self.execute_process_jobs(content)
command_type = 2
elif command == CommandType.WRITE_PNTS.value:
self.execute_write_pnts(content)
command_type = 3
elif command == CommandType.SHUTDOWN.value:
break # ack
else:
raise NotImplementedError(f'Unknown command {command}')

startup_time = time.time()
idle_time = 0
# notify we're idle
self.skt.send_multipart([ResponseType.IDLE.value])

if activity_graph:
activity = open('activity.{}.csv'.format(os.getpid()), 'w')
if self.activity_graph:
print(f'{before}, {command_type}', file=activity)
print(f'{before}, 0', file=activity)
print(f'{after}, 0', file=activity)
print(f'{after}, {command_type}', file=activity)

# notify we're ready
skt.send_multipart([ResponseType.IDLE.value])
if self.activity_graph:
activity.close()

while True:
before = time.time() - startup_time

skt.poll()

after = time.time() - startup_time

idle_time += after - before
message = skt.recv_multipart()
delta = time.time() - pickle.loads(message[0])
if delta > 0.01 and verbosity >= 1:
print('{} / {} : Delta time: {}'.format(os.getpid(), round(after, 2), round(delta, 3)))
content = message[1:]
command = content[0]

if command == CommandType.READ_FILE.value:
parameters = pickle.loads(content[1])
command_type = 1

ext = PurePath(parameters['filename']).suffix
init_reader_fn = las_reader.run if ext in ('.las', '.laz') else xyz_reader.run
init_reader_fn(
parameters['id'],
parameters['filename'],
parameters['offset_scale'],
parameters['portion'],
skt,
transformer,
verbosity
)
elif command == CommandType.WRITE_PNTS.value:
command_type = 3
pnts_writer.run(skt, content[2], content[1], folder, write_rgb)
elif command == CommandType.PROCESS_JOBS.value:
command_type = 2
result = node_process.run(
content[1:],
octree_metadata,
skt,
verbosity
if self.verbosity >= 1:
print('total: {} sec, idle: {}'.format(
round(time.time() - startup_time, 1),
round(idle_time, 1))
)
elif command == CommandType.SHUTDOWN.value:
break # ack
else:
raise NotImplementedError(f'Unknown command {command}')

# notify we're idle
skt.send_multipart([ResponseType.IDLE.value])
self.skt.send_multipart([ResponseType.HALTED.value])

if activity_graph:
print('{before}, {command_type}'.format(**locals()), file=activity)
print('{before}, 0'.format(**locals()), file=activity)
print('{after}, 0'.format(**locals()), file=activity)
print('{after}, {command_type}'.format(**locals()), file=activity)
def execute_read_file(self, content):
parameters = pickle.loads(content[1])

if activity_graph:
activity.close()
ext = PurePath(parameters['filename']).suffix
init_reader_fn = las_reader.run if ext in ('.las', '.laz') else xyz_reader.run
init_reader_fn(
parameters['id'],
parameters['filename'],
parameters['offset_scale'],
parameters['portion'],
self.skt,
self.transformer,
self.verbosity
)

if verbosity >= 1:
print('total: {} sec, idle: {}'.format(
round(time.time() - startup_time, 1),
round(idle_time, 1)))
def execute_write_pnts(self, content):
pnts_writer.run(self.skt, content[2], content[1], self.folder, self.write_rgb)

skt.send_multipart([ResponseType.HALTED.value])
def execute_process_jobs(self, content):
node_process.run(
content[1:],
self.octree_metadata,
self.skt,
self.verbosity
)


def zmq_send_to_process(idle_clients, socket, message):
def zmq_send_to_process(idle_clients, socket, message): # to put in class
if not idle_clients:
raise ValueError("idle_clients is empty")
socket.send_multipart([idle_clients.pop(), pickle.dumps(time.time())] + message)
Expand Down Expand Up @@ -323,8 +341,7 @@ def convert(files,
avg_min = infos['avg_min']
rotation_matrix = None
# srs stuff
srs_out_wkt = None
srs_in_wkt = None
transformer = None
if srs_out:
crs_out = CRS('epsg:{}'.format(srs_out))
if srs_in:
Expand All @@ -334,9 +351,6 @@ def convert(files,
else:
crs_in = CRS(infos['srs_in'])

srs_out_wkt = crs_out.to_wkt()
srs_in_wkt = crs_in.to_wkt()

transformer = Transformer.from_crs(crs_in, crs_out)

bl = np.array(list(transformer.transform(
Expand Down Expand Up @@ -441,7 +455,7 @@ def convert(files,
zmq_processes = [multiprocessing.Process(
target=zmq_process,
args=(
graph, srs_out_wkt, srs_in_wkt, node_store, octree_metadata, outfolder, rgb, verbose)) for i in range(jobs)]
graph, transformer, octree_metadata, outfolder, rgb, verbose)) for _ in range(jobs)]

for p in zmq_processes:
p.start()
Expand Down

0 comments on commit dbb32f0

Please sign in to comment.