# Common Spark Transformations

## Learning Objectives

For this notebook I'll be: 
- Reading data into Spark
- Implementing projections and filters
- Renaming and adding columns
- Performing aggregations
- Saving the work

To achieve these objectives, I'll be using the PySpark API, Spark's Python interface, and in particular its DataFrame API.

Let's get started, shall we? :)

It's always worth checking which version of Spark you're working with, because the API changes between releases and knowing the version helps you refer to the correct documentation when the need arises.

In [1]:
import pyspark
pyspark.__version__

'3.3.0'

Awesome, now that Spark is installed, let me clarify that I'll be running it locally, so the `driver` and the `executor` will be located on the same machine, i.e. `localhost`.

OK, in order to get Spark to execute my instructions through the PySpark API, I first need to create a `SparkContext` and then set up a `SparkSession`.

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Initiate the Spark Context
sc = SparkContext()
# Initiate the Spark Session
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/18 10:27:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
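As a hedged aside: since Spark 2.0 the usual entry point is `SparkSession.builder`, which creates (or reuses) the underlying `SparkContext` for you. The sketch below wraps that in a helper. The function name `make_local_session`, the app name and the `local[*]` master are illustrative choices of mine, not something this notebook ran.

```python
def make_local_session(app_name="btc-transformations", master="local[*]"):
    """Create (or reuse) a SparkSession that runs on the local machine."""
    # Imported inside the function so the helper can be defined even in an
    # environment where PySpark is not installed yet.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master(master)      # local[*]: run on this machine, using all cores
        .appName(app_name)   # name shown in the Spark UI
        .getOrCreate()       # reuse an active session if one already exists
    )
```

Calling `spark = make_local_session()` would return a session whose context is available as `spark.sparkContext`.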


I'll be using a Bitcoin dataset from Kaggle for this analysis. I'll also be working with Spark DataFrames as opposed to RDDs in this example.

Spark can infer the schema of a dataset as it reads it (by passing `inferSchema=True`, which costs an extra pass over the data), or we can define the schema ourselves when reading in the data.

In [3]:
btc_df = spark.read.csv('/Users/daluxmba/Desktop/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv', header=True)

                                                                                

In [4]:
# In Spark we use the show() method to display the DataFrame and have a look at its contents
btc_df.show(3)

+----------+----+----+----+-----+------------+-----------------+--------------+
| Timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+----------+----+----+----+-----+------------+-----------------+--------------+
|1325317920|4.39|4.39|4.39| 4.39|  0.45558087|     2.0000000193|          4.39|
|1325317980| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|
|1325318040| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|
+----------+----+----+----+-----+------------+-----------------+--------------+
only showing top 3 rows



In [5]:
btc_df.describe().show()

                                                                                

+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|summary|           Timestamp|   Open|   High|    Low|  Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+
|  count|             4857377|4857377|4857377|4857377|4857377|     4857377|          4857377|       4857377|
|   mean|1.4713007665042922E9|    NaN|    NaN|    NaN|    NaN|         NaN|              NaN|           NaN|
| stddev| 8.428019437553181E7|    NaN|    NaN|    NaN|    NaN|         NaN|              NaN|           NaN|
|    min|          1325317920|     10|     10|    1.5|    1.5|           0|                0|            10|
|    max|          1617148800|    NaN|    NaN|    NaN|    NaN|         NaN|              NaN|           NaN|
+-------+--------------------+-------+-------+-------+-------+------------+-----------------+--------------+



Now let's take a look at the schema here.

In [6]:
btc_df.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume_(BTC): string (nullable = true)
 |-- Volume_(Currency): string (nullable = true)
 |-- Weighted_Price: string (nullable = true)



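From the above I can see that Spark read every column as a string. That is expected: I didn't ask it to infer the schema, and without `inferSchema=True` the CSV reader treats every column as a string. The types should really be:
- Timestamp should not be a string
- Open, High, Low, Close, the two Volume columns and Weighted Price should all be numeric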
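The `describe()` output above makes more sense once the columns are known to be strings. As far as I understand it, `min` and `max` on a string column compare text rather than numbers, which would explain why `Open` reports a minimum of `10` and a maximum of `NaN`. The plain-Python sketch below mimics that behaviour with a few illustrative values (they are not taken from the full dataset).

```python
import math

# A handful of illustrative values, stored as text the way Spark read them.
open_as_text = ["4.39", "10", "NaN", "3674.58"]

# Text comparison goes character by character, so "10" sorts before "4.39"
# and "NaN" sorts after every string that starts with a digit.
text_min = min(open_as_text)
text_max = max(open_as_text)

# After converting to floats, the comparison is numeric instead.
open_as_float = [float(v) for v in open_as_text]
real_values = [v for v in open_as_float if not math.isnan(v)]
numeric_min = min(real_values)
numeric_max = max(real_values)

# A single NaN is enough to turn an arithmetic mean into NaN.
mean_with_nan = sum(open_as_float) / len(open_as_float)

print(text_min, text_max)          # 10 NaN
print(numeric_min, numeric_max)    # 4.39 3674.58
print(math.isnan(mean_with_nan))   # True
```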


To read this data in properly, I can manually specify the schema using Spark's built-in data types.
- `StructType([])` defines the overall structure of the DataFrame
- `StructField()` defines the name, data type and nullability of a single column, so there is one per column.


I'll go ahead and define a schema for this dataset. To do so, I'll make use of the data types provided by the `pyspark.sql.types` module (a full list of the supported types and their properties can be found at https://athena.explore-datascience.net/student/content/train-view/64/161/3131).

Note that below I read the timestamps in as Integer types and cast them to Timestamp types later. I did this because the column holds integer values rather than formatted date strings, so the Spark reader can't reliably parse it as a timestamp at read time.

In [7]:
# Import the types needed to build the StructFields and, in turn, the schema

from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType

In [8]:
schema = StructType([
    StructField('Timestamp', IntegerType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume_(BTC)', DoubleType(), True),
    StructField('Volume_(Currency)', DoubleType(), True),
    StructField('Weighted_Price', DoubleType(), True)
])
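A more compact alternative, sketched here rather than taken from the notebook: `spark.read.csv` also accepts the schema as a DDL-formatted string. The helper name `to_ddl` and the `columns` list are my own. Column names are wrapped in backticks because two of them contain parentheses.

```python
# (column name, Spark SQL type) pairs mirroring the StructType above.
columns = [
    ("Timestamp", "INT"),
    ("Open", "DOUBLE"),
    ("High", "DOUBLE"),
    ("Low", "DOUBLE"),
    ("Close", "DOUBLE"),
    ("Volume_(BTC)", "DOUBLE"),
    ("Volume_(Currency)", "DOUBLE"),
    ("Weighted_Price", "DOUBLE"),
]


def to_ddl(cols):
    """Join (name, type) pairs into a DDL schema string."""
    # Backticks quote each name so characters such as "(" and ")" are accepted.
    return ", ".join(f"`{name}` {dtype}" for name, dtype in cols)


ddl_schema = to_ddl(columns)
print(ddl_schema)
```

The resulting string could then be passed as `schema=ddl_schema` in place of the `StructType` object.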

In [9]:
btc_schema = spark.read.csv(
    '/Users/daluxmba/Desktop/bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv',
    header=True,
    schema=schema,
    multiLine=True,
)

In [10]:
btc_schema.printSchema()

root
 |-- Timestamp: integer (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_(BTC): double (nullable = true)
 |-- Volume_(Currency): double (nullable = true)
 |-- Weighted_Price: double (nullable = true)



Awesome, it looks like all the schema changes have now taken effect. There's still the timestamp column to deal with, though.

I need to cast this field to the appropriate type. To do that, I first have to better understand how to manipulate data in PySpark.

## Altering column data

Manipulating the contents of a column, such as redefining its data type, is a common operation in Spark. To perform such a step, we call the `withColumn()` method on a DataFrame object. This method takes in two arguments:

- The first is `colName`, the "column name" argument: a string naming the column we would like to manipulate. If we provide the name of an existing column in our DataFrame, the method replaces its contents. If we provide a new column name, the method appends this new column to the resulting DataFrame.
- The second is `col`, the "column expression", which defines the operation(s) used to update or create the resulting column.

In our case, we want to update the existing `Timestamp` column, changing the data from an integer encoding to a more suitable timestamp format.

To perform this conversion, we can use the functions provided in PySpark's `functions` module. Here we are specifically interested in the `to_timestamp()` and `to_date()` functions, though we'll soon see that this module gets used extensively for many other tasks.


In [11]:
# I need to import the sql functions module in order to perform the change and to apply functions to the dataframe.
from pyspark.sql import functions as F

In [12]:
btc_time_corrected = btc_schema.withColumn(colName='Timestamp', col=F.to_timestamp(btc_schema["Timestamp"]))

In [13]:
btc_time_corrected.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_(BTC): double (nullable = true)
 |-- Volume_(Currency): double (nullable = true)
 |-- Weighted_Price: double (nullable = true)
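To make the conversion concrete: the raw `Timestamp` values look like Unix epoch seconds (my assumption, based on their magnitude and the date range in the file name). The standard-library sketch below converts the first value from the data, `1325317920`, and the maximum reported by `describe()` to UTC, and checks that such values fit in a 32-bit `IntegerType`. Spark renders timestamps in the session time zone, so what `show()` prints can differ from the UTC values here.

```python
from datetime import datetime, timezone

first_raw = 1325317920   # first Timestamp value in the CSV
last_raw = 1617148800    # max Timestamp reported by describe()

first_utc = datetime.fromtimestamp(first_raw, tz=timezone.utc)
last_utc = datetime.fromtimestamp(last_raw, tz=timezone.utc)

print(first_utc.isoformat())  # 2011-12-31T07:52:00+00:00
print(last_utc.isoformat())   # 2021-03-31T00:00:00+00:00

# IntegerType is a signed 32-bit integer, so epoch seconds fit until 2038.
INT32_MAX = 2**31 - 1
fits_in_int32 = last_raw <= INT32_MAX
print(fits_in_int32)          # True
```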



In [14]:
# Let's add an extra column to the DataFrame. It holds the day of the week as a number, where Sunday = 1 and Saturday = 7

btc_time_corrected = btc_time_corrected.withColumn('Day', F.dayofweek(btc_time_corrected.Timestamp))

In [15]:
btc_time_corrected.show(3)

+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
|          Timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Day|
+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
|2011-12-31 09:52:00|4.39|4.39|4.39| 4.39|  0.45558087|     2.0000000193|          4.39|  7|
|2011-12-31 09:53:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|  7|
|2011-12-31 09:54:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|  7|
+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
only showing top 3 rows
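Spark's `dayofweek` numbers the days from Sunday = 1 to Saturday = 7, which differs from Python's `isoweekday()` (Monday = 1 to Sunday = 7). The sketch below, with a helper name `spark_style_dayofweek` that I made up, reproduces Spark's numbering in plain Python so the `Day` values in the output can be sanity-checked.

```python
from datetime import date


def spark_style_dayofweek(d):
    """Return 1 for Sunday through 7 for Saturday, like Spark's dayofweek."""
    # isoweekday(): Monday = 1 ... Sunday = 7, so shift by one and wrap around.
    return d.isoweekday() % 7 + 1


print(spark_style_dayofweek(date(2011, 12, 31)))  # 7 (a Saturday)
print(spark_style_dayofweek(date(2019, 1, 1)))    # 3 (a Tuesday)
```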



## Projections and filters

Filters give us a way to select rows based on a condition we specify, while projections select columns. In PySpark, filters are applied using the `filter()` or the `where()` method. `where()` is an alias for `filter()`, so they produce the same result and it doesn't matter which one you use.

To put filters to the test, I'll filter the DataFrame down to data from 2019 onwards. To accomplish this, I'll import `date` from the `datetime` module and create a cutoff date for the filter.

In [16]:
from datetime import date
filter_date = date(2019, 1, 1)

In [21]:
btc_2019 = btc_time_corrected.where(btc_time_corrected['Timestamp'] > filter_date)

In [22]:
btc_2019.show(3)


+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+---+
|          Timestamp|   Open|   High|    Low|  Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Day|
+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+---+
|2019-01-01 00:01:00|3674.58|3674.68| 3663.8| 3663.8| 15.44882028|     56671.806953|  3668.3582258|  3|
|2019-01-01 00:02:00|3664.41|3664.41|3664.41|3664.41|    0.015697|      57.52024377|       3664.41|  3|
|2019-01-01 00:03:00|3666.13|3669.13|3662.85|3669.13| 27.15298682|     99532.493706|  3665.6186064|  3|
+-------------------+-------+-------+-------+-------+------------+-----------------+--------------+---+
only showing top 3 rows
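One detail worth noting in the output: the first row is `00:01:00`, not midnight. My reading is that the `date` is treated as midnight at the start of that day, so the strict `>` drops a row stamped exactly `2019-01-01 00:00:00`, while `>=` would keep it. The plain-Python sketch below illustrates the difference with three made-up minute stamps. It mimics the comparison rather than calling Spark.

```python
from datetime import date, datetime, time

filter_date = date(2019, 1, 1)
# Treat the date as midnight at the start of that day.
cutoff = datetime.combine(filter_date, time.min)

# Three made-up minute stamps around the cutoff.
stamps = [
    datetime(2018, 12, 31, 23, 59),
    datetime(2019, 1, 1, 0, 0),
    datetime(2019, 1, 1, 0, 1),
]

strictly_after = [s for s in stamps if s > cutoff]
on_or_after = [s for s in stamps if s >= cutoff]

print(len(strictly_after))  # 1
print(len(on_or_after))     # 2
```

In PySpark the inclusive version would be `btc_time_corrected['Timestamp'] >= filter_date`.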



                                                                                

Perfect, looks like the DataFrame now starts in 2019 as opposed to 2011. So far we've filtered rows; what if we want to select columns instead? That is a projection, and Spark has the `select()` method for that purpose!

Let's see how this works

In [23]:
btc_2019.select('Timestamp','Open','Close').show(3)


+-------------------+-------+-------+
|          Timestamp|   Open|  Close|
+-------------------+-------+-------+
|2019-01-01 00:01:00|3674.58| 3663.8|
|2019-01-01 00:02:00|3664.41|3664.41|
|2019-01-01 00:03:00|3666.13|3669.13|
+-------------------+-------+-------+
only showing top 3 rows



                                                                                

In [34]:
filter_date = date(2017, 12, 31)  # keep rows after this date
filter_day = F.dayofweek(btc_time_corrected["Timestamp"]) == 2  # Mondays only (Sunday = 1)
filter_close = 6000  # upper bound for the closing price

In [35]:
condition = ((btc_time_corrected['Timestamp'] > filter_date)
             & (btc_time_corrected['Close'] < filter_close)
             & filter_day)
btc_time_corrected.select('Timestamp', 'Close').where(condition).show(4)


+-------------------+-------+
|          Timestamp|  Close|
+-------------------+-------+
|2018-11-19 00:00:00|5541.61|
|2018-11-19 00:01:00| 5541.8|
|2018-11-19 00:02:00|5540.64|
|2018-11-19 00:03:00|5540.64|
+-------------------+-------+
only showing top 4 rows
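The parentheses around each condition in the cell above are not optional. In Python, `&` binds more tightly than comparison operators such as `>` and `<`, so an unparenthesised expression is grouped differently. The sketch below shows the effect with plain integers; the numbers are arbitrary.

```python
# Intended meaning: (1 < 2) and (4 > 3).
with_parens = (1 < 2) & (4 > 3)

# Without parentheses Python evaluates 2 & 4 first (which is 0), leaving the
# chained comparison 1 < 0 > 3.
without_parens = 1 < 2 & 4 > 3

print(with_parens)     # True
print(without_parens)  # False
```

With Spark columns, as far as I know, the unparenthesised form typically fails with an error rather than quietly returning the wrong rows, but it is safer not to rely on that.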



                                                                                

## Adding, Renaming and Dropping Columns

The two main reasons for a data engineer to rename columns are **readability** and **maintaining a naming convention**.

Let's take a look at how I can improve both here.

In [38]:
cols = btc_2019.columns
cols

['Timestamp',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume_(BTC)',
 'Volume_(Currency)',
 'Weighted_Price',
 'Day']

In my personal opinion, you can never go wrong with making the column names lowercase. I also believe removing special characters like '()' is warranted and will improve readability. I also want to save this data as a Parquet file, so the column names shouldn't contain spaces or special characters such as parentheses. Let's get to it.


One method I can use to rename a column is `withColumnRenamed`, available on PySpark DataFrames.

In [67]:
btc = btc_time_corrected

In [68]:
btc.show(2)

+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
|          Timestamp|Open|High| Low|Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|Day|
+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
|2011-12-31 09:52:00|4.39|4.39|4.39| 4.39|  0.45558087|     2.0000000193|          4.39|  7|
|2011-12-31 09:53:00| NaN| NaN| NaN|  NaN|         NaN|              NaN|           NaN|  7|
+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
only showing top 2 rows



In [69]:
for col in btc.columns:
    btc = btc.withColumnRenamed(col,col.lower())

In [70]:
btc.show(1)

+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
|          timestamp|open|high| low|close|volume_(btc)|volume_(currency)|weighted_price|day|
+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
|2011-12-31 09:52:00|4.39|4.39|4.39| 4.39|  0.45558087|     2.0000000193|          4.39|  7|
+-------------------+----+----+----+-----+------------+-----------------+--------------+---+
only showing top 1 row



In [74]:
btc = btc.withColumnRenamed('volume_(btc)',"volume_btc")

In [75]:
btc = btc.withColumnRenamed('volume_(currency)',"volume_currency")

In [76]:
btc.show(2)

+-------------------+----+----+----+-----+----------+---------------+--------------+---+
|          timestamp|open|high| low|close|volume_btc|volume_currency|weighted_price|day|
+-------------------+----+----+----+-----+----------+---------------+--------------+---+
|2011-12-31 09:52:00|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|  7|
|2011-12-31 09:53:00| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|  7|
+-------------------+----+----+----+-----+----------+---------------+--------------+---+
only showing top 2 rows
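The loop and the two explicit renames above can be folded into a single rule. The sketch below is one way to do it in plain Python. The helper name `clean_column_name` and the exact rule (lowercase, then collapse anything that isn't a letter or digit into one underscore) are my own choices, not a PySpark feature.

```python
import re


def clean_column_name(name):
    """Lower-case a column name and reduce it to letters, digits and underscores."""
    name = name.lower()
    name = re.sub(r"[^0-9a-z]+", "_", name)  # replace runs of other characters
    return name.strip("_")                    # drop leading/trailing underscores


original = ["Timestamp", "Open", "Volume_(BTC)", "Volume_(Currency)", "Weighted_Price"]
cleaned = [clean_column_name(c) for c in original]
print(cleaned)
# ['timestamp', 'open', 'volume_btc', 'volume_currency', 'weighted_price']
```

With a DataFrame, the cleaned names could be applied in one step with `btc.toDF(*[clean_column_name(c) for c in btc.columns])`.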



I'm feeling pretty good about this result for now. I have read in CSV data using Spark, corrected the schema and renamed the columns appropriately. Now I am ready to save the result as a Parquet file.

In [81]:
btc.write.format("parquet").save("./transaformed_data")
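One caveat, offered as a sketch rather than as what the notebook ran: by default the writer raises an error if the target path already exists, so re-running the cell fails. A small helper (the name `save_as_parquet` and its default `mode` are my own choices) can make the save mode explicit.

```python
def save_as_parquet(df, path, mode="overwrite"):
    """Write a Spark DataFrame to `path` as Parquet using an explicit save mode."""
    # mode can be "error" (Spark's default), "overwrite", "append" or "ignore".
    df.write.mode(mode).parquet(path)
    return path
```

For example, `save_as_parquet(btc, "./transaformed_data")` would replace any previous output, and `spark.read.parquet("./transaformed_data")` would read it back.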

                                                                                