In [1]:
%matplotlib inline
from __future__ import print_function
import sys
import numpy as np
import matplotlib.pyplot as plt

from operator import itemgetter

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Build (or reuse) a local Spark context that uses every available core.
# getOrCreate() keeps this cell safe to re-run: constructing a second
# SparkContext in the same kernel raises an error.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
sc.setLogLevel("WARN")

# The builder is the documented entry point; it reuses the context created above.
spark = SparkSession.builder.getOrCreate()

In [3]:
# Smoke test: sum the ids above 100 out of 0..999 and print the physical plan.
(
    spark.range(1000)
    .filter("id > 100")
    .selectExpr("sum(id)")
    .explain()
)

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- *Filter (id#0L > 100)
         +- *Range (0, 1000, step=1, splits=Some(2))


In [52]:
%%time
def make_measurements(input_id, mean_n_meas=15, n_htm=200):
    """Simulate the measurements of a single object.

    Parameters
    ----------
    input_id : int
        Object identifier; it is also the centre of the simulated values.
    mean_n_meas : float, optional
        Mean of the Poisson distribution the number of measurements is
        drawn from (default 15).
    n_htm : int, optional
        Number of distinct htm ids; one id in ``0 .. n_htm - 1`` is drawn
        per object and shared by all of its measurements (default 200).

    Returns
    -------
    zip of (input_id, htm_id, meas_value) tuples, one per measurement,
    where ``meas_value`` is ``input_id`` plus standard-normal noise.
    The result is empty when the Poisson draw is zero.
    """
    # Coerce to a plain int so the list repetitions below are always
    # sequence repeats, never numpy element-wise products.
    n_measurements = int(np.random.poisson(mean_n_meas))
    meas_values = input_id + np.random.randn(n_measurements)
    htm_id = int(n_htm * np.random.random())
    obj_ids = n_measurements * [input_id]
    htm_ids = n_measurements * [htm_id]
    return zip(obj_ids, htm_ids, meas_values.tolist())
    
# Generate the rows on the workers, then pull them back to the driver.
# NOTE: despite its name, `rdd` is a plain Python list after collect().
# Materialising the rows once means every later action on meas_table reuses
# these exact random draws instead of re-running make_measurements.
rdd = sc.range(100000).flatMap(make_measurements).collect()
meas_table = spark.createDataFrame(rdd, schema=("obj_id", "htm_id", "meas_value"))
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
meas_table.createOrReplaceTempView("meas_table")
# mode="overwrite" lets the cell be re-run; the default mode fails when the
# output directory already exists.
meas_table.write.parquet("meas_table.parquet", mode="overwrite", partitionBy=['htm_id'])

CPU times: user 51 s, sys: 150 ms, total: 51.1 s
Wall time: 1min 12s


In [53]:
# Total number of simulated measurement rows across all objects.
meas_table.count()

1499400

In [55]:
%%time
# One row per object: its htm_id, the number of measurements (n_epochs) and
# the smallest / largest measured value.
summary_table = spark.sql("SELECT obj_id, first(htm_id) as htm_id, count(*) as n_epochs, "
                             "min(meas_value) as min_val, max(meas_value) as max_val "
                             "FROM meas_table GROUP BY obj_id")
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
summary_table.createOrReplaceTempView("summary_table")
# mode="overwrite" lets the cell be re-run; the default mode fails when the
# output directory already exists.
summary_table.write.parquet("summary_table.parquet", mode="overwrite")

CPU times: user 10 ms, sys: 30 ms, total: 40 ms
Wall time: 11.5 s


In [21]:
# Preview only a handful of rows: converting the whole summary table to
# pandas would pull every row onto the driver just to display it.
summary_table.limit(10).toPandas()

Unnamed: 0,obj_id,n_epochs,min_val,max_val
0,0,17,-0.961687,1.668768
1,7,23,5.064148,9.762524
2,6,15,5.193034,7.944696
3,9,12,6.324873,10.110992
4,5,17,4.038313,6.668768
5,1,15,0.193034,2.944696
6,3,9,1.954731,4.158932
7,8,9,6.954731,9.158932
8,2,23,0.064148,4.762524
9,4,12,1.324873,5.110992


Delete and Reload
-----------------

If we leave the original DataFrames and their temporary tables around, subsequent queries can cheat and reuse the aggregation relations that built the summary table. Since we want to force actual joins between two independent tables, we drop the temporary tables and reload both tables from the Parquet files on disk.

In [61]:
# Release any cached data held for the two DataFrames before dropping them.
meas_table.unpersist()
summary_table.unpersist()
# IF EXISTS keeps the cell re-runnable once the temporary tables are gone.
spark.sql("DROP TABLE IF EXISTS summary_table")
spark.sql("DROP TABLE IF EXISTS meas_table")

DataFrame[]

In [63]:
# Reload both tables from disk so they are independent parquet relations.
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
summary_table = spark.read.parquet("summary_table.parquet")
summary_table.createOrReplaceTempView("summary_table")

# meas_table was written partitioned by htm_id, so after the reload htm_id
# is the last column of the relation (see the plans printed further down).
meas_table = spark.read.parquet("meas_table.parquet")
meas_table.createOrReplaceTempView("meas_table")

In [73]:
%%time
# Join every measurement to its object's summary row, keeping only objects
# whose smallest measured value exceeds 10000.
query = ("SELECT summary_table.obj_id, meas_value FROM  meas_table "
         "JOIN summary_table ON (summary_table.obj_id = meas_table.obj_id) "
         "WHERE summary_table.min_val > 10000")
# Build the DataFrame once and reuse it for both the plan and the count,
# rather than parsing and analysing the same SQL twice.
targetObjects = spark.sql(query)
targetObjects.explain(True)
print(targetObjects.count())

== Parsed Logical Plan ==
'Project ['summary_table.obj_id, 'meas_value]
+- 'Filter ('summary_table.min_val > 10000)
   +- 'Join Inner, ('summary_table.obj_id = 'meas_table.obj_id)
      :- 'UnresolvedRelation `meas_table`
      +- 'UnresolvedRelation `summary_table`

