tags | displayed_sidebar | ||
---|---|---|---|
|
docsEnglish |
import WarningLicenseKeyContact from '/src/components/en-us/_warning-license-key-contact.mdx';
This tutorial describes how to run analytical queries on sample data by using ScalarDB Analytics with Spark. The source code is available at https://github.com/scalar-labs/scalardb-samples/tree/main/scalardb-analytics-spark-sample.
This sample tutorial shows how you can run interactive analysis in the Spark shell by using ScalarDB Analytics with Spark. In particular, you'll learn how to run the following two types of queries:
- Read data and calculate summaries.
- Join tables that span multiple storages.
- Docker 20.10 or later with Docker Compose V2 or later
Open Terminal, then clone the ScalarDB samples repository by running the following command:
git clone https://github.com/scalar-labs/scalardb-samples
Then, go to the directory that contains the sample application by running the following command:
cd scalardb-samples/scalardb-analytics-spark-sample
Copy your license certificate (cert.pem
) to the sample directory by running the following command, replacing <PATH_TO_YOUR_LICENSE>
with the path to your license:
cp /<PATH_TO_YOUR_LICENSE>/cert.pem cert.pem
To set up the sample underlying databases for ScalarDB, run the following command:
docker compose up -d --wait
This command starts three services locally for PostgreSQL, Cassandra, and DynamoDB.
Then, set up the sample databases on those services by running the following command:
docker compose run --rm sample-data-loader
This command creates postgresns
, cassandrans
, and dynamons
namespaces, which are mapped to the local PostgreSQL, Cassandra, and DyanmoDB services respectively, creates postgresns.orders
, cassandrans.lineitem
, and dynamons.customer
tables, and loads the sample data into those tables. For details about the table schema, see Schema details.
To launch the Spark shell, run the following command:
docker compose run --rm spark-shell
As you can see in docker-compose.yml
, this command executes the spark-shell
command with the --packages com.scalar-labs:scalardb-analytics-spark-<SPARK_VERSION>_<SCALA_VERSION>:<SCALARDB_ANALYTICS_WITH_SPARK_VERSION>
option. With this option, spark-shell
automatically downloads ScalarDB Analytics with Spark from the Maven Central Repository and add it to the classpath of spark-shell
.
In the Spark shell console, you can set up ScalarDB Analytics with Spark by running the following commands:
scala> import com.scalar.db.analytics.spark.implicits._
scala> spark.setupScalarDbAnalytics(
| configPath = "/etc/scalardb.properties",
| namespaces = Set("postgresns", "cassandrans", "dynamons"),
| license = License.certPath("""{"your":"license", "key":"in", "json":"format"}""", "/etc/cert.pem")
| )
:::warning
Remember that you must have copied your license certificate to the sample directory as described in Add your license certificate to the sample directory, since the license is referenced in the JSON string.
:::
Now, you should have tables for postgresns.orders
, cassandrans.lineitem
, and dynamons.customer
on the Spark side that are equivalent to the tables in ScalarDB. For example:
scala> sql("DESCRIBE postgresns.orders").show()
+---------------+---------+-------+
| col_name|data_type|comment|
+---------------+---------+-------+
| o_orderkey| int| NULL|
| o_custkey| int| NULL|
| o_orderstatus| string| NULL|
| o_totalprice| double| NULL|
| o_orderdate| string| NULL|
|o_orderpriority| string| NULL|
| o_clerk| string| NULL|
| o_shippriority| int| NULL|
| o_comment| string| NULL|
+---------------+---------+-------+
The following sections describe how to read data, calculate summaries, and join tables that span multiple storages.
You can run a query that reads data from cassandrans.lineitem
, with the actual data stored in Cassandra, and calculates several summaries of the ordered line items by aggregating the data.
To run the query, run the following command in the Spark shell console:
scala> sql("""
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
count(*) AS count_order
FROM
cassandrans.lineitem
WHERE
to_date(l_shipdate, 'yyyy-MM-dd') <= date '1998-12-01' - 3
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus;
""").show()
You should see the following output:
+------------+------------+-------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------+
|l_returnflag|l_linestatus|sum_qty| sum_base_price| sum_disc_price| sum_charge| avg_qty| avg_price| avg_disc|count_order|
+------------+------------+-------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------+
| A| F| 1519|2374824.6560278563|1387364.2207725341|1962763.4654265852|26.649122807017545|41663.590456629056|0.41501802923479575| 57|
| N| F| 98| 146371.2295412012| 85593.96776336085|121041.55837332775|32.666666666666664|48790.409847067065|0.40984706454007996| 3|
| N| O| 5374| 8007373.247086477| 4685647.785126835| 6624210.945739046|24.427272727272726| 36397.15112312035| 0.4147594809559689| 220|
| R| F| 1461|2190869.9676265526|1284178.4378283697|1814151.2807494882|25.189655172413794| 37773.62013149229|0.41323493790730753| 58|
+------------+------------+-------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------+
You can also run a query to join tables that are connected to the three back-end databases and calculate the unshipped orders with the highest revenue on a particular date.
To run the query, run the following command in the Spark shell console:
scala> sql("""
SELECT
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) AS revenue,
o_orderdate,
o_shippriority
FROM
dynamons.customer,
postgresns.orders,
cassandrans.lineitem
WHERE
c_mktsegment = 'AUTOMOBILE'
AND c_custkey = o_custkey
AND l_orderkey = o_orderkey
AND o_orderdate < '1995-03-15'
AND l_shipdate > '1995-03-15'
GROUP BY
l_orderkey,
o_orderdate,
o_shippriority
ORDER BY
revenue DESC,
o_orderdate,
l_orderkey
LIMIT 10;
""").show()
You should see the following output:
+----------+------------------+-----------+--------------+
|l_orderkey| revenue|o_orderdate|o_shippriority|
+----------+------------------+-----------+--------------+
| 1071617|128186.99915996166| 1995-03-10| 0|
| 1959075| 33104.51278645416| 1994-12-23| 0|
| 430243|19476.115819260962| 1994-12-24| 0|
+----------+------------------+-----------+--------------+
:::note
You can also run any arbitrary query that Apache Spark and Spark SQL support on the imported tables in this sample tutorial. Since ScalarDB Analytics with Spark supports all queries that Spark SQL supports, you can use not only join, aggregation, filtering, and ordering as shown in the example but also the window function, lateral join, and various analytical operations.
To see which types of queries Spark SQL supports, see the Spark SQL documentation.
:::
To stop the sample application, stop the Docker container by running the following command:
docker compose down
In this sample tutorial, you have tables with the following schema in the underlying databases of ScalarDB:
erDiagram
"dynamons.customer" ||--|{ "postgresns.orders" : "custkey"
"dynamons.customer" {
int c_custkey
text c_name
text c_address
int c_nationkey
text c_phone
double c_acctbal
text c_mktsegment
text c_comment
}
"postgresns.orders" ||--|{ "cassandrans.lineitem" : "orderkey"
"postgresns.orders" {
int o_orderkey
int o_custkey
text o_orderstatus
double o_totalprice
text o_orderdate
text o_orderpriority
text o_clerk
int o_shippriority
text o_comment
}
"cassandrans.lineitem" {
int l_orderkey
int l_partkey
int l_suppkey
int l_linenumber
double l_quantity
double l_extendedprice
double l_discount
double l_tax
text l_returnflag
text l_linestatus
text l_shipdate
text l_commitdate
text l_receiptdate
text l_shipinstruct
text l_shipmode
text l_comment
}
For reference, this diagram shows the following:
dynamons
,postgresns
, andcassandrans
. Namespaces that are mapped to the back-end storages of DynamoDB, PostgreSQL, and Cassandra, respectively.dynamons.customer
. A table that represents information about customers. This table includes attributes like customer key, name, address, phone number, and account balance.postgresns.orders
. A table that contains information about orders that customers have placed. This table includes attributes like order key, customer key, order status, order date, and order priority.cassandrans.lineitem
. A table that represents line items associated with orders. This table includes attributes such as order key, part key, supplier key, quantity, price, and shipping date.