In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

In [3]:
# Spark package dependencies: the Kafka connectors and the Elasticsearch connector.
# The Maven coordinates are comma-separated after --packages, and the argument
# string ends with "pyspark-shell".
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages " + ",".join([
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.4.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
    "org.elasticsearch:elasticsearch-spark-30_2.12:8.6.1",
]) + " pyspark-shell"

In [4]:
# Create (or reuse) the Spark session, running locally and configured with the
# Elasticsearch node and port used by the connector.
spark = (
    SparkSession.builder
    .appName("Dc2ToElasticsearch")
    .master("local[*]")
    .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.es.nodes", "localhost")
    .config("spark.es.port", "9200")
    .getOrCreate()
)

In [5]:
# Kafka source settings: broker address and the topic to subscribe to.
kafka_brokers, kafka_topic = "localhost:9092", "test_stream3"

In [6]:
# Streaming read of the Kafka topic, starting from the earliest offsets.
raw_logs_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

In [7]:
# Decode the Kafka message value as a string column named "text", then extract
# the log fields from each line with regular expressions (capture group 1).
logs_df = (
    raw_logs_df
    .selectExpr("CAST(value AS STRING) AS text")
    .withColumn("Date", regexp_extract(col("text"), r"(\d{4}-\d{2}-\d{2})", 1))
    .withColumn("Time", regexp_extract(col("text"), r"(\d{2}:\d{2}:\d{2},\d{3})", 1))
    .withColumn("Level", regexp_extract(col("text"), r"(INFO|DEBUG|ERROR|WARN|FATAL)", 1))
    .withColumn("Component", regexp_extract(col("text"), r"([a-zA-Z.]+):", 1))
    .withColumn("Content", regexp_extract(col("text"), r": (.+)", 1))
    # EventTemplate is currently a plain copy of Content.
    .withColumn("EventTemplate", col("Content"))
)

In [8]:
# Keep only the parsed fields; the raw "text" column is not part of this selection.
processed_logs_df = logs_df.select(
    "Date", "Time", "Level", "Component", "Content", "EventTemplate"
)

In [9]:
# Regular logs: lines with a recognised level, excluding error lines (Content
# contains "ERROR", or the raw line starts with "java.").
# NOTE(review): "text" is not among the columns selected into processed_logs_df;
# this filter appears to rely on Spark resolving it from the parent plan - confirm.
filtered_logs_df = processed_logs_df.where(
    col("Level").isin("INFO", "DEBUG", "ERROR", "WARN", "FATAL")
    & ~(col("Content").contains("ERROR") | col("text").startswith("java."))
)

In [10]:
# Error lines: the raw line starts with "java." or the extracted Content
# contains "ERROR".
filtered_errors_df = logs_df.where(
    col("text").startswith("java.") | col("Content").contains("ERROR")
)

In [11]:
# Send an e-mail alert for one error log line.
def send_email_alert(log_message):
    """Send an alert e-mail containing ``log_message`` through Gmail's SMTP server.

    Credentials are read from the environment instead of being hardcoded:

    * ``ALERT_SMTP_USER``      - sender address / SMTP login (required)
    * ``ALERT_SMTP_PASSWORD``  - SMTP (app) password (required)
    * ``ALERT_SMTP_RECIPIENT`` - destination address (defaults to the sender)

    The function is best-effort: it never raises. Any failure (missing
    credentials, network or SMTP error) is reported with ``print`` so that a
    failed alert does not stop the caller.

    Args:
        log_message: Text of the log line to include in the e-mail body.

    Returns:
        None.
    """
    # NOTE: a password was previously committed in this cell; it should be
    # revoked, since it remains visible in the notebook history.
    sender = os.environ.get("ALERT_SMTP_USER")
    password = os.environ.get("ALERT_SMTP_PASSWORD")
    if not sender or not password:
        print("Alerte non envoyée : variables ALERT_SMTP_USER / ALERT_SMTP_PASSWORD manquantes")
        return
    recipient = os.environ.get("ALERT_SMTP_RECIPIENT", sender)

    try:
        subject = "Alerte Critique dans les Logs"
        body = f"Message d'erreur critique détecté : {log_message}"

        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = recipient
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        # The context manager closes the connection even when starttls/login/
        # sendmail raises; the timeout keeps a hung server from blocking forever.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            server.starttls()
            server.login(sender, password)
            server.sendmail(sender, [recipient], msg.as_string())
        print(f"Alerte envoyée par email pour le message : {log_message}")
    except Exception as e:
        print(f"Erreur lors de l'envoi de l'email : {e}")

