# Data Analysis using Spark

Create a DataFrame by loading data from a CSV file, then apply transformations and actions using Spark SQL.


In [1]:
# Installing required packages

!pip install pyspark findspark wget


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark, wget
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=879804f9a02535f2f333cec59f8e64bc49709b264f2ffccb4834cc1040989bcc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
  Building wheel for wget (setup.py) ... done
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=e9837c81733377af37de71a1a6398a24feb9fdcab1a565a6ea2

In [2]:
import findspark

findspark.init()

In [3]:

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#### Download the CSV data


In [5]:
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

'employees.csv'

####  Generate a Spark DataFrame from the CSV data

Read the provided CSV file, `employees.csv`, into a Spark DataFrame named `employees_df`, letting Spark infer the column types.




In [6]:
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True)


####  Define a schema for the data

Define an explicit schema for the input data, then use it to read the CSV file again into `employees_df`. This replaces the DataFrame created above with inferred types.


In [7]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("Emp_No", IntegerType(), True),
    StructField("Emp_Name", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Department", StringType(), True)
])
employees_df = spark.read.csv("employees.csv", header=True, schema=schema)
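
As a rough illustration of what supplying a schema accomplishes: every CSV field starts out as text, and the schema states which type each column should be parsed into. The plain-Python sketch below is not how Spark works internally. `CSV_TEXT`, `SCHEMA`, and `read_with_schema` are hypothetical names, the two data rows are copied from outputs shown in this notebook, and the header line is assumed to match the schema's column names.

```python
import csv
import io

# Two data rows copied from query outputs in this notebook.
# The header line is an assumption that mirrors the schema's column names.
CSV_TEXT = """Emp_No,Emp_Name,Salary,Age,Department
198,Donald,2600,29,IT
199,Douglas,2600,34,Sales
"""

# Hypothetical plain-Python stand-in for the StructType: column name -> caster.
SCHEMA = {
    "Emp_No": int,
    "Emp_Name": str,
    "Salary": int,
    "Age": int,
    "Department": str,
}


def read_with_schema(text, schema):
    """Parse CSV text and cast each field with the caster for its column."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {name: cast(row[name]) for name, cast in schema.items()}
        for row in reader
    ]


rows = read_with_schema(CSV_TEXT, SCHEMA)
print(rows[0])
```

With an explicit schema, the numeric columns arrive as integers without Spark having to scan the file first to infer types.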

####  Display schema of DataFrame

Display the schema of the `employees_df` DataFrame, showing all columns and their respective data types.


In [8]:
# Display all columns of the DataFrame, along with their respective data types
employees_df.printSchema()

root
 |-- Emp_No: integer (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



####  Create a temporary view

Create a temporary view named `employees` for the `employees_df` DataFrame, enabling Spark SQL queries on the data.


In [9]:
# Create a temporary view named "employees" for the DataFrame
employees_df.createOrReplaceTempView("employees")

####  Execute an SQL query

Run an SQL query against the `employees` view to fetch the records of employees older than 30, then display the result. Note that `show()` prints only the first 20 rows by default.


In [10]:
spark.sql("SELECT * FROM employees WHERE Age > 30").show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   199|    Douglas|  2600| 34|     Sales|
|   200|   Jennifer|  4400| 36| Marketing|
|   201|    Michael| 13000| 32|        IT|
|   202|        Pat|  6000| 39|        HR|
|   203|      Susan|  6500| 36| Marketing|
|   205|    Shelley| 12008| 33|   Finance|
|   206|    William|  8300| 37|        IT|
|   100|     Steven| 24000| 39|        IT|
|   102|        Lex| 17000| 37| Marketing|
|   103|  Alexander|  9000| 39| Marketing|
|   104|      Bruce|  6000| 38|        IT|
|   105|      David|  4800| 39|        IT|
|   106|      Valli|  4800| 38|     Sales|
|   107|      Diana|  4200| 35|     Sales|
|   109|     Daniel|  9000| 35|        HR|
|   110|       John|  8200| 31| Marketing|
|   111|     Ismael|  7700| 32|        IT|
|   112|Jose Manuel|  7800| 34|        HR|
|   113|       Luis|  6900| 34|     Sales|
|   116|     Shelli|  2900| 37|   Finance|
+------+-----------+------+---+----------+

#### Calculate Average Salary by Department

Compose an SQL query to retrieve the average salary of employees grouped by department. Display the result.


In [11]:
# SQL query to calculate the average salary of employees grouped by department
spark.sql("SELECT Department, AVG(Salary) AS avgr FROM employees GROUP BY Department").show()

+----------+-----------------+
|Department|             avgr|
+----------+-----------------+
|     Sales|5492.923076923077|
|        HR|           5837.5|
|   Finance|           5730.8|
| Marketing|6633.333333333333|
|        IT|           7400.0|
+----------+-----------------+



####  Filter and Display IT Department Employees

Filter the employee records to those whose department is `'IT'` by querying the `employees` view. Display the filtered result.


In [12]:
# Apply a filter to select records where the department is 'IT'
spark.sql("SELECT * FROM employees WHERE Department = 'IT'").show()


+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



####  Add 10% Bonus to Salaries

Perform a transformation to add a new column named "SalaryAfterBonus" to the DataFrame. Calculate the new salary by adding a 10% bonus to each employee's salary.


In [13]:
!pip install PyArrow



In [14]:
from pyspark.sql.functions import col
from pyspark.sql.functions import expr

# Add a new column "SalaryAfterBonus" with 10% bonus added to the original salary
employees_df = employees_df.withColumn("SalaryAfterBonus", expr("Salary * 1.1"))
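
Spark SQL treats the literal `1.1` as a decimal, which is why the joined schema printed later in this notebook reports `SalaryAfterBonus` as `decimal(13,1)` rather than a floating-point type. The sketch below mimics that exact arithmetic for single values with Python's `decimal` module. `BONUS_FACTOR` and `salary_after_bonus` are hypothetical names used only for illustration, and the salaries are taken from rows shown elsewhere in this notebook.

```python
from decimal import Decimal

# Hypothetical constant mirroring the 1.1 literal in expr("Salary * 1.1").
BONUS_FACTOR = Decimal("1.1")


def salary_after_bonus(salary: int) -> Decimal:
    """Return the salary plus a 10% bonus, using exact decimal arithmetic."""
    return Decimal(salary) * BONUS_FACTOR


# Salaries taken from rows shown elsewhere in this notebook.
for salary in (2600, 12008, 24000):
    print(salary, "->", salary_after_bonus(salary))
```

Each result carries one digit after the decimal point, matching the scale of 1 in `decimal(13,1)`.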


####  Maximum Salary by Age

Group the data by age and calculate the maximum salary for each age group. Display the result.


In [15]:
# Group data by age and calculate the maximum salary for each age group
spark.sql("SELECT Age, MAX(Salary) AS maxsal FROM employees GROUP BY Age").show()


+---+------+
|Age|maxsal|
+---+------+
| 31|  8200|
| 34|  7800|
| 28| 12008|
| 27| 17000|
| 26|  3600|
| 37| 17000|
| 35|  9000|
| 39| 24000|
| 38|  6000|
| 29| 10000|
| 32| 13000|
| 33| 12008|
| 30|  8000|
| 36|  7900|
+---+------+



####  Self-Join on Employee Data

Join the `employees_df` DataFrame with itself on the `Emp_No` column. Printing the result shows the joined schema, in which every column except the join key appears twice.


In [16]:
# Join the DataFrame with itself based on the "Emp_No" column
joined_df = employees_df.join(employees_df, 'Emp_No', 'inner')
print(joined_df)

DataFrame[Emp_No: int, Emp_Name: string, Salary: int, Age: int, Department: string, SalaryAfterBonus: decimal(13,1), Emp_Name: string, Salary: int, Age: int, Department: string, SalaryAfterBonus: decimal(13,1)]
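
To see why the joined schema lists each non-key column twice, here is a plain-Python sketch of an inner self-join. It is an illustration only, not Spark's implementation. `inner_self_join` is a hypothetical helper, and the `left_`/`right_` prefixes are an assumption needed because a Python dict cannot hold two keys with the same name (Spark keeps the duplicate names as-is). The sample rows come from outputs above, trimmed to three columns.

```python
# A few rows from the outputs above, reduced to three columns for brevity.
employees = [
    {"Emp_No": 198, "Emp_Name": "Donald", "Department": "IT"},
    {"Emp_No": 199, "Emp_Name": "Douglas", "Department": "Sales"},
    {"Emp_No": 200, "Emp_Name": "Jennifer", "Department": "Marketing"},
]


def inner_self_join(rows, key):
    """Pair every row with each row that has the same key value."""
    joined = []
    for left in rows:
        for right in rows:
            if left[key] == right[key]:
                # The key appears once; other columns appear once per side.
                merged = {key: left[key]}
                merged.update({f"left_{k}": v for k, v in left.items() if k != key})
                merged.update({f"right_{k}": v for k, v in right.items() if k != key})
                joined.append(merged)
    return joined


joined = inner_self_join(employees, "Emp_No")
print(len(joined), list(joined[0]))
```

If `Emp_No` is unique, as it is in this sample, the self-join returns exactly one row per employee, with both sides carrying identical values.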


#### Calculate Average Employee Age

Calculate the average age of employees using the built-in aggregation function. Display the result.


In [17]:
# Calculate the average age of employees
from pyspark.sql.functions import avg
average_age_df = employees_df.agg(avg("Age").alias("AverageAge"))
average_age_df.show()


+----------+
|AverageAge|
+----------+
|     33.56|
+----------+



#### Calculate Total Salary by Department

Calculate the total salary for each department using the built-in aggregation function. Display the result.


In [18]:
# Calculate the total salary for each department. Hint: use groupBy and aggregate functions
# Import under an alias so Python's built-in sum() is not shadowed
from pyspark.sql.functions import sum as spark_sum
sum_dep_df = employees_df.groupBy('Department').agg(spark_sum('Salary').alias('SumOfSal'))
sum_dep_df.show()


+----------+--------+
|Department|SumOfSal|
+----------+--------+
|     Sales|   71408|
|        HR|   46700|
|   Finance|   57308|
| Marketing|   59700|
|        IT|   74000|
+----------+--------+
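
The IT figures can be cross-checked by hand. The sketch below takes the ten IT salaries from the filtered IT output earlier in this notebook and recomputes the total and the average in plain Python. The variable names are illustrative only.

```python
# Salaries of the ten IT employees listed in the filtered IT output.
it_salaries = [2600, 13000, 8300, 24000, 6000, 4800, 7700, 3300, 2100, 2200]

total = sum(it_salaries)             # corresponds to SUM(Salary) for IT
average = total / len(it_salaries)   # corresponds to AVG(Salary) for IT

print(total, average)
```

The results, 74000 and 7400.0, agree with the `SumOfSal` value for IT above and with the IT average reported by the earlier `AVG(Salary)` query.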



#### Sort Data by Age and Salary

Apply a transformation to sort the DataFrame by age in ascending order and then by salary in descending order. Display the sorted DataFrame.


In [None]:
# Sort the DataFrame by age in ascending order and then by salary in descending order
from pyspark.sql.functions import col
sorted_df = employees_df.orderBy(col("Age").asc(), col("Salary").desc())
sorted_df.show()
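
The two-level ordering can be pictured with a plain-Python sort over a handful of rows. This is a sketch of the ordering rule only, using `(name, salary, age)` tuples taken from the IT rows shown earlier. The tuple layout and variable names are illustrative assumptions.

```python
# (name, salary, age) tuples taken from the IT rows shown earlier.
rows = [
    ("Steven", 24000, 39),
    ("Donald", 2600, 29),
    ("Ismael", 7700, 32),
    ("David", 4800, 39),
    ("Michael", 13000, 32),
    ("Hazel", 2200, 29),
]

# Age ascending first; negating the salary makes ties sort by salary descending.
sorted_rows = sorted(rows, key=lambda r: (r[2], -r[1]))

for name, salary, age in sorted_rows:
    print(age, salary, name)
```

Within each age, the higher salary comes first: Donald before Hazel at 29, Michael before Ismael at 32, and Steven before David at 39.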


#### Count Employees in Each Department

Calculate the number of employees in each department. Display the result.


In [19]:
from pyspark.sql.functions import count

# Calculate the number of employees in each department
count_dep_df = employees_df.groupBy('Department').agg(count('Emp_No').alias('countofemb'))
count_dep_df.show()

+----------+----------+
|Department|countofemb|
+----------+----------+
|     Sales|        13|
|        HR|         8|
|   Finance|        10|
| Marketing|         9|
|        IT|        10|
+----------+----------+



####  Filter Employees with the letter o in the Name

Apply a filter to select records where the employee's name contains the letter `'o'`. Display the filtered DataFrame.


In [21]:
# Apply a filter to select records where the employee's name contains the letter 'o'
spark.sql("SELECT * FROM employees WHERE Emp_Name LIKE '%o%'").show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   198|     Donald|  2600| 29|        IT|
|   199|    Douglas|  2600| 34|     Sales|
|   110|       John|  8200| 31| Marketing|
|   112|Jose Manuel|  7800| 34|        HR|
|   130|      Mozhe|  2800| 28| Marketing|
|   133|      Jason|  3300| 38|     Sales|
|   139|       John|  2700| 36|     Sales|
|   140|     Joshua|  2500| 29|   Finance|
+------+-----------+------+---+----------+
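
The pattern `'%o%'` matches any name that contains a lowercase `o` anywhere, because `LIKE` in Spark SQL is case-sensitive. As a minimal sketch of that rule, the hypothetical helper `like_contains` below applies a plain substring test to a few names taken from the "older than 30" output earlier in this notebook. It does not handle patterns whose fragment itself contains `%` or `_`.

```python
# Names taken from the "older than 30" output earlier in this notebook.
names = ["Douglas", "Jennifer", "Michael", "Pat", "Susan", "John", "Jose Manuel", "Luis"]


def like_contains(value: str, fragment: str) -> bool:
    """Hypothetical stand-in for `value LIKE '%fragment%'` (case-sensitive)."""
    return fragment in value


matches = [name for name in names if like_contains(name, "o")]
print(matches)
```

Only Douglas, John, and Jose Manuel pass the test, which is consistent with their appearance in the query result above.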



This project was created as part of earning the IBM Data Engineering Professional Certificate, using work by Raghul Ramesh and Lavanya T S.