# Chapter 3 - Spark Structured APIs 


**In this notebook we will practice Spark using Python (PySpark), including all code examples from the book.**



In [0]:
# Option 1: load the CSV with the DataFrameReader.csv() shortcut.
# No header option is passed here, so the first line of the file is read as a
# data row and the columns get default names (_c0, _c1, ...).
df = spark.read.csv("/Volumes/workspace/default/data/BigMart Sales.csv")

In [0]:
# Option 2: the generic reader form -- name the format, then load() the path.
df = spark.read.format("csv").load("/Volumes/workspace/default/data/BigMart Sales.csv")

In [0]:
# To preview the data of this DataFrame we can use show(), display(), or limit().
# Here the first 10 rows are taken with limit() and converted to a pandas
# DataFrame, which the notebook renders as the cell output.
df.limit(10).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11
0,Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales
1,FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
2,DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
3,FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
4,FDX07,19.2,Regular,0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
5,NCD19,8.93,Low Fat,0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052
6,FDP36,10.395,Regular,0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088
7,FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528
8,FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636
9,FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986


In [0]:
# To let Spark take the column names from the first record, enable the "header"
# option. Here it is passed as the keyword argument header=True; the reader also
# accepts options through the syntax .option(option name, option value).
df1 = spark.read.csv("/Volumes/workspace/default/data/BigMart Sales.csv", header=True)


In [0]:
# Preview the first 10 rows; the column names now come from the file's header row.
df1.limit(10).display()

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales
FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052
FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636
FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986
FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535


In [0]:
# To check the data types, use printSchema(): it prints each column's name,
# data type, and nullability.
df1.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)




As we can see, Spark assigns the default data type string to every column. To assign column data types we 
have two options: the first is to let Spark infer the data types from the data available in the file, and 
the second is to define the data type of each column explicitly. To let Spark infer the column data types, 
we use the option **inferSchema**.


In [0]:
# Re-read the file, this time asking Spark to infer each column's data type.
df1 = spark.read.csv(
    "/Volumes/workspace/default/data/BigMart Sales.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Now let's check the column data types that Spark inferred.
df1.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



In [0]:
# We can also define the schema manually.
# Option 1: a DDL-formatted string of comma-separated "column_name type" pairs.
schema = """
    Item_Identifier string,
    Item_Weight double,
    Item_Fat_Content string,
    Item_Visibility double,
    Item_Type string,
    Item_MRP double,
    Outlet_Identifier string,
    Outlet_Establishment_Year integer,
    Outlet_Size string,
    Outlet_Location_Type string,
    Outlet_Type string,
    Item_Outlet_Sales double

"""

# Read the file with the explicit schema instead of inferring the types.
df1 = spark.read.csv(
    "/Volumes/workspace/default/data/BigMart Sales.csv",
    header=True,
    schema=schema,
)

In [0]:
# Confirm that the columns carry the data types declared in the DDL string.
df1.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



In [0]:
from pyspark.sql.types import *

In [0]:
# Option 2: build the schema programmatically as a StructType, creating one
# StructField per (column name, data type) pair.
schema = StructType([
    StructField(column_name, column_type, True)  # True -> the column is nullable
    for column_name, column_type in (
        ('Item_Identifier', StringType()),
        ('Item_Weight', DoubleType()),
        ('Item_Fat_Content', StringType()),
        ('Item_Visibility', DoubleType()),
        ('Item_Type', StringType()),
        ('Item_MRP', DoubleType()),
        ('Outlet_Identifier', StringType()),
        ('Outlet_Establishment_Year', IntegerType()),
        ('Outlet_Size', StringType()),
        ('Outlet_Location_Type', StringType()),
        ('Outlet_Type', StringType()),
        ('Item_Outlet_Sales', DoubleType()),
    )
])

# Read the file with the explicit StructType schema.
df1 = spark.read.csv(
    "/Volumes/workspace/default/data/BigMart Sales.csv",
    header=True,
    schema=schema,
)

In [0]:
# Confirm that the columns carry the data types declared in the StructType.
df1.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



## Data Manipulation 
Now that we have defined a DataFrame, let's see how we can manipulate its data. In this section we 
will work with the Employees data from the Chicago Open Data Portal.

In [0]:
# Define the schema of the employee dataset: one nullable StructField per
# (column name, data type) pair.
# NOTE: "DEPARTMENET" (sic) is the spelling that the select/filter calls in
# this notebook reference, so the column name is kept exactly as written.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("NAME", StringType()),
        ("JOB_TITLE", StringType()),
        ("DEPARTMENET", StringType()),
        ("FULL_PART_TIME", StringType()),
        ("MONTHLY_HOURLY", StringType()),
        ("TYPICAL_HOURS", IntegerType()),
        ("ANNUAL_SALARY", DoubleType()),
        ("HOURLY_RATE", DoubleType()),
    )
])

In [0]:
# Load the employee CSV with the explicit schema; header=True makes the reader
# treat the first line of the file as a header rather than as data.
emp_data = spark.read.csv(
    "/Volumes/workspace/default/data/Employee_Data.csv",
    header=True,
    schema=schema,
)


### Discover DataFrame

#### 1- SHOW method 

To display the data of a DataFrame we use the show() method, which offers the following options: 

- show(truncate: boolean) -> whether to truncate any string value that is longer than 20 characters 
- show(number of rows) -> by default, show() displays only the first 20 records of the DataFrame; we can override that by specifying the number of records to return 


#### 2- First & Head methods
To return the first record of the DataFrame, use the method first() or head(), result will be of type Row 


In [0]:
# head() with no argument returns the first record of the DataFrame as a Row.
emp_data.head()

Row(NAME='AARON,  JEFFERY M', JOB_TITLE='SERGEANT', DEPARTMENET='POLICE', FULL_PART_TIME='F', MONTHLY_HOURLY='Salary', TYPICAL_HOURS=None, ANNUAL_SALARY=101442.0, HOURLY_RATE=None)

In [0]:
# We can use indexing to access the data inside the Row; index 0 is the first
# column (NAME).
emp_data.head()[0]

'AARON,  JEFFERY M'

#### 3- Select columns
In this example we need to SELECT data for Employee Name, Department, and Job Title, we can use 
select() method with column names mentioned explicitly as input to this method 

In [0]:
# Select columns by passing their names as strings, then show the first
# 5 rows without truncating long values.
(
    emp_data
    .select('NAME', 'DEPARTMENET', 'JOB_TITLE')
    .show(n=5, truncate=False)
)

+-------------------+----------------+--------------------------------------+
|NAME               |DEPARTMENET     |JOB_TITLE                             |
+-------------------+----------------+--------------------------------------+
|AARON,  JEFFERY M  |POLICE          |SERGEANT                              |
|AARON,  KARINA     |POLICE          |POLICE OFFICER (ASSIGNED AS DETECTIVE)|
|AARON,  KIMBERLEI R|GENERAL SERVICES|CHIEF CONTRACT EXPEDITER              |
|ABAD JR,  VICENTE M|WATER MGMNT     |CIVIL ENGINEER IV                     |
|ABARCA,  EMMANUEL  |TRANSPORTN      |CONCRETE LABORER                      |
+-------------------+----------------+--------------------------------------+
only showing top 5 rows


