In [4]:
# Show the master URL of the active Spark session.
# NOTE(review): `spark` is only created in the following cell, so on a fresh
# kernel this cell raises NameError unless that cell has been run first.
print(spark.sparkContext.master)

local[*]


In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Build (or reuse) the Spark session; `spark.jars` points at the PostgreSQL
# JDBC driver jar so the JDBC reads/writes below can load the driver class.
spark = (
    SparkSession.builder
    .appName("Load Data to PostgreSQL")
    .config("spark.jars", "/home/jovyan/drivers/postgresql-42.7.3.jar")
    .getOrCreate()
)


In [37]:
# PostgreSQL connection settings.
# Each value can be overridden through an environment variable so real
# credentials do not have to be committed with the notebook; when a variable
# is unset, the previously hard-coded value is used as the default.
url = os.environ.get(
    "WAREHOUSE_JDBC_URL",
    "jdbc:postgresql://postgres_warehouse:5432/warehouse",
)

# Passed as JDBC connection properties to the Spark JDBC reader/writer.
properties = {
    "user": os.environ.get("WAREHOUSE_DB_USER", "warehouse"),
    "password": os.environ.get("WAREHOUSE_DB_PASSWORD", "warehouse"),
    "driver": "org.postgresql.Driver",
}


In [38]:
# Tiny in-memory sample (name, age) used by the connection test cell below.
data = [
    ("John", 30),
    ("Jane", 25),
    ("Sam", 40),
]
columns = ["name", "age"]

df = spark.createDataFrame(data, schema=columns)

### Test Connection

In [42]:
# Connection smoke test: overwrite 'test_table' with the sample DataFrame and
# print the outcome; any failure is reported rather than raised.
try:
    df.write.mode("overwrite").jdbc(url=url, table="test_table", properties=properties)
    print("Table 'test_table' created and data inserted.")
except Exception as exc:
    print(f"Error connecting to PostgreSQL: {exc}")

Table 'test_table' created and data inserted.


In [65]:
# Load Social_Valid.csv: the first row supplies column names and Spark infers
# the column types. Note this rebinds `df`, replacing the sample DataFrame.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/home/jovyan/data/Social_Valid.csv")
)

In [66]:
# Print the column names and inferred types of the loaded CSV.
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- username: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- date_created: timestamp (nullable = true)
 |-- post_id: string (nullable = true)
 |-- post_text: string (nullable = true)
 |-- location: string (nullable = true)
 |-- post_timestamp: timestamp (nullable = true)
 |-- shares: integer (nullable = true)
 |-- angry: integer (nullable = true)
 |-- haha: integer (nullable = true)
 |-- like: integer (nullable = true)
 |-- love: integer (nullable = true)
 |-- sad: integer (nullable = true)
 |-- wow: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- comment_timestamp: timestamp (nullable = true)
 |-- comment_user_id: string (nullable = true)



In [67]:
# Rename the `like` reaction column to `like_count`; the fact-table select
# further down refers to it under that name.
# NOTE(review): presumably renamed because `like` is an SQL keyword — confirm.
df = df.withColumnRenamed("like", "like_count")

In [68]:
# User dimension: the distinct combinations of the user attribute columns.
# No surrogate key is generated in this cell.
dim_users_df = (
    df.select(
        "user_id",
        "username",
        "age",
        "email",
        "gender",
        "name",
        "date_created",
    )
    .distinct()
)


In [69]:
# Post dimension: the distinct combinations of the post attribute columns.
# No surrogate key is generated in this cell.
dim_posts_df = (
    df.select(
        "post_id",
        "post_text",
        "location",
        "post_timestamp",
        "tags",
    )
    .distinct()
)


In [74]:

# Append the user dimension rows to the PostgreSQL table 'dim_users'.
# User, password and driver come from the shared `properties` dict (as in the
# connection test cell) instead of being repeated as literals here.
# NOTE(review): mode "append" adds rows on every execution, so re-running this
# cell inserts the same users again unless the table enforces uniqueness.
dim_users_df.write.jdbc(url=url, table="dim_users", mode="append", properties=properties)

In [75]:
# Append the post dimension rows to the PostgreSQL table 'dim_posts'.
# User, password and driver come from the shared `properties` dict (as in the
# connection test cell) instead of being repeated as literals here.
# NOTE(review): mode "append" adds rows on every execution, so re-running this
# cell inserts the same posts again unless the table enforces uniqueness.
dim_posts_df.write.jdbc(url=url, table="dim_posts", mode="append", properties=properties)

