In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 25.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=6302f3d59fbfd752a039c44de913a2483fcf7879bc9f5aec13a206dd1fee1985
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:
#importing required libraries for Spark Dataframe
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.functions import sum,avg,max,min,mean,count
# Reuse the active Spark session if there is one, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName("Mini Project")
    .getOrCreate()
)

In [4]:
# Load the office CSV: the first line is treated as the header and the
# column types are inferred from the data.
df = (
    spark.read
    .options(header='True', inferSchema='True')
    .csv('/content/sample_data/OfficeDataProject.csv')
)
df.show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  3704| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5061| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9308| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

In [5]:
# Total number of employees in the company, taken as the number of rows in df.
df.count()

1000

In [12]:
# Total number of departments in the company, computed in two equivalent ways.
# Only the last expression of a cell is displayed, so both results are bound to
# names; otherwise the first Spark job runs and its result is silently discarded.

# 1st method: de-duplicate the department column and count the remaining rows
dept_count_dedup = df.select("department").dropDuplicates(["department"]).count()
# 2nd method: build one group per department and count the groups
dept_count_grouped = df.groupBy('department').count().count()

# Both approaches must agree; fail loudly if they ever do not.
assert dept_count_dedup == dept_count_grouped, "department counts disagree between methods"
dept_count_grouped

6

In [13]:
# Distinct department names in the company (one row per department).
department_names = df.select("department").dropDuplicates(["department"])
department_names.show()

+----------+
|department|
+----------+
|     Sales|
|        HR|
|   Finance|
|Purchasing|
| Marketing|
|  Accounts|
+----------+



In [14]:
# Head count per department: one output row per department with its row count.
employees_per_department = df.groupBy('department').count()
employees_per_department.show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|  169|
|        HR|  171|
|   Finance|  162|
|Purchasing|  166|
| Marketing|  170|
|  Accounts|  162|
+----------+-----+



In [15]:
# Head count per state: one output row per state with its row count.
employees_per_state = df.groupBy('state').count()
employees_per_state.show()

+-----+-----+
|state|count|
+-----+-----+
|   LA|  205|
|   CA|  205|
|   WA|  208|
|   NY|  173|
|   AK|  209|
+-----+-----+



In [16]:
# Head count for every (state, department) combination.
# show() prints only the first 20 rows by default.
employees_per_state_department = df.groupBy('state', 'department').count()
employees_per_state_department.show()

+-----+----------+-----+
|state|department|count|
+-----+----------+-----+
|   CA|     Sales|   42|
|   CA| Marketing|   33|
|   NY|  Accounts|   34|
|   NY|     Sales|   27|
|   CA|   Finance|   35|
|   CA|  Accounts|   35|
|   CA|Purchasing|   32|
|   WA|        HR|   47|
|   AK|Purchasing|   30|
|   WA|  Accounts|   27|
|   WA|Purchasing|   38|
|   AK|     Sales|   38|
|   AK|  Accounts|   37|
|   WA| Marketing|   39|
|   LA|        HR|   41|
|   LA|     Sales|   35|
|   AK|        HR|   25|
|   LA|   Finance|   29|
|   AK|   Finance|   37|
|   LA|Purchasing|   45|
+-----+----------+-----+
only showing top 20 rows



In [17]:
# Minimum and maximum salary per department, sorted ascending by max and then by min.
# NOTE: min/max here are the pyspark.sql.functions aggregates imported at the top
# of the notebook, not Python's built-ins.
salary_floor = min("salary").alias("min")
salary_ceiling = max("salary").alias("max")
(
    df.groupBy("department")
    .agg(salary_floor, salary_ceiling)
    .orderBy("max", "min")  # orderBy sorts ascending by default
    .show()
)

+----------+----+----+
|department| min| max|
+----------+----+----+
|  Accounts|1007|9890|
|   Finance|1006|9899|
| Marketing|1031|9974|
|        HR|1013|9982|
|     Sales|1103|9982|
|Purchasing|1105|9985|
+----------+----+----+



