In [1]:
import os
from getpass import getpass

from langdetect import detect
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_json, udf, to_timestamp, date_format, length, avg, regexp_extract, explode
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.types import StringType

# Maven coordinates Spark downloads at startup: the Kafka source connectors and
# the PostgreSQL JDBC driver used by the sink further down.
SPARK_JARS_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0",
    "org.postgresql:postgresql:42.2.25",
])

spark = (
    SparkSession.builder
    .appName("MastodonKafkaStream")
    .config("spark.jars.packages", SPARK_JARS_PACKAGES)
    .getOrCreate()
)

print("Session Spark créée")

Session Spark créée


In [2]:
# Expected JSON payload of each Kafka message: three nullable string fields.
schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in ("user", "content", "timestamp")]
)

In [3]:
# Raw records streamed from the Kafka topic 'mastodonstream'.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.1.10:9092")
    .option("subscribe", "mastodonstream")
    .load()
)

# Transformation 1: parse the Kafka JSON payload and keep only English toots

In [4]:
def detect_language(text):
    """Return the language code langdetect assigns to ``text`` (e.g. ``"en"``).

    Used as a Spark Python UDF, so it never raises for bad input: ``None`` is
    returned for null/blank text and whenever langdetect cannot classify it.
    """
    # Nothing to classify (e.g. null 'content' after a failed JSON parse).
    if text is None or not text.strip():
        return None
    try:
        # langdetect is non-deterministic unless seeded. The seed is set here,
        # inside the UDF, so it also takes effect in the executors' Python workers.
        from langdetect import DetectorFactory
        DetectorFactory.seed = 0
        return detect(text)
    except Exception:
        # Best effort: an undetectable toot simply gets no language.
        # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
        return None

lang_udf = udf(detect_language, StringType())

# Decode the Kafka value bytes as text and unpack the JSON fields into columns.
df_parsed = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Tag each toot with its detected language, then keep the English ones only.
df_with_lang = df_parsed.withColumn("language", lang_udf(col("content")))
df_filtered = df_with_lang.where(col("language") == "en")

# Transformation 2: parse timestamps and derive date / time columns

In [5]:
# Convert the 'timestamp' string column into a real timestamp type.
df_with_timestamp = df_filtered.withColumn("timestamp", to_timestamp(col("timestamp")))

# Derive display strings for the day and the hour:minute of each toot.
timestamp_col = col("timestamp")
df_with_time = (
    df_with_timestamp
    .withColumn("date", date_format(timestamp_col, "dd/MM/yyyy"))
    .withColumn("time", date_format(timestamp_col, "HH:mm"))
)

# Action 1: average toot length per user

In [6]:
df_with_length = df_with_time.withColumn("toot_length", length(col("content")))

# Average toot length per user.
df_avg_length_user = (
    df_with_length
    .groupBy("user")
    .agg(avg("toot_length").alias("avg_toot_length"))
)

# Action 2: average toot length per hashtag

In [7]:
# Average toot length per hashtag. Uses df_with_length (with its toot_length
# column) from the "Action 1" cell instead of recomputing it here.
#
# regexp_extract only returns the FIRST match, so a toot tagged "#a #b" never
# counted towards "#b". Extract every hashtag with Spark SQL regexp_extract_all
# (Spark >= 3.1), drop repeats within the same toot, and explode to one row per
# (toot, hashtag). In the SQL literal, '\\w' unescapes to the regex \w.
all_hashtags = F.array_distinct(F.expr(r"regexp_extract_all(content, '#(\\w+)', 1)"))

df_with_hashtags = df_with_length.withColumn("hashtag", explode(all_hashtags))

# Safety net kept from the original: \w+ never matches "", but this keeps the
# DataFrame's contract (no empty hashtags) explicit for the PostgreSQL sink.
df_filtered_hashtags = df_with_hashtags.filter(col("hashtag") != "")

df_avg_length_hashtag = df_filtered_hashtags.groupBy("hashtag").agg(
    avg("toot_length").alias("avg_toot_length")
)

# Postgres: stream the hashtag rows into PostgreSQL

In [None]:
# PostgreSQL connection settings for the streaming sink.
# Credentials are read from the environment (with an interactive prompt as the
# fallback for the password) instead of being hard-coded in the notebook, where
# they would leak with every copy or commit. URL and user default to the values
# previously hard-coded here.
jdbc_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://192.168.1.10:5432/postgres")
jdbc_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD") or getpass("Mot de passe PostgreSQL : "),
    "driver": "org.postgresql.Driver",
}

def write_to_postgres(batch_df, batch_id):
    """Append one streaming micro-batch to the PostgreSQL ``hashtags`` table.

    Intended for ``writeStream.foreachBatch``: Spark passes the micro-batch
    DataFrame and its batch id (the id is not used here). Connection settings
    come from the module-level ``jdbc_url`` / ``jdbc_properties``.
    """
    writer = batch_df.write
    writer.jdbc(
        url=jdbc_url,
        table="hashtags",
        mode="append",
        properties=jdbc_properties,
    )

# Hashtags: give the sink a real DATE column.
# The "date" column was formatted upstream as a "dd/MM/yyyy" string, so parsing
# it with "yyyy-MM-dd" produced NULL dates. Deriving the date directly from the
# parsed timestamp avoids depending on that display format at all.
# A new name is used so re-running this cell does not re-transform its own output.
df_hashtags_to_write = df_filtered_hashtags.withColumn("date", F.to_date("timestamp"))

query = None
try:
    query = (
        df_hashtags_to_write.writeStream
        .outputMode("append")
        .foreachBatch(write_to_postgres)
        .start()
    )
    query.awaitTermination()
except Exception as e:
    print(f"Erreur dans le flux d'écriture des hashtags dans PostgreSQL : {e}")
finally:
    # Interrupting the kernel raises KeyboardInterrupt, which `except Exception`
    # does not catch. Without stopping here, the query keeps writing in the
    # background, and re-running the cell would start a second, duplicate query.
    if query is not None and query.isActive:
        query.stop()

