In [9]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dotenv import load_dotenv
import datetime

In [10]:
# Load key/value pairs from a local .env file into the process environment
# so the os.getenv() lookups in this notebook can see them.
load_dotenv()

# Location of the PostgreSQL JDBC driver jar.
# Set JDBC_DRIVER_PATH (in the environment or in .env) to point at the jar on
# another machine; when it is not set, the original local path is used.
jdbc_driver_path = os.getenv(
    "JDBC_DRIVER_PATH",
    "D:/Data_Engineering/SQL_and_PySpark/postgresql-42.7.4.jar",
)

In [11]:
# Create the Spark session for this notebook, or reuse one that already exists.
# Two options are set on the builder: jdbc_driver_path is added to the driver's
# extra classpath, and the Arrow setting for PySpark is switched to "true".
spark = (
    SparkSession.builder
    .appName('Solution')
    .config("spark.driver.extraClassPath", jdbc_driver_path)
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

In [12]:
# JDBC connection settings for the PostgreSQL database.

# Fixed values: the server port and the JDBC driver class name.
jdbcPort = 5432
jdbcDriver = "org.postgresql.Driver"

# Host, database name and credentials are read from environment variables;
# a variable that is not set comes back as None.
# NOTE(review): python-dotenv's load_dotenv() does not, by default, override
# variables that already exist in the environment, and USER is usually
# predefined on Linux/macOS -- confirm the intended database user is picked
# up on those platforms.
jdbcHostname, jdbcDatabase, jdbcUsername, jdbcPassword = (
    os.getenv(key) for key in ("HOST", "DB_NAME", "USER", "PASSWORD")
)

# Properties dict handed to Spark's JDBC reader.
connProperties = dict(user=jdbcUsername, password=jdbcPassword, driver=jdbcDriver)

# Connection URL of the form jdbc:postgresql://<host>:<port>/<database>.
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"


In [13]:
# Read every column of the `work` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_work = "(SELECT * FROM work) AS work_alias"
work_df = spark.read.jdbc(
    jdbcUrl,
    sql_work,
    properties=connProperties,
)

In [14]:
# Read every column of the `product_size` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_product_size = "(SELECT * FROM product_size) AS product_size_alias"
product_size_df = spark.read.jdbc(
    jdbcUrl,
    sql_product_size,
    properties=connProperties,
)

In [15]:
# Read every column of the `subject` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_subject = "(SELECT * FROM subject) AS subject_alias"
subject_df = spark.read.jdbc(
    jdbcUrl,
    sql_subject,
    properties=connProperties,
)

In [16]:
# Read every column of the `artist` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_artist = "(SELECT * FROM artist) AS artist_alias"
artist_df = spark.read.jdbc(
    jdbcUrl,
    sql_artist,
    properties=connProperties,
)

In [17]:
# Read every column of the `canvas_size` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_canvas_size = "(SELECT * FROM canvas_size) AS canvas_size_alias"
canvas_size_df = spark.read.jdbc(
    jdbcUrl,
    sql_canvas_size,
    properties=connProperties,
)

In [18]:
# Read every column of the `image_link` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_image_link = "(SELECT * FROM image_link) AS image_link_alias"
image_link_df = spark.read.jdbc(
    jdbcUrl,
    sql_image_link,
    properties=connProperties,
)

In [19]:
# Read every column of the `museum_hours` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_museum_hours = "(SELECT * FROM museum_hours) AS museum_hours_alias"
museum_hours_df = spark.read.jdbc(
    jdbcUrl,
    sql_museum_hours,
    properties=connProperties,
)

In [20]:
# Read every column of the `museum` table over JDBC.
# The SELECT is wrapped as an aliased subquery because it is passed to the
# reader as its `table` argument.
sql_museum = "(SELECT * FROM museum) AS museum_alias"
museum_df = spark.read.jdbc(
    jdbcUrl,
    sql_museum,
    properties=connProperties,
)

### 1.  Fetch all the paintings that are not displayed in any museum.

In [24]:
# Question 1: keep only the works whose museum_id is NULL, i.e. paintings
# that are not assigned to any museum.
not_displayed_paintings = work_df.where(work_df["museum_id"].isNull())

# Print the number of matching paintings, then display the first rows.
print(not_displayed_paintings.count())
not_displayed_paintings.show()

10223
+-------+--------------------+---------+-------+---------+
|work_id|                name|artist_id|  style|museum_id|
+-------+--------------------+---------+-------+---------+
| 125752|Arabian Horses at...|      757|Baroque|     NULL|
| 125818|Count Halm on His...|      757|Baroque|     NULL|
| 125763|Napoleon Before t...|      757|Baroque|     NULL|
| 125774|Peasants Resting ...|      757|Baroque|     NULL|
| 125785|Portrait Oberleut...|      757|Baroque|     NULL|
| 125796|The Rescue of Cou...|      757|Baroque|     NULL|
| 125807|     The Stable Yard|      757|Baroque|     NULL|
|  24532|Jacob A. Stamler ...|      563|   NULL|     NULL|
| 124470| Kaleda off Le Havre|      563|   NULL|     NULL|
| 124479|R. Bell &amp; Co....|      563|   NULL|     NULL|
| 124488|Steam Sailing Shi...|      563|   NULL|     NULL|
| 124497|The American Ship...|      563|   NULL|     NULL|
| 124506|The Atalanta Runn...|      563|   NULL|     NULL|
| 124515|The Auxiliary Ste...|      563|   NULL|  

### 2.  Are there museums without any paintings?