# Data Federation with Apache Spark
In this notebook we'll explore how Spark's Structured Streaming API makes it easy to do data federation between
heterogeneous data sources, data at rest and streaming data.

### Configuring Spark with JDBC and Kafka
In order to work with Kafka and Spark's JDBC API, you'll need to provide Spark with some additional java packages. Many of these are available as Maven-style packages, such as the Kafka driver and the driver for postgresql in the cell below.

Consuming JDBC drivers via Maven coordinates and spark.jars.packages is convenient, since Spark will automatically download such packages and install them on Spark executors.

Spark can receive its configuration parameters from a variety of channels. In general, configurations set via a SparkConf object (as below) will override all other configurations. However, there are a few glitches in this rule, and the spark.package.jars parameter is one of them, which is important for this tutorial. To maximize clarity, this notebook unsets PYSPARK_SUBMIT_ARGS in favor of doing all configurations using a SparkConf object so that it is easy to read.

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import os

# Disable this so that configuration of 'spark.jars.packages' works correctly
if 'PYSPARK_SUBMIT_ARGS' in os.environ:
    del os.environ['PYSPARK_SUBMIT_ARGS']

# Instantiate a spark configuration object to receive settings
spark_conf = SparkConf()

# Maven coordinates for package containing JDBC drivers
jdbc_driver_packages = 'org.postgresql:postgresql:42.2.9,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3'

# Configure spark to see the postgresql driver package
spark_conf.set('spark.jars.packages', jdbc_driver_packages)

# The name of your Spark cluster hostname or ip address
spark_cluster = os.environ['SPARK_CLUSTER']

# Configure some basic spark cluster sizing parameters
spark_conf.set('spark.cores.max', 2)
spark_conf.set('spark.executor.cores', '1')

spark = SparkSession.builder \
    .master('spark://{cluster}:7077'.format(cluster=spark_cluster)) \
    .appName('Spark-Demo') \
    .config(conf = spark_conf) \
    .getOrCreate()

### Checking driver configurations
When you are configuring Spark with extra drivers, it can be useful to sanity check that you have installed what you thought you did. The next cells use `getConf()` to sanity-check Spark's jar file configurations.

In [2]:
'postgresql' in spark.sparkContext.getConf().get('spark.jars')

True

In [3]:
'kafka' in spark.sparkContext.getConf().get('spark.jars')

True

### Spark Data Frames
Spark's preferred data model is the DataFrame. In the following cell, we are going to load some simulated
credit card fraud data from a raw CSV file, into a python pandas DataFrame, and then tell Spark to create its own Spark Data Frame from pandas.

You can see that this data includes a numeric user id and a similar merchant id.
In the next cells, we'll be setting up a DataFrame join operation to replace these id numbers with names.

In [4]:
import pandas as pd
transactionDF = spark.createDataFrame(pd.read_csv("fraud.csv"))
transactionDF.show(5)

+----------+----------+-------+------+-----------+-----------+-------+
| timestamp|     label|user_id|amount|merchant_id| trans_type|foreign|
+----------+----------+-------+------+-----------+-----------+-------+
|1591150620|legitimate|      8|  9.42|          4|     online|  false|
|1591150669|legitimate|      9| 36.38|         11|contactless|  false|
|1591152891|legitimate|      7| 10.56|          8|     online|  false|
|1591157902|legitimate|      9| 25.09|         11|      swipe|  false|
|1591157988|legitimate|      8| 17.73|          4|     manual|  false|
+----------+----------+-------+------+-----------+-----------+-------+
only showing top 5 rows



### Declaring some new Data Frame tables
In the following cell, we are creating a new DataFrame that maps user ids into names.
For this demo, our name table is small, and we are creating it manually.
A real table for an enterprise would reside in something like a SQL database.

In [5]:
names = ["Jake", "Sherman", "Morgan", "Bodie", "Ben", "Elwood", "Sandy", "Clover", "Molly", "Linsey"]
raw = [(j, names[j]) for j in range(10)]
tdf = spark.createDataFrame(raw, ['user_id', 'user_name'])
userDF = tdf.select(tdf.user_id.cast("int"), tdf.user_name)
userDF.show(5)

+-------+---------+
|user_id|user_name|
+-------+---------+
|      0|     Jake|
|      1|  Sherman|
|      2|   Morgan|
|      3|    Bodie|
|      4|      Ben|
+-------+---------+
only showing top 5 rows



And now we create a similar table for merchant names:

In [6]:
names = ["Changing Hands", "First Draft", "Anaya's", "SBUX", "Ike's", "Bob's Burgers", "Lovin Spoonful", "Ayse's", "Dawn Treader", "Taco Hut", "Johnny's Hots", "Pavement", "Victrola", "The Barn", "Clockwork"]
raw = [(j, names[j]) for j in range(15)]
tdf = spark.createDataFrame(raw, ['merchant_id', 'merchant_name'])
merchantDF = tdf.select(tdf.merchant_id.cast("int"), tdf.merchant_name)
merchantDF.show(5)

+-----------+--------------+
|merchant_id| merchant_name|
+-----------+--------------+
|          0|Changing Hands|
|          1|   First Draft|
|          2|       Anaya's|
|          3|          SBUX|
|          4|         Ike's|
+-----------+--------------+
only showing top 5 rows



### Registering DataFrames as Tables
Spark allows you to register DataFrames with table names that can be used with SQL queries.
Hre we'll register our tables so that we can operate on them with SQL:

In [7]:
transactionDF.registerTempTable("transactions")
userDF.registerTempTable("users")
merchantDF.registerTempTable("merchants")

### Table Joins with Spark Data Frames
Spark Data Frames support all of the join operations supported in common SQL dialects,
and you can operate on them with SQL queries.
In this cell, we are setting up a join with our user and merchant tables to replace id numbers with names:

In [8]:
joinq = """
SELECT timestamp, user_name, merchant_name, foreign, trans_type, amount FROM
transactions
left join users on transactions.user_id = users.user_id
left join merchants on transactions.merchant_id = merchants.merchant_id
"""

joinDF = spark.sql(joinq)
joinDF.show(5)

+----------+---------+--------------+-------+------------+------+
| timestamp|user_name| merchant_name|foreign|  trans_type|amount|
+----------+---------+--------------+-------+------------+------+
|1601491845|     Jake|Changing Hands|  false|chip_and_pin| 16.37|
|1593029512|     Jake|Changing Hands|  false|chip_and_pin|  16.2|
|1595585679|     Jake|Changing Hands|  false|      manual| 15.51|
|1596822054|     Jake|Changing Hands|  false| contactless| 15.15|
|1603014673|   Linsey|Changing Hands|  false|chip_and_pin| 21.41|
+----------+---------+--------------+-------+------------+------+
only showing top 5 rows



### Grouped Aggregations with Data Frames
Spark supports the common data analysis operations such as aggregating data by the values of a given column.
In the following cell we are grouping transactions by user name, and finding the average transaction amount
for each user.

In [9]:
from pyspark.sql import functions as F
g = joinDF.groupBy("user_name").agg(F.mean("amount").alias("avg"))
g.sort(F.desc("avg")).show()

+---------+------------------+
|user_name|               avg|
+---------+------------------+
|   Linsey|42.803226397800174|
|    Sandy|40.503693820224726|
|   Morgan| 39.76930656934308|
|    Molly| 36.44709313264344|
|   Elwood| 35.53930167597763|
|    Bodie|34.177011217948724|
|   Clover|  33.9838043478261|
|  Sherman| 33.63665760869564|
|     Jake| 30.17052736982641|
|      Ben|26.919920948616625|
+---------+------------------+



Similar to the previous aggregation, here we find the average transaction amount for each merchant name.

In [10]:
g = joinDF.groupBy("merchant_name").agg(F.mean("amount").alias("avg"))
g.sort(F.desc("avg")).show()

+--------------+------------------+
| merchant_name|               avg|
+--------------+------------------+
|        Ayse's|52.653333333333336|
| Bob's Burgers|  47.9674193548387|
|      Victrola|44.008076923076935|
|          SBUX|42.444074074074074|
|      Pavement| 38.92485021398004|
|     Clockwork|38.505380821917676|
|Lovin Spoonful| 34.72461977186304|
|         Ike's|34.193778741865486|
|  Dawn Treader|31.942931896883408|
| Johnny's Hots| 30.46848484848485|
|      The Barn| 27.20685714285713|
|      Taco Hut| 26.23961538461538|
|   First Draft| 23.74031250000001|
|Changing Hands|23.066451612903233|
|       Anaya's|22.162608695652178|
+--------------+------------------+



## Spark Data Frames with a SQL Database

In the previous section, we worked with Data Frames based on flat files such as CSV.
In the next section we'll see that Spark's Structured Streaming allows you to operate on data residing in a SQL Database without changing your code.