In [0]:
# Columns can also be selected as Column objects built with the col() function.
# NOTE(review): this wildcard import also brings in Spark functions whose names
# match Python built-ins (e.g. sum, min, max). It is kept as-is because other
# cells use names such as col and expr without importing them individually.
from pyspark.sql.functions import *
(
    emp_data
    .select(col('NAME'), col('DEPARTMENET'), col('JOB_TITLE'))
    .show(n=5, truncate=False)
)

+-------------------+----------------+--------------------------------------+
|NAME               |DEPARTMENET     |JOB_TITLE                             |
+-------------------+----------------+--------------------------------------+
|AARON,  JEFFERY M  |POLICE          |SERGEANT                              |
|AARON,  KARINA     |POLICE          |POLICE OFFICER (ASSIGNED AS DETECTIVE)|
|AARON,  KIMBERLEI R|GENERAL SERVICES|CHIEF CONTRACT EXPEDITER              |
|ABAD JR,  VICENTE M|WATER MGMNT     |CIVIL ENGINEER IV                     |
|ABARCA,  EMMANUEL  |TRANSPORTN      |CONCRETE LABORER                      |
+-------------------+----------------+--------------------------------------+
only showing top 5 rows


In [0]:
# selectExpr() accepts SQL-style expressions, so columns can be aliased with "as".
(
    emp_data
    .selectExpr('NAME as Full_Name', 'DEPARTMENET as Dept', 'JOB_TITLE')
    .show(n=5, truncate=False)
)

+-------------------+----------------+--------------------------------------+
|Full_Name          |Dept            |JOB_TITLE                             |
+-------------------+----------------+--------------------------------------+
|AARON,  JEFFERY M  |POLICE          |SERGEANT                              |
|AARON,  KARINA     |POLICE          |POLICE OFFICER (ASSIGNED AS DETECTIVE)|
|AARON,  KIMBERLEI R|GENERAL SERVICES|CHIEF CONTRACT EXPEDITER              |
|ABAD JR,  VICENTE M|WATER MGMNT     |CIVIL ENGINEER IV                     |
|ABARCA,  EMMANUEL  |TRANSPORTN      |CONCRETE LABORER                      |
+-------------------+----------------+--------------------------------------+
only showing top 5 rows


#### Exercise

Write an application that generates a report with the following:
  - Employee name
  - Monthly salary (annual salary divided by 30)
  - Hourly salary (hourly rate * typical hours)

In [0]:
# Check the column names and data types available for the exercise.
emp_data.printSchema()

root
 |-- NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- DEPARTMENET: string (nullable = true)
 |-- FULL_PART_TIME: string (nullable = true)
 |-- MONTHLY_HOURLY: string (nullable = true)
 |-- TYPICAL_HOURS: integer (nullable = true)
 |-- ANNUAL_SALARY: double (nullable = true)
 |-- HOURLY_RATE: double (nullable = true)



In [0]:
# Exercise solution: build the report with SQL-style expressions.
# Each derived column is NULL when one of its inputs is NULL, so salaried
# employees have no Hourly_Salary and hourly employees have no Monthly_Salary.
(
    emp_data
    .selectExpr(
        "NAME as Full_name",
        "ANNUAL_SALARY / 30.0 as Monthly_Salary",
        "HOURLY_RATE * TYPICAL_HOURS as Hourly_Salary",
    )
    .show(n=10, truncate=False)
)

+---------------------+--------------+-------------+
|Full_name            |Monthly_Salary|Hourly_Salary|
+---------------------+--------------+-------------+
|AARON,  JEFFERY M    |3381.4        |NULL         |
|AARON,  KARINA       |3137.4        |NULL         |
|AARON,  KIMBERLEI R  |3700.8        |NULL         |
|ABAD JR,  VICENTE M  |3826.0        |NULL         |
|ABARCA,  EMMANUEL    |NULL          |1748.8       |
|ABARCA,  FRANCES J   |1602.6        |NULL         |
|ABASCAL,  REECE E    |NULL          |397.2        |
|ABBATACOLA,  ROBERT J|NULL          |1974.0       |
|ABBATEMARCO,  JAMES J|3445.0        |NULL         |
|ABBATE,  TERRY M     |3111.8        |NULL         |
+---------------------+--------------+-------------+
only showing top 10 rows


#### Sort DataFrame

In [0]:
# Sort by annual salary, highest first, using sort() with ascending=False.
(
    emp_data
    .select(col("NAME"), col("DEPARTMENET"), col("ANNUAL_SALARY"))
    .sort("ANNUAL_SALARY", ascending=False)
    .show(n=10, truncate=False)
)

+----------------------+--------------+-------------+
|NAME                  |DEPARTMENET   |ANNUAL_SALARY|
+----------------------+--------------+-------------+
|RHEE,  JAMIE L        |AVIATION      |275004.0     |
|JOHNSON,  EDDIE T     |POLICE        |260004.0     |
|LIGHTFOOT,  LORI E    |MAYOR'S OFFICE|216210.0     |
|FORD II,  RICHARD C   |FIRE          |202728.0     |
|NANCE HOLT,  ANNETTE M|FIRE          |197736.0     |
|RICCIO,  ANTHONY J    |POLICE        |197724.0     |
|CLASSEN,  MAURICE A   |MAYOR'S OFFICE|195000.0     |
|BRODERSEN,  ERNEST F  |FIRE          |187680.0     |
|HELMOLD,  BRIAN       |FIRE          |187680.0     |
|SAMPEY,  TIMOTHY T    |FIRE          |187680.0     |
+----------------------+--------------+-------------+
only showing top 10 rows


In [0]:
# The same ordering with orderBy(), expressing the direction on the column
# itself through desc().
(
    emp_data
    .select(col("NAME"), col("DEPARTMENET"), col("ANNUAL_SALARY"))
    .orderBy(col("ANNUAL_SALARY").desc())
    .show(n=10, truncate=False)
)

+----------------------+--------------+-------------+
|NAME                  |DEPARTMENET   |ANNUAL_SALARY|
+----------------------+--------------+-------------+
|RHEE,  JAMIE L        |AVIATION      |275004.0     |
|JOHNSON,  EDDIE T     |POLICE        |260004.0     |
|LIGHTFOOT,  LORI E    |MAYOR'S OFFICE|216210.0     |
|FORD II,  RICHARD C   |FIRE          |202728.0     |
|NANCE HOLT,  ANNETTE M|FIRE          |197736.0     |
|RICCIO,  ANTHONY J    |POLICE        |197724.0     |
|CLASSEN,  MAURICE A   |MAYOR'S OFFICE|195000.0     |
|BRODERSEN,  ERNEST F  |FIRE          |187680.0     |
|HELMOLD,  BRIAN       |FIRE          |187680.0     |
|SAMPEY,  TIMOTHY T    |FIRE          |187680.0     |
+----------------------+--------------+-------------+
only showing top 10 rows


#### Distinct Records
To get the distinct records of a DataFrame (for example, to check for duplicates), or the distinct values 
of a column, you can use the distinct() function.

In [0]:
# List the distinct job titles (show() prints the first 20 by default).
(
    emp_data
    .select(col("JOB_TITLE"))
    .distinct()
    .show(truncate=False)
)

