# ** Spark DataFrames with Python **
This notebook demonstrates many common Spark DataFrame functions with Python. It is a nice example of how you can apply transformations to DataFrames, register them as temporary tables, and then issue SQL queries against them.

In [2]:
# import everything exported by pyspark.sql (this notebook mainly needs the Row class)
from pyspark.sql import *

# Create Example Data - Departments and Employees
#
# Every record is a pyspark.sql.Row built from keyword arguments, so fields
# can be read back as attributes (e.g. department1.name).

# Departments (these could just as well be read from a csv or json file)
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Employees; None marks a missing first or last name
employee1 = Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000)
employee2 = Row(firstName='chris', lastName='fregly', email='no-reply@northwestern.edu', salary=120000)
employee3 = Row(firstName='matei', lastName=None, email='no-reply@waterloo.edu', salary=140000)
employee4 = Row(firstName=None, lastName='wendell', email='no-reply@berkeley.edu', salary=160000)

# Pair each department with the list of employees that belong to it
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

# print(...) with a single argument prints the same text on Python 2 and
# Python 3; the bare `print x` statement is a SyntaxError on Python 3.
print(department1)
# Nested access: first employee of the first department, then the email field
print(departmentWithEmployees1.employees[0].email)


In [3]:
# Show the nested Row structure (a department Row plus a list of employee Rows).
# The function-call form of print works on both Python 2 and Python 3.
print(departmentWithEmployees1)
print(departmentWithEmployees2)

** Create the first DataFrame from a list of Row objects (the Python counterpart of Scala case classes). **

In [5]:
# Collect the first two department records into a list; no schema is passed,
# so createDataFrame derives one from the nested Row objects.
departmentsWithEmployeesSeq1 = [
    departmentWithEmployees1,
    departmentWithEmployees2,
]
df1 = sqlContext.createDataFrame(departmentsWithEmployeesSeq1)

display(df1)


** Create a second DataFrame from a list of Row objects. **

In [7]:
# The remaining two department records go into a second DataFrame, built the
# same way as the first (schema derived from the Row objects).
departmentsWithEmployeesSeq2 = [
    departmentWithEmployees3,
    departmentWithEmployees4,
]
df2 = sqlContext.createDataFrame(departmentsWithEmployeesSeq2)

display(df2)

## ** Working with DataFrames **

** Union 2 DataFrames. **

In [10]:
# Append the rows of df2 to the rows of df1 (SQL UNION ALL semantics:
# duplicates are kept). The result is a new DataFrame; df1 and df2 are unchanged.
unionDF = df1.unionAll(df2)
display(unionDF)

** Write the Unioned DataFrame to a Parquet file. **

In [12]:
# Delete any earlier copy first (second argument True = recursive), because
# the write below is issued without an explicit save mode.
parquetPath = "/tmp/databricks-df-example.parquet"
dbutils.fs.rm(parquetPath, True)
unionDF.write.parquet(parquetPath)

** Read a DataFrame from the Parquet file that you wrote to. **

In [14]:
# Load the Parquet output written in the previous step back into a DataFrame.
parquetDF = sqlContext.read.parquet("/tmp/databricks-df-example.parquet")
display (parquetDF)

**Explode the employees column.** 
Explode Function: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#explode

In [16]:
from pyspark.sql.functions import explode

from pyspark.sql import Row
# explode() produces one output row per element of the "employees" array,
# exposed here under the column name "employee".
# DataFrame.map() no longer exists in PySpark 2.0+, so the map is applied to
# the DataFrame's underlying RDD (df.rdd), which is available on Spark 1.x too.
# The lambda receives each Row and returns it unchanged; collect() then brings
# the rows back to the driver as a Python list of Rows.
parquetDF.select(explode("employees").alias("employee")).rdd.map(lambda row: row).collect()

In [17]:
from pyspark.sql.functions import explode

## Create another DataFrame with the columns firstName, lastName, email & salary.
## This time the lambda takes each exploded row and pulls the nested employee
## struct apart into its individual fields.
## DataFrame.map() no longer exists in PySpark 2.0+, so the map runs on the
## underlying RDD (df.rdd), which is available on Spark 1.x as well.
## Python None values end up as nulls in the resulting DataFrame.
employeeRows = (
    parquetDF
    .select(explode("employees").alias("employee"))
    .rdd
    .map(lambda row: Row(row.employee.firstName,
                         row.employee.lastName,
                         row.employee.email,
                         row.employee.salary))
)
# Column names are given explicitly because the mapped Rows are positional.
explodeDF = sqlContext.createDataFrame(employeeRows, ["firstName", "lastName", "email", "salary"])

display(explodeDF)

In [18]:
# Printing a DataFrame shows its column names and types, not its rows.
# The function-call form of print works on both Python 2 and Python 3.
print(explodeDF)

**Use ``filter()`` to return only the rows that match the given predicate. Note that we can now apply transformations to our DataFrame just as with RDDs. What's returned is another DataFrame.**

