<a href="https://colab.research.google.com/github/AkshaySarkar/Employee-Big-Data-Processing/blob/main/BigData_Processing2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Installing required packages

!pip install pyspark  findspark wget


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark, wget
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=e1fb854560339142307a42db9f4efdc4391496b5ec90ffb5e49fdd7c541ed8b1
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=b5b8312c0d59bc84eab0ba7588b8d4b81e6b55165476f4d9e24

In [None]:
import findspark

findspark.init()

In [None]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession


In [None]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

In [None]:
# Downloading the CSV data first into a local `employees.csv` file
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

'employees.csv'

In [None]:
# Reading data from the "emp" CSV file and import it into a DataFrame variable named "employees_df"
df = spark.read.csv("employees.csv", inferSchema=True)

In [None]:
# Defining a Schema for the input data and read the file using the user-defined Schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("Emp_No", IntegerType(), True),
    StructField("Emp_Name", StringType(), True),
    StructField("Salary", IntegerType(), True),
     StructField("Age", IntegerType(), True),
     StructField("Department", StringType(), True)
])

# Reading the file with the user-defined schema
df = spark.read.csv("employees.csv", schema=schema)
df.show()

+------+---------+------+----+----------+
|Emp_No| Emp_Name|Salary| Age|Department|
+------+---------+------+----+----------+
|  NULL| Emp_Name|  NULL|NULL|Department|
|   198|   Donald|  2600|  29|        IT|
|   199|  Douglas|  2600|  34|     Sales|
|   200| Jennifer|  4400|  36| Marketing|
|   201|  Michael| 13000|  32|        IT|
|   202|      Pat|  6000|  39|        HR|
|   203|    Susan|  6500|  36| Marketing|
|   204|  Hermann| 10000|  29|   Finance|
|   205|  Shelley| 12008|  33|   Finance|
|   206|  William|  8300|  37|        IT|
|   100|   Steven| 24000|  39|        IT|
|   101|    Neena| 17000|  27|     Sales|
|   102|      Lex| 17000|  37| Marketing|
|   103|Alexander|  9000|  39| Marketing|
|   104|    Bruce|  6000|  38|        IT|
|   105|    David|  4800|  39|        IT|
|   106|    Valli|  4800|  38|     Sales|
|   107|    Diana|  4200|  35|     Sales|
|   108|    Nancy| 12008|  28|     Sales|
|   109|   Daniel|  9000|  35|        HR|
+------+---------+------+----+----

In [None]:
# Displaying all columns of the DataFrame, along with their respective data types
df.printSchema()

