A comparison of Spark SQL run locally and EmptyHeaded on a graph dataset.

In [1]:
import os
import time

# PYTHONPATH should include $SPARK_HOME/python and $SPARK_HOME/python/lib/py4j-0.9-src.zip
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row

from emptyheaded import *

In [2]:
# Init. SparkSQL and EH
sc = SparkContext(appName="SparkSQL")
sql_context = SQLContext(sc)

# start() is provided by the `from emptyheaded import *` star import.
start()

# os.path.expandvars() leaves a reference to an unset variable unchanged,
# so a missing EMPTYHEADED_HOME would silently yield paths beginning with
# the literal text "$EMPTYHEADED_HOME" and only fail much later with a
# confusing file-not-found error. Fail fast with a clear message instead.
eh_home = os.environ.get("EMPTYHEADED_HOME")
if not eh_home:
    raise EnvironmentError(
        "EMPTYHEADED_HOME is not set; it must point at the EmptyHeaded checkout."
    )

# Use FB graph data.
path = eh_home + "/test/graph/data/facebook_duplicated.tsv"

In [3]:
# Build table in SparkSQL
# Each input line is split on whitespace; the first two fields become the
# integer `src` and `dst` columns of one Row.
lines = sc.textFile(path)
parts = lines.map(lambda line: line.split())
edges = parts.map(
    lambda fields: Row(src=int(fields[0]), dst=int(fields[1]))
)

# Register the DataFrame under the table name "Edge" used by the SQL queries.
spark_df = sql_context.createDataFrame(edges)
spark_df.registerTempTable("Edge")

In [None]:
# Convert from SparkSQL DataFrames to EH Relations
# The Spark DataFrame is first converted to a pandas DataFrame, then handed
# to EmptyHeaded together with the Spark column names.
edge_pandas_df = spark_df.toPandas()
edge_columns = spark_df.columns
eh_edge = Relation(
    name="Edge",
    dataframe=edge_pandas_df,
    attribute_names=edge_columns,
)

In [4]:
# After EH Relations are initialized, build EH.
NUM_THREADS = 48
db_path = eh_home + "/test/graph/databases/db"

# Create the database from the Edge relation at db_path, then build it.
c = Config(num_threads=NUM_THREADS)
db = Database.create(c, db_path, [eh_edge])
db.build()

cd /Users/andrewlamb/EmptyHeaded/storage_engine/build && cmake -DNUM_THREADS=48 .. && make && cd - > /dev/null


### Triangle Counting Query

In [5]:
tri_query = """
    SELECT COUNT(*) FROM
    Edge e1
    JOIN Edge e2 ON e1.dst = e2.src
    JOIN Edge e3 ON e1.src = e3.src AND e2.dst = e3.dst
"""

# RDD operations are lazy, start time when we materialize the count.
# The timestamps are deliberately not called `start`/`end`: `start` is the
# EmptyHeaded function pulled in by the star import, and rebinding it to a
# float would make any re-run of the initialisation cell fail.
result_set = sql_context.sql(tri_query)
t_begin = time.time()
count = result_set.collect()[0][0]
t_end = time.time()

print("ELAPSED TIME: {0} s".format(t_end - t_begin))
print("FOUND {0}.".format(count))

ELAPSED TIME: 23.4883317947 s
FOUND 9672060.


In [6]:
# EH needs a name for the table that stores the result set.
# The wrapped statement gets its own name so this cell can be re-run safely;
# reassigning `tri_query` to a wrapped copy of itself would nest the
# CREATE TABLE statement one level deeper on every execution.
eh_tri_query = "CREATE TABLE tri_count AS (" + tri_query + ")"

# Include query compilation and codegen in the time. The actual
# time to execute the query is much lower.
# The timestamps avoid the name `start`, which is the EmptyHeaded function
# brought in by the star import and must stay callable.
t_begin = time.time()
db.eval(eh_tri_query, useSql=True)
count = db.get("tri_count").getDF()[0][0]
t_end = time.time()