+--------------------------------------+
|JOB_TITLE                             |
+--------------------------------------+
|FRM OF LABORER-CONCRETE               |
|POLICE OFFICER (PER ARBITRATION AWARD)|
|STORES LABORER                        |
|FIRE ENGINEER-PARAMEDIC               |
|COORD OF STREET PERMITS               |
|MANAGER OF ADMINSTRATIVE ADJUDICATION |
|ASST MANAGER - BD OF ELECTIONS        |
|POLICE LEGAL OFFICER I                |
|PERSONNEL ASSISTANT                   |
|ASST DIR OF NEWS AFFAIRS              |
|PROCUREMENT SPECIALIST                |
|PROJECT COORD                         |
|WATER RATE TAKER                      |
|LABOR RELATIONS SUPVSR                |
|CIVIL ENGINEER IV                     |
|NULL                                  |
|SUPERINTENDENT'S CHIEF OF STAFF       |
|SANITATION LABORER                    |
|FILTRATION ENGINEER IV                |
|FINGERPRINT TECHNICIAN II             |
+--------------------------------------+
only showing top

In [0]:
# freqItems() is a related exploratory helper, not an exact substitute for
# distinct(): it returns the *frequent* values of the given columns, collected
# into an array in a single row, and per the PySpark documentation the result
# may contain false positives.
emp_data.freqItems(['JOB_TITLE']).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### Filter Records
In some cases, we need to filter data based on a certain logic either to reduce number of records or to 
get specific records of interest. To filter a DataFrame based on a condition or a set of conditions we 
need to use filter()

In [0]:
# Keep only the employees of the HEALTH department.
(
    emp_data
    .filter(col("DEPARTMENET") == "HEALTH")
    .show(truncate=False)
)

+---------------------------+-----------------------------------------------+-----------+--------------+--------------+-------------+-------------+-----------+
|NAME                       |JOB_TITLE                                      |DEPARTMENET|FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+---------------------------+-----------------------------------------------+-----------+--------------+--------------+-------------+-------------+-----------+
|ABREU,  ROSITA             |PERSONAL COMPUTER OPERATOR III                 |HEALTH     |F             |Salary        |NULL         |73104.0      |NULL       |
|ABUBAKER,  MOHAMMED G      |SENIOR ENVIRONMENTAL INSPECTOR                 |HEALTH     |F             |Salary        |NULL         |88044.0      |NULL       |
|ACEVEDO,  DAILHA           |COMMUNICABLE DISEASE CONTROL INVESTIGATOR II   |HEALTH     |F             |Salary        |NULL         |76584.0      |NULL       |
|ADAMS THOMAS,  CHEEAREASE R|PUBLIC HEAL

In [0]:
# Several conditions can be combined.
# Option 1: chain filter() calls; a record must pass every one of them.
(
    emp_data
    .filter(col('DEPARTMENET') == "HEALTH")
    .filter(col('MONTHLY_HOURLY') == "Salary")
    .show(truncate=False)
)

+---------------------------+-----------------------------------------------+-----------+--------------+--------------+-------------+-------------+-----------+
|NAME                       |JOB_TITLE                                      |DEPARTMENET|FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+---------------------------+-----------------------------------------------+-----------+--------------+--------------+-------------+-------------+-----------+
|ABREU,  ROSITA             |PERSONAL COMPUTER OPERATOR III                 |HEALTH     |F             |Salary        |NULL         |73104.0      |NULL       |
|ABUBAKER,  MOHAMMED G      |SENIOR ENVIRONMENTAL INSPECTOR                 |HEALTH     |F             |Salary        |NULL         |88044.0      |NULL       |
|ACEVEDO,  DAILHA           |COMMUNICABLE DISEASE CONTROL INVESTIGATOR II   |HEALTH     |F             |Salary        |NULL         |76584.0      |NULL       |
|ADAMS THOMAS,  CHEEAREASE R|PUBLIC HEAL

In [0]:
# Option 2: use expr() to write the combined condition as a SQL-style predicate.
(
    emp_data
    .filter(expr("DEPARTMENET='HEALTH' AND MONTHLY_HOURLY='Salary'"))
    .show(truncate=False)
)

+---------------------------+-----------------------------------------------+-----------+--------------+--------------+-------------+-------------+-----------+
|NAME                       |JOB_TITLE                                      |DEPARTMENET|FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+---------------------------+-----------------------------------------------+-----------+--------------+--------------+-------------+-------------+-----------+
|ABREU,  ROSITA             |PERSONAL COMPUTER OPERATOR III                 |HEALTH     |F             |Salary        |NULL         |73104.0      |NULL       |
|ABUBAKER,  MOHAMMED G      |SENIOR ENVIRONMENTAL INSPECTOR                 |HEALTH     |F             |Salary        |NULL         |88044.0      |NULL       |
|ACEVEDO,  DAILHA           |COMMUNICABLE DISEASE CONTROL INVESTIGATOR II   |HEALTH     |F             |Salary        |NULL         |76584.0      |NULL       |
|ADAMS THOMAS,  CHEEAREASE R|PUBLIC HEAL

#### Dropping Duplicates
In some cases we need to drop duplicate records, either because of business requirements in 
our application or to test whether a DataFrame contains duplicate records. 
We can execute the dropDuplicates method on specific columns or across all columns of a DataFrame. To 
demonstrate the behavior of dropDuplicates on the department name and job title columns, let's first 
select the data of these two columns and examine the duplication.

In [0]:
# Show job title / department pairs, ordered by department in descending
# order, so the repeated pairs are visible before de-duplication.
(
    emp_data
    .select('JOB_TITLE', 'DEPARTMENET')
    .orderBy(col('DEPARTMENET'), ascending=False)
    .show(n=100, truncate=False)
)

+----------------------------------+-----------+
|JOB_TITLE                         |DEPARTMENET|
+----------------------------------+-----------+
|CONSTRUCTION LABORER              |WATER MGMNT|
|OPERATING ENGINEER-GROUP A        |WATER MGMNT|
|WATER CHEMIST II                  |WATER MGMNT|
|STEAMFITTER                       |WATER MGMNT|
|FOREMAN OF WATER PIPE CONSTRUCTION|WATER MGMNT|
|CONSTRUCTION LABORER              |WATER MGMNT|
|WATER RATE TAKER                  |WATER MGMNT|
|OPERATING ENGINEER-GROUP A        |WATER MGMNT|
|ASST CHIEF OPERATING ENGINEER     |WATER MGMNT|
|EMERGENCY CREW DISPATCHER         |WATER MGMNT|
|HOISTING ENGINEER                 |WATER MGMNT|
|CONSTRUCTION LABORER              |WATER MGMNT|
|HOISTING ENGINEER                 |WATER MGMNT|
|MOTOR TRUCK DRIVER                |WATER MGMNT|
|CIVIL ENGINEER IV                 |WATER MGMNT|
|CIVIL ENGINEER IV                 |WATER MGMNT|
|SEWER BRICKLAYER                  |WATER MGMNT|
|MOTOR TRUCK DRIVER 

In [0]:
# Now drop the duplicates on both columns and check the DataFrame again:
# each (JOB_TITLE, DEPARTMENET) pair is kept only once.
(
    emp_data
    .select('JOB_TITLE', 'DEPARTMENET')
    .dropDuplicates(['JOB_TITLE', 'DEPARTMENET'])
    .orderBy(col('DEPARTMENET'), ascending=False)
    .show(n=100, truncate=False)
)

