# Spark Recitation

![title](img/chopsticks_title.png)

![title](img/how_to_use_chopsticks.png)

# Initialize Spark

In [2]:

import os
# Drop any inherited PYSPARK_SUBMIT_ARGS before the session is created below.
# pop() with a default (unlike `del`) does not raise KeyError when the variable
# is not set, so this cell also runs cleanly in a fresh environment.
os.environ.pop('PYSPARK_SUBMIT_ARGS', None)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Create the notebook's SparkSession (or reuse an existing one) under the app name 'Spark_Rec'.
spark = (
    SparkSession.builder
    .appName('Spark_Rec')
    .getOrCreate()
)

# Reading In and Exploring Data

In [3]:
# Load the chopstick experiment CSV, treating its first row as the column names.
chopsticks_sdf = (
    spark.read
    .option('header', 'true')
    .csv('data/chopstick-effectiveness.csv')
)

In [4]:
# Preview the first five rows.
chopsticks_sdf.show(n=5)

+------------------------+----------+----------------+
|Food.Pinching.Effeciency|Individual|Chopstick.Length|
+------------------------+----------+----------------+
|                   19.55|         1|             180|
|                   27.24|         2|             180|
|                   28.76|         3|             180|
|                   31.19|         4|             180|
|                   21.91|         5|             180|
+------------------------+----------+----------------+
only showing top 5 rows



![title](img/arrangement.png)

In [5]:
# Total number of rows (one measurement per row).
chopsticks_sdf.count()

186

In [6]:
# Doesn't work. Why?
# (The Py4JJavaError traceback shown below is the expected outcome of this cell.)
chopsticks_sdf.describe().show()

Py4JJavaError: An error occurred while calling o28.describe.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:347)
	at scala.None$.get(Option.scala:345)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$aggregatableColumns$2.apply(Dataset.scala:235)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$aggregatableColumns$2.apply(Dataset.scala:233)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$aggregatableColumns(Dataset.scala:233)
	at org.apache.spark.sql.Dataset$$anonfun$describe$1.apply(Dataset.scala:2078)
	at org.apache.spark.sql.Dataset$$anonfun$describe$1.apply(Dataset.scala:2067)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2822)
	at org.apache.spark.sql.Dataset.describe(Dataset.scala:2067)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [7]:
# Inspect the column names and the type assigned to each column.
chopsticks_sdf.printSchema()

root
 |-- Food.Pinching.Effeciency: string (nullable = true)
 |-- Individual: string (nullable = true)
 |-- Chopstick.Length: string (nullable = true)



# Jupyter Command and Edit Modes and Shift + Tab for Documentation

# Projecting Columns

In [8]:
# Indexing with a single column name yields a Column object, not a DataFrame.
chopsticks_sdf['Individual']

Column<b'Individual'>

In [9]:
# Attribute access is another way to reach the same Column.
chopsticks_sdf.Individual

Column<b'Individual'>

In [10]:
# Indexing with a list of names projects those columns and yields a DataFrame.
chopsticks_sdf[['Individual']]

DataFrame[Individual: string]

In [11]:
# How many distinct participants took part in the experiment?
chopsticks_sdf.select('Individual').distinct().count()

31

In [12]:
# Which chopstick lengths were tested? The column name contains a dot,
# so it is wrapped in backticks to be read as a single column name.
chopsticks_sdf.select('`Chopstick.Length`').distinct().show()

+----------------+
|Chopstick.Length|
+----------------+
|             300|
|             270|
|             180|
|             240|
|             210|
|             330|
+----------------+



# Renaming Columns

In [13]:
# Swap the dotted CSV header names for short lowercase ones, then preview the result.
chopsticks_sdf = (
    chopsticks_sdf
    .withColumnRenamed('Food.Pinching.Effeciency', 'efficiency')
    .withColumnRenamed('Individual', 'individual')
    .withColumnRenamed('Chopstick.Length', 'length')
)
chopsticks_sdf.show(5)

+----------+----------+------+
|efficiency|individual|length|
+----------+----------+------+
|     19.55|         1|   180|
|     27.24|         2|   180|
|     28.76|         3|   180|
|     31.19|         4|   180|
|     21.91|         5|   180|
+----------+----------+------+
only showing top 5 rows



In [14]:
# Renaming can also be done with SQL-style aliases; `length` is passed through unchanged.
chopsticks_sdf.selectExpr(
    'efficiency AS eff',
    'individual AS ind',
    'length',
).show(5)

+-----+---+------+
|  eff|ind|length|
+-----+---+------+
|19.55|  1|   180|
|27.24|  2|   180|
|28.76|  3|   180|
|31.19|  4|   180|
|21.91|  5|   180|
+-----+---+------+
only showing top 5 rows



In [15]:
# Register the current DataFrame as a temp view so it can be queried with SQL,
# then list the column names produced by aliasing inside the query.
chopsticks_sdf.createOrReplaceTempView('chopsticks_view')
spark.sql(
    'SELECT efficiency AS eff, individual AS ind, length AS len FROM chopsticks_view'
).columns

['eff', 'ind', 'len']

In [16]:
# With dot-free names, several columns can be projected without backticks.
chopsticks_sdf.select('individual', 'efficiency').show(5)

