<a href="https://colab.research.google.com/github/PPraveen2230/PySpark---SQL-tutorial-2/blob/main/Pyspark_tutorial_exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### how to find unique values in a Column

In [1]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['A', 'East', 11],
        ['A', 'East', 8],
        ['A', 'East', 10],
        ['B', 'West', 6],
        ['B', 'West', 9],
        ['C', 'East', 5]]

 #define column name
columns = ['team', 'conference', 'points']

 # create Dataframe using data and ccolumn names
df = spark.createDataFrame(data, columns)

 # view DataFrame
df.show()

+----+----------+------+
|team|conference|points|
+----+----------+------+
|   A|      East|    11|
|   A|      East|     8|
|   A|      East|    10|
|   B|      West|     6|
|   B|      West|     9|
|   C|      East|     5|
+----+----------+------+



In [3]:
#Find Unique Values in a Column

df.select('team').distinct().show()

+----+
|team|
+----+
|   A|
|   B|
|   C|
+----+



In [4]:
#Find and Sort Unique Values in a Column

df.select('points').distinct().show()

+------+
|points|
+------+
|    10|
|     8|
|    11|
|     6|
|     9|
|     5|
+------+



In [5]:
#find unique values in points column
df_points = df.select('points').distinct()


#display unique values in ascending order
df_points.orderBy('points').show()

+------+
|points|
+------+
|     5|
|     6|
|     8|
|     9|
|    10|
|    11|
+------+



In [6]:
#find unique values in points column
df_points = df.select('points').distinct()

#display unique values in descending order
df_points.orderBy('points', ascending=False).show()

+------+
|points|
+------+
|    11|
|    10|
|     9|
|     8|
|     6|
|     5|
+------+



In [7]:
# Example 3: Find and Count Unique Values in a Column

df.groupBy('team').count().show()

+----+-----+
|team|count|
+----+-----+
|   A|    3|
|   B|    2|
|   C|    1|
+----+-----+



#How to Select Rows by Index in DataFrame

In [8]:

#Select Rows by Index in PySpark DataFrame

df.show()

+----+----------+------+
|team|conference|points|
+----+----------+------+
|   A|      East|    11|
|   A|      East|     8|
|   A|      East|    10|
|   B|      West|     6|
|   B|      West|     9|
|   C|      East|     5|
+----+----------+------+



In [9]:
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

#add column called 'id' that contains row number from 1 to n
w= Window().orderBy(lit ('A'))
df= df.withColumn('id', row_number().over(w))

df.show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|    11|  1|
|   A|      East|     8|  2|
|   A|      East|    10|  3|
|   B|      West|     6|  4|
|   B|      West|     9|  5|
|   C|      East|     5|  6|
+----+----------+------+---+



In [10]:
from pyspark.sql.functions import col

# select all rows between index values 2 and 5
df.where(col('id').between(2, 5)).show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|     8|  2|
|   A|      East|    10|  3|
|   B|      West|     6|  4|
|   B|      West|     9|  5|
+----+----------+------+---+



In [11]:
#find unique values in points column
df.filter(df.id.isin(1,5,6)).show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|    11|  1|
|   B|      West|     9|  5|
|   C|      East|     5|  6|
+----+----------+------+---+



How to Select Columns by Index in
DataFrame

In [12]:
#select first column in DataFrame
df.select(df.columns[0]).show()

+----+
|team|
+----+
|   A|
|   A|
|   A|
|   B|
|   B|
|   C|
+----+



In [13]:
#select all columns except first column in DataFrame
df.drop(df.columns[0]).show()

+----------+------+---+
|conference|points| id|
+----------+------+---+
|      East|    11|  1|
|      East|     8|  2|
|      East|    10|  3|
|      West|     6|  4|
|      West|     9|  5|
|      East|     5|  6|
+----------+------+---+



In [14]:
#select all columns between index 0 and 2, not including 2
df.select(df.columns[0:2]).show()

+----+----------+
|team|conference|
+----+----------+
|   A|      East|
|   A|      East|
|   A|      East|
|   B|      West|
|   B|      West|
|   C|      East|
+----+----------+



In [15]:
# Example 1: Select Specific Column by Index
#select first column in DataFrame
df.select(df.columns[0]).show()


+----+
|team|
+----+
|   A|
|   A|
|   A|
|   B|
|   B|
|   C|
+----+



In [16]:
#select all columns except first column in DataFrame
df.drop(df.columns[0]).show()

+----------+------+---+
|conference|points| id|
+----------+------+---+
|      East|    11|  1|
|      East|     8|  2|
|      East|    10|  3|
|      West|     6|  4|
|      West|     9|  5|
|      East|     5|  6|
+----------+------+---+



In [18]:
# Example 3: Select Range of Columns by Index

#select all columns between index 0 and 2, not including 2
df.select(df.columns[0:2]).show()

+----+----------+
|team|conference|
+----+----------+
|   A|      East|
|   A|      East|
|   A|      East|
|   B|      West|
|   B|      West|
|   C|      East|
+----+----------+



How to Select Rows Based on Column Values

In [19]:
#select rows where 'team' column is equal to 'B'
df.where(df.team=='B').show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   B|      West|     6|  4|
|   B|      West|     9|  5|
+----+----------+------+---+



In [20]:
#select rows where 'team' column is equal to 'A' or 'B'
df.filter(df.team.isin('A','B')).show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|    11|  1|
|   A|      East|     8|  2|
|   A|      East|    10|  3|
|   B|      West|     6|  4|
|   B|      West|     9|  5|
+----+----------+------+---+



In [21]:
#select rows where 'team' column is 'A' and 'points' column is greater than 9
df.where((df.team=='A') & (df.points>9)).show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|    11|  1|
|   A|      East|    10|  3|
+----+----------+------+---+



In [22]:
#select rows where 'team' column is equal to 'B'
df.where(df.team=='B').show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   B|      West|     6|  4|
|   B|      West|     9|  5|
+----+----------+------+---+



In [23]:
#select rows where 'team' column is equal to 'A' or 'B'
df.filter(df.team.isin('A','B')).show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|    11|  1|
|   A|      East|     8|  2|
|   A|      East|    10|  3|
|   B|      West|     6|  4|
|   B|      West|     9|  5|
+----+----------+------+---+



