<a href="https://colab.research.google.com/github/akash865/Watchdogs/blob/master/Spark_101.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark 101 - Getting started with spark

## **Running Spark in Colab**

Running Spark code in Colab requires installing a few libraries first. Follow the cells in this notebook to get started.

### Initialize Spark

The cells below initialize Spark. They install Apache Spark 2.4.5, Java 8, and findspark, a library that makes it easy for Python to find Spark. If you install a different Spark version, update the version in the download URL and in `SPARK_HOME` to match. I referred to a Medium article by Asif to install Spark.

> [Link to document guide](https://towardsdatascience.com/pyspark-in-google-colab-6821c2faf41c)




In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# sqlContext = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
df = spark.sql('''select 'spark' as hello''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



## **Ways to create dataframe**

If you were able to run the code above, you are good to proceed. Let's start with ways to create a dataframe in Spark. We will also do some basic data manipulation. But let's import some useful libraries first. Then we will look at some useful dataframe methods.
<br>
<br>
Please note that `pyspark.sql.functions` provides many functions that are helpful for data analysis and descriptive analytics.

In [None]:
# Import spark libraries
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F # for more such functions
from functools import reduce

### View Spark dataframe

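We use the `show` or `collect` method, as shown below, to view a dataframe in Spark. `show` prints a formatted table, while `collect` returns all rows to the driver as a list of `Row` objects.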

In [None]:
df.show()
# OR
df.collect()

### Create Dataframe from list

In [None]:
df = spark.createDataFrame(
    [
      ["2015-06-23", 5],
      ["2016-07-20", 7]
    ], # Data rows
    ["data_date", "months_to_add"] # Column names
)

df.show()

+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2015-06-23|            5|
|2016-07-20|            7|
+----------+-------------+



### Create Dataframe from RDD

In [None]:
l =  [["2015-06-23", 5]
      ,["2016-07-20", 7]] #List with data elements
rdd1 = spark.sparkContext.parallelize(l)

In [None]:

row_rdd = rdd1.map(lambda x: Row(x[0], x[1]))
df = spark.createDataFrame(row_rdd, ['data_date', 'months_to_add'])

df.show()

+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2015-06-23|            5|
|2016-07-20|            7|
+----------+-------------+



### Create Dataframe from RDD and datatype

In [None]:

schema = StructType([
    StructField("data_date", StringType(), True),
    StructField("months_to_add", IntegerType(), True)]) # Col, Type, Nullable

df = spark.createDataFrame(rdd1, schema)
df.show()


+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2015-06-23|            5|
|2016-07-20|            7|
+----------+-------------+



### Create Dataframe from lists

In [None]:
# Building a simple dataframe:
schema = StructType([
    StructField("data_date", StringType(), True),
    StructField("months_to_add", IntegerType(), True)
    ]) # Col, Type, Nullable


column1 = ["2015-06-23", "2016-07-20"]
column2 = [5, 7]

# Dataframe:
df = spark.createDataFrame(list(zip(column1, column2)), schema=schema)
df.show()

+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2015-06-23|            5|
|2016-07-20|            7|
+----------+-------------+
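
`createDataFrame` expects row-wise data, so the two column lists have to be transposed first. Here is a plain-Python sketch of what `zip` hands to Spark in the cell above. It needs no Spark session to run.

```python
# Column-wise inputs, as in the cell above.
column1 = ["2015-06-23", "2016-07-20"]
column2 = [5, 7]

# zip pairs the i-th element of each list, giving one tuple per row.
rows = list(zip(column1, column2))
print(rows)  # [('2015-06-23', 5), ('2016-07-20', 7)]
```

Note that `zip` stops at the shortest list, so column lists of unequal length lose rows silently.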



### Create Dataframe from Pandas dataframe

In [None]:
import pandas as pd
# Build a pandas dataframe first, then hand it to Spark

l =  [["2015-06-23", 5]
      ,["2016-07-20", 7]] #List with data elements
    
df = spark.createDataFrame(pd.DataFrame(l),['data_date','months_to_add'])
df.show()

+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2015-06-23|            5|
|2016-07-20|            7|
+----------+-------------+



### Create Dataframe from hive table

In [None]:
input_table = "<db_name>.<table_name>"  # replace with your database and table name
df = spark.sql('''select data_date, months_to_add from {0}'''.format(input_table))

### Create Dataframe from CSV or other text file

The data is the FDIC failed bank list, available from data.gov. You may download it from the link given or use any text file you have. Data link: https://catalog.data.gov/dataset/fdic-failed-bank-list.
<br>
<br>
Do note the arguments to the read function. Setting `header` to true treats the first line as column names. `inferschema` lets Spark guess the datatype of each column from the data, a convenient shortcut for declaring a schema. `delimiter` can be changed to a tab (`\t`) or a space, depending on the input file.
<br>
<br>
I have also uploaded the same file to GitHub. Feel free to use the link directly.

In [None]:
from google.colab import files
files.upload()

{}

In [None]:
! ls

banklist.csv  spark-2.4.5-bin-hadoop2.7      spark-2.4.5-bin-hadoop2.7.tgz.1
sample_data   spark-2.4.5-bin-hadoop2.7.tgz


In [None]:
# inferschema picks the closest datatype for each column automatically from the data
# header reads the first line as column names; otherwise columns are named _c0, _c1, ...

df = spark.read.options(header="true", inferschema = "true", delimiter=",").csv('banklist.csv')

print('df.count  :', df.count())
print('df.col ct :', len(df.columns))
print('df.columns:', df.columns)

df.count  : 561
df.col ct : 6
df.columns: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


In [None]:
# Using file uploaded to github

url = "https://raw.githubusercontent.com/akash865/spark_101/master/banklist.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)

df = spark.read.options(header="true", inferschema = "true", delimiter=",").csv("file://"+SparkFiles.get("banklist.csv"))

print('df.count  :', df.count())
print('df.col ct :', len(df.columns))
print('df.columns:', df.columns)

df.count  : 561
df.col ct : 6
df.columns: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


## **Using SQL in spark**

Often there is a need to use SQL for data manipulation. Although Spark provides nearly all the functions we use in SQL as dataframe methods, SQL is often easier because of its familiarity. Note that it is best practice to avoid spaces in column names; in the example below, names containing spaces have to be wrapped in backticks. We will also learn Spark functions/methods that will help as we go along. First we register the dataframe under a name that SQL queries can refer to.

In [None]:
df.createOrReplaceTempView("temp_tb")  # registerTempTable is deprecated since Spark 2.0

df_check = spark.sql('''select `Bank Name`, City, `Closing Date` from temp_tb''')
df_check.show(4, truncate=False)

+--------------------------------+-------------+------------+
|Bank Name                       |City         |Closing Date|
+--------------------------------+-------------+------------+
|The First State Bank            |Barboursville|3-Apr-20    |
|Ericson State Bank              |Ericson      |14-Feb-20   |
|City National Bank of New Jersey|Newark       |1-Nov-19    |
|Resolute Bank                   |Maumee       |25-Oct-19   |
+--------------------------------+-------------+------------+
only showing top 4 rows



## **Dataframe Basic Operations**

### Describe dataframe

`describe` is a useful method that computes `count`, `mean`, `stddev`, `min` and `max` for all columns. For string columns, `mean` and `stddev` come back as null. We can limit the output by passing column name(s) to `describe`.

In [None]:
df.describe().show()

+-------+--------------------+-------+----+-----------------+---------------------+------------+
|summary|           Bank Name|   City|  ST|             CERT|Acquiring Institution|Closing Date|
+-------+--------------------+-------+----+-----------------+---------------------+------------+
|  count|                 561|    561| 561|              561|                  561|         561|
|   mean|                null|   null|null|31685.68449197861|                 null|        null|
| stddev|                null|   null|null|16446.65659309965|                 null|        null|
|    min|1st American Stat...|Acworth|  AL|               91|      1st United Bank|    1-Aug-08|
|    max|               ebank|Wyoming|  WY|            58701|  Your Community Bank|    9-Sep-11|
+-------+--------------------+-------+----+-----------------+---------------------+------------+



In [None]:
df.describe('City', 'ST').show()

+-------+-------+----+
|summary|   City|  ST|
+-------+-------+----+
|  count|    561| 561|
|   mean|   null|null|
| stddev|   null|null|
|    min|Acworth|  AL|
|    max|Wyoming|  WY|
+-------+-------+----+
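
One thing to watch in the first `describe` output: `Closing Date` is a string column, so its `min` and `max` are the alphabetically smallest and largest strings, not the earliest and latest dates. The plain-Python sketch below reproduces this with a handful of values copied from the outputs in this notebook. The date format string is an assumption based on how those values look.

```python
from datetime import datetime

# A few 'Closing Date' values copied from the outputs in this notebook.
closing_dates = ["3-Apr-20", "14-Feb-20", "1-Nov-19", "25-Oct-19", "1-Aug-08", "9-Sep-11"]

# String comparison goes character by character, so "9-..." beats "25-...".
print(min(closing_dates), max(closing_dates))  # 1-Aug-08 9-Sep-11

# Parsing first gives the chronological answer instead.
# Assumed format: day, abbreviated English month name, two-digit year.
parsed = [datetime.strptime(d, "%d-%b-%y").date() for d in closing_dates]
print(min(parsed), max(parsed))  # 2008-08-01 2020-04-03
```

The same caution applies to sorting or filtering on this column before it has been converted to a date type.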



### Counts, Columns and Schema

In [None]:
print('df.count		:', df.count())
print('df.columns	:', df.columns)
print('df dtypes	:', df.dtypes)
print('df schema 1:', df.schema)
print('df schema 2:', df.printSchema())  # printSchema() prints the tree itself and returns None


df.count		: 561
df.columns	: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']
df dtypes	: [('Bank Name', 'string'), ('City', 'string'), ('ST', 'string'), ('CERT', 'int'), ('Acquiring Institution', 'string'), ('Closing Date', 'string')]
df schema 1: StructType(List(StructField(Bank Name,StringType,true),StructField(City,StringType,true),StructField(ST,StringType,true),StructField(CERT,IntegerType,true),StructField(Acquiring Institution,StringType,true),StructField(Closing Date,StringType,true)))
root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)

df schema 2: None


### Remove Duplicates

In [None]:
df = df.dropDuplicates()
print('df.count		:', df.count())
print('df.columns	:', df.columns)

df.count		: 561
df.columns	: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


### Select specific columns

In [None]:
df2 = df.select(*['Bank Name', 'City'])
df2.show(2)

+--------------------+-------------+
|           Bank Name|         City|
+--------------------+-------------+
|The First State Bank|Barboursville|
|  Ericson State Bank|      Ericson|
+--------------------+-------------+
only showing top 2 rows



### Select multiple columns

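Alternatively, we can select every column except a few, which has the same effect as dropping them. Note that a Python `set` does not preserve order, so the columns may come back in a different order than in the original dataframe.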

In [None]:
col_l = list(set(df.columns)  - {'CERT','ST'})
df2 = df.select(*col_l)
df2.show(2)

+--------------------+---------------------+------------+-------------+
|           Bank Name|Acquiring Institution|Closing Date|         City|
+--------------------+---------------------+------------+-------------+
|The First State Bank|       MVB Bank, Inc.|    3-Apr-20|Barboursville|
|  Ericson State Bank| Farmers and Merch...|   14-Feb-20|      Ericson|
+--------------------+---------------------+------------+-------------+
only showing top 2 rows
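
In the output above the columns are no longer in their original order, because a `set` has no defined order. If the order matters, a list comprehension is one way to keep it. This is a minimal plain-Python sketch on the column names only; `exclude` and `keep` are names made up for the illustration.

```python
# Column names of the bank list, in their original order.
columns = ["Bank Name", "City", "ST", "CERT", "Acquiring Institution", "Closing Date"]
exclude = {"CERT", "ST"}  # hypothetical name: columns we do not want

# Keep every column that is not excluded, in the original order.
keep = [c for c in columns if c not in exclude]
print(keep)
# ['Bank Name', 'City', 'Acquiring Institution', 'Closing Date']
```

In the notebook, `columns` would be `df.columns` and the result would be passed on as `df.select(*keep)`.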



### Rename columns

We will rename multiple columns in the example below. Let's remove the spaces and make the column names easier to understand.

In [None]:
df2 = df \
  .withColumnRenamed('Bank Name'            , 'bank_name') \
  .withColumnRenamed('Acquiring Institution', 'acq_institution') \
  .withColumnRenamed('Closing Date'         , 'closing_date') \
  .withColumnRenamed('ST'                   , 'state') \
  .withColumnRenamed('CERT'                 , 'cert') #\

df2.show(2)

+--------------------+--------+-----+-----+---------------+------------+
|           bank_name|    City|state| cert|acq_institution|closing_date|
+--------------------+--------+-----+-----+---------------+------------+
| First Bank of Idaho| Ketchum|   ID|34396|U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|   IL| 3735|    Harris N.A.|   23-Apr-10|
+--------------------+--------+-----+-----+---------------+------------+
only showing top 2 rows



### Rename columns using loop

If we have many columns, we can use a loop to apply the same operation to each name, such as changing case or replacing characters. In the example below, we replace all spaces with `_`.

In [None]:
rename_expr = [col(column).alias(column.replace(' ', '_')) for column in df.columns]

df2 = df.select(*rename_expr)
df2.show(2)

+--------------------+-------------+---+-----+---------------------+------------+
|           Bank_Name|         City| ST| CERT|Acquiring_Institution|Closing_Date|
+--------------------+-------------+---+-----+---------------------+------------+
|The First State Bank|Barboursville| WV|14361|       MVB Bank, Inc.|    3-Apr-20|
|  Ericson State Bank|      Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
+--------------------+-------------+---+-----+---------------------+------------+
only showing top 2 rows
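
The same pattern can carry other clean-ups, such as lower-casing. The sketch below shows only the name mapping in plain Python, so it runs without Spark. `clean_name` is a hypothetical helper, not a PySpark function.

```python
columns = ["Bank Name", "City", "ST", "CERT", "Acquiring Institution", "Closing Date"]

def clean_name(name):
    """Hypothetical helper: lower-case a name and replace spaces with underscores."""
    return name.strip().lower().replace(" ", "_")

new_names = [clean_name(c) for c in columns]
print(new_names)
# ['bank_name', 'city', 'st', 'cert', 'acquiring_institution', 'closing_date']
```

In the notebook it would plug into the comprehension as `col(c).alias(clean_name(c))`.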



### Add columns

In the example below, we copy the `ST` column into a new column `state` using the `col` function. The `pyspark.sql.functions` module contains many functions that we will use throughout. `col` is one of them: it returns a `Column` expression that refers to a column by name.

In [None]:
df2 = df.withColumn('state', col('ST'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-----+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|state|
+--------------------+--------+---+-----+---------------------+------------+-----+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|
+--------------------+--------+---+-----+---------------------+------------+-----+
only showing top 2 rows



### Add constant column

We use the `lit` function to add a constant value. `lit` is short for literal: it wraps a Python value in a `Column` expression so that the same value is used for every row.

In [None]:
df2 = df.withColumn('country', lit('US'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|country|
+--------------------+--------+---+-----+---------------------+------------+-------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|     US|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|     US|
+--------------------+--------+---+-----+---------------------+------------+-------+
only showing top 2 rows



### Drop columns

In [None]:
df2 = df.drop('CERT')
df2.show(2)

+--------------------+-------------+---+---------------------+------------+
|           Bank Name|         City| ST|Acquiring Institution|Closing Date|
+--------------------+-------------+---+---------------------+------------+
|The First State Bank|Barboursville| WV|       MVB Bank, Inc.|    3-Apr-20|
|  Ericson State Bank|      Ericson| NE| Farmers and Merch...|   14-Feb-20|
+--------------------+-------------+---+---------------------+------------+
only showing top 2 rows



### Drop multiple columns

Dropping multiple columns is just as simple: unpack a list of column names into `drop` using `*`. Below are two ways of doing the same thing.

In [None]:
df2 = df.drop(*['CERT','ST'])
df2.show(2)

+--------------------+-------------+---------------------+------------+
|           Bank Name|         City|Acquiring Institution|Closing Date|
+--------------------+-------------+---------------------+------------+
|The First State Bank|Barboursville|       MVB Bank, Inc.|    3-Apr-20|
|  Ericson State Bank|      Ericson| Farmers and Merch...|   14-Feb-20|
+--------------------+-------------+---------------------+------------+
only showing top 2 rows



In [None]:
df2 = reduce(DataFrame.drop, ['CERT','ST'], df)
df2.show(2)

+--------------------+-------------+---------------------+------------+
|           Bank Name|         City|Acquiring Institution|Closing Date|
+--------------------+-------------+---------------------+------------+
|The First State Bank|Barboursville|       MVB Bank, Inc.|    3-Apr-20|
|  Ericson State Bank|      Ericson| Farmers and Merch...|   14-Feb-20|
+--------------------+-------------+---------------------+------------+
only showing top 2 rows
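
The `reduce` version can look cryptic at first. It applies `drop` once per column name, feeding each result into the next call. The sketch below imitates that with a tiny stand-in class, so it runs without Spark. `Frame` is made up for the illustration and only tracks column names.

```python
from functools import reduce

class Frame:
    """Tiny stand-in for a DataFrame: it only tracks column names."""

    def __init__(self, columns):
        self.columns = list(columns)

    def drop(self, name):
        # Like DataFrame.drop, return a new object rather than mutating.
        return Frame(c for c in self.columns if c != name)

frame = Frame(["Bank Name", "City", "ST", "CERT"])

# reduce(f, [a, b], start) computes f(f(start, a), b),
# which here is frame.drop('CERT').drop('ST').
result = reduce(Frame.drop, ["CERT", "ST"], frame)
print(result.columns)  # ['Bank Name', 'City']
```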



### Filter data

Below are examples of filtering data using `==`, `between` and `isin`. Comparison operators such as `>` and `<` are used the same way as `==`.

In [None]:
# Equal to values
df2 = df.where(df['ST'] == 'NE')

# Between values (inclusive); CERT is an integer column, so compare with integers
df3 = df.where(df['CERT'].between(1000, 2000))

# Is inside multiple values
df4 = df.where(df['ST'].isin('NE','IL'))

print('df.count  :', df.count())
print('df2.count :', df2.count())
print('df3.count :', df3.count())
print('df4.count :', df4.count())

print('\ndf2 sample below')
df2.show(2)

df.count  : 561
df2.count : 4
df3.count : 9
df4.count : 73

df2 sample below
+-------------------+-------+---+-----+---------------------+------------+
|          Bank Name|   City| ST| CERT|Acquiring Institution|Closing Date|
+-------------------+-------+---+-----+---------------------+------------+
| Ericson State Bank|Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
|Mid City Bank, Inc.|  Omaha| NE|19397|         Premier Bank|    4-Nov-11|
+-------------------+-------+---+-----+---------------------+------------+
only showing top 2 rows



### Filter data using logical operators

If we need to filter on multiple conditions, we can combine them with logical operators. Note the operators used here: `&` for `AND`, `|` for `OR` and `~` for `NOT`. Each condition must be wrapped in parentheses.

In [None]:
df2 = df.where((df['ST'] == 'NE') & (df['City'] == 'Ericson'))
df2.show(2)

+------------------+-------+---+-----+---------------------+------------+
|         Bank Name|   City| ST| CERT|Acquiring Institution|Closing Date|
+------------------+-------+---+-----+---------------------+------------+
|Ericson State Bank|Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
+------------------+-------+---+-----+---------------------+------------+
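
The parentheses around each condition are not optional. In Python, `&`, `|` and `~` bind more tightly than `==`, so an unparenthesized expression is grouped differently from what was intended. The sketch below shows the effect with plain integers instead of columns.

```python
a, b = 1, 2

# Intended meaning: both comparisons hold.
with_parens = (a == 1) & (b == 2)

# Without parentheses Python evaluates 1 & b first, giving the chained
# comparison a == (1 & b) == 2, which is 1 == 0 == 2.
without_parens = a == 1 & b == 2

print(with_parens, without_parens)  # True False
```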



### Cast datatypes

Often we need to change the datatype of a particular variable. When we create a dataframe from text files, there is a possibility of datatype mismatch due to errors or missing values in the data. We can cast a column to a different datatype as needed. We will deal with date types later on. Note the two equivalent ways of casting to string below. We could also reuse the original column name for the transformed column, which overwrites it.

In [None]:
print(df.printSchema())

df2 = df \
.withColumn('CERT_str1', df['CERT'].cast('string')) \
.withColumn('CERT_str2', df['CERT'].cast(StringType())) #\

print('Post cast')
print(df2.printSchema())


root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)

None
Post cast
root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)
 |-- CERT_str1: string (nullable = true)
 |-- CERT_str2: string (nullable = true)

None


### Coalesce

Coalesce is a word I started using frequently after learning SQL. Sometimes we need to replace nulls in a column with a constant or with the value of another column in the same data. We could use `CASE` statements to prioritize the replacements, but `coalesce` comes in handy: it returns the first non-null value among its arguments.

In [None]:
df = spark.createDataFrame(
    [
      ['a1', 5, 5],
      ['a2', 7, 11],
      ['a3', None, 10],
      ['a4', 10, 15]
    ], # Data rows
    ['user', 'day_1', 'day_2'] # Column names
)

df.show()

df2 = df.withColumn('day', coalesce(col('day_1'), col('day_2'), lit(0)))
df2.show()

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    5|    5|
|  a2|    7|   11|
|  a3| null|   10|
|  a4|   10|   15|
+----+-----+-----+

+----+-----+-----+---+
|user|day_1|day_2|day|
+----+-----+-----+---+
|  a1|    5|    5|  5|
|  a2|    7|   11|  7|
|  a3| null|   10| 10|
|  a4|   10|   15| 10|
+----+-----+-----+---+
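
To make the row-by-row behaviour concrete, here is a plain-Python sketch of the same logic applied to the four rows above. `first_non_null` is a made-up helper that mimics what `coalesce` does for a single row.

```python
def first_non_null(*values):
    """Return the first value that is not None, or None if there is none."""
    for value in values:
        if value is not None:
            return value
    return None

rows = [("a1", 5, 5), ("a2", 7, 11), ("a3", None, 10), ("a4", 10, 15)]

for user, day_1, day_2 in rows:
    # Same argument order as coalesce(col('day_1'), col('day_2'), lit(0)).
    print(user, first_non_null(day_1, day_2, 0))
# a1 5
# a2 7
# a3 10
# a4 10
```

The trailing `0` is only reached when both `day_1` and `day_2` are null, which never happens in this sample.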



### Replace values in dataframe

`df.na.replace` swaps every occurrence of one value for another across the dataframe. Below, every `7` becomes `17`.

In [None]:
# Pre replace
df.show(2)
# Post replace
df.na.replace(7,17).show(2)

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    5|    5|
|  a2|    7|   11|
+----+-----+-----+
only showing top 2 rows

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    5|    5|
|  a2|   17|   11|
+----+-----+-----+
only showing top 2 rows



### Sort values

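`sort` orders rows by one or more columns, ascending by default. In the ascending output below, the null value comes first.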

In [None]:
# Default - ascending
df.sort('day_1').show(2)

# Descending sort
df.sort(col('day_1').desc()).show(2)
# Another way to write the same statement: df.sort('day_1', ascending=False).show(2)

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a3| null|   10|
|  a1|    5|    5|
+----+-----+-----+
only showing top 2 rows

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a4|   10|   15|
|  a2|    7|   11|
+----+-----+-----+
only showing top 2 rows



# Spark 102 - Some additional methods/functions

Now that we know the basic Spark operations from the previous section, which are used very frequently, we can move on to some additional functions that are also helpful. In this section we will use functions from `pyspark.sql.functions` very often.

### String functions - concat, concat_ws and substring

In [None]:
# Input the file 
# Using file uploaded to github

url = "https://raw.githubusercontent.com/akash865/spark_101/master/banklist.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)

df = spark.read.options(header="true", inferschema = "true", delimiter=",").csv("file://"+SparkFiles.get("banklist.csv"))

print('df.count  :', df.count())
print('df.col ct :', len(df.columns))
print('df.columns:', df.columns)

df.count  : 561
df.col ct : 6
df.columns: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


In [None]:
# Substring - takes `len` characters starting at position `pos` (positions start at 1)
df2 = df.withColumn('city_index', substring(col('ST'),1,1))
df2.show(2)

# Concat - Concatenates multiple input string columns together into a single string column
df2 = df.withColumn('location', concat('City','ST'))
df2.show(2)

# Concat ws - Concatenates multiple input string columns together with the specified separator into a single string column
df2 = df.withColumn('location', concat_ws('-','City','ST'))
df2.show(2)


+--------------------+-------------+---+-----+---------------------+------------+----------+
|           Bank Name|         City| ST| CERT|Acquiring Institution|Closing Date|city_index|
+--------------------+-------------+---+-----+---------------------+------------+----------+
|The First State Bank|Barboursville| WV|14361|       MVB Bank, Inc.|    3-Apr-20|         W|
|  Ericson State Bank|      Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|         N|
+--------------------+-------------+---+-----+---------------------+------------+----------+
only showing top 2 rows

+--------------------+-------------+---+-----+---------------------+------------+---------------+
|           Bank Name|         City| ST| CERT|Acquiring Institution|Closing Date|       location|
+--------------------+-------------+---+-----+---------------------+------------+---------------+
|The First State Bank|Barboursville| WV|14361|       MVB Bank, Inc.|    3-Apr-20|BarboursvilleWV|
|  Ericson State Bank|   
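
Two details are easy to miss with these functions: `substring` counts positions from 1, and `concat` returns null if any input is null, whereas `concat_ws` skips null inputs. The plain-Python sketch below imitates that behaviour for single values. The three `sql_*` helpers are made up for the illustration and cover only the simple cases shown here.

```python
def sql_substring(text, pos, length):
    """Mimic substring(text, pos, length) for pos >= 1 (1-based start)."""
    if text is None:
        return None
    start = pos - 1
    return text[start:start + length]

def sql_concat(*parts):
    """Mimic concat: the result is null as soon as any input is null."""
    if any(p is None for p in parts):
        return None
    return "".join(parts)

def sql_concat_ws(sep, *parts):
    """Mimic concat_ws: null inputs are skipped, the rest joined with sep."""
    return sep.join(p for p in parts if p is not None)

print(sql_substring("WV", 1, 1))                  # W
print(sql_concat("Barboursville", "WV"))          # BarboursvilleWV
print(sql_concat_ws("-", "Barboursville", "WV"))  # Barboursville-WV
print(sql_concat("Ericson", None))                # None
print(sql_concat_ws("-", "Ericson", None))        # Ericson
```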

### Cross tab

Cross tabulation gives the frequency of every combination of values from two columns.

In [None]:
df2 = df.stat.crosstab('City', 'ST')
df2.show(2)

+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|       City_ST| AL| AR| AZ| CA| CO| CT| FL| GA| HI| IA| ID| IL| IN| KS| KY| LA| MA| MD| MI| MN| MO| MS| NC| NE| NH| NJ| NM| NV| NY| OH| OK| OR| PA| PR| SC| SD| TN| TX| UT| VA| WA| WI| WV| WY|
+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|       Clayton|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|Salt Lake City|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
+--------------+---+---+---+---+---
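
For intuition, the same kind of table can be built by hand from pair counts. The sketch below does this in plain Python for six (City, ST) pairs taken from rows shown elsewhere in this notebook. It illustrates the idea and is not how Spark computes it.

```python
from collections import Counter

# (City, ST) pairs taken from rows shown in this notebook.
pairs = [
    ("Barboursville", "WV"), ("Ericson", "NE"), ("Newark", "NJ"),
    ("Omaha", "NE"), ("Ketchum", "ID"), ("Rockford", "IL"),
]

counts = Counter(pairs)                       # frequency of each (City, ST) pair
states = sorted({st for _, st in pairs})      # one output column per distinct ST
cities = sorted({city for city, _ in pairs})  # one output row per distinct City

# Counter returns 0 for pairs that never occur, which fills the empty cells.
table = {city: [counts[(city, st)] for st in states] for city in cities}

print(states)  # ['ID', 'IL', 'NE', 'NJ', 'WV']
for city, row in table.items():
    print(city, row)
```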

### Case Statements OR if/else

Simple case statements can be written using `when` or `expr` from `pyspark.sql.functions`. We could also use a UDF (we will see examples later) to do the same.
<br>
<br>
Using `when` can get confusing with many elif branches. I prefer `when` for a simple if/else condition. For criteria with multiple elif branches, it's convenient to use `expr`. Note the use of the operators `=` and `like` below, with the wildcard `%`.

In [None]:
df2 = df.withColumn('state_ne', when(col('ST')=='NE', 1).otherwise(0))
df2.show(2)

+--------------------+-------------+---+-----+---------------------+------------+--------+
|           Bank Name|         City| ST| CERT|Acquiring Institution|Closing Date|state_ne|
+--------------------+-------------+---+-----+---------------------+------------+--------+
|The First State Bank|Barboursville| WV|14361|       MVB Bank, Inc.|    3-Apr-20|       0|
|  Ericson State Bank|      Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|       1|
+--------------------+-------------+---+-----+---------------------+------------+--------+
only showing top 2 rows



In [None]:
state_val = expr("""
  IF(ST is NULL, NULL
  ,IF(ST = 'NE', 1
  ,IF(ST = 'WV', 2
  , 3
  )))
""")

df2 = df.withColumn('state_val', state_val)
df2.show(3)


+--------------------+-------------+---+-----+---------------------+------------+---------+
|           Bank Name|         City| ST| CERT|Acquiring Institution|Closing Date|state_val|
+--------------------+-------------+---+-----+---------------------+------------+---------+
|The First State Bank|Barboursville| WV|14361|       MVB Bank, Inc.|    3-Apr-20|        2|
|  Ericson State Bank|      Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|        1|
|City National Ban...|       Newark| NJ|21111|      Industrial Bank|    1-Nov-19|        3|
+--------------------+-------------+---+-----+---------------------+------------+---------+
only showing top 3 rows
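
Nested `IF` calls read from the outside in: each `IF` is `IF(condition, value_if_true, value_if_false)`, and the false branch holds the next test. As a reading aid, here is the same logic as an ordinary Python function. `state_val_py` is a made-up name for the illustration.

```python
def state_val_py(st):
    """Plain-Python mirror of the nested IF expression above."""
    if st is None:
        return None
    if st == "NE":
        return 1
    if st == "WV":
        return 2
    return 3

for st in ["WV", "NE", "NJ", None]:
    print(st, state_val_py(st))
# WV 2
# NE 1
# NJ 3
# None None
```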



In [None]:
bank_val = expr("""
  IF(City is NULL, NULL
  ,IF(City like '%ville%', 'Ville'
  , 'Other'
  ))
""")

df2 = df.withColumn('bank_val', bank_val)
df2.show(2)

+--------------------+-------------+---+-----+---------------------+------------+--------+
|           Bank Name|         City| ST| CERT|Acquiring Institution|Closing Date|bank_val|
+--------------------+-------------+---+-----+---------------------+------------+--------+
|The First State Bank|Barboursville| WV|14361|       MVB Bank, Inc.|    3-Apr-20|   Ville|
|  Ericson State Bank|      Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|   Other|
+--------------------+-------------+---+-----+---------------------+------------+--------+
only showing top 2 rows



### NULL values treatment

Null values are part of data, as most of the data we encounter is not clean. One step in cleaning variables is checking for nulls before using any data.
<br>
<br>
A null in a Spark dataframe shows up as `None` (of type `NoneType`) on the Python side. To test a column for nulls, use the `isNull()` and `isNotNull()` methods of the column rather than comparing with `==`. Most PySpark functions return null when given a null input, so nulls can silently spread through calculations. It is therefore best to check for nulls and treat them first.
<br>
<br>
The bank list that we are using doesn't contain any null values, so we use a small dataframe instead. The methods below are the ones most commonly used to clean data.
1. Filter nulls using `where`
2. Checking nulls across all columns using a `for` loop
3. Replacing nulls with a constant using `fillna`

In [None]:
df = spark.createDataFrame(
    [
      ['a1', 5, 5],
      ['a2', 7, 11],
      ['a3', None, 10],
      ['a4', 10, 15]
    ], # Data rows
    ['user', 'day_1', 'day_2'] # Column names
)

df.show()

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    5|    5|
|  a2|    7|   11|
|  a3| null|   10|
|  a4|   10|   15|
+----+-----+-----+



In [None]:
# 1. Checking a column for NULLS
df.where(col('day_1').isNull()).count()

1
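
The check above uses `isNull()` rather than an equality test because of SQL's null semantics: comparing anything with null gives null, not true, and a filter keeps only rows where the condition is true. The plain-Python sketch below imitates this. `sql_equals` and `sql_where` are made-up helpers, not PySpark functions.

```python
def sql_equals(left, right):
    """SQL-style equality: unknown (None) when either side is null."""
    if left is None or right is None:
        return None
    return left == right

def sql_where(rows, predicate):
    """Keep only rows whose predicate is exactly True, as a filter does."""
    return [row for row in rows if predicate(row) is True]

rows = [("a1", 5), ("a2", 7), ("a3", None), ("a4", 10)]

# Comparing with null is never True, so this finds nothing.
print(sql_where(rows, lambda r: sql_equals(r[1], None)))  # []

# An explicit null test, the analogue of isNull(), finds the row.
print(sql_where(rows, lambda r: r[1] is None))  # [('a3', None)]
```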

In [None]:
# 2. Checking all columns for NULLS using `for` loop
for col_i in df.columns:
  print('Null count for col', col_i, ':', df.where(col(col_i).isNull()).count())

Null count for col user : 0
Null count for col day_1 : 1
Null count for col day_2 : 0


In [None]:
# 3. Replacing all NULLS with a constant in a list of columns
df = df.fillna(0, subset=['day_1', 'day_2'])

# Post replace
df.show()

+----+-----+-----+
|user|day_1|day_2|
+----+-----+-----+
|  a1|    5|    5|
|  a2|    7|   11|
|  a3|    0|   10|
|  a4|   10|   15|
+----+-----+-----+



### Date formats

This section is going to be long, mostly because of the confusion around date formats. Below we will see how to handle date formats and some additional methods or functions that are needed for date manipulation.

In [None]:
# Dataframe with dates
df = spark.createDataFrame(
    [
      ["2015-09-23", 5],
      ["2016-07-20", 7]
    ], # Data rows
    ["data_date", "months_to_add"] # Column names
)

df.show()

+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2015-09-23|            5|
|2016-07-20|            7|
+----------+-------------+



In [None]:
df.sort(col('data_date'), ascending=False).show()

+----------+-------------+
| data_date|months_to_add|
+----------+-------------+
|2016-07-20|            7|
|2015-09-23|            5|
+----------+-------------+



In [None]:
print('df dtypes	:', df.dtypes)

df dtypes	: [('data_date', 'string'), ('months_to_add', 'bigint')]


In [None]:
df.withColumn('a', F.date_add(col('data_date'), 5)).show()

+----------+-------------+----------+
| data_date|months_to_add|         a|
+----------+-------------+----------+
|2015-09-23|            5|2015-09-28|
|2016-07-20|            7|2016-07-25|
+----------+-------------+----------+
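
Note that `date_add` counts days, not months, so the `5` above moves each date forward by five days. The stdlib sketch below repeats the same arithmetic as a sanity check on the two rows.

```python
from datetime import date, timedelta

# The two data_date values from the dataframe above.
start_dates = [date(2015, 9, 23), date(2016, 7, 20)]

# date_add(col, 5) shifts by 5 calendar days, like adding a timedelta.
shifted = [d + timedelta(days=5) for d in start_dates]
print([d.isoformat() for d in shifted])  # ['2015-09-28', '2016-07-25']
```

To shift by whole months instead, `pyspark.sql.functions` also provides `add_months`.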

