In [1]:

!pip install pyspark  findspark wget



In [2]:

import findspark

findspark.init()

In [3]:

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:

# Creating a SparkContext object  

sc = SparkContext.getOrCreate()

# Creating a SparkSession  

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/09/23 22:57:18 WARN Utils: Your hostname, gabriel-BOD-WXX9 resolves to a loopback address: 127.0.1.1; using 192.168.204.211 instead (on interface wlp0s20f3)
24/09/23 22:57:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/23 22:57:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:

# Download the CSV data first into a local `employees.csv` file
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

'employees (1).csv'

In [6]:

from pyspark.sql.types import *

In [7]:
employees_df = spark.read.option("header","true").csv("employees.csv")

In [8]:
employees_df.show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top

**defining schema**

In [9]:
employees_df_schema = StructType([
    StructField("Emp_no", StringType(),nullable=True),
    StructField("Emp_name",StringType(),True),
    StructField("Salary",IntegerType(),True),
    StructField("Age",IntegerType(),True),
    StructField("Department",StringType(),True),
    ])

In [10]:
employees_df = spark.read.option("header","true").schema(employees_df_schema).csv("employees.csv")
employees_df.printSchema()

root
 |-- Emp_no: string (nullable = true)
 |-- Emp_name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



**creating temp view and using sparkql**

In [11]:
employees_df.createTempView("employees")

**age >30**

In [14]:
spark.sql("SELECT * FROM employees WHERE age >30 ").show(5)

+------+--------+------+---+----------+
|Emp_no|Emp_name|Salary|Age|Department|
+------+--------+------+---+----------+
|   199| Douglas|  2600| 34|     Sales|
|   200|Jennifer|  4400| 36| Marketing|
|   201| Michael| 13000| 32|        IT|
|   202|     Pat|  6000| 39|        HR|
|   203|   Susan|  6500| 36| Marketing|
+------+--------+------+---+----------+
only showing top 5 rows



**avg salary by dpt**

In [17]:
spark.sql(" SELECT ROUND(AVG(Salary),2) as avg_salary, Department FROM employees GROUP BY Department").show()

+----------+----------+
|avg_salary|Department|
+----------+----------+
|   5492.92|     Sales|
|    5837.5|        HR|
|    5730.8|   Finance|
|   6633.33| Marketing|
|    7400.0|        IT|
+----------+----------+



**filter department IT**

In [20]:
employees_df.filter("Department =='IT'").show()

+------+--------+------+---+----------+
|Emp_no|Emp_name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



In [46]:
from pyspark.sql.functions import col,round,max,avg,sum,count

In [32]:
employees_df = employees_df.withColumn("salary_after_bonus",col("Salary")*1.10)

employees_df.show(3)

+------+--------+------+---+----------+------------------+
|Emp_no|Emp_name|Salary|Age|Department|salary_after_bonus|
+------+--------+------+---+----------+------------------+
|   198|  Donald|  2600| 29|        IT|2860.0000000000005|
|   199| Douglas|  2600| 34|     Sales|2860.0000000000005|
|   200|Jennifer|  4400| 36| Marketing|            4840.0|
+------+--------+------+---+----------+------------------+
only showing top 3 rows



In [37]:
employees_df.groupBy("Age").agg(max("Salary")).show(3)

+---+-----------+
|Age|max(Salary)|
+---+-----------+
| 31|       8200|
| 34|       7800|
| 28|      12008|
+---+-----------+
only showing top 3 rows



**self join**

In [39]:
employees_df.join(employees_df,'Emp_no','inner').show()

+------+---------+------+---+----------+------------------+---------+------+---+----------+------------------+
|Emp_no| Emp_name|Salary|Age|Department|salary_after_bonus| Emp_name|Salary|Age|Department|salary_after_bonus|
+------+---------+------+---+----------+------------------+---------+------+---+----------+------------------+
|   198|   Donald|  2600| 29|        IT|2860.0000000000005|   Donald|  2600| 29|        IT|2860.0000000000005|
|   199|  Douglas|  2600| 34|     Sales|2860.0000000000005|  Douglas|  2600| 34|     Sales|2860.0000000000005|
|   200| Jennifer|  4400| 36| Marketing|            4840.0| Jennifer|  4400| 36| Marketing|            4840.0|
|   201|  Michael| 13000| 32|        IT|14300.000000000002|  Michael| 13000| 32|        IT|14300.000000000002|
|   202|      Pat|  6000| 39|        HR| 6600.000000000001|      Pat|  6000| 39|        HR| 6600.000000000001|
|   203|    Susan|  6500| 36| Marketing| 7150.000000000001|    Susan|  6500| 36| Marketing| 7150.000000000001|
|

**average age**

In [43]:

employees_df.agg(avg("Age")).show()

+--------+
|avg(Age)|
+--------+
|   33.56|
+--------+



In [44]:
employees_df.groupBy("Department").agg(sum("Salary")).show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|      71408|
|        HR|      46700|
|   Finance|      57308|
| Marketing|      59700|
|        IT|      74000|
+----------+-----------+



#### Sort Data by Age and Salary

In [45]:
employees_df.sort("Age","Salary",ascending=[True,False]).show()

+------+---------+------+---+----------+------------------+
|Emp_no| Emp_name|Salary|Age|Department|salary_after_bonus|
+------+---------+------+---+----------+------------------+
|   137|   Renske|  3600| 26| Marketing|3960.0000000000005|
|   101|    Neena| 17000| 27|     Sales|           18700.0|
|   114|      Den| 11000| 27|   Finance|12100.000000000002|
|   108|    Nancy| 12008| 28|     Sales|13208.800000000001|
|   130|    Mozhe|  2800| 28| Marketing|3080.0000000000005|
|   126|    Irene|  2700| 28|        HR|2970.0000000000005|
|   204|  Hermann| 10000| 29|   Finance|           11000.0|
|   115|Alexander|  3100| 29|   Finance|3410.0000000000005|
|   134|  Michael|  2900| 29|     Sales|3190.0000000000005|
|   198|   Donald|  2600| 29|        IT|2860.0000000000005|
|   140|   Joshua|  2500| 29|   Finance|            2750.0|
|   136|    Hazel|  2200| 29|        IT|            2420.0|
|   120|  Matthew|  8000| 30|        HR|            8800.0|
|   110|     John|  8200| 31| Marketing|

**employee by dpt**

In [48]:
employees_df.groupBy("Department").agg(count("Emp_no")).show()

+----------+-------------+
|Department|count(Emp_no)|
+----------+-------------+
|     Sales|           13|
|        HR|            8|
|   Finance|           10|
| Marketing|            9|
|        IT|           10|
+----------+-------------+



**employee name has o**

In [50]:
employees_df.filter(employees_df["Emp_name"].like("%o%")).show()

+------+-----------+------+---+----------+------------------+
|Emp_no|   Emp_name|Salary|Age|Department|salary_after_bonus|
+------+-----------+------+---+----------+------------------+
|   198|     Donald|  2600| 29|        IT|2860.0000000000005|
|   199|    Douglas|  2600| 34|     Sales|2860.0000000000005|
|   110|       John|  8200| 31| Marketing|            9020.0|
|   112|Jose Manuel|  7800| 34|        HR|            8580.0|
|   130|      Mozhe|  2800| 28| Marketing|3080.0000000000005|
|   133|      Jason|  3300| 38|     Sales|3630.0000000000005|
|   139|       John|  2700| 36|     Sales|2970.0000000000005|
|   140|     Joshua|  2500| 29|   Finance|            2750.0|
+------+-----------+------+---+----------+------------------+

