In [1]:
# Stop the SparkSession currently bound to `spark`; a new one is configured in the cells below.
# NOTE(review): `spark` is not created anywhere above this cell -- presumably it is
# pre-created by the PySpark shell/kernel; confirm, otherwise this raises NameError.
spark.stop()

In [2]:
# Stop the existing SparkContext; the next cell builds a new one from an explicit SparkConf.
# NOTE(review): `sc` is assumed to be pre-defined by the environment -- confirm.
sc.stop()

In [3]:
from pyspark import SparkConf, SparkContext

# Build the configuration step by step:
#   - master URL "local[4]": run Spark locally with 4 worker threads
#   - application name "ETL Pipeline"
config = SparkConf()
config.setMaster("local[4]")
config.setAppName("ETL Pipeline")

# Create the SparkContext from that configuration.
sc = SparkContext(conf=config)

In [4]:
from pyspark.sql import SparkSession

# Obtain a SparkSession named "ETL Pipeline" (getOrCreate returns the existing
# session if one is already active, otherwise it creates a new one).
spark = (
    SparkSession.builder
    .appName("ETL Pipeline")
    .getOrCreate()
)

In [5]:
# Evaluate `spark` as the cell's last expression to display the active session.
spark

In [6]:
import os
from getpass import getpass

# Extract: read the HR_Employee table from the local MySQL database hremployeeDB over JDBC.
# Credentials are taken from the environment (MYSQL_USER / MYSQL_PASSWORD), falling back to
# an interactive prompt for the password, so no secret is stored in the notebook.
mysql_user = os.environ.get("MYSQL_USER", "root")
mysql_password = os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: ")

hremployeeDF = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/hremployeeDB")
    .option("dbtable", "HR_Employee")
    .option("user", mysql_user)
    .option("password", mysql_password)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .load()
)

In [7]:
# Preview the loaded rows (show() prints the first 20 rows by default).
hremployeeDF.show()

+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|             JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|         1|               Sales|     Sales

