<a href="https://colab.research.google.com/github/18708064/Post-block-3/blob/master/pyspark_postgres_18708064.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Purpose

Explore PySpark and the JDBC connection functionality to read from operational databases.

In this notebook we will set up a PostgreSQL instance and populate it with the Pagila dataset. We will then connect to the database via a JDBC connector.

# Setup

## PostgreSQL

First, let's install PostgreSQL in this Colab instance.

In [None]:
!sudo apt install postgresql postgresql-contrib

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
postgresql is already the newest version (14+238).
postgresql-contrib is already the newest version (14+238).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [None]:
!service postgresql start

 * Starting PostgreSQL 14 database server
   ...done.


Set a password for the default `postgres` user in Postgres ([stackoverflow](https://stackoverflow.com/questions/12720967/how-to-change-postgresql-user-password/12721020#12721020)).


In [None]:
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'test';"

ALTER ROLE


Store your database password in an environment variable so that we do not need to type it in every time (not generally advisable).

We'll use the notebook magic `%env`.

In [None]:
%env PGPASSWORD=test

env: PGPASSWORD=test
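
Because `%env` writes to the kernel's `os.environ`, the same variable can also be read from Python instead of repeating the password in code. The sketch below is one possible way to do that; the helper name `jdbc_properties_from_env` is hypothetical, and the dictionary keys are chosen to match what `spark.read.jdbc` accepts as `properties`.

```python
import os


def jdbc_properties_from_env(env=None, user="postgres"):
    """Build a JDBC properties dict, taking the password from PGPASSWORD."""
    # Fall back to the real process environment when no mapping is supplied
    env = os.environ if env is None else env
    password = env.get("PGPASSWORD")
    if password is None:
        raise KeyError("PGPASSWORD is not set; run `%env PGPASSWORD=...` first")
    return {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


# Example with an explicit mapping instead of the real environment
props = jdbc_properties_from_env({"PGPASSWORD": "test"})
print(sorted(props))
```

In the notebook itself, calling `jdbc_properties_from_env()` with no arguments would pick up the value set by the `%env` cell.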


## Pagila

Now, let's populate the PostgreSQL database with the Pagila data from the tutorial.

In [None]:
!git clone https://github.com/spatialedge-ai/pagila.git

fatal: destination path 'pagila' already exists and is not an empty directory.


In [None]:
!psql -h localhost -U postgres -c "create database pagila"

ERROR:  database "pagila" already exists


In [None]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-schema.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
psql:pagila/pagila-schema.sql:29: ERROR:  type "mpaa_rating" already exists
ALTER TYPE
psql:pagila/pagila-schema.sql:39: ERROR:  type "year" already exists
ALTER DOMAIN
psql:pagila/pagila-schema.sql:56: ERROR:  function "_group_concat" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:73: ERROR:  function "film_in_stock" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:90: ERROR:  function "film_not_in_stock" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:135: ERROR:  function "get_customer_balance" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:157: ERROR:  function "inventory_held_by_customer" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:194: ERROR:  function "inventory_in_stock" already exists with same argument types
ALTER FUN

In [None]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-data.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
psql:pagila/pagila-data.sql:224: ERROR:  duplicate key value violates unique constraint "actor_pkey"
DETAIL:  Key (actor_id)=(1) already exists.
CONTEXT:  COPY actor, line 1
psql:pagila/pagila-data.sql:341: ERROR:  duplicate key value violates unique constraint "country_pkey"
DETAIL:  Key (country_id)=(1) already exists.
CONTEXT:  COPY country, line 1
psql:pagila/pagila-data.sql:949: ERROR:  duplicate key value violates unique constraint "city_pkey"
DETAIL:  Key (city_id)=(1) already exists.
CONTEXT:  COPY city, line 1
psql:pagila/pagila-data.sql:1560: ERROR:  duplicate key value violates unique constraint "address_pkey"
DETAIL:  Key (address_id)=(1) already exists.
CONTEXT:  COPY address, line 1
psql:pagila/pagila-data.sql:1584: ERROR:  duplicate key value violates unique constraint "category_pkey"
DETAIL:  Key (category_id)=(1) already exists.
CONTEXT:  COPY category, line 1
psql:pagila/pagila-data.sql:1594: ERR
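
The `already exists` and `duplicate key` errors above come from re-running the load against a database that was already populated; by default `psql` reports each error and carries on with the rest of the file. A minimal sketch of a stricter invocation follows, using psql's `ON_ERROR_STOP` variable so that the first error aborts the script. The helper name `psql_file_command` is hypothetical, and its defaults simply mirror the commands used in this notebook.

```python
def psql_file_command(sql_file, database="pagila", host="localhost", user="postgres"):
    """Return an argument list that runs one SQL file and stops at the first error."""
    return [
        "psql",
        "-h", host,
        "-U", user,
        "-d", database,
        "-v", "ON_ERROR_STOP=1",  # abort the script on the first failing statement
        "-f", sql_file,
    ]


cmd = psql_file_command("pagila/pagila-schema.sql")
print(" ".join(cmd))
```

Passing the list to `subprocess.run(cmd, check=True)` would then raise `CalledProcessError` on a failed load instead of printing a long run of errors.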

## PySpark Setup

Now, let's download what is necessary for initiating JDBC connections, as well as what is required to run PySpark itself.

In [None]:
# https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

--2024-10-27 19:00:22--  https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046274 (1022K) [application/java-archive]
Saving to: ‘postgresql-42.5.0.jar.1’


2024-10-27 19:00:24 (1.10 MB/s) - ‘postgresql-42.5.0.jar.1’ saved [1046274/1046274]



In [None]:
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np

%config Completer.use_jedi = False

SPARKVERSION='3.2.1'
HADOOPVERSION='3.2'
pwd=os.getcwd()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"{pwd}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}"

# print(os.environ['SPARK_HOME'])


In [None]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-{SPARKVERSION}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz
!tar xf spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz

--2024-10-27 19:00:50--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300971569 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop3.2.tgz.1’


2024-10-27 19:01:14 (12.3 MB/s) - ‘spark-3.2.1-bin-hadoop3.2.tgz.1’ saved [300971569/300971569]



In [None]:
!cp postgresql-42.5.0.jar spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars

In [None]:
!pip install findspark



In [None]:
import findspark
findspark.init()
findspark.find()

# get a spark session
from pyspark.sql import SparkSession

# The driver jar downloaded above is postgresql-42.5.0.jar, so reference that file name
spark = (
    SparkSession.builder
    .config("spark.jars", "postgresql-42.5.0.jar")
    .config("spark.driver.extraClassPath",
            f"spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars")
    .getOrCreate()
)
print(spark.conf.get('spark.jars'))

%env PYARROW_IGNORE_TIMEZONE=1

postgresql-42.5.0.jar
env: PYARROW_IGNORE_TIMEZONE=1
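
`spark.jars` takes a comma-separated list of jar locations, and a mistyped file name there is easy to miss. The sketch below is one possible pre-flight check, assuming every entry is a plain local path (not a URL); `missing_jars` is a hypothetical helper, and the absent file name in the demonstration is made up.

```python
import os
import tempfile


def missing_jars(spark_jars):
    """Return the entries of a comma-separated spark.jars value that are not files on disk."""
    paths = [p.strip() for p in spark_jars.split(",") if p.strip()]
    return [p for p in paths if not os.path.isfile(p)]


# Demonstration with a temporary directory standing in for the notebook's working directory
with tempfile.TemporaryDirectory() as workdir:
    present = os.path.join(workdir, "postgresql-42.5.0.jar")
    absent = os.path.join(workdir, "postgresql-0.0.0.jar")  # made-up name, never created
    open(present, "wb").close()
    result = missing_jars(f"{present},{absent}")
    print([os.path.basename(p) for p in result])
```

In the notebook, `missing_jars(spark.conf.get("spark.jars"))` should return an empty list when every configured jar is present.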


# Questions

### Question 1

Using a PySpark DataFrame, print the schema of the `customer` table in the pagila PostgreSQL database by utilising a JDBC connection.

In [None]:
# Install PostgreSQL
!apt-get install postgresql postgresql-contrib -y

# Start the PostgreSQL service
!service postgresql start


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
postgresql is already the newest version (14+238).
postgresql-contrib is already the newest version (14+238).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.
 * Starting PostgreSQL 14 database server
   ...done.


In [None]:
# Set up the user with a password
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'test';"


ALTER ROLE


In [None]:
# Create a new database named 'pagila'
!sudo -u postgres createdb pagila


createdb: error: database creation failed: ERROR:  database "pagila" already exists


In [None]:
!wget -P /content/ https://jdbc.postgresql.org/download/postgresql-42.2.20.jar


--2024-10-27 19:21:36--  https://jdbc.postgresql.org/download/postgresql-42.2.20.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1005347 (982K) [application/java-archive]
Saving to: ‘/content/postgresql-42.2.20.jar’


2024-10-27 19:21:38 (1.07 MB/s) - ‘/content/postgresql-42.2.20.jar’ saved [1005347/1005347]



In [None]:
from pyspark.sql import SparkSession

# Initialize SparkSession (getOrCreate() returns the session created during setup if it is still running)
spark = SparkSession.builder \
    .appName("PostgreSQL Connection") \
    .config("spark.jars", "/content/postgresql-42.2.20.jar") \
    .getOrCreate()

# Connection details
jdbc_url = "jdbc:postgresql://localhost:5432/pagila"  # Replace 'pagila' with your database name if different
properties = {
    "user": "postgres",  # Replace 'postgres' with your username if different
    "password": "test",  # Use the password set with ALTER USER
    "driver": "org.postgresql.Driver"
}

# Load a table as a DataFrame
customer_df = spark.read.jdbc(url=jdbc_url, table="customer", properties=properties)

# Print schema to confirm connection
customer_df.printSchema()



root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)



### Question 2

Use the Spark SQL API to query the customer table, compute the number of unique email addresses in that table and print the result in the notebook.

In [None]:
# Register the DataFrame as a SQL temporary view
customer_df.createOrReplaceTempView("customer")

# Count unique emails using Spark SQL
unique_emails_sql = spark.sql("SELECT COUNT(DISTINCT email) AS unique_emails FROM customer")
unique_emails_sql.show()

+-------------+
|unique_emails|
+-------------+
|          599|
+-------------+



### Question 3

Repeat this calculation using only the DataFrame API and print the result.

In [None]:
# Count unique emails using only the DataFrame API
unique_email_count = customer_df.select("email").distinct().count()
print(f"Unique Email Count: {unique_email_count}")

Unique Email Count: 599


### Question 4

How many partitions are present in the DataFrame resulting from Question 3? Additionally, provide the code necessary to determine that.

In [None]:
# Determine the number of partitions of customer_df, the DataFrame queried in Question 3
num_partitions = customer_df.rdd.getNumPartitions()
print(f"Number of Partitions: {num_partitions}")


Number of Partitions: 1
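
A single partition is what one would expect here: a JDBC read without partitioning options is issued as one query. Spark's JDBC source can split the read when it is given `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions`. The sketch below only assembles those options; the helper name `partitioned_jdbc_options` is hypothetical, and the bounds `1` and `600` and the partition count `4` are illustrative assumptions, not values taken from the data.

```python
def partitioned_jdbc_options(url, table, column, lower, upper, num_partitions):
    """Return string-valued options for a range-partitioned Spark JDBC read."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower > upper:
        raise ValueError("lower must not exceed upper")
    return {
        "url": url,
        "dbtable": table,
        "partitionColumn": column,      # must be a numeric, date or timestamp column
        "lowerBound": str(lower),       # bounds set the partition stride only;
        "upperBound": str(upper),       # they do not filter rows
        "numPartitions": str(num_partitions),
    }


opts = partitioned_jdbc_options(
    "jdbc:postgresql://localhost:5432/pagila", "customer", "customer_id", 1, 600, 4
)
print(opts["numPartitions"])
```

With a live session, something like `spark.read.format("jdbc").options(**opts).options(**properties).load().rdd.getNumPartitions()` could then be used to check how many partitions the read produces.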


### Question 5

Compute the min and max of customer.create_date and print the result (once more using the Spark DataFrame API and not the Spark SQL API).

In [None]:
# Compute min and max of create_date using the DataFrame API
date_stats = customer_df.selectExpr("min(create_date) as min_date", "max(create_date) as max_date")
date_stats.show()


+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2020-02-14|2020-02-14|
+----------+----------+



### Question 6.1

Determine which first names occur more than once:

1. using the Spark SQL API (printing the result)

In [None]:
# Register the DataFrame as a SQL temporary view if not done already
customer_df.createOrReplaceTempView("customer")

# Query to find first names that appear more than once
duplicate_first_names_sql = spark.sql("""
    SELECT first_name, COUNT(*) AS name_count
    FROM customer
    GROUP BY first_name
    HAVING name_count > 1
""")

# Show the results
duplicate_first_names_sql.show()


+----------+----------+
|first_name|name_count|
+----------+----------+
|     TERRY|         2|
|    WILLIE|         2|
|    MARION|         2|
|     KELLY|         2|
|    LESLIE|         2|
|     JAMIE|         2|
|     TRACY|         2|
|    JESSIE|         2|
+----------+----------+



### Question 6.2

  2. using the Spark DataFrame API (printing the result once more).

In [None]:
# Use DataFrame API to find duplicate first names
duplicate_first_names_df = customer_df.groupBy("first_name").count().filter("count > 1")

# Show the results
duplicate_first_names_df.show()


+----------+-----+
|first_name|count|
+----------+-----+
|     TERRY|    2|
|    WILLIE|    2|
|    MARION|    2|
|     KELLY|    2|
|    LESLIE|    2|
|     JAMIE|    2|
|     TRACY|    2|
|    JESSIE|    2|
+----------+-----+



### Question 7

Port the PostgreSQL query below to the PySpark DataFrame API and execute the query within Spark (not directly on PostgreSQL):

```
SELECT
   staff.first_name
   ,staff.last_name
   ,SUM(payment.amount)
 FROM payment
   INNER JOIN staff ON payment.staff_id = staff.staff_id
 WHERE payment.payment_date BETWEEN '2007-01-01' AND '2007-02-01'
 GROUP BY
   staff.last_name
   ,staff.first_name
 ORDER BY SUM(payment.amount)
 ;
```

In [None]:
# Load the payment and staff tables if not done already
payment_df = spark.read.jdbc(url=jdbc_url, table="payment", properties=properties)
staff_df = spark.read.jdbc(url=jdbc_url, table="staff", properties=properties)

# Perform the join and filter, then aggregate
result_df = payment_df.join(staff_df, payment_df.staff_id == staff_df.staff_id) \
    .filter((payment_df.payment_date >= '2007-01-01') & (payment_df.payment_date <= '2007-02-01')) \
    .groupBy(staff_df.first_name, staff_df.last_name) \
    .agg({"amount": "sum"}) \
    .withColumnRenamed("sum(amount)", "total_payment") \
    .orderBy("total_payment")

# Show the result
result_df.show()


+----------+---------+-------------+
|first_name|last_name|total_payment|
+----------+---------+-------------+
+----------+---------+-------------+
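
The `>=` / `<=` pair in the port mirrors `BETWEEN`, which is inclusive at both ends. One subtlety is worth spelling out, assuming `payment_date` is a timestamp column and the date-only literals are interpreted as midnight: the upper bound then admits only the first instant of 2007-02-01. The plain-Python sketch below illustrates that comparison on made-up timestamps; it does not touch the database.

```python
from datetime import datetime

LOWER = datetime(2007, 1, 1)  # '2007-01-01' read as a timestamp (midnight)
UPPER = datetime(2007, 2, 1)  # '2007-02-01' read as a timestamp (midnight)


def in_window(payment_date):
    """Inclusive range test equivalent to BETWEEN LOWER AND UPPER."""
    return LOWER <= payment_date <= UPPER


# Made-up sample timestamps around the upper bound
samples = [
    datetime(2007, 1, 31, 23, 59, 59),
    datetime(2007, 2, 1, 0, 0, 0),
    datetime(2007, 2, 1, 9, 30, 0),
]
for ts in samples:
    print(ts.isoformat(), in_window(ts))
```

An empty result means that no payment rows fell inside the window; inspecting the minimum and maximum of `payment_date` would show which date range the loaded data actually covers.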



### Question 8

Are you currently executing commands on a driver node, or a worker? Provide the code you ran to determine that.

In [None]:
import socket

# Print the hostname of the machine running this notebook kernel; the kernel process created the SparkSession, so it hosts the driver
print(f"Running on node: {socket.gethostname()}")


Running on node: dd36c26f82f0
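
A hostname alone does not say whether the process is a driver or a worker. Code typed into the notebook runs in the process that created the `SparkSession`, which is the driver; the master URL additionally shows whether any separate workers exist. The sketch below classifies a master URL; `is_local_master` is a hypothetical helper, and the non-local URLs in the loop are illustrative values only.

```python
def is_local_master(master):
    """True when a Spark master URL denotes local mode, e.g. 'local', 'local[4]', 'local[*]'."""
    return master == "local" or master.startswith("local[")


# Illustrative master URLs; only the local forms mean driver and executors share one JVM
for m in ["local[*]", "local[2]", "spark://host:7077", "yarn"]:
    print(m, is_local_master(m))
```

In the notebook, `is_local_master(spark.sparkContext.master)` returning `True` would indicate local mode, where the driver and the executor threads all run on this one machine.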
