# Pandas vs PySpark Features

In [3]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# One SparkSession shared by every cell below
spark = SparkSession.builder.appName("Pyspark vs Pandas").getOrCreate()

In [10]:
# Read the CSV file with pandas; column dtypes are inferred from the values
df_pd = pd.read_csv(filepath_or_buffer="emp.csv")
# Default (inferred) datatypes: numeric columns become int64, text/date columns stay object
df_pd.dtypes

eno        int64
ename     object
salary     int64
deptno     int64
doj       object
dtype: object

In [11]:
# Read the CSV file with PySpark; without inferSchema every column is read as a string
df_spark = spark.read.option("header", True).csv("emp.csv")
df_spark.printSchema()

root
 |-- eno: string (nullable = true)
 |-- ename: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- deptno: string (nullable = true)
 |-- doj: string (nullable = true)



In [12]:
# inferSchema=True derives each column's type from the file's data values
df_spark = spark.read.options(header=True, inferSchema=True).csv("emp.csv")
df_spark.printSchema()

root
 |-- eno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- deptno: integer (nullable = true)
 |-- doj: date (nullable = true)



In [13]:
df_spark.show(n=20)  # first 20 rows, the same as show()'s default

+---+------+------+------+----------+
|eno| ename|salary|deptno|       doj|
+---+------+------+------+----------+
| 10| Amber|  1000|    10|2018-05-14|
|  2|  Amar|  1000|    10|2017-03-22|
|  8| Sagar|  1000|    20|2020-11-30|
|  4|Thomas|  1000|    10|2019-08-18|
|  5|  Jade|  2000|    10|2021-02-10|
|  6| Smith|  1000|    10|2016-12-01|
|  7|Robert|  3000|    20|2023-06-25|
|  3|  Nick|  1000|    30|2015-09-15|
|  9| Naman|  4000|    30|2020-01-20|
|  1| Vikas|  1000|    30|2017-10-03|
| 11|Dennis| 41000|    30|2024-01-05|
| 12| Karan|  2200|    20|2018-06-16|
| 13|Simran|  1500|    10|2019-12-19|
| 14|  Amit|  2000|    30|2022-04-09|
| 15|  Neha|  1700|    20|2021-07-22|
| 16| Rahul|  1200|    10|2016-03-28|
| 17| Divya|  3000|    30|2015-11-11|
| 18|Rakesh|  1800|    10|2020-09-04|
| 19|Monica|  2500|    20|2023-08-07|
| 20|  John|  2300|    30|2018-10-29|
+---+------+------+------+----------+
only showing top 20 rows


In [33]:
# Read the CSV file with PySpark, passing all reader options in a single options() call
df_sprk = spark.read.options(header="true", inferSchema="true", delimiter=",").csv("emp.csv")


In [58]:
# Read only specific columns
# pandas: usecols restricts parsing to the listed columns (kept in file order)
df_pd = pd.read_csv("emp.csv", usecols=["eno", "salary", "deptno"])
# PySpark: select() projects the listed columns from the loaded DataFrame
df_sprk = spark.read.csv("emp.csv", header=True, inferSchema=True).select("eno", "salary", "deptno")
df_sprk.show()

+---+------+------+
|eno|salary|deptno|
+---+------+------+
| 10|  1000|    10|
|  2|  1000|    10|
|  8|  1000|    20|
|  4|  1000|    10|
|  5|  2000|    10|
|  6|  1000|    10|
|  7|  3000|    20|
|  3|  1000|    30|
|  9|  4000|    30|
|  1|  1000|    30|
| 11| 41000|    30|
| 12|  2200|    20|
| 13|  1500|    10|
| 14|  2000|    30|
| 15|  1700|    20|
| 16|  1200|    10|
| 17|  3000|    30|
| 18|  1800|    10|
| 19|  2500|    20|
| 20|  2300|    30|
+---+------+------+
only showing top 20 rows


In [39]:
# Read rows 11 to 20 (0-based positions 10..19).
# Positional slicing stays correct when the frame has fewer than 20 rows,
# whereas head(20).tail(10) would return the last 10 of however many rows exist.
df_pd.iloc[10:20]

Unnamed: 0,eno,deptno
10,11,30
11,12,20
12,13,10
13,14,30
14,15,20
15,16,10
16,17,30
17,18,10
18,19,20
19,20,30