In [98]:
# Read 'dim_users' back from PostgreSQL, reusing the shared `properties` dict
# for user, password and driver rather than repeating them as literals.
# The fact-table cell selects `user_sk` from this DataFrame.
# NOTE(review): `user_sk` is not among the columns written from Spark, so it is
# presumably generated by the database — confirm against the table DDL.
dim_users_with_sk = spark.read.jdbc(url=url, table="dim_users", properties=properties)


In [99]:
# Read 'dim_posts' back from PostgreSQL, reusing the shared `properties` dict
# for user, password and driver rather than repeating them as literals.
# The fact-table cell selects `post_sk` from this DataFrame.
# NOTE(review): `post_sk` is not among the columns written from Spark, so it is
# presumably generated by the database — confirm against the table DDL.
dim_posts_with_sk = spark.read.jdbc(url=url, table="dim_posts", properties=properties)

In [100]:
# Inner-join the source rows to the user dimension (on `user_id`) and to the
# post dimension (on `post_id`) so their surrogate keys become available.
fact_post_interactions_df = (
    df
    .join(
        dim_users_with_sk.alias("main_user"),
        on=df["user_id"] == col("main_user.user_id"),
        how="inner",
    )
    .join(
        dim_posts_with_sk.alias("post"),
        on=df["post_id"] == col("post.post_id"),
        how="inner",
    )
)

# Left-join the user dimension a second time to map `comment_user_id` to
# `comment_user_sk` (rows with no matching user keep a null key), then keep
# only the columns of the fact table.
fact_post_interactions_df = (
    fact_post_interactions_df
    .join(
        dim_users_with_sk.alias("comment_user"),
        on=fact_post_interactions_df["comment_user_id"] == col("comment_user.user_id"),
        how="left",
    )
    .select(
        col("post.post_sk").alias("post_sk"),
        col("main_user.user_sk").alias("user_sk"),
        "shares",
        "angry",
        "haha",
        "like_count",
        "love",
        "sad",
        "wow",
        col("comment_user.user_sk").alias("comment_user_sk"),
        "comment_text",
        "comment_timestamp",
    )
)

In [100]:
# NOTE(review): this cell repeats the logic of the preceding cell (it carries
# the same execution count); running it only rebuilds the same DataFrame, so
# it can likely be deleted.
#
# Inner-join the source rows to the user dimension (on `user_id`) and to the
# post dimension (on `post_id`) so their surrogate keys become available.
fact_post_interactions_df = (
    df
    .join(
        dim_users_with_sk.alias("main_user"),
        on=df["user_id"] == col("main_user.user_id"),
        how="inner",
    )
    .join(
        dim_posts_with_sk.alias("post"),
        on=df["post_id"] == col("post.post_id"),
        how="inner",
    )
)

# Left-join the user dimension a second time to map `comment_user_id` to
# `comment_user_sk` (rows with no matching user keep a null key), then keep
# only the columns of the fact table.
fact_post_interactions_df = (
    fact_post_interactions_df
    .join(
        dim_users_with_sk.alias("comment_user"),
        on=fact_post_interactions_df["comment_user_id"] == col("comment_user.user_id"),
        how="left",
    )
    .select(
        col("post.post_sk").alias("post_sk"),
        col("main_user.user_sk").alias("user_sk"),
        "shares",
        "angry",
        "haha",
        "like_count",
        "love",
        "sad",
        "wow",
        col("comment_user.user_sk").alias("comment_user_sk"),
        "comment_text",
        "comment_timestamp",
    )
)

In [101]:
# Print the fact table's columns and types to check them before writing.
fact_post_interactions_df.printSchema()

root
 |-- post_sk: integer (nullable = true)
 |-- user_sk: integer (nullable = true)
 |-- shares: integer (nullable = true)
 |-- angry: integer (nullable = true)
 |-- haha: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- love: integer (nullable = true)
 |-- sad: integer (nullable = true)
 |-- wow: integer (nullable = true)
 |-- comment_user_sk: integer (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- comment_timestamp: timestamp (nullable = true)



In [102]:
# Append the fact rows to the PostgreSQL table 'fact_post_interactions'.
# User, password and driver come from the shared `properties` dict (as in the
# connection test cell) instead of being repeated as literals here.
# NOTE(review): mode "append" adds rows on every execution, so re-running this
# cell inserts the same fact rows again unless the table enforces uniqueness.
fact_post_interactions_df.write.jdbc(
    url=url,
    table="fact_post_interactions",
    mode="append",
    properties=properties,
)