In [1]:
!pip install wget pyspark  findspark


Defaulting to user installation because normal site-packages is not writeable


In [2]:
# findspark.init() is called here, before the first pyspark import in the cell below.
# NOTE(review): per findspark's documentation this makes pyspark importable by
# adding the Spark installation to sys.path — confirm it is still needed when
# pyspark has been installed with pip.
import findspark

findspark.init()


In [3]:
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

# Build the SparkSession first so that the application name set on the builder
# is actually applied. If a bare SparkContext is created beforehand,
# getOrCreate() reuses that context and builder options such as the app name
# are not applied to the already-running context.
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames basic example")
    # Placeholder key/value kept from the original cell.
    # NOTE(review): looks like documentation boilerplate — confirm and remove.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Expose the session's underlying SparkContext under the name `sc`.
sc = spark.sparkContext

24/04/25 13:54:24 WARN Utils: Your hostname, javier-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.81.128 instead (on interface ens33)
24/04/25 13:54:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/25 13:54:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
import os

import wget

link_to_data = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv"
local_csv = "employees.csv"

# Download only when the file is missing so that re-running this cell (or the
# whole notebook) is idempotent. Without the guard every run fetches the file
# again, and wget.download() stores the new copy under a numbered name instead
# of overwriting the existing one.
if not os.path.exists(local_csv):
    wget.download(link_to_data, out=local_csv)

'employees.csv'

In [7]:
# Load employees.csv into a Spark DataFrame: the first line supplies the column
# names and the column types are inferred from the data.
employees_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("employees.csv")
)

In [9]:
# Print the column names, inferred types and nullability of employees_df.
employees_df.printSchema()

