### Dataframe operations

In [1]:
from pyspark.sql import Row

row = Row(name="Alice", age=11)
# Single-argument print(...) and str.format produce the same output on Python 2 and 3.
print(row)
# Fields can be read by key or by attribute.
print('{} {}'.format(row['name'], row['age']))
print('{} {}'.format(row.name, row.age))

# Row is a tuple subclass, so a field named like a tuple method (count) is
# shadowed by that method on attribute access; key access returns the field.
row = Row(name="Alice", age=11, count=1)
print(row.count)
print(row['count'])

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1489483935825_0006,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
Row(age=11, name='Alice')
Alice 11
Alice 11
<built-in method count of Row object at 0x7f9a96a0fb90>
1

In [None]:
# List the Row's attributes: besides the fields they include tuple methods such
# as count, which is why attribute access to a field named count yields a method.
dir(row)

In [None]:
# Read the sample building CSV (wasb:/// resolves against the cluster's default
# storage); header=True takes column names from the first line and
# inferSchema=True makes Spark scan the data to choose column types.
df = spark.read.csv('wasb:///HdiSamples/HdiSamples/SensorSampleData/building/building.csv', header=True, inferSchema=True)

In [None]:
# show the content of the dataframe (show() prints the first 20 rows by default)
df.show()

In [None]:
# Print the dataframe schema in a tree format: column names, types and nullability
df.printSchema()

In [None]:
# Create an RDD from the dataframe: df.rdd exposes the rows as an RDD of Row objects
dfrdd = df.rdd
dfrdd.take(3)  # bring the first 3 rows back to the driver

In [None]:
# Retrieve specific columns from the dataframe (a projection, like SQL SELECT)
df.select('BuildingID', 'Country').show()

In [None]:
# Import only the function this cell needs: a wildcard import from
# pyspark.sql.functions would shadow Python builtins such as sum, min, max,
# abs and round for every later cell in the notebook.
from pyspark.sql.functions import lit

# Keep the US buildings and append a constant 'Status' column holding 'OK'.
df.filter("Country='USA'").select('*', lit('OK').alias('Status')).show()

In [None]:
# Use GroupBy clause with dataframe: count the rows for each HVACProduct value
df.groupBy('HVACProduct').count().show()

### Rewriting SQL with DataFrame API

The data files have been placed in a public blob container, which can be accessed as follows:

In [None]:
# Load the four sample tables from CSV files in the public container.
# Every file has a header row, and column types are inferred from the data.

dfCustomer = spark.read.options(header=True, inferSchema=True) \
                  .csv('wasb://cluster@msbd.blob.core.windows.net/data/Customer.csv')
dfProduct = spark.read.options(header=True, inferSchema=True) \
                 .csv('wasb://cluster@msbd.blob.core.windows.net/data/Product.csv')
dfDetail = spark.read.options(header=True, inferSchema=True) \
                .csv('wasb://cluster@msbd.blob.core.windows.net/data/SalesOrderDetail.csv')
dfHeader = spark.read.options(header=True, inferSchema=True) \
                .csv('wasb://cluster@msbd.blob.core.windows.net/data/SalesOrderHeader.csv')

In [None]:
# SELECT ProductID, Name, ListPrice
# FROM Product
# WHERE Color = 'black'
#
# Spark compares strings case-sensitively, so the value is spelled 'Black' here.

(dfProduct
    .filter(dfProduct['Color'] == 'Black')
    .select('ProductID', 'Name', 'ListPrice')
    .show(truncate=False))

In [None]:
# The same filter as a Column expression, projecting ListPrice multiplied by 2.
(dfProduct
    .filter(dfProduct['Color'] == 'Black')
    .select(dfProduct['ProductID'], dfProduct.Name, dfProduct['ListPrice'] * 2)
    .show(truncate=False))

In [None]:
# A SQL-string predicate may contain arithmetic on columns.
(dfProduct
    .filter("ListPrice * 2 > 100")
    .select(dfProduct['ProductID'], dfProduct.Name, dfProduct['ListPrice'] * 2)
    .show(truncate=False))

In [None]:
# SELECT ProductID, Name, ListPrice
# FROM Product
# WHERE Color = 'black'
# ORDER BY ProductID

# Spark compares strings case-sensitively, so the value is spelled 'Black'.
# Order by ProductID, as the SQL above specifies.
dfProduct.filter("Color = 'Black'")\
         .select('ProductID', 'Name', 'ListPrice')\
         .orderBy('ProductID')\
         .show(truncate=False)

In [None]:
# Find all orders and details on black products,
# return SalesOrderID, SalesOrderDetailID, product Name, UnitPrice, and OrderQty

# SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty
# FROM SalesLT.SalesOrderDetail, SalesLT.Product
# WHERE SalesOrderDetail.ProductID = Product.ProductID AND Color = 'Black'

# SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty
# FROM SalesLT.SalesOrderDetail
# JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
# WHERE Color = 'Black'

# Passing a column name to join() performs an equi-join on that column, which
# must exist on both sides (like SQL JOIN ... USING (ProductID)). This is not a
# NATURAL JOIN, which would match on every column name the tables share.
# Product is filtered to black items before the join.

dfDetail.join(dfProduct.filter("Color='Black'"), 'ProductID') \
        .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty') \
        .show()

# If we move the filter to after select, it still works.  Why?

In [None]:
# This also works, although the select drops Color. Spark's analyzer can still
# resolve Color from the join output beneath the projection.
# Uncomment d2.explain() to inspect the resulting plan.

d1 = (dfDetail.join(dfProduct, 'ProductID')
      .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty'))
d2 = d1.where("Color = 'Black'")
d2.show()
# d2.explain()

In [None]:
# This will report an error. The CSV written below contains only the five
# selected columns. After reading it back there is no plan beneath it that
# still carries Color, so the filter cannot resolve that column.

d1 = dfDetail.join(dfProduct, 'ProductID') \
             .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
d1.write.csv('wasb:///temp.csv', mode = 'overwrite', header = True)
d2 = spark.read.csv('wasb:///temp.csv', header = True, inferSchema = True)
d2.filter("Color = 'Black'").show()


In [None]:
# Find all orders that include at least one black product,
# return the SalesOrderID of each such order (once)

# SELECT DISTINCT SalesOrderID
# FROM SalesLT.SalesOrderDetail
# JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
# WHERE Color = 'Black'

# distinct() collapses orders that contain several black items into one row.
dfDetail.join(dfProduct.filter("Color='Black'"), 'ProductID') \
        .select('SalesOrderID') \
        .distinct() \
        .show()

In [None]:
# How many colors in the products?

# SELECT COUNT(DISTINCT Color)
# FROM SalesLT.Product

from pyspark.sql.functions import countDistinct

# distinct() keeps NULL as a value of its own, so when Color contains NULLs
# this is one more than the SQL query returns.
n_with_null = dfProduct.select('Color').distinct().count()

# countDistinct() follows SQL COUNT(DISTINCT ...) semantics and ignores NULLs.
n_sql = dfProduct.agg(countDistinct('Color')).first()[0]

print('distinct().count() = {}, COUNT(DISTINCT Color) = {}'.format(n_with_null, n_sql))

In [None]:
# Find the total price of each order,
# return SalesOrderID and the total price in a column named 'TotalPrice'

# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail
# GROUP BY SalesOrderID

# Compute the discounted line price, sum it per order, then rename the
# auto-generated aggregate column sum(netprice).
(dfDetail
    .select('*', (dfDetail['UnitPrice'] * dfDetail['OrderQty']
                  * (1 - dfDetail['UnitPriceDiscount'])).alias('netprice'))
    .groupBy('SalesOrderID')
    .sum('netprice')
    .withColumnRenamed('sum(netprice)', 'TotalPrice')
    .show())

In [None]:
# Find the total price of each order where the total price > 10000

# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail
# GROUP BY SalesOrderID
# HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000

# HAVING becomes a filter on the renamed aggregate column after the groupBy.
(dfDetail
    .select('*', (dfDetail['UnitPrice'] * dfDetail['OrderQty']
                  * (1 - dfDetail['UnitPriceDiscount'])).alias('netprice'))
    .groupBy('SalesOrderID')
    .sum('netprice')
    .withColumnRenamed('sum(netprice)', 'TotalPrice')
    .filter('TotalPrice > 10000')
    .show())

In [None]:
# Find the total price on the black products of each order where the total price > 10000

# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail, SalesLT.Product
# WHERE SalesLT.SalesOrderDetail.ProductID = SalesLT.Product.ProductID AND Color = 'Black'
# GROUP BY SalesOrderID
# HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000

# Line prices are computed first; the join then restricts them to black
# products before aggregating per order.
(dfDetail
    .select('*', (dfDetail['UnitPrice'] * dfDetail['OrderQty']
                  * (1 - dfDetail['UnitPriceDiscount'])).alias('netprice'))
    .join(dfProduct, 'ProductID')
    .where("Color = 'Black'")
    .groupBy('SalesOrderID')
    .sum('netprice')
    .withColumnRenamed('sum(netprice)', 'TotalPrice')
    .filter('TotalPrice > 10000')
    .show())

In [None]:
# For each customer, find the total quantity of black products bought.
# Report CustomerID, FirstName, LastName, and total quantity

# select saleslt.customer.customerid, FirstName, LastName, sum(orderqty)
# from saleslt.customer
# left outer join
# (
# saleslt.salesorderheader
# join saleslt.salesorderdetail
# on saleslt.salesorderdetail.salesorderid = saleslt.salesorderheader.salesorderid
# join saleslt.product
# on saleslt.product.productid = saleslt.salesorderdetail.productid and color = 'black'
# )
# on saleslt.customer.customerid = saleslt.salesorderheader.customerid
# group by saleslt.customer.customerid, FirstName, LastName
# order by sum(orderqty) desc

# The string literal uses single quotes. With ANSI mode's
# spark.sql.ansi.doubleQuotedIdentifiers enabled (Spark 3.4+), "Black" would be
# parsed as a column identifier instead of a string.
d1 = dfDetail.join(dfProduct, 'ProductID')\
             .where("Color = 'Black'") \
             .join(dfHeader, 'SalesOrderID')\
             .groupBy('CustomerID').sum('OrderQty')
# The left outer join keeps customers with no black-product orders; their
# sum(OrderQty) is null.
dfCustomer.join(d1, 'CustomerID', 'left_outer')\
          .select('CustomerID', 'FirstName', 'LastName', 'sum(OrderQty)')\
          .orderBy('sum(OrderQty)', ascending=False)\
          .show()

-------

### Embed SQL queries

You can also run SQL queries over DataFrames once you register them as temporary views in the SparkSession.

In [None]:
# Register the dataframe as a temporary view called HVAC so spark.sql() can query
# it; the view lives as long as this SparkSession and replaces any view of that name.
df.createOrReplaceTempView('HVAC')

In [None]:
# Query the registered view with plain SQL; spark.sql() returns a DataFrame.
spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10').show()

In [None]:
# Can even mix DataFrame API with SQL: filter with the API, register the result
# as a view, then aggregate it in SQL.
# 'HVACproduct' matches the HVACProduct column because Spark resolves column
# names case-insensitively by default (spark.sql.caseSensitive=false).
df.where('BuildingAge >= 10').createOrReplaceTempView('OldBuildings')
spark.sql('SELECT HVACproduct, COUNT(*) FROM OldBuildings GROUP BY HVACproduct').show()

In [None]:
# The other direction: the result of a SQL query is an ordinary DataFrame, so
# DataFrame API calls can be chained onto it.
d1 = spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10')
d1.groupBy('HVACproduct').count().show()

In [None]:
# UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Spark passes SQL NULL to a Python UDF as None, and len(None) would fail the
# task with a TypeError. Return None (NULL) for missing values instead.
slen = udf(lambda s: len(s) if s is not None else None, IntegerType())
df.select('*', slen(df['Country']).alias('slen')).show()

In [None]:
# Register a null-safe string-length function under the name slen for SQL use;
# NULL input yields NULL instead of failing on len(None).
spark.udf.register('slen', lambda s: len(s) if s is not None else None, IntegerType())
spark.sql('SELECT *, slen(Country) AS slen FROM HVAC').show()

---
## Flexible Data Model

Sample data file:

https://msbd.blob.core.windows.net/cluster/data/products.json

In [1]:
# Read the JSON product data. Spark infers the schema, including nested structs
# (dimensions, warehouseLocation) and an array column (tags).
df = spark.read.json('wasb://cluster@msbd.blob.core.windows.net/data/products.json')
df.printSchema()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1489453106427_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: double (nullable = true)
 |    |-- width: double (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- warehouseLocation: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)

In [None]:
# Display the first rows of the product data (20 by default).
df.show()

In [None]:
# Accessing nested fields: a dotted name selects the height field inside the
# dimensions struct column

df.select(df['dimensions.height']).show()

In [None]:
# The same nested field, selected by a plain column-name string.
df.select('dimensions.height').show()

In [None]:
# Filter on an array element (tags[0]) and a nested struct field. Neither column
# is in the selected output; presumably the analyzer resolves them from the
# input of the select (TODO confirm with .explain()).
df.select('dimensions.height')\
  .filter("tags[0] = 'cold' AND warehouseLocation.latitude < 0")\
  .show()

In [None]:
# The RDD view of the same data: fetch the first 3 Row objects to the driver.
df.rdd.take(3)

---
## Converting between RDD and DataFrame

Sample data file:

https://msbd.blob.core.windows.net/cluster/data/people.txt

In [None]:
# Load a text file as an RDD of lines; parse() turns each line into a
# (name, age) tuple.
lines = sc.textFile("wasb://cluster@msbd.blob.core.windows.net/data/people.txt")

def parse(l):
    """Turn a "name, age" text line into a (name, age) tuple with an int age.

    Only the first two comma-separated fields are used. int() tolerates the
    whitespace that follows the comma, and the name is returned as-is.
    """
    fields = l.split(',')
    name, age_text = fields[0], fields[1]
    return name, int(age_text)

# Parse every line into a (name, age) tuple and collect the result to inspect it.
rdd = lines.map(parse)
rdd.collect()

In [None]:
# Create the DataFrame from an RDD of tuples, schema is inferred: columns get
# default positional names (_1, _2) and their types come from the data
df = spark.createDataFrame(rdd)
df.printSchema()
df.show()

In [None]:
# Create the DataFrame from an RDD of tuples with column names given
# positionally; the column types are still inferred from the data
df = spark.createDataFrame(rdd, ['name', 'age'])
df.printSchema()
df.show()

In [None]:
# Create the DataFrame from an RDD of Rows: column names come from the Row
# fields and types are inferred from the Row values
# (Spark versions before 3.0 sort Row fields by name).
from pyspark.sql import Row

rdd_rows = rdd.map(lambda p: Row(name = p[0], age = p[1]))
df = spark.createDataFrame(rdd_rows)
df.printSchema()
df.show()

In [None]:
# Row fields with types incompatible with that of previous rows will be turned
# into nulls: the schema is presumably inferred from the first row (numeric age),
# so Bob's string '12' does not fit and shows up as null.
row1 = Row(name="Alice", age=11)
row2 = Row(name="Bob", age='12')
rdd_rows = sc.parallelize([row1, row2])
df1 = spark.createDataFrame(rdd_rows)
df1.show()

In [None]:
# .rdd exposes the DataFrame's content as an RDD of Row objects, so ordinary
# RDD transformations can use attribute access on each row.
teenagers = df.where('age BETWEEN 13 AND 19')

teenNames = teenagers.rdd.map(lambda person: "Name: " + person.name)
teenNames.collect()

### Note:

When cached, DataFrames are stored in a compressed columnar format.

When cached, RDDs are stored row by row and, by default, without compression.

The RDD view of a DataFrame is just an interface: the Row objects are constructed on the fly and do not necessarily reflect the internal storage format of the data.

### Closures in DataFrames

In [None]:
# A small DataFrame of (i, i) pairs for i in 0..9, with default column names _1 and _2.
data = range(10)
df = spark.createDataFrame(zip(data, data))
df.printSchema()
df.show()

In [None]:
# The 'closure' behaviour of RDD lambdas does not apply here. Python evaluates
# `df._1 < x` when the filter is defined, so the query plan stores the literal
# value 5 rather than a reference to x, and rebinding x afterwards has no effect.

x = 5
df1 = df.filter(df._1 < x)
# show() prints the rows itself and returns None, so it is called directly
# rather than passed to print.
df1.show()
x = 3
df1.show()

In [None]:
# This is not caused by the Catalyst optimizer. Python evaluates `df._1 < x`
# when df1 is defined, so the query plan holds the literal value 5 rather than
# a reference to the variable x, and rebinding x later cannot affect df1.

x = 5
df1 = df.filter(df._1 < x)
df1.show()
x = 3
df1.show()

In [None]:
def f():
    """Return half of the global x, read at the moment f() is called.

    x/2 floors for ints under Python 2 but is true division under Python 3.
    """
    return x/2
# f() runs while the select expression is being built, so the values it returns
# for x = 5 are embedded in df1's plan; rebinding x to 3 does not change df1.
x = 5
df1 = df.select(df._1 * f() * f() / 3 + 1)
df1.show()
x = 3
df1.show()

In [None]:
# The filter lambda refers to the variable x. The function is serialized when
# an action runs, so each take() sees the value x has at that moment
# (5, then 3), unlike the DataFrame Column expressions above.
rdd = sc.parallelize(range(10))
x = 5
a = rdd.filter(lambda z: z < x)
print(a.take(10))
x = 3
print(a.take(10))

In [None]:
# Driver-side counter that increment_counter tries to bump.
counter = 0


def increment_counter(x):
    """Add one to the module-level ``counter``; the argument is ignored."""
    global counter
    counter = counter + 1

df.foreach(increment_counter)

# increment_counter runs in executor-side Python workers, each updating its own
# copy of `counter`. The driver's variable is not updated by foreach.
print(counter)

# To aggregate a value from the executors back on the driver, use an accumulator.
row_count = sc.accumulator(0)
df.foreach(lambda row: row_count.add(1))
print(row_count.value)

### Partitioning in DataFrames

In [None]:
data1 = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4]
data2 = [2, 2, 3, 4, 5, 3, 1, 1, 2, 3]
df = spark.createDataFrame(zip(data1, data2))
print(df.rdd.getNumPartitions())
# glom() turns each partition into a list, showing how the rows are distributed.
print(df.rdd.glom().collect())

In [None]:
# Repartition into 6 partitions by column _2: rows with equal _2 values end up
# in the same partition.
df1 = df.repartition(6, df._2)
print(df1.rdd.glom().collect())
df1.show()