In [38]:
# Read rows 11 to 20: skip the first 10 rows, then take the next 10.
# offset() keeps duplicate rows; subtract() is a set difference
# (EXCEPT DISTINCT) that would also drop duplicates and any row 11-20 equal to a row 1-10.
# NOTE(review): without an orderBy, row order follows the source partitions
# and is not guaranteed.
df_sprk.offset(10).limit(10).show()

+---+------+------+------+----------+
|eno| ename|salary|deptno|       doj|
+---+------+------+------+----------+
| 11|Dennis| 41000|    30|2024-01-05|
| 12| Karan|  2200|    20|2018-06-16|
| 13|Simran|  1500|    10|2019-12-19|
| 14|  Amit|  2000|    30|2022-04-09|
| 15|  Neha|  1700|    20|2021-07-22|
| 16| Rahul|  1200|    10|2016-03-28|
| 17| Divya|  3000|    30|2015-11-11|
| 18|Rakesh|  1800|    10|2020-09-04|
| 19|Monica|  2500|    20|2023-08-07|
| 20|  John|  2300|    30|2018-10-29|
+---+------+------+------+----------+



In [46]:
# Print df_sprk's schema as a tree: column name, type and nullability
df_sprk.printSchema()

root
 |-- eno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- deptno: integer (nullable = true)
 |-- doj: date (nullable = true)



In [52]:
# Write the data into another file in pandas.
# index=False keeps the row index out of the file; otherwise re-reading the CSV
# yields an extra "Unnamed: 0" column.
df_pd.to_csv("emp_output_pd.csv", index=False)

# Write the data into another file in pyspark.
# Spark writes a directory of part-files (one per partition), not a single CSV;
# mode("overwrite") replaces the folder if it already exists.
df_sprk.write.mode("overwrite").option("header", True).csv("pysprk_folder")

In [65]:
# Read a gzip-compressed CSV file in pyspark.
# Spark picks the decompression codec from the ".gz" file extension, so no
# "compression" option is needed when reading (that option applies to writes).
df_sp = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .csv("emp_bkp.csv.gz")
)

# Show the DataFrame that was just read
df_sp.show()

# Sorting 

In [64]:
# Sorting by salary in pandas, highest first; sort_values returns a new frame
df_pd = df_pd.sort_values(
    "salary",
    ascending=False,
)

In [71]:
# Sorting by salary in pyspark: highest first, then lowest first
df_sp.orderBy(F.col("salary").desc()).show()
df_sp.orderBy(F.col("salary").asc()).show()

+---+------+------+------+----------+
|eno| ename|salary|deptno|       doj|
+---+------+------+------+----------+
| 11|Dennis| 41000|    30|2024-01-05|
|  9| Naman|  4000|    30|2020-01-20|
| 36|  Ritu|  3200|    20|2023-05-01|
|  7|Robert|  3000|    20|2023-06-25|
| 17| Divya|  3000|    30|2015-11-11|
| 35| Seema|  3000|    10|2021-10-15|
| 37| Kunal|  2800|    30|2015-08-22|
| 21| Sneha|  2700|    10|2022-12-14|
| 38|  Alok|  2700|    10|2019-03-02|
| 39|Poonam|  2600|    20|2022-02-05|
| 19|Monica|  2500|    20|2023-08-07|
| 40|  Yash|  2500|    30|2020-07-19|
| 33| Nidhi|  2400|    30|2016-05-12|
| 20|  John|  2300|    30|2018-10-29|
| 41|  Nina|  2300|    20|2018-12-28|
| 12| Karan|  2200|    20|2018-06-16|
| 42| Tarun|  2200|    10|2021-01-09|
| 43| Geeta|  2100|    30|2017-11-14|
| 32|  Ajay|  2100|    10|2020-06-26|
| 34| Vivek|  2000|    20|2017-07-31|
+---+------+------+------+----------+
only showing top 20 rows
+---+------+------+------+----------+
|eno| ename|salary|deptno

In [72]:
# Sort by deptno ascending, then by salary descending within each department
df_sp.orderBy(F.asc("deptno"), F.desc("salary")).show()