+--------------------------------------+-----------+
|JOB_TITLE                             |DEPARTMENET|
+--------------------------------------+-----------+
|WATER CHEMIST III                     |WATER MGMNT|
|POOL MOTOR TRUCK DRIVER               |WATER MGMNT|
|PAINTER                               |WATER MGMNT|
|PLUMBING INSPECTOR                    |WATER MGMNT|
|HOUSE DRAIN INSPECTOR                 |WATER MGMNT|
|MANAGING DEPUTY COMMISSIONER          |WATER MGMNT|
|SUPERVISING CLERK                     |WATER MGMNT|
|HEAD STOREKEEPER                      |WATER MGMNT|
|MASON INSPECTOR                       |WATER MGMNT|
|FOREMAN OF PLUMBERS                   |WATER MGMNT|
|CHIEF CONTRACT EXPEDITER              |WATER MGMNT|
|MICROBIOLOGIST III                    |WATER MGMNT|
|FOREMAN OF WATER PIPE CONSTRUCTION    |WATER MGMNT|
|SEWER BRICKLAYER                      |WATER MGMNT|
|DEPUTY COMMISSIONER                   |WATER MGMNT|
|SUPERINTENDENT OF WATER METERS        |WATER 

#### Handle Null Values
We have two approaches to handling NULL values: 
  - Approach 1: Replace the Null values with another default value 
  - Approach 2: Drop the records which have Null values 

In [0]:
## Approach 1: replace null values with other values
# DataFrame.na.fill(value) replaces nulls only in the columns whose data type
# matches the type of `value`; columns of any other type are skipped:
#   - a numeric value such as 0.0 fills the numeric columns and leaves the
#     string columns untouched,
#   - a string value fills only the string columns.
# To fill columns of different data types, chain several fill() calls (or pass
# a dict that maps each column name to its replacement value).
#
# First look at the raw column, so the nulls are visible before filling them.
emp_data.select("ANNUAL_SALARY").show(30,False)

+-------------+
|ANNUAL_SALARY|
+-------------+
|101442.0     |
|94122.0      |
|111024.0     |
|114780.0     |
|NULL         |
|48078.0      |
|NULL         |
|NULL         |
|103350.0     |
|93354.0      |
|NULL         |
|68616.0      |
|56304.0      |
|84054.0      |
|87006.0      |
|105804.0     |
|68616.0      |
|101442.0     |
|94476.0      |
|116280.0     |
|99324.0      |
|NULL         |
|61884.0      |
|NULL         |
|90024.0      |
|91080.0      |
|48078.0      |
|48078.0      |
|72510.0      |
|80016.0      |
+-------------+
only showing top 30 rows


In [0]:
# Replace the nulls of the (double) ANNUAL_SALARY column with 0.0.
(
    emp_data
    .select("ANNUAL_SALARY")
    .na.fill(0.0)
    .show(n=30, truncate=False)
)

+-------------+
|ANNUAL_SALARY|
+-------------+
|101442.0     |
|94122.0      |
|111024.0     |
|114780.0     |
|0.0          |
|48078.0      |
|0.0          |
|0.0          |
|103350.0     |
|93354.0      |
|0.0          |
|68616.0      |
|56304.0      |
|84054.0      |
|87006.0      |
|105804.0     |
|68616.0      |
|101442.0     |
|94476.0      |
|116280.0     |
|99324.0      |
|0.0          |
|61884.0      |
|0.0          |
|90024.0      |
|91080.0      |
|48078.0      |
|48078.0      |
|72510.0      |
|80016.0      |
+-------------+
only showing top 30 rows


In [0]:
# Handle nulls in multiple columns with different data types: chain one
# fill() call per replacement value, each limited to its own columns.
(
    emp_data
    .na.fill(0.0, subset=['ANNUAL_SALARY', 'HOURLY_RATE'])
    .na.fill(8.0, subset=['TYPICAL_HOURS'])
    .na.fill("Missing", subset=['FULL_PART_TIME'])
    .show(n=30, truncate=False)
)


+------------------------+--------------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|NAME                    |JOB_TITLE                             |DEPARTMENET     |FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+------------------------+--------------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|AARON,  JEFFERY M       |SERGEANT                              |POLICE          |F             |Salary        |8            |101442.0     |0.0        |
|AARON,  KARINA          |POLICE OFFICER (ASSIGNED AS DETECTIVE)|POLICE          |F             |Salary        |8            |94122.0      |0.0        |
|AARON,  KIMBERLEI R     |CHIEF CONTRACT EXPEDITER              |GENERAL SERVICES|F             |Salary        |8            |111024.0     |0.0        |
|ABAD JR,  VICENTE M     |CIVIL ENGINEER IV                     |WATER MGMNT     |

In [0]:
## Approach 2: drop null values
# how='any' drops every record that has a null value in at least one column.
emp_data.na.drop(how='any').show(30,False)
# We get no records back, because every record has a null in at least one column.

+----+---------+-----------+--------------+--------------+-------------+-------------+-----------+
|NAME|JOB_TITLE|DEPARTMENET|FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+----+---------+-----------+--------------+--------------+-------------+-------------+-----------+
+----+---------+-----------+--------------+--------------+-------------+-------------+-----------+



In [0]:
# how='all' drops a record only when every one of its columns is null.
(
    emp_data
    .na.drop(how='all')
    .show(n=30, truncate=False)
)

+------------------------+--------------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|NAME                    |JOB_TITLE                             |DEPARTMENET     |FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+------------------------+--------------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|AARON,  JEFFERY M       |SERGEANT                              |POLICE          |F             |Salary        |NULL         |101442.0     |NULL       |
|AARON,  KARINA          |POLICE OFFICER (ASSIGNED AS DETECTIVE)|POLICE          |F             |Salary        |NULL         |94122.0      |NULL       |
|AARON,  KIMBERLEI R     |CHIEF CONTRACT EXPEDITER              |GENERAL SERVICES|F             |Salary        |NULL         |111024.0     |NULL       |
|ABAD JR,  VICENTE M     |CIVIL ENGINEER IV                     |WATER MGMNT     |

In [0]:
# `how` can be combined with `subset` to restrict the null check to specific
# columns. With how='any', a record is dropped when at least one of
# TYPICAL_HOURS or HOURLY_RATE is null; use how='all' instead to drop a record
# only when both of these columns are null.

emp_data.na.drop(how='any',subset=['TYPICAL_HOURS','HOURLY_RATE']).show(30,False)

+----------------------+----------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|NAME                  |JOB_TITLE                         |DEPARTMENET     |FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+----------------------+----------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|ABARCA,  EMMANUEL     |CONCRETE LABORER                  |TRANSPORTN      |F             |Hourly        |40           |NULL         |43.72      |
|ABASCAL,  REECE E     |TRAFFIC CONTROL AIDE-HOURLY       |OEMC            |NULL          |Hourly        |20           |NULL         |19.86      |
|ABBATACOLA,  ROBERT J |ELECTRICAL MECHANIC               |AVIATION        |F             |Hourly        |40           |NULL         |49.35      |
|ABBOTT,  BETTY L      |SENIOR PROGRAMMER/ANALYST         |FAMILY & SUPPORT|P             |Hourly        |20          

