In [1]:
import pyspark
from  pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.conf import SparkConf

In [2]:
# https://stackoverflow.com/a/39528114/2049763
# https://stackoverflow.com/q/43692453/2049763

config = SparkConf()
config.setMaster('spark://192.72.33.100:7077')
# config.set("spark.driver.memory", "2g") 
config.set("spark.executor.memory", "2g") 
config.set("spark.network.timeout", "180")
config.set("spark.driver.host", "192.72.33.100")
config.set("spark.local.ip", "192.72.33.100")
config.set("spark.driver.bindAddress", "192.72.33.100")
# https://stackoverflow.com/a/51746082/2049763 spark.jars spark.driver.extraClassPath
# https://stackoverflow.com/a/53837399/2049763
# config.set("spark.driver.extraClassPath", 
#            "/vagrant/mysql-connector-java/mysql-connector-java-5.1.48.jar:/vagrant/mysql-connector-java/mysql-connector-java-5.1.48-bin.jar")

# https://stackoverflow.com/a/38479360/2049763
# config.set("spark.executor.instances", "2")

config.setAppName('my_mysql_app')

<pyspark.conf.SparkConf at 0x7f2b90fb0c50>

In [3]:
sc.stop()
sc = SparkContext(conf=config)

In [4]:
spark = SparkSession.builder.config(conf=config).getOrCreate()
sc.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.executor.memory', '2g'),
 ('spark.network.timeout', '180'),
 ('spark.sql.execution.arrow.enabled', 'true'),
 ('spark.local.ip', '192.72.33.100'),
 ('spark.master', 'spark://192.72.33.100:7077'),
 ('spark.driver.port', '36659'),
 ('spark.app.id', 'app-20191129215650-0001'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '192.72.33.100'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.eventLog.dir', '/var/log/ipnb'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.bindAddress', '192.72.33.100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'my_mysql_app'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.sql.warehouse.dir', '/vagrant/hive/warehouse')]

# TPCD DATABASE FOR OS PERFORMANCE TESTING

## Select queries 

In [11]:
# https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
# https://relational.fit.cvut.cz/dataset/TPCD
# https://stackoverflow.com/a/41125629/2049763
# query_1 = "SELECT * FROM tpcd.dss_order"
# query_0 = "(SELECT * FROM tpcd.dss_order LIMIT 10) AS t" # 3 seconds 

dss_order_df = spark.read.format("jdbc").options(
    url = "jdbc:mysql://relational.fit.cvut.cz:3306", 
    driver = "com.mysql.jdbc.Driver",
    user = "guest",
    password = "relational",
    numPartitions = 256,
    dbtable =  "tpcd.dss_order",
    partitionColumn = "o_orderkey",
    lowerBound = 1,
    upperBound = 100000000).load()

In [12]:
# https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3
# https://stackoverflow.com/a/35062411/2049763
dss_order_df.show(5) # 1.5 min

+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+--------------------+
|o_orderkey|o_custkey|o_orderstatus|o_totalprice|o_orderdate|o_orderpriority|        o_clerk|o_shippriority|           o_comment|
+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+--------------------+
|         1|       47|            F|      932.03| 1992-01-01|       1-URGENT|Clerk#000000001|             0|NNiPP2mRgR2LRwRym...|
|         2|    39227|            O| 141065.6154| 1996-09-04|          5-LOW|Clerk#000000447|             0|iL6m0mg4kR 20wBPL...|
|         3|    33632|            O| 136655.7172| 1997-06-02|4-NOT SPECIFIED|Clerk#000000312|             0|6  mBNm2k2ygR4 02...|
|         4|    51901|            O| 295046.4753| 1998-05-04|       3-MEDIUM|Clerk#000000046|             0|Li62iByNmwRBB0R6R...|
|         5|    46604|            O| 359799.4073| 1998-02-05|         2-HIGH|Clerk#0000008

In [7]:
dss_lineitem_df = spark.read.format("jdbc").options(
    url = "jdbc:mysql://relational.fit.cvut.cz:3306", 
    driver = "com.mysql.jdbc.Driver",
    user = "guest",
    password = "relational",
    numPartitions = 256,
    dbtable =  "tpcd.dss_lineitem",
    partitionColumn = "l_orderkey",
    lowerBound = 1,
    upperBound = 10000000).load()

In [8]:
(dss_lineitem_df.count(), len(dss_lineitem_df.columns))

(5998391, 16)

In [9]:
dss_lineitem_df.columns

['l_orderkey',
 'l_partkey',
 'l_suppkey',
 'l_linenumber',
 'l_quantity',
 'l_extendedprice',
 'l_discount',
 'l_tax',
 'l_returnflag',
 'l_linestatus',
 'l_shipdate',
 'l_commitdate',
 'l_receiptdate',
 'l_shipinstruct',
 'l_shipmode',
 'l_comment']

In [10]:
dss_lineitem_df.show(5) # 1.6 min

+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
|l_orderkey|l_partkey|l_suppkey|l_linenumber|l_quantity|l_extendedprice|l_discount|l_tax|l_returnflag|l_linestatus|l_shipdate|l_commitdate|l_receiptdate|   l_shipinstruct|l_shipmode|           l_comment|
+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
|         1|       32|       33|           1|       1.0|         932.03|       0.0|  0.0|           R|           F|1992-01-02|  1992-01-31|   1992-01-03|DELIVER IN PERSON|   REG AIR|i0i60mBymi       ...|
|         2|   126152|     3689|           1|      43.0|       50660.45|       0.1|  0.0|           N|           O|1996-12-22|  1996-10-05|   1996-12-27|DELIVER IN PERSON|      RAIL|4B

## AGGREGATE QUERIES

