In [None]:
import os
os.environ["SPARK_VERSION"] = "spark-3.5.0"
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget  http://apache.osuosl.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!echo $SPARK_VERSION-bin-hadoop3.tgz
!rm $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark/"
os.environ["DRIVE_DATA"]= "/content/gdrive/Shareddrives/BDF/Graded Exercise/clusterdata-2011-2"
DRIVE_DATA = os.environ["DRIVE_DATA"]

!rm /content/spark
!ln -s /content/$SPARK_VERSION-bin-hadoop3 /content/spark
!export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
!echo $SPARK_HOME
!env |grep  "DRIVE_DATA"

In [None]:
# Mount Google Drive at /content/gdrive; the DRIVE_DATA path configured in
# this notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# List the contents of the dataset directory (DRIVE_DATA).
!ls "$DRIVE_DATA"

MD5SUM
README
SHA1SUM
SHA256SUM
dominant_tasks
job_events
machine_attributes
machine_events
schema.csv
schema.xlsx
task_constraints
task_events
task_usage


## Imports


In [None]:
import os
from pyspark.sql.functions import desc, avg, isnull, max
from tqdm import tqdm
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, BooleanType
from pyspark.sql import SparkSession
from pyspark.sql.functions import corr, isnull
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, expr

## Final working code:

### Initialisation de la session Spark

In [None]:
# Create (or reuse) the notebook's Spark session, running in local mode.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Top CPU Dominant Tasks")
    .getOrCreate()
)

### Jobs dominants en CPU

In [None]:
# Top 10 CPU-dominant tasks across the whole task_usage table.
#
# All shards are read as ONE DataFrame and averaged globally. Taking a top 10
# per file and then a top 10 of those compares per-file averages: the same
# (job ID, task index) can then appear several times, and a task whose load is
# spread over many files can be missed entirely.

# Directory holding the task_usage shards.
data_path = os.path.join(os.environ["DRIVE_DATA"], 'task_usage')

# Candidate shard names and their full paths.
file_list = [f"part-{str(i).zfill(5)}-of-00500.csv.gz" for i in range(500)]
file_paths = [os.path.join(data_path, filename) for filename in file_list]

# Only keep shards that are actually present, so a partial copy of the
# dataset can still be processed.
existing_paths = [path for path in file_paths if os.path.exists(path)]
if not existing_paths:
    raise FileNotFoundError(f"No task_usage file found under {data_path!r}")

# Column layout of the headerless task_usage CSV files. The schema is applied
# by position, so every column must be listed in file order.
# Timestamps are kept as strings: their values exceed the 32-bit range of
# IntegerType, which would turn them into NULLs.
schema = StructType([
    StructField("start time", StringType(), True),
    StructField("end time", StringType(), True),
    StructField("job ID", StringType(), True),
    StructField("task index", IntegerType(), True),
    StructField("machine ID", StringType(), True),
    StructField("CPU rate", FloatType(), True),
    StructField("canonical memory usage", FloatType(), True),
    StructField("assigned memory usage", FloatType(), True),
    StructField("unmapped page cache", FloatType(), True),
    StructField("total page cache", FloatType(), True),
    StructField("maximum memory usage", FloatType(), True),
    StructField("disk I/O time", FloatType(), True),
    StructField("local disk space usage", FloatType(), True),
    StructField("maximum CPU rate", FloatType(), True),
    StructField("maximum disk IO time", FloatType(), True),
    StructField("cycles per instruction", FloatType(), True),
    StructField("memory accesses per instruction", FloatType(), True),
    StructField("sample portion", FloatType(), True),
    StructField("aggregation type", IntegerType(), True),
    StructField("sampled CPU usage", FloatType(), True)
])

# spark.read.csv accepts a list of paths and returns a single DataFrame.
task_usage_df = spark.read.csv(existing_paths, schema=schema, header=False)

