Parallelize triangulation #619

Merged · 8 commits · Apr 14, 2023
Conversation

@travisdriver (Collaborator) commented Apr 11, 2023:

Running on my machine with 8 workers for door-12, runtime goes from 1.24 minutes without parallelism to 1.03 minutes with it. I would expect larger datasets to see even more significant runtime reductions.

@akshay-krishnan (Collaborator) left a comment:

I think for a better speedup, and to run on bigger datasets, we will need to batch this. Instead of 1 delayed operation per track, we would need 1 delayed operation for a batch of tracks (say 10-50). Each delayed operation adds some latency/overhead on dask, and we want to make sure the added overhead is << the time for each batch triangulation.

# Loop through tracks and generate delayed triangulation tasks.
triangulation_results = []
for track_2d in tracks_2d:
    triangulation_results.append(dask.delayed(point3d_initializer.triangulate)(track_2d))
Collaborator:
Can we do an nout=3 in dask.delayed, and fill in sfm_tracks, avg_track_reproj_errors, and triangulation_exit_codes here, instead of in one more loop?

travisdriver (Author):
Surprisingly, separating the variables before calling compute is slower. The following code took 1.17 minutes:

def run_triangulation(
        self,
        cameras: Dict[int, gtsfm_types.CAMERA_TYPE],
        tracks_2d: List[SfmTrack2d],
    ) -> Tuple[List[Delayed], List[Delayed], List[Delayed]]:
        """Performs triangulation of 2D tracks in parallel.

        Ref: https://docs.dask.org/en/stable/delayed-best-practices.html#compute-on-lots-of-computation-at-once

        Args:
            cameras: Dictionary mapping camera index to camera.
            tracks_2d: List of 2D tracks.

        Returns:
            sfm_tracks: List of triangulated tracks.
            avg_track_reproj_errors: List of average reprojection errors per track.
            triangulation_exit_codes: exit codes for each triangulation call.
        """
        # Initialize 3D landmark for each track
        point3d_initializer = Point3dInitializer(cameras, self.triangulation_options)

        # Loop through tracks and generate delayed triangulation tasks.
        delayed_sfm_tracks, delayed_avg_track_reproj_errors, delayed_triangulation_exit_codes = [], [], []
        for track_2d in tracks_2d:
            delayed_sfm_track, delayed_avg_track_reproj_error, delayed_triangulation_exit_code = dask.delayed(
                point3d_initializer.triangulate, nout=3
            )(track_2d)
            delayed_sfm_tracks.append(delayed_sfm_track)
            delayed_avg_track_reproj_errors.append(delayed_avg_track_reproj_error)
            delayed_triangulation_exit_codes.append(delayed_triangulation_exit_code)

        # Perform triangulation in parallel.
        sfm_tracks, avg_track_reproj_errors, triangulation_exit_codes = dask.compute(
            delayed_sfm_tracks, delayed_avg_track_reproj_errors, delayed_triangulation_exit_codes
        )

        return sfm_tracks, avg_track_reproj_errors, triangulation_exit_codes

Maybe the scheduler isn't intelligent enough to realize the three inputs are from the same function call?
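One plausible explanation, assuming standard dask.delayed behavior: with nout=3, each call adds extra getitem tasks to the graph (one per output), so the graph roughly quadruples in size. A pattern that avoids this is to keep one delayed object per call, compute the whole list, and unzip afterwards. A minimal stdlib sketch of that collect-then-unzip pattern (ThreadPoolExecutor stands in for dask here, and the toy `triangulate` returning a 3-tuple is hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor

def triangulate(track):
    # Toy stand-in for Point3dInitializer.triangulate: returns
    # (sfm_track, avg_reproj_error, exit_code).
    return (track, 0.5, 0)

tracks_2d = ["t0", "t1", "t2"]

# One task per call; each result stays a single 3-tuple.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(triangulate, tracks_2d))

# Unzip into three parallel lists only after everything has run.
sfm_tracks, avg_errors, exit_codes = (list(xs) for xs in zip(*results))
```

This keeps the task graph at one node per track and defers the per-output bookkeeping to cheap local post-processing.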

gtsfm/data_association/data_assoc.py (resolved)
@akshay-krishnan (Collaborator):
Let me know what you think of batching; it should be easy to implement. Once you have that, I can run on larger datasets. We also want to make sure it works on the cluster.

@travisdriver (Author):

> I think for a better speedup, and to run on bigger datasets, we will need to batch this. Instead of 1 delayed operation per track, we would need 1 delayed operation for a batch of tracks (say 10-50). Each delayed operation adds some latency/overhead on dask, and we want to make sure the added overhead is << the time for each batch triangulation.

Great suggestion. I added the MAX_DELAYED_TRIANGULATION_CALLS variable to decide the batch size, i.e., batch_size = int(np.ceil(len(tracks_2d) / MAX_DELAYED_TRIANGULATION_CALLS)). Setting max calls to 1e4 runs in 1.02 minutes, and 1e3 and 1e2 max calls run in 0.99 minutes. I kept the default value at 1e3.
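The batching arithmetic above can be sketched without dask. The constant name MAX_DELAYED_TRIANGULATION_CALLS and the batch-size formula are from this PR; the `make_batches` helper itself is illustrative, not the actual implementation:

```python
import math

MAX_DELAYED_TRIANGULATION_CALLS = int(1e3)  # default chosen in this PR

def make_batches(tracks_2d, max_calls=MAX_DELAYED_TRIANGULATION_CALLS):
    """Split tracks into chunks so at most `max_calls` delayed
    triangulation tasks are created (illustrative helper)."""
    batch_size = int(math.ceil(len(tracks_2d) / max_calls))
    return [tracks_2d[i : i + batch_size] for i in range(0, len(tracks_2d), batch_size)]

# Each batch would then become a single dask.delayed call that
# triangulates all tracks in the batch and returns a list of results,
# amortizing the per-task scheduler overhead over the whole batch.
```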

@@ -196,6 +204,68 @@ def run(

return connected_data, data_assoc_metrics

def run_triangulation(
Collaborator:
What's the difference between run_triangulation() and run_da()? Is one parallelized and the other not?

Technically, neither does any data association, since we broke DSF into another module, so can we use a different name than run_da()? (I know the module is mis-named currently)

travisdriver (Author):

Technically it's still associating landmarks to keypoints. The class is also called DataAssociation, so we'd need to change the whole class name as well. Maybe we can change the naming conventions in another PR?

Collaborator:

I see; yeah, I suppose it depends a bit on our definition of data association. Agreed it's orthogonal to this PR, but I'm still a bit lost by the naming of the new function we're adding. Do we mean run_da vs. run_da_batched, or run_triangulation vs. run_triangulation_batched?

Collaborator:

Here's one definition of data association from Szeliski, that would suggest we rename this module:

In SFM, “data association” is the problem of determining correspondences, either between feature points or whole images. In the case of whole images, it can be seen as the validity of hypotheses that image pairs contain sets of matching features corresponding to the same 3D points.

http://szeliski.org/papers/Roberts_DuplicateSfM_CVPR11.pdf

Collaborator:

(but of course module renaming can be for the future)

@travisdriver (Author) commented Apr 12, 2023:

Batching is only enforced for triangulation when the MAX_DELAYED_TRIANGULATION_CALLS threshold is violated. I think a simple if-else statement is sufficient as opposed to a separate run function.

I think the function naming is straightforward. run_triangulation triangulates the 2D tracks. run_da performs the DA step (or at least what we were previously referring to as DA), i.e., validating the triangulated tracks and building the input to BA.

Yes, data association is the process of determining correspondences. This module still plays a role in determining correspondences, as we filter correspondences based on the triangulation geometry.

You're more than welcome to rename this module to whatever you'd like in a separate PR :)

@travisdriver (Author):

@stepanyanhayk Would you mind running this on the cluster and letting us know whether or not it fixes the DA runtime issues?

# Unpack results.
sfm_tracks, avg_track_reproj_errors, triangulation_exit_codes = [], [], []
if batch_size == 1:
    for result in triangulation_results:
Collaborator:

nit: may be more readable as

for (sfm_track, avg_track_reproj_error, exit_code) in triangulation_results:
    sfm_tracks.append(sfm_track)
    avg_track_reproj_errors.append(avg_track_reproj_error)
    triangulation_exit_codes.append(exit_code)

travisdriver (Author):

Done

@@ -115,9 +122,12 @@ def run(
exit_codes_wrt_computed: List[TriangulationExitCode] = []
per_accepted_track_avg_errors = []
per_rejected_track_avg_errors = []
for track_2d in tracks_2d:
assert len(tracks_2d) == len(sfm_tracks)
for j in range(len(tracks_2d)):
Collaborator:

nit: since we don't triangulate here anymore, I believe the comment below needs to be updated

travisdriver (Author):

Done

cameras_gt: List[Optional[gtsfm_types.CALIBRATION_TYPE]],
relative_pose_priors: Dict[Tuple[int, int], Optional[PosePrior]],
images: Optional[List[Image]] = None,
) -> Tuple[GtsfmData, GtsfmMetricsGroup]:
"""Perform the data association.
"""Perform the data association and compute metrics.
Collaborator:

Since we don't triangulate here, and don't compute correspondences here, I propose we totally rename this method to compute_da_metrics().

Collaborator:

(and edit the docstring also to remove the mention of "Perform the data association", since all we do is compute metrics here now, if I understand correctly)

Collaborator:

Re-reading this, looks like we do also validate the tracks here, and compute a largest connected component. How about we consider renaming this as

def validate_tracks_and_compute_metrics()

or

def validate_triangulated_tracks_and_compute_metrics()
  """ """

or

def assemble_gtsfm_data_from_tracks()

How about this as a docstring:
"""Assembles GtsfmData, validates triangulated tracks, computes metrics.

Only the largest connected component of cameras, represented in 3d tracks, is retained.
"""

travisdriver (Author):

I think these are good suggestions, but again I think it makes more sense to save this for another PR in which you can refactor the module names and functions to your liking.


Args:
num_images: Number of images in the scene.
cameras: dictionary, with image index -> camera mapping.
tracks_2d: list of 2D tracks.
sfm_tracks: List of triangulated tracks.
avg_track_repoj_errors: List of average reprojection errors per track.
Collaborator:

nit: typo on "reproj errors"

travisdriver (Author):

Done

@johnwlambert (Collaborator) left a comment:

Thanks Travis!

@stepanyanhayk (Collaborator) commented Apr 14, 2023:

Hi @travisdriver,

Thanks for this PR. I ran your changes against master on the cluster and it is much faster!

Skydio-32, deep_front_end, total wall time:

  • Master: 1.78m
  • This branch: 1.21m

Also, here is a screenshot from the dask dashboard demonstrating that the tasks are indeed distributed between all the machines.

[screenshot: dask dashboard, 2023-04-14]

@akshay-krishnan (Collaborator) left a comment:

Not sure if this is covered by unit tests; please check artifacts for at least one dataset before merging, just to be sure nothing breaks.

@travisdriver (Author):

> Not sure if this is covered by unit tests; please check artifacts for at least one dataset before merging, just to be sure nothing breaks.

[attached: comparison plot]

visual_comparison_dashboard.zip

@travisdriver travisdriver merged commit d81eeb5 into master Apr 14, 2023
@travisdriver travisdriver deleted the parallel-triang branch April 14, 2023 20:06
@akshay-krishnan (Collaborator):

Are all the differences in the visualization above coming from randomness, or did we change anything in this PR?