In [24]:
#select rows where 'team' column is 'A' and 'points' column is greater than 9
df.where((df.team=='A') & (df.points>9)).show()

+----+----------+------+---+
|team|conference|points| id|
+----+----------+------+---+
|   A|      East|    11|  1|
|   A|      East|    10|  3|
+----+----------+------+---+



How to Keep Certain Columns in PySpark

In [25]:
from pyspark.sql.functions import col

#only keep columns 'col1' and 'col2'
df.select(col('team'), col('points')).show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   A|     8|
|   A|    10|
|   B|     6|
|   B|     9|
|   C|     5|
+----+------+



In [26]:
from pyspark.sql.functions import col

df.drop(col('conference'), col('assists')).show()

+----+------+---+
|team|points| id|
+----+------+---+
|   A|    11|  1|
|   A|     8|  2|
|   A|    10|  3|
|   B|     6|  4|
|   B|     9|  5|
|   C|     5|  6|
+----+------+---+



How to Select Multiple Columns in PySpark

In [27]:
#select 'team' and 'points' columns
df.select('team', 'points').show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   A|     8|
|   A|    10|
|   B|     6|
|   B|     9|
|   C|     5|
+----+------+



In [28]:
#define list of columns to select
select_cols = ['team', 'points']
#select all columns in list
df.select(*select_cols).show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   A|     8|
|   A|    10|
|   B|     6|
|   B|     9|
|   C|     5|
+----+------+



In [29]:
#select all columns between index 0 and 2 ( not including 2)
df.select(df.columns[0:2]).show()

+----+----------+
|team|conference|
+----+----------+
|   A|      East|
|   A|      East|
|   A|      East|
|   B|      West|
|   B|      West|
|   C|      East|
+----+----------+



In [30]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]
#define column names
columns = ['team', 'conference', 'points', 'assists']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



How to Do a Left Join in PySpark (With
Example)

In [31]:
# Syntax
# df_joined = df1.join(df2, on=['team'], how='left').show()

In [32]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data1 = [['Mavs', 11],
         ['Hawks', 25],
         ['Nets', 32],
         ['Kings', 15],
         ['Warriors', 22],
         ['Suns', 17]]

#define column names
columns1 = ['team', 'points']

#create dataframe using data and column names
df1 = spark.createDataFrame(data1, columns1)

#view dataframe
df1.show()

+--------+------+
|    team|points|
+--------+------+
|    Mavs|    11|
|   Hawks|    25|
|    Nets|    32|
|   Kings|    15|
|Warriors|    22|
|    Suns|    17|
+--------+------+



In [33]:
#define daata

data2 = [['Mavs', 4],
         ['Nets', 7],
         ['Suns', 8],
         ['Grizzlies', 12],
         ['kings', 7]]

#define column names
columns2 = ['team', 'assists']

# create dataframe using data and ccolumn names
df2 = spark.createDataFrame(data2, columns2)

#view dataframe
df2.show()



+---------+-------+
|     team|assists|
+---------+-------+
|     Mavs|      4|
|     Nets|      7|
|     Suns|      8|
|Grizzlies|     12|
|    kings|      7|
+---------+-------+



In [34]:
#perform left join using 'team' column
df_joined = df1.join(df2, on=['team'], how='left').show()


+--------+------+-------+
|    team|points|assists|
+--------+------+-------+
|    Mavs|    11|      4|
|    Nets|    32|      7|
|   Hawks|    25|   NULL|
|   Kings|    15|   NULL|
|Warriors|    22|   NULL|
|    Suns|    17|      8|
+--------+------+-------+



PySpark: How to Do a Left Join on Multiple
Columns

In [35]:
# syntax
#df_joined = df1.join(df2, on=[df1.col1==df2.col1, df1.col2==df2.col2], how='left')

In [36]:
#Left Join on Multiple Columns in PySpark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data1 = [['A', 'G', 18],
         ['A', 'F', 22],
         ['B', 'F', 19],
         ['B', 'G', 14]]


#define column names
columns1 = ['team', 'pos', 'points']

#create dataframe using data and column names
df1 = spark.createDataFrame(data1, columns1)

#view dataframe
df1.show()

+----+---+------+
|team|pos|points|
+----+---+------+
|   A|  G|    18|
|   A|  F|    22|
|   B|  F|    19|
|   B|  G|    14|
+----+---+------+



In [37]:
#define data
data2 = [['A', 'G', 4],
         ['A', 'F', 9],
         ['B', 'F', 8],
         ['C', 'G', 6],
         ['C', 'F', 5]]
#define column names
columns2 = ['team_name', 'position', 'assists']

#create dataframe using data and column names
df2 = spark.createDataFrame(data2, columns2)

#view dataframe
df2.show()

+---------+--------+-------+
|team_name|position|assists|
+---------+--------+-------+
|        A|       G|      4|
|        A|       F|      9|
|        B|       F|      8|
|        C|       G|      6|
|        C|       F|      5|
+---------+--------+-------+



In [38]:
#perform left join
df_joined = df1.join(df2, on=[df1.team==df2.team_name,df1.pos==df2.position], how='left')

#view resulting DataFrame
df_joined.show()

+----+---+------+---------+--------+-------+
|team|pos|points|team_name|position|assists|
+----+---+------+---------+--------+-------+
|   A|  G|    18|        A|       G|      4|
|   A|  F|    22|        A|       F|      9|
|   B|  G|    14|     NULL|    NULL|   NULL|
|   B|  F|    19|        B|       F|      8|
+----+---+------+---------+--------+-------+



In [39]:
#drop 'team_name' and 'position' columns from joined DataFrame
df_joined.drop('team_name', 'position').show()

+----+---+------+-------+
|team|pos|points|assists|
+----+---+------+-------+
|   A|  G|    18|      4|
|   A|  F|    22|      9|
|   B|  G|    14|   NULL|
|   B|  F|    19|      8|
+----+---+------+-------+



How to Do a Right Join in PySpark

In [40]:
#  SYNTAX
# df_joined = df1.join(df2, on=['team'], how='right ').show()

In [41]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data1 = [['Mavs', 11],
         ['Hawks', 25],
         ['Nets', 32],
         ['kINGS', 15],
         ['Warriors', 22],
         ['Suns', 17]]