# Drop rows without a job ID, average the CPU rate per task over the whole
# dataset, then keep the ten highest averages.
final_df = (
    task_usage_df
    .filter(~isnull("job ID"))
    .groupBy("job ID", "task index")
    .agg(avg("CPU rate").alias("average_cpu_rate"))
    .orderBy(desc("average_cpu_rate"))
    .limit(10)
)



In [None]:
# Display the CPU-dominant tasks held in final_df.
final_df.show()

In [None]:
# Save final_df as CSV. Spark writes a *directory* named 'top_cpu_tasks';
# coalesce(1) makes it contain a single part file.
# mode("overwrite") lets the cell be re-run: the default save mode raises an
# error when the output directory already exists.
final_df.coalesce(1).write.mode("overwrite").csv('top_cpu_tasks', header=True)

### Jobs dominants en RAM

In [None]:
# Top 10 memory-dominant tasks across the whole task_usage table.

# Directory holding the task_usage shards, candidate names and full paths.
data_path = os.path.join(os.environ["DRIVE_DATA"], 'task_usage')
file_list = [f"part-{str(i).zfill(5)}-of-00500.csv.gz" for i in range(500)]
file_paths = [os.path.join(data_path, filename) for filename in file_list]

# Only keep shards that are actually present.
existing_paths = [path for path in file_paths if os.path.exists(path)]
if not existing_paths:
    raise FileNotFoundError(f"No task_usage file found under {data_path!r}")

# The files have no header, so the schema is applied BY POSITION. Declaring
# only the six columns of interest maps "job ID" onto the first CSV column
# (the start timestamp), "task index" onto the end timestamp, and so on, which
# silently produces meaningless statistics. The full column list is therefore
# declared in file order and the needed columns are picked by name afterwards.
schema = StructType([
    StructField("start time", StringType(), True),
    StructField("end time", StringType(), True),
    StructField("job ID", StringType(), True),
    StructField("task index", StringType(), True),
    StructField("machine ID", StringType(), True),
    StructField("CPU rate", FloatType(), True),
    StructField("canonical memory usage", FloatType(), True),
    StructField("assigned memory usage", FloatType(), True),
    StructField("unmapped page cache", FloatType(), True),
    StructField("total page cache", FloatType(), True),
    StructField("maximum memory usage", FloatType(), True),
    StructField("disk I/O time", FloatType(), True),
    StructField("local disk space usage", FloatType(), True),
    StructField("maximum CPU rate", FloatType(), True),
    StructField("maximum disk IO time", FloatType(), True),
    StructField("cycles per instruction", FloatType(), True),
    StructField("memory accesses per instruction", FloatType(), True),
    StructField("sample portion", FloatType(), True),
    StructField("aggregation type", StringType(), True),
    StructField("sampled CPU usage", FloatType(), True)
])

# Read every shard as a single DataFrame so statistics are computed per task
# over the whole dataset rather than per file.
task_usage_df = spark.read.csv(existing_paths, schema=schema, header=False)

# Per-task memory statistics. `max` here is pyspark.sql.functions.max
# (imported at the top of the notebook), not the Python builtin.
memory_usage_stats = (
    task_usage_df
    .filter(~isnull("job ID"))
    .groupBy("job ID", "task index")
    .agg(
        avg("canonical memory usage").alias("average canonical memory usage"),
        max("canonical memory usage").alias("maximum canonical memory usage"),
        avg("assigned memory usage").alias("average assigned memory usage"),
        max("assigned memory usage").alias("maximum assigned memory usage"),
        avg("total page cache").alias("average total page cache"),
        max("total page cache").alias("maximum total page cache"),
        avg("maximum memory usage").alias("average maximum memory usage"),
        max("maximum memory usage").alias("max maximum memory usage")
    )
)

# Keep the ten tasks with the highest peak memory usage.
final_dominant_memory_tasks = memory_usage_stats.orderBy(desc("max maximum memory usage")).limit(10)