In [0]:
# We can also count the number of nulls in a specific column: keep only the
# rows where FULL_PART_TIME is null, then count them.
emp_data.filter(col('FULL_PART_TIME').isNull()).count()


7

In [0]:
# Count the non-null values: keep only the rows where FULL_PART_TIME is not
# null, then count them.
emp_data.filter(col('FULL_PART_TIME').isNotNull()).count()

33579

### DataFrame Alter Operations

In this section we look at the operations that affect the structure of the DataFrame. Some of these 
operations can also be done using select and expr, as mentioned before. We will cover the following 
operations in this section:
- Add a new column
- Rename a column
- Remove an existing column
- Alter a column's type 

#### Add new column
To add a new column, we will use withColumn method, syntax for this method is as following:

**withColumn**(new column name:String, 
expression of the new column: Column) 

In [0]:
# Add a boolean column that is true when the department name contains "MGMNT",
# then show it next to the department it was derived from.
(
    emp_data
    .withColumn('MANAGEMENT_FLAG', expr('DEPARTMENET LIKE "%MGMNT%"'))
    .select('DEPARTMENET', 'MANAGEMENT_FLAG')
    .show(n=30, truncate=False)
)

+----------------+---------------+
|DEPARTMENET     |MANAGEMENT_FLAG|
+----------------+---------------+
|POLICE          |false          |
|POLICE          |false          |
|GENERAL SERVICES|false          |
|WATER MGMNT     |true           |
|TRANSPORTN      |false          |
|POLICE          |false          |
|OEMC            |false          |
|AVIATION        |false          |
|FIRE            |false          |
|POLICE          |false          |
|FAMILY & SUPPORT|false          |
|POLICE          |false          |
|FIRE            |false          |
|POLICE          |false          |
|POLICE          |false          |
|FIRE            |false          |
|POLICE          |false          |
|POLICE          |false          |
|FIRE            |false          |
|WATER MGMNT     |true           |
|FIRE            |false          |
|GENERAL SERVICES|false          |
|LAW             |false          |
|STREETS & SAN   |false          |
|POLICE          |false          |
|FIRE            |fa

In [0]:
# Keep only employees with a positive hourly rate and derive HOURLY_PAYMENT as
# rate * typical hours, then project the columns of interest.
hourly_emp = (
    emp_data
    .where(col('HOURLY_RATE') > 0)
    .withColumn('HOURLY_PAYMENT', col('HOURLY_RATE') * col('TYPICAL_HOURS'))
    .select(
        'NAME',
        'JOB_TITLE',
        'DEPARTMENET',
        'HOURLY_RATE',
        'TYPICAL_HOURS',
        'HOURLY_PAYMENT',
    )
)


In [0]:
# Preview up to 50 rows without truncating long cell values.
hourly_emp.show(n=50, truncate=False)

+----------------------+----------------------------------+----------------+-----------+-------------+------------------+
|NAME                  |JOB_TITLE                         |DEPARTMENET     |HOURLY_RATE|TYPICAL_HOURS|HOURLY_PAYMENT    |
+----------------------+----------------------------------+----------------+-----------+-------------+------------------+
|ABARCA,  EMMANUEL     |CONCRETE LABORER                  |TRANSPORTN      |43.72      |40           |1748.8            |
|ABASCAL,  REECE E     |TRAFFIC CONTROL AIDE-HOURLY       |OEMC            |19.86      |20           |397.2             |
|ABBATACOLA,  ROBERT J |ELECTRICAL MECHANIC               |AVIATION        |49.35      |40           |1974.0            |
|ABBOTT,  BETTY L      |SENIOR PROGRAMMER/ANALYST         |FAMILY & SUPPORT|2.65       |20           |53.0              |
|ABDULLAH,  RASHAD     |ELECTRICAL MECHANIC (AUTOMOTIVE)  |GENERAL SERVICES|49.35      |40           |1974.0            |
|ABDUL-SHAKUR,  TAHIR  |

#### Rename Column
To rename a column, we simply use the withColumnRenamed method as follows:

**withColumnRenamed**(old column name:String, 
new column name:String) 

In [0]:
# Rename NAME -> EMP_NAME and JOB_TITLE -> TITLE; each call returns a new
# DataFrame, so emp_data itself is left unchanged.
(
    emp_data
    .withColumnRenamed('NAME', 'EMP_NAME')
    .withColumnRenamed('JOB_TITLE', 'TITLE')
    .show(n=50, truncate=False)
)

+------------------------+------------------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|EMP_NAME                |TITLE                                     |DEPARTMENET     |FULL_PART_TIME|MONTHLY_HOURLY|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+------------------------+------------------------------------------+----------------+--------------+--------------+-------------+-------------+-----------+
|AARON,  JEFFERY M       |SERGEANT                                  |POLICE          |F             |Salary        |NULL         |101442.0     |NULL       |
|AARON,  KARINA          |POLICE OFFICER (ASSIGNED AS DETECTIVE)    |POLICE          |F             |Salary        |NULL         |94122.0      |NULL       |
|AARON,  KIMBERLEI R     |CHIEF CONTRACT EXPEDITER                  |GENERAL SERVICES|F             |Salary        |NULL         |111024.0     |NULL       |
|ABAD JR,  VICENTE M     |CIVIL ENGINEER IV               

#### Drop Column
To drop a column, we call the **_drop_** method on the DataFrame with the name of the column to remove.

In [0]:
# Show the data without the MONTHLY_HOURLY column; emp_data is not modified.
without_pay_type = emp_data.drop('MONTHLY_HOURLY')
without_pay_type.show(n=50, truncate=False)

+------------------------+------------------------------------------+----------------+--------------+-------------+-------------+-----------+
|NAME                    |JOB_TITLE                                 |DEPARTMENET     |FULL_PART_TIME|TYPICAL_HOURS|ANNUAL_SALARY|HOURLY_RATE|
+------------------------+------------------------------------------+----------------+--------------+-------------+-------------+-----------+
|AARON,  JEFFERY M       |SERGEANT                                  |POLICE          |F             |NULL         |101442.0     |NULL       |
|AARON,  KARINA          |POLICE OFFICER (ASSIGNED AS DETECTIVE)    |POLICE          |F             |NULL         |94122.0      |NULL       |
|AARON,  KIMBERLEI R     |CHIEF CONTRACT EXPEDITER                  |GENERAL SERVICES|F             |NULL         |111024.0     |NULL       |
|ABAD JR,  VICENTE M     |CIVIL ENGINEER IV                         |WATER MGMNT     |F             |NULL         |114780.0     |NULL       |
|ABARC

#### Alter Column Data Type
To alter a column data type, we can use the **_cast_** method on the required column

In [0]:
# Show a plain double division next to a monthly salary cast to integer.
# The cast is applied to the division and the alias is attached last, so the
# MONTHLY_SALARY name does not depend on Spark carrying an alias through cast().
emp_data.select(
    expr('ANNUAL_SALARY/30'),
    expr('ANNUAL_SALARY/12').cast(IntegerType()).alias('MONTHLY_SALARY'),
).show(50, False)

