
# Common Transformations in Spark
© Explore Data Science Academy

## Learning objectives

In this train, you will learn how to:

- read data into Spark;
- implement projections and filters;
- rename, add and drop columns;
- perform aggregations, and 
- save progress by writing data from Spark. 

## Outline

To achieve our objectives, this train is structured as follows: 

 - An introduction to the PySpark API;
 - Accessing our data using PySpark;
 - Altering column data;
 - Projections and filters;
 - Adding, renaming, and dropping columns; 
 - Aggregations; and 
 - Writing data from the environment using PySpark.    




## An introduction to the PySpark API

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/Spark_and_python_s.png"
     alt="Spark and Python logos"
     style="padding-bottom: 0.5em"
     width=600px/>
     Spark and Python, a powerful partnership we'll learn to master. Logos exist in the <a href="https://creativecommons.org/"> Creative Commons</a> space.
</div>

To reach a wider audience of users, Spark provides several language-specific APIs that can be used to interact with its underlying machinery. At the time of this train's writing, the officially supported APIs include implementations for [Python](https://spark.apache.org/docs/latest/api/python/index.html), [Scala](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html), [SQL](https://spark.apache.org/docs/latest/api/sql/), [R](https://spark.apache.org/docs/latest/api/R/index.html), and [Java](https://spark.apache.org/docs/latest/api/java/index.html).  


Given the familiarity we've already gained with Python, in this train and from here onwards, we will focus on the Python implementation: PySpark. 

As Spark (and therefore PySpark) has seen many version updates, each of which may change the expected behaviour of the APIs, we will **prescribe the use of `PySpark 3.0.1` or later (Spark 3.0.1+) built on top of `Hadoop 2.7`** going forward. 

Remember that you can install PySpark through the installation of the complete Spark suite or by running:

`$ pip install pyspark` 

PySpark has an API similar to those of other Python data tools and, particularly since Spark 2.0 made the DataFrame its main structured API, it will feel very familiar to anyone who has worked with the Pandas API. 

Before we go any further, let's invoke the `PySpark` API and check that its version is correct:


In [2]:
!pip install pyspark
import pyspark
pyspark.__version__


'3.3.0'

While Spark can be run on a large cluster consisting of several interdependent nodes, we assume that by default this train will run with Spark in `local` mode, where the `driver` and `executors` will be located on the same machine. 

As we've seen, to allow Spark to execute our instructions through the PySpark API, we first need to register a `SparkContext` followed by setting up a `SparkSession`:

In [3]:
# To start programming in Spark, define a SparkContext and SparkSession.

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

With our session prepared, we're now ready to start reading in some data to analyse.

## Accessing our data using PySpark

Data engineering deals with highly diverse data sources, so a tool that is very particular about which sources it supports can quickly become a limitation. Fortunately for us, Spark has abstractions that allow for the consumption of data from several popular data sources, such as in-memory DataFrames, NoSQL databases, RDBMSs, and streaming applications.   

To see an example of this in practice, let's load the dataset which we'll be using throughout this train. Here our dataset consists of one-minute Bitcoin price and trading-volume records from the Bitstamp exchange, sourced from Kaggle [here](https://www.kaggle.com/mczielinski/bitcoin-historical-data).


<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/bitcoin.png"
     alt="Bitcoin-dataset"
     style="padding-bottom: 0.5em"
     width=300px/>
     "Do you even Bitcoin?" Image by <a href="https://pixabay.com/users/sinisamaric1-3044277/?utm_source=link-attribution&amp;utm_medium=referral&amp;utm_campaign=image&amp;utm_content=3125488">Sinisa Maric</a> from <a href="https://pixabay.com/?utm_source=link-attribution&amp;utm_medium=referral&amp;utm_campaign=image&amp;utm_content=3125488">Pixabay</a>
</div>


We aim to read this data in as a DataFrame object. Spark's `DataFrameReader`, accessed through the `spark.read` attribute, does this for us by reading in a file and returning a `DataFrame` object of rows and named columns. The data type held within each of these columns can either be defined through the use of a schema (more on this later on) or be inferred at read time by Spark. 

Using the `SparkSession` object we defined in the previous cell, it's easy to call methods to read in data from CSV, or any other [supported data format](https://spark.apache.org/docs/latest/sql-data-sources.html). Here we are provided with several methods that can be strung together to alter the behaviour of our read task. These include: 

 - the **`format()`** method, which can be used to designate the input file format;   
 - the **`option()`** method, which takes in key-value pairs to define various read behaviours, such as data integrity checking or header inclusion;    
 - the **`schema()`** method, which takes in either a `StructType` object or a [Data Definition Language](https://en.wikipedia.org/wiki/Data_definition_language) (`DDL`) string to define the data types expected in each DataFrame column; and   
 - the **`load()`** method, which receives a system path to the file(s) that are to be read. 
 
Let's put this all together by reading our data into a DataFrame which we'll call (generically) `df`. We'll include options to read in the data header and specify that our target source file is in a `csv` format: 

In [4]:
!unzip /content/archive.zip

Archive:  /content/archive.zip
  inflating: bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv  


In [5]:
# Remember to check that the path to the data given below is valid for your system.
df = spark.read \
     .format('csv') \
     .option('header',True) \
     .load('/content/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv')

To make our code more concise, PySpark also provides format-specific load methods, such as `spark.read.csv()`. These accept common reader options directly as keyword arguments (for example, `header=True`), which is convenient when our read operation needs only a few options.

In [6]:
# Remember to check that the path to the data given below is valid for your system.
df = spark.read.csv('/content/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv', header=True)

In Spark, you can use the `.show()` method on a DataFrame to look at its contents. Unlike Pandas' `.head()`, which renders a formatted table in a notebook, `.show()` prints a plain-text table, which can be harder to read for wide DataFrames.

In [7]:
# The `show()` method allows us to view the data, with the first argument specifying the number of rows to show, 
# and the second specifying that long entries should not be truncated. 
df.show(5, truncate=False)

+----------+----+----+----+-----+------------+-----------------+--------------+
|Timestamp |Open|High|Low |Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+----------+----+----+----+-----+------------+-----------------+--------------+
|1325317920|4.39|4.39|4.39|4.39 |0.45558087  |2.0000000193     |4.39          |
|1325317980|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|1325318040|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|1325318100|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|1325318160|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
+----------+----+----+----+-----+------------+-----------------+--------------+
only showing top 5 rows



Spark has a `describe()` method which gives us a summarised view of the dataset (the count, mean, standard deviation, minimum and maximum of each column). Importantly, this method doesn't print anything by itself; instead, it returns a new DataFrame of summary statistics, which still needs to be displayed:  

In [8]:
df_desc = df.describe()
df_desc.show(10, False)

+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|summary|Timestamp           |Open   |High   |Low    |Close  |Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|count  |4857377             |4857377|4857377|4857377|4857377|4857377     |4857377          |4857377       |
|mean   |1.4713007665042922E9|NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
|stddev |8.428019437554955E7 |NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
|min    |1325317920          |10     |10     |1.5    |1.5    |0           |0                |10            |
|max    |1617148800          |NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+



If we try to perform many operations on a single DataFrame, we'll soon find that the above syntax is cumbersome and adds needless bloat to our code. Thankfully, the PySpark API supports *chaining*, which allows us to string many operations together in a single expression. 

See this in action by performing the `describe()` operation again through chaining:  

In [9]:
# We can call the .show() method directly on the transformed DataFrame produced from .describe().
df.describe().show(10, False)

+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|summary|Timestamp           |Open   |High   |Low    |Close  |Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|count  |4857377             |4857377|4857377|4857377|4857377|4857377     |4857377          |4857377       |
|mean   |1.4713007665042922E9|NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
|stddev |8.428019437554955E7 |NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
|min    |1325317920          |10     |10     |1.5    |1.5    |0           |0                |10            |
|max    |1617148800          |NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+



Similar to Pandas, Spark also allows us to have a look at the data types using the `.dtypes` attribute of a DataFrame, or by calling its `printSchema()` method: 

In [10]:
df.dtypes

[('Timestamp', 'string'),
 ('Open', 'string'),
 ('High', 'string'),
 ('Low', 'string'),
 ('Close', 'string'),
 ('Volume_(BTC)', 'string'),
 ('Volume_(Currency)', 'string'),
 ('Weighted_Price', 'string')]

In [11]:
df.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume_(BTC): string (nullable = true)
 |-- Volume_(Currency): string (nullable = true)
 |-- Weighted_Price: string (nullable = true)



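From the above output, we can see that all the columns within our DataFrame were read in as `string` data types. This is because, by default, Spark's CSV reader does not infer column types at all: unless we set the `inferSchema` option to `True` (which costs an extra pass over the data), every field is treated as a string. This also explains the odd `min` and `max` values in the earlier `describe()` output, such as `10` for `Open`, since the values were compared as text rather than as numbers.  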

To better read in our data, we can manually specify the schema of the dataset through built-in data types used in Spark. Remember that `StructType()` defines the overall structure of a DataFrame, and `StructField()` defines the content of a column: its field name, its data type, and whether the field is nullable. 

So let's go ahead and define a schema for our dataset. To do so, we make use of the various data types provided within the `pyspark.sql.types` module (a full list of the supported types and their properties can be found [here](https://spark.apache.org/docs/latest/sql-ref-datatypes.html)).  

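*Note that below we read timestamps in as Integer types, and then later cast them to Timestamp types. We do this because the `Timestamp` field holds Unix epoch seconds, whereas the CSV reader parses `TimestampType` columns as date-time strings (according to its `timestampFormat` option), so reading the field directly as a timestamp would not produce the values we want.*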

In [12]:
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType

schema = StructType([StructField('Timestamp', IntegerType(), True),
                     StructField('Open', FloatType(), True),
                     StructField('High', FloatType(), True),
                     StructField('Low', FloatType(), True),
                     StructField('Close', FloatType(), True),
                     StructField('Volume_(BTC)', FloatType(), True),
                     StructField('Volume_(Currency)', FloatType(), True),
                     StructField('Weighted_Price', FloatType(), True)])

With our freshly defined schema, let's read in the data again:

In [13]:
df = spark.read.csv('/content/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv', header=True, schema=schema, multiLine=True)

We now check the DataFrame schema for correctness: 

In [14]:
df.printSchema()

root
 |-- Timestamp: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume_(BTC): float (nullable = true)
 |-- Volume_(Currency): float (nullable = true)
 |-- Weighted_Price: float (nullable = true)



Success! 

With a correct schema, let's take a look at our data again, this time paying special attention to the `Timestamp` field we mentioned earlier:

In [15]:
df.describe().show(10, False)

+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|summary|Timestamp           |Open   |High   |Low    |Close  |Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|count  |4857377             |4857377|4857377|4857377|4857377|4857377     |4857377          |4857377       |
|mean   |1.4713007665042922E9|NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
|stddev |8.428019437554696E7 |NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
|min    |1325317920          |3.8    |3.8    |1.5    |1.5    |0.0         |0.0              |3.8           |
|max    |1617148800          |NaN    |NaN    |NaN    |NaN    |NaN         |NaN              |NaN           |
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+



In [16]:
df.show(5)

+----------+----+----+----+-----+------------+-----------------+--------------+
| Timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+----------+----+----+----+-----+------------+-----------------+--------------+
|1325317920|4.39|4.39|4.39| 4.39|  0.45558086|              2.0|          4.39|
|1325317980| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|
|1325318040| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|
|1325318100| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|
|1325318160| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|
+----------+----+----+----+-----+------------+-----------------+--------------+
only showing top 5 rows



From the above output, everything appears to be correct except for the `Timestamp` column. Our goal, therefore, is to cast this field to the appropriate type. However, to do this we first have to better understand how we *manipulate data* in PySpark.

## Altering column data

Manipulating the contents of a column, such as redefining its data type, is a common operation in Spark. To perform such a step, we call the `withColumn()` method on a DataFrame object. Because Spark DataFrames are immutable, `withColumn()` does not modify the DataFrame it is called on; it returns a new DataFrame, which is why we reassign the result to a variable. This method takes in two arguments:

 - The first is the **`colName`**, or "column name" argument, which is a string indicating the column we would like to manipulate. If we provide the name of an existing column within our DataFrame, the method replaces that column's contents. Alternatively, if we provide a new column name, the method will append this new column to the resulting DataFrame. 
 - The second argument is the **`col`**, or "column expression", which defines what operation(s) should be performed to update/create the resulting column.  

In our case, we want to update the existing `Timestamp` column, changing the data from an `integer` encoding to a more suitable `timestamp` format. 

To perform this conversion, we can use a powerful set of functions provided within PySpark's [`functions` module](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions). Here we are specifically interested in the `to_timestamp()` and `to_date()` functions; however, we'll soon see that we make extensive use of this module for many tasks.       

In [17]:
# We use the conventional alias `F` for the functions module due to its heavy usage within our code.
from pyspark.sql import functions as F

Let's put our understanding together and update our `Timestamp` column:

In [18]:
df = df.withColumn(colName='Timestamp', col=F.to_timestamp(df['Timestamp']))

Take a second to note how we selected the `Timestamp` column. Here we referenced the field using the `df['Timestamp']` syntax, which should feel familiar to us due to its similarity to Pandas. 

Interestingly, Spark actually provides several ways in which we can reference columns. 

These include using the `functions` module:

In [19]:
F.col('Timestamp')

Column<'Timestamp'>

Using Pandas notation:

In [20]:
df['Timestamp']

Column<'Timestamp'>

Or applying dot notation:

In [21]:
df.Timestamp

Column<'Timestamp'>

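Each of these returns a `Column` object referring to the `Timestamp` column, and they are largely interchangeable. Two caveats apply: `F.col()` is not tied to a particular DataFrame, and dot notation only works for column names that are valid Python identifiers, so it cannot express a name such as `Volume_(BTC)`. 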

Moving along, let's inspect the results of the transformation we just performed:

In [22]:
df.show(5, False)

+-------------------+----+----+----+-----+------------+-----------------+--------------+
|Timestamp          |Open|High|Low |Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+-------------------+----+----+----+-----+------------+-----------------+--------------+
|2011-12-31 07:52:00|4.39|4.39|4.39|4.39 |0.45558086  |2.0              |4.39          |
|2011-12-31 07:53:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|2011-12-31 07:54:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|2011-12-31 07:55:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|2011-12-31 07:56:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
+-------------------+----+----+----+-----+------------+-----------------+--------------+
only showing top 5 rows



As can be seen, the `Timestamp` column is now appropriately formatted, with its underlying data type being `TimestampType`. Using this data type, Spark allows us to access parts of the timestamp using other functions in the `pyspark.sql.functions` module, such as `year()`, `month()`, `dayofmonth()`, `dayofweek()` and `hour()`. 

Using the `withColumn()` method, let's extract the year into a new column:

In [23]:
# Remember, F.col('Timestamp') is equivalent to df['Timestamp'].
df = df.withColumn('Year', F.year(F.col('Timestamp')))

And the results are: 

In [24]:
df.show(5, False)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----+
|Timestamp          |Open|High|Low |Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Year|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+
|2011-12-31 07:52:00|4.39|4.39|4.39|4.39 |0.45558086  |2.0              |4.39          |2011|
|2011-12-31 07:53:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|
|2011-12-31 07:54:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|
|2011-12-31 07:55:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|
|2011-12-31 07:56:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+
only showing top 5 rows



**\[Exercise 1\]**

To familiarise yourself with the functionality and syntax that we've covered so far, see if you can add two new columns to the above DataFrame called `Month` and `Weekday`, each containing the appropriate data derived from the `Timestamp` column:   

In [29]:
# Write your code to add a 'Month' column to the DataFrame here:
df_ex1 = df.withColumn('Month', F.month(F.col('Timestamp')))



In [30]:
df_ex1.show(5, False)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
|Timestamp          |Open|High|Low |Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Year|Month|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
|2011-12-31 07:52:00|4.39|4.39|4.39|4.39 |0.45558086  |2.0              |4.39          |2011|12   |
|2011-12-31 07:53:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |
|2011-12-31 07:54:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |
|2011-12-31 07:55:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |
|2011-12-31 07:56:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
only showing top 5 rows



In [31]:
# Write your code to add a 'Weekday' column to the DataFrame here: 
# Note that the weekday can be represented as an integer, i.e. Sunday --> 1, Saturday --> 7.
# Build on df_ex1 so that the 'Month' column added in the previous cell is kept.
df_ex1 = df_ex1.withColumn('Weekday', F.dayofweek(F.col('Timestamp')))


In [32]:
df_ex1.show(5, False)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+-------+
|Timestamp          |Open|High|Low |Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Year|Month|Weekday|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+-------+
|2011-12-31 07:52:00|4.39|4.39|4.39|4.39 |0.45558086  |2.0              |4.39          |2011|12   |7      |
|2011-12-31 07:53:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |7      |
|2011-12-31 07:54:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |7      |
|2011-12-31 07:55:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |7      |
|2011-12-31 07:56:00|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |2011|12   |7      |
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+-------+
only showing top 5 rows
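Note that `F.dayofweek()` numbers days from 1 (Sunday) to 7 (Saturday), whereas Python's `datetime.isoweekday()` runs from 1 (Monday) to 7 (Sunday). A small pure-Python helper (the name `spark_dayofweek` is hypothetical) makes the mapping explicit, which is handy when checking results or writing conditions such as "Monday is 2":

```python
from datetime import date

def spark_dayofweek(d: date) -> int:
    """Return the number Spark's dayofweek() gives: 1 = Sunday ... 7 = Saturday."""
    # isoweekday(): Monday = 1 ... Sunday = 7, so Sunday wraps to 1 and the rest shift up by one.
    return d.isoweekday() % 7 + 1

print(spark_dayofweek(date(2011, 12, 31)))  # 7 (a Saturday)
print(spark_dayofweek(date(2018, 11, 19)))  # 2 (a Monday)
```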



## Projections and filters

We now turn our attention to two important operations that we'll be using extensively in PySpark, namely projections and filters. 

In relational terms, a *projection* returns a subset of a table's columns, while a *filter* (a selection, in relational algebra) returns only the rows matching specific conditions. In PySpark, projections are done using the `select()` method, while filters are expressed using the `filter()` or `where()` methods, which are equivalent (`where()` is an alias for `filter()`). 

To get a feel for how this works, let's start by filtering our data to keep only the entries recorded after the start of 2020.

To do this, we're going to use the `date` class from the `datetime` module, which you should already be familiar with. Here we'll construct a `date` object, which can be used to filter `timestamp` data in Spark. 
Note that Spark also allows filtering with strings, for example, using `> '2020-01'` to return all timestamps after midnight on 1 January 2020, since Spark casts the string to a timestamp before comparing. However, it is generally safer to pass a value of the matching type, so that the comparison doesn't depend on how Spark interprets the string. 

Time to get filtering:

In [41]:
# Create our date object for filtering.
from datetime import date
filter_date = date(2020, 1, 1)

In [42]:
df.where(F.col('Timestamp') > filter_date).show(5, True)  # <-- Return all entries later than 2020-01-01 00:00:00.

+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+----+-----+
|          Timestamp|   Open|   High|    Low|  Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Year|Month|
+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+----+-----+
|2020-01-01 00:01:00|7161.51|7161.51|7155.09| 7161.2|   3.7769244|        27047.305|      7161.198|2020|    1|
|2020-01-01 00:02:00|7158.82|7158.82|7158.82|7158.82|  0.02927792|        209.59535|       7158.82|2020|    1|
|2020-01-01 00:03:00|7158.82|7158.82| 7156.9| 7156.9|  0.06581935|         471.1561|      7158.322|2020|    1|
|2020-01-01 00:04:00| 7158.5| 7158.5|7154.97| 7157.2|  0.97138673|         6950.501|      7155.236|2020|    1|
|2020-01-01 00:05:00|7156.52|7159.51| 7150.1| 7158.5|  0.88693166|         6342.851|      7151.454|2020|    1|
+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+----+-----+
only showing top 5 rows

Nice! It looks like everything worked as expected. 

We've just filtered based on the row values found within a column, with the resulting DataFrame having its full complement of fields. Often, however, we'd like to constrain the columns we return as part of the projection as well. 

To do this we make use of the `select()` method, which allows us to pick a subset of the fields from the target DataFrame:  

In [43]:
df.select('Timestamp', 'Open', 'High', 'Low').show(5, False)

+-------------------+----+----+----+
|Timestamp          |Open|High|Low |
+-------------------+----+----+----+
|2011-12-31 07:52:00|4.39|4.39|4.39|
|2011-12-31 07:53:00|NaN |NaN |NaN |
|2011-12-31 07:54:00|NaN |NaN |NaN |
|2011-12-31 07:55:00|NaN |NaN |NaN |
|2011-12-31 07:56:00|NaN |NaN |NaN |
+-------------------+----+----+----+
only showing top 5 rows



As we've seen before, we can chain `select()` together with our previously built filter to return only the rows and columns we need: 

In [44]:
df.select('Timestamp', 'Open', 'High', 'Low').where(F.col('Timestamp') > filter_date).show(5, False)

+-------------------+-------+-------+-------+
|Timestamp          |Open   |High   |Low    |
+-------------------+-------+-------+-------+
|2020-01-01 00:01:00|7161.51|7161.51|7155.09|
|2020-01-01 00:02:00|7158.82|7158.82|7158.82|
|2020-01-01 00:03:00|7158.82|7158.82|7156.9 |
|2020-01-01 00:04:00|7158.5 |7158.5 |7154.97|
|2020-01-01 00:05:00|7156.52|7159.51|7150.1 |
+-------------------+-------+-------+-------+
only showing top 5 rows



Nothing is stopping us from defining multiple filters within a single query. To do this, we combine the individual conditions using the logical operators that PySpark defines for `Column` objects.

To demonstrate how this can be done, let's create a projection which only contains entries recorded after `2020-01-01`, *and* whose `'Open'` field value is greater than 7000: 

In [45]:
filter_1 = F.col('Timestamp') > filter_date
filter_2 = F.col('Open') > 7000.0
# We use the '&' symbol to define the logical intersection of our filters. 
df.where(filter_1 & filter_2).show(5, False) 

+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+----+-----+
|Timestamp          |Open   |High   |Low    |Close  |Volume_(BTC)|Volume_(Currency)|Weighted_Price|Year|Month|
+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+----+-----+
|2020-01-01 00:01:00|7161.51|7161.51|7155.09|7161.2 |3.7769244   |27047.305        |7161.198      |2020|1    |
|2020-01-01 00:02:00|7158.82|7158.82|7158.82|7158.82|0.02927792  |209.59535        |7158.82       |2020|1    |
|2020-01-01 00:03:00|7158.82|7158.82|7156.9 |7156.9 |0.06581935  |471.1561         |7158.322      |2020|1    |
|2020-01-01 00:04:00|7158.5 |7158.5 |7154.97|7157.2 |0.97138673  |6950.501         |7155.236      |2020|1    |
|2020-01-01 00:05:00|7156.52|7159.51|7150.1 |7158.5 |0.88693166  |6342.851         |7151.454      |2020|1    |
+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+----+-----+
only showing top 5 rows

It's important to note from the above syntax that we combine filtering conditions with Python's *bitwise operators* (`&` for 'and', `|` for 'or', `~` for 'not'), which PySpark overloads for `Column` objects, instead of Python's built-in logical operators (`and`, `or`, `not`). Using the latter will raise a `ValueError`, because they need to convert each condition to a single Python `bool`, and a `Column` (an expression evaluated row by row) has no such value. 

Similar to previous examples, we can also write the above projection compactly by using chaining, while also selecting a subset of the DataFrame's columns: 

In [46]:
df.select('Timestamp', 'Open', 'High', 'Close') \
  .where((F.col('Timestamp') > filter_date) & (F.col('Open') > 7000)) \
  .show(5, False)

+-------------------+-------+-------+-------+
|Timestamp          |Open   |High   |Close  |
+-------------------+-------+-------+-------+
|2020-01-01 00:01:00|7161.51|7161.51|7161.2 |
|2020-01-01 00:02:00|7158.82|7158.82|7158.82|
|2020-01-01 00:03:00|7158.82|7158.82|7156.9 |
|2020-01-01 00:04:00|7158.5 |7158.5 |7157.2 |
|2020-01-01 00:05:00|7156.52|7159.51|7158.5 |
+-------------------+-------+-------+-------+
only showing top 5 rows



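Note the parentheses around each condition in the above statement. They are required because Python's `&` operator binds more tightly than comparison operators such as `>`, so without them the conditions we've defined inline would not be grouped as intended. 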
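The precedence issue can be seen in plain Python, with integers standing in for `Column` objects (the parsing rules are the same):

```python
a, b = 5, 4

# Without parentheses, `a > 1 & b > 2` parses as the chained comparison
# `a > (1 & b) > 2`, i.e. `(a > (1 & b)) and ((1 & b) > 2)`.
unparenthesised = a > 1 & b > 2
parenthesised = (a > 1) & (b > 2)

print(1 & b)            # 0  (binary 001 & 100)
print(unparenthesised)  # False, because 0 > 2 is False
print(parenthesised)    # True
```

With `Column` objects, the unparenthesised form is worse than merely wrong: the chained comparison implicitly uses Python's `and`, so PySpark raises a `ValueError`.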

**\[Exercise 2\]**

To test your understanding of the content we've covered so far, we challenge you to create a query returning all entries within the data which: 
 - occur on a Monday after Dec 31, 2017; and
 - have a 'Close' value less than 6000. 

To neaten the results of your projection, display only the 'Timestamp' and 'Close' columns in your answer.

In [50]:
# Write your code to define your data projection here:
new_filter_date = date(2017, 12, 31)
filter_1 = F.dayofweek(F.col('Timestamp')) == 2   # Monday (dayofweek: Sunday --> 1, Saturday --> 7)
filter_2 = F.col('Close') < 6000
filter_3 = F.col('Timestamp') > new_filter_date
df_ex2 = df.select('Timestamp', 'Close').where(filter_1 & filter_2 & filter_3)


In [51]:
df_ex2.show(5, False)

+-------------------+-------+
|Timestamp          |Close  |
+-------------------+-------+
|2018-11-19 00:00:00|5559.15|
|2018-11-19 00:01:00|5547.13|
|2018-11-19 00:02:00|5551.14|
|2018-11-19 00:03:00|5549.3 |
|2018-11-19 00:04:00|5549.29|
+-------------------+-------+
only showing top 5 rows



## Adding, renaming, and dropping columns

To round off our understanding of DataFrame manipulation, let's consider some Spark functionality to alter the *structure* of a DataFrame.  

Spark allows us to rename, add, or drop columns in DataFrames. We've already seen how to add columns with the `withColumn()` method, so the following subsections consider the motivation and methods behind renaming and removing columns.

### Renaming a column 

There are many reasons why we may want to rename some or all of the columns in a DataFrame. For a data engineer, however, the need usually arises from two common scenarios: 

 - **Maintaining a naming convention:** It is often the case that our data integrates into a larger system or is stored in a specific file format that requires a certain naming convention to be upheld. For example, the Parquet file format (which will be discussed in an upcoming section) does not allow for space characters within column names. 
 
 - **Increasing readability:** While we may become very familiar with the datasets we handle day to day, our ultimate role as data engineers is to help *other* teams, individuals, and systems make effective use of the available data. To fulfil this mandate, we may want to improve the readability of our DataFrame by using more descriptive column headings, normalising capitalisation, or removing non-alphanumeric characters that may be awkward for data scientists and analysts to work with.   


To put these principles into practice, let's consider our current DataFrame and think of ways in which we can improve its column headings: 

In [52]:
# We can use the .columns attribute of a DataFrame to conveniently see its field headings.
df.columns 

['Timestamp',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume_(BTC)',
 'Volume_(Currency)',
 'Weighted_Price',
 'Year',
 'Month']

Looking at the current data, two potential issues need our attention: 

 - Each column name starts with a capital letter, which may go against our organisation's naming convention.
 - The `Volume_(BTC)` and `Volume_(Currency)` fields contain non-alphanumeric characters (brackets) which may impede the efforts of data scientists working with the data. 

To rename columns in a DataFrame, we use the `withColumnRenamed()` method. Like `withColumn()`, it takes two string arguments: the name of the existing column and the new name it should be given.

Let's see this in action: 

In [53]:
new_df = df.withColumnRenamed('Timestamp', 'timestamp')

Remember that Spark DataFrames (like RDDs) are immutable. Renaming a column with `withColumnRenamed()`, or transforming data with `withColumn()`, does not modify the original DataFrame. Each call returns a new DataFrame containing the change. To keep the result, we must assign it to a variable (here, `new_df`).

In [54]:
new_df.show(5)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
|          timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Year|Month|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
|2011-12-31 07:52:00|4.39|4.39|4.39| 4.39|  0.45558086|              2.0|          4.39|2011|   12|
|2011-12-31 07:53:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
|2011-12-31 07:54:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
|2011-12-31 07:55:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
|2011-12-31 07:56:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
only showing top 5 rows



Here we can see the newly updated column name, along with the other columns which are still capitalised. 

If we want to apply the same transformation to all the columns within our DataFrame, we might want to complete the process programmatically in Python to make the transformation generic (and save some time).

Let's see how we can do this:

In [55]:
# Loop over each column name within our DataFrame, replacing runs of
# whitespace with underscores and converting the name to lower case.
for column in df.columns:
    df = df.withColumnRenamed(column, '_'.join(column.split()).lower())
# Display our results.
df.show(5)

+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
|          timestamp|open|high| low|close|volume_(btc)|volume_(currency)|weighted_price|year|month|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
|2011-12-31 07:52:00|4.39|4.39|4.39| 4.39|  0.45558086|              2.0|          4.39|2011|   12|
|2011-12-31 07:53:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
|2011-12-31 07:54:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
|2011-12-31 07:55:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
|2011-12-31 07:56:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|2011|   12|
+-------------------+----+----+----+-----+------------+-----------------+--------------+----+-----+
only showing top 5 rows



There we go. Each column is now in lower case. 

We aren't done though, as we still have to address the presence of non-alphanumeric characters within the headings. 

**\[Exercise 3\]**

Before we give the code to perform the removal of the non-alphanumeric characters, think about how you would alter the above code to get rid of the `(` and `)` characters.

In [56]:
# Write your code here to remove the '(' and ')' characters from the column headings:
for column in df.columns:
    df = df.withColumnRenamed(column, '_'.join(column.split()).lower().replace('(', '').replace(')', ''))
df.show(5)



+-------------------+----+----+----+-----+----------+---------------+--------------+----+-----+
|          timestamp|open|high| low|close|volume_btc|volume_currency|weighted_price|year|month|
+-------------------+----+----+----+-----+----------+---------------+--------------+----+-----+
|2011-12-31 07:52:00|4.39|4.39|4.39| 4.39|0.45558086|            2.0|          4.39|2011|   12|
|2011-12-31 07:53:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:54:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:55:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:56:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
+-------------------+----+----+----+-----+----------+---------------+--------------+----+-----+
only showing top 5 rows



Having tried to find a solution, did you get something like this? 

In [None]:
for column in df.columns:
    df = df.withColumnRenamed(column, '_'.join(column.split()).lower().replace('(', '').replace(')', ''))
df.show(5)

+-------------------+----+----+----+-----+----------+---------------+--------------+----+-----+
|          timestamp|open|high| low|close|volume_btc|volume_currency|weighted_price|year|month|
+-------------------+----+----+----+-----+----------+---------------+--------------+----+-----+
|2011-12-31 07:52:00|4.39|4.39|4.39| 4.39|0.45558086|            2.0|          4.39|2011|   12|
|2011-12-31 07:53:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:54:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:55:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:56:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011|   12|
+-------------------+----+----+----+-----+----------+---------------+--------------+----+-----+
only showing top 5 rows
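The renaming logic above can also be collected into one reusable function. The sketch below defines a hypothetical helper, `clean_column_name`, that uses a regular expression to strip *any* character other than letters, digits and underscores, instead of listing `(` and `)` one at a time. It is plain Python, so it can be tested without Spark.

```python
import re


def clean_column_name(name: str) -> str:
    """Hypothetical helper: lower-case a column name, turn runs of whitespace
    into underscores and drop every character that is not a-z, 0-9 or '_'."""
    name = "_".join(name.split()).lower()
    return re.sub(r"[^0-9a-z_]", "", name)


columns = ["Timestamp", "Open", "Volume_(BTC)", "Volume_(Currency)", "Weighted_Price"]
print([clean_column_name(c) for c in columns])
# ['timestamp', 'open', 'volume_btc', 'volume_currency', 'weighted_price']
```

In the notebook, the cleaned names could then be applied in a single call with `df = df.toDF(*[clean_column_name(c) for c in df.columns])`, which returns a new DataFrame with the columns renamed by position. Note that this version also removes non-ASCII letters such as `é`, which may or may not suit your naming convention.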



### Dropping a column

As with renaming, we often need or want to remove one or more columns from a DataFrame. Some of the reasons include: 

 - **Removing duplicate data:** Some processes within our data pipeline may duplicate columns. For example, when two DataFrames are joined on an expression such as `df1['id'] == df2['id']`, the joining column appears twice in the result. Joining on the column name instead, as in `df1.join(df2, on='id')`, keeps a single copy. Dropping one of the duplicates removes the needless redundancy within our data.
 - **Pruning our data:** Another by-product of processes within our data pipeline may be the creation of features that aren't duplicated, but are irrelevant or unusable for our needs. Examples include dropping a non-key field that an [IoT](https://en.wikipedia.org/wiki/Internet_of_things) device generates by default, or removing a column that contains too high a proportion of missing data.    

To remove one or more columns from a DataFrame, call its `drop()` method. Pass the names of the columns to remove as separate string arguments, or pass a single `Column` object to target one specific column. 

For example: 

In [57]:
df.drop('open', 'close', 'high', 'low').show(5)

+-------------------+----------+---------------+--------------+----+-----+
|          timestamp|volume_btc|volume_currency|weighted_price|year|month|
+-------------------+----------+---------------+--------------+----+-----+
|2011-12-31 07:52:00|0.45558086|            2.0|          4.39|2011|   12|
|2011-12-31 07:53:00|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:54:00|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:55:00|       NaN|            NaN|           NaN|2011|   12|
|2011-12-31 07:56:00|       NaN|            NaN|           NaN|2011|   12|
+-------------------+----------+---------------+--------------+----+-----+
only showing top 5 rows



Again, it's important to remember that, due to the immutability of DataFrames, the result of the `drop` operation needs to be assigned to a new or existing variable to be preserved.

## Aggregations

We've learned many important skills up until this point. We can now load data into Spark from an external source, change its structure, create additional fields, and view various projections representing filtered versions of its content. Now, we're going to learn one last basic skill which will prove vital in our ability to analyse data in Spark – performing data **aggregations**. 

The concept of performing an aggregation shouldn't be unfamiliar to us; we've come across these operations in both SQL and Pandas (Python-based DataFrames). Thankfully, we'll soon see that performing an aggregation using PySpark is very similar to these other frameworks.

Remember that when we perform an aggregation, we collect or *'group'* data based on a specified field, and then calculate an aggregate result using a given function. This means that our aggregation operations are inherently composed of two parts: 

 - The **Group stage**, where we designate which field(s) we should aggregate over; and
 - The **Function application stage**, where we specify an aggregation function to apply to our grouped data. 
 
In PySpark, each of these stages is handled by its own method. 

Let's first consider the *grouping stage* of an aggregation. To group data, we call a DataFrame's `groupBy()` method. This method accepts one or more columns (or column expressions) and returns a `GroupedData` object, PySpark's wrapper around Spark's [`RelationalGroupedDataset`](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RelationalGroupedDataset.html), to which aggregations can then be applied. 

To see this in action, let's group our data according to the year it was captured. We could do this by executing the following statement:  

In [58]:
df.groupBy(F.year('timestamp'))

<pyspark.sql.group.GroupedData at 0x7f406b54d810>

Interesting. As mentioned above, the result of the `groupBy()` method isn't a DataFrame like the ones we've seen so far, but a `GroupedData` object. Because this object has no `show()` method, we can't display it directly:

In [59]:
# Calling .show() on the GroupedData object will result in an error! 
df.groupBy(F.year('timestamp')).show()

AttributeError: 'GroupedData' object has no attribute 'show'

This is where the *aggregation function application* stage comes in. It allows us to transform our grouped data object into a calculated result, producing a DataFrame once more. 

There are several aggregation functions to choose from. They are all available from our trusty [`functions` module](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions), and include: 
 - `min()`: Return the smallest value found in the grouped data;
 - `max()`: Return the largest value found in the grouped data;
 - `sum()`: Return the sum of all values in the grouped data;
 - `mean()`: Return the average of all values in the grouped data; and
 - `count()`: Return the number of records in the grouped data (when given a specific column, `F.count()` skips nulls). 
 
Using this knowledge, let's complete our example from above by counting the number of records captured in each year of the dataset:

In [60]:
# We can call the .count method on the result of our .groupBy operation.
df.groupBy(F.year('timestamp')).count().show()

+---------------+------+
|year(timestamp)| count|
+---------------+------+
|           2018|525600|
|           2015|519128|
|           2013|525600|
|           2014|525600|
|           2019|525600|
|           2020|527040|
|           2012|527040|
|           2016|527040|
|           2011|   968|
|           2017|525600|
|           2021|128161|
+---------------+------+



Looks good! 

It's important to note that we don't always have to use `groupBy()` to aggregate a column's data. Several aggregation-style methods can be called directly on a DataFrame. These include `agg()` (which treats the whole DataFrame as a single group), `describe()`, `summary()`, `corr()`, `cov()`, `approxQuantile()` and `freqItems()`. The last four are also available through the DataFrame's `stat` attribute, alongside related utilities such as `sampleBy()` for stratified sampling.

We also aren't restricted to calling a single aggregation function on our grouped data. Instead, we can use the powerful `agg()` method, which computes several aggregations at once. One way to use it is to supply a dictionary describing the aggregations to perform, structured as `{field: aggregation_function}`, for example `{'high': 'sum'}`. 

Let's see a practical example of this: 

In [61]:
# Note that we use the .dropna() method to remove all NaN values
# from the dataset which would otherwise cause all our results 
# to become NaN. 
df.dropna().groupBy(F.year('timestamp')) \
  .agg({'high': 'max', 'volume_currency': 'sum'}) \
  .show(10, False)

+---------------+---------+---------------------+
|year(timestamp)|max(high)|sum(volume_currency) |
+---------------+---------+---------------------+
|2018           |17234.99 |3.0746672203400585E10|
|2015           |502.0    |1.545444530137112E9  |
|2013           |1163.0   |1.5511774873354862E9 |
|2014           |995.0    |2.6152928091775055E9 |
|2019           |13880.0  |2.3158536096590027E10|
|2020           |29300.0  |3.3290575546295395E10|
|2012           |16.41    |5764800.229803922    |
|2016           |980.74   |1.1124637970242171E9 |
|2011           |4.58     |425.3203430175781    |
|2017           |19666.0  |2.1775822445412468E10|
+---------------+---------+---------------------+
only showing top 10 rows



Looking at these results, we can make two observations: 

Firstly, *the names of the aggregated columns conform to a specific format*, namely `aggregation_function(column_name)`. For example, using the `max` function on the `'high'` column produces a result column called `'max(high)'`. 

Secondly, any trends in our results are difficult to spot because the rows are not in any particular order. This is where the `orderBy()` method saves the day. Much like the `ORDER BY` clause in SQL, Spark's `orderBy()` method accepts one or more columns by which to sort the output, in ascending order by default or in descending order on request.

We can extend our example once again by adding in ordering:

In [62]:
# Take note that, because of Spark's naming convention, 
# we order by 'year(timestamp)' instead of just 'timestamp'.
df.dropna().groupBy(F.year('timestamp')) \
    .agg({'high': 'max', 'volume_currency': 'sum'}) \
    .orderBy('year(timestamp)').show(10, False)

+---------------+---------+---------------------+
|year(timestamp)|max(high)|sum(volume_currency) |
+---------------+---------+---------------------+
|2011           |4.58     |425.3203430175781    |
|2012           |16.41    |5764800.229803922    |
|2013           |1163.0   |1.5511774873354862E9 |
|2014           |995.0    |2.6152928091775055E9 |
|2015           |502.0    |1.545444530137112E9  |
|2016           |980.74   |1.1124637970242171E9 |
|2017           |19666.0  |2.1775822445412468E10|
|2018           |17234.99 |3.0746672203400585E10|
|2019           |13880.0  |2.3158536096590027E10|
|2020           |29300.0  |3.3290575546295395E10|
+---------------+---------+---------------------+
only showing top 10 rows



As the `year(timestamp)` and `max(high)` headings show, Spark's naming convention can quickly become unwieldy. If we were to nest several aggregations on a single field, we could end up with something like `agg_3(agg_2(agg_1(field_name)))`. Only a mother could love that.

Spark helps us prevent such a tragedy by providing the `alias()` method, which lets us specify what the result of an aggregation should be called. One way to create aliases is to follow the aggregation with a `select()` statement in which we retrieve each column with the `col()` function and rename it with `alias()`. As we've seen by now, Spark often gives us several ways to accomplish the same thing. The `withColumnRenamed()` method would also work here, and `alias()` can even be applied directly to the column expressions passed to `agg()`. Which approach to choose depends mostly on the use case and on which one keeps our code most succinct. 

Let's put this all together by creating one final aggregation to give us a sense of Bitcoin's behaviour over the last 10 years:

In [63]:
# We chain the .alias() method within a select statement in order
# to rename our aggregated results, then sort by the renamed year column.
df.dropna().groupBy(F.year('timestamp'))\
    .agg({'high': 'max', 'volume_currency': 'sum'})\
    .select(F.col('year(timestamp)').alias('year_agg'),
            F.col('max(high)').alias('max'),
            F.col('sum(volume_currency)').alias('volume_sum'))\
    .orderBy('year_agg').show(10, False)

+--------+--------+---------------------+
|year_agg|max     |volume_sum           |
+--------+--------+---------------------+
|2011    |4.58    |425.3203430175781    |
|2012    |16.41   |5764800.229803922    |
|2013    |1163.0  |1.5511774873354862E9 |
|2014    |995.0   |2.6152928091775055E9 |
|2015    |502.0   |1.545444530137112E9  |
|2016    |980.74  |1.1124637970242171E9 |
|2017    |19666.0 |2.1775822445412468E10|
|2018    |17234.99|3.0746672203400585E10|
|2019    |13880.0 |2.3158536096590027E10|
|2020    |29300.0 |3.3290575546295395E10|
+--------+--------+---------------------+
only showing top 10 rows



This is quite informative. First, we can see Bitcoin's peak in 2017, when the digital currency maxed out just under 20 000 USD. This is followed by its subsequent decline and, finally, its resurgence to a new high of just under 30 000 USD in 2020. We can also see that the total traded volume broadly grew along with Bitcoin's price over time, although eyeballing a table is no substitute for computing an actual correlation. This isn't bad, considering we've used just a few basic Spark commands to analyse our data.  

With our analysis complete, it's time to learn how to store our data. 

## Writing data from the environment using PySpark

Typically, once we've performed some transformation on our dataset, we will want to persist our results by writing/saving them to an external data source. 

Spark supports a wide range of data formats, including CSV, JSON, Avro, and Parquet. For more information on the formats supported, see the [Spark sources documentation](https://spark.apache.org/docs/latest/sql-data-sources.html). A common choice as a storage format is Parquet, as it enables us to save our table's schema as metadata, allowing us to skip the tedious process of defining the data schema whenever our data is re-read into Spark.     

To save our data to a specific destination, we use the DataFrame's `write` property (accessed without parentheses). This returns a [DataFrameWriter](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=save#pyspark.sql.DataFrameWriter) object that lets us configure the write process. Just like the read operation that we saw at the start of this train, we can call the `format()` and `option()` methods to alter the behaviour of the write operation before it is executed. Two write methods worth remembering are: 

 - **`save()`** which specifies a write location in storage (for example, `df.write.format("parquet").save(parquet_path)`); and
 - **`saveAsTable()`** which saves our data to a table, along with its metadata being written to the Hive metastore (for example, `df.write.format("parquet").saveAsTable(parquet_table)`). 

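Related to the second method, remember that the Hive metastore is a catalogue service. It stores table metadata, such as schemas, storage locations and table properties, separately from the data files themselves, which typically live on a file system such as the Hadoop Distributed File System (HDFS). Spark can use a Hive metastore out of the box, which allows us to store this additional metadata alongside our files. Managed tables created using the above syntax are written under Spark's warehouse directory (set by `spark.sql.warehouse.dir`). In a typical Hive setup, this gives a path such as `/user/hive/warehouse/{schema_name}.db/{table_name}`.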

This table can also be created using Spark SQL (which is covered in a related train):

```SQL
CREATE TABLE {schema_name}.{table_name}
    ({table_schema})
    USING PARQUET
    LOCATION "/user/hive/warehouse/{schema_name}.db/{table_name}"
```

Additional properties can be added to the table using:

```SQL
ALTER TABLE {schema_name}.{table_name}
    SET TBLPROPERTIES ('{property_key}' = '{property_value}')
```
    

That's enough theory for now. Let's cement our learning by writing our data, in the Parquet file format, to a new folder inside the directory from which it was read.  

In [66]:
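# Just as we did in the read step of our data, ensure that the save path
# given below exists and is accessible from this notebook.
# Note: by default, the write fails if the target folder already exists;
# chain .mode("overwrite") before .save() to replace it instead.
df.write.format("parquet").save("./data/bitcoin_data/transformed_data")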

If the command didn't return any errors, then you've just successfully written your data in Parquet format. Spark writes a folder containing one or more part files rather than a single file. Well done! 

## Conclusion

That was a lot to take in. Let's do a brief recap to remind us what we've learned. 

We started with an introduction to the PySpark API, which provides an interface to Spark via the Python programming language. We were then able to use PySpark's functionality to read in data for visualisation and processing, during which time we were also exposed to syntax features such as command chaining which can help make our code more succinct. 

Once our data was read into Spark, we were able to manipulate it in multiple ways: creating new columns, renaming existing ones, forming new projections through the application of filters, and analysing our data by performing aggregations on it. 

We then wrapped up by briefly discussing the various options available for writing our data from Spark to another source. 

We encourage you to practise the skills covered in this train by analysing other sources of data that you come across. Doing so will ensure that you don't have to worry about small distractions such as syntax when you face more challenging scenarios as a data engineer.

## Appendix

The following resources may prove helpful on your learning journey with PySpark:

 - [Six Comprehensive Spark Exercises to Practice](https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565)

### Exercise solutions 

**\[Exercise 1\]**

In [None]:
# Part 1
# Write your code to add a 'Month' column to the DataFrame here: 

df_ex1 = df.withColumn('Month', F.month(F.col('Timestamp')))

In [None]:
# Part 2
# Write your code to add a 'Weekday' column to the DataFrame here: 
# Note that the weekday can be represented as an integer, i.e. Sunday --> 1, Saturday --> 7.

df_ex1 = df_ex1.withColumn('Weekday', F.dayofweek(F.col('Timestamp')))

**\[Exercise 2\]**

In [None]:
# Write your code to define your data projection here:
from datetime import date

new_filter_date = date(2018, 1, 1)
filter_1 = F.dayofweek(F.col('Timestamp')) == 2     # Monday (Sunday = 1, Saturday = 7)
filter_2 = F.col('Close') < 6000
filter_3 = F.col('Timestamp') >= new_filter_date    # on or after 1 Jan 2018
df_ex2 = df.select('Timestamp', 'Close').where(filter_1 & filter_2 & filter_3)