In [1]:
!pip3 install pyspark



In [2]:
import os
# Extra Maven packages for Spark: the Kafka structured-streaming connector and the
# PostgreSQL JDBC driver. This has to be set before the SparkSession cell below runs,
# because PySpark consumes PYSPARK_SUBMIT_ARGS when it starts its JVM.
SPARK_SUBMIT_ARGS = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.postgresql:postgresql:42.2.18 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = SPARK_SUBMIT_ARGS

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create (or reuse, via getOrCreate) the Spark session for this pipeline.
# spark.jars.packages repeats the Kafka-connector and PostgreSQL-driver coordinates
# given in PYSPARK_SUBMIT_ARGS; the driver host is pinned to localhost.
spark = (
    SparkSession.builder
    .appName("KafkaSparkPostgres")
    .config("spark.driver.host", "localhost")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.postgresql:postgresql:42.2.18",
    )
    .getOrCreate()
)
print(spark.version)

3.5.1


In [4]:
# Expected JSON layout of each Kafka message value. Every field is nullable;
# `timestamp` is kept as a raw string rather than parsed into a Spark timestamp.
schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add("amount", IntegerType(), True)
    .add("timestamp", StringType(), True)
)

In [5]:
# Streaming source: the Kafka topic "new" on the local broker. The message payload
# is read from the `value` column in the parsing cell below.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "new",
}
df = spark.readStream.format("kafka").options(**kafka_source_options).load()

In [6]:
# Parse each message: cast the binary Kafka `value` to a string, decode it as JSON
# against `schema`, then flatten the resulting struct into top-level columns.
json_payload = col("value").cast("string")
parsed_df = (
    df
    .select(from_json(json_payload, schema).alias("data"))
    .select("data.*")
)

# Example transformation: replace `amount` with twice its value.
processed_df = parsed_df.withColumn("amount", col("amount") * 2)

In [7]:
def write_to_postgres(df, epoch_id):
    """Append one streaming micro-batch to a PostgreSQL table over JDBC.

    Used as the ``foreachBatch`` callback of the streaming query below.

    Connection settings come from environment variables, so credentials do not
    need to be typed into the notebook. Each variable falls back to the value
    that used to be hard-coded here:

    - ``POSTGRES_URL``      (default ``jdbc:postgresql://localhost:5432/mydatabase``)
    - ``POSTGRES_TABLE``    (default ``transactions``)
    - ``POSTGRES_USER``     (default ``myuser``)
    - ``POSTGRES_PASSWORD`` (default ``mypassword``)

    Args:
        df: The micro-batch DataFrame to write.
        epoch_id: The micro-batch id passed in by ``foreachBatch``; unused here.
    """
    # TODO: drop the credential fallbacks once the env vars are set everywhere.
    (
        df.write
        .format("jdbc")
        .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/mydatabase"))
        .option("dbtable", os.environ.get("POSTGRES_TABLE", "transactions"))
        .option("user", os.environ.get("POSTGRES_USER", "myuser"))
        .option("password", os.environ.get("POSTGRES_PASSWORD", "mypassword"))
        # Name the driver class explicitly rather than relying on JDBC URL-based
        # discovery, which can fail with "No suitable driver".
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

# Write the processed stream to PostgreSQL. Each micro-batch of processed_df is
# handed to write_to_postgres; start() returns the StreamingQuery handle kept in `query`.
# NOTE(review): no checkpointLocation is configured, so stream progress is presumably
# not preserved across restarts. Confirm, and add one if recovery matters.
postgres_sink = processed_df.writeStream.foreachBatch(write_to_postgres)
query = postgres_sink.outputMode("update").start()