### Connecting to a SQL Database with JDBC
The next cell sets up the parameters Spark uses to connect to an SQL database using JDBC.

For a more detailed discussion of Spark and JDBC drivers, see the Red Hat blog
[Data integration in the hybrid cloud with Apache Spark and Open Data Hub](https://next.redhat.com/2020/06/16/data-integration-in-the-hybrid-cloud-with-apache-spark-and-open-data-hub/)

In [11]:
spark_jdbc_url = 'jdbc:postgresql://{host}:{port}/{database}'.format( \
    host     = 'postgresql', \
    port     = '5432', \
    database = 'demo')

spark_jdbc_prop = { \
    'user':     'demo', \
    'password': 'demo', \
    'driver':   'org.postgresql.Driver'
}

### Creating SQL Table
To set up our example, we will write our transaction data to a SQL database:

In [12]:
transactionDF.write.jdbc(table='transactions', mode='overwrite', url=spark_jdbc_url, properties=spark_jdbc_prop)

### Spark Data Frames from JDBC
Here we attach a Spark DataFrame to our transaction data stored in Postgresql,
and confirm that it is the same as the data we loaded from CSV earlier.

In [13]:
transactionSQLDF = spark.read.jdbc( \
    table      = '({q}) tmp'.format(q='select * from transactions'), \
    url        = spark_jdbc_url, \
    properties = spark_jdbc_prop \
)

transactionSQLDF.show(5)

+----------+----------+-------+------+-----------+------------+-------+
| timestamp|     label|user_id|amount|merchant_id|  trans_type|foreign|
+----------+----------+-------+------+-----------+------------+-------+
|1598608959|legitimate|      8| 18.76|          4|      online|  false|
|1598609843|legitimate|      3| 129.7|          8|      online|   true|
|1598610775|legitimate|      9| 42.84|         14|       swipe|  false|
|1598610872|legitimate|      1| 28.08|          6|chip_and_pin|  false|
|1598610890|legitimate|      2| 10.29|         11|      online|  false|
+----------+----------+-------+------+-----------+------------+-------+
only showing top 5 rows



### Table Joins from JDBC
In the following cells, we join our id numbers to names as we did earlier.
You can see that our SQL query is exactly the same, excepting the table names that tell spark to
use the tables we set up over JDBC:

In [14]:
transactionSQLDF.registerTempTable("transactionsSQL")
joinq = """
SELECT timestamp, user_name, merchant_name, foreign, trans_type, amount FROM
transactionsSQL
left join users on transactionsSQL.user_id = users.user_id
left join merchants on transactionsSQL.merchant_id = merchants.merchant_id
"""
joinSQLDF = spark.sql(joinq)
joinSQLDF.show(5)

+----------+---------+--------------+-------+------------+------+
| timestamp|user_name| merchant_name|foreign|  trans_type|amount|
+----------+---------+--------------+-------+------------+------+
|1601491845|     Jake|Changing Hands|  false|chip_and_pin| 16.37|
|1593029512|     Jake|Changing Hands|  false|chip_and_pin|  16.2|
|1595585679|     Jake|Changing Hands|  false|      manual| 15.51|
|1596822054|     Jake|Changing Hands|  false| contactless| 15.15|
|1592832148|   Clover|Changing Hands|  false| contactless| 20.61|
+----------+---------+--------------+-------+------------+------+
only showing top 5 rows



In [15]:
g = joinSQLDF.groupBy("user_name").agg(F.mean("amount").alias("avg"))
g.sort(F.desc("avg")).show()

+---------+------------------+
|user_name|               avg|
+---------+------------------+
|   Linsey| 42.80322639780018|
|    Sandy|40.503693820224726|
|   Morgan|39.769306569343094|
|    Molly| 36.44709313264344|
|   Elwood| 35.53930167597763|
|    Bodie|  34.1770112179487|
|   Clover|  33.9838043478261|
|  Sherman|33.636657608695636|
|     Jake| 30.17052736982641|
|      Ben|26.919920948616596|
+---------+------------------+



In [16]:
g = joinSQLDF.groupBy("merchant_name").agg(F.mean("amount").alias("avg"))
g.sort(F.desc("avg")).show()

+--------------+------------------+
| merchant_name|               avg|
+--------------+------------------+
|        Ayse's|52.653333333333336|
| Bob's Burgers|47.967419354838704|
|      Victrola|44.008076923076935|
|          SBUX| 42.44407407407408|
|      Pavement| 38.92485021398003|
|     Clockwork|38.505380821917754|
|Lovin Spoonful|34.724619771863104|
|         Ike's|34.193778741865415|
|  Dawn Treader|31.942931896883348|
| Johnny's Hots| 30.46848484848485|
|      The Barn|27.206857142857135|
|      Taco Hut| 26.23961538461538|
|   First Draft|23.740312500000005|
|Changing Hands|23.066451612903226|
|       Anaya's|22.162608695652178|
+--------------+------------------+



## Spark and Data Frames with Streaming Data
We have seen that Spark Data Frames operate the same whether they are based on file format such as CSV or SQL databases such as Postgresql.
With Spark Data Frames and Structured Streaming we can also operate on streaming data.
The following section we perform the same table joins and grouped aggregations on streaming data pulled off of a Kafka topic.

### Attaching a DataFrame to Kafka streaming data
The following cell shows an example of how to attach a Spark Data Frame to streaming data - in this case, data pulled off of a Kafka topic.

In our example, the data records have been encoded as JSON objects, and so we have added some additional `select` statements to unpack the JSON and impose a Data Frame schema that corresponds to our examples above.

To populate this kafka topic, you can run the companion notebook `kafka-product.ipynb` in this git repo.

In [17]:
transactionsStreamDF = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "odh-message-bus-kafka-bootstrap:9092") \
    .option("subscribe", "demotopic") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), transactionDF.schema).alias("json")) \
    .select(F.col("json.timestamp"),F.col("json.user_id"),F.col("json.merchant_id"),F.col("json.trans_type"),F.col("json.foreign"),F.col("json.amount"))