root
 |-- Emp_No: integer (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



In [10]:
# Register employees_df as the temporary view "employees" so that it can be
# queried by name through spark.sql(); an existing view with that name is replaced.
employees_df.createOrReplaceTempView("employees")

In [13]:
# SQL query that keeps only the rows of the "employees" view whose age is
# strictly greater than 30 (employees aged exactly 30 are excluded).
sql_query = """
SELECT *
FROM employees
WHERE age > 30
"""

# spark.sql() runs the query against the registered view and returns a DataFrame.
df_sql1 = spark.sql(sql_query)


In [14]:
# Print the query result as a text table (show() prints the first 20 rows by default).
df_sql1.show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   199|    Douglas|  2600| 34|     Sales|
|   200|   Jennifer|  4400| 36| Marketing|
|   201|    Michael| 13000| 32|        IT|
|   202|        Pat|  6000| 39|        HR|
|   203|      Susan|  6500| 36| Marketing|
|   205|    Shelley| 12008| 33|   Finance|
|   206|    William|  8300| 37|        IT|
|   100|     Steven| 24000| 39|        IT|
|   102|        Lex| 17000| 37| Marketing|
|   103|  Alexander|  9000| 39| Marketing|
|   104|      Bruce|  6000| 38|        IT|
|   105|      David|  4800| 39|        IT|
|   106|      Valli|  4800| 38|     Sales|
|   107|      Diana|  4200| 35|     Sales|
|   109|     Daniel|  9000| 35|        HR|
|   110|       John|  8200| 31| Marketing|
|   111|     Ismael|  7700| 32|        IT|
|   112|Jose Manuel|  7800| 34|        HR|
|   113|       Luis|  6900| 34|     Sales|
|   116|     Shelli|  2900| 37|   Finance|
+------+-----------+------+---+----------+

In [16]:
# SQL query that groups the "employees" view by department and computes each
# department's average salary, rounded to a whole number by ROUND().
sql_query = """
SELECT department, ROUND(AVG(salary)) AS average_salary
FROM employees
GROUP BY department
"""

# Run the aggregation and print one row per department.
df_sql2 = spark.sql(sql_query)
df_sql2.show()

+----------+--------------+
|department|average_salary|
+----------+--------------+
|     Sales|        5493.0|
|        HR|        5838.0|
|   Finance|        5731.0|
| Marketing|        6633.0|
|        IT|        7400.0|
+----------+--------------+



In [19]:
# Keep only the employees whose Department column equals 'IT'.

filtered_df = employees_df.where(employees_df["Department"] == "IT")
filtered_df.show()

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



In [22]:
# Add a new column "SalaryAfterBonus": the original salary plus a 10% bonus,
# rounded to a whole number.

# Import the functions module under an alias instead of importing `round` by
# name, which would shadow Python's built-in round() for the rest of the notebook.
from pyspark.sql import functions as F

employees_df_with_bonus = employees_df.withColumn(
    "SalaryAfterBonus", F.round(F.col("salary") * 1.10)
)

employees_df_with_bonus.show()


[Stage 14:>                                                         (0 + 0) / 1]

+------+---------+------+---+----------+----------------+
|Emp_No| Emp_Name|Salary|Age|Department|SalaryAfterBonus|
+------+---------+------+---+----------+----------------+
|   198|   Donald|  2600| 29|        IT|          2860.0|
|   199|  Douglas|  2600| 34|     Sales|          2860.0|
|   200| Jennifer|  4400| 36| Marketing|          4840.0|
|   201|  Michael| 13000| 32|        IT|         14300.0|
|   202|      Pat|  6000| 39|        HR|          6600.0|
|   203|    Susan|  6500| 36| Marketing|          7150.0|
|   204|  Hermann| 10000| 29|   Finance|         11000.0|
|   205|  Shelley| 12008| 33|   Finance|         13209.0|
|   206|  William|  8300| 37|        IT|          9130.0|
|   100|   Steven| 24000| 39|        IT|         26400.0|
|   101|    Neena| 17000| 27|     Sales|         18700.0|
|   102|      Lex| 17000| 37| Marketing|         18700.0|
|   103|Alexander|  9000| 39| Marketing|          9900.0|
|   104|    Bruce|  6000| 38|        IT|          6600.0|
|   105|    Da

                                                                                

In [27]:
# Group the employees by age and compute the maximum salary within each age,
# ordered by age ascending.

# Import the functions module under an alias instead of importing `max` by
# name, which would shadow Python's built-in max() for the rest of the notebook.
from pyspark.sql import functions as F

max_salary_by_age = (
    employees_df
    .groupBy("age")
    .agg(F.max("salary").alias("MaxSalary"))
    .orderBy("age")
)

max_salary_by_age.show()


+---+---------+
|age|MaxSalary|
+---+---------+
| 26|     3600|
| 27|    17000|
| 28|    12008|
| 29|    10000|
| 30|     8000|
| 31|     8200|
| 32|    13000|
| 33|    12008|
| 34|     7800|
| 35|     9000|
| 36|     7900|
| 37|    17000|
| 38|     6000|
| 39|    24000|
+---+---------+



In [28]:
# Inner self-join of employees_df on "Emp_No". The two sides are aliased "df1"
# and "df2"; the join key appears once in the result, while every other column
# appears twice (once per side).

joined_df = employees_df.alias("df1").join(
    other=employees_df.alias("df2"),
    on="Emp_No",
    how="inner",
)

joined_df.show()


+------+---------+------+---+----------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|    Neena| 

In [29]:
# Compute the mean of the Age column over all employees as a one-row DataFrame
# with a single column named "AverageAge".

from pyspark.sql.functions import avg

average_age = employees_df.agg(avg("Age").alias("AverageAge"))

average_age.show()

+----------+
|AverageAge|
+----------+
|     33.56|
+----------+



In [31]:
# Sum the salaries of all employees within each department.

# Import the functions module under an alias instead of importing `sum` by
# name, which would shadow Python's built-in sum() for the rest of the notebook.
from pyspark.sql import functions as F

total_salary_per_department = (
    employees_df
    .groupBy("Department")
    .agg(F.sum("Salary").alias("TotalSalary"))
)

total_salary_per_department.show()


[Stage 35:>                                                         (0 + 1) / 1]

+----------+-----------+
|Department|TotalSalary|
+----------+-----------+
|     Sales|      71408|
|        HR|      46700|
|   Finance|      57308|
| Marketing|      59700|
|        IT|      74000|
+----------+-----------+



                                                                                

In [34]:
# Order the employees by Age (youngest first); within the same age, the highest
# Salary comes first.

from pyspark.sql.functions import asc, desc

sorted_df = employees_df.sort(
    asc("Age"),
    desc("Salary"),
)

sorted_df.show()


+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   137|   Renske|  3600| 26| Marketing|
|   101|    Neena| 17000| 27|     Sales|
|   114|      Den| 11000| 27|   Finance|
|   108|    Nancy| 12008| 28|     Sales|
|   130|    Mozhe|  2800| 28| Marketing|
|   126|    Irene|  2700| 28|        HR|
|   204|  Hermann| 10000| 29|   Finance|
|   115|Alexander|  3100| 29|   Finance|
|   134|  Michael|  2900| 29|     Sales|
|   198|   Donald|  2600| 29|        IT|
|   140|   Joshua|  2500| 29|   Finance|
|   136|    Hazel|  2200| 29|        IT|
|   120|  Matthew|  8000| 30|        HR|
|   110|     John|  8200| 31| Marketing|
|   127|    James|  2400| 31|        HR|
|   201|  Michael| 13000| 32|        IT|
|   111|   Ismael|  7700| 32|        IT|
|   119|    Karen|  2500| 32|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   124|    Kevin|  5800| 33| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows

In [35]:
# Count the employees in each department; count("Emp_No") counts the rows whose
# Emp_No is not null.

from pyspark.sql.functions import count

dept_employee_count = (
    employees_df
    .groupBy("Department")
    .agg(count("Emp_No").alias("EmployeeCount"))
)

dept_employee_count.show()


+----------+-------------+
|Department|EmployeeCount|
+----------+-------------+
|     Sales|           13|
|        HR|            8|
|   Finance|           10|
| Marketing|            9|
|        IT|           10|
+----------+-------------+



In [36]:
# Keep the employees whose Emp_Name contains a lowercase 'o' anywhere in the
# name (the match is case-sensitive).

filtered_df = employees_df.where(employees_df["Emp_Name"].contains("o"))

filtered_df.show()


+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   198|     Donald|  2600| 29|        IT|
|   199|    Douglas|  2600| 34|     Sales|
|   110|       John|  8200| 31| Marketing|
|   112|Jose Manuel|  7800| 34|        HR|
|   130|      Mozhe|  2800| 28| Marketing|
|   133|      Jason|  3300| 38|     Sales|
|   139|       John|  2700| 36|     Sales|
|   140|     Joshua|  2500| 29|   Finance|
+------+-----------+------+---+----------+



In [37]:
# Stop the Spark session now that the analysis is finished.
spark.stop()