Dependencies to install:

1) Java 8

2) Apache Spark with Hadoop

3) Findspark (used to locate the Spark installation on the system)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
!ls

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark


In [None]:
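# We will be working with the cars dataset.
# Link: https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
# Download it into the working directory so that spark.read.csv('cars.csv') can find it
!wget -q https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv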


In [None]:
!ls

In [None]:
# header=True means the first row is a header
# sep=';' means the columns are separated by ';'

df = spark.read.csv('cars.csv', header=True, sep=";")
df.show(5)



Viewing the Dataframe
There are several ways to view your DataFrame (DF) in PySpark:

1. df.take(5) returns a list of five Row objects.
2. df.collect() returns all of the data from the entire DataFrame. Be really careful when using it: with a large data set, you can easily crash the driver node.
3. df.show() is the most commonly used method to view a DataFrame. It accepts a few parameters, such as the number of rows and truncation. For example, df.show(5, False) or df.show(5, truncate=False) shows five rows without truncating long values.
4. df.limit(5) returns a new DataFrame containing the first n rows (here, 5). Because Spark is distributed, there is no guarantee that df.limit() will give you the same rows each time.

In [None]:
df.show(5, truncate=False)

In [None]:
df.limit(5)

Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
Chevrolet Chevell...,18.0,8,307.0,130.0,3504,12.0,70,US
Buick Skylark 320,15.0,8,350.0,165.0,3693,11.5,70,US
Plymouth Satellite,18.0,8,318.0,150.0,3436,11.0,70,US
AMC Rebel SST,16.0,8,304.0,150.0,3433,12.0,70,US
Ford Torino,17.0,8,302.0,140.0,3449,10.5,70,US


Viewing Dataframe Columns

In [None]:
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [None]:
df.dtypes   

[('Car', 'string'),
 ('MPG', 'string'),
 ('Cylinders', 'string'),
 ('Displacement', 'string'),
 ('Horsepower', 'string'),
 ('Weight', 'string'),
 ('Acceleration', 'string'),
 ('Model', 'string'),
 ('Origin', 'string')]

In [None]:
df.printSchema()

Inferring Schema Implicitly

-> We can pass the parameter inferSchema=True to infer the input schema automatically while loading the data. An example is shown below:

In [None]:
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
df.printSchema()

As you can see, Spark has inferred the data types automatically, including the appropriate numeric types for the decimal columns. A problem that might arise is that when you read multiple files whose schemas differ, implicit inference can lead to null values in some columns. Therefore, let us also see how to define schemas explicitly.

Defining Schema Explicitly

In [None]:
from pyspark.sql.types import *
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [None]:
# Creating a list of the schema in the format column_name, data_type
labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

In [None]:
# Creating the schema that will be passed when reading the csv
schema = StructType([StructField (x[0], x[1], True) for x in labels])
schema

StructType(List(StructField(Car,StringType,true),StructField(MPG,DoubleType,true),StructField(Cylinders,IntegerType,true),StructField(Displacement,DoubleType,true),StructField(Horsepower,DoubleType,true),StructField(Weight,DoubleType,true),StructField(Acceleration,DoubleType,true),StructField(Model,IntegerType,true),StructField(Origin,StringType,true)))

In [None]:
df = spark.read.csv('cars.csv', header=True, sep=";", schema=schema)
df.printSchema()
# The schema matches what we specified

In [None]:
df.show(truncate=False)

# As we can see here, the data has been successfully loaded with the specified datatypes.

DataFrame Operations on Columns 

We will go over the following in this section:

1. Selecting Columns
2. Selecting Multiple Columns
3. Adding New Columns
4. Renaming Columns
5. Grouping By Columns
6. Removing Columns

Selecting Columns

There are multiple ways to select a column in PySpark. The cells below show each one and how they differ:

In [None]:
# 1st method
# Column name is case sensitive in this usage
print(df.Car)
print("*"*20)
df.select(df.Car).show(truncate=False)

NOTE:

We can't always use the dot notation, because it breaks when a column name clashes with an attribute or method of the DataFrame class (or is not a valid Python identifier). Additionally, column names are case sensitive with this notation, so we need to make sure the column names have been converted to a particular case before using it.
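
To make the note above concrete, here is a plain-Python sketch of why attribute access can be shadowed. ToyFrame and everything in it are hypothetical stand-ins that only model Python's lookup order; it is not PySpark's DataFrame class, and unlike PySpark its bracket access is case sensitive.

```python
class ToyFrame:
    """A toy stand-in for a DataFrame (hypothetical, not PySpark's class)."""

    def __init__(self, columns):
        self.columns = list(columns)

    def count(self):
        # A regular method, playing the role of DataFrame.count()
        return 0

    def __getattr__(self, name):
        # Python only calls __getattr__ when normal attribute lookup fails,
        # so names that are real methods (such as 'count') never reach it.
        if name not in self.columns:
            raise AttributeError(name)
        return f"Column<{name}>"

    def __getitem__(self, name):
        # Bracket access always goes through the column lookup
        if name not in self.columns:
            raise KeyError(name)
        return f"Column<{name}>"


toy = ToyFrame(["Car", "count"])
car_by_dot = toy.Car              # resolved as a column
count_by_dot = toy.count          # the bound method, NOT the column
count_by_bracket = toy["count"]   # the column, as intended
```

In this sketch, a column named count can only be reached with brackets, which is the same reason df['column_name'] or col('column_name') is the safer habit.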

In [None]:
# 2nd method
# Column name is case insensitive here
print(df['car'])
print("*"*20)
df.select(df['car']).show(truncate=False)

In [None]:
# 3rd method
# Column name is case insensitive here
from pyspark.sql.functions import col
df.select(col('car')).show(truncate=False)

Selecting Multiple Columns

In [None]:
# 1st method
# Column name is case sensitive in this usage
print(df.Car, df.Cylinders)
print("*"*40)
df.select(df.Car, df.Cylinders).show(truncate=False)

In [None]:
# 2nd method
# Column name is case insensitive in this usage
print(df['car'],df['cylinders'])
print("*"*40)
df.select(df['car'],df['cylinders']).show(truncate=False)

In [None]:
# 3rd method
# Column name is case insensitive in this usage
from pyspark.sql.functions import col
df.select(col('car'),col('cylinders')).show(truncate=False)

Adding New Columns
We will take a look at three cases here:

1. Adding a new column
2. Adding multiple columns
3. Deriving a new column from an existing one

In [None]:
# CASE 1: Adding a new column
# We will add a new column called 'first_column' at the end
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1)) 
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

In [None]:
# CASE 2: Adding multiple columns
# We will add two new columns called 'second_column' and 'third_column' at the end
df = df.withColumn('second_column', lit(2)) \
       .withColumn('third_column', lit('Third Column')) 

# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.

df.show(5,truncate=False)

In [None]:
# CASE 3: Deriving a new column from an existing one
# We will add a new column called 'car_model', which holds the values of
# Car and Model joined with a space in between
from pyspark.sql.functions import col, concat, lit
df = df.withColumn('car_model', concat(col("Car"), lit(" "), col("model")))
df.show(5,truncate=False)

As we can see, the new column car_model has been created from existing columns. Since our aim was to join the values of Car and Model with a space in between, we used the concat function.

Renaming Columns

We use the withColumnRenamed function to rename a column in PySpark. Let us see it in action below:

In [None]:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two') \
       .withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)

Grouping By Columns

Here, we see the Dataframe API way of grouping values. We will discuss how to:

1. Group By a single column
2. Group By multiple columns

In [None]:
# Group By a column in PySpark
df.groupBy('Origin').count().show(5)

In [None]:
# Group By multiple columns in PySpark
df.groupBy('Origin', 'Model').count().show(5)

Removing Columns

In [None]:
#Remove columns in PySpark
df = df.drop('new_column_one')
df.show(5,truncate=False)

In [None]:
# Remove multiple columns in one go (drop accepts several column names)
df = df.drop('new_column_two', 'new_column_three')
df.show(5,truncate=False)

DataFrame Operations on Rows
We will discuss the following in this section:

1. Filtering Rows
2. Get Distinct Rows
3. Sorting Rows
4. Union Dataframes

Filtering Rows

In [None]:
# Filtering rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count)) 
europe_filtered_count = df.filter(col('Origin')=='Europe').count()
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

In [None]:
# Filtering rows in PySpark based on multiple conditions
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count))
# Two conditions combined with & (each condition must be wrapped in parentheses)
europe_four_cyl = df.filter((col('Origin')=='Europe') & (col('Cylinders')==4))
print("EUROPE FILTERED RECORD COUNT: " + str(europe_four_cyl.count()))
# Show the rows that satisfy both conditions, matching the count above
europe_four_cyl.show(truncate=False)

Get Distinct Rows


In [None]:
#Get Unique Rows in PySpark
df.select('Origin').distinct().show()

In [None]:
#Get Unique Rows in PySpark based on multiple columns
df.select('Origin','model').distinct().show()

Sorting Rows

In [None]:
# Sort Rows in PySpark
# By default the data will be sorted in ascending order
df.orderBy('Cylinders').show(truncate=False) 

In [None]:
# To change the sorting order, you can use the ascending parameter
df.orderBy('Cylinders', ascending=False).show(truncate=False) 

In [None]:
# Using groupBy and orderBy together
df.groupBy("Origin").count().orderBy('count', ascending=False).show(10)

###Union Dataframes
You will see three main methods for performing a union of dataframes. It is important to know the difference between them and which one is preferred:

-->union() – It is used to merge two DataFrames of the same structure/schema. Columns are matched by position, and it returns an error if the schemas are not compatible (for example, a different number of columns).

-->unionAll() – This function is deprecated since Spark 2.0.0, and replaced with union()

-->unionByName() - This function is used to merge two dataframes based on column name.

Since unionAll() is deprecated, union() is the preferred method for merging dataframes.
The difference between unionByName() and union() is that unionByName() resolves columns by name, not by position.

In standard SQL, UNION eliminates duplicates while UNION ALL merges two datasets and keeps duplicate records. In PySpark, however, union() and unionAll() behave the same and both include duplicate records. The recommendation is to use distinct() or dropDuplicates() to remove duplicate records.
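
The difference between matching by position and matching by name, and the fact that duplicates survive a union, can be sketched in plain Python. This is only a model of the semantics under the assumption that a "dataframe" is a list of column names plus a list of row tuples; the helper names union_by_position and union_by_name are hypothetical and are not PySpark functions.

```python
# Two toy tables with the same columns in a different order
cols_a, rows_a = ["name", "age"], [("Alice", 5), ("Jacob", 24)]
cols_b, rows_b = ["age", "name"], [(5, "Alice")]


def union_by_position(rows_1, rows_2):
    # Models union(): rows are appended as-is, columns are matched by position
    return rows_1 + rows_2


def union_by_name(cols_1, rows_1, cols_2, rows_2):
    # Models unionByName(): reorder the second table's values
    # into the first table's column order before appending
    index = [cols_2.index(c) for c in cols_1]
    return rows_1 + [tuple(row[i] for i in index) for row in rows_2]


by_position = union_by_position(rows_a, rows_b)
by_name = union_by_name(cols_a, rows_a, cols_b, rows_b)

# Models distinct() / dropDuplicates(): duplicates are only removed on request
deduplicated = list(dict.fromkeys(by_name))
```

Here by_position ends with the row (5, 'Alice'), so the age lands in the name column, while by_name ends with ('Alice', 5) and keeps that duplicate until it is removed explicitly.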

In [None]:
# CASE 1: Union When columns are in order
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
europe_cars = df.filter((col('Origin')=='Europe') & (col('Cylinders')==5))
japan_cars = df.filter((col('Origin')=='Japan') & (col('Cylinders')==3))
print("EUROPE CARS: "+str(europe_cars.count()))
print("JAPAN CARS: "+str(japan_cars.count()))
print("AFTER UNION: "+str(europe_cars.union(japan_cars).count()))

Result:

As you can see here, there were 3 cars from Europe with 5 Cylinders, and 4 cars from Japan with 3 Cylinders. After union, there are 7 cars in total.

In [None]:
# CASE 2: Union when columns are not in order
# Creating two dataframes with jumbled columns
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

Result:

As you can see here, the two dataframes have been successfully merged based on their column names.

###Common Data Manipulation Functions

In [None]:
# Functions available in PySpark
from pyspark.sql import functions
# Similar to Python, we can use the dir function to view the available functions
print(dir(functions)) 

String Functions

In [None]:
# Loading the data
from pyspark.sql.functions import col
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)

Display the Car column as is, in lower case, and in upper case, along with the first 4 characters of the column

In [None]:
from pyspark.sql.functions import col,lower, upper, substring
# Prints out the details of a function
help(substring)

# alias is used to rename the column in the output
df.select(col('Car'),lower(col('Car')),upper(col('Car')),substring(col('Car'),1,4).alias("first_four_characters")).show(5, False)

Concatenate the Car column and Model column and add a space between them.

In [None]:
from pyspark.sql.functions import col, concat, lit
df.select(col("Car"),col("model"),concat(col("Car"), lit(" "), col("model"))).show(5, False)

Numeric functions

--> Show the minimum and maximum weight

In [None]:
from pyspark.sql.functions import min, max
df.select(min(col('Weight')), max(col('Weight'))).show()

Add 10 to the minimum and maximum weight

In [None]:
from pyspark.sql.functions import min, max, lit
# Add 10 after aggregating, so both columns are computed the same way
df.select(min(col('Weight'))+lit(10), max(col('Weight'))+lit(10)).show()

Operations on Date

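-->Since Spark 3.0, date and timestamp patterns follow Spark's own datetime pattern rules, which are closely modeled on Java's DateTimeFormatter (earlier versions followed Java's SimpleDateFormat). See the "Datetime Patterns for Formatting and Parsing" page of the Spark documentation for the full table.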

In [None]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()
df.printSchema()

In [None]:
df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'dd/MMM/yyyy HH:mm:ss'), to_timestamp(col('DOB'),'dd/MMM/yyyy HH:mm:ss'))
df.show()
df.printSchema()

What is 3 days earlier than the oldest date and 3 days later than the most recent date?

In [None]:
from pyspark.sql.functions import date_add, date_sub
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')),3)).show()

###Joins in PySpark

In [None]:
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

In [None]:
# Executing an inner join so we can see the id, name and price of each car in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['car_name'],car_price_df['car_price']).show(truncate=False)

As you can see, we have done an inner join between two dataframes. The following joins are supported by PySpark:

1. inner (default)
2. cross
3. outer
4. full
5. full_outer
6. left
7. left_outer
8. right
9. right_outer
10. left_semi
11. left_anti
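
To show how a few of these join types differ, here is a plain-Python sketch of their semantics. It is not PySpark code: the two tables are modeled as dictionaries keyed by id (so ids are assumed unique), and the price table is hypothetical data chosen so that id 3 has no price and id 4 has no car.

```python
cars = {1: "Car A", 2: "Car B", 3: "Car C"}
prices = {1: 1000, 2: 2000, 4: 4000}

# inner: only ids present on both sides
inner = [(i, cars[i], prices[i]) for i in cars if i in prices]

# left / left_outer: every left row, None where the right side has no match
left = [(i, cars[i], prices.get(i)) for i in cars]

# left_semi: left rows that have a match, keeping only the left columns
left_semi = [(i, cars[i]) for i in cars if i in prices]

# left_anti: left rows that have no match
left_anti = [(i, cars[i]) for i in cars if i not in prices]

# outer / full / full_outer: every id from either side
all_ids = sorted(cars.keys() | prices.keys())
full_outer = [(i, cars.get(i), prices.get(i)) for i in all_ids]
```

In PySpark the same choice is made by passing the join type as the third argument to join, as in the inner join cell above.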

##Spark SQL
SQL has been around since the 1970s, so one can imagine the number of people who have made it their bread and butter. As big data grew in popularity, professionals with the technical knowledge to deal with it were in short supply. This led to the creation of Spark SQL. To quote the docs:

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

Basically, what you need to know is that Spark SQL is used to execute SQL queries on big data. Spark SQL can also be used to read data from Hive tables and views. Let me explain Spark SQL with an example.

In [None]:
# Load data
df = spark.read.csv('cars.csv', header=True, sep=";")
# Register Temporary Table
df.createOrReplaceTempView("temp")
# Select all data from temp table
spark.sql("select * from temp limit 5").show()
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()

As you can see, we registered the dataframe as a temporary view and then ran basic SQL queries on it. How amazing is that?!
If you are a person who is more comfortable with SQL, then this feature is truly a blessing for you! But this raises a question:

`Should I just keep using Spark SQL all the time?`

**And the answer is, it depends.**

So basically, the different functions act in different ways, and depending on the type of action you are trying to perform, the speed at which it completes also differs. But as time progresses, this feature keeps getting better, so hopefully any difference will be a small margin. Plenty of analyses have been done on this, but none gives a definitive answer yet.

###RDD
With map, you define a function and then apply it record by record. flatMap returns a new RDD by first applying a function to all of the elements of the RDD and then flattening the results. filter returns a new RDD containing only the elements that satisfy a condition. With reduce, we take neighboring elements and produce a single combined result. For example, let's say you have a set of numbers. You can reduce this to its sum by providing a function that takes two values as input and reduces them to one.
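
The four operations can be tried out without a cluster using their plain-Python counterparts. This is only an analogy on a local list (the sample lines are made up); on an RDD you would call the map, flatMap, filter and reduce methods with the same kind of functions.

```python
from functools import reduce
from itertools import chain

lines = ["a b", "c", "d e f"]

# map: exactly one output element per input element
lengths = list(map(len, lines))

# flatMap: apply a function that returns a list, then flatten the lists
words = list(chain.from_iterable(map(str.split, lines)))

# filter: keep only the elements that satisfy a condition
long_lines = list(filter(lambda line: len(line) > 1, lines))

# reduce: combine elements two at a time into a single result (here, a sum)
total = reduce(lambda x, y: x + y, lengths)
```

Note how map keeps three elements (one length per line) while the flatMap analogue produces six words from the same three lines.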

Some of the reasons you would use a dataframe over RDD are:

1. Its ability to represent data as rows and columns. But this also means it can only hold structured and semi-structured data.
2. It allows processing data in different formats (AVRO, CSV, JSON) and from different storage systems (HDFS, Hive tables, MySQL).
3. Its superior job optimization capability.
4. The DataFrame API is very easy to use.

In [None]:
cars = spark.sparkContext.textFile('cars.csv')
print(cars.first())
cars_header = cars.first()
cars_rest = cars.filter(lambda line: line!=cars_header)
print(cars_rest.first())

**How many cars are there in our csv data?**

In [None]:
cars_rest.map(lambda line: line.split(";")).count()

406

**Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in Europe**

In [None]:
# Car name is column  0
(cars_rest.filter(lambda line: line.split(";")[8]=='Europe').
 map(lambda line: (line.split(";")[0],
    line.split(";")[1],
    line.split(";")[2],
    line.split(";")[5],
    line.split(";")[8])).collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Volkswagen Type 3', '23.0', '4', '2254.', 'Europe'),
 ('Volvo 145e (sw)', '18.0', '4', '2933.', 'Europe'),
 ('Volkswagen 411 (sw)', '22.0', '4', '2511.', 'Europe'),
 ('Peugeot 504 (sw)', '21.0', '4', '2979.', 'Europe'),
 ('Renault 12 (sw)', '26.0', '4', '2189.', 'Europe'),
 ('Volkswagen Super Beetle', '26.0', '4', '1950.', 'Europe'),
 ('Fiat 124 Sport Coupe', '26.0', '4', '2265.', 'Europe'),
 ('Fiat 128', '29

**Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in either Europe or Japan**

In [None]:
# Car name is column  0
(cars_rest.filter(lambda line: line.split(";")[8] in ['Europe','Japan']).
 map(lambda line: (line.split(";")[0],
    line.split(";")[1],
    line.split(";")[2],
    line.split(";")[5],
    line.split(";")[8])).collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Toyota Corolla Mark ii', '24.0', '4', '2372.', 'Japan'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Toyota Corolla', '25.0', '4', '2228.', 'Japan'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Toyota Corolla 1200', '31.0', '4', '1773.', 'Japan'),
 ('Datsun 1200', '35.0', '4', '1613.', 'Japan'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Toyota Corolla Hardtop', '24.0', '4', '2278.', 'Japan'),
 ('Volkswagen Type 3', '23.0', '4', '

You can also create custom user-defined functions (UDFs) in Spark. You can learn about them at: https://docs.databricks.com/spark/latest/spark-sql/udf-python.html

###Submitting a Spark Job
The Python syntax for running jobs is: python <file_name>.py <arg1> <arg2> ...
But when you submit a Spark job, you have to use spark-submit to run the application.

Here is a simple example of a spark-submit command: spark-submit filename.py --named_argument 'argument value'
Here, named_argument is an argument that you read from inside your script.
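
One common way to read such an argument inside the script is Python's standard argparse module. The sketch below assumes the script is the filename.py from the command above; the function name parse_args and the help text are illustrative choices, not something spark-submit requires.

```python
import argparse


def parse_args(argv=None):
    # Arguments placed after the script name on the spark-submit command line
    # are passed through to the script, where argparse can read them.
    parser = argparse.ArgumentParser(description="Example Spark job")
    parser.add_argument("--named_argument", required=True,
                        help="value passed on the command line")
    return parser.parse_args(argv)


# In a real job, call parse_args() with no arguments so it reads sys.argv.
# An explicit list is used here so the example runs on its own.
args = parse_args(["--named_argument", "argument value"])
value = args.named_argument
```

Keeping the parsing in a small function makes it easy to test the script's argument handling without launching Spark.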

There are other options you can pass in the command, like:
* --py-files which helps you pass a Python file to import in your script,
* --files which helps pass other files like txt or config,
* --deploy-mode which tells whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client),
* --conf which helps pass different configurations, like memoryOverhead, dynamicAllocation etc.

There is an entire page in spark documentation dedicated to this. I highly recommend you go through https://spark.apache.org/docs/latest/submitting-applications.html

###Creating Dataframes
When getting started with dataframes, the most common question is: 'How do I create a dataframe?'
Below, you can see how to create three kinds of dataframes:

**Create a totally empty dataframe**

In [None]:
from pyspark.sql.types import StructType
sc = spark.sparkContext
#Create empty df
schema = StructType([])
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()

**Create an empty dataframe with header**

In [None]:
from pyspark.sql.types import StructType, StructField, StringType
#Create empty df with header
schema_header = StructType([StructField("name", StringType(), True)])
empty_with_header = spark.createDataFrame(sc.emptyRDD(), schema_header)
empty_with_header.show()

**Create a dataframe with header and data**

In [None]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":13},
  {"name":'Jacob',"age":24},
  {"name":'Betty',"age":135},
]
spark.createDataFrame(Row(**x) for x in mylist).show()

In [None]:
# You can achieve the same using this - note that we are using spark context here, not a spark session
from pyspark.sql import Row
df = sc.parallelize([
        Row(name='Alice', age=13),
        Row(name='Jacob', age=24),
        Row(name='Betty', age=135)]).toDF()
df.show()

Drop Duplicates
As mentioned earlier, there are two easy ways to remove duplicates from a dataframe. We have already seen the usage of distinct() under the Get Distinct Rows section. I will explain how to use the dropDuplicates() function to achieve the same.

`drop_duplicates() is an alias for dropDuplicates()`

In [None]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":5,"height":80},
  {"name":'Jacob',"age":24,"height":80},
  {"name":'Alice',"age":5,"height":80}
]
df = spark.createDataFrame(Row(**x) for x in mylist)
df.dropDuplicates().show()

dropDuplicates() also takes an optional parameter called subset, which specifies the columns on which the duplicate check is done.

In [None]:
df.dropDuplicates(subset=['height']).show()