# Purpose

Explore PySpark and the JDBC connection functionality to read from operational databases.

In this notebook we will set up a PostgreSQL instance and populate it with the Pagila dataset. We will then connect to the database via a JDBC connector.

# Setup

## PostgreSQL

First, let's install PostgreSQL in this Colab instance.

In [1]:
!sudo apt install postgresql postgresql-contrib

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
postgresql is already the newest version (14+238).
postgresql-contrib is already the newest version (14+238).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [2]:
!service postgresql start

 * Starting PostgreSQL 14 database server
   ...done.


Set a password for the `postgres` user in Postgres ([stackoverflow](https://stackoverflow.com/questions/12720967/how-to-change-postgresql-user-password/12721020#12721020))


In [3]:
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'test';"

ALTER ROLE


Store your database password in an environment variable so that we do not need to type it in every time (not advisable in general).

We'll use the notebook magic `%env`.

In [4]:
%env PGPASSWORD=test

env: PGPASSWORD=test
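
A minimal sketch of how later cells could pick the password up from the environment instead of hard-coding it. The helper name `pg_connection_properties` is hypothetical, and the fallback value `test` only mirrors the throwaway password set above.

```python
import os

def pg_connection_properties(user="postgres"):
    """Build JDBC connection properties, reading the password from PGPASSWORD."""
    # Assumption: fall back to this notebook's throwaway password if the
    # variable is not set.
    password = os.environ.get("PGPASSWORD", "test")
    return {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }

props = pg_connection_properties()
print(sorted(props))
```

A dictionary of this shape can be passed as the `properties` argument of `spark.read.jdbc`.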


## Pagila

Now, let's populate the PostgreSQL database with the Pagila data from the tutorial.

In [5]:
!git clone https://github.com/spatialedge-ai/pagila.git

fatal: destination path 'pagila' already exists and is not an empty directory.


In [6]:
!psql -h localhost -U postgres -c "create database pagila"

ERROR:  database "pagila" already exists


In [7]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-schema.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
psql:pagila/pagila-schema.sql:29: ERROR:  type "mpaa_rating" already exists
ALTER TYPE
psql:pagila/pagila-schema.sql:39: ERROR:  type "year" already exists
ALTER DOMAIN
psql:pagila/pagila-schema.sql:56: ERROR:  function "_group_concat" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:73: ERROR:  function "film_in_stock" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:90: ERROR:  function "film_not_in_stock" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:135: ERROR:  function "get_customer_balance" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:157: ERROR:  function "inventory_held_by_customer" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:194: ERROR:  function "inventory_in_stock" already exists with same argument types
ALTER FUN

In [8]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-data.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
psql:pagila/pagila-data.sql:224: ERROR:  duplicate key value violates unique constraint "actor_pkey"
DETAIL:  Key (actor_id)=(1) already exists.
CONTEXT:  COPY actor, line 1
psql:pagila/pagila-data.sql:341: ERROR:  duplicate key value violates unique constraint "country_pkey"
DETAIL:  Key (country_id)=(1) already exists.
CONTEXT:  COPY country, line 1
psql:pagila/pagila-data.sql:949: ERROR:  duplicate key value violates unique constraint "city_pkey"
DETAIL:  Key (city_id)=(1) already exists.
CONTEXT:  COPY city, line 1
psql:pagila/pagila-data.sql:1560: ERROR:  duplicate key value violates unique constraint "address_pkey"
DETAIL:  Key (address_id)=(1) already exists.
CONTEXT:  COPY address, line 1
psql:pagila/pagila-data.sql:1584: ERROR:  duplicate key value violates unique constraint "category_pkey"
DETAIL:  Key (category_id)=(1) already exists.
CONTEXT:  COPY category, line 1
psql:pagila/pagila-data.sql:1594: ERR
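
The "already exists" and duplicate-key errors above appear because the schema and data scripts were run against a database that was already populated. One possible way to make the setup repeatable is to drop and recreate the database before loading. The sketch below only builds the `psql` command lines and leaves running them (for example with `subprocess.run`) to the reader; the helper name `reset_commands` is hypothetical.

```python
import shlex

def reset_commands(db="pagila", host="localhost", user="postgres"):
    """Return psql command lines that rebuild the database from scratch."""
    base = ["psql", "-h", host, "-U", user]
    return [
        base + ["-c", f"DROP DATABASE IF EXISTS {db}"],
        base + ["-c", f"CREATE DATABASE {db}"],
        # Paths match the repository cloned earlier in this notebook.
        base + ["-d", db, "-f", "pagila/pagila-schema.sql"],
        base + ["-d", db, "-f", "pagila/pagila-data.sql"],
    ]

for cmd in reset_commands():
    print(shlex.join(cmd))
```

Note that `DROP DATABASE` fails while other sessions are still connected to the database, so this would need to run before any JDBC connection is opened.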

## PySpark Setup

Now, let's download what is needed to open JDBC connections, as well as what is required to run PySpark itself.

In [9]:
# https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

--2024-11-03 17:08:09--  https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046274 (1022K) [application/java-archive]
Saving to: ‘postgresql-42.5.0.jar.1’


2024-11-03 17:08:10 (6.47 MB/s) - ‘postgresql-42.5.0.jar.1’ saved [1046274/1046274]



In [10]:
import os

%config Completer.use_jedi = False

SPARKVERSION='3.2.1'
HADOOPVERSION='3.2'
pwd=os.getcwd()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"{pwd}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}"

# print(os.environ['SPARK_HOME'])


In [11]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-{SPARKVERSION}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz
!tar xf spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz

--2024-11-03 17:08:17--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300971569 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop3.2.tgz.1’


2024-11-03 17:08:32 (19.3 MB/s) - ‘spark-3.2.1-bin-hadoop3.2.tgz.1’ saved [300971569/300971569]



In [12]:
!cp postgresql-42.5.0.jar spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars

In [13]:
!pip install findspark



In [14]:
import findspark
findspark.init()
findspark.find()

# get a spark session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    # JDBC driver jar downloaded above (version 42.5.0)
    .config("spark.jars", "postgresql-42.5.0.jar")
    .config(
        "spark.driver.extraClassPath",
        f"spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars",
    )
    .getOrCreate()
)
print(spark.conf.get('spark.jars'))

%env PYARROW_IGNORE_TIMEZONE=1

postgresql-42.5.0.jar
env: PYARROW_IGNORE_TIMEZONE=1
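
The driver jar is named in three places in this setup (the `wget` download, the `cp` into Spark's `jars` directory, and the `spark.jars` setting), so the names can easily drift apart. A small sketch of deriving all of them from one constant, in the same way the notebook already does for `SPARKVERSION`; the names `JDBC_VERSION`, `JDBC_JAR` and `JDBC_DOWNLOAD_URL` are assumptions introduced here.

```python
# Single source of truth for the PostgreSQL JDBC driver version
# (42.5.0 is the version downloaded earlier in this notebook).
JDBC_VERSION = "42.5.0"
JDBC_JAR = f"postgresql-{JDBC_VERSION}.jar"
JDBC_DOWNLOAD_URL = f"https://jdbc.postgresql.org/download/{JDBC_JAR}"

print(JDBC_JAR)
print(JDBC_DOWNLOAD_URL)
```

The shell cells could then interpolate these values (for example `!wget {JDBC_DOWNLOAD_URL}`), and the session builder could use `JDBC_JAR` for `spark.jars`.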


# Questions

### Question 1

Using a PySpark DataFrame, print the schema of the customer table in the pagila PostgreSQL database by utilising a JDBC connection.

In [15]:
# pyspark code
# Question 1

# Initialize SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession

jdbc_jar_path = "/content/postgresql-42.5.0.jar"

spark = SparkSession.builder \
    .appName("PostgreSQL Example") \
    .config("spark.jars", jdbc_jar_path) \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://localhost:5432/pagila"
connection_properties = {
    "user": "postgres",
    "password": "test",
    "driver": "org.postgresql.Driver"
}

customer_df = spark.read.jdbc(
    url=jdbc_url,
    table="customer",
    properties=connection_properties
)

customer_df.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)
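
`spark.read.jdbc` is a shorthand for Spark's generic `format("jdbc")` reader. Below is a sketch of the same read expressed through reader options. The helper names `jdbc_options` and `read_table` are hypothetical, the defaults repeat the connection details used in this notebook, and `spark` is assumed to be an existing `SparkSession`.

```python
def jdbc_options(table, url="jdbc:postgresql://localhost:5432/pagila",
                 user="postgres", password="test"):
    """Options for Spark's generic JDBC reader."""
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }

def read_table(spark, table):
    """Load one table as a DataFrame; `spark` is an existing SparkSession."""
    return spark.read.format("jdbc").options(**jdbc_options(table)).load()

print(jdbc_options("customer")["dbtable"])
```

With a running session, `read_table(spark, "customer").printSchema()` should print the same schema as above.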



### Question 2

Use the Spark SQL API to query the customer table, compute the number of unique email addresses in that table and print the result in the notebook.

In [16]:
# pyspark code
# Question 2

# Create a temporary view of the 'customer' table
customer_df.createOrReplaceTempView("customer")

# Use Spark SQL to compute the number of unique email addresses
unique_emails_sql = spark.sql("""
    SELECT COUNT(DISTINCT email) AS unique_emails
    FROM customer
""")

# Display the result
unique_emails_sql.show()


+-------------+
|unique_emails|
+-------------+
|          599|
+-------------+



### Question 3

Repeat this calculation using only the DataFrame API and print the result.

In [17]:
# pyspark code
# Question 3

# Use DataFrame API to compute the number of unique email addresses
unique_emails_count = customer_df.select("email").distinct().count()

# Print the result
print(f"Number of unique email addresses: {unique_emails_count}")


Number of unique email addresses: 599
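
The two counts agree here, but the two approaches are not equivalent in general: SQL `COUNT(DISTINCT email)` skips NULLs, whereas `select("email").distinct().count()` counts NULL as one distinct row. A plain Python illustration of the difference, using made-up addresses rather than the Pagila data:

```python
# Made-up sample: two distinct addresses plus two missing values.
emails = ["a@example.com", "b@example.com", None, "a@example.com", None]

# distinct().count(): NULL forms one distinct row of its own.
distinct_rows = len(set(emails))

# COUNT(DISTINCT email): NULLs are ignored.
count_distinct = len({e for e in emails if e is not None})

print(distinct_rows, count_distinct)  # 3 2
```

In the DataFrame API, `pyspark.sql.functions.countDistinct` follows the SQL behaviour and ignores NULLs.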


### Question 4

How many partitions are present in the DataFrame resulting from Question 3? Also provide the code used to determine that.

In [18]:
# Question 4

# Get the number of partitions in the DataFrame
num_partitions = customer_df.select("email").distinct().rdd.getNumPartitions()

# Print the number of partitions
print(f"Number of partitions: {num_partitions}")


Number of partitions: 1
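
A JDBC read without partitioning options goes through a single connection and produces a single input partition. Spark's JDBC source can split the read when it is given `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions`. The sketch below only builds those options; the helper name `partitioned_read_options` is hypothetical, and the bounds used in the example are an assumption about the range of `customer_id` that should be checked against the table.

```python
def partitioned_read_options(column, lower, upper, num_partitions):
    """Build the option set that asks Spark to split a JDBC read."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower >= upper:
        raise ValueError("lower must be smaller than upper")
    return {
        "partitionColumn": column,
        "lowerBound": str(lower),
        "upperBound": str(upper),
        "numPartitions": str(num_partitions),
    }

# Assumed bounds for customer_id; verify with MIN/MAX on the real table.
opts = partitioned_read_options("customer_id", 1, 600, 4)
print(opts)
```

These options would be passed to `spark.read.format("jdbc")` together with the URL, table and credentials. The bounds only determine how the column range is divided between partitions; they do not filter rows.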


### Question 5

Compute the min and max of customer.create_date and print the result (once more using the Spark DataFrame API and not the Spark SQL API).

In [19]:
# Question 5

# Import the functions module under an alias so that Python's built-in
# min and max are not shadowed
from pyspark.sql import functions as F

# Compute the min and max of 'create_date'
min_max_dates = customer_df.agg(
    F.min("create_date").alias("min_create_date"),
    F.max("create_date").alias("max_create_date")
)

# Display the result
min_max_dates.show()


+---------------+---------------+
|min_create_date|max_create_date|
+---------------+---------------+
|     2020-02-14|     2020-02-14|
+---------------+---------------+



### Question 6.1

Determine which first names occur more than once:

1. using the Spark SQL API (printing the result)

In [20]:
# Question 6.1

duplicate_first_names_sql = spark.sql("""
    SELECT first_name, COUNT(*) as count
    FROM customer
    GROUP BY first_name
    HAVING COUNT(*) > 1
    ORDER BY count DESC
""")

duplicate_first_names_sql.show()


+----------+-----+
|first_name|count|
+----------+-----+
|     TERRY|    2|
|    WILLIE|    2|
|    MARION|    2|
|     KELLY|    2|
|    LESLIE|    2|
|     JAMIE|    2|
|     TRACY|    2|
|    JESSIE|    2|
+----------+-----+



### Question 6.2

  2. using the Spark Dataframe API (printing the result once more).

In [21]:
# Question 6.2

from pyspark.sql.functions import count

duplicate_first_names_df = customer_df.groupBy("first_name") \
    .agg(count("*").alias("count")) \
    .filter("count > 1") \
    .orderBy("count", ascending=False)

duplicate_first_names_df.show()


+----------+-----+
|first_name|count|
+----------+-----+
|     TERRY|    2|
|    WILLIE|    2|
|    MARION|    2|
|     KELLY|    2|
|    LESLIE|    2|
|     JAMIE|    2|
|     TRACY|    2|
|    JESSIE|    2|
+----------+-----+



### Question 7

Port the PostgreSQL query below to the PySpark DataFrame API and execute the query within Spark (not directly on PostgreSQL):

```sql
SELECT
   staff.first_name
   ,staff.last_name
   ,SUM(payment.amount)
 FROM payment
   INNER JOIN staff ON payment.staff_id = staff.staff_id
 WHERE payment.payment_date BETWEEN '2007-01-01' AND '2007-02-01'
 GROUP BY
   staff.last_name
   ,staff.first_name
 ORDER BY SUM(payment.amount)
 ;
```

In [22]:
# Question 7

from pyspark.sql.functions import sum as _sum, to_timestamp

# Read the 'payment' and 'staff' tables into DataFrames
payment_df = spark.read.jdbc(url=jdbc_url, table="payment", properties=connection_properties)
staff_df = spark.read.jdbc(url=jdbc_url, table="staff", properties=connection_properties)

# Perform an inner join on 'staff_id'
joined_df = payment_df.join(staff_df, on="staff_id", how="inner")

# Convert 'payment_date' to timestamp
joined_df = joined_df.withColumn("payment_date", to_timestamp("payment_date"))

# Check the min and max payment dates
joined_df.selectExpr("MIN(payment_date) AS min_date", "MAX(payment_date) AS max_date").show()

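# The loaded data spans 2020-01-24 to 2020-05-14 (see the min/max check above),
# so the 2007 range from the original query would match no rows.
# Filter a one-month window that exists in the data instead:
# '2020-02-01' to '2020-03-01', inclusive at both ends like BETWEEN.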
filtered_df = joined_df.filter(
    (joined_df.payment_date >= "2020-02-01") &
    (joined_df.payment_date <= "2020-03-01")
)

# Group by 'first_name' and 'last_name' and compute the sum of 'amount'
aggregated_df = filtered_df.groupBy(
    "first_name",
    "last_name"
).agg(
    _sum("amount").alias("total_amount")
)

# Order the results by 'total_amount'
result_df = aggregated_df.orderBy("total_amount")

# Display the result
result_df.show()



+--------------------+--------------------+
|            min_date|            max_date|
+--------------------+--------------------+
|2020-01-24 21:21:...|2020-05-14 12:44:...|
+--------------------+--------------------+

+----------+---------+------------+
|first_name|last_name|total_amount|
+----------+---------+------------+
|      Mike|  Hillyer|     9552.72|
|       Jon| Stephens|     9711.04|
+----------+---------+------------+
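
`BETWEEN` is inclusive at both ends, but when a timestamp column is compared with a bare date the date is typically read as midnight, so a payment made later on the end date falls outside the range. A plain Python illustration with made-up timestamps (the helper `in_range` is hypothetical):

```python
from datetime import datetime

def in_range(ts, start, end):
    """Inclusive range check, mirroring SQL BETWEEN."""
    return start <= ts <= end

start = datetime(2020, 2, 1)  # '2020-02-01' read as 2020-02-01 00:00:00
end = datetime(2020, 3, 1)    # '2020-03-01' read as 2020-03-01 00:00:00

payments = [
    datetime(2020, 2, 1, 0, 0),    # exactly on the lower bound: kept
    datetime(2020, 2, 15, 9, 30),  # inside the window: kept
    datetime(2020, 3, 1, 0, 0),    # exactly on the upper bound: kept
    datetime(2020, 3, 1, 10, 0),   # later on the end date: dropped
]

kept = [p for p in payments if in_range(p, start, end)]
print(len(kept))  # 3
```

If the whole end date should be included, one option is to filter with `>= start` and `< end + one day` rather than with an inclusive upper bound.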



### Question 8

Are you currently executing commands on a driver node, or a worker? Provide the code you ran to determine that.

In [23]:
# Question 8

master_url = spark.sparkContext.master

print(f"The SparkContext master URL is: {master_url}")

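# Code typed into a notebook cell always runs in the driver process;
# the master URL only tells us where the executors run.
if "local" in master_url:
    print("I am currently executing commands on the driver node.")
else:
    print("I am executing commands on the driver; tasks are sent to executors on worker nodes.")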


The SparkContext master URL is: local[*]
I am currently executing commands on the driver node.
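
The master URL describes where executors run rather than where a given piece of code runs. One way to check from inside the code itself is PySpark's `TaskContext.get()`, which returns `None` when called outside a running task, that is, on the driver. The sketch below takes the lookup as a parameter so that it can be exercised without a cluster; the helper name `role` is hypothetical.

```python
def role(get_task_context):
    """Return 'driver' when no task context is available, else 'executor'."""
    return "driver" if get_task_context() is None else "executor"

# Simulate both situations without Spark.
print(role(lambda: None))      # no task context
print(role(lambda: object()))  # some task context present
```

With PySpark available, `from pyspark import TaskContext` followed by `role(TaskContext.get)` in a notebook cell should report the driver, while calling the same function inside a transformation, for example `spark.sparkContext.parallelize([0], 1).map(lambda _: role(TaskContext.get)).collect()`, should report an executor.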