+----------+----------+
|individual|efficiency|
+----------+----------+
|         1|     19.55|
|         2|     27.24|
|         3|     28.76|
|         4|     31.19|
|         5|     21.91|
+----------+----------+
only showing top 5 rows



# Changing Column Types

In [17]:
# Convert the columns to numeric types: efficiency to double, the other two to int.
chopsticks_sdf = chopsticks_sdf.select(
    F.col('efficiency').cast('double'),
    F.col('individual').cast('int'),
    F.col('length').cast('int'),
)

In [18]:
# Check the column types after the casts.
chopsticks_sdf.printSchema()

root
 |-- efficiency: double (nullable = true)
 |-- individual: integer (nullable = true)
 |-- length: integer (nullable = true)



In [19]:
# Summary statistics for the renamed, numerically typed columns.
chopsticks_sdf.describe().show()

+-------+-----------------+-----------------+-----------------+
|summary|       efficiency|       individual|           length|
+-------+-----------------+-----------------+-----------------+
|  count|              186|              186|              186|
|   mean|25.00559139784947|             16.0|            255.0|
| stddev|4.039692913767978|8.968413038683735|51.37303951674644|
|    min|            14.47|                1|              180|
|    max|            36.15|               31|              330|
+-------+-----------------+-----------------+-----------------+



# Creating Schema and Reading in Data with Schema

In [20]:
# Declare column names and types up front (the type classes come from pyspark.sql.types)
# and hand the schema to the reader, so no renaming or casting is needed afterwards.
schema = StructType([
    StructField("efficiency", DoubleType()),
    StructField("individual", IntegerType()),
    StructField("length", IntegerType()),
])
chopsticks2_sdf = (
    spark.read
    .schema(schema)
    .option('header', 'true')
    .csv('data/chopstick-effectiveness.csv')
)
chopsticks2_sdf.show(5)
chopsticks2_sdf.printSchema()

+----------+----------+------+
|efficiency|individual|length|
+----------+----------+------+
|     19.55|         1|   180|
|     27.24|         2|   180|
|     28.76|         3|   180|
|     31.19|         4|   180|
|     21.91|         5|   180|
+----------+----------+------+
only showing top 5 rows

root
 |-- efficiency: double (nullable = true)
 |-- individual: integer (nullable = true)
 |-- length: integer (nullable = true)



# Computing Mean Efficiency by Chopstick Length

In [21]:
# Average efficiency for each chopstick length, highest average first.
(
    chopsticks_sdf
    .groupBy('length')
    .mean('efficiency')
    .orderBy('avg(efficiency)', ascending=False)
    .show()
)

+------+------------------+
|length|   avg(efficiency)|
+------+------------------+
|   240| 26.32290322580646|
|   210|25.483870967741932|
|   300|24.968064516129033|
|   180|24.935161290322586|
|   270|24.323870967741943|
|   330| 23.99967741935484|
+------+------------------+



In [22]:
# Looks a bit strange. Why?
# (Compare the aggregate column's header in the output below with the DataFrame version above.)
spark.sql("""SELECT length, AVG(efficiency)
             FROM chopsticks_view
             GROUP BY length
             ORDER BY AVG(efficiency) DESC""").show()

+------+-------------------------------+
|length|avg(CAST(efficiency AS DOUBLE))|
+------+-------------------------------+
|   240|              26.32290322580646|
|   210|             25.483870967741932|
|   300|             24.968064516129033|
|   180|             24.935161290322586|
|   270|             24.323870967741943|
|   330|              23.99967741935484|
+------+-------------------------------+



# Computing Mean Efficiency by Individual and Joining to get Names

In [24]:
# Load the id -> name lookup table with an explicit schema.
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
])
individuals_sdf = (
    spark.read
    .schema(schema)
    .option('header', 'true')
    .csv('data/individual-names.csv')
)

In [25]:
# Attach each participant's name, average their efficiency across all trials,
# and show the ten highest averages.
(
    chopsticks_sdf
    .join(individuals_sdf, chopsticks_sdf.individual == individuals_sdf.id)
    .groupBy('individual', 'name')
    .mean('efficiency')
    .orderBy('avg(efficiency)', ascending=False)
    .select('name', 'avg(efficiency)')
    .show(10)
)

+--------------+------------------+
|          name|   avg(efficiency)|
+--------------+------------------+
|    Mr. Miyagi|31.028333333333332|
|    Daniel-san|             30.12|
|          Doge|30.004999999999995|
|  Trevin Gandy|            29.265|
|    Dank Memes|29.128333333333334|
|     A Firm JT| 28.06833333333334|
|  Steve IsHuge| 28.06833333333333|
|Jordan Hurwitz|27.556666666666672|
|       Kind A.| 27.49333333333334|
|         Small|27.403333333333336|
+--------------+------------------+
only showing top 10 rows



In [None]:
%%time
# SQL version of the join-and-average from the previous cell; %%time reports how long the cell takes.
# The names DataFrame must be registered as a view before the query can reference it.
individuals_sdf.createOrReplaceTempView('names_view')
spark.sql("""SELECT name, AVG(efficiency) AS avg_efficiency
             FROM chopsticks_view
             JOIN names_view
             ON chopsticks_view.individual = names_view.id
             GROUP BY individual, name
             ORDER BY avg_efficiency DESC""").show(10)