== Analyzed Logical Plan ==
obj_id: bigint, meas_value: double
Project [obj_id#728L, meas_value#741]
+- Filter (min_val#731 > cast(10000 as double))
   +- Join Inner, (obj_id#728L = obj_id#740L)
      :- SubqueryAlias meas_table, `meas_table`
      :  +- Relation[obj_id#740L,meas_value#741,htm_id#742] parquet
      +- SubqueryAlias summary_table, `summary_table`
         +- Relation[obj_id#728L,htm_id#729L,n_epochs#730L,min_val#731,max_val#732] parquet

== Optimized Logical Plan ==
Project [obj_id#728L, meas_value#741]
+- Join Inner, (obj_id#728L = obj_id#740L)
   :- Project [obj_id#740L, meas_value#741]
   :  +- Filter isnotnull(obj_id#740L)
   :     +- Relation[obj_id#740L,meas_value#741,htm_id#742] parquet
   +- Project

In [78]:
%%time
# Number of summary rows per htm_id. This only builds a lazy DataFrame; no
# action is called on it here, so the timing below covers plan construction
# only, not the aggregation itself.
y = summary_table.groupBy("htm_id").count()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 19.8 ms


In [85]:
%%time
# Same join as before, but restricted to a single object (obj_id = 5).
# Only the query plans are printed; the query itself is not executed.
query = "".join([
    "SELECT summary_table.obj_id, meas_value FROM  meas_table ",
    "JOIN summary_table ON (summary_table.obj_id = meas_table.obj_id) ",
    "WHERE summary_table.obj_id = 5",
])
spark.sql(query).explain(extended=True)

== Parsed Logical Plan ==
'Project ['summary_table.obj_id, 'meas_value]
+- 'Filter ('summary_table.obj_id = 5)
   +- 'Join Inner, ('summary_table.obj_id = 'meas_table.obj_id)
      :- 'UnresolvedRelation `meas_table`
      +- 'UnresolvedRelation `summary_table`

== Analyzed Logical Plan ==
obj_id: bigint, meas_value: double
Project [obj_id#728L, meas_value#741]
+- Filter (obj_id#728L = cast(5 as bigint))
   +- Join Inner, (obj_id#728L = obj_id#740L)
      :- SubqueryAlias meas_table, `meas_table`
      :  +- Relation[obj_id#740L,meas_value#741,htm_id#742] parquet
      +- SubqueryAlias summary_table, `summary_table`
         +- Relation[obj_id#728L,htm_id#729L,n_epochs#730L,min_val#731,max_val#732] parquet

== Optimized Logical Plan ==
Project [obj_id#728L, meas_value#741]
+- Join Inner, (obj_id#728L = obj_id#740L)
   :- Project [obj_id#740L, meas_value#741]
   :  +- Filter ((obj_id#740L = 5) && isnotnull(obj_id#740L))
   :     +- Relation[obj_id#740L,meas_value#741,htm_id#742] parquet

In [84]:
# Peek at one row of summary_table to check its columns and value types.
summary_table.first()

Row(obj_id=39, htm_id=86, n_epochs=16, min_val=37.575302509516334, max_val=40.51324216407764)

In [87]:
%%time

# Variant of the single-object join that also matches on htm_id (the
# partition column of meas_table), to see how that changes the plan.
# Only the query plans are printed; the query itself is not executed.
query = "".join([
    "SELECT meas_table.obj_id, meas_value FROM  meas_table ",
    "JOIN summary_table ON (summary_table.obj_id = meas_table.obj_id AND summary_table.htm_id = meas_table.htm_id) ",
    "WHERE summary_table.obj_id = 5",
])
spark.sql(query).explain(extended=True)

== Parsed Logical Plan ==
'Project ['meas_table.obj_id, 'meas_value]
+- 'Filter ('summary_table.obj_id = 5)
   +- 'Join Inner, (('summary_table.obj_id = 'meas_table.obj_id) && ('summary_table.htm_id = 'meas_table.htm_id))
      :- 'UnresolvedRelation `meas_table`
      +- 'UnresolvedRelation `summary_table`

== Analyzed Logical Plan ==
obj_id: bigint, meas_value: double
Project [obj_id#740L, meas_value#741]
+- Filter (obj_id#728L = cast(5 as bigint))
   +- Join Inner, ((obj_id#728L = obj_id#740L) && (htm_id#729L = cast(htm_id#742 as bigint)))
      :- SubqueryAlias meas_table, `meas_table`
      :  +- Relation[obj_id#740L,meas_value#741,htm_id#742] parquet
      +- SubqueryAlias summary_table, `summary_table`
         +- Relation[obj_id#728L,htm_id#729L,n_epochs#730L,min_val#731,max_val#732] parquet

== Optimized Logical Plan ==
Project [obj_id#740L, meas_value#741]
+- Join Inner, ((obj_id#728L = obj_id#740L) && (htm_id#729L = cast(htm_id#742 as bigint)))
   :- Filter (((obj_id#740L = 