#define column names
columns1 = ['team', 'points']

#create dataframe using data and column names
df1 = spark.createDataFrame(data1, columns1)

#view dataframe
df1.show()

+--------+------+
|    team|points|
+--------+------+
|    Mavs|    11|
|   Hawks|    25|
|    Nets|    32|
|   kINGS|    15|
|Warriors|    22|
|    Suns|    17|
+--------+------+



In [42]:
#define data
data2 = [['Mavs', 4],
        ['Nets', 7],
        ['Suns', 8],
        ['Grizzlies', 12],
        ['Kings', 7]]
#define column names
columns2 = ['team', 'assists']

#create dataframe using data and column names
df2 = spark.createDataFrame(data2, columns2)

#view dataframe
df2.show()

+---------+-------+
|     team|assists|
+---------+-------+
|     Mavs|      4|
|     Nets|      7|
|     Suns|      8|
|Grizzlies|     12|
|    Kings|      7|
+---------+-------+



In [43]:
df_joined = df1.join(df2, on=['team'], how='right').show()

+---------+------+-------+
|     team|points|assists|
+---------+------+-------+
|     Mavs|    11|      4|
|     Nets|    32|      7|
|    Kings|  NULL|      7|
|Grizzlies|  NULL|     12|
|     Suns|    17|      8|
+---------+------+-------+



How to Perform an Anti-Join in PySpark

In [44]:
'''An anti-join allows you to return all rows in one DataFrame that do not have matching values in
another DataFrame.'''

'An anti-join allows you to return all rows in one DataFrame that do not have matching values in\nanother DataFrame.'

In [45]:
# SYNTAX
# df_anti_join = df1.join(df2, on=['team'], how='left_anti')

In [46]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


#define data
data1 = [['A', 18],
         ['B', 22],
         ['C', 19],
         ['D', 14],
         ['E', 30]]


#define column names
columns1 = ['team', 'points']


#create dataframe using data and column names
df1 = spark.createDataFrame(data1, columns1)


#view dataframe
df1.show()

+----+------+
|team|points|
+----+------+
|   A|    18|
|   B|    22|
|   C|    19|
|   D|    14|
|   E|    30|
+----+------+



In [47]:
#define data
data2 = [['A', 18],
         ['B', 22],
         ['C', 19],
         ['F', 22],
         ['G', 29]]

#define column names
columns2 = ['team', 'points']

#create dataframe using data and column names
df2 = spark.createDataFrame(data2, columns2)

#view dataframe
df2.show()

+----+------+
|team|points|
+----+------+
|   A|    18|
|   B|    22|
|   C|    19|
|   F|    22|
|   G|    29|
+----+------+



In [48]:
#perform anti-join
df_anti_join = df1.join(df2, on=['team'], how='left_anti')

#view resulting DataFrame
df_anti_join.show()

+----+------+
|team|points|
+----+------+
|   E|    30|
|   D|    14|
+----+------+



How to Do an Outer Join in PySpark (With
Example)

In [49]:
#SYNTAX
#df_joined = df1.join(df2, on=['team'], how='full').show()

In [50]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data1 = [['Mavs', 11],
         ['Hawks', 25],
         ['Nets', 32],
         ['Kings', 15],
         ['Warriors', 22],
         ['Suns', 17]]
#define column names
columns1 = ['team', 'points']

#create dataframe using data and column names
df1 = spark.createDataFrame(data1, columns1)

#view dataframe
df1.show()

+--------+------+
|    team|points|
+--------+------+
|    Mavs|    11|
|   Hawks|    25|
|    Nets|    32|
|   Kings|    15|
|Warriors|    22|
|    Suns|    17|
+--------+------+



In [51]:
#define data
data2 = [['Mavs', 4],
         ['Nets', 7],
         ['Suns', 8],
         ['Grizzlies', 12],
         ['Kings', 7]]
#define column names
columns2 = ['team', 'assists']

#create dataframe using data and column names
df2 = spark.createDataFrame(data2, columns2)


#view dataframe
df2.show()

+---------+-------+
|     team|assists|
+---------+-------+
|     Mavs|      4|
|     Nets|      7|
|     Suns|      8|
|Grizzlies|     12|
|    Kings|      7|
+---------+-------+



In [52]:
#perform outer join using 'team' column
df_joined = df1.join(df2, on=['team'], how='full').show()

+---------+------+-------+
|     team|points|assists|
+---------+------+-------+
|Grizzlies|  NULL|     12|
|    Hawks|    25|   NULL|
|    Kings|    15|      7|
|     Mavs|    11|      4|
|     Nets|    32|      7|
|     Suns|    17|      8|
| Warriors|    22|   NULL|
+---------+------+-------+



How to Do an Inner Join in PySpark (With
Example)

In [53]:
#You can use the following basic syntax to perform an inner join in PySpark:
# df_joined = df1.join(df2, on=['team'], how='inner').show()

In [54]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data1 = [['Mavs', 11],
         ['Hawks', 25],
         ['Nets', 32],
         ['Kings', 15],
         ['Warriors', 22],
         ['Suns', 17]]

#define column names
columns1 = ['team', 'points']


#create dataframe using data and column names
df1 = spark.createDataFrame(data1, columns1)

#view dataframe
df1.show()

+--------+------+
|    team|points|
+--------+------+
|    Mavs|    11|
|   Hawks|    25|
|    Nets|    32|
|   Kings|    15|
|Warriors|    22|
|    Suns|    17|
+--------+------+



In [55]:
#define data
data2 = [['Mavs', 4],
        ['Nets', 7],
        ['Suns', 8],
        ['Grizzlies', 12],
        ['Kings', 7]]
#define column names
columns2 = ['team', 'assists']

#create dataframe using data and column names
df2 = spark.createDataFrame(data2, columns2)

#view dataframe
df2.show()

+---------+-------+
|     team|assists|
+---------+-------+
|     Mavs|      4|
|     Nets|      7|
|     Suns|      8|
|Grizzlies|     12|
|    Kings|      7|
+---------+-------+



In [56]:
#perform inner join using 'team' column
df_joined = df1.join(df2, on=['team'], how='inner').show()

+-----+------+-------+
| team|points|assists|
+-----+------+-------+
|Kings|    15|      7|
| Mavs|    11|      4|
| Nets|    32|      7|
| Suns|    17|      8|
+-----+------+-------+