In [20]:
from pyspark.sql.functions import col, asc
## filter() is a transformation: it yields a new DataFrame rather than
## modifying explodeDF. The predicate is an ordinary Column expression.
isChris = col("firstName") == "chris"
filterDF = explodeDF.filter(isChris).sort(col("lastName"))
display(filterDF)

In [21]:
from pyspark.sql.functions import col, asc
## A compound filter: two comparisons joined with | (logical OR on Columns).
## Each comparison is parenthesised because | binds tighter than == in Python.
firstNameCol = col("firstName")
filterDF = (
    explodeDF
    .filter((firstNameCol == "chris") | (firstNameCol == "michael"))
    .sort(asc("lastName"))
)
display(filterDF)

**The ``where()`` clause is equivalent to ``filter()``.**

In [23]:
# Same predicate as the compound filter() example, passed to where() instead.
isChrisOrMichael = (col("firstName") == "chris") | (col("firstName") == "michael")
whereDF = explodeDF.where(isChrisOrMichael).sort(asc("lastName"))
display(whereDF)

**Replace ``null`` values with empty string using DataFrame Na functions.**

In [25]:
# DataFrame.na exposes the missing-value helpers for explodeDF.
naFunctions = explodeDF.na
# A string replacement value only affects string columns, so null first/last
# names become "" while the numeric salary column is left untouched.
nonNullDF = naFunctions.fill(value="")

display(nonNullDF)

**Retrieve only rows with missing firstName or lastName.**

In [27]:
## A declarative way to narrow the DataFrame: name each condition, then
## combine them. Missing names are "" here because the nulls were filled above.
missingFirstName = col("firstName") == ""
missingLastName = col("lastName") == ""
filterNonNullDF = nonNullDF.filter(missingFirstName | missingLastName).sort(asc("email"))
display(filterNonNullDF)

**Example aggregations using ``agg()`` and ``countDistinct()``.**

In [29]:
from pyspark.sql.functions import countDistinct
# Keep only the two name columns, group on both of them, and count the
# distinct first names within each (firstName, lastName) group.
namesDF = nonNullDF.select("firstName", "lastName")
countDistinctDF = namesDF.groupBy("firstName", "lastName").agg(countDistinct("firstName"))

display(countDistinctDF)

**Compare the DataFrame and SQL Query Physical Plans**
**(Hint:  They should be the same.)**

In [31]:
# Print the physical plan Spark will use to compute countDistinctDF.
countDistinctDF.explain()

In [32]:
# Expose nonNullDF to SQL under a temporary table name.
nonNullDF.registerTempTable("databricks_df_example")

# The same aggregation as the DataFrame version, written as a SQL statement.
# The adjacent string literals are joined by Python into a single query string.
countDistinctDF_sql = sqlContext.sql(
    "SELECT firstName, lastName, count(distinct firstName) as distinct_first_names "
    "FROM databricks_df_example "
    "GROUP BY firstName, lastName"
)

# Print the physical plan for the SQL variant so it can be compared with the
# DataFrame variant.
countDistinctDF_sql.explain()

**Sum up all the salaries**

In [34]:
# Aggregate over the whole DataFrame: groupBy() with no columns forms a single
# group, and the dict maps a column name to the aggregate function to apply.
salarySumDF = nonNullDF.groupBy().agg({"salary": "sum"})
display(salarySumDF)

In [35]:
# Attribute access on a DataFrame yields a column expression object rather
# than the column's data; type() shows which class that object is.
type(nonNullDF.salary)

**Print the summary statistics for the salaries.**

In [37]:
# describe() returns a small DataFrame of summary statistics for the salary
# column; show() prints it as a text table.
salaryStatsDF = nonNullDF.describe("salary")
salaryStatsDF.show()

### An example using Pandas & Matplotlib Integration

In [39]:
# Convert the Spark DataFrame to pandas and plot salary by first name.
# Statement order matters here because pyplot tracks a "current figure".
import pandas as pd
import matplotlib.pyplot as plt
# Start from a clean current figure.
plt.clf()
# toPandas() collects every row to the driver; fine here because the data is tiny.
pdDF = nonNullDF.toPandas()
# NOTE(review): this expression sits mid-cell, so its value is presumably not
# displayed and the line has no visible effect — confirm.
type(pdDF)
# First attempt: a line plot of salary per firstName ...
pdDF.plot(x='firstName', y='salary', kind='line')
# ... which is cleared again right away, so only the bar chart below remains.
plt.clf()
pdDF.plot(x='firstName', y='salary', kind='bar')
# NOTE(review): display() with no argument presumably renders the current
# matplotlib figure in this notebook environment — confirm.
display()


### Cleanup: Remove the parquet file.

In [41]:
# Delete the example Parquet output written earlier; the second argument
# (True) requests a recursive delete.
dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)