
# Purpose

Explore PySpark and its JDBC connectivity for reading from operational databases.

In this notebook we will set up a PostgreSQL instance and populate it with the Pagila dataset. We will then connect to the database via a JDBC connector.

# Setup

## PostgreSQL

Firstly, let's install PostgreSQL in this Colab instance.

In [69]:
!sudo apt install -y postgresql postgresql-contrib

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
postgresql is already the newest version (14+238).
postgresql-contrib is already the newest version (14+238).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [71]:
!service postgresql start

 * Starting PostgreSQL 14 database server
   ...done.


Set a password for the default `postgres` user ([stackoverflow](https://stackoverflow.com/questions/12720967/how-to-change-postgresql-user-password/12721020#12721020))


In [72]:
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'test';"

ALTER ROLE


Store your database password in an environment variable so that `psql` picks it up automatically and we do not need to type it every time (this is generally not advisable).

We'll use the notebook magic `%env`:

In [73]:
%env PGPASSWORD=test

env: PGPASSWORD=test
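`psql` reads `PGPASSWORD` on its own, but the Spark cells in the Questions section hard-code `"password": "test"` in their JDBC properties. A minimal sketch, assuming you want both tools to share the same variable, of building those properties from the environment instead (the helper `jdbc_properties` is hypothetical, not part of PySpark):

```python
import os


def jdbc_properties(user: str = "postgres") -> dict:
    """Build the properties dict passed to spark.read.jdbc, reading the password from PGPASSWORD.

    Hypothetical helper: it only looks up the environment variable set with %env,
    so the password no longer has to be repeated in every cell.
    """
    password = os.environ.get("PGPASSWORD")
    if password is None:
        raise RuntimeError("PGPASSWORD is not set; run %env PGPASSWORD=... first")
    return {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


# Usage in a Spark cell:
# properties = jdbc_properties()
# customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)
```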


## Pagila

Now, let's populate the PostgreSQL database with the Pagila data from the tutorial.

In [74]:
!git clone https://github.com/spatialedge-ai/pagila.git

fatal: destination path 'pagila' already exists and is not an empty directory.


In [76]:
!psql -h localhost -U postgres -c "create database pagila"

ERROR:  database "pagila" already exists


In [77]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-schema.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
psql:pagila/pagila-schema.sql:29: ERROR:  type "mpaa_rating" already exists
ALTER TYPE
psql:pagila/pagila-schema.sql:39: ERROR:  type "year" already exists
ALTER DOMAIN
psql:pagila/pagila-schema.sql:56: ERROR:  function "_group_concat" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:73: ERROR:  function "film_in_stock" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:90: ERROR:  function "film_not_in_stock" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:135: ERROR:  function "get_customer_balance" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:157: ERROR:  function "inventory_held_by_customer" already exists with same argument types
ALTER FUNCTION
psql:pagila/pagila-schema.sql:194: ERROR:  function "inventory_in_stock" already exists with same argument types
ALTER FUN

In [78]:
!psql -h localhost -U postgres -d pagila -f "pagila/pagila-data.sql"

SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
psql:pagila/pagila-data.sql:224: ERROR:  duplicate key value violates unique constraint "actor_pkey"
DETAIL:  Key (actor_id)=(1) already exists.
CONTEXT:  COPY actor, line 1
psql:pagila/pagila-data.sql:341: ERROR:  duplicate key value violates unique constraint "country_pkey"
DETAIL:  Key (country_id)=(1) already exists.
CONTEXT:  COPY country, line 1
psql:pagila/pagila-data.sql:949: ERROR:  duplicate key value violates unique constraint "city_pkey"
DETAIL:  Key (city_id)=(1) already exists.
CONTEXT:  COPY city, line 1
psql:pagila/pagila-data.sql:1560: ERROR:  duplicate key value violates unique constraint "address_pkey"
DETAIL:  Key (address_id)=(1) already exists.
CONTEXT:  COPY address, line 1
psql:pagila/pagila-data.sql:1584: ERROR:  duplicate key value violates unique constraint "category_pkey"
DETAIL:  Key (category_id)=(1) already exists.
CONTEXT:  COPY category, line 1
psql:pagila/pagila-data.sql:1594: ERR

## PySpark Setup

Now, let's download the PostgreSQL JDBC driver, which Spark needs to open JDBC connections, as well as the Java runtime and Spark distribution required to run PySpark itself.

In [79]:
# https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

--2024-11-03 20:20:38--  https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046274 (1022K) [application/java-archive]
Saving to: ‘postgresql-42.5.0.jar.2’


2024-11-03 20:20:38 (4.96 MB/s) - ‘postgresql-42.5.0.jar.2’ saved [1046274/1046274]



In [80]:
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np

%config Completer.use_jedi = False

SPARKVERSION='3.2.1'
HADOOPVERSION='3.2'
pwd=os.getcwd()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"{pwd}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}"

# print(os.environ['SPARK_HOME'])


In [81]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-{SPARKVERSION}/spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz
!tar xf spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}.tgz

--2024-11-03 20:21:03--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300971569 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop3.2.tgz.2’


2024-11-03 20:21:15 (25.4 MB/s) - ‘spark-3.2.1-bin-hadoop3.2.tgz.2’ saved [300971569/300971569]



In [82]:
!cp postgresql-42.5.0.jar spark-{SPARKVERSION}-bin-hadoop{HADOOPVERSION}/jars

In [83]:
!pip install findspark



In [90]:
import findspark
findspark.init()
findspark.find()

# get a spark session
from pyspark.sql import SparkSession

# spark.driver.extraClassPath takes classpath entries; a bare directory entry does not
# pick up the jars inside it, so point both settings at the JDBC jar itself
jdbc_jar = "/content/postgresql-42.5.0.jar"
spark = (SparkSession.builder
         .config("spark.jars", jdbc_jar)
         .config("spark.driver.extraClassPath", jdbc_jar)
         .getOrCreate())
print(spark.conf.get('spark.jars'))

%env PYARROW_IGNORE_TIMEZONE=1

Py4JError: An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
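This `Py4JError` usually signals that the PySpark Python package and the Spark JVM in `SPARK_HOME` are different versions: the Python side calls a `SparkSession` constructor that the 3.2.1 JVM does not provide. Colab images typically come with a pip-installed `pyspark`, so it is worth checking which version is actually imported. A minimal diagnostic sketch (the helpers `parse_version` and `same_spark_version` are hypothetical; `SPARKVERSION` mirrors the setup cell):

```python
SPARKVERSION = "3.2.1"  # same value as in the setup cell


def parse_version(version: str) -> tuple:
    """Turn '3.2.1' (or '3.5.1.dev0') into (3, 2, 1); stops at the first non-numeric part."""
    parts = []
    for piece in version.split(".")[:3]:
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def same_spark_version(pyspark_version: str, spark_home_version: str) -> bool:
    """True when the Python package and the Spark distribution report the same release."""
    return parse_version(pyspark_version) == parse_version(spark_home_version)


try:
    import pyspark
    print("pyspark imported from:", pyspark.__file__)
    print("pyspark version:", pyspark.__version__)
    if not same_spark_version(pyspark.__version__, SPARKVERSION):
        print(f"Mismatch with SPARK_HOME ({SPARKVERSION}); consider "
              f"!pip install pyspark=={SPARKVERSION} and restarting the runtime")
except ImportError:
    print("pyspark is not importable; run findspark.init() first")
```

Restarting the runtime matters because a `pyspark` module that is already imported stays cached even after `findspark.init()` changes `sys.path`.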



# Questions

### Question 1

Using a PySpark DataFrame, print the schema of the customer table in the pagila PostgreSQL database by means of a JDBC connection.

In [87]:
# pyspark code
# dlcdn.apache.org only mirrors current releases; older ones such as 3.2.1 live on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://jdbc.postgresql.org/download/postgresql-42.2.5.jar -P /content/


from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Pagila") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar").getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the customer table into a DataFrame
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)

# Print the schema of the DataFrame
customer_df.printSchema()

# Stop the Spark session
spark.stop()



spark-3.2.1-bin-hadoop3.2/
spark-3.2.1-bin-hadoop3.2/LICENSE
spark-3.2.1-bin-hadoop3.2/NOTICE
spark-3.2.1-bin-hadoop3.2/R/
spark-3.2.1-bin-hadoop3.2/R/lib/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/DESCRIPTION
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/INDEX
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/Rd.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/features.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/hsearch.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/links.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/nsInfo.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/package.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/vignette.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/NAMESPACE
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/SparkR
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/SparkR.rdb
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/SparkR.rdx
spark-3.2.1-bin-hadoop3.2/R/lib/Sp

Py4JError: An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
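The `url` string follows the PostgreSQL JDBC driver's `jdbc:postgresql://host:port/database` form, which is why the comment next to it asks for host, port and database name. A small sketch, assuming the defaults used in this notebook (local server, port 5432, database `pagila`), that assembles it from its parts; `postgres_jdbc_url` is a hypothetical helper:

```python
def postgres_jdbc_url(host: str = "localhost", port: int = 5432, database: str = "pagila") -> str:
    """Assemble a PostgreSQL JDBC URL of the form jdbc:postgresql://host:port/database."""
    if not database:
        raise ValueError("database name must not be empty")
    return f"jdbc:postgresql://{host}:{port}/{database}"


url = postgres_jdbc_url()
print(url)  # jdbc:postgresql://localhost:5432/pagila
```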



### Question 2

Use the Spark SQL API to query the customer table, compute the number of unique email addresses in that table and print the result in the notebook.

In [None]:
# pyspark code
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# !pip install -q findspark

import os
import findspark

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "postgresql-42.2.5.jar") \
    .getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the customer table into a DataFrame
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)

# Register the DataFrame as a temporary SQL table
customer_df.createOrReplaceTempView("customer")

# Execute SQL query to count unique email addresses
unique_email_count_df = spark.sql("SELECT Count(Distinct email) AS unique_email_count FROM customer")

# Collect the result and print
unique_email_count = unique_email_count_df.collect()[0]["unique_email_count"]
print(f"Number of unique email addresses: {unique_email_count}")

# Stop the Spark session
spark.stop()


Number of unique email addresses: 599


### Question 3

Repeat this calculation using only the DataFrame API and print the result.

In [None]:
# pyspark code

# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# !pip install -q findspark

import os
import findspark

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar") \
    .getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the customer table into a DataFrame
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)

# Calculate the number of unique email addresses using DataFrame API
unique_email_count = customer_df.select("email").distinct().count()

# Print the result
print(f"Number of unique email addresses: {unique_email_count}")

# Stop the Spark session
spark.stop()


Number of unique email addresses: 599
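Both cells report 599, but the two approaches are not equivalent in general: SQL `COUNT(DISTINCT email)` skips NULLs, while `select("email").distinct().count()` keeps NULL as one distinct row. The DataFrame function that mirrors the SQL aggregate is `F.countDistinct("email")`. The NULL rule itself can be sketched with Python's built-in `sqlite3` on a few made-up rows (hypothetical data, not Pagila):

```python
import sqlite3

# In-memory table with one duplicate address and one NULL (made-up rows)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (email TEXT)")
conn.executemany(
    "INSERT INTO customer (email) VALUES (?)",
    [("mary.smith@example.org",), ("linda.williams@example.org",),
     ("mary.smith@example.org",), (None,)],
)

# Aggregate form: NULL is ignored, so only the two real addresses are counted
count_distinct = conn.execute(
    "SELECT COUNT(DISTINCT email) FROM customer").fetchone()[0]

# Distinct-rows form: NULL becomes its own row, so three rows are counted
distinct_rows = conn.execute(
    "SELECT COUNT(*) FROM (SELECT DISTINCT email FROM customer)").fetchone()[0]

print(count_distinct, distinct_rows)  # 2 3
conn.close()
```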


### Question 4

How many partitions are present in the DataFrame resulting from Question 3? Additionally, provide the code necessary to determine that.

In [None]:
num_partitions = customer_df.rdd.getNumPartitions()
print(f"Number of partitions in the DataFrame: {num_partitions}")

Number of partitions in the DataFrame: 1
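A single partition is what `spark.read.jdbc` produces when it receives only `url`, `table` and `properties`: the whole table comes back through one query. Passing `column`, `lowerBound`, `upperBound` and `numPartitions` makes Spark split the read into range predicates on a numeric column, e.g. `spark.read.jdbc(url=url, table="customer", column="customer_id", lowerBound=1, upperBound=600, numPartitions=4, properties=properties)`. The sketch below is a simplified re-implementation of how such stride-based predicates are formed (not Spark's source; it assumes non-negative integer bounds and skips Spark's adjustments for very narrow ranges). The bounds only set the stride, so rows outside them still land in the first or last partition.

```python
def jdbc_partition_predicates(column: str, lower: int, upper: int, num_partitions: int) -> list:
    """Approximate the WHERE clauses used for a partitioned JDBC read.

    Simplified sketch: assumes 0 <= lower < upper and num_partitions >= 1.
    """
    if num_partitions == 1:
        return [None]  # one unfiltered query, as observed in Question 4
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper_clause is None:
            predicates.append(lower_clause)  # last partition is open-ended
        elif lower_clause is None:
            # first partition is open-ended and also collects NULLs
            predicates.append(f"{upper_clause} or {column} is null")
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


for clause in jdbc_partition_predicates("customer_id", 1, 600, 4):
    print(clause)
```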


### Question 5

Compute the min and max of customer.create_date and print the result (once more using the Spark DataFrame API and not the Spark SQL API).

In [None]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# !pip install -q findspark

import os
import findspark

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar") \
    .getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the customer table into a DataFrame
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)

# Compute the min and max of create_date using DataFrame API
min_max_dates = customer_df.agg(
    F.min("create_date").alias("min_create_date"),
    F.max("create_date").alias("max_create_date")
)

# Collect the result and print
min_create_date, max_create_date = min_max_dates.collect()[0]
print(f"Minimum create_date: {min_create_date}")
print(f"Maximum create_date: {max_create_date}")

# Stop the Spark session
spark.stop()


Minimum create_date: 2020-02-14
Maximum create_date: 2020-02-14


In [None]:
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

### Question 6.1

Determine which first names occur more than once:

1. using the Spark SQL API (printing the result)

In [None]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# !pip install -q findspark

import os
import findspark

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar") \
    .getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the customer table into a DataFrame
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)

# Register the DataFrame as a temporary SQL view
customer_df.createOrReplaceTempView("customer")

# Execute SQL query to find first names that occur more than once
duplicate_first_names_df = spark.sql("""
    SELECT first_name, COUNT(*) AS count
    FROM customer
    GROUP BY first_name
    HAVING COUNT(*) > 1
""")

# Collect and print the result
duplicate_first_names = duplicate_first_names_df.collect()
for row in duplicate_first_names:
    print(f"First Name: {row['first_name']}, Count: {row['count']}")

# Stop the Spark session
spark.stop()


First Name: TERRY, Count: 2
First Name: WILLIE, Count: 2
First Name: MARION, Count: 2
First Name: KELLY, Count: 2
First Name: LESLIE, Count: 2
First Name: JAMIE, Count: 2
First Name: TRACY, Count: 2
First Name: JESSIE, Count: 2
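`GROUP BY first_name HAVING COUNT(*) > 1` counts rows per name and then keeps only the groups whose count exceeds one. The same group-then-filter logic can be sketched in plain Python with `collections.Counter`, which can be handy for cross-checking a small result collected to the driver; the name list below is made up:

```python
from collections import Counter

# Made-up first names standing in for customer.first_name
first_names = ["TERRY", "MARY", "WILLIE", "TERRY", "LINDA", "WILLIE", "WILLIE"]

counts = Counter(first_names)  # GROUP BY first_name with COUNT(*)
duplicates = {name: n for name, n in counts.items() if n > 1}  # HAVING COUNT(*) > 1

for name, n in sorted(duplicates.items()):
    print(f"First Name: {name}, Count: {n}")
```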


### Question 6.2

2. using the Spark DataFrame API (printing the result once more).

In [None]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# !pip install -q findspark

import os
import findspark

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar") \
    .getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the customer table into a DataFrame
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)

# Use the DataFrame API to find first names that occur more than once
duplicate_first_names_df = (customer_df
                             .groupBy("first_name")
                             .agg(F.count("*").alias("count"))
                             .filter(F.col("count") > 1))

# Collect and print the result
duplicate_first_names = duplicate_first_names_df.collect()
for row in duplicate_first_names:
    print(f"First Name: {row['first_name']}, Count: {row['count']}")

# Stop the Spark session
spark.stop()


FileNotFoundError: [Errno 2] No such file or directory: '/content/spark-3.2.1-bin-hadoop3.2/./bin/spark-submit'
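PySpark starts the JVM by running `$SPARK_HOME/bin/spark-submit`, so this `FileNotFoundError` means `/content/spark-3.2.1-bin-hadoop3.2` is missing or incomplete (for example after a runtime reset, or when the download and extraction step failed). A minimal pre-flight check, assuming the same directory layout; `check_spark_home` is a hypothetical helper:

```python
import os
from pathlib import Path


def check_spark_home(spark_home: str) -> Path:
    """Return the spark-submit path under spark_home, or raise a readable error."""
    spark_submit = Path(spark_home) / "bin" / "spark-submit"
    if not spark_submit.is_file():
        raise FileNotFoundError(
            f"{spark_submit} not found: download and extract Spark into {spark_home} "
            "before calling findspark.init()"
        )
    return spark_submit


# Run this before findspark.init() in each cell
spark_home = os.environ.get("SPARK_HOME", "/content/spark-3.2.1-bin-hadoop3.2")
try:
    print("spark-submit found at", check_spark_home(spark_home))
except FileNotFoundError as err:
    print(err)
```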

### Question 7

Port the PostgreSQL query below to the PySpark DataFrame API and execute the query within Spark (not directly on PostgreSQL):

```sql
SELECT
   staff.first_name
   ,staff.last_name
   ,SUM(payment.amount)
 FROM payment
   INNER JOIN staff ON payment.staff_id = staff.staff_id
 WHERE payment.payment_date BETWEEN '2007-01-01' AND '2007-02-01'
 GROUP BY
   staff.last_name
   ,staff.first_name
 ORDER BY SUM(payment.amount)
 ;
```
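Two details matter when porting the `WHERE` clause. `BETWEEN` is inclusive at both ends, and because `payment_date` is a timestamp, the date-only bound `'2007-02-01'` is read as midnight, so payments later on 1 February fall outside the range. A plain-Python sketch of that comparison with `datetime` and made-up timestamps:

```python
from datetime import datetime

# Bounds of the query; a date-only string means midnight at the start of that day
start = datetime.fromisoformat("2007-01-01")
end = datetime.fromisoformat("2007-02-01")


def in_between(ts: datetime) -> bool:
    """Mirror SQL `ts BETWEEN start AND end`, which is inclusive on both ends."""
    return start <= ts <= end


samples = [
    datetime(2007, 1, 1, 0, 0),     # exactly the lower bound: included
    datetime(2007, 1, 31, 23, 59),  # inside the range: included
    datetime(2007, 2, 1, 0, 0),     # exactly the upper bound: included
    datetime(2007, 2, 1, 9, 30),    # later on 1 February: excluded
]
for ts in samples:
    print(ts, in_between(ts))
```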

In [None]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# !tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# !pip install -q findspark

import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar") \
    .getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the payment and staff tables into DataFrames
payment_df = spark.read.jdbc(url=url, table="payment", properties=properties)
staff_df = spark.read.jdbc(url=url, table="staff", properties=properties)

# Perform the inner join between payment and staff DataFrames
joined_df = payment_df.join(staff_df, payment_df.staff_id == staff_df.staff_id, "inner")

# Filter by payment_date (BETWEEN is inclusive on both ends) and group by staff names
result_df = (joined_df
             .filter(F.col("payment_date").between('2007-01-01', '2007-02-01'))
             .groupBy(staff_df.first_name, staff_df.last_name)
             .agg(F.sum(payment_df.amount).alias("total_amount"))
             .orderBy("total_amount"))

# Collect and print the result
result = result_df.collect()
for row in result:
    print(f"First Name: {row['first_name']}, Last Name: {row['last_name']}, Total Amount: {row['total_amount']}")

# Stop the Spark session
spark.stop()





FileNotFoundError: [Errno 2] No such file or directory: '/content/spark-3.2.1-bin-hadoop3.2/./bin/spark-submit'

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
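# dlcdn.apache.org only mirrors current releases; older ones such as 3.2.1 live on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz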
!tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
import findspark
from pyspark.sql import SparkSession

# Set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()

# Create a Spark session
spark = SparkSession.builder \
    .appName("pagila") \
    .config("spark.jars", "postgresql-42.2.5.jar").getOrCreate()

# Define the JDBC connection parameters
url = "jdbc:postgresql://localhost:5432/pagila"  # Update with your database host, port, and database name
properties = {
    "user": "postgres",  # Update with your database username
    "password": "test",  # Update with your database password
    "driver": "org.postgresql.Driver"
}

# Load the payment table into a DataFrame
payment_df = spark.read.jdbc(url=url, table="payment", properties=properties)

# Create a temporary view
payment_df.createOrReplaceTempView("payment")

# Run SQL query to select unique payment dates
sql_query = """
SELECT DISTINCT CAST(payment_date AS DATE) AS payment_date
FROM payment
ORDER BY payment_date
"""

# Execute the SQL query
unique_dates_df = spark.sql(sql_query)

# Collect and print the result
unique_dates = unique_dates_df.collect()
if unique_dates:
    for row in unique_dates:
        print(f"Unique Payment Date: {row['payment_date']}")
else:
    print("No unique payment dates found.")

# Stop the Spark session
spark.stop()



tar: spark-3.2.1-bin-hadoop3.2.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


FileNotFoundError: [Errno 2] No such file or directory: '/content/spark-3.2.1-bin-hadoop3.2/./bin/spark-submit'
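The `tar: ... Cannot open` output means the `wget -q` step produced no file, and `-q` hid the HTTP error that caused it. Spark 3.2.1 is kept on `archive.apache.org` (the host used in the setup section), while `dlcdn.apache.org` mirrors only current releases. A small sketch that builds the archive URL from the same `SPARKVERSION`/`HADOOPVERSION` pattern as the setup cell; `spark_archive_url` is a hypothetical helper:

```python
SPARKVERSION = "3.2.1"
HADOOPVERSION = "3.2"


def spark_archive_url(spark_version: str, hadoop_version: str) -> str:
    """URL of a Spark binary tarball on archive.apache.org (same pattern as the setup cell)."""
    name = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
    return f"https://archive.apache.org/dist/spark/spark-{spark_version}/{name}.tgz"


print(spark_archive_url(SPARKVERSION, HADOOPVERSION))
```

Dropping `-q` from `wget` would also surface a failed download immediately instead of leaving `tar` to report a missing file.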

### Question 8

Are you currently executing commands on a driver node, or a worker? Provide the code you ran to determine that.

In [None]:
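from pyspark import TaskContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Check Node Type") \
    .getOrCreate()

# Notebook code always runs in the driver process, whatever the master URL is
# (local[*] or a cluster). TaskContext.get() only returns a context inside a task
# running on an executor, so None here means we are on the driver.
is_driver = TaskContext.get() is None
if is_driver:
    print("This is the Driver node.")
else:
    print("This is a Worker node.")

spark.stop()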


This is the Driver node.
