In [1]:
#pyspark session setup
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/hadoop/work/spark-3.2.0-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.8"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.8"
#!pip install -q findspark
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.appName("Dataframe Project").getOrCreate()

21/12/07 14:45:42 WARN util.Utils: Your hostname, hadoop-Lenovo-G50-80 resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp3s0)
21/12/07 14:45:42 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/07 14:45:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = spark.read.options(header = 'True',inferSchema ='True').csv("OfficeDataProject.csv")
df.printSchema()
df.describe()
df.explain()

                                                                                

root
 |-- employee_id: integer (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

== Physical Plan ==
FileScan csv [employee_id#16,employee_name#17,department#18,state#19,salary#20,age#21,bonus#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hadoop/OfficeDataProject.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<employee_id:int,employee_name:string,department:string,state:string,salary:int,age:int,bon...




                                                                                

In [5]:
#Print Total number of employee in the company 
df.agg(count("employee_id")).show()

+------------------+
|count(employee_id)|
+------------------+
|              1000|
+------------------+



In [6]:
#Print Total number of departments in the company 
df.agg(countDistinct("department")).show()

+-----------------+
|count(department)|
+-----------------+
|                6|
+-----------------+



In [7]:
#Print departments names in the company 
df.select(col("department")).distinct().orderBy(col("department"),ascending=False).show()

+----------+
|department|
+----------+
|     Sales|
|Purchasing|
| Marketing|
|        HR|
|   Finance|
|  Accounts|
+----------+



In [8]:
# Print total Number of emplooyees in each department 
df.groupBy(col("department")).agg(count("*"),sum("salary").alias("Emp Salary"),min("salary")).show()
# Print total Number of emplooyees in each state in each department
df.groupBy(col("department"),col("state")).agg(count("*")).orderBy("department",ascending=True).show()

+----------+--------+----------+-----------+
|department|count(1)|Emp Salary|min(salary)|
+----------+--------+----------+-----------+
|     Sales|     169|    929430|       1103|
|        HR|     171|    988537|       1013|
|   Finance|     162|    835570|       1006|
|Purchasing|     166|    852551|       1105|
| Marketing|     170|    881983|       1031|
|  Accounts|     162|    841873|       1007|
+----------+--------+----------+-----------+

+----------+-----+--------+
|department|state|count(1)|
+----------+-----+--------+
|  Accounts|   NY|      34|
|  Accounts|   CA|      35|
|  Accounts|   WA|      27|
|  Accounts|   AK|      37|
|  Accounts|   LA|      29|
|   Finance|   LA|      29|
|   Finance|   CA|      35|
|   Finance|   NY|      31|
|   Finance|   WA|      30|
|   Finance|   AK|      37|
|        HR|   LA|      41|
|        HR|   NY|      30|
|        HR|   CA|      28|
|        HR|   AK|      25|
|        HR|   WA|      47|
| Marketing|   AK|      42|
| Marketing|   NY

In [9]:
#Print min and max salary in each department and sort salary in assending order
df.groupBy(col("department")).agg(min("salary").alias("min_salary"),max("salary").alias("max_salary")) \
.orderBy(col("min_salary").asc(),col("min_salary").asc()).show()

+----------+----------+----------+
|department|min_salary|max_salary|
+----------+----------+----------+
|   Finance|      1006|      9899|
|  Accounts|      1007|      9890|
|        HR|      1013|      9982|
| Marketing|      1031|      9974|
|     Sales|      1103|      9982|
|Purchasing|      1105|      9985|
+----------+----------+----------+



In [10]:
# Print names of employees working in NY state under Finance department whose bouns are greater than the average  
# bonuses of employees in NY
avg_bonus =df.filter(col("state") == 'NY').groupBy(col("state")).agg(avg("bonus").alias("avg_bonus")) \
            .select("avg_bonus").collect()[0]['avg_bonus'] 
df.select(col("employee_name")).filter((col("salary") >= avg_bonus) & (col("department") == 'Finance')).show()

+--------------------+
|       employee_name|
+--------------------+
|     Melissia Dedman|
|       Megan Gallman|
|     Trena Clemencia|
|        Tyree Soules|
|       Suzanne Trena|
|         Herder Nitz|
|     Exie Georgeanna|
|     Antonio Ruzicka|
|        Clune Norene|
|       Janine Mayeda|
|      Herder Gallman|
|     Casimira Katlyn|
|   Tenenbaum Suzanne|
|       Grigg Debroah|
|          Nena Rocha|
|    Clemencia Locust|
|       Leif Lemaster|
|   Clemencia Rudolph|
|         Janey Amber|
|Ellingsworth Meli...|
+--------------------+
only showing top 20 rows



In [11]:
# raise salary $500 of all th emeployees whose age is greater than 45
df.withColumn("salary",lit(500) + col("salary")).show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6631| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4527| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3622| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  6217| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  6185| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  3343| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7701| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3944| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  4204| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5561| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9808| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

In [12]:
#using udf raise salary $500 of all th emeployees whose age is greater than 45 
from pyspark.sql.types import *

def salary_incr(cur_salary,age):
    if age >= 45:
        salary_incr = cur_salary + 500
    else:
        salary_incr = cur_salary
        
    return salary_incr

SalaryIncrUDF = udf (lambda x,y : salary_incr(x,y),IntegerType())

df.withColumn("Incremented salary",SalaryIncrUDF(col("salary"),col("age"))).show()

+-----------+-------------------+----------+-----+------+---+-----+------------------+
|employee_id|      employee_name|department|state|salary|age|bonus|Incremented salary|
+-----------+-------------------+----------+-----+------+---+-----+------------------+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|              6131|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|              4027|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|              3122|
|       1003|        Tamra Amber|  Accounts|   AK|  5717| 47| 1291|              6217|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|              5685|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|              2843|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|              7201|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|              3444|
|       1008|  Recalde Kensinger|  Accounts

                                                                                

In [13]:
# wrtite file modes
#overwrite
#append
#error
#ignore

In [14]:
# create DF of all the employees whose age is greater than 45 an save them to file
df.write.mode("overwrite").options(header = 'True').csv('output/StudData.csv')

21/12/07 15:23:32 WARN hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-830212574-127.0.1.1-1633618224355:blk_1073742467_1652
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/127.0.0.1:56754 remote=/127.0.0.1:50010]
	at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
	at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
	at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
	at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
	at java.io.FilterInputStream.read(FilterInputStream.java:83)
	at java.io.FilterInputStream.read(FilterInputStream.java:83)
	at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2292)
	at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
	at org.apache.hadoop.hdfs.DFSOutputStream