# PySpark DataFrame Operations

## CIML Summer Institute

Mai H. Nguyen - UC San Diego

--- 

Spark DataFrame Guide:  https://spark.apache.org/docs/latest/sql-programming-guide.html

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html


## Setup

In [1]:
# Initialize Spark

import pyspark
from pyspark.sql import SparkSession, Row

import os

conf = pyspark.SparkConf().setAll([('spark.master', 'local[*]'),
                                   ('spark.app.name', 'PySpark DataFrame Demo')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Print the Spark runtime version and the installed PySpark package version
print(spark.version, pyspark.version.__version__)

3.1.2 3.1.2


---
## Employee example 

### Create Employee DataFrame

In [2]:
Employee = Row("name", "dept", "state", "salary")
employee1 = Employee('James', 'Sales', 'CA', 100000)
employee2 = Employee('Mary', 'Finance', 'NY', 120000)
employee3 = Employee('Jane', 'Sales', 'WA', 160000)
employees = [employee1, employee2, employee3]
employeesDF = spark.createDataFrame(employees)
employeesDF.show()

+-----+-------+-----+------+
| name|   dept|state|salary|
+-----+-------+-----+------+
|James|  Sales|   CA|100000|
| Mary|Finance|   NY|120000|
| Jane|  Sales|   WA|160000|
+-----+-------+-----+------+



### Check type

In [3]:
type(employeesDF)

pyspark.sql.dataframe.DataFrame

### Count number of rows

In [4]:
employeesDF.count()

3

### Count number of columns

In [5]:
len(employeesDF.columns)

4

### See column names

In [6]:
employeesDF.columns

['name', 'dept', 'state', 'salary']

### Display schema

In [7]:
employeesDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)



### Show contents

Show contents of employeesDF

In [8]:
employeesDF.show()

+-----+-------+-----+------+
| name|   dept|state|salary|
+-----+-------+-----+------+
|James|  Sales|   CA|100000|
| Mary|Finance|   NY|120000|
| Jane|  Sales|   WA|160000|
+-----+-------+-----+------+



What does this return?

In [9]:
employeesDF.show(1)

+-----+-----+-----+------+
| name| dept|state|salary|
+-----+-----+-----+------+
|James|Sales|   CA|100000|
+-----+-----+-----+------+
only showing top 1 row



How is this different from the previous cell?

In [10]:
employeesDF.select("name").show()

+-----+
| name|
+-----+
|James|
| Mary|
| Jane|
+-----+



### Get summary statistics

In [11]:
employeesDF.show()

+-----+-------+-----+------+
| name|   dept|state|salary|
+-----+-------+-----+------+
|James|  Sales|   CA|100000|
| Mary|Finance|   NY|120000|
| Jane|  Sales|   WA|160000|
+-----+-------+-----+------+



In [12]:
employeesDF.describe().show()

+-------+-----+-------+-----+------------------+
|summary| name|   dept|state|            salary|
+-------+-----+-------+-----+------------------+
|  count|    3|      3|    3|                 3|
|   mean| null|   null| null|126666.66666666667|
| stddev| null|   null| null|30550.504633038934|
|    min|James|Finance|   CA|            100000|
|    max| Mary|  Sales|   WA|            160000|
+-------+-----+-------+-----+------------------+



In [13]:
employeesDF.select('salary').describe().show()

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                 3|
|   mean|126666.66666666667|
| stddev|30550.504633038934|
|    min|            100000|
|    max|            160000|
+-------+------------------+
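
The `stddev` row reported by `describe()` is the sample standard deviation (dividing by n - 1), not the population version. As a rough sanity check, the three salaries from the table can be run through Python's standard `statistics` module. The variable names below are illustrative and not part of the original notebook.

```python
import statistics

# Salaries copied from the employee table above
salaries = [100000, 120000, 160000]

mean_salary = statistics.mean(salaries)
sample_std = statistics.stdev(salaries)       # divides by n - 1
population_std = statistics.pstdev(salaries)  # divides by n

print(mean_salary)
print(sample_std)      # should agree with the stddev row from describe()
print(population_std)  # smaller, because it divides by n instead of n - 1
```

If the population value is what you need, `pyspark.sql.functions.stddev_pop` is the corresponding Spark aggregate.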



### Find average salary by department

In [14]:
employeesDF.show()

+-----+-------+-----+------+
| name|   dept|state|salary|
+-----+-------+-----+------+
|James|  Sales|   CA|100000|
| Mary|Finance|   NY|120000|
| Jane|  Sales|   WA|160000|
+-----+-------+-----+------+



In [15]:
employeesDF.groupBy('dept').avg('salary').show()

+-------+-----------+
|   dept|avg(salary)|
+-------+-----------+
|  Sales|   130000.0|
|Finance|   120000.0|
+-------+-----------+



### Count employees by department
Group employees by department, then find count.

In [16]:
employeesDF.groupBy('dept').count().show()

+-------+-----+
|   dept|count|
+-------+-----+
|  Sales|    2|
|Finance|    1|
+-------+-----+



### Filter

In [17]:
employeesDF.filter(employeesDF.dept=='Sales').show()

+-----+-----+-----+------+
| name| dept|state|salary|
+-----+-----+-----+------+
|James|Sales|   CA|100000|
| Jane|Sales|   WA|160000|
+-----+-----+-----+------+



### Sort by name

Sort by name, and save in a new DataFrame

In [18]:
employees_sortedDF = employeesDF.sort('name', ascending=True)

In [19]:
employees_sortedDF.show()

+-----+-------+-----+------+
| name|   dept|state|salary|
+-----+-------+-----+------+
|James|  Sales|   CA|100000|
| Jane|  Sales|   WA|160000|
| Mary|Finance|   NY|120000|
+-----+-------+-----+------+



### Save sorted DataFrame to file

In [20]:
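# Spark writes a directory named employees_sorted.csv; coalesce(1) yields a single part file inside it
(employees_sortedDF.coalesce(1)
    .write.csv("employees_sorted.csv", header=True, mode="overwrite"))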

----
## Sentence example 

### Create DataFrame with sentences

In [21]:
sent_0 = Row(value='This is a sentence')
sent_1 = Row(value='This is another sentence')
sentences = [sent_0, sent_1]
sentenceDF = spark.createDataFrame(sentences)
sentenceDF.show()

+--------------------+
|               value|
+--------------------+
|  This is a sentence|
|This is another s...|
+--------------------+



### Examples to show split() and explode()

Split sentences on space, and rename the resulting column to 'csv'.

In [22]:
from pyspark.sql.functions import split, explode, col

wordsDF1 = sentenceDF.select(split("value"," ").alias("csv"))
wordsDF1.show()

+--------------------+
|                 csv|
+--------------------+
|[This, is, a, sen...|
|[This, is, anothe...|
+--------------------+



Map each element of the arrays in wordsDF1 to its own row and put the results in a new DataFrame.
Hint:  Use 'explode'.

In [23]:
wordsDF2 = wordsDF1.select(explode("csv").alias("word"))
wordsDF2.show()

+--------+
|    word|
+--------+
|    This|
|      is|
|       a|
|sentence|
|    This|
|      is|
| another|
|sentence|
+--------+
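
To make the two steps concrete, here is a rough plain-Python analogy. `split` turns each sentence into a list of words (one list per row), and `explode` then emits one row per list element. This is only an illustration of the semantics with ordinary lists, not how Spark executes the operations. The variable names are illustrative.

```python
# The two sentences from sentenceDF
sentences = ['This is a sentence', 'This is another sentence']

# split: one list of words per sentence (like the array column 'csv')
split_rows = [s.split(' ') for s in sentences]

# explode: one output row per list element, flattening the nested lists
exploded_rows = [word for words in split_rows for word in words]

print(split_rows)
print(exploded_rows)
```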



Filter this new DataFrame to return the rows whose word is exactly 'This'.

In [24]:
wordsDF2.filter(col("word") == "This").show()

+----+
|word|
+----+
|This|
|This|
+----+



## Stop Spark session

In [25]:
spark.stop()