Processing files: 100%|██████████| 100/100 [00:04<00:00, 24.43it/s]


In [None]:
# Display the memory-dominant tasks.
final_dominant_memory_tasks.show()

+------------+------------+------------------------------+------------------------------+-----------------------------+-----------------------------+------------------------+------------------------+----------------------------+------------------------+
|      job ID|  task index|average canonical memory usage|maximum canonical memory usage|average assigned memory usage|maximum assigned memory usage|average total page cache|maximum total page cache|average maximum memory usage|max maximum memory usage|
+------------+------------+------------------------------+------------------------------+-----------------------------+-----------------------------+------------------------+------------------------+----------------------------+------------------------+
| 93810000000| 93900000000|                  5.84456448E9|                   6.2588063E9|                       1064.5|                       8008.0|       8.9123321959375E8|             4.4860575E9|           4.570299230454566|          

### Classification des jobs dominants par classe de priorité

In [None]:
# Reload the CPU-dominant tasks saved earlier in this notebook.
# Spark's CSV writer produced a directory named 'top_cpu_tasks' (there is no
# 'top_cpu_tasks.csv' file), so that directory is what must be read back;
# Spark loads the part file(s) it contains.
top_cpu_tasks_df = spark.read.csv('top_cpu_tasks', header=True, inferSchema=True)
top_cpu_tasks_df.show()

+----------+----------+------------------+
|    job ID|task index|  average_cpu_rate|
+----------+----------+------------------+
|6210686295|       115| 9.135018315864727|
|5115856683|        12|  4.14795548300026|
|6000622471|        16|1.2118503998654584|
|6210686295|        72|1.2022168225958012|
|6000622471|         2|1.1960032834322192|
|6210686295|       110| 1.090427543153055|
|6000622646|        30|1.0567158031687875|
|6000619150|         5|1.0038247148999397|
|6000619150|        58|0.8901584565639495|
|6000622646|        44|0.8730517601156059|
|4811385404|        55| 2.314707846955999|
|6295212302|        67|1.6543118150597862|
|6295201188|        78|1.3711346668501696|
|6184860354|       233|0.6114499997347593|
|6238840043|       167|0.6112993460569885|
|6184860354|       132|0.5874687507748604|
|6184860354|       233|0.5664588248028475|
|6184860354|        98|0.5617941131486612|
|6184860354|       273| 0.557211763718549|
|6184860354|       132|0.5571000017225742|
+----------

In [None]:
# Distinct (job ID, task index) keys of the CPU-dominant tasks.
# top_cpu_tasks_df is loaded with inferSchema=True, so these columns come back
# as numeric types, while task_events_schema declares them as StringType.
# Casting here makes the join keys the same type on both sides instead of
# relying on Spark's implicit string/number coercion.
task_indices = (
    top_cpu_tasks_df
    .select(
        col("job ID").cast("string").alias("job ID"),
        col("task index").cast("string").alias("task index"),
    )
    .distinct()
)


In [None]:
# task_indices.show(10)

In [None]:
# Collect every task_events row that belongs to one of the CPU-dominant tasks.

# Candidate task_events shard names.
task_event_files = [f'part-{i:05d}-of-00500.csv.gz' for i in range(500)]

# Leading columns of the headerless task_events CSV files, in file order.
# NOTE(review): only the columns up to "priority" are declared; the files
# presumably contain further trailing columns that this analysis does not
# need. Confirm against the dataset's schema.csv.
task_events_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("missing info", StringType(), True),
    StructField("job ID", StringType(), True),      # join key, must match task_indices
    StructField("task index", StringType(), True),  # join key, must match task_indices
    StructField("machine ID", StringType(), True),
    StructField("event type", StringType(), True),
    StructField("user", StringType(), True),
    StructField("scheduling class", StringType(), True),
    StructField("priority", IntegerType(), True),   # the value this analysis is after
])

