/
clone.go
59 lines (53 loc) · 2.44 KB
/
clone.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipelinex
import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
)
// shallowClonePipeline returns a new Pipeline built from p. Components are
// rebuilt via shallowCloneComponents, and the Requirements and
// RootTransformIds slices are passed through reflectx.ShallowClone.
//
// p is only read through its Get* accessors.
//
// NOTE(review): only Components, Requirements and RootTransformIds are
// populated on the result; any other Pipeline fields are left at their zero
// value. Confirm that is intended.
func shallowClonePipeline(p *pipepb.Pipeline) *pipepb.Pipeline {
	ret := &pipepb.Pipeline{
		Components: shallowCloneComponents(p.GetComponents()),
	}
	// Use the comma-ok assertion form for both slices, matching every other
	// ShallowClone call in this file: if the assertion does not hold, the
	// field simply stays nil instead of panicking.
	ret.Requirements, _ = reflectx.ShallowClone(p.GetRequirements()).([]string)
	ret.RootTransformIds, _ = reflectx.ShallowClone(p.GetRootTransformIds()).([]string)
	return ret
}
// shallowCloneComponents returns a new Components whose Transforms,
// Pcollections, WindowingStrategies, Coders and Environments maps are each
// obtained by passing the corresponding map of comp through
// reflectx.ShallowClone. comp is only read through its Get* accessors.
func shallowCloneComponents(comp *pipepb.Components) *pipepb.Components {
	// The comma-ok assertions are deliberate: a result that is not of the
	// expected map type leaves the corresponding field nil.
	transforms, _ := reflectx.ShallowClone(comp.GetTransforms()).(map[string]*pipepb.PTransform)
	pcollections, _ := reflectx.ShallowClone(comp.GetPcollections()).(map[string]*pipepb.PCollection)
	strategies, _ := reflectx.ShallowClone(comp.GetWindowingStrategies()).(map[string]*pipepb.WindowingStrategy)
	coders, _ := reflectx.ShallowClone(comp.GetCoders()).(map[string]*pipepb.Coder)
	environments, _ := reflectx.ShallowClone(comp.GetEnvironments()).(map[string]*pipepb.Environment)

	return &pipepb.Components{
		Transforms:          transforms,
		Pcollections:        pcollections,
		WindowingStrategies: strategies,
		Coders:              coders,
		Environments:        environments,
	}
}
// ShallowClonePTransform returns a new PTransform populated from t, or nil
// when t is nil.
//
// UniqueName, Spec, DisplayData, Annotations and EnvironmentId are assigned
// directly from t, so the result shares those values with the original.
// Subtransforms, Inputs and Outputs are passed through reflectx.ShallowClone.
func ShallowClonePTransform(t *pipepb.PTransform) *pipepb.PTransform {
	if t == nil {
		return nil
	}

	// The comma-ok assertions are deliberate: a result that is not of the
	// expected type leaves the corresponding field nil.
	subtransforms, _ := reflectx.ShallowClone(t.Subtransforms).([]string)
	inputs, _ := reflectx.ShallowClone(t.Inputs).(map[string]string)
	outputs, _ := reflectx.ShallowClone(t.Outputs).(map[string]string)

	return &pipepb.PTransform{
		UniqueName:    t.UniqueName,
		Spec:          t.Spec,
		Subtransforms: subtransforms,
		Inputs:        inputs,
		Outputs:       outputs,
		DisplayData:   t.DisplayData,
		EnvironmentId: t.EnvironmentId,
		Annotations:   t.Annotations,
	}
}