# In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Settings that install the OpenLineage Spark listener and configure its
# HTTP transport and namespace. The PostgreSQL JDBC driver is fetched
# through the same `spark.jars.packages` entry.
openlineage_conf = {
    'spark.jars.packages': 'io.openlineage:openlineage-spark_2.12:1.24.2,org.postgresql:postgresql:42.2.18',
    'spark.extraListeners': 'io.openlineage.spark.agent.OpenLineageSparkListener',
    'spark.openlineage.transport.url': 'http://marquez-api:5000',
    'spark.openlineage.transport.type': 'http',
    'spark.openlineage.namespace': 'spark_integration',
}

# Local-master session named `dss_spark`; every setting above is applied to
# the builder before the session is created.
session_builder = SparkSession.builder.master('local').appName('dss_spark')
for conf_key, conf_value in openlineage_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

spark = session_builder.enableHiveSupport().getOrCreate()


# Catalog objects used by the rest of the notebook: the `public` database
# and the speakers / sessions / attendees tables. Every statement is guarded
# with IF NOT EXISTS and they are executed in the order listed.
schema_statements = (
    'CREATE DATABASE IF NOT EXISTS public',
    '''CREATE TABLE IF NOT EXISTS speakers (
    speaker_id INT,
    name VARCHAR(255),
    bio STRING
);''',
    """CREATE TABLE IF NOT EXISTS sessions (
    session_id INT,
    title VARCHAR(255),
    speaker_id INT,
    start_time TIMESTAMP,
    end_time TIMESTAMP
);""",
    """CREATE TABLE IF NOT EXISTS attendees (
    attendee_id INT,
    name VARCHAR(255),
    email VARCHAR(255),
    session_id INT
);""",
)

for ddl in schema_statements:
    spark.sql(ddl)

# Seed data for the three source tables.
#
# INSERT OVERWRITE is used instead of INSERT INTO so that this cell is
# idempotent: the tables are created with IF NOT EXISTS, so re-running the
# notebook with INSERT INTO would append another copy of every row and
# inflate the results of the join/aggregation queries that read them.

# Four speakers, ids 1-4.
spark.sql("""INSERT OVERWRITE TABLE speakers (speaker_id, name, bio)
VALUES 
(1, 'John Smith', 'John is a renowned expert in the field of data science'),
(2, 'Jane Doe', 'Jane is a leading researcher in machine learning'),
(3, 'Bob Johnson', 'Bob is a seasoned industry professional with expertise in Python'),
(4, 'Jakub Dardzinski', 'Just a random guy')
;""")

# Six sessions; speakers 1 and 2 each give two of them.
spark.sql("""
INSERT OVERWRITE TABLE sessions (session_id, title, speaker_id, start_time, end_time)
VALUES (1, 'Introduction to Data Science', 1, timestamp('2024-11-22 09:00:00'), timestamp('2024-11-22 10:30:00')),
       (2, 'Machine Learning with Python', 2, timestamp('2024-11-22 10:45:00'), timestamp('2024-11-22 12:15:00')),
       (3, 'Big Data and Analytics', 3, timestamp('2024-11-22 13:30:00'), timestamp('2024-11-22 15:00:00')),
       (4, 'Make Data Science Great', 1, timestamp('2024-11-22 15:00:00'), timestamp('2024-11-22 15:30:00')),
       (5, 'OpenLineage: Easy Way to Unlock Your Data Potential', 4, timestamp('2024-11-22 15:30:00'), timestamp('2024-11-22 16:00:00')),
       (6, 'Machine Learning Unlocked', 2, timestamp('2024-11-22 16:30:00'), timestamp('2024-11-22 17:00:00'))
;""")

# Three attendees, one each for sessions 1-3.
spark.sql("""INSERT OVERWRITE TABLE attendees (attendee_id, name, email, session_id)
VALUES 
(1, 'Alice Brown', 'alicebrown@example.com', 1),
(2, 'Bob Davis', 'bobdavis@example.com', 2),
(3, 'Charlie Wilson', 'charliewilson@example.com', 3)
;""")

# Attendee count per session.
# NOTE(review): this is an inner JOIN, so sessions without any attendee row
# are absent from the result rather than reported with a count of 0 --
# confirm that is the intended output.
attendee_counts_sql = """SELECT s.session_id, COUNT(a.attendee_id) AS attendees_count
FROM sessions s
JOIN attendees a ON s.session_id = a.session_id
GROUP BY s.session_id;"""
df = spark.sql(attendee_counts_sql)

# First destination: a table in the `public` database of the Spark catalog.
df.write.saveAsTable('public.attendees_by_session', mode="overwrite")

# Second destination: the same result written over JDBC to PostgreSQL,
# replacing any existing table of that name.
postgres_target = {
    "driver": "org.postgresql.Driver",
    "url": "jdbc:postgresql://postgres/airflow",
    "dbtable": "public.attendees_by_session",
    "user": "airflow",
    "password": "airflow",
}
(
    df.write
    .format("jdbc")
    .options(**postgres_target)
    .mode("overwrite")
    .save()
)

# In [4]:
# The two speakers with the most sessions, exposed to SQL as the temp view
# `top_speakers`. `speaker_id` is a secondary sort key so that the rows kept
# by limit(2) are deterministic when several speakers share the same count;
# sorting on "count" alone leaves the choice among tied rows arbitrary.
top_speakers = (
    spark.read.table("sessions")
    .groupBy("speaker_id")
    .count()
    .sort(["count", "speaker_id"], ascending=[False, True])
    .limit(2)
)
top_speakers.createOrReplaceTempView("top_speakers")

# One row per (speaker, session) for the speakers selected above.
df = spark.sql("""
SELECT DISTINCT 
sp.speaker_id,
sp.name, s.session_id
FROM speakers sp
JOIN sessions s ON s.speaker_id = sp.speaker_id
WHERE sp.speaker_id IN (SELECT speaker_id FROM top_speakers)
ORDER BY sp.name, s.session_id
;""")

# Write the result over JDBC to PostgreSQL as public.top_speakers,
# replacing any existing table of that name.
(
    df.write
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://postgres/airflow")
    .option("dbtable", "public.top_speakers")
    .option("user", "airflow")
    .option("password", "airflow")
    .mode("overwrite")
    .save()
)