Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8701][Streaming][WebUI] Add input metadata in the batch page #7081

Closed
wants to merge 7 commits into from

Conversation

Projects
None yet
4 participants
@zsxwing
Copy link
Member

commented Jun 29, 2015

This PR adds metadata to InputInfo. InputDStream can report its metadata for a batch and it will be shown in the batch page.

For example,

screen shot

FileInputDStream will display the new files for a batch, and DirectKafkaInputDStream will display its offset ranges.

@SparkQA

This comment has been minimized.

Copy link

commented Jun 29, 2015

Test build #35977 has finished for PR 7081 at commit d496ae9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@@ -33,7 +33,7 @@ import org.apache.spark.streaming.Time
@DeveloperApi
case class BatchInfo(
batchTime: Time,
streamIdToNumRecords: Map[Int, Long],

This comment has been minimized.

Copy link
@tdas

tdas Jun 30, 2015

Contributor

This is a problem. We cannot expose InputInfo class through BatchInfo as the first one is private[streaming] whereas the latter in public.

I wondering what is the right way here. Any ideas?

This comment has been minimized.

Copy link
@zsxwing

zsxwing Jul 1, 2015

Author Member

Use two parameters: streamIdToNumRecords and streamIdToMetadata?

This comment has been minimized.

Copy link
@tdas

tdas Jul 3, 2015

Contributor

Okay I chatted with @pwendell , and the decision in as follows.

  1. Name = StreamInputInfo,
  2. It will have the field info: Map[String, Object]. Rather than a string, its most future-proof is to use a Map[String, Object], as we may want to not just strings but actual objects. For example, for files, one of the keys in the map be "files" and the value is a list of files.
  3. Deprecate streamIdToNumRecords and introduce streamIdToInputInfo: Map[Int, StreamInputInfo]
  4. object StreamInputInfo can have well known key names like for number of records, etc. They are used to store the corresponding data in the map.

This comment has been minimized.

Copy link
@tdas

tdas Jul 3, 2015

Contributor

Maybe case class StreamInputInfo(numRecords: Int, metadata: Map[String, Object]) is fine.

And the metadata map will have at least one standard field named "Description" which will map to the string that will be shown in the UI.

This comment has been minimized.

Copy link
@tdas

tdas Jul 3, 2015

Contributor

Also, please make the string prettier by introducing newlines and tabs, and making the HTML preserve newlines and tabs

@zsxwing

This comment has been minimized.

Copy link
Member Author

commented Jul 4, 2015

New screenshot
screen shot 2015-07-04 at 7 23 35 pm

private def metadataDescriptionToHTML(metadataDescription: String): Seq[Node] = {
// tab to 4 spaces and "\n" to "<br/>"
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))

This comment has been minimized.

Copy link
@zsxwing

zsxwing Jul 4, 2015

Author Member

Because HTML doesn't support tab, I use 4 spaces instead.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 4, 2015

Test build #36532 has finished for PR 7081 at commit 1d94582.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
}
val description = offsetRanges.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
s"range: [${offsetRange.fromOffset}, ${offsetRange.untilOffset})"

This comment has been minimized.

Copy link
@tdas

tdas Jul 7, 2015

Contributor

Can you use the format X --> Y (where Y = unitOffset - 1)
BTW, if the range is from: 5 until: 5 (that is not data), then you should ignore that range in the UI.

This comment has been minimized.

Copy link
@tdas

tdas Jul 7, 2015

Contributor

Or "offsets: X to Y"

Some(filesToRDD(newFiles))
val rdds = Some(filesToRDD(newFiles))
// Copy newFiles to immutable.List to prevent from being modified by the user
val metadata = Map("files" -> newFiles.toList, "Description" -> newFiles.mkString("\n"))

This comment has been minimized.

Copy link
@tdas

tdas Jul 7, 2015

Contributor

The name "description" must should be a well known name, and there should a variable assigned. Say StreamInputInfo.METADATA_KEY_DESCRIPTION.

s"range: [${offsetRange.fromOffset}, ${offsetRange.untilOffset})"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map("offsets" -> offsetRanges.toList, "Description" -> description)

This comment has been minimized.

Copy link
@tdas

tdas Jul 7, 2015

Contributor

Similar comment as below.

@zsxwing

This comment has been minimized.

Copy link
Member Author

commented Jul 7, 2015

New screenshot:

g1

And screenshot for no files and no Kafka data:
g2

@SparkQA

This comment has been minimized.

Copy link

commented Jul 7, 2015

Test build #36679 has started for PR 7081 at commit 8730dec.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 7, 2015

Test build #36679 has finished for PR 7081 at commit 8730dec.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamInputInfo(
@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 7, 2015

Merged build finished. Test FAILed.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 7, 2015

Merged build triggered.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 7, 2015

Merged build started.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 7, 2015

Test build #36682 has started for PR 7081 at commit 74762da.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 7, 2015

Test build #36682 has finished for PR 7081 at commit 74762da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamInputInfo(
@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 7, 2015

Merged build finished. Test PASSed.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build triggered.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build started.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36915 has started for PR 7081 at commit d906209.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build triggered.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build started.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36918 has started for PR 7081 at commit f7abd9b.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36918 has finished for PR 7081 at commit f7abd9b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamInputInfo(
@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build finished. Test FAILed.

@tdas

This comment has been minimized.

Copy link
Contributor

commented Jul 9, 2015

Jenkins, test this please.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build triggered.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build started.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36920 has started for PR 7081 at commit f7abd9b.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36915 has finished for PR 7081 at commit d906209.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamInputInfo(
@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build finished. Test FAILed.

@tdas

This comment has been minimized.

Copy link
Contributor

commented Jul 9, 2015

Jenkins, test this please.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build triggered.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build started.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36925 has started for PR 7081 at commit f7abd9b.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36920 has finished for PR 7081 at commit f7abd9b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamInputInfo(
@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build finished. Test FAILed.

@zsxwing

This comment has been minimized.

Copy link
Member Author

commented Jul 9, 2015

retest this please

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build triggered.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36925 timed out for PR 7081 at commit f7abd9b after a configured wait of 175m.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build finished. Test FAILed.

@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build started.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36932 has started for PR 7081 at commit f7abd9b.

@SparkQA

This comment has been minimized.

Copy link

commented Jul 9, 2015

Test build #36932 has finished for PR 7081 at commit f7abd9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamInputInfo(
@AmplabJenkins

This comment has been minimized.

Copy link

commented Jul 9, 2015

Merged build finished. Test PASSed.

@tdas

This comment has been minimized.

Copy link
Contributor

commented Jul 9, 2015

Merging this to master! Thanks @zsxwing

@asfgit asfgit closed this in 1f6b0b1 Jul 9, 2015

@zsxwing zsxwing deleted the zsxwing:input-metadata branch Jul 10, 2015

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.