In [14]:
# SELECT o_custkey, sum(o_totalprice) FROM tpcd.dss_order group by o_custkey
sum_o_totalprice_df = dss_order_df.groupby('o_custkey').sum('o_totalprice')

In [15]:
sum_o_totalprice_df.show(10) # 37 s

+---------+------------------+
|o_custkey| sum(o_totalprice)|
+---------+------------------+
|    92644|      2836487.3839|
|    87338|1658548.1436000003|
|    96469|      2674413.0357|
|    31261|3534400.9905000003|
|   106544|      1235199.2328|
|   102119|1198082.8994999998|
|   117994|      2273359.5405|
|   133189|3571808.0951000005|
|    16339|       2474340.657|
|    99817|      2634315.1224|
+---------+------------------+
only showing top 10 rows



In [17]:
# SELECT * FROM tpcd.dss_order ORDER BY o_orderdate DESC
dss_order_sorted_df = dss_order_df.orderBy('o_orderdate', ascending=False)

In [18]:
dss_order_sorted_df.show(10) # 2.2 min

+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+--------------------+
|o_orderkey|o_custkey|o_orderstatus|o_totalprice|o_orderdate|o_orderpriority|        o_clerk|o_shippriority|           o_comment|
+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+--------------------+
|   3205094|   138424|            O| 151449.0993| 1998-08-02|       1-URGENT|Clerk#000000154|             0| y 2gNLB4BRgNRm4L...|
|   3310498|    22888|            O| 200453.5016| 1998-08-02|          5-LOW|Clerk#000000692|             0|yy0iLRkBik iiPy2P...|
|   3209511|    22936|            O| 285038.6412| 1998-08-02|          5-LOW|Clerk#000000693|             0|RL24iBiR4yPRk2P2 ...|
|   3184548|    80701|            O|  27053.6576| 1998-08-02|       3-MEDIUM|Clerk#000000923|             0|ywyB BBNiB 2Rk2iy...|
|   3221542|    46030|            O|  70419.4128| 1998-08-02|4-NOT SPECIFIED|Clerk#0000003

In [19]:
# SELECT * FROM tpcd.dss_lineitem order by l_shipdate desc
dss_lineitem_sorted_df = dss_lineitem_df.orderBy('l_shipdate', ascending=False)

In [20]:
dss_lineitem_sorted_df.show(10) # 8.7 min

+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
|l_orderkey|l_partkey|l_suppkey|l_linenumber|l_quantity|l_extendedprice|l_discount|l_tax|l_returnflag|l_linestatus|l_shipdate|l_commitdate|l_receiptdate|   l_shipinstruct|l_shipmode|           l_comment|
+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
|   1519014|    53655|     3656|           2|       5.0|        8043.25|       0.1| 0.07|           N|           O|1998-12-01|  1998-10-23|   1998-12-23| TAKE BACK RETURN|     TRUCK| 0ywBPgP6 6NPk2gR...|
|   1207745|    35314|      321|           4|      37.0|       46224.47|      0.01| 0.04|           N|           O|1998-12-01|  1998-09-25|   1998-12-26|             NONE|      RAIL|Ng

## INNER JOIN

In [22]:
dss_customer_df = spark.read.format("jdbc").options(
    url = "jdbc:mysql://relational.fit.cvut.cz:3306", 
    driver = "com.mysql.jdbc.Driver",
    user = "guest",
    password = "relational",
    numPartitions = 256,
    dbtable =  "tpcd.dss_customer",
    partitionColumn = "c_custkey",
    lowerBound = 1,
    upperBound = 10000000).load()

In [24]:
# select dss_order.o_custkey, dss_order.o_totalprice , dss_customer.c_name,
# dss_customer.c_address,dss_customer.c_nationkey,dss_customer.c_phone
# FROM dss_order
# INNER JOIN dss_customer ON dss_order.o_custkey = dss_customer.c_custkey 
# order by c_nationkey desc

dss_order_join_dss_customer_df = dss_order_df.join(
    dss_customer_df, dss_order_df.o_custkey == dss_customer_df.c_custkey).select(
    dss_order_df.o_custkey, dss_order_df.o_totalprice , 
    dss_customer_df.c_name,
    dss_customer_df.c_address, dss_customer_df.c_nationkey, dss_customer_df.c_phone)

In [25]:
dss_order_join_dss_customer_sorted_df = dss_order_join_dss_customer_df.orderBy('c_nationkey', ascending=False)

In [26]:
dss_order_join_dss_customer_sorted_df.show(10) # 2.0 min

+---------+------------+--------------------+--------------------+-----------+------------+
|o_custkey|o_totalprice|              c_name|           c_address|c_nationkey|     c_phone|
+---------+------------+--------------------+--------------------+-----------+------------+
|     2443| 187451.5443|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443| 218473.1685|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443| 348536.5476|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443| 215322.6765|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443| 118284.3657|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443|  50220.6138|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443| 247638.5209|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|378-155-2138|
|     2443| 139192.4071|Customer#00000024...|gkN0kgRNBLm4P6LN6...|         24|37

In [27]:
dss_order_join_dss_customer_sorted_df.show(10) # killing 1 of 4 nodes: 2.6 min 

+---------+------------+--------------------+--------------------+-----------+------------+
|o_custkey|o_totalprice|              c_name|           c_address|c_nationkey|     c_phone|
+---------+------------+--------------------+--------------------+-----------+------------+
|     2821| 148740.3968|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821| 111448.1032|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821|  189929.301|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821| 136359.8534|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821| 204574.3857|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821|  89034.3936|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821| 150957.3094|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|534-624-9403|
|     2821| 282999.9287|Customer#00000028...|P4k24kyimNPw4gN6y...|         24|53