PySpark: How to Join on Different Column
Names

In [57]:
# SYNTAX
#df3 = df1.withColumn('id',col('team_id')).join(df2.withColumn('id', col('team_name')),on='id')

In [58]:
#How to Join on Different Column Names in PySpark

In [59]:
data = [['Mavs', 18],
['Nets', 33],
['Lakers', 12],
['Kings', 15],
['Hawks', 19],
['Wizards', 24],
['Magic', 28]]
#define column names
columns = ['team_ID', 'points']

#create dataframe using data and column names
df1 = spark.createDataFrame(data, columns)

#view dataframe
df1.show()

+-------+------+
|team_ID|points|
+-------+------+
|   Mavs|    18|
|   Nets|    33|
| Lakers|    12|
|  Kings|    15|
|  Hawks|    19|
|Wizards|    24|
|  Magic|    28|
+-------+------+



In [60]:
#define data
data = [['Hawks', 4],
['Wizards', 5],
['Raptors', 5],
['Kings', 12],
['Mavs', 7],
['Nets', 11],
['Magic', 3]]

#define column names
columns = ['team_name', 'assists']

#create dataframe using data and column names
df2 = spark.createDataFrame(data, columns)

#view dataframe
df2.show()

+---------+-------+
|team_name|assists|
+---------+-------+
|    Hawks|      4|
|  Wizards|      5|
|  Raptors|      5|
|    Kings|     12|
|     Mavs|      7|
|     Nets|     11|
|    Magic|      3|
+---------+-------+



In [61]:
#join df1 and df2 on different column names
df3 = df1.withColumn('id',
col('team_id')).join(df2.withColumn('id', col('team_name')),
on='id')

#view resulting DataFrame
df3.show()

+-------+-------+------+---------+-------+
|     id|team_ID|points|team_name|assists|
+-------+-------+------+---------+-------+
|  Hawks|  Hawks|    19|    Hawks|      4|
|  Kings|  Kings|    15|    Kings|     12|
|  Magic|  Magic|    28|    Magic|      3|
|   Mavs|   Mavs|    18|     Mavs|      7|
|   Nets|   Nets|    33|     Nets|     11|
|Wizards|Wizards|    24|  Wizards|      5|
+-------+-------+------+---------+-------+



In [62]:
#join df1 and df2 on different column names
df3 = df1.withColumn('id',col('team_id')).join(df2.withColumn('id', col('team_name')),on='id').select('id', 'points', 'assists')

In [63]:
#view resulting DataFrame
df3.show()

+-------+------+-------+
|     id|points|assists|
+-------+------+-------+
|  Hawks|    19|      4|
|  Kings|    15|     12|
|  Magic|    28|      3|
|   Mavs|    18|      7|
|   Nets|    33|     11|
|Wizards|    24|      5|
+-------+------+-------+



How to Print One Column of a PySpark
DataFrame

In [64]:
#print 'conference' column (with column name)
df.select('conference').show()

+----------+
|conference|
+----------+
|      East|
|      East|
|      East|
|      West|
|      West|
|      East|
+----------+



In [65]:
#print values only from 'conference' column
df.select('conference').rdd.flatMap(list).collect()

['East', 'East', 'East', 'West', 'West', 'East']

How to Check if Column Contains
String

In [66]:
#check if 'conference' column contains exact string 'Eas' in any row
df.where(df.conference=='East').count()>0

True

In [67]:
#check if 'conference' column contains partial string 'Eas' in any row
df.filter(df.conference.contains('East')).count()>0

True

In [68]:
#count occurrences of partial string 'Eas' in 'conference' column
df.filter(df.conference.contains('Eas')).count()

4

In [69]:
#define data
data = [['A', 'East', 11],
['A', 'East', 8],
['A', 'East', 10],
['B', 'West', 6],
['B', 'West', 6],
['C', 'East', 5]]

#define column names
columns = ['team', 'conference', 'points']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)


#view dataframe
df.show()

+----+----------+------+
|team|conference|points|
+----+----------+------+
|   A|      East|    11|
|   A|      East|     8|
|   A|      East|    10|
|   B|      West|     6|
|   B|      West|     6|
|   C|      East|     5|
+----+----------+------+



In [70]:
#check if column name 'points' exists in the DataFrame
'points' in df.columns

True

How to Check if DataFrame is
Empty

In [71]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType, StructField, StringType, FloatType

#create empty RDD
empty_rdd=spark.sparkContext.emptyRDD()


#specify colum names and types
my_columns=[StructField('team', StringType(),True),
StructField('position', StringType(),True),
StructField('points', FloatType(),True)]

#create DataFrame with specific column names
df=spark.createDataFrame([], schema=StructType(my_columns))

#view DataFrame
df.show()

+----+--------+------+
|team|position|points|
+----+--------+------+
+----+--------+------+



In [72]:
#check if DataFrame is empty
print(df.count() == 0)
True

True


True

In [73]:
#define data
data = [['Mavs', 18],
['Nets', 33],
['Lakers', 12],
['Mavs', 15],
['Cavs', 19],
['Wizards', 24],]
#define column names
columns = ['team', 'points']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+-------+------+
|   team|points|
+-------+------+
|   Mavs|    18|
|   Nets|    33|
| Lakers|    12|
|   Mavs|    15|
|   Cavs|    19|
|Wizards|    24|
+-------+------+



In [74]:
#check if DataFrame is empty
print(df.count() == 0)

False


How to Check Data Type of
Columns in DataFrame

In [75]:
df.show()

+-------+------+
|   team|points|
+-------+------+
|   Mavs|    18|
|   Nets|    33|
| Lakers|    12|
|   Mavs|    15|
|   Cavs|    19|
|Wizards|    24|
+-------+------+



In [76]:
#return data type of 'conference' column
dict(df.dtypes)['team']

'string'

In [77]:
#return data type of all columns
df.dtypes

[('team', 'string'), ('points', 'bigint')]

In [78]:
#define data
data = [['A', 'East', 11, 4],
        ['A', None, 8, 9],
        ['A', 'East', 10, 3],
        ['B', 'West', None, 12],
        ['B', 'West', None, 4],
        ['C', 'East', 5, 2]]


