# Learning PySpark in 10 minutes  

PySpark dataframes (DF) are used extensively to clean data held in files in AWS S3 buckets. The cleaned dataframe is then written back to the bucket in CSV or Parquet format.

I have used the famous Oracle `scott/tiger` datasets of `EMP` and `DEPT`. I believe that working with 14 rows of known data gives an easier understanding of what is changing in the underlying dataset.

The two files, [emp.csv](emp.csv) and [dept.csv](dept.csv), are included here for easy access. I have changed the format of a few values in the `hiredate` column to make the cleaning noticeable.

1. Basic Exploration of the dataset
   1. List the structure of the dataframe with data type of the columns
   2. List the names of the columns in the dataframe
   3. Count of rows in the DF
   4. Count of columns in the DF
   5. Explore a single column to understand the values it contains. This works best when the data is numeric.
2. Select distinct values of columns
3. Filter out NULL values of a column.
4. Add a column TotalSalary as addition of Salary and Commission.
5. Do an INNER JOIN with dept file
6. Create a SalaryGroup column that has 3 categories based on salary range.
7. Apply built-in SQL functions to columns
8. Create UDF (User Defined Function) and apply to DF.
9. Rename columns
10. Write the dataframe back to disk



## Import the required Python libraries, create a Spark session and read emp.csv into a DF

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()
emp_df = spark.read.csv("emp.csv",inferSchema=True,header=True)
emp_df.show()

+-----+------+---------+----+-----------+----+----+------+
|empno| ename|      job| mgr|   hiredate| sal| com|deptno|
+-----+------+---------+----+-----------+----+----+------+
| 7369| SMITH|    CLERK|7902| 17-12-1980| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698| 02-20-1981|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|22-FEB-1981|1250| 500|    30|
| 7566| JONES|  MANAGER|7839| 2-APR-1981|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|28/SEP/1981|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839| 1-MAY-1981|2850|null|    30|
| 7782| CLARK|  MANAGER|7839| 9-JUN-1981|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566| 09/12/1982|3000|null|    20|
| 7839|  KING|PRESIDENT|null|17-NOV-1981|5000|null|    10|
| 7844|TURNER| SALESMAN|7698| 8-SEP-1981|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788|12-JAN-1983|1100|null|    20|
| 7900| JAMES|    CLERK|7698| 3-DEC-1981| 950|null|    30|
| 7902|  FORD|  ANALYST|7566| 3-DEC-1981|3000|null|    20|
| 7934|MILLER|    CLERK|7782|23-JAN-1982|1300|null|    1

This is the complete data of the emp table, with the format of a few of the hiredates changed.

### 1. Basic Exploration of the dataset
1. List the structure of the dataframe with data type of the columns
2. List the names of the columns in the dataframe
3. Count of rows in the DF
4. Count of columns in the DF
5. Explore a single column to understand the values it contains. This works best when the data is numeric.

In [2]:
emp_df.printSchema()                   # Notice the hiredate is also string
print(emp_df.columns)                  # list headers in the DF
print(emp_df.count())                  # 14 rows
print(len(emp_df.columns))             # 8 columns
emp_df.describe('hiredate').show()     # date is taken as string hence describe does not give correct MEAN, STDEV, MIN, MAX 
emp_df.describe('sal').show()          # works best for numeric column
emp_df.describe('com').show()
emp_df.describe('ename').show()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- com: integer (nullable = true)
 |-- deptno: integer (nullable = true)

['empno', 'ename', 'job', 'mgr', 'hiredate', 'sal', 'com', 'deptno']
14
8
+-------+----------+
|summary|  hiredate|
+-------+----------+
|  count|        14|
|   mean|      null|
| stddev|      null|
|    min|02-20-1981|
|    max|9-JUN-1981|
+-------+----------+

+-------+------------------+
|summary|               sal|
+-------+------------------+
|  count|                14|
|   mean| 2073.214285714286|
| stddev|1182.5032235162716|
|    min|               800|
|    max|              5000|
+-------+------------------+

+-------+-----------------+
|summary|              com|
+-------+-----------------+
|  count|                4|
|   mean|            550.0|
| stddev|602.7713773341708|
|  

### 2. Select distinct values of columns

In [3]:
print(emp_df.select('job').distinct().count())  # Count of distinct values
emp_df.select('job').distinct().show()          # Display those distinct values
emp_df.select('job','deptno').distinct().show() # Works on multiple columns too
emp_df.dropDuplicates(['job','deptno']).select('job','deptno').show()  # Alternate method using dropDuplicates

5
+---------+
|      job|
+---------+
|  ANALYST|
| SALESMAN|
|    CLERK|
|  MANAGER|
|PRESIDENT|
+---------+

+---------+------+
|      job|deptno|
+---------+------+
|  ANALYST|    20|
|  MANAGER|    10|
|  MANAGER|    30|
|PRESIDENT|    10|
|    CLERK|    20|
| SALESMAN|    30|
|    CLERK|    10|
|  MANAGER|    20|
|    CLERK|    30|
+---------+------+

+---------+------+
|      job|deptno|
+---------+------+
|  ANALYST|    20|
|  MANAGER|    10|
|  MANAGER|    30|
|PRESIDENT|    10|
|    CLERK|    20|
| SALESMAN|    30|
|    CLERK|    10|
|  MANAGER|    20|
|    CLERK|    30|
+---------+------+



### 3. Filter out NULL values of a column.

In [4]:
emp_df.dropna(subset=['com']).show()             # Drop rows where com is NULL
emp_df.filter(col('com').isNotNull()).show()     # Alternate method

+-----+------+--------+----+-----------+----+----+------+
|empno| ename|     job| mgr|   hiredate| sal| com|deptno|
+-----+------+--------+----+-----------+----+----+------+
| 7499| ALLEN|SALESMAN|7698| 02-20-1981|1600| 300|    30|
| 7521|  WARD|SALESMAN|7698|22-FEB-1981|1250| 500|    30|
| 7654|MARTIN|SALESMAN|7698|28/SEP/1981|1250|1400|    30|
| 7844|TURNER|SALESMAN|7698| 8-SEP-1981|1500|   0|    30|
+-----+------+--------+----+-----------+----+----+------+

+-----+------+--------+----+-----------+----+----+------+
|empno| ename|     job| mgr|   hiredate| sal| com|deptno|
+-----+------+--------+----+-----------+----+----+------+
| 7499| ALLEN|SALESMAN|7698| 02-20-1981|1600| 300|    30|
| 7521|  WARD|SALESMAN|7698|22-FEB-1981|1250| 500|    30|
| 7654|MARTIN|SALESMAN|7698|28/SEP/1981|1250|1400|    30|
| 7844|TURNER|SALESMAN|7698| 8-SEP-1981|1500|   0|    30|
+-----+------+--------+----+-----------+----+----+------+



### 4. Add a column TotalSalary as addition of Salary and Commission.
This is an example of appending a column to the existing dataframe. Since commission is NULL for many rows, we use the `coalesce` function to convert NULL to zero so that the addition is possible.
* since `total_salary` does not exist, it gets created; had it existed, it would be overwritten
* `col` returns a column reference for the given column name
* `coalesce` goes through its parameters until it finds a NOT NULL value
* `lit` creates a column holding a constant value

In [5]:
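# lit('0') is a string literal, so the sum is shown as a decimal (800.0) below; lit(0) would keep it an integer
emp_df = emp_df.withColumn('total_salary',col('sal')+coalesce(emp_df['com'],lit('0')))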
emp_df.show()    # Original DF now has an additional column

+-----+------+---------+----+-----------+----+----+------+------------+
|empno| ename|      job| mgr|   hiredate| sal| com|deptno|total_salary|
+-----+------+---------+----+-----------+----+----+------+------------+
| 7369| SMITH|    CLERK|7902| 17-12-1980| 800|null|    20|       800.0|
| 7499| ALLEN| SALESMAN|7698| 02-20-1981|1600| 300|    30|      1900.0|
| 7521|  WARD| SALESMAN|7698|22-FEB-1981|1250| 500|    30|      1750.0|
| 7566| JONES|  MANAGER|7839| 2-APR-1981|2975|null|    20|      2975.0|
| 7654|MARTIN| SALESMAN|7698|28/SEP/1981|1250|1400|    30|      2650.0|
| 7698| BLAKE|  MANAGER|7839| 1-MAY-1981|2850|null|    30|      2850.0|
| 7782| CLARK|  MANAGER|7839| 9-JUN-1981|2450|null|    10|      2450.0|
| 7788| SCOTT|  ANALYST|7566| 09/12/1982|3000|null|    20|      3000.0|
| 7839|  KING|PRESIDENT|null|17-NOV-1981|5000|null|    10|      5000.0|
| 7844|TURNER| SALESMAN|7698| 8-SEP-1981|1500|   0|    30|      1500.0|
| 7876| ADAMS|    CLERK|7788|12-JAN-1983|1100|null|    20|      

### 5. Do an INNER JOIN with dept file
`Outer join`, `union`, and `union all` can be done along similar lines.

First, read dept.csv into a dataframe.

In [6]:
dept_df = spark.read.csv("dept.csv",inferSchema=True,header=True)
dept_df.show()

+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+



We create a new DF here only for learning; the join result could equally be assigned back to the same DF.
* `sal >= 10` is not required in the join; it is only there to show a multi-condition join
* `drop` removes the extra `deptno` column, which would hold the same values
* `broadcast` is an optimization: the small table is sent to all nodes for a faster join

In [7]:
joined_df = emp_df.join(other=broadcast(dept_df), how='inner',
                        on=[emp_df.deptno == dept_df.deptno, emp_df.sal >= 10]).drop(dept_df.deptno)
joined_df.show()

+-----+------+---------+----+-----------+----+----+------+------------+----------+--------+
|empno| ename|      job| mgr|   hiredate| sal| com|deptno|total_salary|     dname|     loc|
+-----+------+---------+----+-----------+----+----+------+------------+----------+--------+
| 7369| SMITH|    CLERK|7902| 17-12-1980| 800|null|    20|       800.0|  RESEARCH|  DALLAS|
| 7499| ALLEN| SALESMAN|7698| 02-20-1981|1600| 300|    30|      1900.0|     SALES| CHICAGO|
| 7521|  WARD| SALESMAN|7698|22-FEB-1981|1250| 500|    30|      1750.0|     SALES| CHICAGO|
| 7566| JONES|  MANAGER|7839| 2-APR-1981|2975|null|    20|      2975.0|  RESEARCH|  DALLAS|
| 7654|MARTIN| SALESMAN|7698|28/SEP/1981|1250|1400|    30|      2650.0|     SALES| CHICAGO|
| 7698| BLAKE|  MANAGER|7839| 1-MAY-1981|2850|null|    30|      2850.0|     SALES| CHICAGO|
| 7782| CLARK|  MANAGER|7839| 9-JUN-1981|2450|null|    10|      2450.0|ACCOUNTING|NEW YORK|
| 7788| SCOTT|  ANALYST|7566| 09/12/1982|3000|null|    20|      3000.0|  RESEARC

### 6. Create a SalaryGroup column that has 3 categories based on salary range.
We add a new column `salaryGroup` based on multiple salary ranges. This shows multiple `when` clauses chained together.

In [8]:
emp_df = emp_df.withColumn('salaryGroup',when(emp_df["sal"]<1550,'lowSalary')
                  .when((emp_df["sal"]>=1550) & (emp_df["sal"]< 3000),'avgSalary')
                  .when(emp_df["sal"]>=3000,'highSalary').when(emp_df["sal"].isNull(),'Error'))
emp_df.show()

+-----+------+---------+----+-----------+----+----+------+------------+-----------+
|empno| ename|      job| mgr|   hiredate| sal| com|deptno|total_salary|salaryGroup|
+-----+------+---------+----+-----------+----+----+------+------------+-----------+
| 7369| SMITH|    CLERK|7902| 17-12-1980| 800|null|    20|       800.0|  lowSalary|
| 7499| ALLEN| SALESMAN|7698| 02-20-1981|1600| 300|    30|      1900.0|  avgSalary|
| 7521|  WARD| SALESMAN|7698|22-FEB-1981|1250| 500|    30|      1750.0|  lowSalary|
| 7566| JONES|  MANAGER|7839| 2-APR-1981|2975|null|    20|      2975.0|  avgSalary|
| 7654|MARTIN| SALESMAN|7698|28/SEP/1981|1250|1400|    30|      2650.0|  lowSalary|
| 7698| BLAKE|  MANAGER|7839| 1-MAY-1981|2850|null|    30|      2850.0|  avgSalary|
| 7782| CLARK|  MANAGER|7839| 9-JUN-1981|2450|null|    10|      2450.0|  avgSalary|
| 7788| SCOTT|  ANALYST|7566| 09/12/1982|3000|null|    20|      3000.0| highSalary|
| 7839|  KING|PRESIDENT|null|17-NOV-1981|5000|null|    10|      5000.0| high

### 7. Apply built-in SQL functions to columns
1. Creating a new DF with the `select` function
2. Using SQL-like functions to change the uppercase strings to initcap
3. An alias is needed to give an appropriate name, especially when a function is applied to the column
4. Changing the column header for `sal` to `salary`

In [9]:
smaller_df = emp_df.select("empno",initcap(col("ename")).alias("ename"),initcap(col("job")).alias("job"),col("sal").alias("salary"))
smaller_df.show()

+-----+------+---------+------+
|empno| ename|      job|salary|
+-----+------+---------+------+
| 7369| Smith|    Clerk|   800|
| 7499| Allen| Salesman|  1600|
| 7521|  Ward| Salesman|  1250|
| 7566| Jones|  Manager|  2975|
| 7654|Martin| Salesman|  1250|
| 7698| Blake|  Manager|  2850|
| 7782| Clark|  Manager|  2450|
| 7788| Scott|  Analyst|  3000|
| 7839|  King|President|  5000|
| 7844|Turner| Salesman|  1500|
| 7876| Adams|    Clerk|  1100|
| 7900| James|    Clerk|   950|
| 7902|  Ford|  Analyst|  3000|
| 7934|Miller|    Clerk|  1300|
+-----+------+---------+------+



### 8. Create UDF (User Defined Function) and apply to DF
User Defined Functions (UDFs) should be the last resort. Built-in functions are tested and perform better.
The `salaryGroup` column created above could also have been produced using a UDF.
Using a UDF involves:
1. Writing the function in Python
2. Registering the function as UDF
3. Invoking the UDF
  
As examples, 2 UDFs are created.
1. Functionality similar to the `initcap()` function: it capitalizes the first letter and lowercases the rest of the string.
2. Apply a set of formats to hiredate to convert it into ISO format.

In [10]:
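def capitalizeStr(mystr):
    if not mystr:        # leave NULL (None) and empty strings unchanged
        return mystr
    return mystr[0].upper() + mystr[1:].lower()

# Register the Python function as a UDF that returns a string
capStrUDF = udf(capitalizeStr, StringType())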

In [11]:
from datetime import datetime
from dateutil.parser import parse

def str2date(dt_str):
    # Let dateutil guess the format first; fall back to explicit formats if it cannot
    try:
        return parse(dt_str)
    except ValueError:
        date_formats = ['%d-%m-%Y', '%d/%m/%Y', '%d.%m.%Y', '%m-%d-%Y', '%d-%b-%Y', '%d/%b/%Y', '%Y-%m-%d']
        for date_format in date_formats:
            try:
                return datetime.strptime(dt_str, date_format)
            except ValueError:
                pass
    # None of the formats matched
    raise ValueError('No valid date format found')
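
When a list of formats is tried in turn, the order of the list decides how an ambiguous string such as `09/12/1982` is read. The sketch below uses only the standard library to show this. `parse_with_formats` and the two format lists are hypothetical names made up for the illustration, not part of the notebook.

```python
from datetime import datetime


def parse_with_formats(text, formats):
    """Return the date for the first format that matches, or None if none does."""
    for fmt in formats:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None


# The same formats in two different orders of preference.
day_first = ["%d-%m-%Y", "%d/%m/%Y", "%m-%d-%Y", "%m/%d/%Y"]
month_first = ["%m-%d-%Y", "%m/%d/%Y", "%d-%m-%Y", "%d/%m/%Y"]

print(parse_with_formats("17-12-1980", day_first))    # 1980-12-17
print(parse_with_formats("02-20-1981", day_first))    # 1981-02-20, falls through to %m-%d-%Y
print(parse_with_formats("09/12/1982", day_first))    # 1982-12-09
print(parse_with_formats("09/12/1982", month_first))  # 1982-09-12
```

A value like `02-20-1981` is safe because 20 cannot be a month, but `09/12/1982` is valid either way, so it is worth checking which convention the source data follows.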

In [12]:
convertDateUDF = udf(lambda z: str2date(z), DateType())

In [14]:
emp_df = emp_df.withColumn('hiredateISO',convertDateUDF(col('hiredate'))).withColumn('job',capStrUDF(col('job')))
emp_df.show()

+-----+------+---------+----+-----------+----+----+------+------------+-----------+-----------+
|empno| ename|      job| mgr|   hiredate| sal| com|deptno|total_salary|salaryGroup|hiredateISO|
+-----+------+---------+----+-----------+----+----+------+------------+-----------+-----------+
| 7369| SMITH|    Clerk|7902| 17-12-1980| 800|null|    20|       800.0|  lowSalary| 1980-12-17|
| 7499| ALLEN| Salesman|7698| 02-20-1981|1600| 300|    30|      1900.0|  avgSalary| 1981-02-20|
| 7521|  WARD| Salesman|7698|22-FEB-1981|1250| 500|    30|      1750.0|  lowSalary| 1981-02-22|
| 7566| JONES|  Manager|7839| 2-APR-1981|2975|null|    20|      2975.0|  avgSalary| 1981-04-02|
| 7654|MARTIN| Salesman|7698|28/SEP/1981|1250|1400|    30|      2650.0|  lowSalary| 1981-09-28|
| 7698| BLAKE|  Manager|7839| 1-MAY-1981|2850|null|    30|      2850.0|  avgSalary| 1981-05-01|
| 7782| CLARK|  Manager|7839| 9-JUN-1981|2450|null|    10|      2450.0|  avgSalary| 1981-06-09|
| 7788| SCOTT|  Analyst|7566| 09/12/1982

### 9. Rename column
Renaming the column `sal` to `salary` and `com` to `commission`.

In [17]:
emp_df = emp_df.withColumnRenamed('sal','salary').withColumnRenamed('com','commission')
emp_df.show()

+-----+------+---------+----+-----------+------+----------+------+------------+-----------+-----------+
|empno| ename|      job| mgr|   hiredate|salary|commission|deptno|total_salary|salaryGroup|hiredateISO|
+-----+------+---------+----+-----------+------+----------+------+------------+-----------+-----------+
| 7369| SMITH|    Clerk|7902| 17-12-1980|   800|      null|    20|       800.0|  lowSalary| 1980-12-17|
| 7499| ALLEN| Salesman|7698| 02-20-1981|  1600|       300|    30|      1900.0|  avgSalary| 1981-02-20|
| 7521|  WARD| Salesman|7698|22-FEB-1981|  1250|       500|    30|      1750.0|  lowSalary| 1981-02-22|
| 7566| JONES|  Manager|7839| 2-APR-1981|  2975|      null|    20|      2975.0|  avgSalary| 1981-04-02|
| 7654|MARTIN| Salesman|7698|28/SEP/1981|  1250|      1400|    30|      2650.0|  lowSalary| 1981-09-28|
| 7698| BLAKE|  Manager|7839| 1-MAY-1981|  2850|      null|    30|      2850.0|  avgSalary| 1981-05-01|
| 7782| CLARK|  Manager|7839| 9-JUN-1981|  2450|      null|    1

### 10. Writing the dataframe back to disk
The command below writes the DF back to disk in the folder `emp_output`. Spark writes one file per partition, so `coalesce(1)` is used here to reduce the DF to a single partition and produce a single output file.

In [None]:
emp_df.coalesce(1).write.csv('emp_output',header=True)