In [21]:
# Quick look at the first 4 rows to recall the column layout.
df.show(n=4)

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|      Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
+-----------+-----------------+----------+-----+------+---+-----+
only showing top 4 rows



In [18]:
# Names of employees in the Finance dept in NY state whose bonus is greater
# than the average bonus of all employees in NY state

In [22]:
# Average bonus over all NY employees.
# A global aggregate (agg without groupBy) always returns exactly one row, so
# collect()[0] cannot raise IndexError when there are no NY rows; in that case
# the average is simply None.
avgBonusDf = df.filter(df.state == "NY").agg(avg("bonus").alias("avg_bonus"))
avgBonus = avgBonusDf.collect()[0]['avg_bonus']
print('Average Bonus:', avgBonus)

# NY Finance employees whose bonus is strictly above the NY-wide average.
# lit() makes the threshold an explicit literal column; if avgBonus is None the
# comparison is null for every row and the result is empty rather than an error.
df.filter((df.state == "NY") & (df.department == "Finance") & (df.bonus > lit(avgBonus))).show()

Average Bonus: 1251.3468208092486
+-----------+--------------------+----------+-----+------+---+-----+
|employee_id|       employee_name|department|state|salary|age|bonus|
+-----------+--------------------+----------+-----+------+---+-----+
|       1035|       Vivan Sifford|   Finance|   NY|  1129| 35| 1261|
|       1073|      Herder Gallman|   Finance|   NY|  1988| 31| 1402|
|       1082|          Nena Rocha|   Finance|   NY|  3417| 25| 1647|
|       1087|       Leif Lemaster|   Finance|   NY|  8642| 45| 1782|
|       1100|Ellingsworth Meli...|   Finance|   NY|  7845| 32| 1358|
|       1127|        Escoto Gilma|   Finance|   NY|  3426| 41| 1285|
|       1161|     Georgeanna Laub|   Finance|   NY|  2469| 26| 1679|
|       1175|     Durio Tenenbaum|   Finance|   NY|  2253| 42| 1684|
|       1180|       Juliana Grigg|   Finance|   NY|  8178| 42| 1617|
|       1215|        Tiffani Benz|   Finance|   NY|  1665| 41| 1969|
|       1220|          Nitz Ilana|   Finance|   NY|  2443| 50| 1342|


In [23]:
#raise the salary by 500 for every employee whose age is greater than 45, using a udf

#normal function
def incr_salary(age, currentSalary):
  """Return the salary after the age-based raise.

  Employees strictly older than 45 get a flat raise of 500; everyone else
  keeps their current salary.

  Null handling (a Spark UDF receives None for null cells):
    * age is None           -> eligibility is unknown, salary is returned unchanged
    * currentSalary is None -> None is returned (there is nothing to raise)
  Without these guards `None > 45` / `None + 500` raise TypeError and the
  whole Spark job fails on the first null row.
  """
  if age is None or currentSalary is None:
    return currentSalary
  if age > 45:
    return currentSalary + 500
  return currentSalary

# Wrap the plain Python function as a Spark UDF that returns an integer column.
incrSalaryUDF = udf(incr_salary, IntegerType())

# Replace the salary column with the raised salary (inputs: age, current salary) and display it.
raised_salary = incrSalaryUDF(col("age"), col("salary"))
df.withColumn("salary", raised_salary).show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  6217| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  4204| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5061| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9308| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

In [24]:
# Create a DataFrame of all employees whose age > 45 and save it as CSV.
# mode="overwrite" keeps this cell re-runnable: the default save mode raises an
# error when the output directory already exists (e.g. on Restart & Run All).
# header=True writes the column names, matching the layout of the input file.
df.filter(df.age > 45).write.csv("/content/sample_data/output_45", mode="overwrite", header=True)