[SPARK-15466][SQL] Make SparkSession as the entry point to programming with RDD too
dongjoon-hyun committed May 21, 2016
1 parent 201a51f commit 65f9746
Showing 5 changed files with 34 additions and 7 deletions.
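Before this change the examples had to reach through the session into the underlying SparkContext (spark.sparkContext in Scala, spark._sc in Python) just to build an RDD; the commit adds parallelize directly on SparkSession and updates the Pi examples accordingly. A minimal Scala sketch of the difference, assuming an existing SparkSession named spark:

// Before: pull out the SparkContext first.
val sc = spark.sparkContext
val rddBefore = sc.parallelize(Seq(1, 2, 3))

// After this commit: parallelize is available on the session itself.
val rddAfter = spark.parallelize(Seq(1, 2, 3))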
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -685,7 +685,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

   // Methods for creating RDDs
 
-  /** Distribute a local Scala collection to form an RDD.
+  /**
+   * Distribute a local Scala collection to form an RDD.
    *
    * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
    * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
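The @note being reformatted above is easy to trip over: parallelize does not eagerly copy the collection, so a mutation made before the first action is visible in the results. A small illustration, assuming a live SparkContext named sc:

import scala.collection.mutable.ArrayBuffer

val data = ArrayBuffer(1, 2, 3)
val rdd = sc.parallelize(data)  // lazy: no job runs and no defensive copy is taken here
data += 4                       // altered before the first action on the RDD
rdd.collect()                   // per the note, reflects the change: Array(1, 2, 3, 4)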
4 changes: 1 addition & 3 deletions examples/src/main/python/pi.py
@@ -32,8 +32,6 @@
.appName("PythonPi")\
.getOrCreate()

sc = spark._sc

partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions

@@ -42,7 +40,7 @@ def f(_):
         y = random() * 2 - 1
         return 1 if x ** 2 + y ** 2 < 1 else 0
 
-    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+    count = spark.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
     print("Pi is roughly %f" % (4.0 * count / n))
 
     spark.stop()
3 changes: 1 addition & 2 deletions examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -29,10 +29,9 @@ object SparkPi {
       .builder
       .appName("Spark Pi")
       .getOrCreate()
-    val sc = spark.sparkContext
     val slices = if (args.length > 0) args(0).toInt else 2
     val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
-    val count = sc.parallelize(1 until n, slices).map { i =>
+    val count = spark.parallelize(1 until n, slices).map { i =>
       val x = random * 2 - 1
       val y = random * 2 - 1
       if (x*x + y*y < 1) 1 else 0
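With this change the Scala example no longer needs an explicit SparkContext anywhere. Roughly how the full SparkPi reads afterwards; the reduce/print tail sits outside the hunk and is sketched here from the standard example:

import scala.math.random

import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}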
13 changes: 13 additions & 0 deletions python/pyspark/sql/session.py
@@ -272,6 +272,19 @@ def range(self, start, end=None, step=1, numPartitions=None):

         return DataFrame(jdf, self._wrapped)
 
+    @since(2.0)
+    def parallelize(self, c, numSlices=None):
+        """
+        Distribute a local Python collection to form an RDD. Using xrange
+        is recommended if the input represents a range for performance.
+
+        >>> spark.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
+        [[0], [2], [3], [4], [6]]
+        >>> spark.parallelize(xrange(0, 6, 2), 5).glom().collect()
+        [[], [0], [], [2], [4]]
+        """
+        return self._sc.parallelize(c, numSlices)
+
     def _inferSchemaFromList(self, data):
         """
         Infer schema from list of Row or tuple.
18 changes: 17 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util.Utils


 /**
- * The entry point to programming Spark with the Dataset and DataFrame API.
+ * The entry point to programming Spark with RDD, Dataset and DataFrame API.
  *
  * To create a SparkSession, use the following builder pattern:
  *
@@ -442,6 +442,22 @@ class SparkSession private(
     new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG)
   }
 
+  /**
+   * Distribute a local Scala collection to form an RDD.
+   *
+   * @since 2.0.0
+   */
+  def parallelize[T: ClassTag](seq: Seq[T]): RDD[T] =
+    self.sparkContext.parallelize[T](seq)
+
+  /**
+   * Distribute a local Scala collection to form an RDD.
+   *
+   * @since 2.0.0
+   */
+  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int): RDD[T] =
+    self.sparkContext.parallelize[T](seq, numSlices)
+
   /**
    * Creates a [[DataFrame]] from an RDD[Row].
    * User can specify whether the input rows should be converted to Catalyst rows.
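Both new overloads are thin forwards to SparkContext.parallelize, one relying on the default parallelism and one taking an explicit slice count. A brief usage sketch, assuming an existing SparkSession named spark:

import org.apache.spark.rdd.RDD

val byDefault: RDD[Int] = spark.parallelize(1 to 10)                // default parallelism
val sliced: RDD[String] = spark.parallelize(Seq("a", "b", "c"), 3)  // explicit slice count
println(sliced.getNumPartitions)                                    // 3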