#define column names
columns = ['team', 'conference', 'points', 'assists']


#create dataframe using data and column names
df = spark.createDataFrame(data, columns)


#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      NULL|     8|      9|
|   A|      East|    10|      3|
|   B|      West|  NULL|     12|
|   B|      West|  NULL|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [79]:
dict(df.dtypes)['conference']

'string'

In [80]:
df.dtypes

[('team', 'string'),
 ('conference', 'string'),
 ('points', 'bigint'),
 ('assists', 'bigint')]

PySpark: How to Drop Multiple Columns
from DataFrame

In [81]:
#drop 'team' and 'points' columns
df.drop('team', 'points').show()

+----------+-------+
|conference|assists|
+----------+-------+
|      East|      4|
|      NULL|      9|
|      East|      3|
|      West|     12|
|      West|      4|
|      East|      2|
+----------+-------+



In [82]:
#define list of columns to drop
drop_cols = ['team', 'points']

#drop all columns in list
df.select(*drop_cols).show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   A|     8|
|   A|    10|
|   B|  NULL|
|   B|  NULL|
|   C|     5|
+----+------+



In [83]:
#define data
data = [['A', 'East', 11, 4],
      ['A', 'East', 8, 9],
        ['A', 'East', 10, 3],
      ['B', 'West', 6, 12],
      ['B', 'West', 6, 4],
      ['C', 'East', 5, 2]]
#define column names
columns = ['team', 'conference', 'points', 'assists']


#create dataframe using data and column names
df = spark.createDataFrame(data, columns)


#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [84]:

# Drop Multiple Columns by Name

#drop 'team' and 'points' columns
df.drop('team', 'points').show()

+----------+-------+
|conference|assists|
+----------+-------+
|      East|      4|
|      East|      9|
|      East|      3|
|      West|     12|
|      West|      4|
|      East|      2|
+----------+-------+



How to Drop Duplicate Rows from
DataFrame

In [85]:
#define data
data = [['A', 'Guard', 11],
['A', 'Guard', 8],
['A', 'Forward', 22],
['A', 'Forward', 22],
['B', 'Guard', 14],
['B', 'Guard', 14],
['B', 'Forward', 13],
['B', 'Forward', 7]]


#define column names
columns = ['team', 'position', 'points']


#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+--------+------+
|team|position|points|
+----+--------+------+
|   A|   Guard|    11|
|   A|   Guard|     8|
|   A| Forward|    22|
|   A| Forward|    22|
|   B|   Guard|    14|
|   B|   Guard|    14|
|   B| Forward|    13|
|   B| Forward|     7|
+----+--------+------+



In [86]:
#drop rows that have duplicate values across all columns
df_new = df.dropDuplicates()
#view DataFrame without duplicates
df_new.show()

+----+--------+------+
|team|position|points|
+----+--------+------+
|   A|   Guard|     8|
|   A|   Guard|    11|
|   A| Forward|    22|
|   B| Forward|     7|
|   B|   Guard|    14|
|   B| Forward|    13|
+----+--------+------+



In [87]:
#drop rows that have duplicate values across 'team' and 'position'
columns
df_new = df.dropDuplicates(['team', 'position'])
#view DataFrame without duplicates
df_new.show()

+----+--------+------+
|team|position|points|
+----+--------+------+
|   A| Forward|    22|
|   A|   Guard|    11|
|   B| Forward|    13|
|   B|   Guard|    14|
+----+--------+------+



In [88]:
#drop rows that have duplicate values in 'team' column
df_new = df.dropDuplicates(['team'])
#view DataFrame without duplicates
df_new.show()

+----+--------+------+
|team|position|points|
+----+--------+------+
|   A|   Guard|    11|
|   B|   Guard|    14|
+----+--------+------+



How to Select Distinct Rows in PySpark
(With Examples)

In [89]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'Guard', 11],
['A', 'Guard', 8],
['A', 'Forward', 22],
['A', 'Forward', 22],
['B', 'Guard', 14],
['B', 'Guard', 14],
['B', 'Forward', 13],
['B', 'Forward', 7]]
#define column names
columns = ['team', 'position', 'points']
#create DataFrame using data and column names
df = spark.createDataFrame(data, columns)
#view DataFrame
df.show()

+----+--------+------+
|team|position|points|
+----+--------+------+
|   A|   Guard|    11|
|   A|   Guard|     8|
|   A| Forward|    22|
|   A| Forward|    22|
|   B|   Guard|    14|
|   B|   Guard|    14|
|   B| Forward|    13|
|   B| Forward|     7|
+----+--------+------+



In [90]:
#display distinct rows only
df.distinct().show()

+----+--------+------+
|team|position|points|
+----+--------+------+
|   A|   Guard|     8|
|   A|   Guard|    11|
|   A| Forward|    22|
|   B| Forward|     7|
|   B|   Guard|    14|
|   B| Forward|    13|
+----+--------+------+



In [91]:
#display distinct values from 'team' column only
df.select('team').distinct().show()

+----+
|team|
+----+
|   A|
|   B|
+----+



In [92]:
#count number of distinct rows
df.distinct().count()

6

PySpark: How to Select Columns with Alias

In [93]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]
#define column names
columns = ['team', 'conference', 'points', 'assists']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [94]:
#select 'team' column and display using aliased name of 'team_name'
df.select(df.team.alias('team_name')).show()

+---------+
|team_name|
+---------+
|        A|
|        A|
|        A|
|        B|
|        B|
|        C|
+---------+



How to Select Top N Rows in PySpark
DataFrame (With Examples)

In [95]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11],
['A', 'East', 8],
['A', 'East', 10],
['B', 'West', 6],
['B', 'West', 6],
['C', 'East', 5]]
#define column names
columns = ['team', 'conference', 'points']
#create DataFrame using data and column names
df = spark.createDataFrame(data, columns)
#view DataFrame
df.show()

+----+----------+------+
|team|conference|points|
+----+----------+------+
|   A|      East|    11|
|   A|      East|     8|
|   A|      East|    10|
|   B|      West|     6|
|   B|      West|     6|
|   C|      East|     5|
+----+----------+------+



In [96]:
#select top 3 rows from DataFrame
df.take(3)

[Row(team='A', conference='East', points=11),
 Row(team='A', conference='East', points=8),
 Row(team='A', conference='East', points=10)]

In [97]:
#select top 3 rows from DataFrame
df.limit(3).show()

+----+----------+------+
|team|conference|points|
+----+----------+------+
|   A|      East|    11|
|   A|      East|     8|
|   A|      East|    10|
+----+----------+------+



In [98]:
#select top 3 rows from DataFrame only for 'team' and 'points columns
df.select('team', 'points').limit(3).show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   A|     8|
|   A|    10|
+----+------+



PySpark: Select All Columns Except Specific
Ones

In [99]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],['B', 'West', 6, 4],
['C', 'East', 5, 2]]
#define column names
columns = ['team', 'conference', 'points', 'assists']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [100]:
#select all columns except 'conference' column
df.drop('conference').show()

+----+------+-------+
|team|points|assists|
+----+------+-------+
|   A|    11|      4|
|   A|     8|      9|
|   A|    10|      3|
|   B|     6|     12|
|   B|     6|      4|
|   C|     5|      2|
+----+------+-------+



In [101]:
# select all columns except 'conference' and 'assists' column
df.drop('conference', 'assists').show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   A|     8|
|   A|    10|
|   B|     6|
|   B|     6|
|   C|     5|
+----+------+



In [102]:
# How to Use a Case Statement in PySpark (With Example)

In [103]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 6],
['B', 8],
['C', 9],
['D', 9],
['E', 12],
['F', 14],
['G', 15],
['H', 17],
['I', 19],
['J', 22]]
#define column names
columns = ['player', 'points']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+------+------+
|player|points|
+------+------+
|     A|     6|
|     B|     8|
|     C|     9|
|     D|     9|
|     E|    12|
|     F|    14|
|     G|    15|
|     H|    17|
|     I|    19|
|     J|    22|
+------+------+



In [105]:
from pyspark.sql.functions import when
df.withColumn('class',when(df.points<9, 'Bad').when(df.points<12,'OK').when(df.points<15, 'Good').otherwise('Great')).show()

+------+------+-----+
|player|points|class|
+------+------+-----+
|     A|     6|  Bad|
|     B|     8|  Bad|
|     C|     9|   OK|
|     D|     9|   OK|
|     E|    12| Good|
|     F|    14| Good|
|     G|    15|Great|
|     H|    17|Great|
|     I|    19|Great|
|     J|    22|Great|
+------+------+-----+



PySpark: How to Convert Column from Date
to String

In [106]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import datetime
#define data
data = [[datetime.date(2023, 10, 30), 136],
[datetime.date(2023, 11, 14), 223],
[datetime.date(2023, 11, 22), 450],
[datetime.date(2023, 11, 25), 290],
[datetime.date(2023, 12, 19), 189]]
#define column names
columns = ['date', 'sales']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe with full column content
df.show()

+----------+-----+
|      date|sales|
+----------+-----+
|2023-10-30|  136|
|2023-11-14|  223|
|2023-11-22|  450|
|2023-11-25|  290|
|2023-12-19|  189|
+----------+-----+



In [107]:
#check data type of each column
df.dtypes

[('date', 'date'), ('sales', 'bigint')]

In [108]:
from pyspark.sql.functions import date_format
#create new column that converts dates to strings
df_new = df.withColumn('date_string', date_format('date','MM/dd/yyyy'))
#view new DataFrame
df_new.show()

+----------+-----+-----------+
|      date|sales|date_string|
+----------+-----+-----------+
|2023-10-30|  136| 10/30/2023|
|2023-11-14|  223| 11/14/2023|
|2023-11-22|  450| 11/22/2023|
|2023-11-25|  290| 11/25/2023|
|2023-12-19|  189| 12/19/2023|
+----------+-----+-----------+



In [109]:
#check data type of each column
df.dtypes

[('date', 'date'), ('sales', 'bigint')]

How to Convert String to Date in PySpark
(With Example)

In [113]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['2023-01-15', 225],
['2023-02-24', 260],
['2023-07-14', 413],
['2023-10-30', 368]]

#define column names
columns = ['date', 'sales']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----------+-----+
|      date|sales|
+----------+-----+
|2023-01-15|  225|
|2023-02-24|  260|
|2023-07-14|  413|
|2023-10-30|  368|
+----------+-----+



In [114]:
#check data type of each column
df.dtypes

[('date', 'string'), ('sales', 'bigint')]

In [115]:
from pyspark.sql import functions as F

#convert 'date' column from string to date
df = df.withColumn('date', F.to_date('date'))

#view updated DataFrame
df.show()

+----------+-----+
|      date|sales|
+----------+-----+
|2023-01-15|  225|
|2023-02-24|  260|
|2023-07-14|  413|
|2023-10-30|  368|
+----------+-----+



In [116]:
#check data type of each column
df.dtypes

[('date', 'date'), ('sales', 'bigint')]

How to Convert String to Timestamp in
PySpark (With Example)

In [117]:
#define data
data = [['2023-01-15 04:14:22', 225],
['2023-02-24 10:55:01', 260],
['2023-07-14 18:34:59', 413],
['2023-10-30 22:20:05', 368]]

#define column names
columns = ['ts', 'sales']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+-------------------+-----+
|                 ts|sales|
+-------------------+-----+
|2023-01-15 04:14:22|  225|
|2023-02-24 10:55:01|  260|
|2023-07-14 18:34:59|  413|
|2023-10-30 22:20:05|  368|
+-------------------+-----+



In [118]:
#check data type of each column
df.dtypes

[('ts', 'string'), ('sales', 'bigint')]

In [119]:
from pyspark.sql import functions as F
#convert 'ts' column from string to timestamp
df = df.withColumn('ts_new', F.to_timestamp('ts', 'yyyy-MM-dd HH:mm:ss'))#view updated DataFrame
df.show()

+-------------------+-----+-------------------+
|                 ts|sales|             ts_new|
+-------------------+-----+-------------------+
|2023-01-15 04:14:22|  225|2023-01-15 04:14:22|
|2023-02-24 10:55:01|  260|2023-02-24 10:55:01|
|2023-07-14 18:34:59|  413|2023-07-14 18:34:59|
|2023-10-30 22:20:05|  368|2023-10-30 22:20:05|
+-------------------+-----+-------------------+



In [120]:
#check data type of each column
df.dtypes

[('ts', 'string'), ('sales', 'bigint'), ('ts_new', 'timestamp')]

How to Convert Timestamp to Date in
PySpark (With Example)

In [121]:
from pyspark.sql import functions as F
#define data
data = [['2023-01-15 04:14:22', 225],
['2023-02-24 10:55:01', 260],
['2023-07-14 18:34:59', 413],
['2023-10-30 22:20:05', 368]]

#define column names
columns = ['ts', 'sales']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#convert string column to timestamp
df = df.withColumn('ts', F.to_timestamp('ts', 'yyyy-MM-dd HH:mm:ss'))

#view dataframe
df.show()

+-------------------+-----+
|                 ts|sales|
+-------------------+-----+
|2023-01-15 04:14:22|  225|
|2023-02-24 10:55:01|  260|
|2023-07-14 18:34:59|  413|
|2023-10-30 22:20:05|  368|
+-------------------+-----+



In [122]:
#check data type of each column
df.dtypes

[('ts', 'timestamp'), ('sales', 'bigint')]

In [123]:
from pyspark.sql.types import DateType#create date column from timestamp column
df = df.withColumn('new_date', df['ts'].cast(DateType()))
#view updated DataFrame
df.show()

+-------------------+-----+----------+
|                 ts|sales|  new_date|
+-------------------+-----+----------+
|2023-01-15 04:14:22|  225|2023-01-15|
|2023-02-24 10:55:01|  260|2023-02-24|
|2023-07-14 18:34:59|  413|2023-07-14|
|2023-10-30 22:20:05|  368|2023-10-30|
+-------------------+-----+----------+



How to Convert String to Integer in PySpark
(With Example)

In [124]:
#define data
data = [['A', '11'],
['B', '19'],
['C', '22'],
['D', '25'],
['E', '12'],
['F', '41'],
['G', '32'],
['H', '20']]

#define column names
columns = ['team', 'points']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   B|    19|
|   C|    22|
|   D|    25|
|   E|    12|
|   F|    41|
|   G|    32|
|   H|    20|
+----+------+



In [125]:
#check data type of each column
df.dtypes

[('team', 'string'), ('points', 'string')]

In [126]:
from pyspark.sql.types import IntegerType

#create integer column from string column
df = df.withColumn('points_integer',
df['points'].cast(IntegerType()))

#view updated DataFrame
df.show()

+----+------+--------------+
|team|points|points_integer|
+----+------+--------------+
|   A|    11|            11|
|   B|    19|            19|
|   C|    22|            22|
|   D|    25|            25|
|   E|    12|            12|
|   F|    41|            41|
|   G|    32|            32|
|   H|    20|            20|
+----+------+--------------+



In [127]:
#check data type of each column
df.dtypes

[('team', 'string'), ('points', 'string'), ('points_integer', 'int')]

How to Convert Integer to String in PySpark
(With Example)

In [128]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['A', 11],
['B', 19],
['C', 22],
['D', 25],
['E', 12],
['F', 41],
['G', 32],
['H', 20]]

#define column names
columns = ['team', 'points']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+------+
|team|points|
+----+------+
|   A|    11|
|   B|    19|
|   C|    22|
|   D|    25|
|   E|    12|
|   F|    41|
|   G|    32|
|   H|    20|
+----+------+



In [129]:
#check data type of each column
df.dtypes

[('team', 'string'), ('points', 'bigint')]

In [130]:
from pyspark.sql.types import StringType

#create string column from integer column
df = df.withColumn('points_string',
df['points'].cast(StringType()))

#view updated DataFrame
df.show()

+----+------+-------------+
|team|points|points_string|
+----+------+-------------+
|   A|    11|           11|
|   B|    19|           19|
|   C|    22|           22|
|   D|    25|           25|
|   E|    12|           12|
|   F|    41|           41|
|   G|    32|           32|
|   H|    20|           20|
+----+------+-------------+



In [131]:
#check data type of each column
df.dtypes

[('team', 'string'), ('points', 'bigint'), ('points_string', 'string')]

PySpark: How to Convert RDD to
DataFrame (With Example)

In [132]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [('A', 11),
('B', 19),
('C', 22),
('D', 25),
('E', 12),
('F', 41)]
#create RDD using data
my_RDD = spark.sparkContext.parallelize(data)

In [133]:
#check object type
type(my_RDD)

pyspark.rdd.RDD

In [134]:
#convert RDD to DataFrame
my_df = my_RDD.toDF()

In [135]:
#view DataFrame
my_df.show()

+---+---+
| _1| _2|
+---+---+
|  A| 11|
|  B| 19|
|  C| 22|
|  D| 25|
|  E| 12|
|  F| 41|
+---+---+



In [136]:
#check object type
type(my_df)

pyspark.sql.dataframe.DataFrame

In [137]:
#convert RDD to DataFrame with specific column names
my_df = my_RDD.toDF(['player', 'assists'])

#view DataFrame
my_df.show()

+------+-------+
|player|assists|
+------+-------+
|     A|     11|
|     B|     19|
|     C|     22|
|     D|     25|
|     E|     12|
|     F|     41|
+------+-------+



PySpark: How to Convert Column to
Lowercase

In [138]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]

#define column names
columns = ['team', 'conference', 'points', 'assists']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [140]:
from pyspark.sql.functions import lower

#convert 'conference' column to lowercase
df = df.withColumn('conference', lower(df['conference']))

#view updated DataFrame
df.show()


+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      east|    11|      4|
|   A|      east|     8|      9|
|   A|      east|    10|      3|
|   B|      west|     6|     12|
|   B|      west|     6|      4|
|   C|      east|     5|      2|
+----+----------+------+-------+



PySpark: How to Convert Column to
Uppercase

In [141]:
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]

#define column names
columns = ['team', 'conference', 'points', 'assists']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [142]:
from pyspark.sql.functions import upper

#convert 'conference' column to uppercase
df = df.withColumn('conference', upper(df['conference']))

#view updated DataFrame
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      EAST|    11|      4|
|   A|      EAST|     8|      9|
|   A|      EAST|    10|      3|
|   B|      WEST|     6|     12|
|   B|      WEST|     6|      4|
|   C|      EAST|     5|      2|
+----+----------+------+-------+



