# Connecting PG RDS 

### Libs

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, rand, round, current_date
from pyspark.sql.types import IntegerType, DoubleType, StringType
import random
import adlfs
import pandas
from dotenv import load_dotenv
import os

In [4]:
load_dotenv()

AZURE_STORAGE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
AZURE_ACCOUNT_KEY = os.getenv("AZURE_ACCOUNT_KEY")
AZURE_CONTAINER_NAME = os.getenv("AZURE_CONTAINER_NAME")

RDS_HOST = os.getenv("RDS_HOST")
RDS_PORT = os.getenv("RDS_PORT")
RDS_DB = os.getenv("RDS_DB")
RDS_USER = os.getenv("RDS_USER")
RDS_PASSWORD = os.getenv("RDS_PASSWORD")


### Testing Connection

In [12]:
import psycopg2

conn = psycopg2.connect(
    host=RDS_HOST,
    database=RDS_DB,
    user=RDS_USER,
    password=RDS_PASSWORD,
    port=RDS_PORT
)
print("Connection successful!")
conn.close()

Connection successful!


### Synthatic data

In [12]:
# Start Spark session
spark = SparkSession.builder.appName("GenerateEcommerceData").config("spark.jars", f"{os.getcwd()}/postgresql-42.7.5.jar").getOrCreate()

# Generate base DataFrame
df = spark.range(100000).withColumnRenamed("id", "order_id")

# Add columns
df = df.withColumn("customer_id", (rand() * 10000).cast(IntegerType())) \
       .withColumn("product_id", (rand() * 1000).cast(IntegerType())) \
       .withColumn("quantity", (rand() * 10 + 1).cast(IntegerType())) \
       .withColumn("price", round(rand() * 100 + 5, 2)) \
       .withColumn("order_date", expr("current_date() - cast(rand() * 365 as int)")) \
       .withColumn("country", expr("CASE WHEN rand() < 0.5 THEN 'USA' ELSE 'UK' END"))

df.show(5)
df.count()

25/06/22 11:23:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------+-----------+----------+--------+-----+----------+-------+
|order_id|customer_id|product_id|quantity|price|order_date|country|
+--------+-----------+----------+--------+-----+----------+-------+
|       0|       2116|       262|       2|98.74|2024-12-29|    USA|
|       1|       7478|       619|       3| 5.16|2024-08-21|    USA|
|       2|       3043|        19|       7|10.14|2024-12-30|     UK|
|       3|       8162|       268|       4|76.05|2024-12-18|    USA|
|       4|       2164|       707|      10|12.82|2024-09-08|     UK|
+--------+-----------+----------+--------+-----+----------+-------+
only showing top 5 rows


100000

### Connection 

In [8]:
jdbc_url = f"jdbc:postgresql://{RDS_HOST}:{RDS_PORT}/{RDS_DB}"
properties = {
    "user": RDS_USER,
    "password": RDS_PASSWORD,
    "driver": "org.postgresql.Driver"
}

# Write synthetic data to RDS
df.write.jdbc(url=jdbc_url, table="orders", mode="overwrite", properties=properties)


                                                                                

### read from data base

In [10]:
orders_df = spark.read.jdbc(
        url=f"jdbc:postgresql://{RDS_HOST}:{RDS_PORT}/{RDS_DB}",
        table="orders",
        properties={
            "user": RDS_USER,
            "password": RDS_PASSWORD,
            "driver": "org.postgresql.Driver"
        }
)

orders_df.show(5) 
orders_df.count() 


                                                                                

+--------+-----------+----------+--------+-----+----------+-------+
|order_id|customer_id|product_id|quantity|price|order_date|country|
+--------+-----------+----------+--------+-----+----------+-------+
|   41666|       5354|       490|       8|28.36|2024-11-30|     UK|
|   41667|       7446|       505|       3| 31.9|2025-05-15|     UK|
|   41668|       8081|       630|       7|52.06|2024-07-24|    USA|
|   41669|       7174|       890|       8|71.57|2025-04-23|    USA|
|   41670|       4521|       119|       7|17.21|2024-09-16|     UK|
+--------+-----------+----------+--------+-----+----------+-------+
only showing top 5 rows


                                                                                

100000

### Connecting with adfs or data lake 

In [11]:
storage_account_name = AZURE_STORAGE_ACCOUNT_NAME 
container_name = AZURE_CONTAINER_NAME
account_key = AZURE_ACCOUNT_KEY
fs = adlfs.AzureBlobFileSystem(account_name=storage_account_name, account_key=account_key)

test_output_path = "/tmp/test_output.csv"
orders_df.limit(10).toPandas().to_csv(test_output_path, index=False)

remote_path = f"{container_name}/test_output.csv"
fs.put(test_output_path, remote_path)
print(f"Uploaded {test_output_path} to {remote_path}")

if fs.exists(remote_path):
    print(f"Verified: {remote_path} exists in Azure storage.")

fs.rm(remote_path)
print(f"Deleted {remote_path} from Azure storage.")

                                                                                

Uploaded /tmp/test_output.csv to output/test_output.csv
Verified: output/test_output.csv exists in Azure storage.
Deleted output/test_output.csv from Azure storage.
