/
read_write_pubsub.go
158 lines (140 loc) · 6 KB
/
read_write_pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// read_write_pubsub is a pipeline example using the fhirio connector to read
// FHIR resources from GCS, write them to a GCP FHIR store, and, if a PubSub
// topic is provided, read the written resources from the FHIR store and log them
// based on the PubSub notifications about store updates.
//
// Pre-requisites:
// 1. NDJSON-encoded FHIR resources stored in GCS.
// 2. Dataflow Runner enabled: https://cloud.google.com/dataflow/docs/quickstarts.
// 3. A Google Cloud FHIR store. Optionally, PubSub notifications set up on the store.
// (see: https://cloud.google.com/healthcare-api/docs/concepts/pubsub).
//
// Running this pipeline requires providing a fully qualified GCS address
// (potentially containing wildcards) to where your FHIR resources are stored, a
// path to the FHIR store where the resources should be written to, and,
// optionally, the PubSub topic name your FHIR store is sending notifications to,
// in addition to the usual flags for the Dataflow runner.
//
// An example command for executing this pipeline on GCP is as follows:
//
// export PROJECT="$(gcloud config get-value project)"
// export TEMP_LOCATION="gs://MY-BUCKET/temp"
// export STAGING_LOCATION="gs://MY-BUCKET/staging"
// export REGION="us-central1"
// export SOURCE_GCS_LOCATION="gs://MY-BUCKET/path/to/resources/**"
// export FHIR_STORE_PATH="MY_FHIR_STORE_PATH"
// export PUBSUB_TOPIC="MY_FHIR_STORE_TOPIC"
// cd ./sdks/go
// go run ./examples/fhirio/read_write_pubsub/read_write_pubsub.go \
// --runner=dataflow \
// --temp_location=$TEMP_LOCATION \
// --staging_location=$STAGING_LOCATION \
// --project=$PROJECT \
// --region=$REGION \
// --worker_harness_container_image=apache/beam_go_sdk:latest \
// --sourceGcsLocation=$SOURCE_GCS_LOCATION \
// --fhirStore=$FHIR_STORE_PATH \
// --pubsubTopic=$PUBSUB_TOPIC
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"strings"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fhirio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// Command-line flags specific to this example. Each one is a string flag that
// defaults to the empty string.
var (
// sourceGcsLocation is a required flag: the GCS location (wildcards allowed)
// of the files to read. The files should contain FHIR resources in NDJSON
// format.
sourceGcsLocation = flag.String("sourceGcsLocation", "", "The source directory for GCS files to read, including wildcards.")
// fhirStore is a required flag: the FHIR store to write resources to, given
// as a full path of the form
// "projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID".
fhirStore = flag.String("fhirStore", "", "The target FHIR Store to write data to, must be of the full format.")
// pubsubTopic is an optional flag: the PubSub topic the FHIR store publishes
// update notifications to. When it is left empty, the read-and-log half of
// the pipeline is not built.
pubsubTopic = flag.String("pubsubTopic", "", "The PubSub topic to listen to.")
)
// init registers this file's user code with the Beam register package:
// WrapInBundle as a function taking one string and returning one string, and
// LoggerFn as a DoFn whose ProcessElement takes a context and a string and
// returns nothing.
// NOTE(review): presumably registration lets the runner resolve these without
// falling back to reflection — confirm against the register package docs.
func init() {
register.Function1x1[string, string](WrapInBundle)
register.DoFn2x0[context.Context, string](&LoggerFn{})
}
// WrapInBundle embeds a single FHIR resource, given as a JSON string, in a
// "batch" Bundle resource holding one POST entry for it, so the resource can be
// published through ExecuteBundles.
//
// The entry's request URL is the value of the resource's top-level
// "resourceType" field. The resource text itself is inserted into the bundle
// verbatim.
func WrapInBundle(resource string) string {
	// The two verbs are, in order, the resource type and the raw resource.
	const bundleTemplate = `{
"resourceType": "Bundle",
"type": "batch",
"entry": [
{
"request": {
"method": "POST",
"url": "%s"
},
"resource": %s
}
]
}`

	// Only the resource type is needed; every other field is skipped.
	var header struct {
		ResourceType string `json:"resourceType"`
	}
	decoder := json.NewDecoder(strings.NewReader(resource))
	// NOTE(review): the decode error is deliberately discarded because this
	// function has no error return. A resource that fails to decode still
	// yields a bundle, built from whatever resource type was decoded (empty
	// when none was).
	_ = decoder.Decode(&header)

	return fmt.Sprintf(bundleTemplate, header.ResourceType, resource)
}
// LoggerFn is a helper DoFn that logs the elements it receives, prefixing each
// log line with LogPrefix.
type LoggerFn struct {
// LogPrefix is the text written before each logged element.
LogPrefix string
}
// ProcessElement writes elm to the Beam log at info level, formatted as
// "<LogPrefix>: <elm>".
func (fn *LoggerFn) ProcessElement(ctx context.Context, elm string) {
	prefix := fn.LogPrefix
	log.Infof(ctx, "%s: %v", prefix, elm)
}
// FinishBundle sleeps for two seconds so that the job server has time to
// finish receiving the logs.
func (fn *LoggerFn) FinishBundle() {
	const logFlushDelay = 2 * time.Second
	time.Sleep(logFlushDelay)
}
// main parses the command-line flags, builds the example pipeline, and runs it.
//
// The pipeline reads text lines from --sourceGcsLocation, wraps each one in a
// FHIR Bundle, and executes the bundles against --fhirStore, logging the error
// messages of failed writes. When --pubsubTopic is set, it also reads the
// store's notifications from that topic, reads the notified resources from the
// FHIR store, and logs both the resources and any dead letters.
//
// The process aborts with a fatal log message if a required flag is missing or
// if the job fails.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	// Both flags are documented as required. Abort here with a clear message
	// instead of building a pipeline around an empty source or target.
	requiredFlags := []struct{ name, value string }{
		{"sourceGcsLocation", *sourceGcsLocation},
		{"fhirStore", *fhirStore},
	}
	for _, f := range requiredFlags {
		if f.value == "" {
			log.Fatalf(ctx, "Missing required flag --%s", f.name)
		}
	}

	p, s := beam.NewPipelineWithRoot()

	// Read resources from GCS.
	resourcesInGcs := textio.Read(s, *sourceGcsLocation)
	resourceBundles := beam.ParDo(s, WrapInBundle, resourcesInGcs)

	// Write resources to store.
	_, failedWritesErrorMessage := fhirio.ExecuteBundles(s, *fhirStore, resourceBundles)
	beam.ParDo0(s, &LoggerFn{LogPrefix: "Failed Write"}, failedWritesErrorMessage)

	if *pubsubTopic != "" {
		// PubSub notifications will be emitted containing the path of the resource once
		// it is written to the store. Simultaneously read notifications and resources
		// from PubSub and store, respectively.
		resourceNotifications := pubsubio.Read(s, *gcpopts.Project, *pubsubTopic, nil)
		resourcesInFhirStore, deadLetters := fhirio.Read(s, resourceNotifications)

		// Log the read resources or read errors to the server.
		beam.ParDo0(s, &LoggerFn{LogPrefix: "Read Resource"}, resourcesInFhirStore)
		beam.ParDo0(s, &LoggerFn{LogPrefix: "Got Dead Letter"}, deadLetters)
	}

	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}