<a href="https://colab.research.google.com/github/akash865/spark_101/blob/master/Spark_101_DataManipulation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark 101 - Data Manipulation using Spark
<hr size="5"/>

> Notebook containing basic spark functions to get started with data analysis
- Author: Akash Chandra
- Comments: true
- Categories: [Python, PySpark, Pandas]
- Spark version: 3.0.3

## **1. Running Spark in Colab**

Running Spark in Colab requires installing a few dependencies first. Follow the cells below to get started.

### 1.1 Initialize Spark

The cells below set up Spark. They install Java 8, Apache Spark 3.0.3 (pre-built for Hadoop 2.7) and findspark, a library that makes it easy for Python to find Spark. If you want a different Spark version, change the version number in both the download URL and `SPARK_HOME`. Apache's download site only hosts current releases and moves older ones to the archive, so a download URL can stop working over time. Check the links below to find a version that is available: [Link to library](https://downloads.apache.org/spark/) (current releases) and [archive](https://archive.apache.org/dist/spark/) (older releases).

> Version: spark-3.0.3


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# sqlContext = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
# Check if spark is initialized
df = spark.sql('''select 'spark' as hello''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



If you can run the code above, you are good to proceed. Next, we will load data into a Spark dataframe and do some basic data manipulation. First, let's import some useful libraries; then we will look at some useful dataframe methods.
<br>
<br>
Note that `pyspark.sql.functions` contains many functions that are helpful for data analysis and descriptive analytics.

In [5]:
# Import spark libraries
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F # for more sql functions
from functools import reduce

## **2. Data Manipulation using Spark**

Before we begin this section, let's import the data. We will then use some basic functions that are commonly needed for analytics.

In [6]:
# Using file uploaded to github
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/akash865/spark_101/master/banklist.csv"
spark.sparkContext.addFile(url)

df = spark.read.options(header="true", inferschema = "true", delimiter=",").csv("file://"+SparkFiles.get("banklist.csv"))

print('df.count  :', df.count())
print('df.col ct :', len(df.columns))
print('df.columns:', df.columns)

df.count  : 561
df.col ct : 6
df.columns: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


## **3. Using SQL in Spark**

We often need SQL for data manipulation. Although Spark's DataFrame API provides nearly every function we use in SQL, SQL is often easier because of its familiarity. To query a dataframe with SQL, first register it as a temporary view. Column names that contain spaces must be wrapped in backticks, as in the example below, which is one reason it is best practice to avoid spaces in column names. We will learn the equivalent Spark functions and methods as we go along.

In [7]:
df.createOrReplaceTempView("temp_tb")  # registerTempTable is deprecated since Spark 2.0

df_check = spark.sql('''select `Bank Name`, City, `Closing Date` from temp_tb''')
df_check.show(4, truncate=False)

+--------------------------------+-------------+------------+
|Bank Name                       |City         |Closing Date|
+--------------------------------+-------------+------------+
|The First State Bank            |Barboursville|3-Apr-20    |
|Ericson State Bank              |Ericson      |14-Feb-20   |
|City National Bank of New Jersey|Newark       |1-Nov-19    |
|Resolute Bank                   |Maumee       |25-Oct-19   |
+--------------------------------+-------------+------------+
only showing top 4 rows



## **4. DataFrame Basic Operations**

### 4.1 Describe dataframe

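`describe` is a useful method that computes `count`, `mean`, `stddev`, `min` and `max` for all numeric and string columns. For string columns, `mean` and `stddev` are null, and `min`/`max` follow string (lexicographic) order. For example, `Closing Date` is stored as a string, so its min and max below are not the earliest and latest dates. We can limit the output to specific columns by passing their names to `describe`.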

In [8]:
df.describe().show()

+-------+--------------------+-------+----+-----------------+---------------------+------------+
|summary|           Bank Name|   City|  ST|             CERT|Acquiring Institution|Closing Date|
+-------+--------------------+-------+----+-----------------+---------------------+------------+
|  count|                 561|    561| 561|              561|                  561|         561|
|   mean|                null|   null|null|31685.68449197861|                 null|        null|
| stddev|                null|   null|null|16446.65659309965|                 null|        null|
|    min|1st American Stat...|Acworth|  AL|               91|      1st United Bank|    1-Aug-08|
|    max|               ebank|Wyoming|  WY|            58701|  Your Community Bank|    9-Sep-11|
+-------+--------------------+-------+----+-----------------+---------------------+------------+



In [9]:
df.describe('City', 'ST').show()

+-------+-------+----+
|summary|   City|  ST|
+-------+-------+----+
|  count|    561| 561|
|   mean|   null|null|
| stddev|   null|null|
|    min|Acworth|  AL|
|    max|Wyoming|  WY|
+-------+-------+----+



### 4.2 Counts, Columns and Schema

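Before processing data, it is important to check the row count, the column names and the data types, all of which are easy to get, as shown below. `dtypes` lists each column with its type, which quickly separates string columns from numeric ones, while `schema` returns the full `StructType`.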

In [10]:
print('df.count		:', df.count())
print('df.columns	:', df.columns)
print('df dtypes	:', df.dtypes)
print('df schema 1:', df.schema)

df.count		: 561
df.columns	: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']
df dtypes	: [('Bank Name', 'string'), ('City', 'string'), ('ST', 'string'), ('CERT', 'int'), ('Acquiring Institution', 'string'), ('Closing Date', 'string')]
df schema 1: StructType(List(StructField(Bank Name,StringType,true),StructField(City,StringType,true),StructField(ST,StringType,true),StructField(CERT,IntegerType,true),StructField(Acquiring Institution,StringType,true),StructField(Closing Date,StringType,true)))


In [11]:
print('df schema 1:')
df.printSchema()

df schema 1:
root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)



### 4.3 Remove Duplicates

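When working with data from other sources, we often need to clean it before processing further. `dropDuplicates` keeps one copy of each duplicated row and removes the rest. Called without arguments, it treats rows as duplicates only when they match in every column, so rows that share an ID but differ elsewhere are kept. `dropDuplicates` also accepts a list of columns to compare on, but then which row survives is arbitrary; keeping a specific row per ID (for example, the latest) requires a `window` function, which we will visit later.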

In [12]:
df = df.dropDuplicates()
print('df.count		:', df.count())
print('df.columns	:', df.columns)

df.count		: 561
df.columns	: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


### 4.4 Select specific columns

To keep only specific columns, use `select`. It accepts column names (or `Column` objects) as separate arguments or as a list; below, the list is unpacked with `*`.

In [13]:
df2 = df.select(*['Bank Name', 'City'])
df2.show(2)

+--------------------+--------+
|           Bank Name|    City|
+--------------------+--------+
| First Bank of Idaho| Ketchum|
|Amcore Bank, Nati...|Rockford|
+--------------------+--------+
only showing top 2 rows



### 4.5 Select all columns except a few

If we have a list of columns to leave out, we can select everything else instead. Here we use Python set operations to get the remaining column names and then pass them to `select`, which creates a dataframe with just those columns. Because a Python `set` is unordered, the columns may come back in a different order from the original dataframe, as the output shows.

In [14]:
col_l = list(set(df.columns)  - {'CERT','ST'})
df2 = df.select(*col_l)
df2.show(2)

+---------------------+--------------------+--------+------------+
|Acquiring Institution|           Bank Name|    City|Closing Date|
+---------------------+--------------------+--------+------------+
|      U.S. Bank, N.A.| First Bank of Idaho| Ketchum|   24-Apr-09|
|          Harris N.A.|Amcore Bank, Nati...|Rockford|   23-Apr-10|
+---------------------+--------------------+--------+------------+
only showing top 2 rows
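If the original column order matters, a list comprehension keeps it. A minimal sketch in plain Python; `columns_except` is a hypothetical helper name, not part of PySpark:

```python
def columns_except(columns, exclude):
    """Hypothetical helper: keep `columns` in order, skipping names in `exclude`."""
    excluded = set(exclude)  # a set is used only for fast membership tests
    return [c for c in columns if c not in excluded]


banklist_columns = ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']
keep = columns_except(banklist_columns, ['CERT', 'ST'])
print(keep)  # ['Bank Name', 'City', 'Acquiring Institution', 'Closing Date']
```

In the notebook this would be used as `df2 = df.select(*columns_except(df.columns, ['CERT', 'ST']))`.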



### 4.6 Rename columns

We will rename multiple columns in the example below. I found it annoying to look at spaces in field names. So let's start with removing spaces from the columns using `withColumnRenamed`. The first argument is the existing column and the 2nd argument is the new column name.

In [15]:
df2 = df \
  .withColumnRenamed('Bank Name'            , 'bank_name') \
  .withColumnRenamed('Acquiring Institution', 'acq_institution') \
  .withColumnRenamed('Closing Date'         , 'closing_date') \
  .withColumnRenamed('ST'                   , 'state') \
  .withColumnRenamed('CERT'                 , 'cert') #\

df2.show(2)

+--------------------+--------+-----+-----+---------------+------------+
|           bank_name|    City|state| cert|acq_institution|closing_date|
+--------------------+--------+-----+-----+---------------+------------+
| First Bank of Idaho| Ketchum|   ID|34396|U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|   IL| 3735|    Harris N.A.|   23-Apr-10|
+--------------------+--------+-----+-----+---------------+------------+
only showing top 2 rows



### 4.7 Rename columns using loop

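To rename many columns at once, we can apply the same transformation, such as changing case or replacing characters, to every column name. The example below uses a list comprehension that replaces all spaces with `_`, renames each column with `alias`, and passes the result to `select`.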

In [16]:
rename_expr = [col(column).alias(column.replace(' ', '_')) for column in df.columns]

df2 = df.select(*rename_expr)
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+
|           Bank_Name|    City| ST| CERT|Acquiring_Institution|Closing_Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+------------+
only showing top 2 rows



### 4.8 Add columns

In the example below, we add a new column `state` that copies `ST`, using `withColumn` and the `col` function. The `pyspark.sql.functions` module contains many functions that we will use throughout; `col` is one of them and returns a `Column` referring to the named column.

In [17]:
df2 = df.withColumn('state', col('ST'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-----+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|state|
+--------------------+--------+---+-----+---------------------+------------+-----+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|
+--------------------+--------+---+-----+---------------------+------------+-----+
only showing top 2 rows



### 4.9 Add constant column

What if we want a column with the same value in every row? Say we want to add the country, 'US', as a new column. `lit` creates a literal (constant) `Column`, so the value is repeated in every row of the dataframe.

In [18]:
df2 = df.withColumn('country', lit('US'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|country|
+--------------------+--------+---+-----+---------------------+------------+-------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|     US|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|     US|
+--------------------+--------+---+-----+---------------------+------------+-------+
only showing top 2 rows



### 4.10 Drop columns

Instead of selecting specific columns, we could also `drop` columns not required for analysis. 




In [19]:
df2 = df.drop('CERT')
df2.show(2)

+--------------------+--------+---+---------------------+------------+
|           Bank Name|    City| ST|Acquiring Institution|Closing Date|
+--------------------+--------+---+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+---------------------+------------+
only showing top 2 rows



### 4.11 Drop multiple columns

As with `select`, dropping multiple columns is simple: pass the column names to `drop`, unpacking a list with `*`. Below are two ways of doing the same.

In [20]:
df2 = df.drop(*['CERT','ST'])
df2.show(2)

+--------------------+--------+---------------------+------------+
|           Bank Name|    City|Acquiring Institution|Closing Date|
+--------------------+--------+---------------------+------------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---------------------+------------+
only showing top 2 rows



In [21]:
# reduce applies drop one column at a time: df.drop('CERT').drop('ST')
df2 = reduce(DataFrame.drop, ['CERT','ST'], df)
df2.show(2)

+--------------------+--------+---------------------+------------+
|           Bank Name|    City|Acquiring Institution|Closing Date|
+--------------------+--------+---------------------+------------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---------------------+------------+
only showing top 2 rows



### 4.12 Filter data

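When exploring a table, we often need to cut and slice the data to get insights. Below are examples of filtering with `==`, `between` and `isin`; comparison operators such as `>` and `<` work the same way. Note that `between` includes both bounds, and `where` is an alias of `filter`.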

In [22]:
# Equal to values
df2 = df.where(df['ST'] == 'NE')

# Between values
df3 = df.where(df['CERT'].between(1000, 2000))  # CERT is an integer column; bounds are inclusive

# Is inside multiple values
df4 = df.where(df['ST'].isin('NE','IL'))

print('df.count  :', df.count())
print('df2.count :', df2.count())
print('df3.count :', df3.count())
print('df4.count :', df4.count())


df.count  : 561
df2.count : 4
df3.count : 9
df4.count : 73


In [23]:
# Equal to values
df2 = df.where(df['ST'] == 'NE')

print('\ndf2 sample below')
df2.show(2)


df2 sample below
+-------------------+---------+---+-----+---------------------+------------+
|          Bank Name|     City| ST| CERT|Acquiring Institution|Closing Date|
+-------------------+---------+---+-----+---------------------+------------+
|       TierOne Bank|  Lincoln| NE|29341|   Great Western Bank|    4-Jun-10|
|Sherman County Bank|Loup City| NE| 5431|        Heritage Bank|   13-Feb-09|
+-------------------+---------+---+-----+---------------------+------------+
only showing top 2 rows



### 4.13 Filter data using logical operators

To filter on multiple conditions, combine them with logical operators. In PySpark, `AND` is `&`, `OR` is `|` and `NOT` is `~`. Because these operators bind more tightly than comparisons in Python, wrap each condition in parentheses. In the given dataframe, only one failed bank is in the state of 'NE' and the city of 'Ericson'.

In [24]:
df2 = df.where((df['ST'] == 'NE') & (df['City'] == 'Ericson'))
df2.show(3)

+------------------+-------+---+-----+---------------------+------------+
|         Bank Name|   City| ST| CERT|Acquiring Institution|Closing Date|
+------------------+-------+---+-----+---------------------+------------+
|Ericson State Bank|Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
+------------------+-------+---+-----+---------------------+------------+



### 4.14 Cast datatypes

We often need to change the data type of a variable. When a dataframe is created from text files, a column may be inferred with an unexpected type, for example because of errors or missing values in the data. We can cast such a column to the type we need. We will deal with date types later on. Note the two ways of casting to string below: passing the type name as a string, or passing a type object such as `StringType()`. We could also reuse the original column name to overwrite the column with its transformed version.

In [25]:
print('='*50)
print('Pre cast')
df.printSchema()  # printSchema() prints the tree itself and returns None

df2 = df \
.withColumn('CERT_str1', df['CERT'].cast('string')) \
.withColumn('CERT_str2', df['CERT'].cast(StringType()))

print('='*50)
print('Post cast')
df2.printSchema()


Pre cast
root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)

Post cast
root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)
 |-- CERT_str1: string (nullable = true)
 |-- CERT_str2: string (nullable = true)



### 4.15 Coalesce

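`coalesce` is a function I started using frequently after learning SQL. Sometimes we need to replace nulls in a column with a constant or with the value of another column in the same data. We could write a `CASE` statement to prioritize which value replaces the NULL, but `coalesce`, which returns the first non-null value among its arguments, is more concise. Note that `pyspark.sql.functions.coalesce` is different from the `DataFrame.coalesce` method, which reduces the number of partitions.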

In [26]:
df = spark.createDataFrame(
    [
      ['a1', 7, 5],
      ['a2', 5, 11],
      ['a3', None, 42],
      ['a4', 10, 15]
    ], # Data rows
    ['user', 'day_1', 'day_2'] # Column names
)

df.show()

print('='*50)
print('Using Coalesce')
df2 = df.withColumn('final_day', coalesce(col('day_1'), col('day_2'), lit(0))) # takes the first non-null value
df2.show()

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    7|    5|
|  a2|    5|   11|
|  a3| null|   42|
|  a4|   10|   15|
+----+-----+-----+

Using Coalesce
+----+-----+-----+---------+
|user|day_1|day_2|final_day|
+----+-----+-----+---------+
|  a1|    7|    5|        7|
|  a2|    5|   11|        5|
|  a3| null|   42|       42|
|  a4|   10|   15|       10|
+----+-----+-----+---------+



### 4.16 Replace values in dataframe

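`df.na.replace(to_replace, value)` replaces every occurrence of a value in all columns of a compatible type. Pass `subset` to restrict the replacement to particular columns.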

In [27]:
# Pre replace
df.show(2)

# Post replace
print('Replace 7 in the above dataframe with 17 at all instances')
df.na.replace(7,17).show(2)

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    7|    5|
|  a2|    5|   11|
+----+-----+-----+
only showing top 2 rows

Replace 7 in the above dataframe with 17 at all instances
+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|   17|    5|
|  a2|    5|   11|
+----+-----+-----+
only showing top 2 rows



### 4.17 Sort values

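`sort` (or its alias `orderBy`) sorts a dataframe. Sorting is ascending by default; call `desc()` on a column for descending order. By default, nulls come first in an ascending sort and last in a descending sort, as the output below shows.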

In [28]:
# Unsorted
print('Unsorted dataframe')
df.show(5)

# Default - ascending
print('Post ascending sorted day_1')
df.sort('day_1').show(5)

# Descending sort
print('Post descending sorted day_1')
df.sort(col('day_1').desc()).show(5)
# Equivalent: df.sort('day_1', ascending=False).show(5)

Unsorted dataframe
+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    7|    5|
|  a2|    5|   11|
|  a3| null|   42|
|  a4|   10|   15|
+----+-----+-----+

Post ascending sorted day_1
+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a3| null|   42|
|  a2|    5|   11|
|  a1|    7|    5|
|  a4|   10|   15|
+----+-----+-----+

Post descending sorted day_1
+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a4|   10|   15|
|  a1|    7|    5|
|  a2|    5|   11|
|  a3| null|   42|
+----+-----+-----+