+--------------------+--------------+
|(ANNUAL_SALARY / 30)|MONTHLY_SALARY|
+--------------------+--------------+
|3381.4              |8453          |
|3137.4              |7843          |
|3700.8              |9252          |
|3826.0              |9565          |
|NULL                |NULL          |
|1602.6              |4006          |
|NULL                |NULL          |
|NULL                |NULL          |
|3445.0              |8612          |
|3111.8              |7779          |
|NULL                |NULL          |
|2287.2              |5718          |
|1876.8              |4692          |
|2801.8              |7004          |
|2900.2              |7250          |
|3526.8              |8817          |
|2287.2              |5718          |
|3381.4              |8453          |
|3149.2              |7873          |
|3876.0              |9690          |
|3310.8              |8277          |
|NULL                |NULL          |
|2062.8              |5157          |
|NULL       

In [0]:
# Print the schema of the same projection to confirm the resulting types:
# the first column stays double, MONTHLY_SALARY becomes integer.
# cast() is applied before alias() so the column name is set explicitly last.
emp_data.select(
    expr('ANNUAL_SALARY/30'),
    expr('ANNUAL_SALARY/12').cast(IntegerType()).alias('MONTHLY_SALARY'),
).printSchema()

root
 |-- (ANNUAL_SALARY / 30): double (nullable = true)
 |-- MONTHLY_SALARY: integer (nullable = true)



### Multiple DataFrames Operations

A common scenario in most applications is that we need to join or merge two different 
datasets that hold different information we want to bring together. 

_We will use Sales data and Product data._

In [0]:
# Schema for the sales CSV: five nullable integer columns.
Sales_scema = (
    StructType()
    .add("REQ_ID", IntegerType(), True)
    .add("EMP_ID", IntegerType(), True)
    .add("CUSTOMER_ID", IntegerType(), True)
    .add("PRODUCT_ID", IntegerType(), True)
    .add("PRICE", IntegerType(), True)
)

# Schema for the product CSV: an integer key followed by three nullable strings.
product_schema = (
    StructType()
    .add("PRODUCT_ID", IntegerType(), True)
    .add("PRODUCT_NAME", StringType(), True)
    .add("MANUFACTURER_NAME", StringType(), True)
    .add("PRODUCT_COLOR", StringType(), True)
)

In [0]:
# Load both CSV files with their explicit schemas; header=True tells Spark the
# first line of each file holds column names rather than data.
sales_df = (
    spark.read
    .schema(Sales_scema)
    .option('header', True)
    .csv('/Volumes/workspace/default/data/sales_data.csv')
)
product_df = (
    spark.read
    .schema(product_schema)
    .option('header', True)
    .csv('/Volumes/workspace/default/data/product_info.csv')
)

#### Inner Join
An inner join joins the left and right DataFrames on the keys that match in both, and 
returns only the records from both sides whose keys matched.

Left DataFrame.join(Right DataFrame, Join Condition, Join type) 

In [0]:
# Inner join (the default join type, spelled out here): keep only sales whose
# PRODUCT_ID exists in product_df. Both PRODUCT_ID columns appear in the result.
detialed_sales = sales_df.join(
    product_df,
    on=sales_df.PRODUCT_ID == product_df.PRODUCT_ID,
    how='inner',
)

In [0]:
# Preview the inner-join result (up to 50 rows, untruncated).
detialed_sales.show(n=50, truncate=False)


+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|PRODUCT_ID|PRODUCT_NAME|MANUFACTURER_NAME|PRODUCT_COLOR|
+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|1     |101   |1001       |12        |2500 |12        |HTC One     |HTC              |Black        |
|2     |101   |1002       |15        |3000 |15        |Galaxy A3   |Samsung          |White        |
|3     |102   |1002       |13        |5999 |13        |HTC One     |HTC              |Gray         |
|4     |103   |1005       |17        |11000|17        |Galaxy A3   |Samsung          |Blue         |
|5     |102   |1003       |14        |3999 |14        |HTC One     |HTC              |Blue         |
|6     |104   |1005       |22        |5999 |22        |IPHONE 6    |Apple            |white        |
|7     |101   |1001       |16        |11000|16        |Galaxy A3   |Samsung          |Black

#### Left Join
Spark returns all records from the left side DataFrame together with the matching records from the right side DataFrame. 
When a key has no match, the right side DataFrame columns are filled with NULL values. 

In [0]:
# Left join: every sale is kept; product columns are NULL when no product matches.
detialed_sales = sales_df.join(
    product_df,
    on=sales_df.PRODUCT_ID == product_df.PRODUCT_ID,
    how='left',
)

In [0]:
# Preview the left-join result (up to 50 rows, untruncated).
detialed_sales.show(n=50, truncate=False)

+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|PRODUCT_ID|PRODUCT_NAME|MANUFACTURER_NAME|PRODUCT_COLOR|
+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|1     |101   |1001       |12        |2500 |12        |HTC One     |HTC              |Black        |
|2     |101   |1002       |15        |3000 |15        |Galaxy A3   |Samsung          |White        |
|3     |102   |1002       |13        |5999 |13        |HTC One     |HTC              |Gray         |
|4     |103   |1005       |17        |11000|17        |Galaxy A3   |Samsung          |Blue         |
|5     |102   |1003       |14        |3999 |14        |HTC One     |HTC              |Blue         |
|6     |104   |1005       |22        |5999 |22        |IPHONE 6    |Apple            |white        |
|7     |101   |1001       |16        |11000|16        |Galaxy A3   |Samsung          |Black

#### Right Join
Spark returns all records from the right side DataFrame together with the matching records from the left side DataFrame. 
When a key has no match, the left side DataFrame columns are filled with NULL values.

In [0]:
# Right join: every product is kept; sales columns are NULL for products never sold.
detialed_sales = sales_df.join(
    product_df,
    on=sales_df.PRODUCT_ID == product_df.PRODUCT_ID,
    how='right',
)
detialed_sales.show(n=50, truncate=False)

+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|PRODUCT_ID|PRODUCT_NAME|MANUFACTURER_NAME|PRODUCT_COLOR|
+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|NULL  |NULL  |NULL       |NULL      |NULL |24        |Honor 10    |Huawei           |Cyan         |
|8     |103   |1001       |15        |3000 |15        |Galaxy A3   |Samsung          |White        |
|16    |109   |1007       |20        |5999 |20        |Galaxy A5   |Samsung          |Blue         |
|18    |110   |1006       |21        |3560 |21        |IPHONE 6    |Apple            |Gold         |
|17    |111   |1007       |22        |5999 |22        |IPHONE 6    |Apple            |white        |
|14    |107   |1010       |17        |11000|17        |Galaxy A3   |Samsung          |Blue         |
|NULL  |NULL  |NULL       |NULL      |NULL |23        |GT2         |Huawei           |Black

#### Full Join
In a full outer join we get all the records from both the left and right side DataFrames. When a key 
has no equivalent value on the other side, the other side's columns will be NULL, as we can see in the 
following example

In [0]:
# Full outer join: keep unmatched rows from both sides, NULL-filling the other side.
detialed_sales = sales_df.join(
    product_df,
    on=sales_df.PRODUCT_ID == product_df.PRODUCT_ID,
    how='full',
)


In [0]:
# Preview the full-join result (up to 50 rows, untruncated).
detialed_sales.show(n=50, truncate=False)

