### Path to CSV

In [61]:
# Relative location of the input CSV. Forward slashes are used as the
# separator because they are accepted on Windows as well as on Linux/macOS,
# whereas a backslash is treated as part of the file name outside Windows.
path = "Datasets/simpleTestDs.csv"

### Initialize Findspark
#### Helps locate PySpark in local environment

In [62]:
# Helps locate PySpark in the local environment (see the heading above).
# This cell is deliberately placed ahead of the cell that imports pyspark.
import findspark
findspark.init()

### Import Pyspark and Necessary Libraries

In [63]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import upper

### Initialize SparkSession

In [64]:
# Obtain (or reuse) a session that runs against the 'local' master under the
# application name 'appOne'.
ss = (
    SparkSession.builder
    .master('local')
    .appName('appOne')
    .getOrCreate()
)

### Read the CSV

In [65]:
# Load the CSV at `path`: the first line supplies the column names and the
# column types are inferred from the data.
df = ss.read.csv(path, header=True, inferSchema=True)
df.show()

+---+-------+--------+-----+------+
| id|  fname|   lname|  age|   sal|
+---+-------+--------+-----+------+
|  1|    jim|halperts|  135| 40000|
|  2|    pam|  beesly|   34|36,000|
|  3|michael|   scott|60000|    41|
|  4|   Toby|flencher|   44| 66000|
|  5|  oscar|martinez|   38|$42000|
|  6| Angela|  martin|   34| 36000|
+---+-------+--------+-----+------+



### Define UDF

In [66]:
def fixAge(age):
    """Subtract 100 from ages greater than 100; leave other ages untouched.

    Args:
        age: Numeric age, or None when the source value is missing.

    Returns:
        None when ``age`` is None, ``age - 100`` when ``age`` is greater
        than 100, otherwise ``age`` unchanged.
    """
    # A missing value cannot be compared with an int (``None > 100`` raises
    # TypeError), so pass it through untouched instead of failing.
    if age is None:
        return None
    return age - 100 if age > 100 else age

# Expose fixAge to Spark as a UDF whose declared return type is IntegerType.
fixAgeUDF = udf(f=lambda age: fixAge(age), returnType=IntegerType())

### Transformations
#### 1. fix name errors
#### 2. interchange age and sal for id=3
#### 3. fix ages over 100 by subtracting 100
#### 4. fix sal errors
#### 5. set new schema

In [67]:
# Clean-up pass over the raw frame:
#   * lower-case both name columns,
#   * correct the misspelt surname 'halperts' -> 'halpert',
#   * strip thousands separators (',') and currency signs ('$') from `sal`.
# The regex is written as a raw string so the backslash-free character class
# is passed to Spark verbatim and Python emits no invalid-escape warning.
df = (
    df
    .withColumn('fname', lower(col('fname')))
    .withColumn('lname', lower(col('lname')))
    .withColumn('lname', regexp_replace('lname', 'halperts', 'halpert'))
    .withColumn('sal', regexp_replace('sal', r'[$,]', ''))
)

df.show()

+---+-------+--------+-----+-----+
| id|  fname|   lname|  age|  sal|
+---+-------+--------+-----+-----+
|  1|    jim| halpert|  135|40000|
|  2|    pam|  beesly|   34|36000|
|  3|michael|   scott|60000|   41|
|  4|   toby|flencher|   44|66000|
|  5|  oscar|martinez|   38|42000|
|  6| angela|  martin|   34|36000|
+---+-------+--------+-----+-----+



In [68]:
# The row with id=3 was loaded with age=60000 and sal=41 (see the table
# above), so both columns are overwritten for that row only; every other row
# keeps its existing value.
is_row_three = col("id") == 3
df = (
    df
    .withColumn("age", when(is_row_three, 41).otherwise(col('age')))
    .withColumn("sal", when(is_row_three, 60000).otherwise(col('sal')))
)

df.show()

+---+-------+--------+---+-----+
| id|  fname|   lname|age|  sal|
+---+-------+--------+---+-----+
|  1|    jim| halpert|135|40000|
|  2|    pam|  beesly| 34|36000|
|  3|michael|   scott| 41|60000|
|  4|   toby|flencher| 44|66000|
|  5|  oscar|martinez| 38|42000|
|  6| angela|  martin| 34|36000|
+---+-------+--------+---+-----+



In [69]:
# Runs after the age/sal correction for id=3, so that row's age is already
# the corrected value by the time the UDF sees it.
df = df.withColumn('age', fixAgeUDF(col('age')))
df.show()