+---+------+------+------+----------+
|eno| ename|salary|deptno|       doj|
+---+------+------+------+----------+
| 35| Seema|  3000|    10|2021-10-15|
| 21| Sneha|  2700|    10|2022-12-14|
| 38|  Alok|  2700|    10|2019-03-02|
| 42| Tarun|  2200|    10|2021-01-09|
| 32|  Ajay|  2100|    10|2020-06-26|
|  5|  Jade|  2000|    10|2021-02-10|
| 44| Ashok|  1900|    10|2016-06-08|
| 24| Vinod|  1800|    10|2016-07-30|
| 18|Rakesh|  1800|    10|2020-09-04|
| 26| Harsh|  1600|    10|2021-09-25|
| 13|Simran|  1500|    10|2019-12-19|
| 28| Mohan|  1300|    10|2015-04-17|
| 16| Rahul|  1200|    10|2016-03-28|
| 30|Suresh|  1100|    10|2018-09-09|
| 10| Amber|  1000|    10|2018-05-14|
|  4|Thomas|  1000|    10|2019-08-18|
|  2|  Amar|  1000|    10|2017-03-22|
|  6| Smith|  1000|    10|2016-12-01|
| 36|  Ritu|  3200|    20|2023-05-01|
|  7|Robert|  3000|    20|2023-06-25|
+---+------+------+------+----------+
only showing top 20 rows


In [1]:
import pyspark
from pyspark.sql import SparkSession

# getOrCreate() reuses the session already running in this kernel, if any
spark = SparkSession.builder.getOrCreate()
for label, version in ((" PySpark version:", pyspark.__version__), (" Spark version:", spark.version)):
    print(label, version)


 PySpark version: 4.0.0
 Spark version: 4.0.0


In [4]:
# Group by a single column: total salary per department
df = (
    spark.read
    .options(header="true", inferSchema="true", delimiter=",")
    .csv("emp.csv")
)
df.groupBy("deptno").agg(F.sum("salary").alias("sum_Values")).show()

+------+----------+
|deptno|sum_Values|
+------+----------+
|    20|     30000|
|    10|     30900|
|    30|     68700|
+------+----------+



In [16]:
# Group by multiple columns: total salary per (city, deptid) pair.
# The JSON reader infers the schema itself; CSV-only options such as
# "header"/"inferSchema" do not apply to it, so none are passed.
df = spark.read.json("emp_details.json")
df.groupBy("city", "deptid").agg(F.sum("salary").alias("sum_Values")).show()


+---------+------+----------+
|     city|deptid|sum_Values|
+---------+------+----------+
|  Chennai|    40|     65000|
|   Mumbai|    20|     60000|
|Bengaluru|    50|     48000|
|     Pune|    20|     49000|
|    Delhi|    20|     62000|
|   Mumbai|    30|     59000|
|Hyderabad|    50|     61000|
|Ahmedabad|    30|     61000|
|Bengaluru|    10|    110000|
|  Kolkata|    10|    107000|
|  Chennai|    60|     66000|
|   Jaipur|    50|     47000|
|     Pune|    30|     57000|
|    Delhi|    40|    130000|
|Hyderabad|    30|     60000|
|    Surat|    20|     56000|
|   Mumbai|    10|     55000|
+---------+------+----------+



In [22]:
# Group by one column but apply multiple aggregate functions.
# F.mean is an alias of F.avg, so the "mean" and "avg" columns are identical.
dept_stats = df.groupBy("deptid").agg(
    F.sum("salary").alias("sum"),
    F.mean("salary").alias("mean"),
    F.avg("salary").alias("avg"),
)
dept_stats.show()
df.show()

+------+------+-------+-------+
|deptid|   sum|   mean|    avg|
+------+------+-------+-------+
|    50|156000|52000.0|52000.0|
|    10|272000|54400.0|54400.0|
|    30|237000|59250.0|59250.0|
|    20|227000|56750.0|56750.0|
|    60| 66000|66000.0|66000.0|
|    40|195000|65000.0|65000.0|
+------+------+-------+-------+

+---------+------+---+------+
|     city|deptid|eno|salary|
+---------+------+---+------+
|   Mumbai|    10|101| 55000|
|    Delhi|    20|102| 62000|
|Bengaluru|    10|103| 58000|
|Hyderabad|    30|104| 60000|
|     Pune|    20|105| 49000|
|  Kolkata|    10|106| 53000|
|  Chennai|    40|107| 65000|
|Ahmedabad|    30|108| 61000|
|   Jaipur|    50|109| 47000|
|    Surat|    20|110| 56000|
|   Mumbai|    30|111| 59000|
|    Delhi|    40|112| 63000|
|Bengaluru|    10|113| 52000|
|Hyderabad|    50|114| 61000|
|  Chennai|    60|115| 66000|
|     Pune|    30|116| 57000|
|  Kolkata|    10|117| 54000|
|   Mumbai|    20|118| 60000|
|    Delhi|    40|119| 67000|
|Bengaluru|    50|1