+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|PRODUCT_ID|PRODUCT_NAME|MANUFACTURER_NAME|PRODUCT_COLOR|
+------+------+-----------+----------+-----+----------+------------+-----------------+-------------+
|NULL  |NULL  |NULL       |NULL      |NULL |24        |Honor 10    |Huawei           |Cyan         |
|8     |103   |1001       |15        |3000 |15        |Galaxy A3   |Samsung          |White        |
|16    |109   |1007       |20        |5999 |20        |Galaxy A5   |Samsung          |Blue         |
|18    |110   |1006       |21        |3560 |21        |IPHONE 6    |Apple            |Gold         |
|17    |111   |1007       |22        |5999 |22        |IPHONE 6    |Apple            |white        |
|14    |107   |1010       |17        |11000|17        |Galaxy A3   |Samsung          |Blue         |
|NULL  |NULL  |NULL       |NULL      |NULL |23        |GT2         |Huawei           |Black

#### Special Joins
There are two special join types that behave more like a lookup than a join: 
-  **left_semi**: looks up the left side DataFrame's join keys in the right side DataFrame and returns 
only the left side records that have a matching value (exist) in the right side DataFrame. Put simply: 
look up in the right side DataFrame and keep the matching records only. 

- **left_anti**: looks up the left side DataFrame's join keys in the right side DataFrame and returns 
only the left side records that have no matching value (do not exist) in the right side DataFrame. Put 
simply: look up in the right side DataFrame and keep the non-matching records only. 

##### Left Semi Join

In [0]:
# Left semi join: only sales columns are returned, and only for sales whose
# PRODUCT_ID exists in product_df.
detialed_sales = sales_df.join(
    product_df,
    on=sales_df.PRODUCT_ID == product_df.PRODUCT_ID,
    how='left_semi',
)
detialed_sales.show(n=50, truncate=False)

+------+------+-----------+----------+-----+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|
+------+------+-----------+----------+-----+
|1     |101   |1001       |12        |2500 |
|2     |101   |1002       |15        |3000 |
|3     |102   |1002       |13        |5999 |
|4     |103   |1005       |17        |11000|
|5     |102   |1003       |14        |3999 |
|6     |104   |1005       |22        |5999 |
|7     |101   |1001       |16        |11000|
|8     |103   |1001       |15        |3000 |
|9     |105   |1003       |20        |5999 |
|10    |102   |1002       |12        |2500 |
|11    |106   |1006       |18        |4999 |
|12    |106   |1008       |19        |4500 |
|13    |108   |1002       |21        |3560 |
|14    |107   |1010       |17        |11000|
|15    |110   |1003       |16        |11000|
|16    |109   |1007       |20        |5999 |
|17    |111   |1007       |22        |5999 |
|18    |110   |1006       |21        |3560 |
|19    |108   |1009       |18        |4999 |
|20    |10

##### Left Anti Join

In [0]:
# Left anti join: only sales columns are returned, and only for sales whose
# PRODUCT_ID does NOT exist in product_df.
detialed_sales = sales_df.join(
    product_df,
    on=sales_df.PRODUCT_ID == product_df.PRODUCT_ID,
    how='left_anti',
)
detialed_sales.show(n=50, truncate=False)

+------+------+-----------+----------+-----+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|
+------+------+-----------+----------+-----+
|21    |210   |1020       |30        |4500 |
|22    |106   |1001       |45        |5000 |
+------+------+-----------+----------+-----+



#### Join tips to reach maximum performance

- **Tip#1 :  Filter as early as possible**
    - To reduce the shuffling of data between worker nodes and avoid network congestion, it is recommended 
to filter records from one of the DataFrames to be joined (or both, if possible). This reduces the amount of 
data that needs to be shuffled and improves application performance.
-  **Tip#2 : Broadcast small DataFrames**
    - Spark has a useful technique to speed up joins and reduce shuffling between worker nodes: when 
one of the two DataFrames is small, such as lookup or reference data, we can broadcast that 
DataFrame to all worker nodes that hold partitions of the other, larger DataFrame. No shuffling 
between nodes is then needed for the join, which improves application performance. For example: 
`sales_data.join(broadcast(product_data), joinCondition, "inner").explain(False)`
 
 - **Tip#3: Co-locate DataFrames** 
    - If the partitions of both joined DataFrames exist on the same worker node, there is no need for 
shuffling and join performance is at its best, because the left side and right side data are co-located on the 
same worker node. Two RDDs will be co-located if they have the same partitioner and were shuffled as 
part of the same action.

### Analytical Queries 
The ultimate goal of big data applications is to get insights. Aggregation and analytical functions are 
our tools for extracting those insights from our data, and Spark is rich in analytical and aggregation 
functions that can provide them.

#### Count & DistinctCount

In [0]:
# Count all records in the DataFrame (the result is displayed as the cell output).
sales_df.count()

22

In [0]:
# count() can also be applied to one specific column inside a select.
sales_df.select(count(col('CUSTOMER_ID'))).show()

+------------------+
|count(CUSTOMER_ID)|
+------------------+
|                22|
+------------------+



In [0]:
# count('*') counts every record, including rows that contain NULLs;
# NULLs are only skipped when a specific column is given.
emp_data.agg(count('*')).show()


+--------+
|count(1)|
+--------+
|   33586|
+--------+



In [0]:
# Now count a specific column: the result is lower than count('*') because
# count('column') does not count NULL values.
emp_data.select(count(col('ANNUAL_SALARY'))).show()

+--------------------+
|count(ANNUAL_SALARY)|
+--------------------+
|               25635|
+--------------------+



In [0]:
# countDistinct() counts the number of distinct values in a column.
sales_df.select(countDistinct(col('CUSTOMER_ID'))).show()

+---------------------------+
|count(DISTINCT CUSTOMER_ID)|
+---------------------------+
|                         10|
+---------------------------+



#### Min & Max 

In [0]:
# max() returns the largest value of a column.
sales_df.select(max(col('PRICE'))).show()

+----------+
|max(PRICE)|
+----------+
|     11000|
+----------+



In [0]:
# min() returns the smallest value of a column.
sales_df.select(min(col('PRICE'))).show()

+----------+
|min(PRICE)|
+----------+
|      2500|
+----------+



In [0]:
# Aggregate functions accept either a column name passed directly as a string
# or a column expression, as shown here.
scaled_price = expr('PRICE * 10')
sales_df.select(max(scaled_price)).show()

+-----------------+
|max((PRICE * 10))|
+-----------------+
|           110000|
+-----------------+



In [0]:
# Several aggregates can be combined in a single select, each with its own alias.
sales_df.select(
    min('PRICE').alias('min_price'),
    max('PRICE').alias('max_price'),
    count('*').alias('total_orders'),
).show()

+---------+---------+------------+
|min_price|max_price|total_orders|
+---------+---------+------------+
|     2500|    11000|          22|
+---------+---------+------------+



#### Sum & Average

In [0]:
# sum() adds up all values of a column.
sales_df.select(sum(col('PRICE'))).show()

+----------+
|sum(PRICE)|
+----------+
|    124612|
+----------+



In [0]:
# To sum only the distinct values of a column, use sum_distinct().
# sumDistinct() is the deprecated spelling of the same function (since Spark 3.2).
# Imported here so the cell does not depend on which names the import cell exposes.
from pyspark.sql.functions import sum_distinct

