In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# PostgreSQL connection settings.
# The URL, user and password can each be overridden with an environment
# variable (POSTGRES_URL, POSTGRES_USER, POSTGRES_PASSWORD) so that real
# credentials never have to be written into the notebook. The fallbacks
# are the local test-database values.
POSTGRES_URL = os.environ.get(
    "POSTGRES_URL", "jdbc:postgresql://localhost:5432/test_db"
)
POSTGRES_PROPERTIES = {
    "user": os.environ.get("POSTGRES_USER", "test"),
    "password": os.environ.get("POSTGRES_PASSWORD", "testpassword"),
    "driver": "org.postgresql.Driver",
}

# Initialize Spark Session.
# The spark.jars setting points Spark at the PostgreSQL JDBC driver jar.
spark = (
    SparkSession.builder
    .appName("PySpark-Postgres")
    .config("spark.jars", "./jars/postgresql-42.7.5.jar")
    .getOrCreate()
)

# Limit Spark's log output to ERROR level to keep cell output readable.
spark.sparkContext.setLogLevel("ERROR")

In [3]:
# Load the company.employees table from PostgreSQL through the JDBC source.
# POSTGRES_PROPERTIES supplies the user, password and driver options.
df = (
    spark.read.format("jdbc")
    .options(url=POSTGRES_URL, dbtable="company.employees", **POSTGRES_PROPERTIES)
    .load()
)

print("Original Data:")
df.show()


Original Data:
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 25|
|  3|Charlie| 35|
+---+-------+---+



In [4]:
# Add a "bonus" column computed as age * 100.
bonus_expr = col("age") * lit(100)
df_transformed = df.withColumn("bonus", bonus_expr)

print("Modified Data:")
df_transformed.show()

Modified Data:
+---+-------+---+-----+
| id|   name|age|bonus|
+---+-------+---+-----+
|  1|  Alice| 30| 3000|
|  2|    Bob| 25| 2500|
|  3|Charlie| 35| 3500|
+---+-------+---+-----+



In [5]:
# Save the transformed frame to PostgreSQL as company.employees_bonus.
# The "overwrite" mode replaces the target table if it already exists.
(
    df_transformed.write.format("jdbc")
    .mode("overwrite")
    .options(
        url=POSTGRES_URL,
        dbtable="company.employees_bonus",
        **POSTGRES_PROPERTIES
    )
    .save()
)

print("Data successfully written to PostgreSQL!")

# Shut down the Spark session now that the write has finished.
spark.stop()

Data successfully written to PostgreSQL!
