In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dotenv import load_dotenv
import datetime

In [2]:
# Load the DB connection settings from the local .env file into os.environ.
# override=True makes the .env values win over variables that already exist in
# the process environment. With python-dotenv's default (override=False),
# existing variables are kept, and on Linux/macOS `USER` is already set to the
# OS login name. The DB user read via os.getenv("USER") would then silently
# ignore the .env file.
load_dotenv(override=True)

# PostgreSQL JDBC driver jar, used as the Spark driver classpath. This is a
# relative path, so the jar presumably has to sit in the directory the
# kernel/driver is started from. TODO confirm for non-local deployments.
jdbc_driver_path = "postgresql-42.7.4.jar"

In [3]:
# Create (or fetch the already-running) Spark session for this notebook.
# - spark.driver.extraClassPath: puts the PostgreSQL JDBC jar on the driver
#   classpath so spark.read.jdbc can load org.postgresql.Driver.
# - spark.sql.execution.arrow.pyspark.enabled: use Apache Arrow for
#   Spark <-> pandas conversions.
spark = (
    SparkSession.builder
    .appName('Solution')
    .config("spark.driver.extraClassPath", jdbc_driver_path)
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

In [4]:
# PostgreSQL connection settings, read from the process environment
# (normally populated from the .env file).
# NOTE: `USER` is also a standard OS environment variable on Linux/macOS.
# Unless the .env file was loaded with load_dotenv(override=True), it holds the
# OS login name rather than the database user configured in .env.
jdbcHostname = os.getenv("HOST")
jdbcDatabase = os.getenv("DB_NAME")
jdbcUsername = os.getenv("USER")
jdbcPassword = os.getenv("PASSWORD")
# Standard PostgreSQL port unless DB_PORT is set.
jdbcPort = int(os.getenv("DB_PORT", "5432"))
jdbcDriver = "org.postgresql.Driver"

# Fail fast with a readable message. A missing variable would otherwise show up
# as the literal text "None" in the JDBC URL, or as a None value in the
# connection properties, and only fail later inside Spark/JDBC.
_missing_env = [
    name
    for name, value in {
        "HOST": jdbcHostname,
        "DB_NAME": jdbcDatabase,
        "USER": jdbcUsername,
        "PASSWORD": jdbcPassword,
    }.items()
    if value is None
]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_env)
        + " (check the .env file)"
    )

# Connection properties passed to spark.read.jdbc.
connProperties = {
  "user": jdbcUsername,
  "password": jdbcPassword,
  "driver": jdbcDriver
}

jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"


In [5]:
def read_pg_table(table_name):
    """Load a whole PostgreSQL table as a (lazy) Spark DataFrame over JDBC.

    The bare table name is passed instead of a "(SELECT * FROM ...)" subquery.
    Spark inserts the `table` value verbatim into SQL such as
    ``SELECT * FROM <table> WHERE 1=0``, and PostgreSQL versions before 16
    reject a FROM-clause subquery that has no alias. A plain name is valid
    everywhere and returns the same rows and columns.

    Args:
        table_name: name of the table in the database that jdbcUrl points at.

    Returns:
        A Spark DataFrame backed by that table.
    """
    return spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connProperties)


bonus_df = read_pg_table("bonus")
title_df = read_pg_table("title")
worker_df = read_pg_table("worker")

In [12]:
# (display label, Spark DataFrame) pairs for the three loaded tables, in the
# order they should be inspected.
dataframes = [
    ("Bonus", bonus_df),
    ("Title", title_df),
    ("Worker", worker_df),
]

In [16]:
# Print each table's Spark schema between "=" banners for a quick visual check.
# The loop names `title`/`df` are kept because they remain in the notebook
# namespace after the loop.
for title, df in dataframes:
    rule = "=" * 10
    print(f"{rule} {title} Schema {rule}")
    df.printSchema()
    print("=" * 30, end="\n\n")