# Spark on the Faculty platform

Before running this notebook, install the `pyspark` Python package with pip and the `openjdk-8-jre` package with apt. We recommend using a Faculty platform environment to do this.

## Begin a Spark Session

To start creating Spark distributed data structures, first construct a Spark session using the provided helper module:

```python
from configure_spark import get_session

spark = get_session()
```

You can then construct Spark DataFrames with the session:

```python
import pandas

pandas_df = pandas.DataFrame({
    "name": ["Andrew", "Nadine", "Victor", "Oana"],
    "gender": ["male", "female", "male", "female"],
    "age": [31, 27, 35, 29]
})
df = spark.createDataFrame(pandas_df)
```

RDDs can be constructed using the Spark context inside the session:

```python
sc = spark.sparkContext
rdd = sc.parallelize([1, 3, 1, 2, 2, 4])
```

**Note:** `spark.createDataFrame` and `sc.parallelize` are useful for trying out the API, but they only work for data that already fits in memory before it is sent to Spark, so they are not suitable for datasets large enough to benefit from Spark's scalability. To read data directly into Spark from storage, use [`spark.read.load`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load) (or one of the specialised readers such as [`spark.read.csv`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv)) for DataFrames, and [`sc.textFile`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile) or similar for RDDs.

## Getting data out of Spark

As described in the presentation, Spark evaluates lazily: nothing is computed until you ask for a result. To evaluate a DataFrame and return its contents as a Pandas DataFrame:

```python
df.toPandas()
```

|   | name | gender | age |
|---|------|--------|-----|
| 0 | Andrew | male | 31 |
| 1 | Nadine | female | 27 |
| 2 | Victor | male | 35 |
| 3 | Oana | female | 29 |


And similarly for an RDD:

```python
rdd.collect()
```

```
[1, 3, 1, 2, 2, 4]
```

**Note:** In practice, Spark DataFrames and RDDs will often hold more data than you can fit in memory. Spark does not try to be clever when you tell it to collect all the data into a Pandas DataFrame or Python list: it will do as you say, and may crash the Python kernel if there is too much data to handle.

## RDD Transformations

'Transformations' return new RDDs whose contents are evaluated lazily. To see their contents, call `.collect()` to force evaluation:

```python
rdd.map(lambda v: v * 2).collect()
```

```
[2, 6, 2, 4, 4, 8]
```

```python
rdd.filter(lambda v: v > 2).collect()
```

```
[3, 4]
```

```python
rdd.distinct().collect()
```

```
[1, 3, 2, 4]
```

## RDD Actions

'Actions' make Spark actually perform the requested computation and return a result:

```python
rdd.reduce(lambda v1, v2: v1 + v2)
```

```
13
```

```python
rdd.take(2)
```

```
[1, 3]
```

`.collect()` is also an action:

```python
rdd.collect()
```

```
[1, 3, 1, 2, 2, 4]
```

## DataFrame transformations

Spark DataFrames have semantics more like SQL than Pandas. To construct a DataFrame from an existing one, we 'select' from it as if it were a SQL table:

```python
df.select("name", "age").toPandas()
```

|   | name | age |
|---|------|-----|
| 0 | Andrew | 31 |
| 1 | Nadine | 27 |
| 2 | Victor | 35 |
| 3 | Oana | 29 |


To transform the columns of a DataFrame, apply functions from `pyspark.sql.functions`:

```python
import pyspark.sql.functions as func

df.select(
    "name", func.length("name")
).toPandas()
```

|   | name | length(name) |
|---|------|--------------|
| 0 | Andrew | 6 |
| 1 | Nadine | 6 |
| 2 | Victor | 6 |
| 3 | Oana | 4 |


`pyspark.sql.functions.col()` allows you to select a column then apply some basic operations to it:

```python
df.select(
    "name", func.col("gender") == "female"
).toPandas()
```

|   | name | (gender = female) |
|---|------|-------------------|
| 0 | Andrew | False |
| 1 | Nadine | True |
| 2 | Victor | False |
| 3 | Oana | True |


You can also rename the columns in the generated DataFrame:

```python
df.select(
    "name", func.length("name").alias("len")
).toPandas()
```

|   | name | len |
|---|------|-----|
| 0 | Andrew | 6 |
| 1 | Nadine | 6 |
| 2 | Victor | 6 |
| 3 | Oana | 4 |


For custom logic that the built-in functions don't cover, create a Spark user-defined function (UDF):

```python
from pyspark.sql import types

capitalise = func.udf(
    lambda s: s.upper(),  # First argument is a Python function
    types.StringType()  # Second argument is the return type of the function
)
df.select(capitalise("name")).toPandas()
```

|   | `<lambda>(name)` |
|---|------------------|
| 0 | ANDREW |
| 1 | NADINE |
| 2 | VICTOR |
| 3 | OANA |


As an alternative to `.select()`, `.withColumn()` returns a new DataFrame with an extra column appended:

```python
df.withColumn(
    "capitalised_name", capitalise("name")
).toPandas()
```

|   | name | gender | age | capitalised_name |
|---|------|--------|-----|------------------|
| 0 | Andrew | male | 31 | ANDREW |
| 1 | Nadine | female | 27 | NADINE |
| 2 | Victor | male | 35 | VICTOR |
| 3 | Oana | female | 29 | OANA |


Some more examples:

```python
df.filter(df.age < 30).toPandas()
```

|   | name | gender | age |
|---|------|--------|-----|
| 0 | Nadine | female | 27 |
| 1 | Oana | female | 29 |

```python
df.groupBy("gender").count().toPandas()
```

|   | gender | count |
|---|--------|-------|
| 0 | female | 2 |
| 1 | male | 2 |

```python
df.groupBy("gender").avg("age").toPandas()  # Also min, max, etc.
```

|   | gender | avg(age) |
|---|--------|----------|
| 0 | female | 28.0 |
| 1 | male | 33.0 |


You can also 'join' DataFrames, as you would join tables in SQL. First, create another DataFrame to join onto:

```python
pandas_purchases = pandas.DataFrame({
    "name": ["Andrew", "Nadine", "Andrew"],
    "value": [23.21, 14.23, 8.23]
})
purchases = spark.createDataFrame(pandas_purchases)
```

```python
df.join(purchases, df.name == purchases.name).toPandas()
```

|   | name | gender | age | name | value |
|---|------|--------|-----|------|-------|
| 0 | Nadine | female | 27 | Nadine | 14.23 |
| 1 | Andrew | male | 31 | Andrew | 23.21 |
| 2 | Andrew | male | 31 | Andrew | 8.23 |


## Other useful DataFrame methods

`.printSchema()` allows you to check the structure of a DataFrame without evaluating it:

```python
transformed = df.select(
    "name",
    "age",
    func.length("name").alias("len")
)
transformed.printSchema()
```

```
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- len: integer (nullable = true)
```



`.cache()` tells Spark to keep a DataFrame in memory once it has been computed, if there is room. This is useful when a DataFrame is expensive to compute: caching it avoids re-evaluating it each time you use it in an interactive session:

```python
transformed_cached = df.select(
    "name",
    "age",
    func.length("name").alias("len")
).cache()
```