In [1]:
import pyspark
from pyspark.sql import SparkSession

sc = SparkSession.builder.appName("Dataframe excercise").getOrCreate()

23/07/28 16:31:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
from pyspark.sql.types import StructType, StructField,StringType, IntegerType
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])

In [3]:
data = [('Sam',26),
       ('Peter', 25),
       ('Rahul',22)]

In [4]:
 df = spark.createDataFrame(data, schema)

In [5]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [6]:
df.show()

+-----+---+
| name|age|
+-----+---+
|  Sam| 26|
|Peter| 25|
|Rahul| 22|
+-----+---+



In [7]:
nestedData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]

nestedStruct = StructType([ StructField('name', 
                                        StructType([ 
                                            StructField('firstName', StringType(), True),
                                            StructField('middleName', StringType(), True),
                                            StructField('lastName', StringType(), True)]), True),
                            StructField('id', StringType(), True),
                            StructField('gender', StringType(), True),
                            StructField('salary', IntegerType(), False)])

In [8]:
df = spark.createDataFrame(nestedData, schema=nestedStruct)
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- middleName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = false)



In [9]:
df.show()

+--------------------+-----+------+------+
|                name|   id|gender|salary|
+--------------------+-----+------+------+
|    {James, , Smith}|36636|     M|  3100|
|   {Michael, Rose, }|40288|     M|  4300|
|{Robert, , Williams}|42114|     M|  1400|
|{Maria, Anne, Jones}|39192|     F|  5500|
|  {Jen, Mary, Brown}|     |     F|    -1|
+--------------------+-----+------+------+



In [10]:
from pyspark.sql.functions import struct, when, col
df2 = df.withColumn('salary_details', struct( col('salary').alias('monthly_payment'),
                                              when(col('gender') == 'M','type1')
                                             .otherwise('type2')
                                             .alias('salary_type'))).drop('gender', 'salary')
df2.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- middleName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary_details: struct (nullable = false)
 |    |-- monthly_payment: integer (nullable = false)
 |    |-- salary_type: string (nullable = false)



In [11]:
df2.show()

+--------------------+-----+--------------+
|                name|   id|salary_details|
+--------------------+-----+--------------+
|    {James, , Smith}|36636| {3100, type1}|
|   {Michael, Rose, }|40288| {4300, type1}|
|{Robert, , Williams}|42114| {1400, type1}|
|{Maria, Anne, Jones}|39192| {5500, type2}|
|  {Jen, Mary, Brown}|     |   {-1, type2}|
+--------------------+-----+--------------+



In [12]:
from pyspark.sql.functions import lit
df3 = df2.withColumn('department',lit('engg'))
df3.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- middleName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary_details: struct (nullable = false)
 |    |-- monthly_payment: integer (nullable = false)
 |    |-- salary_type: string (nullable = false)
 |-- department: string (nullable = false)



In [13]:
df3.show()

+--------------------+-----+--------------+----------+
|                name|   id|salary_details|department|
+--------------------+-----+--------------+----------+
|    {James, , Smith}|36636| {3100, type1}|      engg|
|   {Michael, Rose, }|40288| {4300, type1}|      engg|
|{Robert, , Williams}|42114| {1400, type1}|      engg|
|{Maria, Anne, Jones}|39192| {5500, type2}|      engg|
|  {Jen, Mary, Brown}|     |   {-1, type2}|      engg|
+--------------------+-----+--------------+----------+



In [14]:
df3 = df3.withColumn('department', when(col('id') == '', 'unknown'))
df3.show()   

+--------------------+-----+--------------+----------+
|                name|   id|salary_details|department|
+--------------------+-----+--------------+----------+
|    {James, , Smith}|36636| {3100, type1}|      null|
|   {Michael, Rose, }|40288| {4300, type1}|      null|
|{Robert, , Williams}|42114| {1400, type1}|      null|
|{Maria, Anne, Jones}|39192| {5500, type2}|      null|
|  {Jen, Mary, Brown}|     |   {-1, type2}|   unknown|
+--------------------+-----+--------------+----------+



In [15]:
df3.select(df3.id + df3.department).show()

+-----------------+
|(id + department)|
+-----------------+
|             null|
|             null|
|             null|
|             null|
|             null|
+-----------------+



In [17]:
df3.select(df3.columns[1:]).show()

+-----+--------------+----------+
|   id|salary_details|department|
+-----+--------------+----------+
|36636| {3100, type1}|      null|
|40288| {4300, type1}|      null|
|42114| {1400, type1}|      null|
|39192| {5500, type2}|      null|
|     |   {-1, type2}|   unknown|
+-----+--------------+----------+



In [18]:
df3.select([col for col in df3.columns]).show()

+--------------------+-----+--------------+----------+
|                name|   id|salary_details|department|
+--------------------+-----+--------------+----------+
|    {James, , Smith}|36636| {3100, type1}|      null|
|   {Michael, Rose, }|40288| {4300, type1}|      null|
|{Robert, , Williams}|42114| {1400, type1}|      null|
|{Maria, Anne, Jones}|39192| {5500, type2}|      null|
|  {Jen, Mary, Brown}|     |   {-1, type2}|   unknown|
+--------------------+-----+--------------+----------+



In [19]:
df3.collect()

[Row(name=Row(firstName='James', middleName='', lastName='Smith'), id='36636', salary_details=Row(monthly_payment=3100, salary_type='type1'), department=None),
 Row(name=Row(firstName='Michael', middleName='Rose', lastName=''), id='40288', salary_details=Row(monthly_payment=4300, salary_type='type1'), department=None),
 Row(name=Row(firstName='Robert', middleName='', lastName='Williams'), id='42114', salary_details=Row(monthly_payment=1400, salary_type='type1'), department=None),
 Row(name=Row(firstName='Maria', middleName='Anne', lastName='Jones'), id='39192', salary_details=Row(monthly_payment=5500, salary_type='type2'), department=None),
 Row(name=Row(firstName='Jen', middleName='Mary', lastName='Brown'), id='', salary_details=Row(monthly_payment=-1, salary_type='type2'), department='unknown')]

In [20]:
df3.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- middleName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary_details: struct (nullable = false)
 |    |-- monthly_payment: integer (nullable = false)
 |    |-- salary_type: string (nullable = false)
 |-- department: string (nullable = true)



In [22]:
# Replace existing dataframe having columns firstName, middleName, lastName and salary

df3 = df3.withColumn('firstName', df3.name.firstName).withColumn('middleName', df3.name.middleName).withColumn('lastName', df3.name.lastName).withColumn('salary', df3.salary_details.monthly_payment)

In [27]:
for c in ['name','id', 'salary_details', 'department']:
    df3 = df3.drop(col(c))
df3.take(2)

[Row(firstName='James', middleName='', lastName='Smith', salary=3100),
 Row(firstName='Michael', middleName='Rose', lastName='', salary=4300)]

In [36]:
df3.filter((df3.salary>3000) & (df3.lastName != '') & (df3.firstName.isin(['James','Maria']))).show()

+---------+----------+--------+------+
|firstName|middleName|lastName|salary|
+---------+----------+--------+------+
|    James|          |   Smith|  3100|
|    Maria|      Anne|   Jones|  5500|
+---------+----------+--------+------+

