Skip to content

Commit

Permalink
stageflow skeleton for PA+A&A
Browse files Browse the repository at this point in the history
Summary:
## What

- Create a "PA for PD" stageflow for running PA with the anonymization data prep stage
- Run E2E test to validate new stageflow can be called from EP

## Why

- As part of the Q1 PD "orchestration of workflows" prototype, I am creating skeletons inside PCS so that the PCF team can integrate with PCS with minimal assistance from the PCA and/or PSI teams (assuming MPC infra is used in the future)
- PD "orchestration of workflows" doc: https://docs.google.com/document/d/1Nnn3rrXB1PokGl8a_fhB2JulXbLyHxpZe3-fPGqJO30/edit

Reviewed By: adshastri

Differential Revision:
D44101237

Privacy Context Container: L416713

fbshipit-source-id: eebd62ce499cae80571479bc23d98aa6d82ab4dc
  • Loading branch information
jrodal98 authored and facebook-github-bot committed Mar 17, 2023
1 parent 9658a7b commit 9b24415
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 0 deletions.
1 change: 1 addition & 0 deletions fbpcs/private_computation/stage_flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"private_computation_private_id_dfca_local_test_stage_flow",
"private_computation_private_id_dfca_stage_flow",
"private_computation_pid_continuous_measurement_stage_flow",
"private_computation_pa_for_pd_stage_flow",
]

from . import * # noqa: ignore=F403
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from fbpcs.private_computation.entity.private_computation_status import (
PrivateComputationInstanceStatus,
)
from fbpcs.private_computation.service.anonymization_data_prep_stage_service import (
AnonymizationDataPrepStageService,
)
from fbpcs.private_computation.service.constants import DEFAULT_RUN_PID_TIMEOUT_IN_SEC
from fbpcs.private_computation.service.pcf2_aggregation_stage_service import (
PCF2AggregationStageService,
)
from fbpcs.private_computation.service.pcf2_attribution_stage_service import (
PCF2AttributionStageService,
)
from fbpcs.private_computation.service.private_computation_stage_service import (
PrivateComputationStageService,
PrivateComputationStageServiceArgs,
)
from fbpcs.private_computation.stage_flows.private_computation_base_stage_flow import (
PrivateComputationBaseStageFlow,
PrivateComputationStageFlowData,
)


