Skip to content

Commit

Permalink
Merge pull request #11771: [BEAM-10052] check hash and avoid duplicated artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj committed May 21, 2020
2 parents 1560e02 + b8e582f commit 7cc1a7d
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Expand Up @@ -590,6 +590,7 @@ def _stage_resources(self, pipeline, options):
raise RuntimeError('The --temp_location option must be specified.')

resources = []
hashs = {}
for _, env in sorted(pipeline.components.environments.items(),
key=lambda kv: kv[0]):
for dep in env.dependencies:
Expand All @@ -602,7 +603,16 @@ def _stage_resources(self, pipeline, options):
role_payload = (
beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
dep.role_payload))
resources.append((type_payload.path, role_payload.staged_name))
if type_payload.sha256 and type_payload.sha256 in hashs:
_LOGGER.info(
'Found duplicated artifact: %s (%s)',
type_payload.path,
type_payload.sha256)
dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
staged_name=hashs[type_payload.sha256]).SerializeToString()
else:
resources.append((type_payload.path, role_payload.staged_name))
hashs[type_payload.sha256] = role_payload.staged_name

resource_stager = _LegacyDataflowStager(self)
staged_resources = resource_stager.stage_job_resources(
Expand Down

0 comments on commit 7cc1a7d

Please sign in to comment.