-
Notifications
You must be signed in to change notification settings - Fork 26
/
UserFootprintUpdater.scala
124 lines (108 loc) · 3.95 KB
/
UserFootprintUpdater.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package osmesa.apps.streaming
import java.io._
import java.net.URI
import cats.implicits._
import com.monovore.decline._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.locationtech.geomesa.spark.jts._
import osmesa.analytics.{Analytics, Footprints}
import vectorpipe.sources.Source
/*
* Usage example:
*
* sbt "project apps" assembly
*
* spark-submit \
* --class osmesa.apps.streaming.UserFootprintUpdater \
* ingest/target/scala-2.11/osmesa-apps.jar
*/
/** Consumes OSM minutely replication diffs and updates per-user footprint
  * vector tiles (MVTs).
  *
  * CLI entry point built with decline's `CommandApp`; see the usage example
  * above for invocation via spark-submit.
  */
object UserFootprintUpdater
    extends CommandApp(
      name = "osmesa-user-footprint-updater",
      header = "Consume minutely diffs to update user footprint MVTs",
      main = {
        // Where to read minutely diffs from (defaults to planet.osm.org).
        val changeSourceOpt = Opts
          .option[URI]("change-source",
                       short = "d",
                       metavar = "uri",
                       help = "Location of minutely diffs to process")
          .withDefault(new URI("https://planet.osm.org/replication/minute/"))

        // Optional explicit replication sequence bounds; when absent the
        // source falls back to the current remote sequence.
        val startSequenceOpt = Opts
          .option[Int](
            "start-sequence",
            short = "s",
            metavar = "sequence",
            help =
              "Minutely diff starting sequence. If absent, the current (remote) sequence will be used.")
          .orNone

        val endSequenceOpt = Opts
          .option[Int](
            "end-sequence",
            short = "e",
            metavar = "sequence",
            help =
              "Minutely diff ending sequence. If absent, the current (remote) sequence will be used.")
          .orNone

        val partitionCountOpt = Opts
          .option[Int]("partition-count",
                       short = "p",
                       metavar = "partition count",
                       help = "Change partition count.")
          .orNone

        // Prefix under which footprint tiles live; defaults to the CWD.
        val tileSourceOpt = Opts
          .option[URI](
            "tile-source",
            short = "t",
            metavar = "uri",
            help = "URI prefix for vector tiles to update"
          )
          .withDefault(new File("").toURI)

        val concurrentUploadsOpt = Opts
          .option[Int]("concurrent-uploads",
                       short = "c",
                       metavar = "concurrent uploads",
                       help = "Set the number of concurrent uploads.")
          .orNone

        (changeSourceOpt,
         startSequenceOpt,
         endSequenceOpt,
         partitionCountOpt,
         tileSourceOpt,
         concurrentUploadsOpt).mapN {
          (changeSource,
           startSequence,
           endSequence,
           partitionCount,
           tileSource,
           _concurrentUploads) =>
            val AppName = "UserFootprintUpdater"

            val spark: SparkSession = Analytics.sparkSession(AppName)
            import spark.implicits._
            // Picked up implicitly downstream (tile upload concurrency).
            implicit val concurrentUploads: Option[Int] = _concurrentUploads
            spark.withJTS

            // Build the options Map for the vectorpipe changes source; the
            // sequence bounds and partition count are only set when provided
            // on the command line.
            val changeOptions = Map(Source.BaseURI -> changeSource.toString) ++
              startSequence
                .map(x => Map(Source.StartSequence -> x.toString))
                .getOrElse(Map.empty) ++
              endSequence
                .map(x => Map(Source.EndSequence -> x.toString))
                .getOrElse(Map.empty) ++
              partitionCount
                .map(x => Map(Source.PartitionCount -> x.toString))
                .getOrElse(Map.empty)

            val changes = spark.read
              .format(Source.Changes)
              .options(changeOptions)
              .load

            // Keep only node changes that carry coordinates; footprints are
            // keyed by the editing user's uid.
            val changedNodes = changes
              .where('type === "node" and 'lat.isNotNull and 'lon.isNotNull)
              .select('sequence, 'uid as 'key, st_makePoint('lon, 'lat) as 'geom)

            val tiledNodes = Footprints.update(changedNodes, tileSource)

            // BUG FIX: max() over an empty Dataset produces a single row whose
            // column is null; Row.getAs[Int] unboxes via getInt and would throw
            // a NullPointerException. Guard with isNullAt so an empty changeset
            // reports cleanly instead of crashing.
            val maxRow = changedNodes.select(max('sequence) as 'sequence).first
            val lastSequence: Option[Int] =
              if (maxRow.isNullAt(0)) None else Some(maxRow.getInt(0))

            println(
              s"${tiledNodes.count} tiles updated to ${lastSequence.getOrElse("n/a")}.")
        }
      }
    )