
[SPARK-13203] Add scalastyle rule banning use of mutable.SynchronizedBuffer #11082

Closed
wants to merge 11 commits into from

Conversation

ted-yu

@ted-yu ted-yu commented Feb 4, 2016

See discussion at the tail of #11059

@andrewor14
Please review

@ted-yu ted-yu changed the title Add scalastyle rule banning use of mutable.SynchronizedBuffer [SPARK-13203] Add scalastyle rule banning use of mutable.SynchronizedBuffer Feb 4, 2016
@ted-yu
Author

ted-yu commented Feb 4, 2016

Jenkins, test this please.

@andrewor14
Contributor

Can you add one for SynchronizedMap as well?

@ted-yu
Author

ted-yu commented Feb 4, 2016

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala:    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Lo
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala:      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala:    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala:      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]
streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala:import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala:  val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]

Since SynchronizedMap is used, should I open another JIRA?
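For context, scalastyle bans of this kind are normally expressed as a RegexChecker entry in scalastyle-config.xml. A sketch covering both traits follows; the customId and message wording are illustrative, not the exact text merged:

```xml
<check customId="mutablesynchronizedcollections" level="error"
       class="org.scalastyle.file.RegexChecker" enabled="true">
  <parameters>
    <!-- Flags both deprecated mixins in one rule. -->
    <parameter name="regex">mutable\.Synchronized(Buffer|Map)</parameter>
  </parameters>
  <customMessage><![CDATA[
    mutable.SynchronizedBuffer and mutable.SynchronizedMap are deprecated
    in Scala 2.11. Prefer java.util.concurrent alternatives such as
    ConcurrentLinkedQueue or ConcurrentHashMap.
  ]]></customMessage>
</check>
```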

@ted-yu
Author

ted-yu commented Feb 4, 2016

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, Strin
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuf
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuf
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, Strin
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:      new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[S
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:  val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffe
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala:    with mutable.SynchronizedBuffer[BlockGenerator]
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala:import scala.collection.mutable.SynchronizedBuffer
streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala:    val pushedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala:    val addedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala:    val addedMetadata = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala:    val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala:    val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]

Should the above be cleaned up as an addendum to #11059, or in this PR?

@SparkQA

SparkQA commented Feb 4, 2016

Test build #50773 has finished for PR 11082 at commit 4375f65.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 4, 2016

Jenkins, test this please.

@ted-yu
Author

ted-yu commented Feb 4, 2016

Logged SPARK-13204 for replacing mutable.SynchronizedMap
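The usual replacement for the SynchronizedMap mixin (assumed here; the variable name mirrors the StreamingListenerSuite usage quoted above) is java.util.concurrent.ConcurrentHashMap, a minimal sketch:

```scala
import java.util.concurrent.ConcurrentHashMap

// Before (deprecated in Scala 2.11):
//   val failureReasons = new mutable.HashMap[Int, String]
//     with mutable.SynchronizedMap[Int, String]

// After: a thread-safe Java map; callers use put/get directly.
val failureReasons = new ConcurrentHashMap[Int, String]()
failureReasons.put(0, "host lost")
assert(failureReasons.get(0) == "host lost")
```

One behavioral difference worth noting: ConcurrentHashMap rejects null keys and values, whereas the Scala mixin did not.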

@SparkQA

SparkQA commented Feb 4, 2016

Test build #50776 has finished for PR 11082 at commit a40711c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 4, 2016

Jenkins, test this please.

@SparkQA

SparkQA commented Feb 4, 2016

Test build #50778 has finished for PR 11082 at commit f5666a8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 4, 2016

Jenkins, test this please.

@andrewor14
Contributor

I think there's some overlap between this patch and #11067

@SparkQA

SparkQA commented Feb 4, 2016

Test build #50781 has finished for PR 11082 at commit a566d91.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 4, 2016

Jenkins, test this please.

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50785 has finished for PR 11082 at commit b63c136.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Some files in this PR are not covered by #11067.
@holdenk
I can base my PR on yours.

Let me know what you think.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please

@holdenk
Contributor

holdenk commented Feb 5, 2016

I'd probably just wait for the streaming PR to also get merged in - makes it simpler.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50796 has finished for PR 11082 at commit ebf3509.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50800 has finished for PR 11082 at commit 74a4c70.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50801 has finished for PR 11082 at commit 879eebe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50814 has finished for PR 11082 at commit 7faca67.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

I am trying to add an Array to a ConcurrentLinkedQueue (collectedData) in Scala 2.11:

stream.foreachRDD { rdd => Collections.addAll(collectedData, rdd.collect()) }

I got the following compilation error:

[error]  found   : java.util.concurrent.ConcurrentLinkedQueue[(String, String)]
[error]  required: java.util.Collection[_ >: java.io.Serializable]
[error] Note: (String, String) <: Any, but Java-defined trait Collection is invariant in type E.
[error] You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
[error]     stream.foreachRDD { rdd => Collections.addAll(allReceived, rdd.collect()) }

@holdenk
Contributor

holdenk commented Feb 5, 2016

@ted-yu you can use Arrays.asList with the splat syntax, along with ConcurrentLinkedQueue's addAll method:

collectedData.addAll(Arrays.asList(rdd.collect():_*))
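Putting the two pieces together, a minimal sketch of the pattern; collectedData and the tuple type mirror the suite's usage, and rdd.collect() is stubbed as a plain Array here:

```scala
import java.util.Arrays
import java.util.concurrent.ConcurrentLinkedQueue

val collectedData = new ConcurrentLinkedQueue[(String, String)]()

// Stand-in for rdd.collect(): an Array[(String, String)].
val batch: Array[(String, String)] = Array(("k1", "v1"), ("k2", "v2"))

// Arrays.asList takes varargs, so the array is splatted with `: _*`.
// This avoids the Collections.addAll invariance error quoted above,
// because addAll accepts Collection[_ <: (String, String)] directly.
collectedData.addAll(Arrays.asList(batch: _*))
assert(collectedData.size == 2)
```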

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please.

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50839 has finished for PR 11082 at commit b84c8c7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

Jenkins, test this please.

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50841 has finished for PR 11082 at commit 5e770b0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Author

ted-yu commented Feb 5, 2016

There is no error in DirectKafkaStreamSuite.scala

Once @holdenk's PR goes in, I will rebase.

@srowen
Member

srowen commented Feb 7, 2016

@ted-yu please close this PR, or go ahead and make it only about the issue you filed: a scalastyle rule.

@ted-yu
Author

ted-yu commented Feb 7, 2016

DirectKafkaStreamSuite.scala is not covered by #11067.
The addition of the scalastyle rule would not pass even after #11067 goes in.

Should I open a separate PR for DirectKafkaStreamSuite.scala?

@srowen
Member

srowen commented Feb 7, 2016

Of course, this depends on the new rule passing for the whole project. If an existing PR means to catch all occurrences of the issue the rule would flag, then it needs to cover this case too. You should comment on #11067, not here. I see no reason you would open another PR.

@ted-yu
Author

ted-yu commented Feb 7, 2016

> If an existing PR means to catch all occurrences of the issue the rule would flag, then it needs to cover this case too

In this case, it looks like SPARK-13151 should cover all occurrences.

@ted-yu ted-yu closed this Feb 9, 2016
6 participants