/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.streaming

import java.{util => ju}
import java.lang.{Long => JLong}
import java.util.UUID

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
/**
 * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
 */
@Evolving
class StateOperatorProgress private[sql](
    val operatorName: String,
    val numRowsTotal: Long,
    val numRowsUpdated: Long,
    val allUpdatesTimeMs: Long,
    val numRowsRemoved: Long,
    val allRemovalsTimeMs: Long,
    val commitTimeMs: Long,
    val memoryUsedBytes: Long,
    val numRowsDroppedByWatermark: Long,
    val numShufflePartitions: Long,
    val numStateStoreInstances: Long,
    val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
  ) extends Serializable {

  /** The compact JSON representation of this progress. */
  def json: String = compact(render(jsonValue))

  /** The pretty (i.e. indented) JSON representation of this progress. */
  def prettyJson: String = pretty(render(jsonValue))

  private[sql] def copy(
      newNumRowsUpdated: Long,
      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
    new StateOperatorProgress(
      operatorName = operatorName, numRowsTotal = numRowsTotal, numRowsUpdated = newNumRowsUpdated,
      allUpdatesTimeMs = allUpdatesTimeMs, numRowsRemoved = numRowsRemoved,
      allRemovalsTimeMs = allRemovalsTimeMs, commitTimeMs = commitTimeMs,
      memoryUsedBytes = memoryUsedBytes, numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
      numShufflePartitions = numShufflePartitions, numStateStoreInstances = numStateStoreInstances,
      customMetrics = customMetrics)

  private[sql] def jsonValue: JValue = {
    ("operatorName" -> JString(operatorName)) ~
    ("numRowsTotal" -> JInt(numRowsTotal)) ~
    ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
    ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
    ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
    ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
    ("commitTimeMs" -> JInt(commitTimeMs)) ~
    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
    ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
    ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
    ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
    ("customMetrics" -> {
      if (!customMetrics.isEmpty) {
        val keys = customMetrics.keySet.asScala.toSeq.sorted
        keys.map { k => k -> JInt(customMetrics.get(k).toLong) : JObject }.reduce(_ ~ _)
      } else {
        JNothing
      }
    })
  }

  override def toString: String = prettyJson
}
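
// Editor's sketch (not part of the original file): consuming the public fields of a
// StateOperatorProgress, typically obtained from StreamingQueryProgress.stateOperators.
// The summary format here is illustrative, not a Spark API.
private object StateOperatorProgressExample {
  def summarize(op: StateOperatorProgress): String = {
    // numRowsTotal counts all state rows kept by the operator; memoryUsedBytes is
    // the memory currently held by its state store instances.
    s"${op.operatorName}: total=${op.numRowsTotal}, updated=${op.numRowsUpdated}, " +
      s"droppedByWatermark=${op.numRowsDroppedByWatermark}, mem=${op.memoryUsedBytes}B"
  }
}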
/**
 * Information about progress made in the execution of a [[StreamingQuery]] during
 * a trigger. Each event relates to processing done for a single trigger of the streaming
 * query. Events are emitted even when no new data is available to be processed.
 *
 * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
 * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
 * @param name User-specified name of the query, null if not specified.
 * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
 * @param batchId A unique id for the current batch of data being processed. Note that in the
 *                case of retries after a failure a given batchId may be executed more than once.
 *                Similarly, when there is no data to be processed, the batchId will not be
 *                incremented.
 * @param batchDuration The processing duration of the batch, in milliseconds.
 * @param durationMs The amount of time taken to perform various operations in milliseconds.
 * @param eventTime Statistics of event time seen in this batch. It may contain the following
 *                  keys:
 *                  {{{
 *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
 *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
 *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
 *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
 *                  }}}
 *                  All timestamps are in ISO8601 format, i.e. UTC timestamps.
 * @param stateOperators Information about operators in the query that store state.
 * @param sources Detailed statistics on data being read from each of the streaming sources.
 * @since 2.1.0
 */
@Evolving
class StreamingQueryProgress private[sql](
    val id: UUID,
    val runId: UUID,
    val name: String,
    val timestamp: String,
    val batchId: Long,
    val batchDuration: Long,
    val durationMs: ju.Map[String, JLong],
    val eventTime: ju.Map[String, String],
    val stateOperators: Array[StateOperatorProgress],
    val sources: Array[SourceProgress],
    val sink: SinkProgress,
    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
    val observedMetrics: ju.Map[String, Row]) extends Serializable {

  /** The aggregate (across all sources) number of records processed in a trigger. */
  def numInputRows: Long = sources.map(_.numInputRows).sum

  /** The aggregate (across all sources) rate of data arriving. */
  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum

  /** The aggregate (across all sources) rate at which Spark is processing data. */
  def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum

  /** The compact JSON representation of this progress. */
  def json: String = compact(render(jsonValue))

  /** The pretty (i.e. indented) JSON representation of this progress. */
  def prettyJson: String = pretty(render(jsonValue))

  override def toString: String = prettyJson

  private[sql] def jsonValue: JValue = {
    ("id" -> JString(id.toString)) ~
    ("runId" -> JString(runId.toString)) ~
    ("name" -> JString(name)) ~
    ("timestamp" -> JString(timestamp)) ~
    ("batchId" -> JInt(batchId)) ~
    ("numInputRows" -> JInt(numInputRows)) ~
    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
    ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
    ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
    ("sink" -> sink.jsonValue) ~
    ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => row.jsonValue))
  }
}
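
// Editor's sketch (not part of the original file): typical consumption of this class
// from a running query. `query` is assumed to be a started StreamingQuery; its
// lastProgress is the most recently completed trigger, or null before the first one.
private object StreamingQueryProgressExample {
  def logLatest(query: StreamingQuery): Unit = {
    Option(query.lastProgress).foreach { p =>
      // numInputRows and processedRowsPerSecond aggregate across all sources.
      println(s"batch=${p.batchId} rows=${p.numInputRows} " +
        s"rate=${p.processedRowsPerSecond} rows/s")
      // prettyJson is the indented form of the jsonValue built above.
      println(p.prettyJson)
    }
  }
}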
/**
 * Information about progress made for a source in the execution of a [[StreamingQuery]]
 * during a trigger. See [[StreamingQueryProgress]] for more information.
 *
 * @param description Description of the source.
 * @param startOffset The starting offset for data being read.
 * @param endOffset The ending offset for data being read.
 * @param latestOffset The latest offset from this source.
 * @param numInputRows The number of records read from this source.
 * @param inputRowsPerSecond The rate at which data is arriving from this source.
 * @param processedRowsPerSecond The rate at which data from this source is being processed by
 *                               Spark.
 * @since 2.1.0
 */
@Evolving
class SourceProgress protected[sql](
    val description: String,
    val startOffset: String,
    val endOffset: String,
    val latestOffset: String,
    val numInputRows: Long,
    val inputRowsPerSecond: Double,
    val processedRowsPerSecond: Double,
    val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {

  /** The compact JSON representation of this progress. */
  def json: String = compact(render(jsonValue))

  /** The pretty (i.e. indented) JSON representation of this progress. */
  def prettyJson: String = pretty(render(jsonValue))

  override def toString: String = prettyJson

  private[sql] def jsonValue: JValue = {
    ("description" -> JString(description)) ~
    ("startOffset" -> tryParse(startOffset)) ~
    ("endOffset" -> tryParse(endOffset)) ~
    ("latestOffset" -> tryParse(latestOffset)) ~
    ("numInputRows" -> JInt(numInputRows)) ~
    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
    ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
  }

  private def tryParse(json: String) = try {
    parse(json)
  } catch {
    case NonFatal(e) => JString(json)
  }
}
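
// Editor's sketch (not part of the original file): shows the tryParse fallback used
// by jsonValue above. Offset strings that are themselves valid JSON render as nested
// JSON; anything unparsable is kept as a plain string. All values below are made up.
private object SourceProgressExample {
  def demo(): Unit = {
    val progress = new SourceProgress(
      description = "KafkaV2[Subscribe[topic-0]]", // hypothetical description
      startOffset = """{"topic-0":{"0":10}}""",    // parses: embedded as nested JSON
      endOffset = """{"topic-0":{"0":20}}""",
      latestOffset = "not-json",                   // does not parse: kept as JString
      numInputRows = 10L,
      inputRowsPerSecond = Double.NaN,             // NaN: omitted by safeDoubleToJValue
      processedRowsPerSecond = 5.0)
    println(progress.prettyJson)
  }
}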
/**
 * Information about progress made for a sink in the execution of a [[StreamingQuery]]
 * during a trigger. See [[StreamingQueryProgress]] for more information.
 *
 * @param description Description of the sink corresponding to this status.
 * @param numOutputRows Number of rows written to the sink, or -1 for Continuous Mode
 *                      (temporarily) or Sink V1 (until decommissioned).
 * @since 2.1.0
 */
@Evolving
class SinkProgress protected[sql](
    val description: String,
    val numOutputRows: Long) extends Serializable {

  /** SinkProgress with the default (unknown) number of output rows. */
  protected[sql] def this(description: String) = {
    this(description, DEFAULT_NUM_OUTPUT_ROWS)
  }

  /** The compact JSON representation of this progress. */
  def json: String = compact(render(jsonValue))

  /** The pretty (i.e. indented) JSON representation of this progress. */
  def prettyJson: String = pretty(render(jsonValue))

  override def toString: String = prettyJson

  private[sql] def jsonValue: JValue = {
    ("description" -> JString(description)) ~
    ("numOutputRows" -> JInt(numOutputRows))
  }
}

private[sql] object SinkProgress {
  val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L

  def apply(description: String, numOutputRows: Option[Long]): SinkProgress =
    new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS))
}
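
// Editor's sketch (not part of the original file): the companion's apply maps a
// missing row count to DEFAULT_NUM_OUTPUT_ROWS (-1), which readers should treat as
// "unknown" rather than an actual count. Sink names below are hypothetical.
private object SinkProgressExample {
  def demo(): Unit = {
    val counted = SinkProgress("MemorySink", Some(42L))
    val unknown = SinkProgress("ForeachSink", None) // numOutputRows = -1
    println(counted.json)  // {"description":"MemorySink","numOutputRows":42}
    println(unknown.json)  // {"description":"ForeachSink","numOutputRows":-1}
  }
}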
private object SafeJsonSerializer {

  /** Convert a double to JValue while handling NaN and Infinity, which are not valid JSON. */
  def safeDoubleToJValue(value: Double): JValue = {
    if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
  }

  /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
    if (map.isEmpty) return JNothing
    val keys = map.asScala.keySet.toSeq.sorted
    keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
  }
}
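
// Editor's sketch (not part of the original file): the helpers above drop values that
// would produce invalid or noisy JSON. NaN/Infinity doubles and empty maps become
// JNothing, so the corresponding fields vanish from the rendered output entirely.
private object SafeJsonSerializerExample {
  import SafeJsonSerializer._

  def demo(): Unit = {
    assert(safeDoubleToJValue(Double.NaN) == JNothing)
    assert(safeDoubleToJValue(Double.PositiveInfinity) == JNothing)
    assert(safeDoubleToJValue(1.5) == JDouble(1.5))
    assert(safeMapToJValue[String](new ju.HashMap[String, String](), JString(_)) == JNothing)
  }
}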