From 29def0069d96ca449204ad27e8c66ca2a218ce84 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Tue, 24 Jul 2018 11:34:49 +0200 Subject: [PATCH] [SPARK-24899][SQL][DOC] Add example of monotonically_increasing_id standard function to scaladoc --- .../MonotonicallyIncreasingID.scala | 4 +- .../org/apache/spark/sql/functions.scala | 42 ++++++++++++++++--- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala index f1da592a76845..090ea205da665 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{DataType, LongType} */ @ExpressionDescription( usage = """ - _FUNC_() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed + _FUNC_() - Returns monotonically increasing 64-bit integers. The generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion @@ -80,7 +80,5 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Stateful { override def prettyName: String = "monotonically_increasing_id" - override def sql: String = s"$prettyName()" - override def freshCopy(): MonotonicallyIncreasingID = MonotonicallyIncreasingID() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index de1d422856ba9..bd161a8daa7c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1150,16 +1150,48 @@ object functions { /** * A column expression that generates monotonically increasing 64-bit integers. * - * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. + * The generated IDs are guaranteed to be monotonically increasing and unique, but not + * consecutive (unless all rows are in the same single partition which you rarely want due to + * the volume of the data). * The current implementation puts the partition ID in the upper 31 bits, and the record number * within each partition in the lower 33 bits. The assumption is that the data frame has * less than 1 billion partitions, and each partition has less than 8 billion records. * - * As an example, consider a `DataFrame` with two partitions, each with 3 records. - * This expression would return the following IDs: - * * {{{ - * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. + * // Create a dataset with four partitions, each with two rows. + * val q = spark.range(start = 0, end = 8, step = 1, numPartitions = 4) + * + * // Make sure that every partition has the same number of rows + * q.mapPartitions(rows => Iterator(rows.size)).foreachPartition(rows => assert(rows.next == 2)) + * q.select(monotonically_increasing_id).show + * + * // Assign consecutive IDs for rows per partition + * import org.apache.spark.sql.expressions.Window + * // count is the name of the internal registry of MonotonicallyIncreasingID to count rows + * // Could also be "id" since it is unique and consecutive in a partition + * import org.apache.spark.sql.functions.{row_number, shiftLeft, spark_partition_id} + * val rowNumber = row_number over Window.partitionBy(spark_partition_id).orderBy("id") + * // row_number is a sequential number starting at 1 within a window partition + * val count = rowNumber - 1 as "count" + * val partitionMask = shiftLeft(spark_partition_id cast "long", 33) as "partitionMask" + * val demo = q.select( + * $"id", + * partitionMask, + * count, + * monotonically_increasing_id) + * scala> demo.orderBy("id").show + * +---+-------------+-----+-----------------------------+ + * | id|partitionMask|count|monotonically_increasing_id()| + * +---+-------------+-----+-----------------------------+ + * | 0| 0| 0| 0| + * | 1| 0| 1| 1| + * | 2| 8589934592| 0| 8589934592| + * | 3| 8589934592| 1| 8589934593| + * | 4| 17179869184| 0| 17179869184| + * | 5| 17179869184| 1| 17179869185| + * | 6| 25769803776| 0| 25769803776| + * | 7| 25769803776| 1| 25769803777| + * +---+-------------+-----+-----------------------------+ * }}} * * @group normal_funcs