# Introduction to DataFrames - Python

Create DataFrames

In [6]:
# import pyspark class Row from module Sql
from pyspark.sql import *

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)



Row(id='123456', name='Computer Science')
Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)
no-reply@berkeley.edu


Create DataFrames from a list of the rows

In [None]:
spark = SparkSession\
        .builder\
        .appName("DataframesIntroduction")\
        .getOrCreate()

departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

display(df1)

departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

display(df2)

DataFrame[department: struct<id:string,name:string>, employees: array<struct<firstName:string,lastName:string,email:string,salary:bigint>>]

DataFrame[department: struct<id:string,name:string>, employees: array<struct<firstName:string,lastName:string,email:string,salary:bigint>>]

In [8]:
df1.show()
df2.show()

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[123456, Computer...|[[michael, armbru...|
|[789012, Mechanic...|[[matei,, no-repl...|
+--------------------+--------------------+

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[345678, Theater ...|[[michael, armbru...|
|[901234, Indoor R...|[[xiangrui, meng,...|
+--------------------+--------------------+



Work with DataFrames. Union two DataFrames


In [9]:
unionDF = df1.unionAll(df2)
unionDF.show()

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[123456, Computer...|[[michael, armbru...|
|[789012, Mechanic...|[[matei,, no-repl...|
|[345678, Theater ...|[[michael, armbru...|
|[901234, Indoor R...|[[xiangrui, meng,...|
+--------------------+--------------------+



Write the unioned DataFrame to a Parquet File


In [10]:
# Remove the file if it exists
import os
if os.path.exists("output/databricks-df-example.parquet"):
    os.remove("output/databricks-df-example.parquet")
unionDF.write.parquet("output/databricks-df-example.parquet")

Read a DataFrame from the Parquet file

In [11]:
parquetDF = spark.read.parquet("output/databricks-df-example.parquet")
parquetDF.show()

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[345678, Theater ...|[[michael, armbru...|
|[789012, Mechanic...|[[matei,, no-repl...|
|[901234, Indoor R...|[[xiangrui, meng,...|
|[123456, Computer...|[[michael, armbru...|
+--------------------+--------------------+



Explode the employees column

In [29]:
from pyspark.sql.functions import split, explode
df = unionDF.select(explode("employees").alias("e"))
print(df)
df.show()
explodeDF = df.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

print(explodeDF)
explodeDF.show()

DataFrame[e: struct<firstName:string,lastName:string,email:string,salary:bigint>]
+--------------------+
|                   e|
+--------------------+
|[michael, armbrus...|
|[xiangrui, meng, ...|
|[matei,, no-reply...|
|[, wendell, no-re...|
|[michael, armbrus...|
|[, wendell, no-re...|
|[xiangrui, meng, ...|
|[matei,, no-reply...|
+--------------------+

DataFrame[firstName: string, lastName: string, email: string, salary: bigint]
+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
|     null| wendell|no-reply@berkeley...|160000|
|  michael|armbrust|no-reply@berkeley...|100000|
|     null| wendell|no-reply@berkeley...|160000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
+---------+--------+----

In [28]:
from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()

+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+



Use filter() to return the rows that match a predicate

In [31]:
filterDF = explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)
display(filterDF)
filterDF.show()

DataFrame[firstName: string, lastName: string, email: string, salary: bigint]

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+



In [32]:
from pyspark.sql.functions import col, asc

# Use `|`instead of `or`
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
filterDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+



The where() clause is equivalent to filter()

In [33]:
filterDF = explodeDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
filterDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+



Replace null values with -- using DataFrame Na function

In [35]:
nonNullDF = explodeDF.fillna("--")
nonNullDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|      --|no-reply@waterloo...|140000|
|       --| wendell|no-reply@berkeley...|160000|
|  michael|armbrust|no-reply@berkeley...|100000|
|       --| wendell|no-reply@berkeley...|160000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|      --|no-reply@waterloo...|140000|
+---------+--------+--------------------+------+



Retrieve only rows with missing firstName or lastName

In [36]:
filterNonNullDF = explodeDF.filter(col("firstName").isNull() | col("lastName").isNull()).sort("email")
filterNonNullDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|     null| wendell|no-reply@berkeley...|160000|
|     null| wendell|no-reply@berkeley...|160000|
|    matei|    null|no-reply@waterloo...|140000|
|    matei|    null|no-reply@waterloo...|140000|
+---------+--------+--------------------+------+



Example aggregations using agg() and countDistinct()

In [42]:
from pyspark.sql.functions import countDistinct, count

countDistinctDF = explodeDF.select("firstName", "lastName")\
    .groupBy("firstName", "lastName")\
    .agg(count("firstName"))

display(countDistinctDF)
countDistinctDF.show()

DataFrame[firstName: string, lastName: string, count(firstName): bigint]

+---------+--------+----------------+
|firstName|lastName|count(firstName)|
+---------+--------+----------------+
|     null| wendell|               0|
| xiangrui|    meng|               2|
|  michael|armbrust|               2|
|    matei|    null|               2|
+---------+--------+----------------+



Compare the DataFrame and SQL query physical plans

In [43]:
countDistinctDF.explain()

== Physical Plan ==
*(4) HashAggregate(keys=[firstName#179, lastName#180], functions=[count(firstName#179)])
+- Exchange hashpartitioning(firstName#179, lastName#180, 200)
   +- *(3) HashAggregate(keys=[firstName#179, lastName#180], functions=[partial_count(firstName#179)])
      +- *(3) Project [e#173.firstName AS firstName#179, e#173.lastName AS lastName#180]
         +- Generate explode(employees#1), false, [e#173]
            +- Union
               :- *(1) Project [employees#1]
               :  +- Scan ExistingRDD[department#0,employees#1]
               +- *(2) Project [employees#5]
                  +- Scan ExistingRDD[department#4,employees#5]


In [44]:
# register the DataFrame as a temp table so that we can query it using SQL
explodeDF.registerTempTable("databricks_df_example")

# Perform the same query as the DataFrame above and return ``explain``
countDistinctDF_sql = spark.sql("SELECT firstName, lastName, count(distinct firstName) \
                                as distinct_first_names FROM databricks_df_example GROUP BY firstName, lastName")

countDistinctDF_sql.explain()

== Physical Plan ==
*(4) HashAggregate(keys=[firstName#179, lastName#180], functions=[count(distinct firstName#179)])
+- *(4) HashAggregate(keys=[firstName#179, lastName#180], functions=[partial_count(distinct firstName#179)])
   +- *(4) HashAggregate(keys=[firstName#179, lastName#180, firstName#179], functions=[])
      +- Exchange hashpartitioning(firstName#179, lastName#180, firstName#179, 200)
         +- *(3) HashAggregate(keys=[firstName#179, lastName#180, firstName#179], functions=[])
            +- *(3) Project [e#173.firstName AS firstName#179, e#173.lastName AS lastName#180]
               +- Generate explode(employees#1), false, [e#173]
                  +- Union
                     :- *(1) Project [employees#1]
                     :  +- Scan ExistingRDD[department#0,employees#1]
                     +- *(2) Project [employees#5]
                        +- Scan ExistingRDD[department#4,employees#5]