### Spark SQL with Streaming Data is Same as Data at Rest
In the following cell we can see that, once again, our SQL for joining with user and merchant tables is exactly the same as our previous examples:

In [18]:
transactionsStreamDF.registerTempTable("transactionsStream")
joinq = """
SELECT timestamp, user_name, merchant_name, foreign, trans_type, amount FROM
transactionsStream
left join users on transactionsStream.user_id = users.user_id
left join merchants on transactionsStream.merchant_id = merchants.merchant_id
"""
joinStreamDF = spark.sql(joinq)

### Streaming Operations
True streaming queries, such as the ones below, must be aggregations,
so in this final streaming example we move directly to our data aggregations.

Note that in this variation, we attach a `queryName` to each result, so that we can poll the accumulating results as the data comes off the Kafka stream.

In [19]:
streamquery_user = joinStreamDF.groupBy("user_name").agg(F.mean("amount").alias("avg")).sort(F.desc("avg")).writeStream \
    .trigger(processingTime='1 seconds') \
    .outputMode("complete") \
    .format("memory") \
    .queryName("stream_results_user") \
    .start()

streamquery_merchant = joinStreamDF.groupBy("merchant_name").agg(F.mean("amount").alias("avg")).sort(F.desc("avg")).writeStream \
    .trigger(processingTime='1 seconds') \
    .outputMode("complete") \
    .format("memory") \
    .queryName("stream_results_merchant") \
    .start()

### Polling Query Results from Streaming Data
A defining characteristic of streaming data queries in Spark is that they are updated continuously as data comes in off the stream.
This cell accesses the temporary tables we set up above with `queryName` to display the latest results.
You can re-run this cell and watch the results update.

In [22]:
spark.table("stream_results_user").show()
spark.table("stream_results_merchant").show()

+---------+------------------+
|user_name|               avg|
+---------+------------------+
|   Linsey|          67.50225|
|    Molly|          37.81825|
|    Bodie| 35.84312500000001|
|     Jake| 34.74527272727272|
|      Ben|30.797297297297302|
|   Morgan|29.498249999999995|
|  Sherman|27.860357142857147|
|   Elwood|23.871724137931032|
|   Clover|22.583666666666673|
|    Sandy| 22.44607142857143|
+---------+------------------+

+--------------+------------------+
| merchant_name|               avg|
+--------------+------------------+
|         Ike's|45.988131868131866|
|      Pavement|            39.878|
|     Clockwork|32.274155844155835|
|  Dawn Treader| 31.57601769911505|
|Lovin Spoonful| 30.97042253521127|
|   First Draft|             23.17|
|       Anaya's|             20.81|
| Bob's Burgers|             18.85|
+--------------+------------------+



### Halting a Streaming Query
To halt the streaming queries, execute this cell.
Otherwise, they will run until the notebook kernel is halted.

In [23]:
streamquery_user.stop()
streamquery_merchant.stop()