# Purpose

Explore PySpark and the JDBC connection functionality to read from operational databases.

In this notebook we will setup a PostgreSQL instance and populate it with the Pagila dataset. We will then connect to the database via a JDBC connector.

# Setup

## PostgreSQL

Firstly, let's install postgres in the this Colab instance.

In [1]:
!sudo apt install postgresql postgresql-contrib

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libcommon-sense-perl libjson-perl libjson-xs-perl libtypes-serialiser-perl
  logrotate netbase postgresql-14 postgresql-client-14
  postgresql-client-common postgresql-common ssl-cert sysstat
Suggested packages:
  bsd-mailx | mailx postgresql-doc postgresql-doc-14 isag
The following NEW packages will be installed:
  libcommon-sense-perl libjson-perl libjson-xs-perl libtypes-serialiser-perl
  logrotate netbase postgresql postgresql-14 postgresql-client-14
  postgresql-client-common postgresql-common postgresql-contrib ssl-cert
  sysstat
0 upgraded, 14 newly installed, 0 to remove and 49 not upgraded.
Need to get 18.4 MB of archives.
After this operation, 51.7 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 logrotate amd64 3.19.0-1ubuntu1.1 [54.3 kB]
Get:2 http://archive.ubuntu.com

In [2]:
!service postgresql start

 * Starting PostgreSQL 14 database server
   ...done.


