# PySpark Training - Manipulating DataFrames

-------
# Instantiating a Spark session

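Open a Spark session, available in our notebook as the `spark` object. In the background, YARN allocates CPU and RAM to create one or more executor JVMs. Because the session is started interactively from the notebook (YARN client mode), the driver JVM runs on the notebook's own machine rather than in a YARN container.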

In [1]:
import findspark
findspark.init()

# Create a spark-session (akin to what pyspark provides when it is started)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
type(spark)

pyspark.sql.session.SparkSession

In [3]:
spark

Check the YARN UI: [http://slalomdsvm:8088/ui2/](http://slalomdsvm:8088/ui2/)

In [4]:
! yarn application -list

20/08/10 16:57:11 INFO client.RMProxy: Connecting to ResourceManager at slalomdsvm/10.0.2.15:8050
20/08/10 16:57:11 INFO client.AHSProxy: Connecting to Application History server at slalomdsvm/10.0.2.15:10200
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL
application_1596825752622_0008	       pyspark-shell	               SPARK	   vagrant	   default	           RUNNING	         UNDEFINED	            10%	             http://slalomdsvm:4040


Use the built-in `help()` function to take a look at the methods supported by the class `pyspark.sql.session.SparkSession`.

In [5]:
import pyspark
help(pyspark.sql.session.SparkSession)

Help on class SparkSession in module pyspark.sql.session:

class SparkSession(builtins.object)
 |  The entry point to programming Spark with the Dataset and DataFrame API.
 |  
 |  A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as
 |  tables, execute SQL over tables, cache tables, and read parquet files.
 |  To create a SparkSession, use the following builder pattern:
 |  
 |  >>> spark = SparkSession.builder \
 |  ...     .master("local") \
 |  ...     .appName("Word Count") \
 |  ...     .config("spark.some.config.option", "some-value") \
 |  ...     .getOrCreate()
 |  
 |  .. autoattribute:: builder
 |     :annotation:
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
 |      
 |      .. versionadded:: 2.0
 |  
 |  __exit__(self, exc_type, exc_val, exc_tb)
 |      Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
 |      
 |    

----
# Checking Spark session options

In [6]:
sc = spark.sparkContext
sc.getConf().getAll()

[('spark.history.kerberos.keytab', 'none'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.driver.appUIAddress', 'http://slalomdsvm:4040'),
 ('spark.history.ui.port', '18081'),
 ('spark.driver.memory', '512M'),
 ('spark.driver.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.history.fs.cleaner.interval', '7d'),
 ('spark.shuffle.io.serverThreads', '128'),
 ('spark.yarn.historyServer.address', 'slalomdsvm:18081'),
 ('spark.ui.proxyBase', '/proxy/application_1596825752622_0008'),
 ('spark.executor.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.sql.streaming.streamingQueryListeners', ''),
 ('spark.sql.statistics.fallBackToHdfs', 'true'),
 ('spark.executorEnv.PYTHONPATH',
  '{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.shuffle.file.buffer', '1m'),
 ('spark.history.provider',
  'org.apache.spark.deploy.hist

In [7]:
#import pandas as pd

---
# Create simple dataframes

We will be mostly working with small dataframes to allow us to quickly visualize and check the outcome of the operations we will apply to the dataframes.

In [8]:
col_names = ['id', 'first_name', 'last_name']
rows = [
    (1, 'John', 'Doe'),
    (1, 'John', 'Doe'), 
    (1, 'John', None), 
    (2, 'Jane', 'Doe'),
    (3, 'Herbie', 'Hancock'),
    (4, 'Erin', 'Brockovich'),        
]

df1 = spark.createDataFrame(rows, col_names)

In [9]:
type(df1)

pyspark.sql.dataframe.DataFrame

`number_sox` is the number of individual sox owned by the person with ID `id`.

In [10]:
col_names = ['id', 'number_sox']
rows = [
    (1, 24),
    (2, 30),
    (3, 29),
    (4, 40),        
]

df2 = spark.createDataFrame(rows, col_names)

In [11]:
type(df2)

pyspark.sql.dataframe.DataFrame

See all the methods that can be applied to a Spark dataframe using the `help()` function.

In [12]:
help(pyspark.sql.dataframe.DataFrame)

Help on class DataFrame in module pyspark.sql.dataframe:

class DataFrame(builtins.object)
 |  A distributed collection of data grouped into named columns.
 |  
 |  A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
 |  and can be created using various functions in :class:`SparkSession`::
 |  
 |      people = spark.read.parquet("...")
 |  
 |  Once created, it can be manipulated using the various domain-specific-language
 |  (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
 |  
 |  To select a column from the data frame, use the apply method::
 |  
 |      ageCol = people.age
 |  
 |  A more concrete example::
 |  
 |      # To create DataFrame using SparkSession
 |      people = spark.read.parquet("...")
 |      department = spark.read.parquet("...")
 |  
 |      people.filter(people.age > 30).join(department, people.deptId == department.id) \
 |        .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
 |  
 |  .. versionadde

---
# Printing the dataframe schema

## Question

Use the method `.printSchema()` to see the schema of dataframes `df1` and `df2`.

In [13]:
help(pyspark.sql.dataframe.DataFrame.printSchema)

Help on function printSchema in module pyspark.sql.dataframe:

printSchema(self)
    Prints out the schema in the tree format.
    
    >>> df.printSchema()
    root
     |-- age: integer (nullable = true)
     |-- name: string (nullable = true)
    <BLANKLINE>
    
    .. versionadded:: 1.3



## Your answer

## Solution

In [14]:
df1.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



In [15]:
df2.printSchema()

root
 |-- id: long (nullable = true)
 |-- number_sox: long (nullable = true)



---
# Printing the first dataframe rows

## Question
Use the method `show()` to see the rows of `df1` and `df2`.

In [16]:
help(pyspark.sql.dataframe.DataFrame.show)

Help on function show in module pyspark.sql.dataframe:

show(self, n=20, truncate=True, vertical=False)
    Prints the first ``n`` rows to the console.
    
    :param n: Number of rows to show.
    :param truncate: If set to True, truncate strings longer than 20 chars by default.
        If set to a number greater than one, truncates long strings to length ``truncate``
        and align cells right.
    :param vertical: If set to True, print output rows vertically (one line
        per column value).
    
    >>> df
    DataFrame[age: int, name: string]
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> df.show(truncate=3)
    +---+----+
    |age|name|
    +---+----+
    |  2| Ali|
    |  5| Bob|
    +---+----+
    >>> df.show(vertical=True)
    -RECORD 0-----
     age  | 2
     name | Alice
    -RECORD 1-----
     age  | 5
     name | Bob
    
    .. versionadded:: 1.3



##  Your answer

## Solution

Notes:

  * Evaluating `df1` only shows a reference to the object and its schema, not its rows. This is because of Spark's lazy evaluation: the data has not been computed yet (like a transformation, `createDataFrame()` only defines the dataframe).
  * The data is only computed when we apply an action to the dataframe, such as `show()`.
  * Note the difference in duration between `df1` and `df1.show()`.
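To make the transformation/action distinction concrete, here is a toy model in plain Python (no Spark involved). The class `LazyFrame` and the helper `id_above_one` are hypothetical and only mimic the idea: transformations record a step, and nothing is evaluated until an action such as `collect()` runs. As in Spark without caching, every action re-runs the whole recorded plan.

```python
class LazyFrame:
    """Toy model of lazy evaluation (hypothetical class, not a Spark API)."""

    def __init__(self, rows, plan=()):
        self._rows = rows   # source data, left untouched until an action runs
        self._plan = plan   # tuple of recorded (kind, function) steps

    def filter(self, predicate):
        # "Transformation": return a new LazyFrame with one more recorded step.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def map(self, fn):
        # "Transformation": record the step, compute nothing.
        return LazyFrame(self._rows, self._plan + (("map", fn),))

    def collect(self):
        # "Action": the recorded plan is executed now, and again on every call.
        result = list(self._rows)
        for kind, fn in self._plan:
            if kind == "filter":
                result = [row for row in result if fn(row)]
            else:
                result = [fn(row) for row in result]
        return result


calls = []


def id_above_one(row):
    calls.append(row)   # records every real evaluation of the predicate
    return row[0] > 1


people = LazyFrame([(1, "John"), (2, "Jane"), (3, "Herbie")])
names = people.filter(id_above_one).map(lambda row: row[1])
print(len(calls))       # 0: building the plan evaluated nothing
print(names.collect())  # ['Jane', 'Herbie']
print(len(calls))       # 3: the action evaluated the predicate once per row
```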

In [17]:
df1

DataFrame[id: bigint, first_name: string, last_name: string]

In [18]:
df1.show()

+---+----------+----------+
| id|first_name| last_name|
+---+----------+----------+
|  1|      John|       Doe|
|  1|      John|       Doe|
|  1|      John|      null|
|  2|      Jane|       Doe|
|  3|    Herbie|   Hancock|
|  4|      Erin|Brockovich|
+---+----------+----------+



In [19]:
df2

DataFrame[id: bigint, number_sox: bigint]

In [20]:
df2.show()

+---+----------+
| id|number_sox|
+---+----------+
|  1|        24|
|  2|        30|
|  3|        29|
|  4|        40|
+---+----------+



----------
# Joining dataframes

## Question
Use the method `join()` to inner-join `df1` and `df2` on column `id`.

Put the resulting dataframe in `joined_df`.

In [21]:
help(pyspark.sql.dataframe.DataFrame.join)

Help on function join in module pyspark.sql.dataframe:

join(self, other, on=None, how=None)
    Joins with another :class:`DataFrame`, using the given join expression.
    
    :param other: Right side of the join
    :param on: a string for the join column name, a list of column names,
        a join expression (Column), or a list of Columns.
        If `on` is a string or a list of strings indicating the name of the join column(s),
        the column(s) must exist on both sides, and this performs an equi-join.
    :param how: str, default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
        ``full``, ``full_outer``, ``left``, ``left_outer``, ``right``, ``right_outer``,
        ``left_semi``, and ``left_anti``.
    
    The following performs a full outer join between ``df1`` and ``df2``.
    
    >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
    [Row(name=None, height=80), Row(name='Bob', height=85), Row(name='Alice', height=None

##  Your answer

## Solution

In [22]:
joined_df = df1.join(df2, 'id', how='inner')

joined_df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  1|      John|       Doe|        24|
|  1|      John|      null|        24|
|  3|    Herbie|   Hancock|        29|
|  2|      Jane|       Doe|        30|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+
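The inner-join semantics can be mirrored with plain Python lists. This is a sketch of the result, not of how Spark executes a join, and `inner_join_on_id` is a hypothetical helper: every left row is paired with every right row that has the same `id`, so the duplicated `John Doe` row of `df1` appears twice, and rows without a match on the other side are dropped. Unlike this sketch, Spark does not guarantee the order of the output rows (note `id` 3 before `id` 2 above).

```python
left = [
    (1, "John", "Doe"),
    (1, "John", "Doe"),
    (1, "John", None),
    (2, "Jane", "Doe"),
    (3, "Herbie", "Hancock"),
    (4, "Erin", "Brockovich"),
]
right = [(1, 24), (2, 30), (3, 29), (4, 40)]


def inner_join_on_id(left_rows, right_rows):
    """Pair each left row with every right row that has the same first field (id)."""
    # Index the right side by id, similar in spirit to the build side of a hash join.
    by_id = {}
    for rid, *rest in right_rows:
        by_id.setdefault(rid, []).append(tuple(rest))
    joined = []
    for lid, *lrest in left_rows:
        for rrest in by_id.get(lid, []):   # no match -> no output row (inner join)
            joined.append((lid, *lrest, *rrest))
    return joined


joined = inner_join_on_id(left, right)
print(len(joined))  # 6
print(joined[0])    # (1, 'John', 'Doe', 24)
```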



---
# Keeping distinct values

## Question
Use the method `distinct()` to keep only the unique rows of `joined_df`.

Put the resulting dataframe in `distinct_df`.

In [23]:
help(pyspark.sql.dataframe.DataFrame.distinct)

Help on function distinct in module pyspark.sql.dataframe:

distinct(self)
    Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
    
    >>> df.distinct().count()
    2
    
    .. versionadded:: 1.3



##  Your answer

## Solution

In [24]:
distinct_df = joined_df.distinct()

distinct_df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  1|      John|      null|        24|
|  3|    Herbie|   Hancock|        29|
|  2|      Jane|       Doe|        30|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+
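`distinct()` compares whole rows: two rows are duplicates only if every column is equal, and `null` is treated as equal to `null` for this purpose, which is why the single `(1, John, null, 24)` row survives while the two identical `John Doe` rows collapse into one. A plain-Python sketch of the same rule (`distinct_rows` is a hypothetical helper; it preserves first-seen order, whereas Spark makes no ordering promise):

```python
rows = [
    (1, "John", "Doe", 24),
    (1, "John", "Doe", 24),
    (1, "John", None, 24),
    (2, "Jane", "Doe", 30),
]


def distinct_rows(rows):
    """Keep the first occurrence of each fully identical row."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:   # tuples compare on all fields; None == None here
            seen.add(row)
            unique.append(row)
    return unique


print(distinct_rows(rows))
```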



---
# Removing rows containing `null` values

## Question
Use the method `dropna()` to remove rows containing `null` values in `distinct_df`.
Put the resulting dataframe in `df`.

In [25]:
help(pyspark.sql.dataframe.DataFrame.dropna)

Help on function dropna in module pyspark.sql.dataframe:

dropna(self, how='any', thresh=None, subset=None)
    Returns a new :class:`DataFrame` omitting rows with null values.
    :func:`DataFrame.dropna` and :func:`DataFrameNaFunctions.drop` are aliases of each other.
    
    :param how: 'any' or 'all'.
        If 'any', drop a row if it contains any nulls.
        If 'all', drop a row only if all its values are null.
    :param thresh: int, default None
        If specified, drop rows that have less than `thresh` non-null values.
        This overwrites the `how` parameter.
    :param subset: optional list of column names to consider.
    
    >>> df4.na.drop().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    | 10|    80|Alice|
    +---+------+-----+
    
    .. versionadded:: 1.3.1



##  Your answer

## Solution

In [26]:
df = distinct_df.dropna()

df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  3|    Herbie|   Hancock|        29|
|  2|      Jane|       Doe|        30|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+
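The `how`, `thresh` and `subset` parameters described in the help above can be imitated in plain Python. The function `drop_nulls` below is a hypothetical helper written for illustration only; it follows the documented rules: `how='any'` drops a row containing at least one null, `how='all'` drops only all-null rows, and `thresh` (which overrides `how`) keeps rows with at least `thresh` non-null values. For simplicity, `subset` takes column positions here, whereas Spark takes column names.

```python
def drop_nulls(rows, how="any", thresh=None, subset=None):
    """Plain-Python imitation of DataFrame.dropna() on a list of tuples."""
    kept = []
    for row in rows:
        # Only the columns listed in `subset` (by position) are inspected.
        values = [row[i] for i in subset] if subset is not None else list(row)
        non_null = sum(v is not None for v in values)
        if thresh is not None:
            keep = non_null >= thresh          # thresh overrides how
        elif how == "any":
            keep = non_null == len(values)     # no nulls allowed
        elif how == "all":
            keep = non_null > 0                # drop only all-null rows
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept


rows = [(1, "John", "Doe", 24), (1, "John", None, 24), (5, None, None, None)]
print(drop_nulls(rows))                 # [(1, 'John', 'Doe', 24)]
print(drop_nulls(rows, thresh=2))       # first two rows kept
print(drop_nulls(rows, subset=[0, 3]))  # last row dropped: its column 3 is null
```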



---
# Selecting columns

## Question
Use the method `select()` to select the columns `id` and `first_name` from `df`.

Put the resulting dataframe in `selected_df`.

In [27]:
help(pyspark.sql.dataframe.DataFrame.select)

Help on function select in module pyspark.sql.dataframe:

select(self, *cols)
    Projects a set of expressions and returns a new :class:`DataFrame`.
    
    :param cols: list of column names (string) or expressions (:class:`Column`).
        If one of the column names is '*', that column is expanded to include all columns
        in the current DataFrame.
    
    >>> df.select('*').collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.select('name', 'age').collect()
    [Row(name='Alice', age=2), Row(name='Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name='Alice', age=12), Row(name='Bob', age=15)]
    
    .. versionadded:: 1.3



##  Your answer

## Solution

In [28]:
selected_df = df.select('id', 'first_name')

selected_df.show()

+---+----------+
| id|first_name|
+---+----------+
|  1|      John|
|  3|    Herbie|
|  2|      Jane|
|  4|      Erin|
+---+----------+



Note: You can get a column object with:

In [29]:
df.id

Column<b'id'>

---
# Filtering rows

## Question
Use the method `filter()` to keep only the rows from `df` where `number_sox <= 25` OR `number_sox >= 35`.

Put the resulting dataframe in `filtered_df`.

In [30]:
help(pyspark.sql.dataframe.DataFrame.filter)

Help on function filter in module pyspark.sql.dataframe:

filter(self, condition)
    Filters rows using the given condition.
    
    :func:`where` is an alias for :func:`filter`.
    
    :param condition: a :class:`Column` of :class:`types.BooleanType`
        or a string of SQL expression.
    
    >>> df.filter(df.age > 3).collect()
    [Row(age=5, name='Bob')]
    >>> df.where(df.age == 2).collect()
    [Row(age=2, name='Alice')]
    
    >>> df.filter("age > 3").collect()
    [Row(age=5, name='Bob')]
    >>> df.where("age = 2").collect()
    [Row(age=2, name='Alice')]
    
    .. versionadded:: 1.3



##  Your answer

## Solution

In [31]:
filtered_df = df.filter((df.number_sox <= 25) | (df.number_sox >= 35))

filtered_df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+
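Two details of the solution are easy to get wrong. PySpark column conditions are combined with `|` (or), `&` (and) and `~` (not); Python's `or`/`and` keywords do not work on `Column` objects. And `|` binds more tightly than `<=` and `>=`, so each comparison needs its own parentheses. The plain-Python snippet below uses ordinary integers (no Spark) to show how the unparenthesised version is parsed differently, then applies the intended condition to a list of `(id, number_sox)` rows.

```python
number_sox = 24

# Intended condition, with each comparison parenthesised:
intended = (number_sox <= 25) | (number_sox >= 35)

# Without parentheses, `|` is applied first, so Python reads this as the
# chained comparison  number_sox <= (25 | number_sox) >= 35
unparenthesised = number_sox <= 25 | number_sox >= 35

print(intended)         # True
print(unparenthesised)  # False, because 25 | 24 == 25 and 25 >= 35 is False

# The same filter on plain (id, number_sox) rows:
rows = [(1, 24), (3, 29), (2, 30), (4, 40)]
kept = [r for r in rows if (r[1] <= 25) | (r[1] >= 35)]
print(kept)             # [(1, 24), (4, 40)]
```

As the help output above notes, `filter()` also accepts a SQL expression string, e.g. `df.filter("number_sox <= 25 OR number_sox >= 35")`, which sidesteps the operator-precedence issue.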



---
# Randomizing rows

## Question
Using the method `orderBy()` and the function `rand()`, randomize the order of the dataframe rows.

Ensure the results of the randomization operation are reproducible.

Put the resulting dataframe in `randomized_df`.

In [32]:
help(pyspark.sql.dataframe.DataFrame.orderBy)

Help on function sort in module pyspark.sql.dataframe:

sort(self, *cols, **kwargs)
    Returns a new :class:`DataFrame` sorted by the specified column(s).
    
    :param cols: list of :class:`Column` or column names to sort by.
    :param ascending: boolean or list of boolean (default True).
        Sort ascending vs. descending. Specify list for multiple sort orders.
        If a list is specified, length of the list must equal length of the `cols`.
    
    >>> df.sort(df.age.desc()).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.sort("age", ascending=False).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice

In [33]:
from pyspark.sql.functions import rand
help(pyspark.sql.functions.rand)

Help on function rand in module pyspark.sql.functions:

rand(seed=None)
    Generates a random column with independent and identically distributed (i.i.d.) samples
    from U[0.0, 1.0].
    
    >>> df.withColumn('rand', rand(seed=42) * 3).collect()
    [Row(age=2, name='Alice', rand=1.1568609015300986),
     Row(age=5, name='Bob', rand=1.403379671529166)]
    
    .. versionadded:: 1.4



##  Your answer

## Solution

In [34]:
from pyspark.sql.functions import rand

randomized_df = df.orderBy(rand(seed=1234))
randomized_df.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  2|      Jane|       Doe|        30|
|  1|      John|       Doe|        24|
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+
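`orderBy(rand(seed=1234))` attaches a pseudo-random sort key to each row and sorts on it; fixing the seed makes the keys, and therefore the order, repeatable. The plain-Python analogue below (`shuffle_by_random_key` is a hypothetical helper) uses `random.Random(seed)` and will not reproduce Spark's exact order. One caveat worth checking for your Spark version: as far as we know, Spark seeds `rand()` per partition, so the same seed is generally only reproducible when the data is partitioned the same way.

```python
import random

rows = [(1, "John"), (2, "Jane"), (3, "Herbie"), (4, "Erin")]


def shuffle_by_random_key(rows, seed):
    """Sort rows on a seeded pseudo-random key, similar in spirit to orderBy(rand(seed))."""
    rng = random.Random(seed)                  # independent, seeded generator
    keyed = [(rng.random(), row) for row in rows]
    keyed.sort(key=lambda pair: pair[0])       # sort on the random key only
    return [row for _, row in keyed]


first = shuffle_by_random_key(rows, seed=1234)
second = shuffle_by_random_key(rows, seed=1234)
print(first == second)  # True: same seed, same order
```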



---
# Ordering rows

## Question
Using the method `orderBy()` and the functions `asc()` and `desc()`, order the dataframe rows:

 * ascending by `id`.
 * ascending by `last_name`, `number_sox`.
 * ascending by `last_name` and descending by `number_sox`.

In [35]:
help(pyspark.sql.dataframe.DataFrame.orderBy)

Help on function sort in module pyspark.sql.dataframe:

sort(self, *cols, **kwargs)
    Returns a new :class:`DataFrame` sorted by the specified column(s).
    
    :param cols: list of :class:`Column` or column names to sort by.
    :param ascending: boolean or list of boolean (default True).
        Sort ascending vs. descending. Specify list for multiple sort orders.
        If a list is specified, length of the list must equal length of the `cols`.
    
    >>> df.sort(df.age.desc()).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.sort("age", ascending=False).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice

In [36]:
help(pyspark.sql.functions.asc)

Help on function asc in module pyspark.sql.functions:

asc(col)
    Returns a sort expression based on the ascending order of the given column name.
    
    .. versionadded:: 1.3



In [37]:
help(pyspark.sql.functions.desc)

Help on function desc in module pyspark.sql.functions:

desc(col)
    Returns a sort expression based on the descending order of the given column name.
    
    .. versionadded:: 1.3



## Your answer

In [38]:
# Ascending by id


In [39]:
# Ascending by last_name, number_sox


In [40]:
# Ascending by last_name and descending by number_sox


## Solution

Note: `orderBy()` will produce an ascending ordering by default.

In [41]:
from pyspark.sql.functions import asc

randomized_df \
.orderBy(asc('id')) \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  2|      Jane|       Doe|        30|
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+



In [42]:
randomized_df \
.orderBy('id') \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  1|      John|       Doe|        24|
|  2|      Jane|       Doe|        30|
|  3|    Herbie|   Hancock|        29|
|  4|      Erin|Brockovich|        40|
+---+----------+----------+----------+



In [43]:
randomized_df \
.orderBy('last_name', 'number_sox') \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  4|      Erin|Brockovich|        40|
|  1|      John|       Doe|        24|
|  2|      Jane|       Doe|        30|
|  3|    Herbie|   Hancock|        29|
+---+----------+----------+----------+



In [44]:
from pyspark.sql.functions import desc

randomized_df \
.orderBy('last_name', desc('number_sox')) \
.show()

+---+----------+----------+----------+
| id|first_name| last_name|number_sox|
+---+----------+----------+----------+
|  4|      Erin|Brockovich|        40|
|  2|      Jane|       Doe|        30|
|  1|      John|       Doe|        24|
|  3|    Herbie|   Hancock|        29|
+---+----------+----------+----------+
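Mixed sort directions can be expressed in plain Python with a composite key, negating the numeric column to get descending order. This sketch reproduces the order of the last output (ascending `last_name`, descending `number_sox`). On the Spark side, the `ascending` parameter documented in the `orderBy()` help above offers an equivalent: `randomized_df.orderBy('last_name', 'number_sox', ascending=[True, False])`.

```python
rows = [
    (2, "Jane", "Doe", 30),
    (1, "John", "Doe", 24),
    (3, "Herbie", "Hancock", 29),
    (4, "Erin", "Brockovich", 40),
]

# Ascending by last_name, then descending by number_sox (negation works for numbers).
ordered = sorted(rows, key=lambda r: (r[2], -r[3]))

for row in ordered:
    print(row)
```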



---
# Row-wise operations ("Lambda" functions) and User Defined Functions (UDFs)


We are now going to see how to implement row-wise operations on a Spark dataframe (the equivalent of applying a lambda function with Pandas' `apply()`). This requires defining either a "regular" function or a lambda function, wrapping it in a User Defined Function (UDF), and then applying the UDF to every row of the dataframe.

## Question
Define a regular function which:

 1. Takes the total number of individual sox as input.
 2. Divides that number by 2 and floors it to get an integer number of pairs of complete sox.
 3. Returns the number of pairs of sox.

Then create a UDF from this function using the function `udf()` (the type of the values returned by the UDF must be `IntegerType`).

Finally apply the UDF to the dataframe `df` to add a new column (`number_sox_pair`) containing the number of complete pairs of sox. Put the resulting dataframe in `df_with_pairs`.

Once this is working, repeat using a `lambda` function instead of a "regular" function.

In [45]:
help(pyspark.sql.functions.udf)

Help on function udf in module pyspark.sql.functions:

udf(f=None, returnType=StringType)
    Creates a user defined function (UDF).
    
    .. note:: The user-defined functions are considered deterministic by default. Due to
        optimization, duplicate invocations may be eliminated or the function may even be invoked
        more times than it is present in the query. If your function is not deterministic, call
        `asNondeterministic` on the user defined function. E.g.:
    
    >>> from pyspark.sql.types import IntegerType
    >>> import random
    >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
    
    .. note:: The user-defined functions do not support conditional expressions or short circuiting
        in boolean expressions and it ends up with being executed all internally. If the functions
        can fail on special rows, the workaround is to incorporate the condition into the functions.
    
    .. note:: The user-defined

In [46]:
help(pyspark.sql.types.IntegerType)

Help on class IntegerType in module pyspark.sql.types:

class IntegerType(IntegralType)
 |  Int data type, i.e. a signed 32-bit integer.
 |  
 |  Method resolution order:
 |      IntegerType
 |      IntegralType
 |      NumericType
 |      AtomicType
 |      DataType
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  simpleString(self)
 |  
 |  ----------------------------------------------------------------------
 |  Data and other attributes inherited from IntegralType:
 |  
 |  __metaclass__ = <class 'pyspark.sql.types.DataTypeSingleton'>
 |      Metaclass for DataType
 |  
 |  ----------------------------------------------------------------------
 |  Methods inherited from DataType:
 |  
 |  __eq__(self, other)
 |      Return self==value.
 |  
 |  __hash__(self)
 |      Return hash(self).
 |  
 |  __ne__(self, other)
 |      Return self!=value.
 |  
 |  __repr__(self)
 |      Return repr(self).
 |  
 |  fromInternal(self, obj)
 |      Converts an internal SQL object in

##  Your answer

## Solution

In [47]:
# Regular function version

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


# Define the UDF using a "regular" function
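def sox_pairs(x):
    # Number of complete pairs = floor(x / 2). Integer floor division is used
    # because round() rounds half to even (e.g. round(13.5) == 14).
    if x is None:
        return None
    return x // 2

# The function can be passed to udf() directly; no wrapping lambda is needed
sox_pairs_udf = udf(sox_pairs, IntegerType())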


# Apply the UDF to the DF
df_with_pairs = df \
.withColumn("number_sox_pair", sox_pairs_udf("number_sox"))


# Show the results
df_with_pairs.show()

+---+----------+----------+----------+---------------+
| id|first_name| last_name|number_sox|number_sox_pair|
+---+----------+----------+----------+---------------+
|  1|      John|       Doe|        24|             12|
|  3|    Herbie|   Hancock|        29|             14|
|  2|      Jane|       Doe|        30|             15|
|  4|      Erin|Brockovich|        40|             20|
+---+----------+----------+----------+---------------+



In [48]:
# Lambda function version

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


# Create the UDF directly from a lambda function
# (floor division gives complete pairs; None is passed through for null inputs)
sox_pairs_udf_l = udf(lambda x: None if x is None else x // 2, IntegerType())


# Apply the UDF to the DF
df_with_pairs = df \
.withColumn("number_sox_pair", sox_pairs_udf_l("number_sox"))

# Show the results
df_with_pairs.show()

+---+----------+----------+----------+---------------+
| id|first_name| last_name|number_sox|number_sox_pair|
+---+----------+----------+----------+---------------+
|  1|      John|       Doe|        24|             12|
|  3|    Herbie|   Hancock|        29|             14|
|  2|      Jane|       Doe|        30|             15|
|  4|      Erin|Brockovich|        40|             20|
+---+----------+----------+----------+---------------+
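The question asks for the floor of half the sock count. Python 3's `round()` is not a floor: it rounds halves to the nearest even integer, so `round(x / 2)` and `x // 2` disagree for some odd counts (27 socks would give 14 with `round()`, but only 13 complete pairs exist). For the four rows above both happen to agree. The plain-Python check below compares the two versions; `pairs_with_round` and `pairs_with_floor` are illustrative helper names.

```python
def pairs_with_round(total_sox):
    return round(total_sox / 2)   # round-half-to-even in Python 3


def pairs_with_floor(total_sox):
    return total_sox // 2         # complete pairs only


for total in (24, 27, 29, 30, 31, 40):
    print(total, pairs_with_round(total), pairs_with_floor(total))

# Counts between 0 and 20 where the two versions disagree
mismatches = [n for n in range(21) if pairs_with_round(n) != pairs_with_floor(n)]
print(mismatches)  # [3, 7, 11, 15, 19]
```

Where a built-in column function exists, it usually avoids the Python serialization overhead of a UDF; for example, `pyspark.sql.functions.floor(df.number_sox / 2)` should compute the same values, although it returns a long column rather than an `IntegerType` one.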



---
# Aggregation

## Question

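Calculate the total number of individual sox owned by each family (i.e. each `last_name`) using the methods `groupBy()` and `agg()` and the function `sum()`.

Name the column with the number of sox `number_sox_per_family` using the `Column` method `alias()` (the `help()` output below is for `DataFrame.alias()`, which names a whole dataframe rather than a column).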

Put the resulting dataframe in `counts_df`.

In [49]:
help(pyspark.sql.dataframe.DataFrame.groupBy)

Help on function groupBy in module pyspark.sql.dataframe:

groupBy(self, *cols)
    Groups the :class:`DataFrame` using the specified columns,
    so we can run aggregation on them. See :class:`GroupedData`
    for all the available aggregate functions.
    
    :func:`groupby` is an alias for :func:`groupBy`.
    
    :param cols: list of columns to group by.
        Each element should be a column name (string) or an expression (:class:`Column`).
    
    >>> df.groupBy().avg().collect()
    [Row(avg(age)=3.5)]
    >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
    [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
    >>> sorted(df.groupBy(df.name).avg().collect())
    [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
    >>> sorted(df.groupBy(['name', df.age]).count().collect())
    [Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)]
    
    .. versionadded:: 1.3



In [50]:
help(pyspark.sql.dataframe.DataFrame.agg)

Help on function agg in module pyspark.sql.dataframe:

agg(self, *exprs)
    Aggregate on the entire :class:`DataFrame` without groups
    (shorthand for ``df.groupBy.agg()``).
    
    >>> df.agg({"age": "max"}).collect()
    [Row(max(age)=5)]
    >>> from pyspark.sql import functions as F
    >>> df.agg(F.min(df.age)).collect()
    [Row(min(age)=2)]
    
    .. versionadded:: 1.3



In [51]:
help(pyspark.sql.functions.sum)

Help on function sum in module pyspark.sql.functions:

sum(col)
    Aggregate function: returns the sum of all values in the expression.
    
    .. versionadded:: 1.3



In [52]:
help(pyspark.sql.dataframe.DataFrame.alias)

Help on function alias in module pyspark.sql.dataframe:

alias(self, alias)
    Returns a new :class:`DataFrame` with an alias set.
    
    >>> from pyspark.sql.functions import *
    >>> df_as1 = df.alias("df_as1")
    >>> df_as2 = df.alias("df_as2")
    >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
    >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()
    [Row(name='Bob', name='Bob', age=5), Row(name='Alice', name='Alice', age=2)]
    
    .. versionadded:: 1.3



##  Your answer

## Solution

In [53]:
from pyspark.sql import functions as F  # F.sum avoids shadowing Python's built-in sum()

counts_df = df \
.groupBy('last_name') \
.agg(F.sum('number_sox').alias('number_sox_per_family'))

counts_df.show()

+----------+---------------------+
| last_name|number_sox_per_family|
+----------+---------------------+
|   Hancock|                   29|
|       Doe|                   54|
|Brockovich|                   40|
+----------+---------------------+
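Grouping and summing amounts to a dictionary accumulation in plain Python. The sketch below reproduces the per-family totals above (e.g. Doe: 24 + 30 = 54); Spark does the same per group, but spread across partitions and followed by a merge of the partial sums.

```python
from collections import defaultdict

rows = [
    ("Doe", 24),
    ("Hancock", 29),
    ("Doe", 30),
    ("Brockovich", 40),
]

number_sox_per_family = defaultdict(int)
for last_name, number_sox in rows:
    number_sox_per_family[last_name] += number_sox   # one running total per group

print(dict(number_sox_per_family))  # {'Doe': 54, 'Hancock': 29, 'Brockovich': 40}
```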



Note: The concept of UDF can be taken one step further to aggregation; such functions are called UDAFs (User Defined Aggregate Functions). For example, to apply an advanced algorithm to each group of a dataframe, you would create a UDAF that implements the algorithm and handles the grouped computation. This is an advanced topic and is not covered in this training.

---
# Exploding a "list" column into rows

## Question
Sometimes columns contain lists of values, and it is useful to expand (or explode) the values of each list into multiple rows.

Using the function `explode()`, expand the column `list` into multiple rows and rename the column `list` to `item`.

The resulting dataframe will have 2 columns: `id` and `item` and will be named `exploded_df`.

Hint: you will need to `select()` the columns you want in the resulting dataframe.

In [54]:
col_names = ['id', 'list']
rows = [
    (1, ['A', 'B']),
    (2, ['C']),
    (3, ['D', 'D']),
    (4, ['E', 'F']),        
]

list_df = spark.createDataFrame(rows, col_names)
list_df.show()

+---+------+
| id|  list|
+---+------+
|  1|[A, B]|
|  2|   [C]|
|  3|[D, D]|
|  4|[E, F]|
+---+------+



In [55]:
help(pyspark.sql.functions.explode)

Help on function explode in module pyspark.sql.functions:

explode(col)
    Returns a new row for each element in the given array or map.
    
    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
    
    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    b|
    +---+-----+
    
    .. versionadded:: 1.4



##  Your answer

## Solution

In [56]:
from pyspark.sql.functions import explode

exploded_df = list_df \
.select('id', explode('list').alias('item'))

exploded_df.show()

+---+----+
| id|item|
+---+----+
|  1|   A|
|  1|   B|
|  2|   C|
|  3|   D|
|  3|   D|
|  4|   E|
|  4|   F|
+---+----+
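In plain Python, `explode()` corresponds to a nested comprehension that emits one `(id, item)` pair per list element. One behaviour to be aware of: `explode()` produces no output row for a null or empty list (`explode_outer()`, available since Spark 2.3, keeps such rows with a null `item`). The sketch mirrors the `explode()` behaviour; the row with `id` 5 is a hypothetical addition to show the empty-list case.

```python
rows = [
    (1, ["A", "B"]),
    (2, ["C"]),
    (3, ["D", "D"]),
    (4, ["E", "F"]),
    (5, []),          # hypothetical extra row: an empty list yields no output rows
]

# One output pair per list element; `items or []` also covers a None list.
exploded = [(row_id, item) for row_id, items in rows for item in (items or [])]
print(exploded)
```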



---
# Grouping multiple rows into a "list" column

## Question

Recreate `list_df` from `exploded_df`:

Group the rows of `exploded_df` by `id` and collect the `item` values of each `id` into a list, using `groupBy()`, `agg()` and `collect_list()`.

Name the resulting list column `list`.

In [57]:
help(pyspark.sql.functions.collect_list)

Help on function collect_list in module pyspark.sql.functions:

collect_list(col)
    Aggregate function: returns a list of objects with duplicates.
    
    >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
    >>> df2.agg(collect_list('age')).collect()
    [Row(collect_list(age)=[2, 5, 5])]
    
    .. versionadded:: 1.6



##  Your answer

## Solution

In [58]:
from pyspark.sql.functions import collect_list

exploded_df \
.groupBy('id') \
.agg(collect_list('item') \
.alias('list')) \
.show()

+---+------+
| id|  list|
+---+------+
|  1|[A, B]|
|  3|[D, D]|
|  2|   [C]|
|  4|[E, F]|
+---+------+



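Note: multiple rows can be grouped into a `list` (`collect_list()`) or a `set` (`collect_set()`), producing one row per group. A list keeps duplicates and follows the order in which the rows reach the aggregation, which Spark does not guarantee after a shuffle. A set removes duplicates and has no concept of order, so the original ordering of the rows is lost (note `[B, A]` for `id` 1 below).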

In [59]:
from pyspark.sql.functions import collect_set

exploded_df \
.groupBy('id') \
.agg(collect_set('item') \
.alias('set')) \
.show()

+---+------+
| id|   set|
+---+------+
|  1|[B, A]|
|  3|   [D]|
|  2|   [C]|
|  4|[F, E]|
+---+------+
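For comparison, here is a plain-Python sketch of `collect_set()` that uses a Python `set` per key. `collect_set_rows` is a hypothetical helper. Duplicates such as the two `D` values collapse, and the elements have no defined order, which is why the output above shows `[B, A]`. If a deterministic order matters in Spark, one option is to wrap the aggregate in `sort_array()` from `pyspark.sql.functions`.

```python
from typing import Any, Dict, List, Set


def collect_set_rows(rows: List[Dict[str, Any]], key: str,
                     value_col: str) -> Dict[Any, Set[Any]]:
    """Hypothetical plain-Python model of df.groupBy(key).agg(collect_set(value_col))."""
    groups: Dict[Any, Set[Any]] = {}
    for row in rows:
        # Adding to a set silently drops duplicates and forgets insertion order
        groups.setdefault(row[key], set()).add(row[value_col])
    return groups


# Sample data assumed to mirror exploded_df
exploded_rows = [
    {"id": 1, "item": "A"}, {"id": 1, "item": "B"},
    {"id": 2, "item": "C"},
    {"id": 3, "item": "D"}, {"id": 3, "item": "D"},
    {"id": 4, "item": "E"}, {"id": 4, "item": "F"},
]

result = collect_set_rows(exploded_rows, "id", "item")
for key in sorted(result):
    print(key, sorted(result[key]))  # sort only for a stable display
```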



---
# Loading data from a CSV file on HDFS into a Spark dataframe

## Question

Using the method chain `spark.read.option().option().csv(path)`, load the CSV file located on HDFS at `csv_path` into the dataframe `temperature_df`.

The Spark session object (named `spark`) was already instantiated in the first cells of the notebook.

In [60]:
csv_path = "/user/vagrant/data/earth-surface-temperature/csv/GlobalLandTemperaturesByMajorCity.csv"

In [61]:
! hdfs dfs -ls -h $csv_path

-rw-r--r--   1 vagrant hdfs     13.5 M 2020-08-03 13:29 /user/vagrant/data/earth-surface-temperature/csv/GlobalLandTemperaturesByMajorCity.csv


In [62]:
type(spark)

pyspark.sql.session.SparkSession

In [63]:
help(pyspark.sql.session.SparkSession.read)

Help on property:

    Returns a :class:`DataFrameReader` that can be used to read data
    in as a :class:`DataFrame`.
    
    :return: :class:`DataFrameReader`
    
    .. versionadded:: 2.0



In [64]:
help(pyspark.sql.readwriter.DataFrameReader.csv)

Help on function csv in module pyspark.sql.readwriter:

csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None)
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, disable
    ``inferSchema`` option or specify the schema explicitly using ``schema``.
    
    :param path: string, or list of strings, for input path(s),
                 or RDD of Strings storing CSV rows.
    :param schema: an optional :class:`pyspar

##  Your answer

## Solution

In [65]:
temperature_df = spark \
.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("mode", "DROPMALFORMED") \
.csv(csv_path)

temperature_df.show()

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1849-01-01 00:00:00|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-02-01 00:00:00|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-03-01 00:00:00|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-04-01 00:00:00|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-05-01 00:00:00|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-06-01 00:00:00|            24.844|                        1.402|Abidjan|Côte D'Ivoire|   5.63N|   
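The `header` and `inferSchema` options can be illustrated with a deliberately simplified plain-Python model. The first line supplies the column names. Each column then gets the narrowest type that parses all of its non-empty values: int, then double, else string. The CSV text below is made up to mirror the displayed columns, and its empty third row is hypothetical. Spark's real inference supports more types than this sketch; for example, the `dt` column above appears to have been inferred as a timestamp, while this model keeps it as a string. Inference also costs an extra pass over the data, which is why the docstring suggests passing an explicit `schema` instead.

```python
import csv
import io
from typing import List


def infer_type(values: List[str]) -> str:
    """Simplified model of CSV schema inference: try int, then double, else string."""
    non_empty = [v for v in values if v != ""]  # empty fields are nulls
    if not non_empty:
        return "string"  # an all-null column falls back to string

    def fits(cast) -> bool:
        try:
            for v in non_empty:
                cast(v)
            return True
        except ValueError:
            return False

    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"


# Made-up sample mirroring the columns of GlobalLandTemperaturesByMajorCity.csv
sample_csv = """dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
1849-01-01,26.704,1.435,Abidjan,Côte D'Ivoire,5.63N,3.23W
1849-02-01,27.434,1.362,Abidjan,Côte D'Ivoire,5.63N,3.23W
1849-03-01,,,Abidjan,Côte D'Ivoire,5.63N,3.23W
"""

reader = csv.reader(io.StringIO(sample_csv))
header = next(reader)        # header=true: the first line provides column names
data_rows = list(reader)
columns = list(zip(*data_rows))  # transpose rows into columns
schema = {name: infer_type(list(col)) for name, col in zip(header, columns)}
print(schema)
```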

---
# Sampling rows

## Question

Sample 10% of `temperature_df`'s rows using the method `sample()`.

Make sure it is possible to reproduce the random selection.

Put the resulting dataframe in `sampled_df`.

Use the method `count()` to count the rows and check that the sample holds roughly 10% of them (sampling does not return an exact fraction).

In [66]:
help(pyspark.sql.dataframe.DataFrame.sample)

Help on function sample in module pyspark.sql.dataframe:

sample(self, withReplacement=None, fraction=None, seed=None)
    Returns a sampled subset of this :class:`DataFrame`.
    
    :param withReplacement: Sample with replacement or not (default False).
    :param fraction: Fraction of rows to generate, range [0.0, 1.0].
    :param seed: Seed for sampling (default a random seed).
    
    .. note:: This is not guaranteed to provide exactly the fraction specified of the total
        count of the given :class:`DataFrame`.
    
    .. note:: `fraction` is required and, `withReplacement` and `seed` are optional.
    
    >>> df = spark.range(10)
    >>> df.sample(0.5, 3).count()
    4
    >>> df.sample(fraction=0.5, seed=3).count()
    4
    >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
    1
    >>> df.sample(1.0).count()
    10
    >>> df.sample(fraction=1.0).count()
    10
    >>> df.sample(False, fraction=1.0).count()
    10
    
    .. versionadded:: 1.3



In [67]:
help(pyspark.sql.dataframe.DataFrame.count)

Help on function count in module pyspark.sql.dataframe:

count(self)
    Returns the number of rows in this :class:`DataFrame`.
    
    >>> df.count()
    2
    
    .. versionadded:: 1.3



##  Your answer

## Solution

In [68]:
temperature_df.count()

239177

In [69]:
sampled_df = temperature_df.sample(fraction=0.1, seed=1234)

sampled_df.count()

23828
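The result is 23828 / 239177 ≈ 9.96% rather than exactly 10%. Sampling without replacement roughly amounts to keeping each row independently with probability `fraction`, so only the expected count is 10%. The sketch below models this with a seeded random generator. `bernoulli_sample` is a hypothetical helper and does not reproduce Spark's actual row selection. Fixing the seed makes the selection repeatable. In Spark, reproducibility also assumes the input is split into the same partitions on every run.

```python
import random
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def bernoulli_sample(rows: Iterable[T], fraction: float, seed: int) -> List[T]:
    """Hypothetical model of sample(withReplacement=False, fraction, seed):
    keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the selection reproducible
    return [row for row in rows if rng.random() < fraction]


rows = range(239177)  # same row count as temperature_df
sampled = bernoulli_sample(rows, fraction=0.1, seed=1234)
print(len(sampled), len(sampled) / len(rows))  # close to, but not exactly, 10%
```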

---
# Writing a Spark dataframe to a parquet file on HDFS

## Question

Parquet is a columnar format well suited to persisting tabular data. It performs especially well for dataframes whose columns have values repeating on contiguous rows, because such runs compress very efficiently.
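Parquet can apply encodings such as dictionary and run-length encoding (RLE) to each column. The toy sketch below is plain Python, not Parquet's actual on-disk encoding. It shows why repeated values on contiguous rows help: when equal values are adjacent, the column collapses into far fewer runs. The `Country` values are hypothetical. The same idea explains why sorting before writing, as the `orderBy('Country', 'City', 'dt')` call further down does, can shrink the files.

```python
from itertools import groupby
from typing import Any, List, Tuple


def run_length_encode(values: List[Any]) -> List[Tuple[Any, int]]:
    """Encode a column as (value, run_length) pairs of consecutive equal values."""
    return [(value, len(list(run))) for value, run in groupby(values)]


# Hypothetical Country column: the same values, unsorted vs. sorted
unsorted_col = ["Angola", "Afghanistan", "Angola", "Afghanistan", "Angola", "Afghanistan"]
sorted_col = sorted(unsorted_col)

print(run_length_encode(unsorted_col))  # 6 runs of length 1
print(run_length_encode(sorted_col))    # 2 runs of length 3
```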

Use `df.write.parquet(path)` to persist `temperature_df` to HDFS at `parquet_path` using the parquet format.

In [70]:
parquet_path = "/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity"

In [71]:
help(pyspark.sql.dataframe.DataFrame.write)

Help on property:

    Interface for saving the content of the non-streaming :class:`DataFrame` out into external
    storage.
    
    :return: :class:`DataFrameWriter`
    
    .. versionadded:: 1.4



In [72]:
help(pyspark.sql.readwriter.DataFrameWriter.parquet)

Help on function parquet in module pyspark.sql.readwriter:

parquet(self, path, mode=None, partitionBy=None, compression=None)
    Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
    
    :param path: the path in any Hadoop supported file system
    :param mode: specifies the behavior of the save operation when data already exists.
    
        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.
    :param partitionBy: names of partitioning columns
    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, snappy, gzip, and lzo).
                        This will override ``spark.sql.parq

##  Your answer

## Solution

In [73]:
# Clean up the path if something is already there
! hdfs dfs -rm -R $parquet_path

20/08/10 16:58:25 INFO fs.TrashPolicyDefault: Moved: 'hdfs://slalomdsvm:8020/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity' to trash at: hdfs://slalomdsvm:8020/user/vagrant/.Trash/Current/user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity1597103905375


In [74]:
# Default write: produces one file per partition of the dataframe
temperature_df \
.write \
.parquet(parquet_path)

In [75]:
! hdfs dfs -ls -R -h $parquet_path

-rw-r--r--   1 vagrant hdfs          0 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/_SUCCESS
-rw-r--r--   1 vagrant hdfs    854.8 K 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/part-00000-8b9e9c92-45a8-4c7f-af54-dffefec21895-c000.snappy.parquet
-rw-r--r--   1 vagrant hdfs    567.4 K 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/part-00001-8b9e9c92-45a8-4c7f-af54-dffefec21895-c000.snappy.parquet


The dataframe is written to HDFS as a directory containing multiple files (one per partition). To control the number of output files, reduce the number of partitions with `coalesce()` before writing:

In [76]:
# Note: mode('overwrite') avoids having to delete the directory beforehand if it already exists

n_files = 1

temperature_df \
.coalesce(n_files) \
.write \
.mode('overwrite') \
.parquet(parquet_path)

In [77]:
! hdfs dfs -ls -R -h $parquet_path

-rw-r--r--   1 vagrant hdfs          0 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/_SUCCESS
-rw-r--r--   1 vagrant hdfs      1.2 M 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/part-00000-ad866dee-10ac-44d3-affa-441e607f8e45-c000.snappy.parquet
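Because each partition becomes one part file, `coalesce(1)` turns the two files above into one. The sketch below is a simplified plain-Python model of that step. `coalesce_partitions` is a hypothetical helper that merges existing partitions into at most `n` groups of contiguous partitions without moving individual rows around. Spark's real coalescer may also group partitions by data locality. `repartition(n)` is the alternative: it performs a full shuffle and can also increase the partition count, which `coalesce()` cannot.

```python
from typing import Any, List


def coalesce_partitions(partitions: List[List[Any]], n: int) -> List[List[Any]]:
    """Simplified model of coalesce(n): merge contiguous partitions into at most n
    groups (never more than the current number of partitions)."""
    n = max(1, min(n, len(partitions)))
    size, extra = divmod(len(partitions), n)
    merged, start = [], 0
    for i in range(n):
        # The first `extra` groups take one additional input partition
        end = start + size + (1 if i < extra else 0)
        merged.append([row for part in partitions[start:end] for row in part])
        start = end
    return merged


parts = [[1, 2], [3], [4, 5, 6]]
print(coalesce_partitions(parts, 1))  # one group, i.e. one output file
print(coalesce_partitions(parts, 2))
```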


Spark's writer also provides a very convenient partitioning feature, `partitionBy()`: the data for each distinct value of the partition column is written under its own directory on HDFS (e.g. `Country=Angola`).
In-file indexing is coming, but it requires installing additional libraries and is not ubiquitous yet.

In [78]:
temperature_df \
.orderBy('Country', 'City', 'dt') \
.write \
.partitionBy('Country') \
.mode('overwrite') \
.parquet(parquet_path)

In [79]:
! hdfs dfs -ls -R -h $parquet_path | head -20

drwxr-xr-x   - vagrant hdfs          0 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Afghanistan
-rw-r--r--   1 vagrant hdfs     20.1 K 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Afghanistan/part-00000-fb252b37-e78f-4135-9272-b6db37262217.c000.snappy.parquet
-rw-r--r--   1 vagrant hdfs     16.1 K 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Afghanistan/part-00001-fb252b37-e78f-4135-9272-b6db37262217.c000.snappy.parquet
drwxr-xr-x   - vagrant hdfs          0 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Angola
-rw-r--r--   1 vagrant hdfs      3.4 K 2020-08-10 16:58 /user/vagrant/data/earth-surface-temperature/parquet/GlobalLandTemperaturesByMajorCity/Country=Angola/part-00001-fb252b37-e78f-4135-9272-b6db37262217.c000.sn
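The listing shows two part files under `Country=Afghanistan` but only one under `Country=Angola`. By default, each write task creates one file for every distinct partition value it holds, so a value whose rows span two Spark partitions ends up in two files. The plain-Python model below reproduces that pattern. `partitioned_output_files` is a hypothetical helper, the task contents are assumed, and real file names also carry a UUID and a compression suffix.

```python
from typing import Any, Dict, List


def partitioned_output_files(partitions: List[List[Dict[str, Any]]],
                             partition_col: str) -> List[str]:
    """Hypothetical model of partitionBy(col): each task writes one file per
    distinct value of `col` it holds, under a `col=value` directory."""
    files = []
    for task_id, part in enumerate(partitions):
        for value in sorted({row[partition_col] for row in part}):
            files.append(f"{partition_col}={value}/part-{task_id:05d}")
    return sorted(files)


# Assumed layout after orderBy: Afghanistan's rows span tasks 0 and 1
partitions = [
    [{"Country": "Afghanistan"}, {"Country": "Afghanistan"}],  # task 0
    [{"Country": "Afghanistan"}, {"Country": "Angola"}],       # task 1
]

for path in partitioned_output_files(partitions, "Country"):
    print(path)
```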

Note: in general, we want to create files with a size of a few times the HDFS block size (default: 128 MB). We want a few large files (YES: ≈100+ MB to ≈1 GB), not many small files (NO: ≈1 KB to ≈10 MB). Reading and writing many small files takes a lot longer.
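The rule of thumb above boils down to simple arithmetic: divide the expected output size by the target file size and round up. `target_file_count` is a hypothetical helper, and its 128 MiB default simply reuses the block size quoted above. For the ≈1.4 MB of Parquet written earlier it gives 1 file. For 10 GiB with a 256 MiB target it gives 40. The result could then be passed to `coalesce()` as `n_files`.

```python
import math


def target_file_count(total_bytes: float, target_file_bytes: float = 128 * 1024 ** 2) -> int:
    """Hypothetical helper: number of output files so each is close to the target size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


print(target_file_count(1.4 * 1024 ** 2))                 # small dataset -> 1 file
print(target_file_count(10 * 1024 ** 3, 256 * 1024 ** 2))  # 10 GiB / 256 MiB -> 40 files
```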

---
# Collecting a Spark dataframe into a "regular" pandas dataframe

## Question

Using the method `toPandas()`, collect `df` from the Spark executors' memory into the "regular" Python memory of the driver as a Pandas dataframe.

Put the resulting Pandas dataframe in `pandas_df` (not a Spark dataframe anymore).

In [80]:
help(pyspark.sql.dataframe.DataFrame.toPandas)

Help on function toPandas in module pyspark.sql.dataframe:

toPandas(self)
    Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
    
    This is only available if Pandas is installed and available.
    
    .. note:: This method should only be used if the resulting Pandas's DataFrame is expected
        to be small, as all the data is loaded into the driver's memory.
    
    .. note:: Usage with spark.sql.execution.arrow.enabled=True is experimental.
    
    >>> df.toPandas()  # doctest: +SKIP
       age   name
    0    2  Alice
    1    5    Bob
    
    .. versionadded:: 1.3



##  Your answer

## Solution

In [81]:
# Collect into a "classic" pandas dataframe
pandas_df = df.toPandas()
pandas_df

   id  first_name   last_name  number_sox
0   1        John         Doe          24
1   3      Herbie     Hancock          29
2   2        Jane         Doe          30
3   4        Erin  Brockovich          40


Note: you may also collect the Spark dataframe into a list of `pyspark.sql.Row` objects with `collect()`. Like `toPandas()`, this loads all the data into the driver's memory.

In [82]:
collected_rows = df.collect()

collected_rows

[Row(id=1, first_name='John', last_name='Doe', number_sox=24),
 Row(id=3, first_name='Herbie', last_name='Hancock', number_sox=29),
 Row(id=2, first_name='Jane', last_name='Doe', number_sox=30),
 Row(id=4, first_name='Erin', last_name='Brockovich', number_sox=40)]

---
# Closing the Spark session

## Question

Stop the Spark session to release the cluster CPU/RAM resources. This completes the YARN application, shuts down the executors and stops the driver's SparkContext.
Confirm the YARN application has been terminated by using the YARN UI or shell command `yarn application -list`.

In [83]:
help(pyspark.sql.session.SparkSession.stop)

Help on function stop in module pyspark.sql.session:

stop(self)
    Stop the underlying :class:`SparkContext`.
    
    .. versionadded:: 2.0



##  Your answer

## Solution

Stop the `spark` session to release the cluster CPU/RAM resources (this completes the YARN application, shuts down the executors and stops the driver's SparkContext).

In [84]:
spark.stop()

Check the YARN UI: [http://slalomdsvm:8088/ui2/](http://slalomdsvm:8088/ui2/)

In [85]:
! yarn application -list

20/08/10 16:59:09 INFO client.RMProxy: Connecting to ResourceManager at slalomdsvm/10.0.2.15:8050
20/08/10 16:59:09 INFO client.AHSProxy: Connecting to Application History server at slalomdsvm/10.0.2.15:10200
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):0
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL
