# Adding or Updating Columns

In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) a local single-threaded Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

In [3]:
# Build a small employee DataFrame from in-memory rows.
# Each row is (firstname, middlename, lastname, dob, gender, salary).
data = [
    ("James", "", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "", "2000-05-19", "M", 4000),
    ("Robert", "", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Anne", "Jones", "1967-12-01", "F", 2000),
]

# Only column names are supplied here, so the column types come from the data.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data, schema=columns)

In [4]:
# Print the schema (column names, types, nullability) of the new DataFrame.
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [5]:
# Convert salary from `long` to `int`.
from pyspark.sql.functions import col
# `col("name")` builds a column expression that refers to a column by name.
# Passing an existing column name to `withColumn` replaces that column in the
# returned DataFrame; `df` itself is left as it was.
df2 = df.withColumn("salary", col("salary").cast("Integer"))
df2.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [8]:
# Display the rows of the DataFrame with the re-typed salary column.
df2.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  2000|
+---------+----------+--------+----------+------+------+



In [9]:
# Multiply every salary by 100; reusing the name "salary" replaces the column.
df3 = df2.withColumn("salary",col("salary")*100)
df3.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|200000|
+---------+----------+--------+----------+------+------+



In [10]:
# Add a new column derived from an existing one: NetIncome is 80% of salary.
# The salary column itself is kept unchanged.
df4 = df3.withColumn("NetIncome",col("salary")*0.80)
df4.show()

+---------+----------+--------+----------+------+------+---------+
|firstname|middlename|lastname|       dob|gender|salary|NetIncome|
+---------+----------+--------+----------+------+------+---------+
|    James|          |   Smith|1991-04-01|     M|300000| 240000.0|
|  Michael|      Rose|        |2000-05-19|     M|400000| 320000.0|
|   Robert|          |Williams|1978-09-05|     M|400000| 320000.0|
|    Maria|      Anne|   Jones|1967-12-01|     F|200000| 160000.0|
+---------+----------+--------+----------+------+------+---------+



In [11]:
# Add a column that holds the same literal value in every row.
from pyspark.sql.functions import lit

# `lit` wraps a plain Python value as a column expression.
df5 = df4.withColumn("Country", lit("USA"))
df5.show()

+---------+----------+--------+----------+------+------+---------+-------+
|firstname|middlename|lastname|       dob|gender|salary|NetIncome|Country|
+---------+----------+--------+----------+------+------+---------+-------+
|    James|          |   Smith|1991-04-01|     M|300000| 240000.0|    USA|
|  Michael|      Rose|        |2000-05-19|     M|400000| 320000.0|    USA|
|   Robert|          |Williams|1978-09-05|     M|400000| 320000.0|    USA|
|    Maria|      Anne|   Jones|1967-12-01|     F|200000| 160000.0|    USA|
+---------+----------+--------+----------+------+------+---------+-------+



In [12]:
# Rename a column: withColumnRenamed(existing_name, new_name).
# NOTE(review): "Contry" looks like a typo for "Country". The drop cell below
# uses the same spelling, so both must be changed together.
df5 = df5.withColumnRenamed("Country","Home Contry")
df5.show()

+---------+----------+--------+----------+------+------+---------+-----------+
|firstname|middlename|lastname|       dob|gender|salary|NetIncome|Home Contry|
+---------+----------+--------+----------+------+------+---------+-----------+
|    James|          |   Smith|1991-04-01|     M|300000| 240000.0|        USA|
|  Michael|      Rose|        |2000-05-19|     M|400000| 320000.0|        USA|
|   Robert|          |Williams|1978-09-05|     M|400000| 320000.0|        USA|
|    Maria|      Anne|   Jones|1967-12-01|     F|200000| 160000.0|        USA|
+---------+----------+--------+----------+------+------+---------+-----------+



In [13]:
# Drop a column by name. The result is only displayed, not assigned,
# so `df5` still refers to the DataFrame that has the column.
df5.drop("Home Contry").show()

+---------+----------+--------+----------+------+------+---------+
|firstname|middlename|lastname|       dob|gender|salary|NetIncome|
+---------+----------+--------+----------+------+------+---------+
|    James|          |   Smith|1991-04-01|     M|300000| 240000.0|
|  Michael|      Rose|        |2000-05-19|     M|400000| 320000.0|
|   Robert|          |Williams|1978-09-05|     M|400000| 320000.0|
|    Maria|      Anne|   Jones|1967-12-01|     F|200000| 160000.0|
+---------+----------+--------+----------+------+------+---------+



# Filtering

In [14]:
# let's create a new df
from pyspark.sql.types import (
    StructType,
    StructField, 
    StringType, 
    IntegerType, 
    ArrayType,
)

# Rows of (name, languages, state, gender); `name` is itself a 3-tuple of
# (firstname, middlename, lastname) and `languages` is a list of strings.
arrayStructureData = [
    (("James", "", "Smith"), ["Java", "Scala", "C++"], "OH", "M"),
    (("Anna", "Rose", ""), ["Spark", "Java", "C++"], "NY", "F"),
    (("Julia", "", "Williams"), ["CSharp", "Mathlab"], "OH", "F"),
    (("Maria", "Anne", "Jones"), ["CSharp", "Mathlab"], "NY", "M"),
    (("Jen", "Mary", "Brown"), ["CSharp", "Mathlab"], "NY", "M"),
    (("Mike", "Mary", "Williams"), ["Python", "Mathlab"], "OH", "M"),
]

# Explicit schema: `name` is a nested struct, `languages` an array of strings.
arrayStructureSchema = StructType([
    StructField("name", StructType([
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
    ])),
    StructField("languages", ArrayType(StringType()), True),
    StructField("state", StringType(), True),
    StructField("gender", StringType(), True),
])

df = spark.createDataFrame(arrayStructureData, schema=arrayStructureSchema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, Mathlab] |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, Mathlab] |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, Mathlab] |NY   |M     |
|{Mike, Mary, Williams}|[Python, Mathlab] |OH   |M     |
+----------------------+------------------+-----+------+



In [15]:
# Keep only the rows whose state is NY, using attribute-style column access.
df.filter(df.state == "NY").show(truncate=False)

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, Mathlab] |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, Mathlab] |NY   |M     |
+--------------------+------------------+-----+------+



In [16]:
# The same filter, referring to the column by name through `col`.
from pyspark.sql.functions import col
df.filter(col("state") == "NY").show(truncate=False)

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, Mathlab] |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, Mathlab] |NY   |M     |
+--------------------+------------------+-----+------+



In [17]:
# The same filter again, written as a SQL expression string.
df.filter("state == 'NY'").show(truncate=False)

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, Mathlab] |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, Mathlab] |NY   |M     |
+--------------------+------------------+-----+------+



In [18]:
# Multiple conditions: combine with `&` (AND). Each comparison is wrapped in
# its own parentheses because `&` binds tighter than `==` in Python.
df.filter((df.state == "NY") & (df.gender == "M")).show(truncate=False)

+--------------------+-----------------+-----+------+
|name                |languages        |state|gender|
+--------------------+-----------------+-----+------+
|{Maria, Anne, Jones}|[CSharp, Mathlab]|NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, Mathlab]|NY   |M     |
+--------------------+-----------------+-----+------+



In [19]:
# Multiple conditions: combine with `|` (OR).
df.filter((df.state == "NY") | (df.state == "OH")).show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, Mathlab] |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, Mathlab] |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, Mathlab] |NY   |M     |
|{Mike, Mary, Williams}|[Python, Mathlab] |OH   |M     |
+----------------------+------------------+-----+------+



In [20]:
# Multiple conditions written as a single SQL expression string.
df.filter("""
state == 'NY' AND gender == 'M'
""").show(truncate=False)

+--------------------+-----------------+-----+------+
|name                |languages        |state|gender|
+--------------------+-----------------+-----+------+
|{Maria, Anne, Jones}|[CSharp, Mathlab]|NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, Mathlab]|NY   |M     |
+--------------------+-----------------+-----+------+



In [21]:
# Keep the rows whose `languages` array contains "Java".
from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages, "Java")).show(truncate=False)

+----------------+------------------+-----+------+
|name            |languages         |state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }  |[Spark, Java, C++]|NY   |F     |
+----------------+------------------+-----+------+



In [22]:
# Filter on a field of the nested `name` struct: name.lastname.
df.filter(df.name.lastname == "Brown").show(truncate=False) 

+------------------+-----------------+-----+------+
|name              |languages        |state|gender|
+------------------+-----------------+-----+------+
|{Jen, Mary, Brown}|[CSharp, Mathlab]|NY   |M     |
+------------------+-----------------+-----+------+



# Sorting

In [23]:
# Basic sorting: order the rows by a single column.
df.sort("state").show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|{Maria, Anne, Jones}| [CSharp, Mathlab]|   NY|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|  {Jen, Mary, Brown}| [CSharp, Mathlab]|   NY|     M|
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}| [CSharp, Mathlab]|   OH|     F|
|{Mike, Mary, Will...| [Python, Mathlab]|   OH|     M|
+--------------------+------------------+-----+------+



In [24]:
# Sort on multiple columns: by state first, then by gender within each state.
df.sort("state", "gender").show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}| [CSharp, Mathlab]|   NY|     M|
|  {Jen, Mary, Brown}| [CSharp, Mathlab]|   NY|     M|
| {Julia, , Williams}| [CSharp, Mathlab]|   OH|     F|
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|{Mike, Mary, Will...| [Python, Mathlab]|   OH|     M|
+--------------------+------------------+-----+------+



In [25]:
# Same columns with the priority swapped: by gender first, then by state.
df.sort("gender", "state").show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}| [CSharp, Mathlab]|   OH|     F|
|{Maria, Anne, Jones}| [CSharp, Mathlab]|   NY|     M|
|  {Jen, Mary, Brown}| [CSharp, Mathlab]|   NY|     M|
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|{Mike, Mary, Will...| [Python, Mathlab]|   OH|     M|
+--------------------+------------------+-----+------+



In [26]:
# Choose the direction per column: gender descending, then state ascending.
df.sort(df.gender.desc(), df.state.asc()).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|  {Jen, Mary, Brown}| [CSharp, Mathlab]|   NY|     M|
|{Maria, Anne, Jones}| [CSharp, Mathlab]|   NY|     M|
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|{Mike, Mary, Will...| [Python, Mathlab]|   OH|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}| [CSharp, Mathlab]|   OH|     F|
+--------------------+------------------+-----+------+



In [27]:
# The same ordering expressed in SQL: register the DataFrame as a temporary
# view named "programmers", then query that view with spark.sql.
df.createOrReplaceTempView("programmers")
spark.sql("""
SELECT *
FROM programmers
ORDER BY gender DESC, state ASC
""").show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{Jen, Mary, Brown}    |[CSharp, Mathlab] |NY   |M     |
|{Maria, Anne, Jones}  |[CSharp, Mathlab] |NY   |M     |
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Mike, Mary, Williams}|[Python, Mathlab] |OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, Mathlab] |OH   |F     |
+----------------------+------------------+-----+------+



# Grouping

In [28]:
# Start a fresh DataFrame for the grouping examples.
# Each row is (employee_name, department, state, salary, age, bonus).
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]

schema = ["employee_name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(simpleData, schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [29]:
# How much money is spent on salary per department?
# Group the rows by department and sum the salary column within each group.
df.groupBy("department").sum("salary").show(truncate=False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+



In [30]:
# What is the average salary per department?
df.groupBy("department").avg("salary").show(truncate=False)

+----------+-----------------+
|department|avg(salary)      |
+----------+-----------------+
|Sales     |85666.66666666667|
|Finance   |87750.0          |
|Marketing |85500.0          |
+----------+-----------------+



In [31]:
# Compute several salary statistics per department in one pass and sort the
# result by average salary (ascending).
# NOTE: importing `sum` from pyspark.sql.functions shadows Python's built-in
# `sum` from here on; the later "groupby and then filter" cell relies on the
# `sum`, `mean` and `col` names imported here.
from pyspark.sql.functions import mean, count, sum, col

(
    df.groupBy("department")
    .agg(
        mean("salary").alias("avg_salary"),    # average salary
        count("salary").alias("num_salary"),   # number of salaries
        sum("salary").alias("total_salary"),   # total salary
    )
    .sort(col("avg_salary").asc())
    .show()
)

+----------+-----------------+----------+------------+
|department|       avg_salary|num_salary|total_salary|
+----------+-----------------+----------+------------+
| Marketing|          85500.0|         2|      171000|
|     Sales|85666.66666666667|         3|      257000|
|   Finance|          87750.0|         4|      351000|
+----------+-----------------+----------+------------+



In [32]:
# Group by two columns and sum salary and bonus for each
# (department, state) pair.
(
    df.groupBy("department", "state")
    .sum("salary", "bonus")
    .show()
)

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
|     Sales|   CA|      81000|     23000|
| Marketing|   CA|      80000|     18000|
|   Finance|   CA|     189000|     47000|
|     Sales|   NY|     176000|     30000|
+----------+-----+-----------+----------+



In [33]:
# Aggregate per department, then keep only the departments whose total bonus
# is at least 50000 (a filter applied after the aggregation).
# NOTE: this import shadows Python's built-in `max`.
from pyspark.sql.functions import max

(
    df.groupBy("department")
    .agg(
        sum("salary").alias("sum_salary"),
        mean("salary").alias("avg_salary"),
        sum("bonus").alias("sum_bonus"),
        max("bonus").alias("max_bonus"),
    )
    .filter(col("sum_bonus") >= 50000)
    .show(truncate=False)
)

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
+----------+----------+-----------------+---------+---------+



# UDFs: User Defined Functions

In [34]:
# Small (ID, Name) dataset with lower-case names for the UDF examples.
columns = ["ID", "Name"]
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
]

df = spark.createDataFrame(data, schema=columns)

df.show(truncate=False)

+---+------------+
|ID |Name        |
+---+------------+
|1  |john jones  |
|2  |tracey smith|
|3  |amy sanders |
+---+------------+



In [35]:
# let's fix the casing on these names

def fixNameCase(st):
    """Return `st` with every space-separated word capitalised.

    Each word gets an upper-case first character and a lower-case remainder.
    The input is split on single spaces and re-joined with single spaces, so
    the spacing of the input is preserved.

    Args:
        st: The name to normalise, or None.

    Returns:
        The re-cased string, or "" when `st` is None.
    """
    if st is None:
        return ""
    # Slice with [:1] instead of indexing with [0]: splitting on " " yields
    # empty words for "", for leading/trailing spaces and for double spaces,
    # and indexing an empty word would raise IndexError.
    return " ".join(word[:1].upper() + word[1:].lower() for word in st.split(" "))

# Always test the plain Python function first, before wrapping it in a UDF.
print(fixNameCase(None)) # expect ""
print(fixNameCase("james lee")) # expect "James Lee"
print(fixNameCase("james Lee")) # expect "James Lee"
print(fixNameCase("James Lee")) # expect "James Lee"
print(fixNameCase("James lee")) # expect "James Lee"


James Lee
James Lee
James Lee
James Lee


In [36]:
# Define a Spark UDF: it wraps a Python function so that the function can be
# applied to DataFrame columns.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# The second argument, StringType(), declares the UDF's return type.
fixNameCaseUDF = udf(lambda z: fixNameCase(z),StringType()) 

In [37]:
# Apply the UDF to the Name column and expose the result as NameFixed.
df.select(
    col("ID"),
    fixNameCaseUDF(col("Name")).alias("NameFixed"),
).show(truncate=False)

+---+------------+
|ID |NameFixed   |
+---+------------+
|1  |John Jones  |
|2  |Tracey Smith|
|3  |Amy Sanders |
+---+------------+



In [52]:
# let's create a name "slug" and append to this df
def slugify(name):
    """Turn a name into a lower-case, hyphen-separated slug.

    Args:
        name: The text to slugify, or None.

    Returns:
        The lower-cased words of `name` joined by "-", or "" when `name` is
        None or contains no words.
    """
    if name is None:
        return ""
    # split() without an argument splits on any run of whitespace and drops
    # empty pieces, so repeated, leading or trailing spaces cannot produce
    # "--" or a slug that starts or ends with "-".
    return "-".join(name.lower().split())

# Always test the plain Python function first, before wrapping it in a UDF.
print(slugify(None)) # expect ""
print(slugify("Ecem Basak")) # expect "ecem-basak"
print(slugify("Ecem Basak")) # expect "ecem-basak"
print(slugify("Ecem Basak")) # expect "ecem-basak"
print(slugify("Ecem Basak")) # expect "ecem-basak"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


ecem-basak
ecem-basak
ecem-basak
ecem-basak

In [53]:
# Apply to our data: wrap slugify as a UDF that returns a string.
slugifyUDF = udf(lambda z:slugify(z), StringType())   

# First fix the casing of the names ...
fixedDf = df.select(col("ID"),fixNameCaseUDF(col("Name")).alias("NameFixed"))

# ... then append the slug of the fixed name as a new column.
fixedDf.withColumn("NameSlug", slugifyUDF(col("NameFixed"))).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------------+------------+
| ID|   NameFixed|    NameSlug|
+---+------------+------------+
|  1|  John Jones|  john-jones|
|  2|Tracey Smith|tracey-smith|
|  3| Amy Sanders| amy-sanders|
+---+------------+------------+

In [54]:
# Use in SQL: register the plain Python function under a name that SQL
# queries can call, with StringType() as its declared return type.

spark.udf.register("fixNameCaseUDF", fixNameCase, StringType())
# Expose the DataFrame to SQL as the temporary view "names".
df.createOrReplaceTempView("names")
spark.sql("""
SELECT 
    ID, 
    fixNameCaseUDF(Name) as NameFixed 
FROM names
""").show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------------+
|ID |NameFixed   |
+---+------------+
|1  |John Jones  |
|2  |Tracey Smith|
|3  |Amy Sanders |
+---+------------+