class PrivateComputationPAforPDStageFlow(PrivateComputationBaseStageFlow):
    """
    Stage flow that runs PA and finishes with the anonymization data prep stage.

    Every enum member is one stage; its value records the instance statuses the
    stage maps to (initialized / started / completed / failed) and whether the
    stage is a joint stage. Methods to get information about the next or
    previous stage are also provided.

    Only use this flow to run PA with ad-hoc advertisers (in preparation for
    the PD anonymizer game).
    """

    # Stage execution order. Don't change this unless you know what you are doing.
    # pyre-fixme[15]: `_order_` overrides attribute defined in `Enum` inconsistently.
    _order_ = "CREATED PC_PRE_VALIDATION PID_SHARD PID_PREPARE ID_MATCH ID_MATCH_POST_PROCESS ID_SPINE_COMBINER RESHARD PCF2_ATTRIBUTION PCF2_AGGREGATION ANONYMIZATION_DATA_PREP"
    # About the fixme above: Pyre appears to be wrong here. _order_ is only consumed
    # by the EnumMeta metaclass __new__ method and is not actually added as a
    # variable on the enum class, which is probably what confuses pyre.

    # --- Non-joint stage: instance creation ---
    CREATED = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.CREATION_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.CREATION_STARTED,
        completed_status=PrivateComputationInstanceStatus.CREATED,
        failed_status=PrivateComputationInstanceStatus.CREATION_FAILED,
        is_joint_stage=False,
    )

    # --- Non-joint stage: pre-validation ---
    PC_PRE_VALIDATION = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.PC_PRE_VALIDATION_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.PC_PRE_VALIDATION_STARTED,
        completed_status=PrivateComputationInstanceStatus.PC_PRE_VALIDATION_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.PC_PRE_VALIDATION_FAILED,
        is_joint_stage=False,
    )

    # --- Non-joint stages: PID shard and PID prepare ---
    PID_SHARD = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.PID_SHARD_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.PID_SHARD_STARTED,
        completed_status=PrivateComputationInstanceStatus.PID_SHARD_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.PID_SHARD_FAILED,
        is_joint_stage=False,
    )
    PID_PREPARE = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.PID_PREPARE_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.PID_PREPARE_STARTED,
        completed_status=PrivateComputationInstanceStatus.PID_PREPARE_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.PID_PREPARE_FAILED,
        is_joint_stage=False,
    )

    # --- Joint stage: ID matching. The only stage in this flow that is marked
    # retryable and that carries an explicit timeout. ---
    ID_MATCH = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.ID_MATCHING_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.ID_MATCHING_STARTED,
        completed_status=PrivateComputationInstanceStatus.ID_MATCHING_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.ID_MATCHING_FAILED,
        is_joint_stage=True,
        is_retryable=True,
        timeout=DEFAULT_RUN_PID_TIMEOUT_IN_SEC,
    )

    # --- Non-joint stages: post-processing of the match, spine combining, resharding ---
    ID_MATCH_POST_PROCESS = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.ID_MATCHING_POST_PROCESS_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.ID_MATCHING_POST_PROCESS_STARTED,
        completed_status=PrivateComputationInstanceStatus.ID_MATCHING_POST_PROCESS_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.ID_MATCHING_POST_PROCESS_FAILED,
        is_joint_stage=False,
    )
    ID_SPINE_COMBINER = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.ID_SPINE_COMBINER_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.ID_SPINE_COMBINER_STARTED,
        completed_status=PrivateComputationInstanceStatus.ID_SPINE_COMBINER_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.ID_SPINE_COMBINER_FAILED,
        is_joint_stage=False,
    )
    RESHARD = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.RESHARD_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.RESHARD_STARTED,
        completed_status=PrivateComputationInstanceStatus.RESHARD_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.RESHARD_FAILED,
        is_joint_stage=False,
    )

    # --- Joint stages: PCF2 attribution, PCF2 aggregation, anonymization data prep.
    # These three are the stages given a dedicated service in get_stage_service. ---
    PCF2_ATTRIBUTION = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.PCF2_ATTRIBUTION_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.PCF2_ATTRIBUTION_STARTED,
        completed_status=PrivateComputationInstanceStatus.PCF2_ATTRIBUTION_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.PCF2_ATTRIBUTION_FAILED,
        is_joint_stage=True,
    )
    PCF2_AGGREGATION = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.PCF2_AGGREGATION_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.PCF2_AGGREGATION_STARTED,
        completed_status=PrivateComputationInstanceStatus.PCF2_AGGREGATION_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.PCF2_AGGREGATION_FAILED,
        is_joint_stage=True,
    )
    ANONYMIZATION_DATA_PREP = PrivateComputationStageFlowData(
        initialized_status=PrivateComputationInstanceStatus.ANONYMIZATION_DATA_PREP_INITIALIZED,
        started_status=PrivateComputationInstanceStatus.ANONYMIZATION_DATA_PREP_STARTED,
        completed_status=PrivateComputationInstanceStatus.ANONYMIZATION_DATA_PREP_COMPLETED,
        failed_status=PrivateComputationInstanceStatus.ANONYMIZATION_DATA_PREP_FAILED,
        is_joint_stage=True,
    )

    def get_stage_service(
        self, args: PrivateComputationStageServiceArgs
    ) -> PrivateComputationStageService:
        """
        Build the StageService that runs this stage.

        Arguments:
            args: Common arguments initialized in PrivateComputationService that are consumed by stage services

        Returns:
            An instantiated StageService object corresponding to the StageFlow enum member caller.
            PCF2_ATTRIBUTION, PCF2_AGGREGATION and ANONYMIZATION_DATA_PREP get a
            dedicated service; every other member uses get_default_stage_service.

        Raises:
            NotImplementedError: The subclass doesn't implement a stage service for a given StageFlow enum member
                (not raised directly here; presumably surfaced by the default lookup)
        """
        # The two PCF2 stages are built from the binary config map and the MPC service.
        if self is self.PCF2_ATTRIBUTION:
            return PCF2AttributionStageService(
                args.onedocker_binary_config_map, args.mpc_svc
            )

        if self is self.PCF2_AGGREGATION:
            return PCF2AggregationStageService(
                args.onedocker_binary_config_map, args.mpc_svc
            )

        # Anonymization data prep takes the OneDocker service rather than the MPC service.
        if self is self.ANONYMIZATION_DATA_PREP:
            return AnonymizationDataPrepStageService(
                args.onedocker_svc,
                args.onedocker_binary_config_map,
            )

        # All remaining stages have no flow-specific service.
        return self.get_default_stage_service(args)

0 comments on commit 9b24415

Please sign in to comment.