# Spark DF - Basics

Let's start off with the fundamentals of Spark DataFrames. This tutorial has been adapted from Chang Hsin Lee's work. Find out more here:
- https://changhsinlee.com/pyspark-dataframe-basics/

Objective: In this exercise, you'll learn more about DataFrames, how to start a Spark session, and how to carry out some basic data exploration, manipulation and aggregation.

What is a DataFrame? A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. Find more information here: https://spark.apache.org/docs/latest/sql-programming-guide.html

What is a Spark Session? It provides a single point of entry for interacting with Spark's underlying functionality and lets us program Spark through the DataFrame and Dataset APIs. A new Spark Session must be started in each of our notebooks.

In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name. 
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [8]:
# Let's read in the data. The first row of the file holds the column headers, so we set header=True. Values are separated by semicolons, hence sep=";".
# To make our lives easier, we can also use 'inferSchema' when importing CSVs. This automatically detects data types.
# If you would like to manually change data types, refer to this article: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
df = spark.read.csv("bank-additional-full.csv",inferSchema=True,header=True,sep=";")

## Basic Data Exploration
Now that we've started the session and imported the data, let's explore the data.

In [9]:
# The show method lets you view DataFrames in a tabular format (20 rows by default).
df.show()

+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|        job| marital|          education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 56|  housemaid| married|           basic.4y|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
| 57|   services| married|        high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.

This dataset was originally from Kaggle (https://www.kaggle.com/rouseguy/bankbalanced/data). It's used to predict whether a client will subscribe to a term deposit (the deposit column; in the full file loaded above it is called y) when called by the bank's call centre reps. You'll be using a simplified version of the original dataset throughout the DataFrame tutorials, and the full dataset in the binomial logistic regression machine learning exercise.

In [10]:
# printSchema() shows each column's name, data type and whether it can contain nulls.
df.printSchema()

# We can also use head to return a specific number of rows, so we can get a better understanding of the data points.
# head() returns a list of Row objects rather than printing a table, so we wrap it in print() to see it.
print(df.head(1))

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: double (nullable = true)
 |-- cons.price.idx: double (nullable = true)
 |-- cons.conf.idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr.employed: double (nullable = true)
 |-- y: string (nullable = true)

[Row(age=56, job='housemaid', marital='married', education='basic.4y', default='no', housing='no', loan='no', contact='telephone', month='may', day_of_week='

In [11]:
# We can use the describe method to get some general statistics on our data too.
df.describe().show()

Py4JJavaError: An error occurred while calling o98.describe.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:347)
	at scala.None$.get(Option.scala:345)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$aggregatableColumns$2.apply(Dataset.scala:235)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$aggregatableColumns$2.apply(Dataset.scala:233)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$aggregatableColumns(Dataset.scala:233)
	at org.apache.spark.sql.Dataset$$anonfun$describe$1.apply(Dataset.scala:2093)
	at org.apache.spark.sql.Dataset$$anonfun$describe$1.apply(Dataset.scala:2082)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2845)
	at org.apache.spark.sql.Dataset.describe(Dataset.scala:2082)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


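Running describe() on the full file fails with the error above. The most likely cause is the dots in column names such as emp.var.rate: Spark reads a dot in a column reference as access to a nested field, and describe() in Spark 2.1 cannot resolve those columns. On the simplified dataset used in the rest of this tutorial, describe() runs, but it also summarises string columns such as job and marital, which tells us little, so we should have excluded the non-numeric columns. There is one interesting fact in that summary, though: marital has a count of 95 and balance a count of 96, while the other columns have a count of 100. Since describe() counts only non-null values, some data appears to be missing. We'll handle this in the upcoming data cleaning exercise.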

In [10]:
# Let's select the integer columns and use the describe method again.
# We see that the average age is about 41 and the average bank account balance is about $1,075.
# Clients also spoke to call centre reps for approx. 932 seconds on average.
df.select('age', 'balance', 'duration').describe().show()

+-------+-----------------+------------------+------------------+
|summary|              age|           balance|          duration|
+-------+-----------------+------------------+------------------+
|  count|              100|                96|               100|
|   mean|            40.92|1074.5520833333333|            931.97|
| stddev|9.704398669436122|1709.7039686497387|353.62730638085895|
|    min|               23|              -416|               395|
|    max|               60|             10576|              2087|
+-------+-----------------+------------------+------------------+



## Basic Data Manipulation
The code above shows you how to simply select columns, but there's much more that PySpark can do! Let's dig deeper into data manipulation.

In [11]:
# Let's select the balance column and assign it to a variable. 
bal_col = df.select('balance')

# We can then use the show method on that variable.
bal_col.show()

+-------+
|balance|
+-------+
|   2343|
|   null|
|   1270|
|   null|
|    184|
|      0|
|    830|
|    545|
|      1|
|   null|
|    100|
|    309|
|    199|
|    460|
|    703|
|   3837|
|   null|
|     -8|
|     55|
|    168|
+-------+
only showing top 20 rows



In [13]:
# We can also add columns and manipulate the DataFrame. Let's multiply balance by 10 and store the result in a new column.
df.withColumn('balance_times_10',df['balance']*10).show()

# Question: If we print the df DataFrame again, why is the 'balance_times_10' column missing?
df.show()

+---+-----------+--------+---------+-------+-------+----+--------+-------+----------------+
|age|        job| marital|education|balance|housing|loan|duration|deposit|balance_times_10|
+---+-----------+--------+---------+-------+-------+----+--------+-------+----------------+
| 59|     admin.|    null|secondary|   2343|    yes|  no|    1042|    yes|           23430|
| 56|     admin.| married|secondary|   null|     no|  no|    1467|    yes|            null|
| 41| technician| married|secondary|   1270|    yes|  no|    1389|    yes|           12700|
| 55|   services|    null|secondary|   null|    yes|  no|     579|    yes|            null|
| 54|     admin.| married| tertiary|    184|     no|  no|     673|    yes|            1840|
| 42| management|    null| tertiary|      0|    yes| yes|     562|    yes|               0|
| 56| management| married| tertiary|    830|    yes| yes|    1201|    yes|            8300|
| 60|    retired|    null|secondary|    545|    yes|  no|    1030|    yes|      

In [14]:
# Let's try out some additional DataFrame methods.
# How would we identify individuals with a balance above $5,000? Using filter! 
df.filter("balance > 5000").show()

# We can also chain filter with select. For example, let's see the jobs of people with over $2,500 in their bank account.
df.filter("balance > 2500").select('job','balance').show()

+---+-----------+--------+---------+-------+-------+----+--------+-------+
|age|        job| marital|education|balance|housing|loan|duration|deposit|
+---+-----------+--------+---------+-------+-------+----+--------+-------+
| 51|blue-collar| married|secondary|   7180|    yes|  no|     927|    yes|
| 41|blue-collar|divorced|secondary|   5291|    yes|  no|    1423|    yes|
| 29| management| married| tertiary|  10576|     no|  no|    1224|    yes|
+---+-----------+--------+---------+-------+-------+----+--------+-------+

+-----------+-------+
|        job|balance|
+-----------+-------+
| management|   3837|
| technician|   3285|
|  housemaid|   3923|
|blue-collar|   2823|
| technician|   3652|
|blue-collar|   7180|
|blue-collar|   5291|
| technician|   4580|
| management|  10576|
| technician|   3706|
| management|   4393|
|blue-collar|   4438|
+-----------+-------+



In [15]:
# What if we wanted to identify those that were under 40 and had over $2,500 in their account? 
# We can use multiple conditions.
df.filter("balance > 2500 AND age < 40").select('age','job','balance').show()

+---+-----------+-------+
|age|        job|balance|
+---+-----------+-------+
| 35| management|   3837|
| 29| management|  10576|
| 27| technician|   3706|
| 36|blue-collar|   4438|
+---+-----------+-------+



## Basic Data Aggregation
On top of filtering, we can also group/aggregate data. Let's see how that works. 

In [16]:
df.groupBy('job').mean().show()

+-------------+------------------+------------------+-----------------+
|          job|          avg(age)|      avg(balance)|    avg(duration)|
+-------------+------------------+------------------+-----------------+
|   management|           40.4375|         1771.5625|          1139.75|
|      retired|              55.0|            843.75|          1060.25|
|      unknown|              49.0|             341.0|            520.0|
|self-employed|              31.0|             144.0|            676.0|
|  blue-collar| 39.48571428571429|1050.6470588235295|917.9714285714285|
| entrepreneur|              34.0|            -195.5|            432.5|
|       admin.|              42.0| 524.1538461538462|            815.5|
|   technician|39.411764705882355|1343.3529411764705|980.2941176470588|
|     services|              43.5| 277.6666666666667|          815.125|
|    housemaid|              52.0|            3923.0|            942.0|
|   unemployed|              37.0|             381.0|           

What just happened? Our dataset was grouped by job title (technician, management, etc.), and the average age, balance and duration were calculated for each job. Why only these three? Because mean() with no arguments only averages numeric columns and skips the rest. The groups come back in no particular order, though, so it's usually good practice to sort them. Let's see how that's done.

In [17]:
# To keep things simple, let's split this into two steps. First, store the grouped DataFrame in a variable, then order it by age.
# Don't add show() to the end of this line! show() only prints the table and returns None, so the variable would no longer hold a DataFrame you can sort.
group_job_df = df.groupBy('job').mean()

# Note that we have to use 'avg(age)' instead of 'age'. Why? Because mean() renames each output column to avg(<column name>), as you can see below.
print("Sorted by Age")
group_job_df.orderBy('avg(age)').show()

# Let's see what this looks like in one line.
print("Sorted by Balance")
df.groupBy('job').mean().orderBy('avg(balance)').show()

Sorted by Age
+-------------+------------------+------------------+-----------------+
|          job|          avg(age)|      avg(balance)|    avg(duration)|
+-------------+------------------+------------------+-----------------+
|self-employed|              31.0|             144.0|            676.0|
| entrepreneur|              34.0|            -195.5|            432.5|
|   unemployed|              37.0|             381.0|            985.0|
|   technician|39.411764705882355|1343.3529411764705|980.2941176470588|
|  blue-collar| 39.48571428571429|1050.6470588235295|917.9714285714285|
|   management|           40.4375|         1771.5625|          1139.75|
|       admin.|              42.0| 524.1538461538462|            815.5|
|     services|              43.5| 277.6666666666667|          815.125|
|      unknown|              49.0|             341.0|            520.0|
|    housemaid|              52.0|            3923.0|            942.0|
|      retired|              55.0|            843.

## Cleaning Up
While the data may be accurate, it isn't yet presentable enough for a professional context. Let's make a few adjustments to tidy it up.

In [18]:
from pyspark.sql.functions import format_number, col

# Let's start off with this. Just grouping by job and presenting the mean.
group_job_df = df.groupBy('job').mean()
group_job_df.show()

# Now that we've calculated the means, some values (e.g. for blue-collar and technician) have far too many decimal places.
# We can use format_number to round them to a fixed number of decimal places. Note that it returns strings, with thousands separators.
# The second argument (2) is the number of decimal places to display.
group_job_df = group_job_df.select('job',
                                   format_number('avg(age)',2),
                                   format_number('avg(balance)',2),
                                   format_number('avg(duration)',2))
group_job_df.show()

# But now the column names look quite unprofessional. We can assign an alias to rename each of them.
group_job_df = group_job_df.select(col('job').alias('Job Category'),
                                   col('format_number(avg(age), 2)').alias('Average Age'),
                                   col('format_number(avg(balance), 2)').alias('Average Balance'),
                                   col('format_number(avg(duration), 2)').alias('Average Duration'))
group_job_df.show()

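# Finally, let's sort the DataFrame by age. The formatted columns are now strings, so they sort alphabetically;
# this only matches numeric order here because every average age has two digits before the decimal point.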
group_job_df = group_job_df.orderBy('Average Age')

print('Average Age, Balance and Duration by Job Category')
group_job_df.show()

+-------------+------------------+------------------+-----------------+
|          job|          avg(age)|      avg(balance)|    avg(duration)|
+-------------+------------------+------------------+-----------------+
|   management|           40.4375|         1771.5625|          1139.75|
|      retired|              55.0|            843.75|          1060.25|
|      unknown|              49.0|             341.0|            520.0|
|self-employed|              31.0|             144.0|            676.0|
|  blue-collar| 39.48571428571429|1050.6470588235295|917.9714285714285|
| entrepreneur|              34.0|            -195.5|            432.5|
|       admin.|              42.0| 524.1538461538462|            815.5|
|   technician|39.411764705882355|1343.3529411764705|980.2941176470588|
|     services|              43.5| 277.6666666666667|          815.125|
|    housemaid|              52.0|            3923.0|            942.0|
|   unemployed|              37.0|             381.0|           

## Great job on finishing! 

Let's go over a few additional key takeaways:
- You should understand why group_job_df was reassigned each time: transformations such as select and orderBy return a new DataFrame instead of changing the existing one.
- Also, using pyspark.sql.functions is not the only way of achieving such tasks. You could use a different package, function or method (check out the documentation, or see: https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark)
- Finally, the PySpark API lets you use the full Python programming language. You don't have to spell out every column as in the code example above; that was done for the sake of simplicity. If you're comfortable with programming, try using a loop to make repetitive work faster and simpler.