## Start Spark Session

In [None]:
# Import required libraries
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import psycopg2
import matplotlib.pyplot as plt
import seaborn as sns

# Single source of truth for the PostgreSQL JDBC driver JAR; bump the version here only.
POSTGRES_JDBC_JAR = "/opt/spark/jars/postgresql-42.7.0.jar"

# SPARK_CLASSPATH is a deprecated Spark mechanism; spark.jars / extraClassPath
# below are the supported way to ship the driver. Kept for backward compatibility.
os.environ["SPARK_CLASSPATH"] = POSTGRES_JDBC_JAR

# Initialize Spark Session with PostgreSQL JDBC driver (using local JAR).
# NOTE: spark.jars, *.extraClassPath and *.memory are static configs. If a session
# already exists in this kernel, getOrCreate() returns it and ignores them (Spark
# logs "Using an existing Spark session; only runtime SQL configurations will take
# effect"). Restart the kernel for changes to these settings to apply.
# With master local[*] the executor runs inside the driver JVM, so the executor.*
# settings have little or no separate effect here.
spark = (
    SparkSession.builder
    .appName("SQL_Problems_Analysis")
    .master("local[*]")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .config("spark.driver.extraClassPath", POSTGRES_JDBC_JAR)
    .config("spark.executor.extraClassPath", POSTGRES_JDBC_JAR)
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/29 12:04:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/29 12:04:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Configure Postgres

In [2]:
# PostgreSQL connection configuration.
# Each value is read from an environment variable when set and otherwise falls
# back to the previous hard-coded default, so the notebook runs unchanged in the
# existing Docker setup while real credentials can be kept out of the notebook.
postgres_config = {
    "host": os.environ.get("POSTGRES_HOST", "postgres"),  # Docker service name
    "port": os.environ.get("POSTGRES_PORT", "5432"),
    "database": os.environ.get("POSTGRES_DB", "postgres"),
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
}

# JDBC URL for Spark
jdbc_url = f"jdbc:postgresql://{postgres_config['host']}:{postgres_config['port']}/{postgres_config['database']}"

# Options passed to the Spark JDBC reader (user, password, driver class).
connection_properties = {
    "user": postgres_config["user"],
    "password": postgres_config["password"],
    "driver": "org.postgresql.Driver"
}

# Echo the non-secret parts of the configuration (the password is deliberately not printed).
print("PostgreSQL configuration:")
print(f"JDBC URL: {jdbc_url}")
print(f"User: {postgres_config['user']}")

PostgreSQL configuration:
JDBC URL: jdbc:postgresql://postgres:5432/postgres
User: postgres


## Read the `fb_friend_requests` table into a Spark DataFrame

In [27]:
# Load the full fb_friend_requests table through Spark's JDBC source.
tables_query = """SELECT * FROM fb_friend_requests"""

table_df = (
    spark.read.format("jdbc")
    .options(
        url=jdbc_url,
        query=tables_query,
        user=connection_properties["user"],
        password=connection_properties["password"],
        driver=connection_properties["driver"],
    )
    .load()
)

table_df.show()

+--------------+----------------+----------+--------+
|user_id_sender|user_id_receiver|      date|  action|
+--------------+----------------+----------+--------+
|     ad4943sdz|      948ksx123d|2020-01-04|    sent|
|     ad4943sdz|      948ksx123d|2020-01-06|accepted|
|    dfdfxf9483|      9djjjd9283|2020-01-04|    sent|
|    dfdfxf9483|      9djjjd9283|2020-01-15|accepted|
| ffdfff4234234|     lpjzjdi4949|2020-01-06|    sent|
|   fffkfld9499|     993lsldidif|2020-01-06|    sent|
|   fffkfld9499|     993lsldidif|2020-01-10|accepted|
|    fg503kdsdd|       ofp049dkd|2020-01-04|    sent|
|    fg503kdsdd|       ofp049dkd|2020-01-10|accepted|
|    hh643dfert|      847jfkf203|2020-01-04|    sent|
|    r4gfgf2344|      234ddr4545|2020-01-06|    sent|
|    r4gfgf2344|      234ddr4545|2020-01-11|accepted|
+--------------+----------------+----------+--------+



In [39]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Put each 'sent' event and its follow-up on a single row using lead().
# The window must be keyed on the (sender, receiver) PAIR: partitioning by sender
# alone would let a sender's request to one receiver pick up the 'accepted' event
# of a different receiver whenever that sender contacted more than one person.
# On a same-day tie, 'sent' is ordered before any other action so lead() looks forward.
pair_window = (
    Window.partitionBy('user_id_sender', 'user_id_receiver')
    .orderBy('date', f.when(f.col('action') == 'sent', 0).otherwise(1))
)
requests_with_next = table_df.withColumn('next_action', f.lead('action').over(pair_window))

# Keep only the 'sent' rows; next_action is NULL when the pair has no later event.
df = requests_with_next.filter(f.col('action') == 'sent')

df.show()

+--------------+----------------+----------+------+-----------+
|user_id_sender|user_id_receiver|      date|action|next_action|
+--------------+----------------+----------+------+-----------+
|     ad4943sdz|      948ksx123d|2020-01-04|  sent|   accepted|
|    dfdfxf9483|      9djjjd9283|2020-01-04|  sent|   accepted|
| ffdfff4234234|     lpjzjdi4949|2020-01-06|  sent|       NULL|
|   fffkfld9499|     993lsldidif|2020-01-06|  sent|   accepted|
|    fg503kdsdd|       ofp049dkd|2020-01-04|  sent|   accepted|
|    hh643dfert|      847jfkf203|2020-01-04|  sent|       NULL|
|    r4gfgf2344|      234ddr4545|2020-01-06|  sent|   accepted|
+--------------+----------------+----------+------+-----------+



In [40]:
# Each remaining row is one 'sent' request, so this is the number of requests sent
# (the denominator of the acceptance rate), not a count of distinct users.
total_user_count = df.select('user_id_sender').count()
total_user_count

7

In [41]:
# Sent requests whose next event is an acceptance.
accepted_count = df.where(f.col('next_action') == 'accepted').count()
accepted_count

5

In [44]:
# Share of sent requests that were accepted, as a percentage rounded to 2 decimals.
# Guard the empty case so the cell reports 0.0 instead of raising ZeroDivisionError.
acceptance_percentage = (
    round((accepted_count / total_user_count) * 100, 2) if total_user_count else 0.0
)
acceptance_percentage

71.43