<a href="https://colab.research.google.com/github/SlyFox579/bdt_postblocks/blob/main/Copy_of_pyspark_postgres_template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Purpose

Explore PySpark and the JDBC connection functionality to read from operational databases.

In this notebook we will set up a PostgreSQL instance and populate it with the Pagila dataset. We will then connect to the database via a JDBC connector.

# Setup

## PostgreSQL

Firstly, let's install PostgreSQL in this Colab instance.

In [None]:
!sudo apt install -y postgresql postgresql-contrib

In [None]:
!service postgresql start

 * Starting PostgreSQL 10 database server
   ...done.


Set a password for the default `postgres` user ([stackoverflow](https://stackoverflow.com/questions/12720967/how-to-change-postgresql-user-password/12721020#12721020)):


In [None]:
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'test';"

ALTER ROLE


Store the database password in an environment variable so that we do not need to type it every time (not generally advisable).

We'll use the notebook magic `%env`:

In [None]:
%env PGPASSWORD=test

env: PGPASSWORD=test
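The JDBC cells later in this notebook pass the password as a literal even though it is exported here. A minimal sketch of building the reader options from `PGPASSWORD` instead; the helper `pagila_jdbc_options` and its defaults (host `localhost`, port 5432, database `pagila`, user `postgres`) are assumptions matching this notebook, not part of any library. The optional arguments map onto Spark's JDBC source options `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions`, which Spark documents as required together when a read is split into parallel queries.

```python
import os


def pagila_jdbc_options(table, host="localhost", port=5432, database="pagila",
                        user="postgres", partition_column=None, lower_bound=None,
                        upper_bound=None, num_partitions=None):
    """Build options for spark.read.format("jdbc").options(**opts).

    Hypothetical helper: reads the password from PGPASSWORD instead of
    hard-coding it in every cell.
    """
    password = os.environ.get("PGPASSWORD")
    if password is None:
        raise RuntimeError("PGPASSWORD is not set; run the %env cell first")

    opts = {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }

    # The four partitioning options must be given together or not at all
    partitioning = (partition_column, lower_bound, upper_bound, num_partitions)
    if any(p is not None for p in partitioning):
        if any(p is None for p in partitioning):
            raise ValueError("partition_column, lower_bound, upper_bound and "
                             "num_partitions must be given together")
        opts.update({
            "partitionColumn": partition_column,
            "lowerBound": str(lower_bound),
            "upperBound": str(upper_bound),
            "numPartitions": str(num_partitions),
        })
    return opts
```

Once the Spark session exists, the options can be passed with `spark.read.format("jdbc").options(**pagila_jdbc_options("customer")).load()`.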


## Pagila

Now, let's populate the PostgreSQL database with the Pagila data from the tutorial.

In [None]:
!git clone https://github.com/spatialedge-ai/pagila.git

Cloning into 'pagila'...
remote: Enumerating objects: 94, done.
remote: Counting objects: 100% (94/94), done.
remote: Compressing objects: 100% (50/50), done.
remote: Total 94 (delta 46), reused 87 (delta 42), pack-reused 0
Unpacking objects: 100% (94/94), done.


In [None]:
!psql -h localhost -U postgres -c "create database pagila"

CREATE DATABASE


In [None]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-schema.sql"

In [None]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-data.sql"

## PySpark Setup

Now, let's download what is needed for JDBC connections, as well as what is required to run PySpark itself.

In [None]:
# https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

--2022-09-25 20:15:18--  https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046274 (1022K) [application/java-archive]
Saving to: ‘postgresql-42.5.0.jar’


2022-09-25 20:15:18 (5.11 MB/s) - ‘postgresql-42.5.0.jar’ saved [1046274/1046274]



In [None]:
import os

%config Completer.use_jedi = False


SPARKVERSION='2.4.8'
HADOOPVERSION='2.7'
pwd=os.getcwd()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"{pwd}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}"

print(os.environ['SPARK_HOME'])


/content/spark-2.4.8-bin-hadoop2.7


In [None]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-{SPARKVERSION}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz
!tar xf spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 2.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 
--2022-09-25 20:15:48--  https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 235899716 (225M) [application/x-gzip]
Saving to: ‘spark-2.4.8-bin-hadoop2.7.tgz’


2022-09-25 20:16:01 (18.2 MB/s) - ‘spark-2.4.8-bin-hadoop2.7.tgz’ saved [235899716/235899716]



In [None]:
!cp postgresql-42.5.0.jar spark-2.4.8-bin-hadoop2.7/jars

In [None]:
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
findspark.init()

# get a spark session
from pyspark.sql import SparkSession
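# spark.jars must name the driver version downloaded above (42.5.0);
# a classpath entry needs the /* wildcard to pick up the jars in a directory
spark = SparkSession.builder \
    .config("spark.jars", "postgresql-42.5.0.jar") \
    .config("spark.driver.extraClassPath", "spark-2.4.8-bin-hadoop2.7/jars/*") \
    .getOrCreate()
print(spark.conf.get('spark.jars'))

%env PYARROW_IGNORE_TIMEZONE=1

postgresql-42.5.0.jar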
env: PYARROW_IGNORE_TIMEZONE=1


# Questions

### Question 1

Using a PySpark DataFrame, print the schema of the `customer` table in the Pagila PostgreSQL database via a JDBC connection.

In [None]:
# pyspark code

from pyspark.sql import SparkSession

# getOrCreate() returns the session created in the setup cell, so settings such
# as spark.jars only take effect if this cell creates the first session.
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/content/postgresql-42.5.0.jar") \
    .getOrCreate()

# Read the customer table from PostgreSQL over JDBC
iris_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/pagila") \
    .option("dbtable", "customer") \
    .option("user", "postgres") \
    .option("password", "test") \
    .option("driver", "org.postgresql.Driver") \
    .load()

iris_df.show(10)

+-----------+--------+----------+---------+--------------------+----------+----------+-----------+-------------------+------+
|customer_id|store_id|first_name|last_name|               email|address_id|activebool|create_date|        last_update|active|
+-----------+--------+----------+---------+--------------------+----------+----------+-----------+-------------------+------+
|          1|       1|      MARY|    SMITH|MARY.SMITH@sakila...|         5|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          2|       1|  PATRICIA|  JOHNSON|PATRICIA.JOHNSON@...|         6|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          3|       1|     LINDA| WILLIAMS|LINDA.WILLIAMS@sa...|         7|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          4|       2|   BARBARA|    JONES|BARBARA.JONES@sak...|         8|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          5|       1| ELIZABETH|    BROWN|ELIZABETH.BROWN@s...|         9|      true| 2020-02-14|2020-02-15 09:57:20|

### Question 2

Use the Spark SQL API to query the customer table, compute the number of unique email addresses in that table and print the result in the notebook.

In [None]:
# pyspark code
# createOrReplaceTempView replaces the deprecated registerTempTable
iris_df.createOrReplaceTempView("customer")
select = spark.sql("""SELECT * FROM customer""")
select.show(20)

+-----------+--------+----------+---------+--------------------+----------+----------+-----------+-------------------+------+
|customer_id|store_id|first_name|last_name|               email|address_id|activebool|create_date|        last_update|active|
+-----------+--------+----------+---------+--------------------+----------+----------+-----------+-------------------+------+
|          1|       1|      MARY|    SMITH|MARY.SMITH@sakila...|         5|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          2|       1|  PATRICIA|  JOHNSON|PATRICIA.JOHNSON@...|         6|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          3|       1|     LINDA| WILLIAMS|LINDA.WILLIAMS@sa...|         7|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          4|       2|   BARBARA|    JONES|BARBARA.JONES@sak...|         8|      true| 2020-02-14|2020-02-15 09:57:20|     1|
|          5|       1| ELIZABETH|    BROWN|ELIZABETH.BROWN@s...|         9|      true| 2020-02-14|2020-02-15 09:57:20|

In [None]:
# COUNT(DISTINCT ...) counts each address once; summing per-email counts
# would instead return the total number of rows.
unique_emails = spark.sql("""
    SELECT COUNT(DISTINCT email) AS no_of_unique_emails
    FROM customer
""")

unique_emails.show()

+-------------------+
|no_of_unique_emails|
+-------------------+
|                599|
+-------------------+



### Question 3 

Repeat this calculation using only the DataFrame API and print the result.

In [None]:
# pyspark code

from pyspark.sql import functions as f

# countDistinct mirrors SQL COUNT(DISTINCT email); grouping by email and then
# summing the counts would return the total number of rows instead
unique_emails = iris_df.agg(f.countDistinct('email').alias('no_of_unique_emails'))

unique_emails.show()

+-------------------+
|no_of_unique_emails|
+-------------------+
|                599|
+-------------------+



### Question 4 

How many partitions are present in the DataFrame resulting from Question 3? Also provide the code needed to determine this.

### Question 5

Compute the min and max of customer.create_date and print the result (once more using the Spark DataFrame API and not the Spark SQL API).

In [None]:
from pyspark.sql import functions as f

# min and max of create_date in a single aggregation pass
iris_df.agg(f.min('create_date').alias('min_create_date'),
            f.max('create_date').alias('max_create_date')).show()

### Question 6.1

Determine which first names occur more than once:

1. using the Spark SQL API (printing the result)

In [None]:
name_counts = spark.sql("""
    SELECT first_name, COUNT(*) AS freq
    FROM customer
    GROUP BY first_name
""")

name_counts.createOrReplaceTempView("name_counts")

# keep only the first names that occur more than once
repeated_names = spark.sql("""
    SELECT first_name, freq
    FROM name_counts
    WHERE freq > 1
""")

repeated_names.show()


+----------+----+
|first_name|freq|
+----------+----+
|     TERRY|   2|
|    WILLIE|   2|
|    MARION|   2|
|     KELLY|   2|
|    LESLIE|   2|
|     JAMIE|   2|
|     TRACY|   2|
|    JESSIE|   2|
+----------+----+



### Question 6.2

  2. using the Spark DataFrame API (printing the result once more).

In [None]:
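from pyspark.sql import functions as f

# Count customers per first name and keep only names that occur more than once
repeated_names_df = (iris_df.groupBy('first_name')
                     .count()
                     .filter(f.col('count') > 1)
                     .orderBy('count', ascending=False))
repeated_names_df.show()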

### Question 7

Port the PostgreSQL query below to the PySpark DataFrame API and execute it within Spark (not directly on PostgreSQL):

```sql
SELECT
   staff.first_name
   ,staff.last_name
   ,SUM(payment.amount)
 FROM payment
   INNER JOIN staff ON payment.staff_id = staff.staff_id
 WHERE payment.payment_date BETWEEN '2007-01-01' AND '2007-02-01'
 GROUP BY
   staff.last_name
   ,staff.first_name
 ORDER BY SUM(payment.amount)
 ;
```

### Question 8

Are you currently executing commands on a driver node, or a worker? Provide the code you ran to determine that.