**_pySpark Basics: Summary Statistics_**

_by Jeff Levy (jlevy@urban.org)_

_Last Updated: 7 July 2016, Spark v1.6.1_

This notebook illustrates a problem I have run into with `unionAll`. I am trying to take the `describe` output of a large dataset, calculate `skewness` and `kurtosis` for each column, build those results into two new rows, and stack them onto the `describe` output. This works with test rows that are written out by hand and parallelized, but it fails with `Task not serializable` for the rows created in my loop.

Both the test rows and the actual rows look the same when examined via `show` and `collect`.  When the rows that `unionAll` fails to merge are collected and then turned immediately back into dataframes, `unionAll` succeeds.

***

In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sc.version

u'1.6.1'

In [2]:
df = sqlContext.read.load('s3://ui-hfpc/Performance_2015Q1.txt',
                          format='com.databricks.spark.csv',
                          header='false',
                          inferSchema='true',
                          delimiter='|')

In [3]:
df = df[['C0', 'C2', 'C3', 'C4', 'C5', 'C6']]

In [4]:
df.show(10)

+------------+--------------------+-----+---------+---+---+
|          C0|                  C2|   C3|       C4| C5| C6|
+------------+--------------------+-----+---------+---+---+
|100002091588|               OTHER|4.125|     null|  0|360|
|100002091588|                    |4.125|     null|  1|359|
|100002091588|                    |4.125|     null|  2|358|
|100002091588|                    |4.125|     null|  3|357|
|100002091588|                    |4.125|     null|  4|356|
|100002091588|                    |4.125|     null|  5|355|
|100002091588|                    |4.125|342034.38|  6|354|
|100002091588|                    |4.125|341538.19|  7|353|
|100002091588|                    |4.125| 341040.3|  8|352|
|100002091588|PINGORA LOAN SERV...|4.125| 340540.7|  9|351|
+------------+--------------------+-----+---------+---+---+
only showing top 10 rows



In [5]:
#THIS IS THE DATA I WANT TO ADD ROWS TO
df_described = df.describe()
df_described.show()

+-------+--------------------+-------------------+------------------+------------------+-----------------+
|summary|                  C0|                 C3|                C4|                C5|               C6|
+-------+--------------------+-------------------+------------------+------------------+-----------------+
|  count|             3526154|            3526154|           1580402|           3526154|          3526154|
|   mean| 5.50388599500189E11|  4.178168090221903|  234846.780654818| 5.134865351881966|354.7084951479714|
| stddev|2.596112361975223E11|0.34382335723646484|118170.68592261613|3.3833930336063456|4.011812510792076|
|    min|        100002091588|               2.75|              0.85|                -1|              292|
|    max|        999995696635|              6.125|        1193544.39|                34|              480|
+-------+--------------------+-------------------+------------------+------------------+-----------------+



In [6]:
#CREATING THE ACTUAL NEW DATA TO APPEND
from pyspark.sql import Row
from pyspark.sql.functions import skewness, kurtosis

columns = df_described.columns  #a list of the column names: ['summary', 'C0', 'C3', 'C4', 'C5', 'C6']
funcs   = [skewness, kurtosis]  #a list of the functions we want to add to our summary statistics (imported above)
fnames  = ['skew', 'kurtosis']  #a list of strings describing the functions in the same order

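def new_item(func, column):
    #apply one function to one column (each call runs its own Spark job)
    #and return the result as a string, since describe() stores its values as strings
    return str(df.select(func(column)).collect()[0][0])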

new_data = []
for func, fname in zip(funcs, fnames):
    row_dict = {'summary':fname}
    for column in columns[1:]:
        row_dict[column] = new_item(func, column)
    new_data.append(Row(**row_dict))
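
For reference, the two statistics added in the loop above can be sketched in plain Python. This is an illustration only, assuming (as I understand Spark's implementation) that `skewness` and `kurtosis` are built from population central moments and that `kurtosis` reports excess kurtosis, so a normal distribution scores 0. The helper names `central_moment`, `skewness_py` and `kurtosis_py` are hypothetical and not part of pySpark.

```python
def central_moment(values, k):
    """k-th population central moment: the mean of (x - mean) ** k."""
    n = float(len(values))
    mean = sum(values) / n
    return sum((x - mean) ** k for x in values) / n


def skewness_py(values):
    """Population skewness: m3 / m2 ** 1.5 (assumed to match Spark's skewness)."""
    return central_moment(values, 3) / central_moment(values, 2) ** 1.5


def kurtosis_py(values):
    """Excess kurtosis: m4 / m2 ** 2 - 3 (assumed to match Spark's kurtosis)."""
    return central_moment(values, 4) / central_moment(values, 2) ** 2 - 3.0


sample = [1.0, 2.0, 3.0, 4.0, 5.0]
print(skewness_py(sample))   # symmetric data, so the skewness is 0
print(kurtosis_py(sample))   # negative: flatter than a normal distribution
```

Under this definition a uniform distribution has an excess kurtosis of -1.2, which is consistent with the value of roughly -1.199 that the notebook reports for `C0`.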

In [7]:
#CREATING SOME TEST DATA
row = Row('summary', 'C0', 'C3', 'C4', 'C5', 'C6')
test_data = [row('test1', '1', '2', '3', '4', '5'),
            row('test2', '6', '7', '8', '9', '10')]

In [8]:
#A LOOK AT THE ACTUAL DATA
new_data

[Row(C0='-0.00183847089857', C3='0.519799339496', C4='0.758411576756', C5='0.286480156084', C6='-2.69765201567', summary='skew'),
 Row(C0='-1.19900726351', C3='0.126057726847', C4='0.576085602656', C5='0.195187780089', C6='24.7237858944', summary='kurtosis')]

In [9]:
#A LOOK AT THE TEST DATA
test_data

[Row(summary='test1', C0='1', C3='2', C4='3', C5='4', C6='5'),
 Row(summary='test2', C0='6', C3='7', C4='8', C5='9', C6='10')]

In [10]:
#TURN BOTH INTO DATAFRAMES (USING SELECT AT THE END TO ORDER THE COLUMNS THE SAME)
df_described2 = sc.parallelize(new_data).toDF().select(columns)
df_test       = sc.parallelize(test_data).toDF().select(columns)
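
The `select(columns)` step matters because, in this version of pySpark, a `Row` built from keyword arguments sorts its fields by name. That is why `summary` appears last in `new_data` but first in `test_data`. A minimal pure-Python sketch of that ordering follows. It only mimics the sort and does not call pySpark, and the values in `row_dict` are placeholders.

```python
# Placeholder values; only the keys matter for this illustration.
row_dict = {'summary': 'skew', 'C0': 'a', 'C3': 'b', 'C4': 'c', 'C5': 'd', 'C6': 'e'}

# Row(**row_dict) in pySpark 1.6 orders its fields like sorted(row_dict):
# uppercase 'C' sorts before lowercase 's', so 'summary' comes last.
kwarg_order = sorted(row_dict)

# The order we actually want is the column order of df_described.
columns = ['summary', 'C0', 'C3', 'C4', 'C5', 'C6']

print(kwarg_order)
print(kwarg_order == columns)  # False, hence the select(columns) reordering
```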

In [11]:
df_described2.show()

+--------+-----------------+--------------+--------------+--------------+--------------+
| summary|               C0|            C3|            C4|            C5|            C6|
+--------+-----------------+--------------+--------------+--------------+--------------+
|    skew|-0.00183847089857|0.519799339496|0.758411576756|0.286480156084|-2.69765201567|
|kurtosis|   -1.19900726351|0.126057726847|0.576085602656|0.195187780089| 24.7237858944|
+--------+-----------------+--------------+--------------+--------------+--------------+



In [12]:
df_test.show()

+-------+---+---+---+---+---+
|summary| C0| C3| C4| C5| C6|
+-------+---+---+---+---+---+
|  test1|  1|  2|  3|  4|  5|
|  test2|  6|  7|  8|  9| 10|
+-------+---+---+---+---+---+



In [18]:
df_described2.dtypes

[('summary', 'string'),
 ('C0', 'string'),
 ('C3', 'string'),
 ('C4', 'string'),
 ('C5', 'string'),
 ('C6', 'string')]

In [19]:
df_test.dtypes

[('summary', 'string'),
 ('C0', 'string'),
 ('C3', 'string'),
 ('C4', 'string'),
 ('C5', 'string'),
 ('C6', 'string')]
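
Since `unionAll` lines columns up by position rather than by name, comparing the two `dtypes` lists pairwise is a quick way to confirm that the schemas really agree. A small sketch; `schema_mismatches` is a hypothetical helper, and the lists below are copied from the output above rather than read from live dataframes.

```python
def schema_mismatches(dtypes_a, dtypes_b):
    """Return (position, a, b) for every position where two dtypes lists differ."""
    mismatches = []
    for i in range(max(len(dtypes_a), len(dtypes_b))):
        a = dtypes_a[i] if i < len(dtypes_a) else None
        b = dtypes_b[i] if i < len(dtypes_b) else None
        if a != b:
            mismatches.append((i, a, b))
    return mismatches


described2_dtypes = [('summary', 'string'), ('C0', 'string'), ('C3', 'string'),
                     ('C4', 'string'), ('C5', 'string'), ('C6', 'string')]
test_dtypes = list(described2_dtypes)  # df_test.dtypes prints the same list

print(schema_mismatches(described2_dtypes, test_dtypes))  # [] means identical
```

In the notebook this would be called as `schema_mismatches(df_described2.dtypes, df_test.dtypes)`.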

In [13]:
#TRY TO UNION THE ACTUAL NEW DATA AND THE TEST DATA SEPARATELY WITH THE ORIGINAL DATA
expanded_describe = df_described.unionAll(df_described2)
test_describe     = df_described.unionAll(df_test)

In [14]:
#SUCCESS WITH THE TEST DATA
test_describe.show()

+-------+--------------------+-------------------+------------------+------------------+-----------------+
|summary|                  C0|                 C3|                C4|                C5|               C6|
+-------+--------------------+-------------------+------------------+------------------+-----------------+
|  count|             3526154|            3526154|           1580402|           3526154|          3526154|
|   mean| 5.50388599500189E11|  4.178168090221903|  234846.780654818| 5.134865351881966|354.7084951479714|
| stddev|2.596112361975223E11|0.34382335723646484|118170.68592261613|3.3833930336063456|4.011812510792076|
|    min|        100002091588|               2.75|              0.85|                -1|              292|
|    max|        999995696635|              6.125|        1193544.39|                34|              480|
|  test1|                   1|                  2|                 3|                 4|                5|
|  test2|                   6|       

In [15]:
#FAILURE WITH THE REAL DATA
expanded_describe.show()

Py4JJavaError: An error occurred while calling o173.showString.
: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
	at org.apache.spark.sql.execution.ConvertToUnsafe.doExecute(rowFormatConverters.scala:38)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
	at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
	at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: scala.collection.Iterator$$anon$11
Serialization stack:
	- object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator)
	- field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator)
	- object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream(WrappedArray(3526154, 3526154, 1580402, 3526154, 3526154), WrappedArray(5.50388599500189E11, 4.178168090221903, 234846.780654818, 5.134865351881966, 354.7084951479714), WrappedArray(2.596112361975223E11, 0.34382335723646484, 118170.68592261613, 3.3833930336063456, 4.011812510792076), WrappedArray(100002091588, 2.75, 0.85, -1, 292), WrappedArray(999995696635, 6.125, 1193544.39, 34, 480)))
	- field (class: scala.collection.immutable.Stream$$anonfun$zip$1, name: $outer, type: class scala.collection.immutable.Stream)
	- object (class scala.collection.immutable.Stream$$anonfun$zip$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream((WrappedArray(3526154, 3526154, 1580402, 3526154, 3526154),(count,<function1>)), (WrappedArray(5.50388599500189E11, 4.178168090221903, 234846.780654818, 5.134865351881966, 354.7084951479714),(mean,<function1>)), (WrappedArray(2.596112361975223E11, 0.34382335723646484, 118170.68592261613, 3.3833930336063456, 4.011812510792076),(stddev,<function1>)), (WrappedArray(100002091588, 2.75, 0.85, -1, 292),(min,<function1>)), (WrappedArray(999995696635, 6.125, 1193544.39, 34, 480),(max,<function1>))))
	- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
	- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream([count,3526154,3526154,1580402,3526154,3526154], [mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714], [stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076], [min,100002091588,2.75,0.85,-1,292], [max,999995696635,6.125,1193544.39,34,480]))
	- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
	- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream([count,3526154,3526154,1580402,3526154,3526154], [mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714], [stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076], [min,100002091588,2.75,0.85,-1,292], [max,999995696635,6.125,1193544.39,34,480]))
	- field (class: org.apache.spark.sql.execution.LocalTableScan, name: rows, type: interface scala.collection.Seq)
	- object (class org.apache.spark.sql.execution.LocalTableScan, LocalTableScan [summary#228,C0#229,C3#230,C4#231,C5#232,C6#233], [[count,3526154,3526154,1580402,3526154,3526154],[mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714],[stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076],[min,100002091588,2.75,0.85,-1,292],[max,999995696635,6.125,1193544.39,34,480]]
)
	- field (class: org.apache.spark.sql.execution.ConvertToUnsafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
	- object (class org.apache.spark.sql.execution.ConvertToUnsafe, ConvertToUnsafe
+- LocalTableScan [summary#228,C0#229,C3#230,C4#231,C5#232,C6#233], [[count,3526154,3526154,1580402,3526154,3526154],[mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714],[stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076],[min,100002091588,2.75,0.85,-1,292],[max,999995696635,6.125,1193544.39,34,480]]
)
	- field (class: org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, name: $outer, type: class org.apache.spark.sql.execution.ConvertToUnsafe)
	- object (class org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
	... 57 more


In [16]:
#COLLECT THE ORIGINAL AND THE NEW DATA, CONCATENATE THE LISTS, TURN THEM BACK INTO ONE DF
dd1 = df_described.collect()
dd2 = df_described2.collect()
dd3 = dd1 + dd2
new_df = sc.parallelize(dd3).toDF()
new_df.show()

+--------+--------------------+-------------------+------------------+------------------+-----------------+
| summary|                  C0|                 C3|                C4|                C5|               C6|
+--------+--------------------+-------------------+------------------+------------------+-----------------+
|   count|             3526154|            3526154|           1580402|           3526154|          3526154|
|    mean| 5.50388599500189E11|  4.178168090221903|  234846.780654818| 5.134865351881966|354.7084951479714|
|  stddev|2.596112361975223E11|0.34382335723646484|118170.68592261613|3.3833930336063456|4.011812510792076|
|     min|        100002091588|               2.75|              0.85|                -1|              292|
|     max|        999995696635|              6.125|        1193544.39|                34|              480|
|    skew|   -0.00183847089857|     0.519799339496|    0.758411576756|    0.286480156084|   -2.69765201567|
|kurtosis|      -1.199007263

In [17]:
#COLLECT THE ORIGINAL AND THE NEW DATA, TURN THEM INDIVIDUALLY BACK INTO DFs, THEN UNIONALL
dd1_df = sc.parallelize(dd1).toDF()
dd2_df = sc.parallelize(dd2).toDF()
dd1_df.unionAll(dd2_df).show()

+--------+--------------------+-------------------+------------------+------------------+-----------------+
| summary|                  C0|                 C3|                C4|                C5|               C6|
+--------+--------------------+-------------------+------------------+------------------+-----------------+
|   count|             3526154|            3526154|           1580402|           3526154|          3526154|
|    mean| 5.50388599500189E11|  4.178168090221903|  234846.780654818| 5.134865351881966|354.7084951479714|
|  stddev|2.596112361975223E11|0.34382335723646484|118170.68592261613|3.3833930336063456|4.011812510792076|
|     min|        100002091588|               2.75|              0.85|                -1|              292|
|     max|        999995696635|              6.125|        1193544.39|                34|              480|
|    skew|   -0.00183847089857|     0.519799339496|    0.758411576756|    0.286480156084|   -2.69765201567|
|kurtosis|      -1.199007263

NOTE: The error above is currently unsolved, so a work-around such as the one in the `In [16]` and `In [17]` cells (collecting the rows and rebuilding the dataframes) is needed to stack this data for now. It was discussed, without resolution, here: http://stackoverflow.com/questions/38255145/task-not-serializable-error-in-pyspark-on-unionall?noredirect=1#comment63932376_38255145
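
The collect-and-rebuild work-around can be wrapped in a small helper. This is only a sketch of the pattern used in the `In [17]` cell: `union_via_collect` is a hypothetical name, `sc` is the notebook's SparkContext, and it assumes both dataframes are small enough to collect to the driver (true for `describe` output, not for data in general).

```python
def union_via_collect(sc, left, right):
    """Stack two small dataframes by round-tripping each through the driver.

    Collecting and re-parallelizing rebuilds each dataframe from plain rows,
    which is the step that avoided the serialization error in this notebook.
    """
    left_df = sc.parallelize(left.collect()).toDF()
    right_df = sc.parallelize(right.collect()).toDF()
    return left_df.unionAll(right_df)

# Intended usage in the notebook (requires a live SparkContext):
# expanded_describe = union_via_collect(sc, df_described, df_described2)
```

Because `unionAll` matches columns by position, both inputs should already share the same column order, as `df_described2` does after its `select(columns)`.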