# Only keep shards that are actually present.
task_events_dir = os.path.join(os.environ["DRIVE_DATA"], 'task_events')
task_event_paths = [
    path
    for path in (os.path.join(task_events_dir, file_name) for file_name in task_event_files)
    if os.path.exists(path)
]
if not task_event_paths:
    # Fail here with a clear message rather than later with an AttributeError
    # on None.
    raise FileNotFoundError(f"No task_events file found under {task_events_dir!r}")

# Read all shards as one DataFrame and join once. This yields the same rows as
# joining each file separately and chaining hundreds of unions, without
# building a very deep query plan.
accumulated_priority_df = (
    spark.read.csv(task_event_paths, schema=task_events_schema, header=False)
    .join(task_indices, ["job ID", "task index"], "inner")
)

# accumulated_priority_df now holds the task events (including "priority") of
# the tasks listed in task_indices.


In [None]:
# Ten task events with the highest "priority" value among the matched tasks.
top_priority_jobs = accumulated_priority_df.sort(desc("priority")).limit(10)
top_priority_jobs.show()


+----------+----------+-------------+------------+----------+----------+--------------------+----------------+--------+
|    job ID|task index|    timestamp|missing info|machine ID|event type|                user|scheduling class|priority|
+----------+----------+-------------+------------+----------+----------+--------------------+----------------+--------+
|6272076905|         0| 163027215040|        NULL|      NULL|         0|Gx2a4JlY7sTN3Jiqp...|               3|      10|
|6272076905|         0|1347760079184|        NULL|4820073668|         3|Gx2a4JlY7sTN3Jiqp...|               3|      10|
|6272076905|         0| 163059602676|        NULL|4820073668|         1|Gx2a4JlY7sTN3Jiqp...|               3|      10|
|6272076905|         0|1347760079195|        NULL|      NULL|         0|Gx2a4JlY7sTN3Jiqp...|               3|      10|
|6272076905|         0|1347790444482|        NULL| 257347123|         1|Gx2a4JlY7sTN3Jiqp...|               3|      10|
|6295212302|        67|1976031581791|   

### Étude de la corrélation entre la consommation de CPU et de mémoire vive

In [None]:
# Correlation between CPU rate and memory usage over the task_usage table.

# Column layout of the headerless task_usage CSV files (applied by position).
schema = StructType([
    StructField("start time", StringType(), True),
    StructField("end time", StringType(), True),
    StructField("job ID", StringType(), True),
    StructField("task index", StringType(), True),
    StructField("machine ID", StringType(), True),
    StructField("CPU rate", FloatType(), True),
    StructField("canonical memory usage", FloatType(), True),
    StructField("assigned memory usage", FloatType(), True),
    StructField("unmapped page cache", FloatType(), True),
    StructField("total page cache", FloatType(), True),
    StructField("maximum memory usage", FloatType(), True),
    StructField("disk I/O time", FloatType(), True),
    StructField("local disk space usage", FloatType(), True),
    StructField("maximum CPU rate", FloatType(), True),
    StructField("maximum disk IO time", FloatType(), True),
    StructField("cycles per instruction", FloatType(), True),
    StructField("memory accesses per instruction", FloatType(), True),
    StructField("sample portion", FloatType(), True),
    StructField("aggregation type", StringType(), True),
    StructField("sampled CPU usage", FloatType(), True)
])

# Directory holding the task_usage shards, candidate names and full paths.
data_path = os.path.join(os.environ["DRIVE_DATA"], 'task_usage')
file_list = [f"part-{str(i).zfill(5)}-of-00500.csv.gz" for i in range(500)]
file_paths = [os.path.join(data_path, filename) for filename in file_list]

# Only keep shards that are actually present.
existing_paths = [path for path in file_paths if os.path.exists(path)]
if not existing_paths:
    # Fail here with a clear message rather than later with an AttributeError
    # on None.
    raise FileNotFoundError(f"No task_usage file found under {data_path!r}")

