# Diving into the basic functions and utilities in pyspark

Assuming you have installed the pyspark package, you must first create a 'SparkSession' to start using it. Copy the code below to get started. The appName 'test' can be changed to a name that suits your ETL project.

Note: after running the code below, your SparkSession is stored in the variable ```spark```. This is the object you will use to read data into pyspark dataframes.

In [2]:
#create SparkSession to begin data processing
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()

Simply calling ```spark``` below displays the version & specifications of the SparkSession:

In [2]:
spark

<br>

### Load and inspect dataset from csv, using pyspark dataframe
In my directory there is a csv file containing fake employee data: each employee's ID, Name, Age, Gender, Salary and Spending (in thousands), and Department specialization (ML: Machine Learning, Fin: Finance, CS: Computer Science). We load this data into the dataframe ```df_ps``` below.

Setting ```header = True``` tells pyspark to treat the first line of the file ('empID', 'Name', 'Age', etc.) as column names rather than as a data row. Setting ```inferSchema = True``` lets pyspark detect column types, e.g. reading 'Age' as integers (in this case) instead of strings. Without it, every column is read as a string.

```show()``` displays up to 20 rows by default, so a table as small as this one is shown in full.

In [4]:
df_ps = spark.read.csv('employees.csv', header = True, inferSchema = True)
df_ps.show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Fin|
|  123|Charlie|  25|     M|   133|      39|  CS|
|  151| Gordon|  26|     M|   142|      35|  CS|
|  152| Violet|  27|     F|   151|      40| Fin|
|  156| Samuel|  26|     M|   111|      35| Fin|
|  119| Rachel|  30|     F|   124|      60|  CS|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45|  CS|
|  213| Shuang|  28|     F|   164|      61|  ML|
|  211|  James|null|     M|  null|      70| Fin|
|  311|   null|  50|  null|   400|     102| Fin|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100|  CS|
|  199|   null|null|  null|   105|    null|  ML|
|  289|  Zosso|null|     M|  null|      56|  ML|
|   55|  Keith|null|     M|   135|      30|  CS|
|   15|  Laura|  30|     F|   100|      37| Fin|
+-----+-------+----+------+------+--------+----+

**Remark:** pyspark can also load delimited text files, such as the tab-separated customers.txt below. The method is the same as for csv, except that you specify the tab as the delimiter via ```option("sep","\t")```. Once the data is loaded, the pyspark tools work the same way regardless of the format it was originally stored in.

In [5]:
df_customer = spark.read.option("sep","\t").csv('customers.txt', header = True, inferSchema = True)
df_customer.show()

+------------+---------+--------+-----+
|CustomerName|ProductID| Product|Price|
+------------+---------+--------+-----+
|        John|     W311|  Wallet|  200|
|      Jordan|     B203|    Belt|  500|
|      Rachel|     B133|     Bag| 1000|
|      Gordon|     B139|     Bag| 1200|
|         Sam|     B155|     Bag| 2300|
|       Emily|     S152|   Shoes|  800|
|       Alice|     W320|  Wallet|  250|
|         Bob|     W320|  Wallet|  250|
|      Daniel|     S300|Suitcase| 1200|
|       Laura|     S305|Suitcase| 1790|
|       Zosso|     W231|   Watch|  420|
|       Evans|     DN18|    null| 2000|
|       Keith|     null|  Burger|   50|
|        null|     P220| Perfume| 1350|
+------------+---------+--------+-----+



<br>

Returning to ```df_ps```: ```type(...)``` returns the type of the object passed to it, and ```df_ps``` is a pyspark dataframe, as expected. We can also print the dataframe's schema (each column's name, data type, and whether it may contain nulls) and list the column names.

In [6]:
type(df_ps)

pyspark.sql.dataframe.DataFrame

In [7]:
df_ps.printSchema()

root
 |-- empID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Spending: integer (nullable = true)
 |-- Dept: string (nullable = true)



In [8]:
df_ps.columns

['empID', 'Name', 'Age', 'Gender', 'Salary', 'Spending', 'Dept']

We can also list the data type of each column via ```dtypes```:

In [9]:
df_ps.dtypes

[('empID', 'int'),
 ('Name', 'string'),
 ('Age', 'int'),
 ('Gender', 'string'),
 ('Salary', 'int'),
 ('Spending', 'int'),
 ('Dept', 'string')]

<br>

Here are two ways to display the first few rows of a dataframe, which is useful for large tables. The first gives a nicer tabular view. The second reveals more structure: ```head(n)``` returns a list of ```Row``` objects.

In [10]:
df_ps.show(5)

+-----+-------+---+------+------+--------+----+
|empID|   Name|Age|Gender|Salary|Spending|Dept|
+-----+-------+---+------+------+--------+----+
|   13|Hatsune| 25|     F|   245|      50|  ML|
|   23|  Alice| 30|     F|   270|      88|  ML|
|  115|    Bob| 23|     M|   120|      46| Fin|
|  123|Charlie| 25|     M|   133|      39|  CS|
|  151| Gordon| 26|     M|   142|      35|  CS|
+-----+-------+---+------+------+--------+----+
only showing top 5 rows



In [11]:
df_ps.head(5)

[Row(empID=13, Name='Hatsune', Age=25, Gender='F', Salary=245, Spending=50, Dept='ML'),
 Row(empID=23, Name='Alice', Age=30, Gender='F', Salary=270, Spending=88, Dept='ML'),
 Row(empID=115, Name='Bob', Age=23, Gender='M', Salary=120, Spending=46, Dept='Fin'),
 Row(empID=123, Name='Charlie', Age=25, Gender='M', Salary=133, Spending=39, Dept='CS'),
 Row(empID=151, Name='Gordon', Age=26, Gender='M', Salary=142, Spending=35, Dept='CS')]

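Side note: for richer visualization, you can convert to a pandas dataframe with ```toPandas()```. A histogram, for example, then takes only a couple of lines. Keep in mind that ```toPandas()``` collects all the data onto the driver, so it is only suitable for data that fits in memory.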

<br>

Here, suppose we only care about the department of an employee and their salary (because we want to see if majoring in CS is rewarding enough...). We can extract these columns with the ```select``` function. Note: we store this information in ```df_deptSalaryINDV``` instead of ```df_ps```. As expected, ```df_deptSalaryINDV``` is also a pyspark dataframe.

In [12]:
df_deptSalaryINDV = df_ps.select(['Dept', 'Salary'])
df_deptSalaryINDV.show() 

+----+------+
|Dept|Salary|
+----+------+
|  ML|   245|
|  ML|   270|
| Fin|   120|
|  CS|   133|
|  CS|   142|
| Fin|   151|
| Fin|   111|
|  CS|   124|
|  ML|   151|
|  CS|   217|
|  ML|   164|
| Fin|  null|
| Fin|   400|
|  ML|  1000|
|  CS|  null|
|  ML|   105|
|  ML|  null|
|  CS|   135|
| Fin|   100|
+----+------+



In [13]:
type(df_deptSalaryINDV)

pyspark.sql.dataframe.DataFrame

<br>

A quick way to summarize our data is the ```describe()``` function, which returns a new summary table with the count, mean, stddev, min and max of each column. The count row gives the number of non-null entries. Note: string columns such as Name and Gender have no mean or standard deviation (shown as null), and their min/max are determined by alphabetical (string) ordering.

In [14]:
df_ps.describe().show()

+-------+------------------+-----+------------------+------+------------------+------------------+----+
|summary|             empID| Name|               Age|Gender|            Salary|          Spending|Dept|
+-------+------------------+-----+------------------+------+------------------+------------------+----+
|  count|                19|   17|                14|    17|                16|                17|  19|
|   mean|138.68421052631578| null|31.142857142857142|  null|             223.0| 56.23529411764706|null|
| stddev| 83.57302371218634| null| 8.094225319107661|  null|221.51719271123554|22.456985026280535|null|
|    min|                13|Alice|                23|     F|               100|                30|  CS|
|    max|               311|Zosso|                50|     M|              1000|               102|  ML|
+-------+------------------+-----+------------------+------+------------------+------------------+----+
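To see how ```describe()``` treats nulls, the plain-Python sketch below (no Spark involved) recomputes the Age statistics from the table above. It assumes the Age column in row order, with ```None``` standing in for null. Nulls are excluded from count, mean and stddev. The stddev that matches the table is the sample standard deviation, which divides by n - 1; ```statistics.pstdev``` (which divides by n) would give about 7.80 instead.

```python
import statistics

# Age column of employees.csv in row order; None stands for a null entry.
ages = [25, 30, 23, 25, 26, 27, 26, 30, 31, 40, 28, None, 50, 45,
        None, None, None, None, 30]

non_null = [a for a in ages if a is not None]  # describe() ignores nulls
n = len(non_null)                               # the "count" row
mean = statistics.mean(non_null)                # the "mean" row
stddev = statistics.stdev(non_null)             # sample standard deviation (divides by n - 1)
print(n, mean, stddev)
```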



<br>

### Modifying our dataframe

Suppose we want to add a column projecting each employee's salary in 5 years, taken to be 1.2 times their current salary. We do this below with the ```withColumn``` function. Note that we reassign ```df_ps``` to the new dataframe with the added column Proj5year. pyspark dataframes are immutable, so ```withColumn``` returns a new dataframe instead of modifying the existing one.

In [15]:
df_ps = df_ps.withColumn("Proj5year", df_ps['Salary']*1.2)
df_ps.show()

+-----+-------+----+------+------+--------+----+------------------+
|empID|   Name| Age|Gender|Salary|Spending|Dept|         Proj5year|
+-----+-------+----+------+------+--------+----+------------------+
|   13|Hatsune|  25|     F|   245|      50|  ML|             294.0|
|   23|  Alice|  30|     F|   270|      88|  ML|             324.0|
|  115|    Bob|  23|     M|   120|      46| Fin|             144.0|
|  123|Charlie|  25|     M|   133|      39|  CS|             159.6|
|  151| Gordon|  26|     M|   142|      35|  CS|             170.4|
|  152| Violet|  27|     F|   151|      40| Fin|             181.2|
|  156| Samuel|  26|     M|   111|      35| Fin|             133.2|
|  119| Rachel|  30|     F|   124|      60|  CS|148.79999999999998|
|  157| Daniel|  31|     M|   151|      62|  ML|             181.2|
|  123|  Glenn|  40|     M|   217|      45|  CS|             260.4|
|  213| Shuang|  28|     F|   164|      61|  ML|196.79999999999998|
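|  211|  James|null|     M|  null|      70| Fin|              null|
|  311|   null|  50|  null|   400|     102| Fin|             480.0|
|   67| Andrew|  45|     M|  1000|    null|  ML|            1200.0|
|  143|  Stacy|null|     F|  null|     100|  CS|              null|
|  199|   null|null|  null|   105|    null|  ML|             126.0|
|  289|  Zosso|null|     M|  null|      56|  ML|              null|
|   55|  Keith|null|     M|   135|      30|  CS|             162.0|
|   15|  Laura|  30|     F|   100|      37| Fin|             120.0|
+-----+-------+----+------+------+--------+----+------------------+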

After adding the new column, we notice that Proj5year contains decimal values. To get integers instead, we use ```withColumn``` again, this time replacing the column with itself cast to integers. The cast drops the fractional part rather than rounding: 159.6 becomes 159.


In [16]:
df_ps = df_ps.withColumn("Proj5year", df_ps['Proj5year'].cast('integer'))
df_ps.show()

+-----+-------+----+------+------+--------+----+---------+
|empID|   Name| Age|Gender|Salary|Spending|Dept|Proj5year|
+-----+-------+----+------+------+--------+----+---------+
|   13|Hatsune|  25|     F|   245|      50|  ML|      294|
|   23|  Alice|  30|     F|   270|      88|  ML|      324|
|  115|    Bob|  23|     M|   120|      46| Fin|      144|
|  123|Charlie|  25|     M|   133|      39|  CS|      159|
|  151| Gordon|  26|     M|   142|      35|  CS|      170|
|  152| Violet|  27|     F|   151|      40| Fin|      181|
|  156| Samuel|  26|     M|   111|      35| Fin|      133|
|  119| Rachel|  30|     F|   124|      60|  CS|      148|
|  157| Daniel|  31|     M|   151|      62|  ML|      181|
|  123|  Glenn|  40|     M|   217|      45|  CS|      260|
|  213| Shuang|  28|     F|   164|      61|  ML|      196|
|  211|  James|null|     M|  null|      70| Fin|     null|
|  311|   null|  50|  null|   400|     102| Fin|      480|
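|   67| Andrew|  45|     M|  1000|    null|  ML|     1200|
|  143|  Stacy|null|     F|  null|     100|  CS|     null|
|  199|   null|null|  null|   105|    null|  ML|      126|
|  289|  Zosso|null|     M|  null|      56|  ML|     null|
|   55|  Keith|null|     M|   135|      30|  CS|      162|
|   15|  Laura|  30|     F|   100|      37| Fin|      120|
+-----+-------+----+------+------+--------+----+---------+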
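Python floats are the same IEEE 754 doubles that Spark uses, so a plain-Python sketch reproduces the Proj5year values. For example, ```124 * 1.2``` evaluates to 148.79999999999998, and the integer cast keeps only the integer part. If rounding is wanted instead, one option is to apply ```round``` from ```pyspark.sql.functions``` before casting. Below, Python's built-in ```round``` stands in for it. The two differ only on exact .5 ties, and no such ties occur in this data.

```python
# Current salaries (in thousands) of the first 11 employees, as in the table above.
salaries = [245, 270, 120, 133, 142, 151, 111, 124, 151, 217, 164]

projected = [s * 1.2 for s in salaries]   # double arithmetic: 124 * 1.2 -> 148.79999999999998
truncated = [int(p) for p in projected]   # like cast('integer'): the fractional part is dropped
rounded = [round(p) for p in projected]   # rounds to the nearest integer instead

print(truncated)
print(rounded)
```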

We can drop columns too. Below, ```drop``` removes all the non-numerical columns. Since the result is only displayed and not reassigned, ```df_ps``` itself is unchanged.

In [17]:
df_ps.drop('Name', 'Gender', 'Dept').show()

+-----+----+------+--------+---------+
|empID| Age|Salary|Spending|Proj5year|
+-----+----+------+--------+---------+
|   13|  25|   245|      50|      294|
|   23|  30|   270|      88|      324|
|  115|  23|   120|      46|      144|
|  123|  25|   133|      39|      159|
|  151|  26|   142|      35|      170|
|  152|  27|   151|      40|      181|
|  156|  26|   111|      35|      133|
|  119|  30|   124|      60|      148|
|  157|  31|   151|      62|      181|
|  123|  40|   217|      45|      260|
|  213|  28|   164|      61|      196|
|  211|null|  null|      70|     null|
|  311|  50|   400|     102|      480|
|   67|  45|  1000|    null|     1200|
|  143|null|  null|     100|     null|
|  199|null|   105|    null|      126|
|  289|null|  null|      56|     null|
|   55|null|   135|      30|      162|
|   15|  30|   100|      37|      120|
+-----+----+------+--------+---------+



Similarly, we can rename a column, e.g. Name, with the ```withColumnRenamed``` function.

In [18]:
df_ps.withColumnRenamed('Name', 'RENAMED').show()

+-----+-------+----+------+------+--------+----+---------+
|empID|RENAMED| Age|Gender|Salary|Spending|Dept|Proj5year|
+-----+-------+----+------+------+--------+----+---------+
|   13|Hatsune|  25|     F|   245|      50|  ML|      294|
|   23|  Alice|  30|     F|   270|      88|  ML|      324|
|  115|    Bob|  23|     M|   120|      46| Fin|      144|
|  123|Charlie|  25|     M|   133|      39|  CS|      159|
|  151| Gordon|  26|     M|   142|      35|  CS|      170|
|  152| Violet|  27|     F|   151|      40| Fin|      181|
|  156| Samuel|  26|     M|   111|      35| Fin|      133|
|  119| Rachel|  30|     F|   124|      60|  CS|      148|
|  157| Daniel|  31|     M|   151|      62|  ML|      181|
|  123|  Glenn|  40|     M|   217|      45|  CS|      260|
|  213| Shuang|  28|     F|   164|      61|  ML|      196|
|  211|  James|null|     M|  null|      70| Fin|     null|
|  311|   null|  50|  null|   400|     102| Fin|      480|
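|   67| Andrew|  45|     M|  1000|    null|  ML|     1200|
|  143|  Stacy|null|     F|  null|     100|  CS|     null|
|  199|   null|null|  null|   105|    null|  ML|      126|
|  289|  Zosso|null|     M|  null|      56|  ML|     null|
|   55|  Keith|null|     M|   135|      30|  CS|      162|
|   15|  Laura|  30|     F|   100|      37| Fin|      120|
+-----+-------+----+------+------+--------+----+---------+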

### Dealing with null values

We now reload the original employees dataset, without the Proj5year column. This table serves as a reference to compare with the outputs that follow.


In [19]:
df_ps = spark.read.csv('employees.csv', header = True, inferSchema = True)
df_ps.show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Fin|
|  123|Charlie|  25|     M|   133|      39|  CS|
|  151| Gordon|  26|     M|   142|      35|  CS|
|  152| Violet|  27|     F|   151|      40| Fin|
|  156| Samuel|  26|     M|   111|      35| Fin|
|  119| Rachel|  30|     F|   124|      60|  CS|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45|  CS|
|  213| Shuang|  28|     F|   164|      61|  ML|
|  211|  James|null|     M|  null|      70| Fin|
|  311|   null|  50|  null|   400|     102| Fin|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100|  CS|
|  199|   null|null|  null|   105|    null|  ML|
|  289|  Zosso|null|     M|  null|      56|  ML|
|   55|  Keith|null|     M|   135|      30|  CS|
|   15|  Laura|  30|     F|   100|      37| Fin|
+-----+-------+----+------+------+--------+----+

Let's first count the null values in each column of the dataframe. There are two ways to do this. First, we can convert the dataframe to pandas via ```df_ps.toPandas()``` and then use ```isnull().sum()```. This collects the whole dataset onto the driver, so it can be slow, or run out of memory, for large datasets.

In [20]:
import pandas
df_ps.toPandas().isnull().sum()

empID       0
Name        2
Age         5
Gender      2
Salary      3
Spending    2
Dept        0
dtype: int64

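The second way stays within Spark and is closer to standard SQL. ```when``` is analogous to SQL's ```CASE WHEN``` expression; SQL's ```WHERE``` corresponds to ```filter``` instead. For each column, ```when(isnull(c), 1)``` yields 1 for a null entry and null otherwise, since no ```otherwise``` is given. Because ```count``` only counts non-null values, counting the result gives the number of nulls in that column.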

In [21]:
from pyspark.sql.functions import isnull, count, when

df_ps.select([count(when(isnull(c), 1)).alias(c) for c in df_ps.columns]).show()

+-----+----+---+------+------+--------+----+
|empID|Name|Age|Gender|Salary|Spending|Dept|
+-----+----+---+------+------+--------+----+
|    0|   2|  5|     2|     3|       2|   0|
+-----+----+---+------+------+--------+----+
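To see why counting ```when(isnull(c), 1)``` gives the number of nulls, here is a plain-Python model applied to four rows of the employees table, with ```None``` standing in for null. No Spark is involved. The helpers ```when_isnull``` and ```count``` are hypothetical stand-ins for the Spark expressions, not pyspark functions.

```python
# Four rows of employees.csv (a subset of the columns); None marks a null.
rows = [
    {"empID": 13,  "Name": "Hatsune", "Age": 25,   "Salary": 245},
    {"empID": 211, "Name": "James",   "Age": None, "Salary": None},
    {"empID": 311, "Name": None,      "Age": 50,   "Salary": 400},
    {"empID": 199, "Name": None,      "Age": None, "Salary": 105},
]

def when_isnull(value):
    # Models when(isnull(c), 1) with no otherwise(): 1 for a null, null (None) otherwise
    return 1 if value is None else None

def count(values):
    # Models Spark's count(col): only non-null values are counted
    return sum(1 for v in values if v is not None)

null_counts = {c: count(when_isnull(r[c]) for r in rows) for c in rows[0]}
print(null_counts)  # {'empID': 0, 'Name': 2, 'Age': 2, 'Salary': 1}
```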



The simplest form of dropping removes every row that contains ```any``` null value; ```how = 'any'``` is the default. If we specify ```how = 'all'```, only rows in which every value is null are removed. Note: ```dropna()``` and ```na.drop()``` are aliases, so either works.

In [22]:
df_ps.dropna().show()

+-----+-------+---+------+------+--------+----+
|empID|   Name|Age|Gender|Salary|Spending|Dept|
+-----+-------+---+------+------+--------+----+
|   13|Hatsune| 25|     F|   245|      50|  ML|
|   23|  Alice| 30|     F|   270|      88|  ML|
|  115|    Bob| 23|     M|   120|      46| Fin|
|  123|Charlie| 25|     M|   133|      39|  CS|
|  151| Gordon| 26|     M|   142|      35|  CS|
|  152| Violet| 27|     F|   151|      40| Fin|
|  156| Samuel| 26|     M|   111|      35| Fin|
|  119| Rachel| 30|     F|   124|      60|  CS|
|  157| Daniel| 31|     M|   151|      62|  ML|
|  123|  Glenn| 40|     M|   217|      45|  CS|
|  213| Shuang| 28|     F|   164|      61|  ML|
|   15|  Laura| 30|     F|   100|      37| Fin|
+-----+-------+---+------+------+--------+----+



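The next parameter is ```thresh```. When specified, a row is dropped whenever (number of non-null values in the row) < thresh, and ```thresh``` overrides ```how```. Compare the output below with the reloaded dataset (In [19]) to check that it is as expected. With ```thresh = 5```, only the row with empID 199 is dropped, because it has just 3 non-null values.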

In [23]:
df_ps.dropna(thresh = 5).show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Fin|
|  123|Charlie|  25|     M|   133|      39|  CS|
|  151| Gordon|  26|     M|   142|      35|  CS|
|  152| Violet|  27|     F|   151|      40| Fin|
|  156| Samuel|  26|     M|   111|      35| Fin|
|  119| Rachel|  30|     F|   124|      60|  CS|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45|  CS|
|  213| Shuang|  28|     F|   164|      61|  ML|
|  211|  James|null|     M|  null|      70| Fin|
|  311|   null|  50|  null|   400|     102| Fin|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100|  CS|
|  289|  Zosso|null|     M|  null|      56|  ML|
|   55|  Keith|null|     M|   135|      30|  CS|
|   15|  Laura|  30|     F|   100|      37| Fin|
+-----+-------+----+------+------+--------+----+

You can also restrict the null check to specific columns by specifying ```subset```. Below, a row is dropped only if its Name or its Salary is null, using the default ```how = 'any'```.

In [24]:
df_ps.dropna(subset = ['Name', 'Salary']).show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Fin|
|  123|Charlie|  25|     M|   133|      39|  CS|
|  151| Gordon|  26|     M|   142|      35|  CS|
|  152| Violet|  27|     F|   151|      40| Fin|
|  156| Samuel|  26|     M|   111|      35| Fin|
|  119| Rachel|  30|     F|   124|      60|  CS|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45|  CS|
|  213| Shuang|  28|     F|   164|      61|  ML|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|   55|  Keith|null|     M|   135|      30|  CS|
|   15|  Laura|  30|     F|   100|      37| Fin|
+-----+-------+----+------+------+--------+----+
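The ```how```, ```thresh``` and ```subset``` rules fit in a few lines of plain Python. The sketch below models them and is not Spark's implementation. Its ```dropna``` is a hypothetical stand-in that works on tuples, with ```None``` for null, applied to four rows of the employees table. When ```thresh``` is not given, it defaults to the number of checked columns for ```how = 'any'``` and to 1 for ```how = 'all'```. An explicit ```thresh``` overrides ```how```.

```python
# Four rows of employees.csv; None marks a null.
columns = ["empID", "Name", "Age", "Gender", "Salary", "Spending", "Dept"]
rows = [
    (13,  "Hatsune", 25,   "F",  245,  50,   "ML"),
    (211, "James",   None, "M",  None, 70,   "Fin"),
    (311, None,      50,   None, 400,  102,  "Fin"),
    (199, None,      None, None, 105,  None, "ML"),
]

def dropna(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of DataFrame.dropna's row-keeping rule."""
    idx = [columns.index(c) for c in (subset or columns)]
    if thresh is None:
        # 'any': every checked value must be non-null; 'all': at least one must be
        thresh = len(idx) if how == "any" else 1
    # A row is dropped when its number of non-null checked values is below thresh
    return [row for row in rows if sum(row[i] is not None for i in idx) >= thresh]

def ids(rs):
    return [r[0] for r in rs]

print(ids(dropna(rows)))                             # [13]
print(ids(dropna(rows, how="all")))                  # [13, 211, 311, 199]
print(ids(dropna(rows, thresh=5)))                   # [13, 211, 311]
print(ids(dropna(rows, subset=["Name", "Salary"])))  # [13]
```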



Instead of dropping null values, we can also fill them. Passing a dictionary fills several columns at once, each with its own value.

The dictionary below replaces every null in Name with Steve and every null in Gender with M.

In [25]:
df_ps.fillna({'Name': 'Steve', 'Gender': 'M'}).show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Fin|
|  123|Charlie|  25|     M|   133|      39|  CS|
|  151| Gordon|  26|     M|   142|      35|  CS|
|  152| Violet|  27|     F|   151|      40| Fin|
|  156| Samuel|  26|     M|   111|      35| Fin|
|  119| Rachel|  30|     F|   124|      60|  CS|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45|  CS|
|  213| Shuang|  28|     F|   164|      61|  ML|
|  211|  James|null|     M|  null|      70| Fin|
|  311|  Steve|  50|     M|   400|     102| Fin|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100|  CS|
|  199|  Steve|null|     M|   105|    null|  ML|
|  289|  Zosso|null|     M|  null|      56|  ML|
|   55|  Keith|null|     M|   135|      30|  CS|
|   15|  Laura|  30|     F|   100|      37| Fin|
+-----+-------+----+------+------+--------+----+

```Imputer``` (from ```pyspark.ml.feature```) can also replace null values in numeric columns. Below we select the columns to impute and set the strategy to ```median```, which replaces each null with the median of that column's non-null values. We then fit the imputer to the data and transform the dataframe. Because ```outputCols``` equals ```inputCols```, the imputed values overwrite the original columns. The default strategy is ```mean```.

In [26]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['Age', 'Salary', 'Spending'], outputCols=['Age', 'Salary', 'Spending'])
imputer.setStrategy('median')

model = imputer.fit(df_ps)
model.transform(df_ps).show()

+-----+-------+---+------+------+--------+----+
|empID|   Name|Age|Gender|Salary|Spending|Dept|
+-----+-------+---+------+------+--------+----+
|   13|Hatsune| 25|     F|   245|      50|  ML|
|   23|  Alice| 30|     F|   270|      88|  ML|
|  115|    Bob| 23|     M|   120|      46| Fin|
|  123|Charlie| 25|     M|   133|      39|  CS|
|  151| Gordon| 26|     M|   142|      35|  CS|
|  152| Violet| 27|     F|   151|      40| Fin|
|  156| Samuel| 26|     M|   111|      35| Fin|
|  119| Rachel| 30|     F|   124|      60|  CS|
|  157| Daniel| 31|     M|   151|      62|  ML|
|  123|  Glenn| 40|     M|   217|      45|  CS|
|  213| Shuang| 28|     F|   164|      61|  ML|
|  211|  James| 28|     M|   142|      70| Fin|
|  311|   null| 50|  null|   400|     102| Fin|
|   67| Andrew| 45|     M|  1000|      50|  ML|
|  143|  Stacy| 28|     F|   142|     100|  CS|
|  199|   null| 28|  null|   105|      50|  ML|
|  289|  Zosso| 28|     M|   142|      56|  ML|
|   55|  Keith| 28|     M|   135|      30|  CS|
|   15|  Laura| 30|     F|   100|      37| Fin|
+-----+-------+---+------+------+--------+----+
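The imputed values above are 28 for Age, 142 for Salary and 50 for Spending. They are actual data values rather than averages of the two middle values. This plain-Python check on the non-null values shows that they agree with ```statistics.median_low```, not ```statistics.median```. Our understanding is that Spark computes this median via an approximate quantile, which returns an element of the data. Treat that as an explanation consistent with this output, not as a specification.

```python
import statistics

# Non-null Age, Salary and Spending values of employees.csv, in row order.
ages = [25, 30, 23, 25, 26, 27, 26, 30, 31, 40, 28, 50, 45, 30]
salaries = [245, 270, 120, 133, 142, 151, 111, 124, 151, 217, 164,
            400, 1000, 105, 135, 100]
spending = [50, 88, 46, 39, 35, 40, 35, 60, 62, 45, 61, 70, 102, 100,
            56, 30, 37]

results = {}
for name, values in [("Age", ages), ("Salary", salaries), ("Spending", spending)]:
    results[name] = (
        statistics.median(values),      # averages the two middle values when n is even
        statistics.median_low(values),  # always returns an element of the data
    )
    print(name, len(values), *results[name])
```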


### Filtering dataframe by conditions/clause

The ```filter()``` function takes a single condition to filter by. Conditions can be combined with ```&``` (and), ```|``` (or) and ```~``` (not; not used here), and each sub-condition must be wrapped in its own parentheses. The example below should be enough to show the syntax, which you can generalize.

Suppose we only want the names of all male employees with a salary below 150K or above 800K. We also want the department they belong to, but we don't care *exactly* how much they earn. We can ```filter``` the dataframe and then ```select``` the relevant columns, as follows:

In [27]:
df_ps.filter((df_ps['Gender'] == 'M') & ((df_ps['Salary'] < 150) | (df_ps['Salary'] > 800))).select(['Name', 'Dept']).show()

+-------+----+
|   Name|Dept|
+-------+----+
|    Bob| Fin|
|Charlie|  CS|
| Gordon|  CS|
| Samuel| Fin|
| Andrew|  ML|
|  Keith|  CS|
+-------+----+
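The reason every sub-condition needs its own parentheses is operator precedence. ```&```, ```|``` and ```~``` are Python's bitwise operators, which pyspark overloads on columns, and they bind more tightly than comparisons such as ```==``` and ```<```. The standard-library ```ast``` module shows how Python parses the two spellings. The names ```gender``` and ```salary``` are placeholders; nothing is evaluated. With pyspark columns in their place, the unparenthesized form typically fails with an error instead of filtering.

```python
import ast

# Parse (without evaluating) the same condition with and without parentheses.
bare = ast.parse("gender == 'M' & salary < 150", mode="eval").body
wrapped = ast.parse("(gender == 'M') & (salary < 150)", mode="eval").body

# Without parentheses, & binds first, giving one chained comparison:
#     gender == ('M' & salary) < 150
print(type(bare).__name__, [type(op).__name__ for op in bare.ops])  # Compare ['Eq', 'Lt']

# With parentheses, the top-level operation is the intended & of two comparisons.
print(type(wrapped).__name__, type(wrapped.op).__name__)            # BinOp BitAnd
```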




### Various methods of replacements

The simplest method we will look at is a direct replacement with the ```replace()``` function. For example, to replace all instances of CS with SWE and Fin with Bus:

In [28]:
df_ps.replace(['CS', 'Fin'], ['SWE', 'Bus']).show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Bus|
|  123|Charlie|  25|     M|   133|      39| SWE|
|  151| Gordon|  26|     M|   142|      35| SWE|
|  152| Violet|  27|     F|   151|      40| Bus|
|  156| Samuel|  26|     M|   111|      35| Bus|
|  119| Rachel|  30|     F|   124|      60| SWE|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45| SWE|
|  213| Shuang|  28|     F|   164|      61|  ML|
|  211|  James|null|     M|  null|      70| Bus|
|  311|   null|  50|  null|   400|     102| Bus|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100| SWE|
|  199|   null|null|  null|   105|    null|  ML|
|  289|  Zosso|null|     M|  null|      56|  ML|
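|   55|  Keith|null|     M|   135|      30| SWE|
|   15|  Laura|  30|     F|   100|      37| Bus|
+-----+-------+----+------+------+--------+----+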

```replace``` also takes a third parameter, ```subset```, which restricts the replacement to the listed columns. It is good practice to use it. Without ```subset```, a matching value in any other column of the same type would be replaced too, for instance an employee whose Name happened to be 'Fin'.

In [29]:
#same output as the replace() above, since CS and Fin only appear in Dept
df_ps.replace(['CS', 'Fin'], ['SWE', 'Bus'], ['Dept']).show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   245|      50|  ML|
|   23|  Alice|  30|     F|   270|      88|  ML|
|  115|    Bob|  23|     M|   120|      46| Bus|
|  123|Charlie|  25|     M|   133|      39| SWE|
|  151| Gordon|  26|     M|   142|      35| SWE|
|  152| Violet|  27|     F|   151|      40| Bus|
|  156| Samuel|  26|     M|   111|      35| Bus|
|  119| Rachel|  30|     F|   124|      60| SWE|
|  157| Daniel|  31|     M|   151|      62|  ML|
|  123|  Glenn|  40|     M|   217|      45| SWE|
|  213| Shuang|  28|     F|   164|      61|  ML|
|  211|  James|null|     M|  null|      70| Bus|
|  311|   null|  50|  null|   400|     102| Bus|
|   67| Andrew|  45|     M|  1000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100| SWE|
|  199|   null|null|  null|   105|    null|  ML|
|  289|  Zosso|null|     M|  null|      56|  ML|
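|   55|  Keith|null|     M|   135|      30| SWE|
|   15|  Laura|  30|     F|   100|      37| Bus|
+-----+-------+----+------+------+--------+----+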

#### Replacement given more types of conditions/clauses

For something more complex, we combine ```withColumn``` (to replace the column) with ```when``` and ```otherwise```. As an example, we set the salary to 0 if it is below 200K and the department is ML, and double it otherwise. ```when``` takes two arguments: first the condition, then the value to use when the condition holds. ```otherwise``` gives the value to use when it does not.

Note: without ```.otherwise```, every row that fails the condition gets null. If you want those rows to keep a non-null value, you must specify it via ```.otherwise```.

The syntax looks like the following:

In [31]:
df_ps.withColumn("Salary", when((df_ps['Dept'] == 'ML') & (df_ps['Salary'] < 200), 0).otherwise(df_ps['Salary']*2)).show()

+-----+-------+----+------+------+--------+----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|
+-----+-------+----+------+------+--------+----+
|   13|Hatsune|  25|     F|   490|      50|  ML|
|   23|  Alice|  30|     F|   540|      88|  ML|
|  115|    Bob|  23|     M|   240|      46| Fin|
|  123|Charlie|  25|     M|   266|      39|  CS|
|  151| Gordon|  26|     M|   284|      35|  CS|
|  152| Violet|  27|     F|   302|      40| Fin|
|  156| Samuel|  26|     M|   222|      35| Fin|
|  119| Rachel|  30|     F|   248|      60|  CS|
|  157| Daniel|  31|     M|     0|      62|  ML|
|  123|  Glenn|  40|     M|   434|      45|  CS|
|  213| Shuang|  28|     F|     0|      61|  ML|
|  211|  James|null|     M|  null|      70| Fin|
|  311|   null|  50|  null|   800|     102| Fin|
|   67| Andrew|  45|     M|  2000|    null|  ML|
|  143|  Stacy|null|     F|  null|     100|  CS|
|  199|   null|null|  null|     0|    null|  ML|
|  289|  Zosso|null|     M|  null|      56|  ML|
|   55|  Keith|null|     M|   270|      30|  CS|
|   15|  Laura|  30|     F|   200|      37| Fin|
+-----+-------+----+------+------+--------+----+
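The plain-Python model below spells out how this clause is evaluated row by row, including null handling. It uses hypothetical helper names and no Spark, with ```None``` standing in for null. Under SQL's three-valued logic, a comparison with a null salary yields null. A null condition counts as not satisfied, so the row falls through to ```otherwise```, where null * 2 is still null. That is why James and Zosso keep a null salary above. The second function shows the same ```when``` without ```otherwise```.

```python
def eq(x, y):
    # SQL comparison: anything compared with null (None) yields null
    return None if x is None or y is None else x == y

def lt(x, y):
    return None if x is None or y is None else x < y

def sql_and(a, b):
    # SQL three-valued AND: False wins, then null, otherwise True
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True

def adjust_salary(dept, salary):
    # Models when((Dept == 'ML') & (Salary < 200), 0).otherwise(Salary * 2)
    if sql_and(eq(dept, "ML"), lt(salary, 200)) is True:
        return 0                                    # only a true condition takes the when() branch
    return None if salary is None else salary * 2  # otherwise(); null * 2 stays null

def adjust_without_otherwise(dept, salary):
    # The same when() with no otherwise(): every row failing the clause becomes null
    return 0 if sql_and(eq(dept, "ML"), lt(salary, 200)) is True else None

for dept, salary in [("ML", 151), ("ML", 245), ("CS", 135), ("ML", None), ("Fin", None)]:
    print(dept, salary, adjust_salary(dept, salary), adjust_without_otherwise(dept, salary))
```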

<br>

### Using Groupby to group data by categories to provide summaries
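Below we summarize the data by grouping on the Dept and Gender categories with the ```groupBy``` function. Calling an aggregate such as ```avg()``` or ```sum()``` with no arguments applies it to every numeric column, including empID, where the result is meaningless.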

In [32]:
df_ps.groupBy('Dept').avg().show()

+----+------------------+--------+-----------+-------------+
|Dept|        avg(empID)|avg(Age)|avg(Salary)|avg(Spending)|
+----+------------------+--------+-----------+-------------+
| Fin|             160.0|    31.2|      176.4|         55.0|
|  CS|             119.0|   30.25|      150.2|         51.5|
|  ML|137.28571428571428|    31.8|      322.5|         63.4|
+----+------------------+--------+-----------+-------------+
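Per-group averages skip nulls, while ```count()``` counts rows. A plain-Python sketch on the (Dept, Salary) pairs from the ```select``` output earlier shows both: it reproduces avg(Salary) from the table above and the row counts that ```groupBy('Dept').count()``` reports. This is only a model of the semantics, not how Spark executes the aggregation.

```python
from collections import defaultdict

# (Dept, Salary) pairs for all 19 employees, as in the select() output earlier; None = null.
dept_salary = [
    ("ML", 245), ("ML", 270), ("Fin", 120), ("CS", 133), ("CS", 142),
    ("Fin", 151), ("Fin", 111), ("CS", 124), ("ML", 151), ("CS", 217),
    ("ML", 164), ("Fin", None), ("Fin", 400), ("ML", 1000), ("CS", None),
    ("ML", 105), ("ML", None), ("CS", 135), ("Fin", 100),
]

salaries_by_dept = defaultdict(list)
for dept, salary in dept_salary:
    salaries_by_dept[dept].append(salary)

counts, averages = {}, {}
for dept, salaries in salaries_by_dept.items():
    non_null = [s for s in salaries if s is not None]
    counts[dept] = len(salaries)                    # count(): rows, nulls included
    averages[dept] = sum(non_null) / len(non_null)  # avg(): nulls skipped
    print(dept, counts[dept], averages[dept])
```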



In [35]:
df_ps.groupBy('Gender').sum().select(['Gender', 'sum(Spending)']).show()

+------+-------------+
|Gender|sum(Spending)|
+------+-------------+
|     F|          436|
|  null|          102|
|     M|          418|
+------+-------------+



In [36]:
df_ps.groupBy('Dept').count().show()

+----+-----+
|Dept|count|
+----+-----+
| Fin|    6|
|  CS|    6|
|  ML|    7|
+----+-----+



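You can also group by multiple columns. Below is one practical use case. We drop rows with a null Gender and drop the empID column, whose average is meaningless. We then average each numeric column per Dept and Gender pair: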

In [37]:
df_ps.dropna(subset = ['Gender']).drop('empID').groupBy('Dept', 'Gender').avg().show()

+----+------+------------------+------------------+------------------+
|Dept|Gender|          avg(Age)|       avg(Salary)|     avg(Spending)|
+----+------+------------------+------------------+------------------+
|  ML|     F|27.666666666666668|226.33333333333334| 66.33333333333333|
| Fin|     M|              24.5|             115.5|50.333333333333336|
| Fin|     F|              28.5|             125.5|              38.5|
|  CS|     F|              30.0|             124.0|              80.0|
|  CS|     M|30.333333333333332|            156.75|             37.25|
|  ML|     M|              38.0|             575.5|              59.0|
+----+------+------------------+------------------+------------------+



<br>

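Aggregate functions such as ```sum()``` and ```count()``` can also be used without ```groupBy```, in which case they aggregate over the whole dataframe. Note that ```from pyspark.sql.functions import *``` shadows Python built-ins such as ```sum```, ```min```, ```max``` and ```round```. Many codebases therefore prefer ```from pyspark.sql import functions as F``` and write ```F.sum(...)```. Below are examples of aggregate functions used without ```groupBy```.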

In [38]:
from pyspark.sql.functions import *

total salary for all employees:

In [39]:
df_ps.select(sum('Salary')).show()

+-----------+
|sum(Salary)|
+-----------+
|       3568|
+-----------+



average age of female employees:

In [40]:
df_ps.filter(df_ps['Gender'] == 'F').select(avg('Age').alias('female_Avg_Age')).show()

+------------------+
|    female_Avg_Age|
+------------------+
|28.333333333333332|
+------------------+



number of non-null entries in the Age column:

In [43]:
df_ps.select(count('Age')).show()

+----------+
|count(Age)|
+----------+
|        14|
+----------+



<br>

### Joining: Outer/Left/Right/Inner Join

First, let's look again at the two fake datasets from earlier. The first is the purchase history of customers at a luxury brand, and the second is the employee data of company ABC. For ease of comparison, we first remove all rows containing any null values from both tables.

In [68]:
df_employee = spark.read.csv('employees.csv', header = True, inferSchema = True).dropna()
df_customer = spark.read.option("sep","\t").csv('customers.txt', header = True, inferSchema = True).dropna()

In [69]:
#data from customers.txt, with rows containing nulls removed
df_customer.show()

+------------+---------+--------+-----+
|CustomerName|ProductID| Product|Price|
+------------+---------+--------+-----+
|        John|     W311|  Wallet|  200|
|      Jordan|     B203|    Belt|  500|
|      Rachel|     B133|     Bag| 1000|
|      Gordon|     B139|     Bag| 1200|
|      Samuel|     B155|     Bag| 2300|
|       Emily|     S152|   Shoes|  800|
|       Alice|     W320|  Wallet|  250|
|         Bob|     W320|  Wallet|  250|
|      Daniel|     S300|Suitcase| 1200|
|       Laura|     S305|Suitcase| 1790|
|       Zosso|     W231|   Watch|  420|
|       Evans|     DN18|    Deez| 2000|
|      Rachel|     DN26|    Deez|10000|
|         Bob|     DN26|    Deez|10000|
+------------+---------+--------+-----+



In [70]:
#data from employees.csv, with rows containing nulls removed
df_employee.show()

+-----+-------+---+------+------+--------+----+
|empID|   Name|Age|Gender|Salary|Spending|Dept|
+-----+-------+---+------+------+--------+----+
|   13|Hatsune| 25|     F|   245|      50|  ML|
|   23|  Alice| 30|     F|   270|      88|  ML|
|  115|    Bob| 23|     M|   120|      46| Fin|
|  123|Charlie| 25|     M|   133|      39|  CS|
|  151| Gordon| 26|     M|   142|      35|  CS|
|  152| Violet| 27|     F|   151|      40| Fin|
|  156| Samuel| 26|     M|   111|      35| Fin|
|  119| Rachel| 30|     F|   124|      60|  CS|
|  157| Daniel| 31|     M|   151|      62|  ML|
|  123|  Glenn| 40|     M|   217|      45|  CS|
|  213| Shuang| 28|     F|   164|      61|  ML|
|   15|  Laura| 30|     F|   100|      37| Fin|
+-----+-------+---+------+------+--------+----+



To perform a join, we pass the other dataframe, the join condition and the join type. Below is an example of each type of join. For these tables we join on the person's name (```df_employee.Name == df_customer.CustomerName```), but the idea is the same for any join condition.
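The join types differ only in what happens to rows without a match. The sketch below models an equi-join in plain Python, not Spark, using a small excerpt of the two tables. Hatsune is an employee with no purchases, Bob bought two items, and John is a customer but not an employee. The ```join``` function is a hypothetical stand-in. Bob appears twice in every join type because each matching (employee, purchase) pair produces one row.

```python
# A small excerpt of the two tables: employee names and (CustomerName, Product) purchases.
employees = ["Hatsune", "Bob", "Samuel"]
purchases = [("Bob", "Wallet"), ("Bob", "Deez"), ("Samuel", "Bag"), ("John", "Wallet")]

def join(how):
    """Plain-Python model of joining on Name == CustomerName."""
    employee_names = set(employees)
    rows = []
    for name in employees:
        matches = [(c, p) for c, p in purchases if c == name]
        rows += [(name, c, p) for c, p in matches]  # one row per matching pair
        if not matches and how in ("left", "outer"):
            rows.append((name, None, None))         # employee without purchases
    if how in ("right", "outer"):
        # customers who are not employees get nulls on the employee side
        rows += [(None, c, p) for c, p in purchases if c not in employee_names]
    return rows

for how in ("inner", "left", "right", "outer"):
    print(how, len(join(how)), join(how))
```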

<br>

As can be seen in the output below, an outer join keeps every row from both tables. If an employee has no purchase record, the customer columns in that employee's row are filled with ```null```. Similarly, if a customer has no employee record, all of that customer's employee columns are ```null```.
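A quick way to sanity-check an outer join is to predict its row count: each matching (employee, customer) pair contributes one row, and each row with no partner on either side contributes one more. A plain-Python sketch of that count, using the Name and CustomerName values from the two tables above (`outer_join_size` is a hypothetical helper, not a pyspark function):

```python
from collections import Counter


def outer_join_size(left_keys, right_keys):
    """Predict the number of rows a full outer equi-join returns."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # every (left, right) pair with equal keys becomes one joined row
    # (a Counter returns 0 for missing keys without inserting them)
    matched = sum(n * right_counts[k] for k, n in left_counts.items())
    # rows with no partner survive once each, padded with nulls
    unmatched_left = sum(n for k, n in left_counts.items() if k not in right_counts)
    unmatched_right = sum(n for k, n in right_counts.items() if k not in left_counts)
    return matched + unmatched_left + unmatched_right


employee_names = ["Hatsune", "Alice", "Bob", "Charlie", "Gordon", "Violet",
                  "Samuel", "Rachel", "Daniel", "Glenn", "Shuang", "Laura"]
customer_names = ["John", "Jordan", "Rachel", "Gordon", "Samuel", "Emily", "Alice",
                  "Bob", "Daniel", "Laura", "Zosso", "Evans", "Rachel", "Bob"]

print(outer_join_size(employee_names, customer_names))  # 19
```

If ```df_employee``` and ```df_customer``` hold exactly the rows shown above, calling ```.count()``` on the outer-joined DataFrame instead of ```.show()``` should report the same number.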

In [71]:
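#showCols lists a few key columns; pass it to .select(showCols) for a narrower view (not used below)
showCols = ['Name', 'Salary', 'CustomerName', 'Product']
#full outer join: match employee Name against CustomerName, keep unmatched rows from both sides
df_employee.join(df_customer, df_employee.Name == df_customer.CustomerName, 'outer').show()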

+-----+-------+----+------+------+--------+----+------------+---------+--------+-----+
|empID|   Name| Age|Gender|Salary|Spending|Dept|CustomerName|ProductID| Product|Price|
+-----+-------+----+------+------+--------+----+------------+---------+--------+-----+
| null|   null|null|  null|  null|    null|null|      Jordan|     B203|    Belt|  500|
|  213| Shuang|  28|     F|   164|      61|  ML|        null|     null|    null| null|
|  123|Charlie|  25|     M|   133|      39|  CS|        null|     null|    null| null|
|   13|Hatsune|  25|     F|   245|      50|  ML|        null|     null|    null| null|
|  115|    Bob|  23|     M|   120|      46| Fin|         Bob|     W320|  Wallet|  250|
|  115|    Bob|  23|     M|   120|      46| Fin|         Bob|     DN26|    Deez|10000|
| null|   null|null|  null|  null|    null|null|        John|     W311|  Wallet|  200|
|  156| Samuel|  26|     M|   111|      35| Fin|      Samuel|     B155|     Bag| 2300|
| null|   null|null|  null|  null|    null|

Next is the inner join. As seen below, only the people who are both employees and customers (in the tables shown above) appear. Those who are only employees or only customers are removed from the joined table. Someone with several purchases, such as Bob or Rachel, appears once per purchase.
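The repeated rows follow from the matching rule: an inner join emits one row for every (employee, customer) pair whose names match, so each employee contributes as many rows as there are purchases under their name. A plain-Python sketch using the names from the two tables above (it assumes employee names are unique, as they are here):

```python
from collections import Counter

employee_names = ["Hatsune", "Alice", "Bob", "Charlie", "Gordon", "Violet",
                  "Samuel", "Rachel", "Daniel", "Glenn", "Shuang", "Laura"]
customer_names = ["John", "Jordan", "Rachel", "Gordon", "Samuel", "Emily", "Alice",
                  "Bob", "Daniel", "Laura", "Zosso", "Evans", "Rachel", "Bob"]

purchases = Counter(customer_names)
# employees with at least one purchase, and how many joined rows each produces
rows_per_employee = {name: purchases[name] for name in employee_names if purchases[name] > 0}

print(rows_per_employee)
# {'Alice': 1, 'Bob': 2, 'Gordon': 1, 'Samuel': 1, 'Rachel': 2, 'Daniel': 1, 'Laura': 1}
print(sum(rows_per_employee.values()))  # 9
```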

In [72]:
df_employee.join(df_customer, df_employee.Name == df_customer.CustomerName, 'inner').show()

+-----+------+---+------+------+--------+----+------------+---------+--------+-----+
|empID|  Name|Age|Gender|Salary|Spending|Dept|CustomerName|ProductID| Product|Price|
+-----+------+---+------+------+--------+----+------------+---------+--------+-----+
|   23| Alice| 30|     F|   270|      88|  ML|       Alice|     W320|  Wallet|  250|
|  115|   Bob| 23|     M|   120|      46| Fin|         Bob|     DN26|    Deez|10000|
|  115|   Bob| 23|     M|   120|      46| Fin|         Bob|     W320|  Wallet|  250|
|  151|Gordon| 26|     M|   142|      35|  CS|      Gordon|     B139|     Bag| 1200|
|  156|Samuel| 26|     M|   111|      35| Fin|      Samuel|     B155|     Bag| 2300|
|  119|Rachel| 30|     F|   124|      60|  CS|      Rachel|     DN26|    Deez|10000|
|  119|Rachel| 30|     F|   124|      60|  CS|      Rachel|     B133|     Bag| 1000|
|  157|Daniel| 31|     M|   151|      62|  ML|      Daniel|     S300|Suitcase| 1200|
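|   15| Laura| 30|     F|   100|      37| Fin|       Laura|     S305|Suitcase| 1790|
+-----+------+---+------+------+--------+----+------------+---------+--------+-----+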

A left join, as seen below, keeps every employee even if they are not a customer; their customer columns are simply padded with ```null```. Customers who are not employees, however, are removed from the table.
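The employees whose customer columns end up as ```null``` are exactly those whose name never appears in the customer table. A plain-Python sketch of that check, using the names from the tables above. In pyspark, passing ```'left_anti'``` as the join type (```df_employee.join(df_customer, df_employee.Name == df_customer.CustomerName, 'left_anti')```) should return just these employee rows, without the customer columns.

```python
employee_names = ["Hatsune", "Alice", "Bob", "Charlie", "Gordon", "Violet",
                  "Samuel", "Rachel", "Daniel", "Glenn", "Shuang", "Laura"]
# a set is enough here: only membership matters, not how many purchases
customer_names = {"John", "Jordan", "Rachel", "Gordon", "Samuel", "Emily",
                  "Alice", "Bob", "Daniel", "Laura", "Zosso", "Evans"}

# employees with no purchase: a left join keeps them with null customer columns
no_purchase = [name for name in employee_names if name not in customer_names]
print(no_purchase)  # ['Hatsune', 'Charlie', 'Violet', 'Glenn', 'Shuang']
```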

In [73]:
df_employee.join(df_customer, df_employee.Name == df_customer.CustomerName, 'left').show()

+-----+-------+---+------+------+--------+----+------------+---------+--------+-----+
|empID|   Name|Age|Gender|Salary|Spending|Dept|CustomerName|ProductID| Product|Price|
+-----+-------+---+------+------+--------+----+------------+---------+--------+-----+
|   13|Hatsune| 25|     F|   245|      50|  ML|        null|     null|    null| null|
|   23|  Alice| 30|     F|   270|      88|  ML|       Alice|     W320|  Wallet|  250|
|  115|    Bob| 23|     M|   120|      46| Fin|         Bob|     DN26|    Deez|10000|
|  115|    Bob| 23|     M|   120|      46| Fin|         Bob|     W320|  Wallet|  250|
|  123|Charlie| 25|     M|   133|      39|  CS|        null|     null|    null| null|
|  151| Gordon| 26|     M|   142|      35|  CS|      Gordon|     B139|     Bag| 1200|
|  152| Violet| 27|     F|   151|      40| Fin|        null|     null|    null| null|
|  156| Samuel| 26|     M|   111|      35| Fin|      Samuel|     B155|     Bag| 2300|
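|  119| Rachel| 30|     F|   124|      60|  CS|      Rachel|     DN26|    Deez|10000|
|  119| Rachel| 30|     F|   124|      60|  CS|      Rachel|     B133|     Bag| 1000|
|  157| Daniel| 31|     M|   151|      62|  ML|      Daniel|     S300|Suitcase| 1200|
|  123|  Glenn| 40|     M|   217|      45|  CS|        null|     null|    null| null|
|  213| Shuang| 28|     F|   164|      61|  ML|        null|     null|    null| null|
|   15|  Laura| 30|     F|   100|      37| Fin|       Laura|     S305|Suitcase| 1790|
+-----+-------+---+------+------+--------+----+------------+---------+--------+-----+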

A right join works the opposite way to the left join: it keeps every customer, padding the employee columns with ```null``` for customers who are not employees, and removes employees who are not customers.
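Put another way, a right join of employees onto customers keeps the same rows as a left join of customers onto employees; only the column order differs. A plain-Python sketch that checks this on a few rows taken from the tables above (the helpers `left_join`, `right_join` and `as_multiset` are illustrative, not pyspark functions):

```python
from collections import Counter


def left_join(left, right, left_key, right_key):
    """Keep every left row; pad with None when no right row matches."""
    right_cols = list(right[0]) if right else []
    out = []
    for lrow in left:
        key = lrow[left_key]
        partners = [rrow for rrow in right if key is not None and rrow[right_key] == key]
        if partners:
            out.extend({**lrow, **rrow} for rrow in partners)
        else:
            out.append({**lrow, **dict.fromkeys(right_cols)})
    return out


def right_join(left, right, left_key, right_key):
    """Keep every right row; pad with None when no left row matches."""
    left_cols = list(left[0]) if left else []
    out = []
    for rrow in right:
        key = rrow[right_key]
        partners = [lrow for lrow in left if key is not None and lrow[left_key] == key]
        if partners:
            # left columns come first, as in df_employee.join(df_customer, ...)
            out.extend({**lrow, **rrow} for lrow in partners)
        else:
            out.append({**dict.fromkeys(left_cols), **rrow})
    return out


def as_multiset(rows):
    # compare rows regardless of row order and column order
    return Counter(frozenset(row.items()) for row in rows)


employees = [{"Name": "Alice", "Dept": "ML"},
             {"Name": "Bob", "Dept": "Fin"},
             {"Name": "Hatsune", "Dept": "ML"}]
customers = [{"CustomerName": "John", "Product": "Wallet"},
             {"CustomerName": "Alice", "Product": "Wallet"},
             {"CustomerName": "Bob", "Product": "Wallet"},
             {"CustomerName": "Bob", "Product": "Deez"}]

right = right_join(employees, customers, "Name", "CustomerName")
swapped = left_join(customers, employees, "CustomerName", "Name")
print(as_multiset(right) == as_multiset(swapped))  # True
print(list(right[0]), list(swapped[0]))
# ['Name', 'Dept', 'CustomerName', 'Product'] ['CustomerName', 'Product', 'Name', 'Dept']
```

In pyspark the same idea should hold: ```df_customer.join(df_employee, df_customer.CustomerName == df_employee.Name, 'left')``` would contain the same rows as the right join below, with the customer columns listed first.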

In [74]:
df_employee.join(df_customer, df_employee.Name == df_customer.CustomerName, 'right').show()

+-----+------+----+------+------+--------+----+------------+---------+--------+-----+
|empID|  Name| Age|Gender|Salary|Spending|Dept|CustomerName|ProductID| Product|Price|
+-----+------+----+------+------+--------+----+------------+---------+--------+-----+
| null|  null|null|  null|  null|    null|null|        John|     W311|  Wallet|  200|
| null|  null|null|  null|  null|    null|null|      Jordan|     B203|    Belt|  500|
|  119|Rachel|  30|     F|   124|      60|  CS|      Rachel|     B133|     Bag| 1000|
|  151|Gordon|  26|     M|   142|      35|  CS|      Gordon|     B139|     Bag| 1200|
|  156|Samuel|  26|     M|   111|      35| Fin|      Samuel|     B155|     Bag| 2300|
| null|  null|null|  null|  null|    null|null|       Emily|     S152|   Shoes|  800|
|   23| Alice|  30|     F|   270|      88|  ML|       Alice|     W320|  Wallet|  250|
|  115|   Bob|  23|     M|   120|      46| Fin|         Bob|     W320|  Wallet|  250|
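|  157|Daniel|  31|     M|   151|      62|  ML|      Daniel|     S300|Suitcase| 1200|
|   15| Laura|  30|     F|   100|      37| Fin|       Laura|     S305|Suitcase| 1790|
| null|  null|null|  null|  null|    null|null|       Zosso|     W231|   Watch|  420|
| null|  null|null|  null|  null|    null|null|       Evans|     DN18|    Deez| 2000|
|  119|Rachel|  30|     F|   124|      60|  CS|      Rachel|     DN26|    Deez|10000|
|  115|   Bob|  23|     M|   120|      46| Fin|         Bob|     DN26|    Deez|10000|
+-----+------+----+------+------+--------+----+------------+---------+--------+-----+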

<br>

### Other possibly useful features

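First, let's load a small two-column dataset from labels.csv. The file has no header row, so pyspark names the columns ```_c0``` and ```_c1``` by default: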

In [5]:
#no header row in labels.csv, so the columns are auto-named _c0 and _c1
df_labels = spark.read.csv('labels.csv', inferSchema = True)
df_labels.show()

+---+---+
|_c0|_c1|
+---+---+
|  1|  0|
|  1|  1|
|  1|  2|
|  2|  1|
|  0|  1|
|  1|  2|
|  2|  2|
|  1|  1|
|  1|  0|
|  1|  0|
|  0|  0|
|  1|  1|
|  2|  0|
|  0|  2|
|  0|  2|
|  0|  1|
|  0|  0|
+---+---+



We can count the number of rows in which the two columns hold the same value by combining a conditional filter with a count.
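In pyspark this can be written as a filter on a column comparison followed by a count, e.g. ```df_labels.filter(df_labels._c0 == df_labels._c1).count()```. As a cross-check that does not need Spark, here is the same count in plain Python over the 17 label pairs shown above:

```python
# (_c0, _c1) pairs copied from the df_labels table above
labels = [(1, 0), (1, 1), (1, 2), (2, 1), (0, 1), (1, 2), (2, 2), (1, 1), (1, 0),
          (1, 0), (0, 0), (1, 1), (2, 0), (0, 2), (0, 2), (0, 1), (0, 0)]

# count the rows where both columns hold the same value
matches = sum(1 for c0, c1 in labels if c0 == c1)
print(matches, "of", len(labels))  # 6 of 17
```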

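Note: Returning to the joins above, if you join the tables the other way round, i.e. join the employee table onto the customer table (```df_customer.join(df_employee, df_customer.CustomerName == df_employee.Name, 'left')```), then left and right joins swap roles compared with the examples above.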

In [58]:
#export this notebook to HTML with Jupyter's nbconvert
import os

os.system('jupyter nbconvert --to html Basic_Data_Manip.ipynb')

0