# Send e-mail alerts for error lines as each micro-batch arrives.
def _alert_on_batch(batch_df, _batch_id):
    """foreachBatch handler: send one e-mail alert per error line in the batch.

    Args:
        batch_df: DataFrame holding the error lines of the current micro-batch.
        _batch_id: Micro-batch identifier supplied by Spark (unused).
    """
    # Only the raw line is used in the alert, so collect just that column
    # rather than pulling every parsed field to the driver.
    for row in batch_df.select("text").collect():
        send_email_alert(row['text'])


# NOTE(review): no checkpointLocation is set on this query, so a restarted
# notebook presumably re-reads the topic from its starting offsets and re-sends
# alerts for lines already reported - confirm whether that is intended.
filtered_errors_query = (
    filtered_errors_df.writeStream
    .foreachBatch(_alert_on_batch)
    .outputMode("update")
    .start()
)

In [12]:
# Elasticsearch sink settings for the regular logs.
es_options = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.index.auto.create": "true",
    # The target index name embeds the value of each row's Date field.
    "es.resource": "datacenter2-{Date}",
}

Alerte envoyée par email pour le message : java.io.EOFException: End of File Exception between local host is: "mesos-slave-32/127.0.1.1"; destination host is: "mesos-master-1":9000; : java.io.EOFException; For more details see:  http://wiki.apache.org/hadoop/EOFException
Alerte envoyée par email pour le message : java.io.EOFException: End of File Exception between local host is: "mesos-slave-32/127.0.1.1"; destination host is: "mesos-master-1":9000; : java.io.EOFException; For more details see:  http://wiki.apache.org/hadoop/EOFException


In [13]:
# Elasticsearch sink settings for the error lines.
es_error_options = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.index.auto.create": "true",
    # NOTE(review): the index name is spelled "erros_datacenter2" - confirm
    # whether "errors" was intended before renaming an index that may exist.
    "es.resource": "erros_datacenter2",
}

Alerte envoyée par email pour le message : java.io.EOFException: End of File Exception between local host is: "mesos-slave-32/127.0.1.1"; destination host is: "mesos-master-1":9000; : java.io.EOFException; For more details see:  http://wiki.apache.org/hadoop/EOFException


In [14]:
# Stream the regular logs into Elasticsearch using the settings in es_options.
# The checkpoint directory holds this query's progress and should not be
# used by any other streaming query.
query = (
    filtered_logs_df.writeStream
    .format("org.elasticsearch.spark.sql")
    .options(**es_options)
    .outputMode("append")
    .option("checkpointLocation", "C:\\checkpoint")
    .start()
)

Alerte envoyée par email pour le message : java.lang.NullPointerException


In [15]:
# Stream the error lines into Elasticsearch using the settings in es_error_options.
# Each streaming query must own its checkpoint directory: sharing one between
# queries mixes their offset/commit logs. This query therefore uses its own
# directory instead of reusing "C:\\checkpoint" from the regular-logs query.
query_error = (
    filtered_errors_df.writeStream
    .format("org.elasticsearch.spark.sql")
    .options(**es_error_options)
    .outputMode("append")
    .option("checkpointLocation", "C:\\checkpoint_errors")
    .start()
)

Alerte envoyée par email pour le message : java.net.SocketTimeoutException: Call From mesos-slave-32/127.0.1.1 to mesos-master-1:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.10.34.42:48476 remote=mesos-master-1/10.10.34.11:9000]; For more details see:  http://wiki.apache.org/hadoop/SocketTimeout


In [16]:
# Block this cell until any one of the active streaming queries terminates.
spark.streams.awaitAnyTermination()

Alerte envoyée par email pour le message : java.net.SocketTimeoutException: Call From mesos-slave-32/127.0.1.1 to mesos-master-1:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.10.34.42:56417 remote=mesos-master-1/10.10.34.11:9000]; For more details see:  http://wiki.apache.org/hadoop/SocketTimeout
Alerte envoyée par email pour le message : java.net.SocketTimeoutException: Call From mesos-slave-32/127.0.1.1 to mesos-master-1:9000 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.10.34.42:56418 remote=mesos-master-1/10.10.34.11:9000]; For more details see:  http://wiki.apache.org/hadoop/SocketTimeout
Alerte envoyée par email pour le message : java.net.SocketTimeoutException: Call From mesos-slav