In [26]:
# Filter rows first (like SQL WHERE), then group the remaining rows
filtered = df.where(F.col("salary") > 60000)
filtered.show()
(
    filtered
    .groupBy("deptid")
    .agg(F.sum("salary").alias("total_salary"))
    .show()
)

+---------+------+---+------+
|     city|deptid|eno|salary|
+---------+------+---+------+
|    Delhi|    20|102| 62000|
|  Chennai|    40|107| 65000|
|Ahmedabad|    30|108| 61000|
|    Delhi|    40|112| 63000|
|Hyderabad|    50|114| 61000|
|  Chennai|    60|115| 66000|
|    Delhi|    40|119| 67000|
+---------+------+---+------+

+------+------------+
|deptid|total_salary|
+------+------------+
|    50|       61000|
|    30|       61000|
|    20|       62000|
|    60|       66000|
|    40|      195000|
+------+------------+



In [28]:
# Filter after grouping (like SQL HAVING): the condition applies to the aggregated column
dept_totals = df.groupBy("deptid").agg(F.sum("salary").alias("total_salary"))
dept_totals.where(F.col("total_salary") < 70000).show()

+------+------------+
|deptid|total_salary|
+------+------------+
|    60|       66000|
+------+------------+



In [36]:
# Joins in PySpark: load the employee and department data.
# spark.read.json infers the schema itself; CSV-only options such as
# "header"/"inferSchema" do not apply to the JSON reader.
df1 = spark.read.json("emp_details.json")
df2 = spark.read.json("department.json")


In [37]:
df2.show(n=20, truncate=True)  # the department table, using show()'s defaults spelled out

+----------+------+
|  deptname|deptno|
+----------+------+
|   Finance|    10|
|        HR|    20|
|        IT|    30|
|Operations|    40|
| Marketing|    50|
|     Legal|    70|
|     Admin|    80|
+----------+------+



In [39]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

emp_df = spark.read.json("emp_details.json")
dept_df = spark.read.json("department.json")

# One join condition shared by every join type below
join_cond = emp_df.deptid == dept_df.deptno

# inner     : only rows with a match on both sides
# left      : every employee row is kept
# right     : every department row is kept
# full      : every row from both sides is kept
# left_semi : only employees with a matching dept (employee columns only)
# left_anti : employees whose dept is not found in dept_df
for join_type in ("inner", "left", "right", "full", "left_semi", "left_anti"):
    emp_df.join(dept_df, join_cond, join_type).show()


+---------+------+---+------+----------+------+
|     city|deptid|eno|salary|  deptname|deptno|
+---------+------+---+------+----------+------+
|   Mumbai|    10|101| 55000|   Finance|    10|
|    Delhi|    20|102| 62000|        HR|    20|
|Bengaluru|    10|103| 58000|   Finance|    10|
|Hyderabad|    30|104| 60000|        IT|    30|
|     Pune|    20|105| 49000|        HR|    20|
|  Kolkata|    10|106| 53000|   Finance|    10|
|  Chennai|    40|107| 65000|Operations|    40|
|Ahmedabad|    30|108| 61000|        IT|    30|
|   Jaipur|    50|109| 47000| Marketing|    50|
|    Surat|    20|110| 56000|        HR|    20|
|   Mumbai|    30|111| 59000|        IT|    30|
|    Delhi|    40|112| 63000|Operations|    40|
|Bengaluru|    10|113| 52000|   Finance|    10|
|Hyderabad|    50|114| 61000| Marketing|    50|
|     Pune|    30|116| 57000|        IT|    30|
|  Kolkata|    10|117| 54000|   Finance|    10|
|   Mumbai|    20|118| 60000|        HR|    20|
|    Delhi|    40|119| 67000|Operations|

In [40]:
emp_df.join(dept_df, on=emp_df.deptid == dept_df.deptno, how="inner").show()  # inner join on department id


