In [None]:
# Define Table Schemas

from pyspark.sql.types import StructType, StringType, DateType, IntegerType, DoubleType

def _nullable_schema(columns):
    """Assemble a ``StructType`` from ``(column_name, data_type)`` pairs.

    Fields are appended in the order given; every field is nullable and
    carries no metadata.
    """
    schema = StructType()
    for column_name, column_type in columns:
        schema.add(column_name, column_type, True, None)
    return schema


# NOTE(review): the Parquet loaders in this notebook call spark.read.parquet
# without passing these schemas, so column types come from the Parquet files
# themselves — confirm whether these definitions are still needed.
lineitem_struct = _nullable_schema([
    ("L_ORDERKEY", IntegerType()),
    ("L_PARTKEY", IntegerType()),
    ("L_SUPPKEY", IntegerType()),
    ("L_LINENUMBER", IntegerType()),
    ("L_QUANTITY", DoubleType()),
    ("L_EXTENDEDPRICE", DoubleType()),
    ("L_DISCOUNT", DoubleType()),
    ("L_TAX", DoubleType()),
    ("L_RETURNFLAG", StringType()),
    ("L_LINESTATUS", StringType()),
    ("L_SHIPDATE", DateType()),
    ("L_COMMITDATE", DateType()),
    ("L_RECEIPTDATE", DateType()),
    ("L_SHIPINSTRUCT", StringType()),
    ("L_SHIPMODE", StringType()),
    ("L_COMMENT", StringType()),
])


orders_struct = _nullable_schema([
    ("O_ORDERKEY", IntegerType()),
    ("O_CUSTKEY", IntegerType()),
    ("O_ORDERSTATUS", StringType()),
    ("O_TOTALPRICE", DoubleType()),
    ("O_ORDERDATE", DateType()),
    ("O_ORDERPRIORITY", StringType()),
    ("O_CLERK", StringType()),
    ("O_SHIPPRIORITY", IntegerType()),
    ("O_COMMENT", StringType()),
])


customer_struct = _nullable_schema([
    ("C_CUSTKEY", IntegerType()),
    ("C_NAME", StringType()),
    ("C_ADDRESS", StringType()),
    ("C_NATIONKEY", IntegerType()),
    ("C_PHONE", StringType()),
    ("C_ACCTBAL", DoubleType()),
    ("C_MKTSEGMENT", StringType()),
    ("C_COMMENT", StringType()),
])


nation_struct = _nullable_schema([
    ("N_NATIONKEY", IntegerType()),
    ("N_NAME", StringType()),
    ("N_REGIONKEY", IntegerType()),
    ("N_COMMENT", StringType()),
])

In [None]:
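"""
Example that runs TPC-H Q10 (Returned Item Reporting) on Spark.

The lineitem, orders, customer and nation tables are read from Parquet,
cached and registered as temporary views before the query itself is timed.
"""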
import time
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext


def _load_table(data_folder, spark, table_name):
    """Read one TPC-H table from Parquet, cache it and register it for Spark SQL.

    To isolate the cost of reading the data from the cost of running a
    query, the table is cached and then fully materialised with an action
    here, before any query is timed. ``head()`` only fetches the first row;
    it is the ``count()`` that scans every partition and so fills the
    cache, which avoids going back to S3 later.

    Args:
        data_folder: Directory (local path or URI such as ``s3://...``)
            containing one ``<table>.pq`` Parquet dataset per table.
            A trailing slash is tolerated.
        spark: The active ``SparkSession``.
        table_name: Table name; used both for the ``.pq`` directory and
            for the temporary view name.

    Returns:
        The cached DataFrame.
    """
    # Strip trailing slashes so "…/SF1000/" does not yield "…/SF1000//x.pq/".
    path = data_folder.rstrip("/") + "/" + table_name + ".pq/"
    table = spark.read.parquet(path)
    # cache() already persists at the default storage level, so a second
    # persist() call is unnecessary.
    table.cache()
    # Quick sanity check that the data is readable (first row only).
    print(table.head())
    # Expose the DataFrame to spark.sql() under its TPC-H table name.
    table.createOrReplaceTempView(table_name)
    # Full scan: materialises the cache and reports the row count.
    print(table.count())
    return table


def load_lineitem(data_folder, spark):
    """Load, cache and register the ``lineitem`` table; return the DataFrame."""
    return _load_table(data_folder, spark, "lineitem")


def load_orders(data_folder, spark):
    """Load, cache and register the ``orders`` table; return the DataFrame."""
    return _load_table(data_folder, spark, "orders")


def load_customer(data_folder, spark):
    """Load, cache and register the ``customer`` table; return the DataFrame."""
    return _load_table(data_folder, spark, "customer")


def load_nation(data_folder, spark):
    """Load, cache and register the ``nation`` table; return the DataFrame."""
    return _load_table(data_folder, spark, "nation")


def q10(spark, data_folder=None):
    """Load the tables TPC-H Q10 needs, then run, show and time the query.

    The read phase and the query phase are timed separately so that
    Parquet/S3 read cost is not counted as query execution time.

    Args:
        spark: The active ``SparkSession``.
        data_folder: Directory holding the ``<table>.pq`` Parquet datasets.
            If omitted, a global named ``data_folder`` is used instead.

    Returns:
        The result DataFrame: the 20 customers with the highest revenue
        from returned ('R') line items on orders placed in the three
        months starting 1994-11-01.

    Raises:
        NameError: if ``data_folder`` is neither passed nor defined globally.
    """
    if data_folder is None:
        # Backward compatibility with callers that set a global instead.
        try:
            data_folder = globals()["data_folder"]
        except KeyError:
            raise NameError(
                "q10() needs a data folder: pass data_folder explicitly "
                "or define a global 'data_folder'"
            ) from None

    # Read phase: each loader caches its table and forces a full scan.
    t_read = time.time()
    load_lineitem(data_folder, spark)
    load_orders(data_folder, spark)
    load_customer(data_folder, spark)
    load_nation(data_folder, spark)
    print("Reading time (s): ", time.time() - t_read)

    # Execution phase: spark.sql() is lazy; show() is the action that runs it.
    t_exec = time.time()
    total = spark.sql(
        """select
                c_custkey,
                c_name,
                sum(l_extendedprice * (1 - l_discount)) as revenue,
                c_acctbal,
                n_name,
                c_address,
                c_phone,
                c_comment
            from
                customer,
                orders,
                lineitem,
                nation
            where
                c_custkey = o_custkey
                and l_orderkey = o_orderkey
                and o_orderdate >= date '1994-11-01'
                and o_orderdate < date '1994-11-01' + interval '3' month
                and l_returnflag = 'R'
                and c_nationkey = n_nationkey
            group by
                c_custkey,
                c_name,
                c_acctbal,
                c_phone,
                n_name,
                c_address,
                c_comment
            order by
                revenue desc
            limit 20"""
    )

    total.show()
    print("Q10 Execution time (s): ", time.time() - t_exec)
    return total


def main(folder, spark=None):
    """Run TPC-H Q10 against the Parquet tables stored under *folder*.

    Args:
        folder: Directory (local path or URI) holding the ``<table>.pq`` datasets.
        spark: Optional ``SparkSession``; when omitted, the active session is
            reused if there is one, otherwise a new one is created.
    """
    if spark is None:
        # Local import keeps this entry point self-contained.
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
    q10(spark, folder)

In [None]:
# Location of the TPC-H Parquet datasets (presumably scale factor 1000,
# going by the "SF1000" path component).
tpch_data_folder = "s3://tpch-data-parquet/SF1000/"
main(tpch_data_folder)