Create a user in Postgres ([stackoverflow](https://stackoverflow.com/questions/12720967/how-to-change-postgresql-user-password/12721020#12721020))


In [3]:
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'test';"

ALTER ROLE


Store you database password in an environmental variable so that we need no type it in all the time (not advisable generally).

We'll use the notebook magic `%end`

In [4]:
%env PGPASSWORD=test

env: PGPASSWORD=test


## Pagila

Now, let's populate the PostgreSQL database with the Pagila data from the tutorial.

In [5]:
!git clone https://github.com/spatialedge-ai/pagila.git

Cloning into 'pagila'...
remote: Enumerating objects: 94, done.[K
remote: Counting objects: 100% (94/94), done.[K
remote: Compressing objects: 100% (50/50), done.[K
remote: Total 94 (delta 47), reused 85 (delta 42), pack-reused 0 (from 0)[K
Receiving objects: 100% (94/94), 2.91 MiB | 13.05 MiB/s, done.
Resolving deltas: 100% (47/47), done.


In [6]:
!psql -h localhost -U postgres -c "create database pagila"

CREATE DATABASE


In [7]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-schema.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
CREATE TYPE
ALTER TYPE
CREATE DOMAIN
ALTER DOMAIN
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE FUNCTION
ALTER FUNCTION
CREATE SEQUENCE
ALTER TABLE
SET
SET
CREATE TABLE
ALTER TABLE
CREATE FUNCTION
ALTER FUNCTION
CREATE AGGREGATE
ALTER AGGREGATE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE VIEW
ALTER TABLE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE VIEW
ALTER TABLE
CREATE VIEW
ALTER TABLE
CREATE SEQUENCE
ALTER TABLE
CREATE TABLE
ALTER TABLE
CREATE SEQU

In [8]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-data.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
COPY 200
COPY 109
COPY 600
COPY 603
COPY 16
COPY 2
COPY 599
COPY 6
COPY 1000
COPY 5462
COPY 1000
COPY 4581
COPY 2
COPY 16044
COPY 1157
COPY 2312
COPY 5644
COPY 6754
COPY 182
COPY 0
 setval 
--------
    200
(1 row)

 setval 
--------
    605
(1 row)

 setval 
--------
     16
(1 row)

 setval 
--------
    600
(1 row)

 setval 
--------
    109
(1 row)

 setval 
--------
    599
(1 row)

 setval 
--------
   1000
(1 row)

 setval 
--------
   4581
(1 row)

 setval 
--------
      6
(1 row)

 setval 
--------
  32098
(1 row)

 setval 
--------
  16049
(1 row)

 setval 
--------
      2
(1 row)

 setval 
--------
      2
(1 row)



## PySpark Setup

Now, let's download what is necessary for initiating jdbc connections, as well as what is required to run PySpark itself.

In [9]:
# https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

--2024-10-23 06:56:05--  https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046274 (1022K) [application/java-archive]
Saving to: ‘postgresql-42.5.0.jar’


2024-10-23 06:56:06 (8.68 MB/s) - ‘postgresql-42.5.0.jar’ saved [1046274/1046274]



In [10]:
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np

%config Completer.use_jedi = False

SPARKVERSION='3.2.1'
HADOOPVERSION='3.2'
pwd=os.getcwd()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"{pwd}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}"

# print(os.environ['SPARK_HOME'])


In [11]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-{SPARKVERSION}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz
!tar xf spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 78, <> line 3.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 
--2024-10-23 06:56:32--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300971569 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop3.2.tgz’


2024-10-23 07:04:13 (638 KB/s) - ‘spark-3.2.1-bin-hadoop3.2.tgz’ saved [300971569/300971569]



In [12]:
!cp postgresql-42.5.0.jar spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars

In [13]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [14]:
import findspark
findspark.init()
findspark.find()

# get a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars",
                                                       "postgresql-42.2.5.jar").config(
                                                          "spark.driver.extraClassPath",
                                                          f"spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars"
                                                       ).getOrCreate()
print(spark.conf.get('spark.jars'))

%env PYARROW_IGNORE_TIMEZONE=1

postgresql-42.2.5.jar
env: PYARROW_IGNORE_TIMEZONE=1


# Questions

### Question 1

Using a PySpark dataframe, print the schema of customer table in the pagila PostgreSQL database by utilising a JDBC connection.

In [15]:
# define the postgresql connection properties
jdbcHostname = "localhost"
jdbcPort = 5432
jdbcDatabase = "pagila"
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
connectionProperties = {
    "user": "postgres",
    "password": "test",
    "driver": "org.postgresql.Driver"
}

# read the customer table into a pyspark DataFrame
customer_df = spark.read.jdbc(
    url=jdbcUrl,
    table="customer",
    properties=connectionProperties
)

# print the schema
customer_df.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)



### Question 2

Use the Spark SQL API to query the customer table, compute the number of unique email addresses in that table and print the result in the notebook.

In [16]:
# register the dataframe as a temporary view
customer_df.createOrReplaceTempView("customer")

# use spark sql to compute the number of unique email addresses
unique_emails = spark.sql("""
    SELECT COUNT(DISTINCT email) AS unique_email_count
    FROM customer
""")

# show
unique_emails.show()


+------------------+
|unique_email_count|
+------------------+
|               599|
+------------------+



### Question 3

Repeat this calculation using only the Dataframe API and print the result.

In [17]:
from pyspark.sql.functions import countDistinct

unique_emails_df = customer_df.agg(countDistinct("email").alias("unique_email_count"))

# show
unique_emails_df.show()


+------------------+
|unique_email_count|
+------------------+
|               599|
+------------------+



### Question 4

How many partitions are present in the dataframe resulting from Question 3 (additionally provide the code necessary to determine that)

In [30]:
# get number of partitions
num_partitions = unique_emails_df.rdd.getNumPartitions()

print(f" number of partitions: {num_partitions}")


 number of partitions: 1


### Question 5

Compute the min and max of customer.create_date and print the result (once more using the Spark DataFrame API and not the Spark SQL API).

In [31]:
from pyspark.sql.functions import min, max

create_date_stats = customer_df.agg(
    min("create_date").alias("min_create_date"),
    max("create_date").alias("max_create_date")
)

# show
create_date_stats.show()


+---------------+---------------+
|min_create_date|max_create_date|
+---------------+---------------+
|     2020-02-14|     2020-02-14|
+---------------+---------------+



### Question 6.1

Determine which first names occur more than once:

1. using the Spark SQL API (printing the result)

In [32]:
# use spark sql to find duplicate first names
duplicate_first_names_sql = spark.sql("""
    SELECT first_name, COUNT(*) AS name_count
    FROM customer
    GROUP BY first_name
    HAVING COUNT(*) > 1
    ORDER BY name_count DESC
""")

# show
duplicate_first_names_sql.show()


+----------+----------+
|first_name|name_count|
+----------+----------+
|     TERRY|         2|
|    WILLIE|         2|
|    MARION|         2|
|     KELLY|         2|
|    LESLIE|         2|
|     JAMIE|         2|
|     TRACY|         2|
|    JESSIE|         2|
+----------+----------+



### Question 6.2

  2. using the Spark Dataframe API (printing the result once more).

In [21]:
from pyspark.sql.functions import col

duplicate_first_names_df = customer_df.groupBy("first_name") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy(col("count").desc())

# show
duplicate_first_names_df.show()


+----------+-----+
|first_name|count|
+----------+-----+
|     TERRY|    2|
|    WILLIE|    2|
|    MARION|    2|
|     KELLY|    2|
|    LESLIE|    2|
|     JAMIE|    2|
|     TRACY|    2|
|    JESSIE|    2|
+----------+-----+



### Question 7

Port the PostgreSQL below to the PySpark DataFrame API and execute the query within Spark (not directly on PostgreSQL):

```
SELECT
   staff.first_name
   ,staff.last_name
   ,SUM(payment.amount)
 FROM payment
   INNER JOIN staff ON payment.staff_id = staff.staff_id
 WHERE payment.payment_date BETWEEN '2007-01-01' AND '2007-02-01'
 GROUP BY
   staff.last_name
   ,staff.first_name
 ORDER BY SUM(payment.amount)
 ;
```

In [None]:
# read payment table
payment_df = spark.read.jdbc(
    url=jdbcUrl,
    table="payment",
    properties=connectionProperties
)

# read staff table
staff_df = spark.read.jdbc(
    url=jdbcUrl,
    table="staff",
    properties=connectionProperties
)

# convert payment_date to timestamp
from pyspark.sql.functions import to_timestamp

payment_df = payment_df.withColumn(
    "payment_date",
    to_timestamp("payment_date")
)

# filter payment_date
filtered_payment_df = payment_df.filter(
    (col("payment_date") >= '2007-01-01') & (col("payment_date") <= '2007-02-01')
)

# join payment and staff dataframes
joined_df = filtered_payment_df.join(staff_df, on="staff_id", how="inner")

# group by staff last_name and first_name and compute sum of amount
from pyspark.sql.functions import sum as _sum

result_df = joined_df.groupBy("last_name", "first_name") \
    .agg(_sum("amount").alias("total_amount")) \
    .orderBy("total_amount")

# show result
# Note no results since there are no records in between the dates of 2007-01-01 and 2007-02-01
result_df.show()


+---------+----------+------------+
|last_name|first_name|total_amount|
+---------+----------+------------+
+---------+----------+------------+



### Question 8

Are you currently executing commands on a driver node, or a worker? Provide the code you ran to determine that.

In [33]:
executor_id = spark.sparkContext.getConf().get("spark.executor.id")
print(f"Executor ID: {executor_id}")

Executor ID: driver


 We are executing commands on the driver node. This is because Spark is running in local mode, and there are no separate worker nodes.