
Conversation

@obermeier
Member

External applications like Apache Cassandra are able to deal with IPv6 addresses, and libraries like spark-cassandra-connector combine Apache Cassandra with Apache Spark.
This combination is very useful IMHO.

One problem is that org.apache.spark.util.Utils.parseHostPort(hostPort: String) takes the last colon to separate the port from the host part. This conflicts with literal IPv6 addresses.

I think we can treat hostPort as a literal IPv6 address if it contains two or more colons. If IPv6 addresses are enclosed in square brackets, a port definition is still possible.
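
A minimal sketch of the proposed behavior (an illustration under these assumptions, not the PR's exact diff):

```scala
object HostPort {
  // Inputs with two or more colons are treated as IPv6 literals; only a
  // bracketed literal such as "[::1]:123" carries a port.
  def parseHostPort(hostPort: String): (String, Int) = {
    if (hostPort.count(_ == ':') >= 2) {
      if (hostPort.startsWith("[") && hostPort.contains("]:")) {
        val idx = hostPort.lastIndexOf("]:")
        (hostPort.substring(0, idx + 1), hostPort.substring(idx + 2).toInt)
      } else {
        (hostPort, 0) // bare IPv6 literal, e.g. "::1" -- no port
      }
    } else {
      // Hostname or IPv4: the last colon, if any, separates the port.
      val idx = hostPort.lastIndexOf(':')
      if (idx == -1) (hostPort, 0)
      else (hostPort.substring(0, idx).trim, hostPort.substring(idx + 1).toInt)
    }
  }
}
```

For example, HostPort.parseHostPort("[fe80::1]:7077") yields ("[fe80::1]", 7077), while HostPort.parseHostPort("fe80::1") yields ("fe80::1", 0).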

@obermeier obermeier changed the title [SPARK-22180][CORE] Allow IPv6 [SPARK-22180][CORE] Allow IPv6 address in org.apache.spark.util.Utils.parseHostPort Oct 1, 2017
Member

Nit: "ore" should be "or".
You might note here that you're checking that you don't have a bracketed IPv6 address like [::1]:123 -- the square brackets are key.

Member

Turn the rule back on after the block?

Member

Hex digits can be uppercase, right?
Shouldn't the pattern be more like [0-9a-f]*(:[0-9a-f]*)+ -- match a number, then one or more colon-number repetitions, rather than number-colon-number sequences? It might end up being equivalent, because the match allows 0 or more digits.

This accepts some strings that it shouldn't, like "::::", but the purpose isn't to catch every possible case, I guess. Such a string would fail name resolution anyway.

I thought Inet6Address would just provide parsing for this, but I guess not.
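
For illustration, this is how an anchored, case-insensitive variant of the suggested pattern behaves on a few inputs (a sketch; the actual pattern in the diff may differ):

```scala
// A (possibly empty) hex group, then one or more ":group" repetitions.
val ipv6ish = "(?i)[0-9a-f]*(:[0-9a-f]*)+"

Seq("::1", "fe80::1", "FE80::1", "::::", "abc:123", "example.com:8080")
  .foreach(s => println(f"$s%-17s matches = ${s.matches(ipv6ish)}"))
// "::::" matches although it is not a valid address: as noted, the pattern
// is a heuristic, and name resolution would reject such strings later.
// "abc:123" also matches (a-c are hex digits), which is exactly why the
// follow-up comment insists on requiring at least two colons.
```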

Member Author

Yes, a real parser would be much better! I hope the downstream methods validate the input, like the name resolver does...

At this point I was mostly concerned with separating the port. I think it is important to check that two colons exist; otherwise this expression accepts host:port strings like abc:123.
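
A tiny sketch of that guard (looksLikeIpv6Literal is a hypothetical helper name, not taken from the diff):

```scala
def looksLikeIpv6Literal(s: String): Boolean =
  s.count(_ == ':') >= 2 && s.matches("(?i)[0-9a-f]*(:[0-9a-f]*)+")

assert(!looksLikeIpv6Literal("abc:123")) // one colon: host:port, not IPv6
assert(looksLikeIpv6Literal("fe80::1"))  // two or more colons: IPv6 literal
```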

Member

Final nit: use braces for both branches of the if-else.

@jiangxb1987
Contributor

@obermeier Could you please rebase this with the latest master? Thanks!

@obermeier obermeier force-pushed the issue/SPARK-22180 branch 2 times, most recently from cf1d920 to 8026b0f Compare November 6, 2017 23:05
@obermeier
Member Author

Done

@obermeier obermeier force-pushed the issue/SPARK-22180 branch 2 times, most recently from 418927f to a6894d5 Compare November 13, 2017 00:58
@jiangxb1987
Contributor

Are you planning to fully address the IPv6 issues? If not, why adapt this single function separately?

@obermeier
Member Author

obermeier commented Nov 13, 2017

I chose this function because I got exceptions like the one below [1] when I used IPv6 hosts.
In this example, org.apache.spark.util.Utils$.parseHostPort decided to use f904 as the port, but it was actually the last 16-bit group of an IPv6 address (a short sketch of this failure mode follows the trace).

I do not have an overview of the Spark code, so I am currently not able to provide a general IPv6 solution.

I think this code snippet improves the situation and enables us to use IPv6 hosts in some cases.

[1]

java.lang.NumberFormatException: For input string: "f904"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
	at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
	at org.apache.spark.util.Utils$.parseHostPort(Utils.scala:935)
	at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:36)
	at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:206)
	at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:187)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:187)
	at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:166)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:165)
	at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:205)
	at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:169)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1058)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:873)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1626)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
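
To make the trace concrete, here is a sketch of the last-colon splitting described above (simplified, not the verbatim Utils.scala source):

```scala
val hostPort = "2001:db8::f904"            // bare IPv6 literal, no port
val idx      = hostPort.lastIndexOf(':')
val host     = hostPort.substring(0, idx)  // "2001:db8:" -- a mangled host
val port     = hostPort.substring(idx + 1) // "f904" -- the last 16-bit group
port.toInt // throws java.lang.NumberFormatException: For input string: "f904"
```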

@jiangxb1987
Contributor

Sounds good.

@jiangxb1987 left a comment
Contributor

The change LGTM, cc @cloud-fan

Contributor

nit: we should add more test cases to cover the invalid cases.

Member Author

@obermeier commented Nov 14, 2017

What is the preferred way to handle this kind of parse error in Spark?
Is changing the signature of this method to something like Try[...] or Option[...] an option?
Error log messages?
Unchecked exceptions?
...
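
For example, an Option-returning variant would look roughly like this (hypothetical name and signature, only to make the question concrete):

```scala
import scala.util.Try

def parseHostPortSafe(hostPort: String): Option[(String, Int)] =
  Try {
    val idx = hostPort.lastIndexOf(':')
    if (idx == -1) (hostPort, 0)
    else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
  }.toOption

parseHostPortSafe("host:8080") // Some(("host", 8080))
parseHostPortSafe("host:oops") // None, instead of a NumberFormatException
```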

Contributor

Is this comment still valid?

Member Author

@obermeier commented Nov 14, 2017

I think not

Member Author

I removed this comment

….parseHostPort

## What changes were proposed in this pull request?

Treat ```hostPort``` as a literal IPv6 address if it contains two or more colons. If IPv6 addresses are enclosed in square brackets, a port definition is still possible.

## How was this patch tested?

Added a new test case to UtilsSuite.

Remove comment
@vanzin
Contributor

vanzin commented Dec 15, 2017

ok to test

@SparkQA

SparkQA commented Dec 15, 2017

Test build #84982 has finished for PR 19408 at commit 1400299.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Remove whitespace at end of line.
@SparkQA

SparkQA commented Dec 18, 2017

Test build #85062 has finished for PR 19408 at commit 68c3221.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Jan 26, 2018

To rephrase @jiangxb1987's question: supporting IPv6 is a much larger effort, and one Spark currently does not undertake. We should be addressing that as the problem to solve, and fix this as part of IPv6 support. Eliminating individual exceptions could simply put the Spark platform into inconsistent states, without any telemetry in the logs on why (because we removed/'fixed' the expected exceptions).

@jiangxb1987
Contributor

Fully agree with @mridulm's big-picture suggestion, and I also think supporting IPv6 should be designed as an integral feature, instead of just putting together some PRs.

@obermeier
Member Author

I totally agree with you.
What do you think about just adding a log message when the given string is obviously not a valid host name? The NumberFormatException thrown much later, far away from the parsing component, was a little bit confusing (a sketch follows the trace):

(the same NumberFormatException stack trace as quoted earlier in this conversation)
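
A sketch of what such an early warning could look like (hypothetical message text, using plain SLF4J rather than Spark's internal Logging trait):

```scala
import org.slf4j.LoggerFactory

object HostPortParser {
  private val log = LoggerFactory.getLogger(getClass)

  def parse(hostPort: String): (String, Int) = {
    val idx = hostPort.lastIndexOf(':')
    if (idx == -1) return (hostPort, 0)
    val portStr = hostPort.substring(idx + 1)
    if (portStr.isEmpty || !portStr.forall(_.isDigit)) {
      // Flag the bad input at the parse site instead of letting a bare
      // NumberFormatException surface far from the cause.
      log.warn(s"'$hostPort' does not look like a valid host:port pair " +
        "(an unbracketed IPv6 literal?); ignoring the port part")
      return (hostPort, 0)
    }
    (hostPort.substring(0, idx), portStr.toInt)
  }
}
```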

@obermeier
Member Author

This issue seems to be fixed in Spark 2.3.2

@obermeier obermeier closed this Oct 13, 2018
@obermeier
Member Author

If Spark runs on a YARN cluster, this issue still exists.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 15, 2020
@github-actions github-actions bot closed this Jan 16, 2020
@obermeier obermeier deleted the issue/SPARK-22180 branch January 24, 2020 10:10