+---+-------+--------+---+-----+
| id|  fname|   lname|age|  sal|
+---+-------+--------+---+-----+
|  1|    jim| halpert| 35|40000|
|  2|    pam|  beesly| 34|36000|
|  3|michael|   scott| 41|60000|
|  4|   toby|flencher| 44|66000|
|  5|  oscar|martinez| 38|42000|
|  6| angela|  martin| 34|36000|
+---+-------+--------+---+-----+



### Check Schema

In [70]:
# Print each column's name, type and nullability for the cleaned frame.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sal: string (nullable = true)



### Change Datatype

In [71]:
# `sal` is still a string column at this point; convert it to a float column.
df = df.withColumn('sal', col('sal').cast(FloatType()))
df.show()

+---+-------+--------+---+-------+
| id|  fname|   lname|age|    sal|
+---+-------+--------+---+-------+
|  1|    jim| halpert| 35|40000.0|
|  2|    pam|  beesly| 34|36000.0|
|  3|michael|   scott| 41|60000.0|
|  4|   toby|flencher| 44|66000.0|
|  5|  oscar|martinez| 38|42000.0|
|  6| angela|  martin| 34|36000.0|
+---+-------+--------+---+-------+



### Analysis
#### 1. Top Paid Employee
#### 2. Oldest Employee
#### 3. 5th Most Paid Employee
#### 4. Employee Whose First Name Begins With "M"
#### 5. Employee Whose First Name is Shorter Than 5 Letters

#### Q1 Solution Method (1/2)

In [74]:
# Rank rows by salary (highest first) using a SQL window expression, then
# keep only the row(s) ranked first.
(
    df
    .withColumn('rank', expr("rank() over (order by sal desc)"))
    .where('rank = 1')
    .show()
)

+---+-----+--------+---+-------+----+
| id|fname|   lname|age|    sal|rank|
+---+-----+--------+---+-------+----+
|  4| toby|flencher| 44|66000.0|   1|
+---+-----+--------+---+-------+----+



#### Q1 Solution Method (2/2)

In [75]:
# Same query as the SQL-expression variant, expressed through the Window API:
# rank by salary descending and keep rank 1.
(
    df
    .withColumn('rank', rank().over(Window.orderBy(desc('sal'))))
    .where('rank=1')
    .show()
)

+---+-----+--------+---+-------+----+
| id|fname|   lname|age|    sal|rank|
+---+-----+--------+---+-------+----+
|  4| toby|flencher| 44|66000.0|   1|
+---+-----+--------+---+-------+----+



#### Q2 Solution

In [76]:
# Rank rows by age (oldest first) and keep the row(s) ranked first.
(
    df
    .withColumn('rank', rank().over(Window.orderBy(desc('age'))))
    .where('rank = 1')
    .show()
)

+---+-----+--------+---+-------+----+
| id|fname|   lname|age|    sal|rank|
+---+-----+--------+---+-------+----+
|  4| toby|flencher| 44|66000.0|   1|
+---+-----+--------+---+-------+----+



#### Q3 Solution

In [80]:
# dense_rank() numbers the distinct salaries consecutively (1, 2, 3, ...), so
# position 5 always exists when there are at least five distinct salaries.
# rank() leaves gaps after ties (e.g. 1, 2, 3, 4, 4, 6), which would make a
# "rank = 5" filter return nothing. Rows sharing the 5th-highest salary are
# all returned.
(
    df
    .withColumn('rank', dense_rank().over(Window.orderBy(col('sal').desc())))
    .filter("rank = 5")
    .show()
)

+---+------+------+---+-------+----+
| id| fname| lname|age|    sal|rank|
+---+------+------+---+-------+----+
|  2|   pam|beesly| 34|36000.0|   5|
|  6|angela|martin| 34|36000.0|   5|
+---+------+------+---+-------+----+



#### Q4 Solution

In [78]:
# Keep rows whose first name, compared in lower case, starts with 'm'.
df.where(lower(col('fname')).like('m%')).show()

+---+-------+-----+---+-------+
| id|  fname|lname|age|    sal|
+---+-------+-----+---+-------+
|  3|michael|scott| 41|60000.0|
+---+-------+-----+---+-------+



#### Q5 Solution

In [79]:
# Keep rows whose first name has fewer than 5 characters. The SQL function
# `length` is used rather than the `len` alias, which older Spark releases do
# not define.
df.filter("length(fname) < 5").show()

+---+-----+--------+---+-------+
| id|fname|   lname|age|    sal|
+---+-----+--------+---+-------+
|  1|  jim| halpert| 35|40000.0|
|  2|  pam|  beesly| 34|36000.0|
|  4| toby|flencher| 44|66000.0|
+---+-----+--------+---+-------+

