In [1]:
sc

In [2]:
spark

In [3]:
sc.stop()
spark.stop()

In [4]:
from pyspark import SparkContext,SparkConf
# 2 in local means the number of cores allocated
# setMaster() used to set Spark Context Manager which is local [core_of_cpu]
config =  SparkConf().setMaster("local[4]").setAppName("ETL Session")

sc = SparkContext(conf = config)

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ETL Pipeline').getOrCreate()

In [6]:
spark

In [7]:
sc

In [8]:
hremployeeDF = spark.read.format('jdbc')\
    .option('url', 'jdbc:mysql://localhost:3306/hremployeedb')\
    .option('dbtable', 'HR_Employee')\
    .option('user', 'root')\
    .option('password', 'hadoop@123')\
    .option('driver', 'com.mysql.cj.jdbc.Driver').load()

In [9]:
hremployeeDF.show()

+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|             JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|         1|               Sales|     Sales

In [10]:
# Show Physical plan of execution which is known as DAG.
hremployeeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(HR_Employee) [numPartitions=1] [EmployeeID#0,Department#1,JobRole#2,Attrition#3,Gender#4,Age#5,MaritalStatus#6,Education#7,EducationField#8,BusinessTravel#9,JobInvolvement#10,JobLevel#11,JobSatisfaction#12,Hourlyrate#13,Income#14,Salaryhike#15,OverTime#16,Workex#17,YearsSinceLastPromotion#18,EmpSatisfaction#19,TrainingTimesLastYear#20,WorkLifeBalance#21,Performance_Rating#22] PushedFilters: [], ReadSchema: struct<EmployeeID:int,Department:string,JobRole:string,Attrition:string,Gender:string,Age:int,Mar...


#### Materialized View of Table

In [11]:
hremployeeDF.createOrReplaceTempView("hremployee")

#### 1. Display shape of hremployee table
        * Show number of rows & number of columns

In [12]:
spark.sql("""
    SELECT COUNT(*) AS ROW_COUNT
    FROM hremployee
""").show()

+---------+
|ROW_COUNT|
+---------+
|     1469|
+---------+



In [13]:
num_of_cols = len(hremployeeDF.columns)

In [14]:
len(hremployeeDF.collect()[0])

23

In [15]:
spark.sql(f"""
    SELECT COUNT(*) AS ROW_COUNT,
    {num_of_cols} AS COLUMN_COUNT
    FROM hremployee
""").show()

+---------+------------+
|ROW_COUNT|COLUMN_COUNT|
+---------+------------+
|     1469|          23|
+---------+------------+



#### MySql query to find the shape of the database
spark.sql("""
    SELECT COUNT(*) AS ROW_COUNT,
    SIZE(COLLECT_LIST(*)) AS COLUMN_COUNT
    FROM hremployee
""")

In [16]:
hremployeeDF.columns

['EmployeeID',
 'Department',
 'JobRole',
 'Attrition',
 'Gender',
 'Age',
 'MaritalStatus',
 'Education',
 'EducationField',
 'BusinessTravel',
 'JobInvolvement',
 'JobLevel',
 'JobSatisfaction',
 'Hourlyrate',
 'Income',
 'Salaryhike',
 'OverTime',
 'Workex',
 'YearsSinceLastPromotion',
 'EmpSatisfaction',
 'TrainingTimesLastYear',
 'WorkLifeBalance',
 'Performance_Rating']

#### 2. Write a Query to show first 3 employee from each job role to join the company


In [17]:
spark.sql("""
    SELECT EmployeeID,JobRole
    FROM (
        SELECT EmployeeID,JobRole,
        row_number() OVER(partition by JobRole ORDER BY EmployeeID)
        AS rank
        FROM hremployee
        ) AS _
    WHERE rank < 4
""").show()

+----------+--------------------+
|EmployeeID|             JobRole|
+----------+--------------------+
|         1|     Sales Executive|
|        28|     Sales Executive|
|        40|     Sales Executive|
|         9|Manufacturing Dir...|
|        16|Manufacturing Dir...|
|        21|Manufacturing Dir...|
|         3|Laboratory Techni...|
|         5|Laboratory Techni...|
|         6|Laboratory Techni...|
|        22|Sales Representative|
|        34|Sales Representative|
|        37|Sales Representative|
|        10|Healthcare Repres...|
|        29|Healthcare Repres...|
|        32|Healthcare Repres...|
|         2|  Research Scientist|
|         4|  Research Scientist|
|        13|  Research Scientist|
|        19|             Manager|
|        26|             Manager|
+----------+--------------------+
only showing top 20 rows



#### 3. Top 3 employee with highest salary in Job Role.

In [18]:
spark.sql("""
    SELECT EmployeeID,JobRole,Income
    FROM (
        SELECT EmployeeID,JobRole,Income,
        row_number() OVER(partition by JobRole ORDER BY Income DESC)
        AS rank
        FROM hremployee
        ) AS _
    WHERE rank < 4
""").show(21)

+----------+--------------------+------+
|EmployeeID|             JobRole|Income|
+----------+--------------------+------+
|        99|     Sales Executive| 13872|
|       545|     Sales Executive| 13770|
|       839|     Sales Executive| 13758|
|       722|Manufacturing Dir...| 13973|
|       628|Manufacturing Dir...| 13826|
|       744|Manufacturing Dir...| 13726|
|       678|Laboratory Techni...|  7403|
|       817|Laboratory Techni...|  6782|
|       945|Laboratory Techni...|  6674|
|       565|Sales Representative|  6632|
|      1308|Sales Representative|  5405|
|      1220|Sales Representative|  4502|
|      1181|Healthcare Repres...| 13966|
|       317|Healthcare Repres...| 13964|
|       190|Healthcare Repres...| 13734|
|        68|  Research Scientist|  9724|
|      1315|  Research Scientist|  6962|
|      1305|  Research Scientist|  6854|
|       191|             Manager| 19999|
|       852|             Manager| 19943|
|       166|             Manager| 19926|
+----------+----

#### Top 3 highest package from overall Job Role

In [19]:
spark.sql("""
    SELECT EmployeeID,JobRole,Income
    FROM (
        SELECT EmployeeID,JobRole,Income,
        row_number() OVER(ORDER BY Income DESC)
        AS rank
        FROM hremployee
        ) AS _
    ORDER BY rank
    LIMIT 3
""").show()

+----------+-----------------+------+
|EmployeeID|          JobRole|Income|
+----------+-----------------+------+
|       191|          Manager| 19999|
|       747|Research Director| 19973|
|       852|          Manager| 19943|
+----------+-----------------+------+



#### write a spark query to shoq employee in order of Ascending with respect to employee income compared to previous income for each job role.

In [20]:
spark.sql("""
        SELECT EmployeeID,JobRole,Income,
        lag(Income, 1) OVER(PARTITION BY JobRole ORDER BY EmployeeID) prev_inc,
        Income - lag(Income, 1) OVER(PARTITION BY JobRole ORDER BY EmployeeID)
        AS diff
        FROM hremployee
        ORDER BY JobRole,diff
""").show(100)

+----------+--------------------+------+--------+-----+
|EmployeeID|             JobRole|Income|prev_inc| diff|
+----------+--------------------+------+--------+-----+
|        10|Healthcare Repres...|  5237|    null| null|
|       285|Healthcare Repres...|  4741|   13496|-8755|
|      1183|Healthcare Repres...|  6842|   13966|-7124|
|      1157|Healthcare Repres...|  4148|   11245|-7097|
|       205|Healthcare Repres...|  6673|   13734|-7061|
|       677|Healthcare Repres...|  4014|   10552|-6538|
|       397|Healthcare Repres...|  4522|   10965|-6443|
|       833|Healthcare Repres...|  5731|   12169|-6438|
|      1065|Healthcare Repres...|  4035|   10466|-6431|
|       745|Healthcare Repres...|  4777|   10999|-6222|
|       736|Healthcare Repres...|  4240|   10388|-6148|
|      1098|Healthcare Repres...|  4069|   10124|-6055|
|        89|Healthcare Repres...|  4152|   10096|-5944|
|       489|Healthcare Repres...|  4089|    9824|-5735|
|       929|Healthcare Repres...|  7978|   13577

#### LEAD() -> Row's Next Records

In [21]:
spark.sql("""
        SELECT EmployeeID,Department,jobRole,age,gender,Income,Workex,
        lead(Income, 2, 0) OVER(ORDER BY EmployeeID) next_inc
        FROM hremployee
""").show()

+----------+--------------------+--------------------+---+------+------+------+--------+
|EmployeeID|          Department|             jobRole|age|gender|Income|Workex|next_inc|
+----------+--------------------+--------------------+---+------+------+------+--------+
|         1|               Sales|     Sales Executive| 41|Female|  5993|     8|    2090|
|         2|Research & Develo...|  Research Scientist| 49|  Male|  5130|    10|    2909|
|         3|Research & Develo...|Laboratory Techni...| 37|  Male|  2090|     7|    3468|
|         4|Research & Develo...|  Research Scientist| 33|Female|  2909|     8|    3068|
|         5|Research & Develo...|Laboratory Techni...| 27|  Male|  3468|     6|    2670|
|         6|Research & Develo...|Laboratory Techni...| 32|  Male|  3068|     8|    2693|
|         7|Research & Develo...|Laboratory Techni...| 59|Female|  2670|    12|    9526|
|         8|Research & Develo...|Laboratory Techni...| 30|  Male|  2693|     1|    5237|
|         9|Research 

#### NTILE() 
        * Dividing records into number of Quarters.

In [22]:
spark.sql("""
        SELECT EmployeeID,Department,jobRole,age,gender,Income,
        NTILE(4) OVER(ORDER BY income) salary_quartiles
        FROM hremployee
""").show()

+----------+--------------------+--------------------+---+------+------+----------------+
|EmployeeID|          Department|             jobRole|age|gender|Income|salary_quartiles|
+----------+--------------------+--------------------+---+------+------+----------------+
|       514|Research & Develo...|  Research Scientist| 20|  Male|  1009|               1|
|       728|Research & Develo...|  Research Scientist| 18|  Male|  1051|               1|
|       765|               Sales|Sales Representative| 28|  Male|  1052|               1|
|      1338|               Sales|Sales Representative| 30|  Male|  1081|               1|
|      1365|               Sales|Sales Representative| 29|  Male|  1091|               1|
|       178|Research & Develo...|Laboratory Techni...| 19|  Male|  1102|               1|
|       912|               Sales|Sales Representative| 25|  Male|  1118|               1|
|      1402|Research & Develo...|Laboratory Techni...| 31|Female|  1129|               1|
|       30

#### Find the number of Employee in each percentile_group 0-25 ,25-50,50-75 based on Income using percent_rank and case when.

In [23]:
spark.sql("""
SELECT percentile_range, count(*) Emp_Count
FROM (
    SELECT
        INCOME,
        PERCENT_RANK() OVER(PARTITION BY DEPARTMENT ORDER BY INCOME) * 100 as percent_rank,
        CASE 
            WHEN PERCENT_RANK() OVER(PARTITION BY DEPARTMENT ORDER BY INCOME) * 100 < 25 THEN '0-25'
            WHEN PERCENT_RANK() OVER(PARTITION BY DEPARTMENT ORDER BY INCOME) * 100 < 50 THEN '25-50'
            WHEN PERCENT_RANK() OVER(PARTITION BY DEPARTMENT ORDER BY INCOME) * 100 < 75 THEN '50-75'
            ELSE '75-100'
        END as percentile_range
    FROM hremployee
    ) _ 
GROUP BY percentile_range
ORDER BY percentile_range
""").show()


+----------------+---------+
|percentile_range|Emp_Count|
+----------------+---------+
|            0-25|      367|
|           25-50|      366|
|           50-75|      367|
|          75-100|      369|
+----------------+---------+



#### Hive Integration with Pyspark

In [2]:
spark.stop()

2482 NameNode
3268 NodeManager
3093 ResourceManager
12327 SparkSubmit
12455 Jps
2683 DataNode
2924 SecondaryNameNode


In [4]:
# Spark Integration  with Hive warehouse.
# Config name for Hive-Integration 'spark.sql.warhouse.dir' value = '/user/hive/warehouse'
spark = (SparkSession.builder.appName('Pyspark-Hive')
         .config('spark.sql.warehouse.dir','/user/hive/warehouse')
        .enableHiveSupport().getOrCreate())

In [5]:
spark

In [6]:
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|    airlines|
|     default|
+------------+



In [7]:
 spark.sql("""
     CREATE DATABASE IF NOT EXISTS airlines
 """)

DataFrame[]

In [8]:
spark.sql("""
     use airlines
 """)

DataFrame[]

In [9]:
spark.sql("""
     show databases
 """).show()

+------------+
|databaseName|
+------------+
|    airlines|
|     default|
+------------+



In [10]:
spark.sql("""
    show tables
 """).show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [11]:
spark.sql("""
     create table if not exists flights(
             DayOfMonth int, 
             DayOfWeeK int,
             Carrier varchar(10),
             OriginAirportID int,
             DestAirportID int,
             DepDelay int,
             ArrDelay int
             )
             row format delimited
             fields terminated by ','
             lines terminated by '\n'
             STORED AS TEXTFILE
             TBLPROPERTIES('skip.header.line.count' = '1')
 """)

DataFrame[]

In [12]:
spark.sql("""show tables""").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [13]:
spark.sql("""load data local inpath '/home/hadoop/Downloads/raw_flight_data1.csv' overwrite into table flights
 """)

DataFrame[]

In [14]:
spark.sql("""
    create table if not exists airports(airport_id int, city varchar(50),state varchar(50) )
    row format delimited
    fields terminated by ','
    lines terminated by '\n'
    stored as textfile
    tblproperties ('skip.header.line.count' = '1')
""")

DataFrame[]

In [15]:
spark.sql("""load data local inpath '/home/hadoop/Downloads/airports1.csv' overwrite into table airports""")

DataFrame[]

In [16]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [17]:
spark.sql('select * from flights').show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayOfMonth|DayOfWeeK|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

### 1. Extract

In [18]:
flights_df = spark.table("airlines.flights")
airports_df = spark.table("airlines.airports")

In [19]:
airports_df.show(2)

+----------+-----------+-----+
|airport_id|       city|state|
+----------+-----------+-----+
|     10165|Adak Island|   AK|
|     10299|  Anchorage|   AK|
+----------+-----------+-----+
only showing top 2 rows



### 2. Transformation

In [20]:
flights_join = flights_df.join(airports_df, on= flights_df.OriginAirportID == airports_df.airport_id , how='inner')

In [21]:
flights_join.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+
|DayOfMonth|DayOfWeeK|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|          city|state|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+
|        19|        5|     DL|          11433|        13303|      -3|       1|     11433|       Detroit|   MI|
|        19|        5|     DL|          14869|        12478|       0|      -8|     14869|Salt Lake City|   UT|
|        19|        5|     DL|          14057|        14869|      -4|     -15|     14057|      Portland|   OR|
|        19|        5|     DL|          15016|        11433|      28|      24|     15016|     St. Louis|   MO|
|        19|        5|     DL|          11193|        12892|      -6|     -11|     11193|    Cincinnati|   OH|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+
o

### 3. Load

In [22]:
flights_join = flights_join.repartition(4)

In [23]:
flights_df.write.parquet("file:///home/hadoop/Downloads/flights/")

AnalysisException: 'path file:/home/hadoop/Downloads/flights already exists.;'

In [24]:
# Read a parquet file format
flights_parquet_df = spark.read.parquet("file:///home/hadoop/Downloads/flights/")

In [None]:
flights_join.write.parquet("/flights1")

In [None]:
flights_join.write.partitionBy('Carrier').parquet('/airlines')

In [26]:
flights_join.write.bucketBy(col = 'state', numBuckets = 50).format("csv").saveAsTable("bucketed_table")

AnalysisException: 'Table `bucketed_table` already exists.;'

In [34]:
flights_join.write.partitionBy('Carrier').bucketBy(col = 'state', numBuckets = 30)\
.format("csv").saveAsTable("Part_bucket_table")

In [28]:
spark.sql("SELECT carrier, count(*) from part_bucketed_table group by carrier").show()

+-------+--------+
|carrier|count(1)|
+-------+--------+
|     UA|  122443|
|     AA|  124037|
|     EV|   46563|
|     B6|   51381|
|     DL|  134724|
|     OO|   69785|
|     F9|    9811|
|     YV|   14612|
|     US|  100668|
|     MQ|   45926|
|     HA|    4962|
|     AS|   28796|
|     FL|   28053|
|     VX|   14683|
|     WN|  216101|
|     9E|   36030|
+-------+--------+



### Load on MySQL

In [35]:
connection_properties = {
    'user' : 'root',
    'password' : 'hadoop@123',
    'driver' : 'com.mysql.cj.jdbc.Driver'
}

flights_join.write.jdbc(url="jdbc:mysql://localhost:3306/flights",table='airlines',
                        mode="overwrite", properties=connection_properties)

In [36]:
spark.stop()