# Read all shards as one DataFrame (same rows as a chained per-file union, but
# with a flat query plan), drop rows without a job ID and keep only the three
# columns the correlation needs.
accumulated_data_for_correlation = (
    spark.read.csv(existing_paths, schema=schema, header=False)
    .filter(~isnull("job ID"))
    .select("CPU rate", "canonical memory usage", "assigned memory usage")
)

# Correlation of CPU rate with each memory metric. This is lazy: nothing is
# computed until an action such as .show() is called on the result.
correlation_cpu_memory = accumulated_data_for_correlation.select(
    corr("CPU rate", "canonical memory usage").alias("correlation_cpu_canonical_memory"),
    corr("CPU rate", "assigned memory usage").alias("correlation_cpu_assigned_memory")
)




Processing files: 100%|██████████| 500/500 [00:18<00:00, 26.62it/s]


In [None]:
# Display the correlation coefficients.
correlation_cpu_memory.show()

+--------------------------------+-------------------------------+
|correlation_cpu_canonical_memory|correlation_cpu_assigned_memory|
+--------------------------------+-------------------------------+
|               0.352491425557883|           0.002450362193947116|
+--------------------------------+-------------------------------+



### Analyse de la durée des jobs et des tâches

In [None]:
# Distribution of record durations in task_usage and IQR-based outliers.

# Leading columns of the headerless task_usage CSV files, in file order
# (the schema is applied by position; trailing columns are not needed here).
schema = StructType([
    StructField("start time", StringType(), True),
    StructField("end time", StringType(), True),
    StructField("job ID", StringType(), True),
    StructField("task index", StringType(), True),
    StructField("machine ID", StringType(), True),
    StructField("CPU rate", FloatType(), True),
    StructField("canonical memory usage", FloatType(), True),
    StructField("assigned memory usage", FloatType(), True),
])

# Only the first 4 shards are analysed in this cell.
data_path = os.path.join(os.environ["DRIVE_DATA"], 'task_usage')
file_list = [f"part-{str(i).zfill(5)}-of-00500.csv.gz" for i in range(4)]
file_paths = [os.path.join(data_path, filename) for filename in file_list]

# Only keep shards that are actually present.
existing_paths = [path for path in file_paths if os.path.exists(path)]
if not existing_paths:
    # Fail here with a clear message rather than later with an AttributeError
    # on None.
    raise FileNotFoundError(f"No task_usage file found under {data_path!r}")

# duration = (end time - start time) / 1e6, i.e. seconds, assuming the
# timestamps are expressed in microseconds (TODO confirm against the dataset
# documentation).
# NOTE(review): each task_usage row looks like one measurement interval, so
# this is presumably the length of that interval rather than the lifetime of
# a task or job. Confirm that this is the intended quantity.
accumulated_durations = (
    spark.read.csv(existing_paths, schema=schema, header=False)
    .withColumn("duration", (col("end time").cast("long") - col("start time").cast("long")) / 1000000)
)

# Summary statistics of the duration column.
accumulated_durations.describe("duration").show()

# Quartiles of the duration; a relative error of 0 requests exact quantiles.
percentiles = accumulated_durations.approxQuantile("duration", [0.25, 0.5, 0.75], 0)
if len(percentiles) < 3:
    # Guard the indexing below in case no usable duration value is returned.
    raise ValueError("No duration values available to compute quartiles")

# Interquartile range and the 1.5 * IQR fences used to flag outliers.
IQR = percentiles[2] - percentiles[0]
lower_bound = percentiles[0] - 1.5 * IQR
upper_bound = percentiles[2] + 1.5 * IQR

# Rows whose duration falls outside the fences.
outliers = accumulated_durations.filter((col("duration") < lower_bound) | (col("duration") > upper_bound))
outliers.show()