sales_df.select(sum_distinct('PRICE')).show()



+-------------------+
|sum(DISTINCT PRICE)|
+-------------------+
|              44557|
+-------------------+



In [0]:
# Total sales per product: group by PRODUCT_ID, sum PRICE (cast to integer),
# and list the products in ascending PRODUCT_ID order.
(
    sales_df
    .groupBy('PRODUCT_ID')
    .agg(sum('PRICE').cast(IntegerType()))
    .sort('PRODUCT_ID')
    .show()
)

+----------+-----------------------+
|PRODUCT_ID|CAST(sum(PRICE) AS INT)|
+----------+-----------------------+
|        12|                   5000|
|        13|                   5999|
|        14|                   3999|
|        15|                   6000|
|        16|                  22000|
|        17|                  22000|
|        18|                   9998|
|        19|                   9000|
|        20|                  11998|
|        21|                   7120|
|        22|                  11998|
|        30|                   4500|
|        45|                   5000|
+----------+-----------------------+



In [0]:
# Print the column names, types and nullability of the product DataFrame.
product_df.printSchema()

root
 |-- PRODUCT_ID: integer (nullable = true)
 |-- PRODUCT_NAME: string (nullable = true)
 |-- MANUFACTURER_NAME: string (nullable = true)
 |-- PRODUCT_COLOR: string (nullable = true)



In [0]:
# Print the column names, types and nullability of the sales DataFrame.
sales_df.printSchema()

root
 |-- REQ_ID: integer (nullable = true)
 |-- EMP_ID: integer (nullable = true)
 |-- CUSTOMER_ID: integer (nullable = true)
 |-- PRODUCT_ID: integer (nullable = true)
 |-- PRICE: integer (nullable = true)



#### Exercise

Write an application to generate a sales report with the following:
  - Customer ID
  - Average Sales (AVG_PURCHASE)
  - Minimum Sales (MIN_PURCHASE)
  - Products Purchased 
  - Employees Assigned

In [0]:
# Preview the first 10 sales rows without truncating values.
sales_df.show(n=10, truncate=False)

+------+------+-----------+----------+-----+
|REQ_ID|EMP_ID|CUSTOMER_ID|PRODUCT_ID|PRICE|
+------+------+-----------+----------+-----+
|1     |101   |1001       |12        |2500 |
|2     |101   |1002       |15        |3000 |
|3     |102   |1002       |13        |5999 |
|4     |103   |1005       |17        |11000|
|5     |102   |1003       |14        |3999 |
|6     |104   |1005       |22        |5999 |
|7     |101   |1001       |16        |11000|
|8     |103   |1001       |15        |3000 |
|9     |105   |1003       |20        |5999 |
|10    |102   |1002       |12        |2500 |
+------+------+-----------+----------+-----+
only showing top 10 rows


In [0]:
# Preview the first 10 product rows without truncating values.
product_df.show(n=10, truncate=False)

+----------+------------+-----------------+-------------+
|PRODUCT_ID|PRODUCT_NAME|MANUFACTURER_NAME|PRODUCT_COLOR|
+----------+------------+-----------------+-------------+
|12        |HTC One     |HTC              |Black        |
|13        |HTC One     |HTC              |Gray         |
|14        |HTC One     |HTC              |Blue         |
|15        |Galaxy A3   |Samsung          |White        |
|16        |Galaxy A3   |Samsung          |Black        |
|17        |Galaxy A3   |Samsung          |Blue         |
|18        |Galaxy A5   |Samsung          |White        |
|19        |Galaxy A5   |Samsung          |Black        |
|20        |Galaxy A5   |Samsung          |Blue         |
|21        |IPHONE 6    |Apple            |Gold         |
+----------+------------+-----------------+-------------+
only showing top 10 rows


In [0]:
# Give the two PRODUCT_ID key columns distinct names so the joined result has no
# duplicated column name, then left-join every sale to its product details.
# NOTE(review): sales_df and product_df are reassigned here, so cells that
# reference their PRODUCT_ID column must run before this one (or after reloading).
sales_df = sales_df.withColumnRenamed('PRODUCT_ID', 'S_PRODUCT_ID')
product_df = product_df.withColumnRenamed('PRODUCT_ID', 'P_PRODUCT_ID')
sales_prod = sales_df.join(
    product_df,
    on=sales_df['S_PRODUCT_ID'] == product_df['P_PRODUCT_ID'],
    how='left',
)



In [0]:
# Preview the first 10 joined sales/product rows without truncating values.
sales_prod.show(n=10, truncate=False)


+------+------+-----------+------------+-----+------------+------------+-----------------+-------------+
|REQ_ID|EMP_ID|CUSTOMER_ID|S_PRODUCT_ID|PRICE|P_PRODUCT_ID|PRODUCT_NAME|MANUFACTURER_NAME|PRODUCT_COLOR|
+------+------+-----------+------------+-----+------------+------------+-----------------+-------------+
|1     |101   |1001       |12          |2500 |12          |HTC One     |HTC              |Black        |
|2     |101   |1002       |15          |3000 |15          |Galaxy A3   |Samsung          |White        |
|3     |102   |1002       |13          |5999 |13          |HTC One     |HTC              |Gray         |
|4     |103   |1005       |17          |11000|17          |Galaxy A3   |Samsung          |Blue         |
|5     |102   |1003       |14          |3999 |14          |HTC One     |HTC              |Blue         |
|6     |104   |1005       |22          |5999 |22          |IPHONE 6    |Apple            |white        |
|7     |101   |1001       |16          |11000|16       

In [0]:
# Per-customer sales report: average, maximum and minimum purchase price, plus
# the number of rows carrying a product id and an employee id.
# NOTE(review): count() counts non-null rows, not distinct values - confirm
# whether PRODUCTS_PURCHASED / EMPLOYEES_ASSIGNED should use countDistinct().
(
    sales_prod
    .groupBy('CUSTOMER_ID')
    .agg(
        avg('PRICE').alias('AVG_PURCHASE'),
        max('PRICE').alias('MAX_PURCHASE'),
        min('PRICE').alias('MIN_PURCHASE'),
        count('S_PRODUCT_ID').alias('PRODUCTS_PURCHASED'),
        count('EMP_ID').alias('EMPLOYEES_ASSIGNED'),
    )
    .orderBy('CUSTOMER_ID')
    .show(truncate=False)
)

+-----------+-----------------+------------+------------+------------------+------------------+
|CUSTOMER_ID|AVG_PURCHASE     |MAX_PURCHASE|MIN_PURCHASE|PRODUCTS_PURCHASED|EMPLOYEES_ASSIGNED|
+-----------+-----------------+------------+------------+------------------+------------------+
|1001       |5375.0           |11000       |2500        |4                 |4                 |
|1002       |3764.75          |5999        |2500        |4                 |4                 |
|1003       |6999.333333333333|11000       |3999        |3                 |3                 |
|1005       |8499.5           |11000       |5999        |2                 |2                 |
|1006       |4279.5           |4999        |3560        |2                 |2                 |
|1007       |5999.0           |5999        |5999        |2                 |2                 |
|1008       |4500.0           |4500        |4500        |1                 |1                 |
|1009       |4999.0           |4999     