In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, count, avg, round as _round

# --- Connection / cluster configuration ------------------------------------
# Each setting can be overridden with an environment variable so this notebook
# can target another cluster/database without code edits. The defaults are the
# values the notebook was originally written against, so behaviour is unchanged
# when the variables are unset.
#
# NOTE: "All masters are unresponsive! Giving up." during session start-up
# means the driver could not reach the master URL below (is the master
# container up, and does its hostname resolve from here?). Setting
# ANALYTICS_SPARK_MASTER="local[*]" runs the notebook without a cluster.
spark_master_url = os.environ.get(
    "ANALYTICS_SPARK_MASTER", "spark://spark-master:7077"
)
postgres_jar = os.environ.get(
    "ANALYTICS_JDBC_JAR", "./jars/postgresql-42.7.8.jar"
)

# Database connection properties shared by every JDBC read and write.
# Prefer supplying credentials through the environment over committing them.
db_properties = {
    "user": os.environ.get("ANALYTICS_DB_USER", "spark_user"),
    "password": os.environ.get("ANALYTICS_DB_PASSWORD", "spark_password"),
    "driver": "org.postgresql.Driver",
}

jdbc_url = os.environ.get(
    "ANALYTICS_JDBC_URL", "jdbc:postgresql://postgres-db:5432/analytics"
)

# Create Spark session; spark.jars puts the PostgreSQL JDBC driver on the
# driver and executor classpaths.
spark = (
    SparkSession.builder
    .appName("DatabaseAnalysis")
    .master(spark_master_url)
    .config("spark.jars", postgres_jar)
    .getOrCreate()
)

# Everything after session creation runs under try/finally so the session is
# always stopped, even if a JDBC read, join or write raises. Top-level names
# (customers_df, customer_summary, ...) stay module-level for later cells.
try:
    print("\n=== READING DATA FROM POSTGRESQL ===")

    # Read customers table
    customers_df = spark.read.jdbc(
        url=jdbc_url,
        table="customers",
        properties=db_properties,
    )

    print(f"Total customers: {customers_df.count()}")
    customers_df.show()

    # Read orders table
    orders_df = spark.read.jdbc(
        url=jdbc_url,
        table="orders",
        properties=db_properties,
    )

    print(f"\nTotal orders: {orders_df.count()}")
    orders_df.show()

    # Perform analysis: Customer order summary.
    # Joining on the column *name* keeps a single customer_id column (taken
    # from the left side), and the customer attributes are qualified so they
    # stay unambiguous even if `orders` has a column with the same name.
    # With the left join, customers without orders get total_orders = 0
    # (count skips NULL order_id) and NULL total_spent / avg_order_value.
    print("\n=== CUSTOMER ORDER SUMMARY ===")
    customer_summary = (
        customers_df.join(orders_df, on="customer_id", how="left")
        .groupBy(
            "customer_id",
            customers_df["first_name"],
            customers_df["last_name"],
            customers_df["country"],
        )
        .agg(
            count("order_id").alias("total_orders"),
            _round(_sum("total_amount"), 2).alias("total_spent"),
            _round(avg("total_amount"), 2).alias("avg_order_value"),
        )
        .orderBy(col("total_spent").desc())
    )

    customer_summary.show()

    # Analyze by country (inner join: only customers that have orders count).
    print("\n=== SALES BY COUNTRY ===")
    country_sales = (
        customers_df.join(orders_df, on="customer_id")
        .groupBy(customers_df["country"])
        .agg(
            count("order_id").alias("total_orders"),
            _round(_sum("total_amount"), 2).alias("total_revenue"),
        )
        .orderBy(col("total_revenue").desc())
    )

    country_sales.show()

    # Write results back to database; mode="overwrite" replaces any existing
    # customer_analytics table.
    print("\n=== WRITING RESULTS TO DATABASE ===")
    customer_summary.write.jdbc(
        url=jdbc_url,
        table="customer_analytics",
        mode="overwrite",
        properties=db_properties,
    )

    print("Analysis complete! Results saved to 'customer_analytics' table.")
finally:
    spark.stop()


26/01/14 16:56:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/14 16:57:47 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
26/01/14 16:57:47 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
26/01/14 16:57:48 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master


Py4JJavaError: An error occurred while calling None.org.apache.spark.sql.classic.SparkSession.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
py4j.ClientServerConnection.run(ClientServerConnection.java:108)
java.base/java.lang.Thread.run(Thread.java:1583)

And it was stopped at:

org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2295)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:128)
	at org.apache.spark.sql.classic.SparkSession.<init>(SparkSession.scala:125)
	at org.apache.spark.sql.classic.SparkSession.<init>(SparkSession.scala:118)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
