-
Notifications
You must be signed in to change notification settings - Fork 26
/
StreamingChangesetMetadataUpdater.scala
124 lines (110 loc) · 4.03 KB
/
StreamingChangesetMetadataUpdater.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package osmesa.apps.streaming
import java.net.URI
import cats.implicits._
import com.monovore.decline._
import org.apache.spark.sql._
import osmesa.analytics.Analytics
import osmesa.analytics.stats.ChangesetMetadataForeachWriter
import vectorpipe.functions._
import vectorpipe.functions.osm._
import vectorpipe.sources.Source
/*
* Usage example:
*
* sbt "project apps" assembly
*
* spark-submit \
* --class osmesa.apps.streaming.StreamingChangesetMetadataUpdater \
* ingest/target/scala-2.11/osmesa-apps.jar \
* --database-url $DATABASE_URL
*/
/** Streaming entry point that tails the OSM changeset replication feed and
  * writes changeset metadata (editor, hashtags, timestamps, user info) into
  * the statistics database via [[ChangesetMetadataForeachWriter]].
  */
object StreamingChangesetMetadataUpdater
    extends CommandApp(
      name = "osmesa-changeset-stream-processor",
      header = "Update statistics from changeset replication stream",
      main = {
        // --- CLI options (decline) -------------------------------------------
        // Replication endpoint; defaults to the public OSM planet feed.
        val changesetSource =
          Opts
            .option[URI]("changeset-source",
                         short = "c",
                         metavar = "uri",
                         help = "Location of changesets to process")
            .withDefault(new URI("https://planet.osm.org/replication/changesets/"))

        // Target database, falling back to the DATABASE_URL env var.
        val databaseUri =
          Opts
            .option[URI](
              "database-url",
              short = "d",
              metavar = "database URL",
              help = "Database URL (default: $DATABASE_URL environment variable)"
            )
            .orElse(Opts.env[URI]("DATABASE_URL", help = "The URL of the database"))

        // Optional replication window; both bounds may be omitted.
        val startSequence =
          Opts
            .option[Int](
              "start-sequence",
              short = "s",
              metavar = "sequence",
              help = "Starting sequence. If absent, the current (remote) sequence will be used."
            )
            .orNone

        val endSequence =
          Opts
            .option[Int](
              "end-sequence",
              short = "e",
              metavar = "sequence",
              help = "Ending sequence. If absent, this will be an infinite stream."
            )
            .orNone

        val batchSize =
          Opts
            .option[Int]("batch-size",
                         short = "b",
                         metavar = "batch size",
                         help = "Change batch size.")
            .orNone

        (changesetSource, databaseUri, startSequence, endSequence, batchSize).mapN {
          (source, dbUri, start, end, batch) =>
            implicit val spark: SparkSession =
              Analytics.sparkSession("StreamingChangesetMetadataUpdater")
            import spark.implicits._

            // Reader configuration: the mandatory settings plus any of the
            // optional sequence/batch settings actually supplied on the CLI.
            val required = Map(
              Source.BaseURI -> source.toString,
              Source.DatabaseURI -> dbUri.toString,
              Source.ProcessName -> "ChangesetMetadataUpdater"
            )
            val optional = Seq(
              start.map(s => Source.StartSequence -> s.toString),
              end.map(s => Source.EndSequence -> s.toString),
              batch.map(b => Source.BatchSize -> b.toString)
            ).flatten
            val readerOptions = required ++ optional

            val changesets = spark.readStream
              .format(Source.Changesets)
              .options(readerOptions)
              .load

            // Project the metadata columns and merge hashtags found in both
            // the "comment" and "hashtags" tags, then stream each row into the
            // database through the foreach writer (usernames kept up to date).
            val query = changesets
              .select(
                'id,
                'createdAt,
                'closedAt,
                'user,
                'uid,
                'tags.getField("created_by") as 'editor,
                merge_sets(hashtags('tags.getField("comment")),
                           hashtags('tags.getField("hashtags"))) as 'hashtags
              )
              .writeStream
              .queryName("update changeset metadata")
              .foreach(
                new ChangesetMetadataForeachWriter(dbUri, shouldUpdateUsernames = true))
              .start

            // Block until the (possibly infinite) stream terminates.
            query.awaitTermination()

            spark.stop()
        }
      }
    )