In [1]:
sc


In [1]:
spark.stop()

In [14]:
sc.stop()

In [1]:
spark.stop()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


config = SparkConf().setMaster('local[4]').setAppName("ETL Pipeline")
sc = SparkContext(conf = config)
spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()

In [3]:
spark

In [4]:
hremployeeDF = spark.read.format("jdbc")\
.option("url","jdbc:mysql://localhost:3306/hremployeeDb")\
.option("dbtable","HR_Employee").option("user","root").option("password","hadoop@123")\
.option("driver","com.mysql.cj.jdbc.Driver").load()

In [5]:
hremployeeDF.show(2)

+----------+--------------------+------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|           JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|         1|               Sales|   Sales Executi

In [6]:
# Show Physical Plan of execution which is known as DAG
hremployeeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(HR_Employee) [numPartitions=1] [EmployeeID#0,Department#1,JobRole#2,Attrition#3,Gender#4,Age#5,MaritalStatus#6,Education#7,EducationField#8,BusinessTravel#9,JobInvolvement#10,JobLevel#11,JobSatisfaction#12,Hourlyrate#13,Income#14,Salaryhike#15,OverTime#16,Workex#17,YearsSinceLastPromotion#18,EmpSatisfaction#19,TrainingTimesLastYear#20,WorkLifeBalance#21,Performance_Rating#22] PushedFilters: [], ReadSchema: struct<EmployeeID:int,Department:string,JobRole:string,Attrition:string,Gender:string,Age:int,Mar...


### Materialized View of Table

In [7]:
hremployeeDF.createOrReplaceTempView('hremployee')

### 1.Display shape of hremployee table 
* Show number of rows and number of columns

In [8]:
num_of_cols = len(hremployeeDF.columns)

In [9]:
spark.sql(f'''
SELECT COUNT(*) as RowCount,{num_of_cols} as ColCount from hremployee
''').show()

+--------+--------+
|RowCount|ColCount|
+--------+--------+
|    1469|      23|
+--------+--------+



spark.sql('''
SELECT count(*) as ROWS,SIZE(COLLECT_LIST(*)) as COLUMNS_COUNT from hremployee
''')

In [10]:
spark.sql("describe hremployee").show(23)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|          EmployeeID|      int|   null|
|          Department|   string|   null|
|             JobRole|   string|   null|
|           Attrition|   string|   null|
|              Gender|   string|   null|
|                 Age|      int|   null|
|       MaritalStatus|   string|   null|
|           Education|   string|   null|
|      EducationField|   string|   null|
|      BusinessTravel|   string|   null|
|      JobInvolvement|   string|   null|
|            JobLevel|      int|   null|
|     JobSatisfaction|   string|   null|
|          Hourlyrate|      int|   null|
|              Income|      int|   null|
|          Salaryhike|      int|   null|
|            OverTime|   string|   null|
|              Workex|      int|   null|
|YearsSinceLastPro...|      int|   null|
|     EmpSatisfaction|   string|   null|
|TrainingTimesLast...|      int|   null|
|     WorkLifeBa

### 2.Write a query to show first three employee from each Job Role to join the company.

In [None]:
spark.sql(f'''
SELECT
    JobRole, EmployeeID
FROM
    (SELECT
        JobRole,
        EmployeeID,
        RANK() OVER (PARTITION BY JobRole ORDER BY EmployeeID) AS rank
        FROM
            hremployee
    )
WHERE rank <= 3
ORDER BY JobRole, rank
''').show(50)

### 3.Write a query to show top 3 employee from each Job Role earning highest salary

In [None]:
spark.sql(f'''
SELECT
    EmployeeID,JobRole,Income
FROM
    (SELECT
        Income,
        EmployeeID,
        JobRole,
        RANK() OVER (PARTITION BY JobRole ORDER BY Income desc) AS rank
        FROM
            hremployee
    )
WHERE rank <= 3
ORDER BY JobRole, rank
''').show()

### 4.Show top 3 Highest Package from overall Job Role

In [None]:
spark.sql(f'''
SELECT JobRole, Income FROM hremployee ORDER BY Income DESC LIMIT 3
''').show()

### 5.Lag() - Write a Spark SQL query to show employee in order of  Ascending Order with respect to employee income compared to previous income for each job role.

In [None]:
spark.sql('''
SELECT EmployeeID, JobRole,Income,Previous,(Income - Previous) as Diff
FROM
    (SELECT
        EmployeeID,
        JobRole,
        Income,
        LAG(Income) OVER (PARTITION BY JobRole ORDER BY EmployeeID) AS Previous
    FROM hremployee
    )
ORDER BY JobRole,Diff
''').show()

### 

In [None]:
spark.sql('''
(SELECT
    EmployeeID,
    JobRole,
    Income - LAG(Income,1,0) OVER (PARTITION BY JobRole ORDER BY EmployeeID) AS Difference
FROM hremployee)''').show()

### Lead()
* Row's next records 

In [None]:
spark.sql('''
SELECT 
    EmployeeID,
    Department,
    JobRole,
    Age,
    Gender,
    Income,
    LEAD(Income,2,0) OVER(ORDER BY EmployeeID) as lead
FROM hremployee
''').show() 

### NTILE()
* Dividing Records into number of quarters(percentiles)

In [None]:
spark.sql('''
SELECT 
    EmployeeID,
    Department,
    JobRole,
    Age,
    Gender,
    Income,
    NTILE(4) OVER(ORDER BY Income) as salary_quartiles
FROM hremployee
''').show() 

### Find the Number of Employees in each percentile group 0-25th,25-50th,50-75th,75-100th using percent_rank() and create a new category using case when.

In [None]:
spark.sql('''
SELECT 
    category,
    COUNT(*) AS count
FROM (
    SELECT 
        EmployeeID,
        Income,
        percentile_rank,
        CASE 
            WHEN percentile_rank <= 0.25 THEN '0-25th'
            WHEN percentile_rank <= 0.50 THEN '25-50th'
            WHEN percentile_rank <= 0.75 THEN '50-75th'
            ELSE '75-100th'
        END AS category
    FROM (
        SELECT 
            EmployeeID,
            Income,
            PERCENT_RANK() OVER (partition by Department ORDER BY Income) AS percentile_rank
        FROM hremployee
    ) AS Percentiles
) 
GROUP BY category
ORDER by category
'''
).show()

### Hive Integration with PySpark

In [None]:
spark.stop()

In [None]:
!jps

In [None]:
# Spark integration with Hive Warehouse
# congif for Hive-integration property-name "spark.sql.warehouse.dir"
# value = "/user/hive/warehouse"
spark = (SparkSession.builder.appName("pyspark-Hive-Integration")
        .config("spark.sql.warehouse.dir","/usr/hive/warehouse")
        .enableHiveSupport().getOrCreate())

In [None]:
spark.sql('show databases').show()

In [None]:
spark.sql("""
CREATE DATABASE IF NOT EXISTS airlines
""")

In [None]:
spark.sql(""" SHOW databases""").show()

In [None]:
spark.sql(""" USE AIRLINES""")

In [None]:
spark.sql(""" SHOW TABLES""").show()

In [None]:
spark.sql("""
CREATE TABLE IF NOT EXISTS FLIGHTS(DayofMonth INT,
DayOfWeek INT,Carrier VARCHAR(10),OriginAirportID INT,DestAirportID INT,DepDelay INT,ArrDelay INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
""")

In [None]:
spark.sql('''SHOW TABLES''').show()

In [None]:
spark.sql("""LOAD DATA LOCAL INPATH '/home/hadoop/Downloads/raw_flight_data1.csv'
OVERWRITE INTO TABLE flights""")

In [None]:
spark.sql("""
CREATE TABLE IF NOT EXISTS airports(airport_id INT,city VARCHAR(50),state VARCHAR(50),name VARCHAR(50))
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')""")

In [None]:
spark.sql("""LOAD DATA LOCAL INPATH '/home/hadoop/Downloads/airports1.csv'
OVERWRITE INTO TABLE airports""")

In [None]:
spark.sql(""" select * from airports""").show()

In [None]:
spark.sql(""" select * from flights""").show()

In [None]:
spark.sql(""" select count(*) from flights""").show()

### 1.Extract

In [11]:
flights_df = spark.table('airlines.flights')

In [12]:
airport_df = spark.table('airlines.airports')

In [13]:
flights_df.show(2)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [None]:
airport_df.show(2)

### 2.Transformation

In [14]:
flights_join = flights_df.join(airport_df,on = flights_df.OriginAirportID==airport_df.airport_id,how='inner')

In [None]:
flights_join.show(2)

### 3.Load

In [15]:
flights_join = flights_join.repartition(4)

In [16]:
flights_join.write.parquet("file:///home/hadoop/Downloads/flights")

AnalysisException: 'path file:/home/hadoop/Downloads/flights already exists.;'

In [None]:
flights_parquet_df = spark.read.parquet("file:///home/hadoop/Downloads/flights/")

In [None]:
flights_parquet_df.show(5)

In [None]:
flights_parquet_df.write.parquet("/flights1")

In [20]:
flights_join.write.partitionBy('Carrier').parquet('/airlines')

AnalysisException: 'path hdfs://localhost:9000/airlines already exists.;'

In [17]:
flights_join.show()

+----------+---------+-------+---------------+-------------+--------+--------+----------+-----------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|             city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+-----------------+-----+--------------------+
|         4|        7|     UA|          13930|        12953|     115|      91|     13930|          Chicago|   IL|Chicago O'Hare In...|
|        28|        7|     AS|          14679|        13830|     -11|     -27|     14679|        San Diego|   CA|San Diego Interna...|
|         3|        4|     WN|          11259|        12191|      28|      29|     11259|           Dallas|   TX|   Dallas Love Field|
|        29|        6|     WN|          11292|        13204|       6|     -11|     11292|           Denver|   CO|Denver International|
|         1|        3|     B6|          13204|        1

In [22]:
flights_join.write.bucketBy(col='state',numBuckets = 50).format("csv").saveAsTable("bucket_table")

In [23]:
flights_join.write.partitionBy('Carrier').bucketBy(col = 'state',numBuckets = 30)\
.format("parquet").saveAsTable("part_bucket_table")

In [24]:
spark.sql("select carrier,count(*) from part_bucket_table group by carrier").show()

+-------+--------+
|carrier|count(1)|
+-------+--------+
|     UA|  287601|
|     AA|  291771|
|     EV|  158253|
|     B6|  122297|
|     DL|  385040|
|     OO|  161102|
|     F9|   35821|
|     YV|   53022|
|     US|  235031|
|     MQ|  113634|
|     HA|   18658|
|     AS|   69056|
|     FL|   93013|
|     VX|   34869|
|     WN|  580029|
|     9E|   80221|
+-------+--------+



### Load on MYSQL

In [25]:
connection_properties = {
    'user':"root",
    'password':"hadoop@123",
    'driver':'com.mysql.cj.jdbc.Driver'
}
flights_join.write.jdbc(url = "jdbc:mysql://localhost:3306/flights", table = "airlines",
                        mode = "overwrite",properties = connection_properties)

In [26]:
spark.stop()

In [27]:
sc.stop()