print("ELAPSED TIME: {0} s".format(t_end - t_begin))
print("FOUND {0}.".format(count))

ELAPSED TIME: 4.71924495697 s
FOUND 9672060.


### Lollipop Counting Query

In [7]:
lollipop_query = """
      SELECT COUNT(*)
      FROM Edge e1
      JOIN Edge e2 ON e1.dst = e2.src
      JOIN Edge e3 ON e2.dst = e3.src AND e1.src = e3.dst
      JOIN Edge e4 ON e1.src = e4.src
"""

# Building the query is lazy; only the collect() call is timed.
# The timestamps avoid the name `start`, which is the EmptyHeaded function
# brought in by the star import and must stay callable.
result_set = sql_context.sql(lollipop_query)
t_begin = time.time()
count = result_set.collect()[0][0]
t_end = time.time()

print("ELAPSED TIME: {0} s".format(t_end - t_begin))
print("FOUND {0}.".format(count))

ELAPSED TIME: 70.6646809578 s
FOUND 1426911480.


In [8]:
# Wrap the SELECT in a CREATE TABLE so the result is stored under the name
# "lollipop_count". A separate variable keeps the cell idempotent:
# reassigning `lollipop_query` to a wrapped copy of itself would nest the
# statement again on every re-run.
eh_lollipop_query = "CREATE TABLE lollipop_count AS (" + lollipop_query + ")"

# The timed region covers db.eval() and reading the count back.
# The timestamps avoid the name `start`, which is the EmptyHeaded function
# brought in by the star import and must stay callable.
t_begin = time.time()
db.eval(eh_lollipop_query, useSql=True)
count = db.get("lollipop_count").getDF()[0][0]
t_end = time.time()

print("ELAPSED TIME: {0} s".format(t_end - t_begin))
print("FOUND {0}.".format(count))

ELAPSED TIME: 4.17032194138 s
FOUND 1426911480.


### Barbell Counting Query

In [9]:
# Runs out of memory on local machine.
# Only the query text is defined in this cell; the Spark SQL run further
# down is left commented out for that reason.

# Seven-way self-join over the Edge table.
barbell_query = """
      SELECT COUNT(*)
      FROM Edge e1
      JOIN Edge e2 ON e1.dst = e2.src
      JOIN Edge e3 ON e2.dst = e3.src AND e3.dst = e1.src
      JOIN Edge e4 ON e4.src = e1.dst
      JOIN Edge e5 ON e5.src = e4.dst
      JOIN Edge e6 ON e5.dst = e6.src
      JOIN Edge e7 ON e6.dst = e7.src AND e7.dst = e5.src
"""

# Disabled Spark SQL run, kept for reference:
# result_set = sql_context.sql(barbell_query)
# start = time.time()
# count = result_set.collect()[0][0]
# end = time.time()

# print("ELAPSED TIME: {0} s".format(end - start))
# print("FOUND {0}.".format(count))

In [10]:
# Wrap the SELECT in a CREATE TABLE so the result is stored under the name
# "barbell_count". A separate variable keeps the cell idempotent:
# reassigning `barbell_query` to a wrapped copy of itself would nest the
# statement again on every re-run.
eh_barbell_query = "CREATE TABLE barbell_count AS (" + barbell_query + ")"

# The timed region covers db.eval() and reading the count back.
# The timestamps avoid the name `start`, which is the EmptyHeaded function
# brought in by the star import and must stay callable.
t_begin = time.time()
db.eval(eh_barbell_query, useSql=True)
count = db.get("barbell_count").getDF()[0][0]
t_end = time.time()

print("ELAPSED TIME: {0} s".format(t_end - t_begin))
print("FOUND {0}.".format(count))

ELAPSED TIME: 4.41488790512 s
FOUND 20371831447136.