root
 |-- Emp_No: integer (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



In [None]:
# Creating a temporary view named "employees" for the DataFrame
df.createOrReplaceTempView("employees")

In [None]:
# SQL query to fetch solely the records from the View where the age exceeds 30
result = spark.sql("SELECT * FROM employees WHERE Age > 30")
result.show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   199|    Douglas|  2600| 34|     Sales|
|   200|   Jennifer|  4400| 36| Marketing|
|   201|    Michael| 13000| 32|        IT|
|   202|        Pat|  6000| 39|        HR|
|   203|      Susan|  6500| 36| Marketing|
|   205|    Shelley| 12008| 33|   Finance|
|   206|    William|  8300| 37|        IT|
|   100|     Steven| 24000| 39|        IT|
|   102|        Lex| 17000| 37| Marketing|
|   103|  Alexander|  9000| 39| Marketing|
|   104|      Bruce|  6000| 38|        IT|
|   105|      David|  4800| 39|        IT|
|   106|      Valli|  4800| 38|     Sales|
|   107|      Diana|  4200| 35|     Sales|
|   109|     Daniel|  9000| 35|        HR|
|   110|       John|  8200| 31| Marketing|
|   111|     Ismael|  7700| 32|        IT|
|   112|Jose Manuel|  7800| 34|        HR|
|   113|       Luis|  6900| 34|     Sales|
|   116|     Shelli|  2900| 37|   Finance|
+------+---

In [None]:
# calculating the average salary of employees grouped by department
result = spark.sql("SELECT Department, AVG(Salary) AS avg_salary FROM employees GROUP BY Department")
result.show()

+----------+-----------------+
|Department|       avg_salary|
+----------+-----------------+
|     Sales|5492.923076923077|
|        HR|           5837.5|
|Department|             NULL|
|   Finance|           5730.8|
| Marketing|6633.333333333333|
|        IT|           7400.0|
+----------+-----------------+



In [None]:
# Applying a filter to select records where the department is 'IT'
result = spark.sql("SELECT * FROM employees WHERE Department = 'IT'")
result.show()

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



In [None]:
from pyspark.sql.functions import col

# Adding a new column "SalaryAfterBonus" with 10% bonus added to the original salary
df = df.withColumn("SalaryAfterBonus", col("Salary") * 1.1)
df.show()

+------+---------+------+----+----------+------------------+
|Emp_No| Emp_Name|Salary| Age|Department|  SalaryAfterBonus|
+------+---------+------+----+----------+------------------+
|  NULL| Emp_Name|  NULL|NULL|Department|              NULL|
|   198|   Donald|  2600|  29|        IT|2860.0000000000005|
|   199|  Douglas|  2600|  34|     Sales|2860.0000000000005|
|   200| Jennifer|  4400|  36| Marketing|            4840.0|
|   201|  Michael| 13000|  32|        IT|14300.000000000002|
|   202|      Pat|  6000|  39|        HR| 6600.000000000001|
|   203|    Susan|  6500|  36| Marketing| 7150.000000000001|
|   204|  Hermann| 10000|  29|   Finance|           11000.0|
|   205|  Shelley| 12008|  33|   Finance|13208.800000000001|
|   206|  William|  8300|  37|        IT|            9130.0|
|   100|   Steven| 24000|  39|        IT|26400.000000000004|
|   101|    Neena| 17000|  27|     Sales|           18700.0|
|   102|      Lex| 17000|  37| Marketing|           18700.0|
|   103|Alexander|  9000

In [None]:
from pyspark.sql.functions import max
from pyspark.sql import functions as F

# Grouping data by age and calculate the maximum salary for each age group
result = df.groupBy("Age").agg(F.max("Salary").alias("max_salary"))
result.show()

+----+----------+
| Age|max_salary|
+----+----------+
|  31|      8200|
|  34|      7800|
|  28|     12008|
|  27|     17000|
|  26|      3600|
|NULL|      NULL|
|  37|     17000|
|  35|      9000|
|  39|     24000|
|  38|      6000|
|  29|     10000|
|  32|     13000|
|  33|     12008|
|  30|      8000|
|  36|      7900|
+----+----------+



In [None]:
# Joining the DataFrame with itself based on the "Emp_No" column
result = df.alias("df1").join(df.alias("df2"), "Emp_No")
result.show()

+------+---------+------+---+----------+------------------+---------+------+---+----------+------------------+
|Emp_No| Emp_Name|Salary|Age|Department|  SalaryAfterBonus| Emp_Name|Salary|Age|Department|  SalaryAfterBonus|
+------+---------+------+---+----------+------------------+---------+------+---+----------+------------------+
|   198|   Donald|  2600| 29|        IT|2860.0000000000005|   Donald|  2600| 29|        IT|2860.0000000000005|
|   199|  Douglas|  2600| 34|     Sales|2860.0000000000005|  Douglas|  2600| 34|     Sales|2860.0000000000005|
|   200| Jennifer|  4400| 36| Marketing|            4840.0| Jennifer|  4400| 36| Marketing|            4840.0|
|   201|  Michael| 13000| 32|        IT|14300.000000000002|  Michael| 13000| 32|        IT|14300.000000000002|
|   202|      Pat|  6000| 39|        HR| 6600.000000000001|      Pat|  6000| 39|        HR| 6600.000000000001|
|   203|    Susan|  6500| 36| Marketing| 7150.000000000001|    Susan|  6500| 36| Marketing| 7150.000000000001|
|

In [None]:
# Calculating the average age of employees
from pyspark.sql.functions import avg
average_age = df.agg(avg("Age").alias("average_age"))

average_age.show()

+-----------+
|average_age|
+-----------+
|      33.56|
+-----------+



In [None]:
#Calculating the total salary for each department
from pyspark.sql.functions import sum
total_salary_per_department = df.groupBy("department").agg(sum("salary").alias("total_salary"))

total_salary_per_department.show()

+----------+------------+
|department|total_salary|
+----------+------------+
|     Sales|       71408|
|        HR|       46700|
|Department|        NULL|
|   Finance|       57308|
| Marketing|       59700|
|        IT|       74000|
+----------+------------+



In [None]:
# Sorting the DataFrame by age in ascending order and then by salary in descending order
from pyspark.sql import functions as F
sorted_df = df.orderBy(F.asc("Age"),F.desc("Salary"))

sorted_df.show()

+------+---------+------+----+----------+------------------+
|Emp_No| Emp_Name|Salary| Age|Department|  SalaryAfterBonus|
+------+---------+------+----+----------+------------------+
|  NULL| Emp_Name|  NULL|NULL|Department|              NULL|
|   137|   Renske|  3600|  26| Marketing|3960.0000000000005|
|   101|    Neena| 17000|  27|     Sales|           18700.0|
|   114|      Den| 11000|  27|   Finance|12100.000000000002|
|   108|    Nancy| 12008|  28|     Sales|13208.800000000001|
|   130|    Mozhe|  2800|  28| Marketing|3080.0000000000005|
|   126|    Irene|  2700|  28|        HR|2970.0000000000005|
|   204|  Hermann| 10000|  29|   Finance|           11000.0|
|   115|Alexander|  3100|  29|   Finance|3410.0000000000005|
|   134|  Michael|  2900|  29|     Sales|3190.0000000000005|
|   198|   Donald|  2600|  29|        IT|2860.0000000000005|
|   140|   Joshua|  2500|  29|   Finance|            2750.0|
|   136|    Hazel|  2200|  29|        IT|            2420.0|
|   120|  Matthew|  8000

In [None]:
from pyspark.sql.functions import count

employee_count_per_department = df.groupBy("Department").count()

employee_count_per_department.show()

+----------+-----+
|Department|count|
+----------+-----+
|     Sales|   13|
|        HR|    8|
|Department|    1|
|   Finance|   10|
| Marketing|    9|
|        IT|   10|
+----------+-----+



In [None]:
# Applying a filter to select records where the employee's name contains the letter 'o'
from pyspark.sql.functions import col

filtered_df = df.filter(col("Emp_Name").like("%o%"))

filtered_df.show()

+------+-----------+------+---+----------+------------------+
|Emp_No|   Emp_Name|Salary|Age|Department|  SalaryAfterBonus|
+------+-----------+------+---+----------+------------------+
|   198|     Donald|  2600| 29|        IT|2860.0000000000005|
|   199|    Douglas|  2600| 34|     Sales|2860.0000000000005|
|   110|       John|  8200| 31| Marketing|            9020.0|
|   112|Jose Manuel|  7800| 34|        HR|            8580.0|
|   130|      Mozhe|  2800| 28| Marketing|3080.0000000000005|
|   133|      Jason|  3300| 38|     Sales|3630.0000000000005|
|   139|       John|  2700| 36|     Sales|2970.0000000000005|
|   140|     Joshua|  2500| 29|   Finance|            2750.0|
+------+-----------+------+---+----------+------------------+

