
Conversation

@dongjoon-hyun (Member) commented May 21, 2016

What changes were proposed in this pull request?

SparkSession greatly reduces the number of concepts that Spark users must know. Currently, SparkSession is defined as the entry point for programming Spark with the Dataset and DataFrame API, and we can also easily obtain an RDD by calling Dataset.rdd or DataFrame.rdd.

However, in many usages (including the examples), users extract SparkSession.sparkContext and keep it in a separate variable just to call parallelize.

If SparkSession also supported RDDs seamlessly, it would be a nice usability improvement. We can do this by simply adding a parallelize API.
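
For context, here is a minimal sketch (hypothetical, not the patch itself) of what such a delegating method could look like on the Scala side, assuming it simply forwards to the SparkContext that the session already holds:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper mirroring the proposed SparkSession.parallelize:
// it only forwards to SparkContext.parallelize on the session's context.
object SessionParallelize {
  def parallelize[T: ClassTag](
      spark: SparkSession,
      data: Seq[T],
      numSlices: Int): RDD[T] =
    spark.sparkContext.parallelize(data, numSlices)
}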

Example

Scala (SparkPi):

 object SparkPi {
   def main(args: Array[String]) {
     val spark = SparkSession
       .builder
       .appName("Spark Pi")
       .getOrCreate()
-    val sc = spark.sparkContext
     val slices = if (args.length > 0) args(0).toInt else 2
     val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
-    val count = sc.parallelize(1 until n, slices).map { i =>
+    val count = spark.parallelize(1 until n, slices).map { i =>
       val x = random * 2 - 1
       val y = random * 2 - 1
       if (x*x + y*y < 1) 1 else 0
     }.reduce(_ + _)
     println("Pi is roughly " + 4.0 * count / n)
     spark.stop()
   }
 }
Python (PythonPi):

 spark = SparkSession\
   .builder\
   .appName("PythonPi")\
   .getOrCreate()

- sc = spark._sc
-
 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
 n = 100000 * partitions

 def f(_):
   x = random() * 2 - 1
   y = random() * 2 - 1
   return 1 if x ** 2 + y ** 2 < 1 else 0

-count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+count = spark.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
 print("Pi is roughly %f" % (4.0 * count / n))

 spark.stop()

How was this patch tested?

Pass the Jenkins tests (with a new Python test) and also manual testing.

@dongjoon-hyun (Member Author)

Hi, @rxin.
I'm wondering what you think about this PR.

@SparkQA commented May 21, 2016

Test build #59081 has finished for PR 13245 at commit 810f08a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 22, 2016

Test build #59085 has finished for PR 13245 at commit 4f6a69e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 22, 2016

Test build #59082 has finished for PR 13245 at commit 65f9746.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented May 22, 2016

Hm, we are trying to avoid returning RDDs in the new APIs. One thing we can do is to introduce a parallelize API that returns a Dataset?
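
For reference, a brief sketch of the RDD-free alternatives that already exist on SparkSession (assuming the Spark 2.0 APIs spark.range and spark.createDataset), which is roughly the direction this suggestion points at:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DatasetAlternatives").getOrCreate()
import spark.implicits._ // supplies the Encoder[Int] needed by createDataset

// Dataset[java.lang.Long] holding 0 to 99; covers the common case where
// examples call sc.parallelize on a numeric range.
val ids = spark.range(100)

// Dataset[Int] built from a local collection, the closest Dataset
// analogue of parallelize for an in-memory Seq.
val nums = spark.createDataset(1 to 100)

spark.stop()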

@dongjoon-hyun (Member Author)

I see. Thank you!

@dongjoon-hyun (Member Author)

Unfortunately, Dataset (or DataFrame) does not seem suitable for achieving this goal in Python.

>>> spark.parallelize(range(1, 10)).toDS()
...
AttributeError: 'RDD' object has no attribute 'toDS'
>>> spark.parallelize(range(1, 10)).toDF()
...
TypeError: Can not infer schema for type: <type 'int'>

I'll think about this more until tomorrow and close this if I cannot find a neat solution.

@dongjoon-hyun (Member Author)

Sorry, extending SparkSession for RDDs wasn't a good idea. I'm closing this PR.

@dongjoon-hyun dongjoon-hyun deleted the SPARK-15466 branch July 20, 2016 07:36