How to Use “Is Not Null” in PySpark (With
Examples)

In [143]:
#define data
data = [['A', 'East', 11, 4],['A', None, 8, 9],
['A', 'East', 10, 3],
['B', 'West', None, 12],
['B', 'West', None, 4],
['C', 'East', 5, 2]]

#define column names
columns = ['team', 'conference', 'points', 'assists']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      NULL|     8|      9|
|   A|      East|    10|      3|
|   B|      West|  NULL|     12|
|   B|      West|  NULL|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [144]:
#filter for rows where value is not null in 'points' column
df.filter(df.points.isNotNull()).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      NULL|     8|      9|
|   A|      East|    10|      3|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [145]:
#filter for rows where value is not null in any column
df.dropna().show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|    10|      3|
|   C|      East|     5|      2|
+----+----------+------+-------+



How to Use “IS NOT IN” in PySpark (With
Example)

In [146]:
#define array of values
my_array = ['A', 'D', 'E']
#filter DataFrame to only contain rows where 'team' is not in
my_array
df.filter(~df.team.isin(my_array)).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   B|      West|  NULL|     12|
|   B|      West|  NULL|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [147]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2],
['D', 'East', 14, 2],
['E', 'West', 25, 2]]
#define column names
columns = ['team', 'conference', 'points', 'assists']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
|   D|      East|    14|      2|
|   E|      West|    25|      2|
+----+----------+------+-------+



In [148]:
#define array of values
my_array = ['A', 'D', 'E']
#filter DataFrame to only contain rows where 'team' is not in
my_array
df.filter(~df.team.isin(my_array)).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



How to Use “OR” Operator in PySpark
(With Examples)

In [150]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]


#define column names
columns = ['team', 'conference', 'points', 'assists']


#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [151]:
#filter DataFrame where points is greater than 9 or team equals "B"
df.filter('points>9 or team=="B"').show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
+----+----------+------+-------+



In [152]:
#filter DataFrame where points is greater than 9 or team equals "B"
df.filter((df.points>9) | (df.team=="B")).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
+----+----------+------+-------+



How to Use “AND” Operator in PySpark
(With Examples)

In [153]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]

#define column names
columns = ['team', 'conference', 'points', 'assists']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [155]:
#filter DataFrame where points is greater than 5 and conference equals "East"
df.filter((df.points>5) & (df.conference=="East")).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
+----+----------+------+-------+



How to Use “Not Equal” Operator in
PySpark (With Examples)

In [156]:
#filter DataFrame where team is not equal to 'A' and points is not equal to 5
df.filter((df.team!='A') & (df.points!=5)).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   B|      West|     6|     12|
|   B|      West|     6|      4|
+----+----------+------+-------+



In [157]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 'East', 11, 4],
['A', 'East', 8, 9],
['A', 'East', 10, 3],
['B', 'West', 6, 12],
['B', 'West', 6, 4],
['C', 'East', 5, 2]]

#define column names
columns = ['team', 'conference', 'points', 'assists']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   A|      East|    11|      4|
|   A|      East|     8|      9|
|   A|      East|    10|      3|
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [158]:
#filter DataFrame where team is not equal to 'A'
df.filter(df.team!='A').show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   B|      West|     6|     12|
|   B|      West|     6|      4|
|   C|      East|     5|      2|
+----+----------+------+-------+



In [159]:
#filter DataFrame where team is not equal to 'A' and points is not equal to 5
df.filter((df.team!='A') & (df.points!=5)).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
|   B|      West|     6|     12|
|   B|      West|     6|      4|
+----+----------+------+-------+



PySpark: How to Use Case-Insensitive
“Contains”

In [160]:
from pyspark.sql.functions import upper
#perform case-insensitive filter for rows that contain 'AVS' in team column
df.filter(upper(df.team).contains('AVS')).show()

+----+----------+------+-------+
|team|conference|points|assists|
+----+----------+------+-------+
+----+----------+------+-------+



In [161]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['Mavs', 14],
['Nets', 22],
['Nets', 31],
['Cavs', 27],
['CAVS', 26],
['Spurs', 40],
['mavs', 23],
['MAVS', 17],]
#define column names
columns = ['team', 'points']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()

+-----+------+
| team|points|
+-----+------+
| Mavs|    14|
| Nets|    22|
| Nets|    31|
| Cavs|    27|
| CAVS|    26|
|Spurs|    40|
| mavs|    23|
| MAVS|    17|
+-----+------+



In [162]:
#filter DataFrame where team column contains 'AVS'
df.filter(df.team.contains('AVS')).show()

+----+------+
|team|points|
+----+------+
|CAVS|    26|
|MAVS|    17|
+----+------+



In [164]:
from pyspark.sql.functions import upper
#perform case-insensitive filter for rows that contain 'AVS' in team column
df.filter(upper(df.team).contains('AVS')).show()

+----+------+
|team|points|
+----+------+
|Mavs|    14|
|Cavs|    27|
|CAVS|    26|
|mavs|    23|
|MAVS|    17|
+----+------+



In [165]:
#filter DataFrame where team does not contain 'avs'
df.filter(~df.team.contains('avs')).show()

+-----+------+
| team|points|
+-----+------+
| Nets|    22|
| Nets|    31|
| CAVS|    26|
|Spurs|    40|
| MAVS|    17|
+-----+------+



In [166]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['Mavs', 14],
['Nets', 22],
['Nets', 31],
['Cavs', 27],
['Kings', 26],
['Spurs', 40],
['Lakers', 23],
['Spurs', 17],]

#define column names
columns = ['team', 'points']

#create dataframe using data and column names
df = spark.createDataFrame(data, columns)

#view dataframe
df.show()

+------+------+
|  team|points|
+------+------+
|  Mavs|    14|
|  Nets|    22|
|  Nets|    31|
|  Cavs|    27|
| Kings|    26|
| Spurs|    40|
|Lakers|    23|
| Spurs|    17|
+------+------+



In [167]:
#filter DataFrame where team does not contain 'avs'
df.filter(~df.team.contains('avs')).show()

+------+------+
|  team|points|
+------+------+
|  Nets|    22|
|  Nets|    31|
| Kings|    26|
| Spurs|    40|
|Lakers|    23|
| Spurs|    17|
+------+------+