In [8]:
# Print the physical execution plan Spark will use to produce this DataFrame.
hremployeeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(HR_Employee) [numPartitions=1] [EmployeeID#0,Department#1,JobRole#2,Attrition#3,Gender#4,Age#5,MaritalStatus#6,Education#7,EducationField#8,BusinessTravel#9,JobInvolvement#10,JobLevel#11,JobSatisfaction#12,Hourlyrate#13,Income#14,Salaryhike#15,OverTime#16,Workex#17,YearsSinceLastPromotion#18,EmpSatisfaction#19,TrainingTimesLastYear#20,WorkLifeBalance#21,Performance_Rating#22] PushedFilters: [], ReadSchema: struct<EmployeeID:int,Department:string,JobRole:string,Attrition:string,Gender:string,Age:int,Mar...


### Temporary view of the table

In [9]:
# Register the DataFrame as the temporary view `hremployee` so it can be queried with spark.sql().
hremployeeDF.createOrReplaceTempView("hremployee")

#### Display the shape of the HR employee table
    * Show the number of rows and the number of columns

In [11]:
from pyspark.sql.functions import *

In [25]:
# Number of columns in the DataFrame; interpolated into the shape query in the next cell.
col_nos = len(hremployeeDF.columns)

In [26]:
# Report the table shape: the row count comes from SQL, the column count is the
# Python value `col_nos` embedded as a literal through the f-string.
spark.sql(f"""
select count(*) as rows, {col_nos} as columns from hremployee
""").show()

+----+-------+
|rows|columns|
+----+-------+
|1469|     23|
+----+-------+



* The following query will not work in Spark SQL; it only works when run directly in MySQL:

spark.sql("""

select count(*) from information_schema.columns WHERE table_name ="HR_Employee"

""")

In [37]:
# List the columns of the `hremployee` view with their data types.
# show(100) raises the displayed-row limit above the default of 20.
spark.sql("""
describe hremployee
""").show(100)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|          EmployeeID|      int|   null|
|          Department|   string|   null|
|             JobRole|   string|   null|
|           Attrition|   string|   null|
|              Gender|   string|   null|
|                 Age|      int|   null|
|       MaritalStatus|   string|   null|
|           Education|   string|   null|
|      EducationField|   string|   null|
|      BusinessTravel|   string|   null|
|      JobInvolvement|   string|   null|
|            JobLevel|      int|   null|
|     JobSatisfaction|   string|   null|
|          Hourlyrate|      int|   null|
|              Income|      int|   null|
|          Salaryhike|      int|   null|
|            OverTime|   string|   null|
|              Workex|      int|   null|
|YearsSinceLastPro...|      int|   null|
|     EmpSatisfaction|   string|   null|
|TrainingTimesLast...|      int|   null|
|     WorkLifeBa

#### 2. Write a query to show the first 3 employees to join the company in each job role

In [45]:
# Rank employees within each job role by EmployeeID and keep ranks 1-3.
# NOTE(review): EmployeeID is used as a proxy for joining order (the schema shown has no
# joining-date column) -- confirm that IDs are assigned in joining order.
spark.sql(""" 
select * from
(select EmployeeID, JobRole, RANK() OVER(partition by JobRole order by EmployeeID ) AS RANK
from hremployee) as _ where RANK < 4
""").show()

+----------+--------------------+----+
|EmployeeID|             JobRole|RANK|
+----------+--------------------+----+
|         1|     Sales Executive|   1|
|        28|     Sales Executive|   2|
|        40|     Sales Executive|   3|
|         9|Manufacturing Dir...|   1|
|        16|Manufacturing Dir...|   2|
|        21|Manufacturing Dir...|   3|
|         3|Laboratory Techni...|   1|
|         5|Laboratory Techni...|   2|
|         6|Laboratory Techni...|   3|
|        22|Sales Representative|   1|
|        34|Sales Representative|   2|
|        37|Sales Representative|   3|
|        10|Healthcare Repres...|   1|
|        29|Healthcare Repres...|   2|
|        32|Healthcare Repres...|   3|
|         2|  Research Scientist|   1|
|         4|  Research Scientist|   2|
|        13|  Research Scientist|   3|
|        19|             Manager|   1|
|        26|             Manager|   2|
+----------+--------------------+----+
only showing top 20 rows



In [32]:
# List the column names available for the queries below.
hremployeeDF.columns

['EmployeeID',
 'Department',
 'JobRole',
 'Attrition',
 'Gender',
 'Age',
 'MaritalStatus',
 'Education',
 'EducationField',
 'BusinessTravel',
 'JobInvolvement',
 'JobLevel',
 'JobSatisfaction',
 'Hourlyrate',
 'Income',
 'Salaryhike',
 'OverTime',
 'Workex',
 'YearsSinceLastPromotion',
 'EmpSatisfaction',
 'TrainingTimesLastYear',
 'WorkLifeBalance',
 'Performance_Rating']

#### 3. Write a query to show the top 3 highest-earning employees in each job role

In [49]:
# Rank incomes within each job role from highest to lowest and keep ranks 1-3.
# RANK() gives tied incomes the same rank, so ties can return more than three rows for a role.
spark.sql(""" 
select * from
(select  JobRole,Income, RANK() OVER(partition by JobRole order by Income desc) AS RANK
from hremployee) as _ where RANK < 4
""").show()

+--------------------+------+----+
|             JobRole|Income|RANK|
+--------------------+------+----+
|     Sales Executive| 13872|   1|
|     Sales Executive| 13770|   2|
|     Sales Executive| 13758|   3|
|Manufacturing Dir...| 13973|   1|
|Manufacturing Dir...| 13826|   2|
|Manufacturing Dir...| 13726|   3|
|Laboratory Techni...|  7403|   1|
|Laboratory Techni...|  6782|   2|
|Laboratory Techni...|  6674|   3|
|Sales Representative|  6632|   1|
|Sales Representative|  5405|   2|
|Sales Representative|  4502|   3|
|Healthcare Repres...| 13966|   1|
|Healthcare Repres...| 13964|   2|
|Healthcare Repres...| 13734|   3|
|  Research Scientist|  9724|   1|
|  Research Scientist|  6962|   2|
|  Research Scientist|  6854|   3|
|             Manager| 19999|   1|
|             Manager| 19943|   2|
+--------------------+------+----+
only showing top 20 rows



#### 4. Show the top 3 highest incomes across all job roles

In [50]:
# Rank incomes across the whole table (no PARTITION BY) from highest to lowest and keep ranks 1-3.
spark.sql(""" 
select * from
(select  JobRole,Income, RANK() OVER(order by Income desc) AS RANK
from hremployee) as _ where RANK < 4
""").show()

+-----------------+------+----+
|          JobRole|Income|RANK|
+-----------------+------+----+
|          Manager| 19999|   1|
|Research Director| 19973|   2|
|          Manager| 19943|   3|
+-----------------+------+----+



#### LAG()

5. For each job role, show every employee's income next to the previous employee's income (ordered by EmployeeID) and the difference between them, sorted by ascending difference

In [68]:
# Inner query: LAG(Income, 1) fetches the income of the previous employee (by EmployeeID)
# within the same job role as `prev`.
# Outer query: diff = Income - prev, sorted by JobRole and then diff.
# `prev` is NULL for the first employee of each job role, so `diff` is NULL there too.
spark.sql(""" 
select *, (Income-prev) as diff from
(select  EmployeeID, JobRole,Income, 
LAG(Income,1) OVER(partition by JobRole order by EmployeeID asc) AS prev
from hremployee) as _  order by JobRole,diff
""").show(100)

+----------+--------------------+------+-----+-----+
|EmployeeID|             JobRole|Income| prev| diff|
+----------+--------------------+------+-----+-----+
|        10|Healthcare Repres...|  5237| null| null|
|       285|Healthcare Repres...|  4741|13496|-8755|
|      1183|Healthcare Repres...|  6842|13966|-7124|
|      1157|Healthcare Repres...|  4148|11245|-7097|
|       205|Healthcare Repres...|  6673|13734|-7061|
|       677|Healthcare Repres...|  4014|10552|-6538|
|       397|Healthcare Repres...|  4522|10965|-6443|
|       833|Healthcare Repres...|  5731|12169|-6438|
|      1065|Healthcare Repres...|  4035|10466|-6431|
|       745|Healthcare Repres...|  4777|10999|-6222|
|       736|Healthcare Repres...|  4240|10388|-6148|
|      1098|Healthcare Repres...|  4069|10124|-6055|
|        89|Healthcare Repres...|  4152|10096|-5944|
|       489|Healthcare Repres...|  4089| 9824|-5735|
|       929|Healthcare Repres...|  7978|13577|-5599|
|       105|Healthcare Repres...|  5163|10673|

In [72]:
# Same comparison in a single SELECT. LAG(Income, 1, 0) uses 0 as the default value, so the
# first employee of each job role gets prev = 0 and diff = Income instead of NULL.
spark.sql("""
select  EmployeeID, JobRole,Income, LAG(Income,1,0) OVER(partition by JobRole order by EmployeeID asc) as prev,
Income -LAG(Income,1,0) OVER(partition by JobRole order by EmployeeID asc) AS diff
from hremployee
""").show()

+----------+---------------+------+-----+-----+
|EmployeeID|        JobRole|Income| prev| diff|
+----------+---------------+------+-----+-----+
|         1|Sales Executive|  5993|    0| 5993|
|        28|Sales Executive|  6825| 5993|  832|
|        40|Sales Executive|  5376| 6825|-1449|
|        44|Sales Executive|  8726| 5376| 3350|
|        47|Sales Executive|  4568| 8726|-4158|
|        49|Sales Executive|  5772| 4568| 1204|
|        53|Sales Executive|  5454| 5772| -318|
|        55|Sales Executive|  4157| 5454|-1297|
|        57|Sales Executive|  9069| 4157| 4912|
|        64|Sales Executive|  7637| 9069|-1432|
|        71|Sales Executive|  5473| 7637|-2164|
|        77|Sales Executive|  4312| 5473|-1161|
|        83|Sales Executive| 10239| 4312| 5927|
|        90|Sales Executive|  9619|10239| -620|
|        92|Sales Executive|  5441| 9619|-4178|
|        93|Sales Executive|  5209| 5441| -232|
|        95|Sales Executive|  5010| 5209| -199|
|        97|Sales Executive|  4999| 5010

#### 6. LEAD()
* Returns a value from a row that follows the current row

In [75]:
# LEAD(income, 2, 0): for each row, fetch the income two rows ahead within the same JobRole
# (ordered by EmployeeID), falling back to 0 when no such row exists.
# NOTE(review): the column is named next_income but the offset is 2, not 1 -- confirm
# whether the immediately following row was intended.
spark.sql("""
select employeeid, department, jobrole, age, gender,income,
LEAD(income,2,0) over(partition by JobRole order by EmployeeID) as next_income
from hremployee
""").show()

+----------+----------+---------------+---+------+------+-----------+
|employeeid|department|        jobrole|age|gender|income|next_income|
+----------+----------+---------------+---+------+------+-----------+
|         1|     Sales|Sales Executive| 41|Female|  5993|       5376|
|        28|     Sales|Sales Executive| 42|  Male|  6825|       8726|
|        40|     Sales|Sales Executive| 33|Female|  5376|       4568|
|        44|     Sales|Sales Executive| 27|  Male|  8726|       5772|
|        47|     Sales|Sales Executive| 34|  Male|  4568|       5454|
|        49|     Sales|Sales Executive| 46|  Male|  5772|       4157|
|        53|     Sales|Sales Executive| 44|Female|  5454|       9069|
|        55|     Sales|Sales Executive| 26|Female|  4157|       7637|
|        57|     Sales|Sales Executive| 35|  Male|  9069|       5473|
|        64|     Sales|Sales Executive| 59|Female|  7637|       4312|
|        71|     Sales|Sales Executive| 59|Female|  5473|      10239|
|        77|     Sal

#### NTILE()

* Divides the ordered rows into a given number of roughly equal groups (here, 4 income quartiles)

In [76]:
# NTILE(4) splits all employees, ordered by income, into four buckets numbered 1-4.
spark.sql("""
select employeeid, department, jobrole, age, gender,income,
NTILE(4) over(order by INCOME) as SALARY_QUARTILES
from hremployee
""").show()

+----------+--------------------+--------------------+---+------+------+----------------+
|employeeid|          department|             jobrole|age|gender|income|SALARY_QUARTILES|
+----------+--------------------+--------------------+---+------+------+----------------+
|       514|Research & Develo...|  Research Scientist| 20|  Male|  1009|               1|
|       728|Research & Develo...|  Research Scientist| 18|  Male|  1051|               1|
|       765|               Sales|Sales Representative| 28|  Male|  1052|               1|
|      1338|               Sales|Sales Representative| 30|  Male|  1081|               1|
|      1365|               Sales|Sales Representative| 29|  Male|  1091|               1|
|       178|Research & Develo...|Laboratory Techni...| 19|  Male|  1102|               1|
|       912|               Sales|Sales Representative| 25|  Male|  1118|               1|
|      1402|Research & Develo...|Laboratory Techni...| 31|Female|  1129|               1|
|       30

#### Find the number of employees in each percentile group (0-25, 25-50, 50-75, 75-100) using PERCENT_RANK() and CASE WHEN

In [101]:
# Innermost query: PERCENT_RANK of each employee's income.
# Middle query: map that value to one of four bands with CASE WHEN.
# Outer query: count the employees in each band.
# NOTE(review): PERCENT_RANK is partitioned by Department, so the bands are relative to each
# department's income distribution rather than the whole company -- confirm this is intended.
spark.sql("""
SELECT Groups,count(*) as count from

(SELECT *, 
CASE 
WHEN percent<0.25 then '0-25'
WHEN percent<0.50 then '25-50'
WHEN percent<0.75 then '50-75'
else '75-100'
END AS Groups FROM
(select employeeid, income,
PERCENT_RANK() over(PARTITION BY Department order by INCOME) as percent
from hremployee) AS _  ) 
as __ group by Groups

""").show()

+------+-----+
|Groups|count|
+------+-----+
|75-100|  369|
| 25-50|  366|
| 50-75|  367|
|  0-25|  367|
+------+-----+



### Hive Integration with PySpark

In [1]:
# Stop the current session; a Hive-enabled session is created a few cells below.
# NOTE(review): execution counts restart at In [1] here, so `spark` is presumably the
# session pre-created by the PySpark shell/kernel -- confirm.
spark.stop()

In [2]:
# Shell out to `jps` to list the running JVM processes (Hadoop daemons and the Spark driver).
!jps

13281 ResourceManager
13014 SecondaryNameNode
13448 NodeManager
12587 NameNode
492 Jps
413 SparkSubmit
12782 DataNode


In [3]:
# Spark integration with the Hive warehouse.
# Imported in this cell so it also runs on a fresh kernel, where SparkSession is not yet in scope.
from pyspark.sql import SparkSession

# "spark.sql.warehouse.dir" sets the warehouse directory to "/user/hive/warehouse";
# enableHiveSupport() turns on Hive support for the session.
spark = (SparkSession.builder.appName("Pyspark-Hive-Integration")
        .config("spark.sql.warehouse.dir","/user/hive/warehouse")
        .enableHiveSupport().getOrCreate())

In [4]:
# List the databases visible to the session before creating a new one.
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [5]:
# Create the `airlines` database unless it already exists (safe to re-run).
spark.sql("""
create database if not exists airlines
""")

DataFrame[]

In [7]:
# Confirm that the `airlines` database is now listed.
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|    airlines|
|     default|
+------------+



In [6]:
# Make `airlines` the current database so unqualified table names resolve inside it.
spark.sql("""
use airlines
""")

DataFrame[]

In [8]:
# List the tables in the current database.
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [10]:
# Create the `flights` table (if absent) as a comma-delimited, newline-terminated text file.
# The '\n' escape sits in a non-raw Python string, so the SQL text receives an actual newline character.
# The table property skip.header.line.count=1 marks the first line of each file as a header.
# NOTE(review): confirm that Spark honours skip.header.line.count when reading this table.
spark.sql("""
create table if not exists flights(DayofMonth int,DayOfWeek int,
Carrier varchar(20), OriginAirportID int, DestAirportID int, DepDelay int, ArrDelay int)
row format delimited 
fields terminated by ','
linEs terminated by '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
""")

DataFrame[]

In [11]:
# Confirm that `flights` is now registered in the current database.
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines|  flights|      false|
+--------+---------+-----------+



In [22]:
# Load the CSV from the local filesystem into `flights`; OVERWRITE replaces any existing contents.
spark.sql("""
load data local inpath '/home/hadoop/Downloads/raw_flight_data1.csv'
overwrite into table flights
""")

DataFrame[]

In [13]:
# Create the `airports` table (if absent) with the same text-file layout as `flights`:
# comma-delimited fields, newline-terminated lines, first line of each file marked as a header.
# NOTE(review): confirm that Spark honours skip.header.line.count when reading this table.
spark.sql("""
create table if not exists airports(airport_id int,city varchar(50), state varchar(50),name varchAr(100)
)
row format delimited 
fields terminated by ','
linEs terminated by '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
""")

DataFrame[]

In [14]:
# Confirm that both `airports` and `flights` are registered in the current database.
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [21]:
# Load the CSV from the local filesystem into `airports`; OVERWRITE replaces any existing contents.
spark.sql("""
load data local inpath '/home/hadoop/Downloads/airports1.csv'
overwrite into table airports
""")

DataFrame[]

In [17]:
# Sanity check: number of rows loaded into `airports`.
spark.sql("select count(*) from airports").show()

+--------+
|count(1)|
+--------+
|     366|
+--------+



In [18]:
# Sanity check: number of rows loaded into `flights`.
spark.sql("select count(*) from flights").show()

+--------+
|count(1)|
+--------+
| 2719419|
+--------+



In [24]:
# Preview the loaded flight rows (show() prints the first 20 rows by default).
spark.sql("select * from flights").show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

### 1. Extract

In [25]:
# Extract: load the two tables of the `airlines` database as DataFrames.
flight_df = spark.table("airlines.flights")
airports_df = spark.table("airlines.airports")

In [26]:
# Preview the airports lookup data.
airports_df.show()

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256|

###  2. Transformation

In [27]:
# Transform: attach origin-airport details to every flight. The inner join keeps only
# flights whose OriginAirportID matches an airport_id in the airports data.
origin_airport_match = flight_df.OriginAirportID == airports_df.airport_id
flights_join = flight_df.join(airports_df, on=origin_airport_match, how="inner")

In [29]:
# Preview the first 5 joined rows.
flights_join.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|          city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|        19|        5|     DL|          11433|        13303|      -3|       1|     11433|       Detroit|   MI|Detroit Metro Way...|
|        19|        5|     DL|          14869|        12478|       0|      -8|     14869|Salt Lake City|   UT|Salt Lake City In...|
|        19|        5|     DL|          14057|        14869|      -4|     -15|     14057|      Portland|   OR|Portland Internat...|
|        19|        5|     DL|          15016|        11433|      28|      24|     15016|     St. Louis|   MO|Lambert-St. Louis...|
|        19|        5|     DL|          11193|        12892|      -6|     -1

### 3. Load

In [32]:
# Redistribute the joined data into 4 partitions; the write cells below use this DataFrame.
flights_join= flights_join.repartition(4)

In [33]:
# Load: write the joined data as Parquet to the local filesystem (file:// scheme).
# mode="overwrite" keeps the cell re-runnable: the default save mode raises an error
# when the target path already exists.
flights_join.write.parquet("file:///home/hadoop/Downloads/flights", mode="overwrite")

In [35]:
# Read the Parquet output written above back into a DataFrame.
flights_parquet_df = spark.read.parquet("file:///home/hadoop/Downloads/flights/")

In [37]:
# Write the joined data as Parquet to /flights1. The path has no scheme, so it resolves
# against the session's default filesystem (presumably HDFS here -- confirm).
# mode="overwrite" keeps the cell re-runnable: the default save mode raises an error
# when the target path already exists.
flights_join.write.parquet("/flights1", mode="overwrite")

In [39]:
# Partitioning: write Parquet split by the Carrier column.
# mode="overwrite" keeps the cell re-runnable: the default save mode raises an error
# when the target path already exists.
flights_join.write.partitionBy("Carrier").parquet("/airlines", mode="overwrite")

In [43]:
# Bucketing: save the data as the CSV-format table `bucketed_table`, bucketed by `state`
# into 50 buckets.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an error
# when the table already exists.
(flights_join.write
    .bucketBy(col='state', numBuckets=50)
    .format("csv")
    .mode("overwrite")
    .saveAsTable("bucketed_table"))

In [44]:
# Partitioning and bucketing together: save the data as the Parquet table `part_bucket_table`,
# partitioned by Carrier and bucketed by `state` into 30 buckets.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an error
# when the table already exists.
(flights_join.write
    .partitionBy("Carrier")
    .bucketBy(col='state', numBuckets=30)
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("part_bucket_table"))

In [45]:
# Query the saved table `part_bucket_table` (created with saveAsTable), not a temporary view:
# count the flights per carrier.
spark.sql("select Carrier, count(*) from part_bucket_table group by Carrier").show()

+-------+--------+
|Carrier|count(1)|
+-------+--------+
|     UA|  122443|
|     AA|  124037|
|     EV|   46563|
|     B6|   51381|
|     DL|  134724|
|     OO|   69785|
|     F9|    9811|
|     YV|   14612|
|     US|  100668|
|     MQ|   45926|
|     HA|    4962|
|     AS|   28796|
|     FL|   28053|
|     VX|   14683|
|     WN|  216101|
|     9E|   36030|
+-------+--------+



### Loading into MySQL

In [49]:
import os
from getpass import getpass

# Load: write the joined flights data into MySQL over JDBC.
# Credentials are taken from the environment (MYSQL_USER / MYSQL_PASSWORD), falling back to
# an interactive prompt for the password, so no secret is stored in the notebook.
connection_properties = {
    'user': os.environ.get("MYSQL_USER", "root"),
    'password': os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: "),
    'driver': 'com.mysql.cj.jdbc.Driver'
}

# mode="overwrite" replaces the `airlines` table in the `flights` database if it already exists.
flights_join.write.jdbc(url='jdbc:mysql://localhost:3306/flights', table="airlines",
                        mode="overwrite", properties=connection_properties)

In [50]:
# Shut down the Spark session now that all loads are complete.
spark.stop()

In [51]:
# Stop the SparkContext.
# NOTE(review): `sc` is not created in this part of the notebook (execution counts restart
# at In [1] above) -- presumably it is pre-defined by the PySpark shell/kernel; confirm.
sc.stop()