+---------+------+---+------+----------+------+
|     city|deptid|eno|salary|  deptname|deptno|
+---------+------+---+------+----------+------+
|   Mumbai|    10|101| 55000|   Finance|    10|
|    Delhi|    20|102| 62000|        HR|    20|
|Bengaluru|    10|103| 58000|   Finance|    10|
|Hyderabad|    30|104| 60000|        IT|    30|
|     Pune|    20|105| 49000|        HR|    20|
|  Kolkata|    10|106| 53000|   Finance|    10|
|  Chennai|    40|107| 65000|Operations|    40|
|Ahmedabad|    30|108| 61000|        IT|    30|
|   Jaipur|    50|109| 47000| Marketing|    50|
|    Surat|    20|110| 56000|        HR|    20|
|   Mumbai|    30|111| 59000|        IT|    30|
|    Delhi|    40|112| 63000|Operations|    40|
|Bengaluru|    10|113| 52000|   Finance|    10|
|Hyderabad|    50|114| 61000| Marketing|    50|
|     Pune|    30|116| 57000|        IT|    30|
|  Kolkata|    10|117| 54000|   Finance|    10|
|   Mumbai|    20|118| 60000|        HR|    20|
|    Delhi|    40|119| 67000|Operations|

In [41]:
# LEFT SEMI: keep employees that have a matching dept; only emp_df's columns are returned
semi_cond = emp_df["deptid"] == dept_df["deptno"]
emp_df.join(dept_df, semi_cond, how="left_semi").show()

+---------+------+---+------+
|     city|deptid|eno|salary|
+---------+------+---+------+
|   Mumbai|    10|101| 55000|
|    Delhi|    20|102| 62000|
|Bengaluru|    10|103| 58000|
|Hyderabad|    30|104| 60000|
|     Pune|    20|105| 49000|
|  Kolkata|    10|106| 53000|
|  Chennai|    40|107| 65000|
|Ahmedabad|    30|108| 61000|
|   Jaipur|    50|109| 47000|
|    Surat|    20|110| 56000|
|   Mumbai|    30|111| 59000|
|    Delhi|    40|112| 63000|
|Bengaluru|    10|113| 52000|
|Hyderabad|    50|114| 61000|
|     Pune|    30|116| 57000|
|  Kolkata|    10|117| 54000|
|   Mumbai|    20|118| 60000|
|    Delhi|    40|119| 67000|
|Bengaluru|    50|120| 48000|
+---------+------+---+------+



In [43]:
# LEFT ANTI: rows of the left frame that have no match in the right frame
dept_match = emp_df.deptid == dept_df.deptno
# employees whose dept is not found in dept_df
emp_df.join(dept_df, dept_match, how="left_anti").show()
# departments that no employee belongs to
dept_df.join(emp_df, dept_match, how="left_anti").show()

+-------+------+---+------+
|   city|deptid|eno|salary|
+-------+------+---+------+
|Chennai|    60|115| 66000|
+-------+------+---+------+

+--------+------+
|deptname|deptno|
+--------+------+
|   Legal|    70|
|   Admin|    80|
+--------+------+



In [44]:
# The same inner join, with every argument passed by keyword
emp_df.join(
    other=dept_df,
    on=emp_df.deptid == dept_df.deptno,
    how="inner",
).show()


+---------+------+---+------+----------+------+
|     city|deptid|eno|salary|  deptname|deptno|
+---------+------+---+------+----------+------+
|   Mumbai|    10|101| 55000|   Finance|    10|
|    Delhi|    20|102| 62000|        HR|    20|
|Bengaluru|    10|103| 58000|   Finance|    10|
|Hyderabad|    30|104| 60000|        IT|    30|
|     Pune|    20|105| 49000|        HR|    20|
|  Kolkata|    10|106| 53000|   Finance|    10|
|  Chennai|    40|107| 65000|Operations|    40|
|Ahmedabad|    30|108| 61000|        IT|    30|
|   Jaipur|    50|109| 47000| Marketing|    50|
|    Surat|    20|110| 56000|        HR|    20|
|   Mumbai|    30|111| 59000|        IT|    30|
|    Delhi|    40|112| 63000|Operations|    40|
|Bengaluru|    10|113| 52000|   Finance|    10|
|Hyderabad|    50|114| 61000| Marketing|    50|
|     Pune|    30|116| 57000|        IT|    30|
|  Kolkata|    10|117| 54000|   Finance|    10|
|   Mumbai|    20|118| 60000|        HR|    20|
|    Delhi|    40|119| 67000|Operations|