-
Notifications
You must be signed in to change notification settings - Fork 5.7k
/
CurrentConditions.java
111 lines (97 loc) · 4.32 KB
/
CurrentConditions.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.training.dataanalyst.sandiego;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
/**
 * A Dataflow streaming pipeline that pulls lane-level sensor messages (timestamp, location,
 * highway, direction, lane, speed, sensor id) from Pub/Sub, parses each one into a
 * {@link LaneInfo}, optionally streams them to Bigtable, and appends them to the BigQuery table
 * {@code <project>:demos.current_conditions}.
 *
 * <p>The DoFns are static nested classes with an explicit {@code serialVersionUID} instead of
 * anonymous classes, so no class-wide {@code @SuppressWarnings("serial")} is required.
 *
 * @author vlakshmanan
 */
public class CurrentConditions {

  /** Name of the Pub/Sub topic, inside the pipeline's project, that messages are read from. */
  private static final String TOPIC_NAME = "sandiego";

  /** BigQuery {@code dataset.table}, inside the pipeline's project, that rows are appended to. */
  private static final String CURRENT_CONDITIONS_TABLE = "demos.current_conditions";

  /** Command-line options for this pipeline (Dataflow options plus the Bigtable toggle). */
  public interface MyOptions extends DataflowPipelineOptions {
    @Description("Also stream to Bigtable?")
    @Default.Boolean(false)
    boolean getBigtable();

    void setBigtable(boolean b);
  }

  /** Parses each raw Pub/Sub message string into a {@link LaneInfo}. */
  static class ParseLaneInfoFn extends DoFn<String, LaneInfo> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      // Parsing is delegated entirely to LaneInfo.newLaneInfo; anything it throws propagates
      // out of processElement. NOTE(review): consider routing malformed messages to a
      // dead-letter output instead of failing the bundle.
      c.output(LaneInfo.newLaneInfo(c.element()));
    }
  }

  /**
   * Converts a {@link LaneInfo} into a BigQuery {@link TableRow}. The keys set here must match
   * the column names declared in {@link #createSchema()}.
   */
  static class LaneInfoToTableRowFn extends DoFn<LaneInfo, TableRow> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      LaneInfo info = c.element();
      TableRow row = new TableRow();
      row.set("timestamp", info.getTimestamp());
      row.set("latitude", info.getLatitude());
      row.set("longitude", info.getLongitude());
      row.set("highway", info.getHighway());
      row.set("direction", info.getDirection());
      row.set("lane", info.getLane());
      row.set("speed", info.getSpeed());
      // The BigQuery column is "sensorId" but its value comes from LaneInfo#getSensorKey.
      row.set("sensorId", info.getSensorKey());
      c.output(row);
    }
  }

  /**
   * Builds the schema of the current-conditions table; BigQueryIO needs it because the write
   * uses {@code CREATE_IF_NEEDED}.
   *
   * @return a schema with one column per key emitted by {@link LaneInfoToTableRowFn}
   */
  private static TableSchema createSchema() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(column("timestamp", "TIMESTAMP"));
    fields.add(column("latitude", "FLOAT"));
    fields.add(column("longitude", "FLOAT"));
    fields.add(column("highway", "STRING"));
    fields.add(column("direction", "STRING"));
    fields.add(column("lane", "INTEGER"));
    fields.add(column("speed", "FLOAT"));
    fields.add(column("sensorId", "STRING"));
    return new TableSchema().setFields(fields);
  }

  /** Returns a BigQuery column definition with the given name and type. */
  private static TableFieldSchema column(String name, String type) {
    return new TableFieldSchema().setName(name).setType(type);
  }

  /**
   * Builds the pipeline Pub/Sub → LaneInfo → (optional Bigtable) → BigQuery and launches it
   * with {@code p.run()}; the returned result is not awaited here.
   *
   * @param args command-line pipeline options, validated against {@link MyOptions}
   */
  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    // The source is a Pub/Sub topic, so the job is forced into streaming mode.
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);

    String topic = "projects/" + options.getProject() + "/topics/" + TOPIC_NAME;
    String currConditionsTable = options.getProject() + ":" + CURRENT_CONDITIONS_TABLE;

    PCollection<LaneInfo> currentConditions = p //
        .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
        .apply("ExtractData", ParDo.of(new ParseLaneInfoFn()));

    if (options.getBigtable()) {
      BigtableHelper.writeToBigtable(currentConditions, options);
    }

    currentConditions //
        .apply("ToBQRow", ParDo.of(new LaneInfoToTableRowFn())) //
        .apply(BigQueryIO.writeTableRows().to(currConditionsTable) //
            .withSchema(createSchema()) //
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) //
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  }
}