**Due to the large volume of data processing involved, this part of the workflow was implemented using Google Colab, a cloud-based platform for executing Python code interactively. Google Colab provides access to powerful computing resources, including GPUs and TPUs, which are well-suited for handling large datasets and computationally intensive tasks.**

### Library Installation (PyDrive and PySpark)

In [None]:
# Installing PyDrive and importing necessary modules
!pip install -U -q PyDrive
!pip install pyspark


### Importing Libraries (Google Drive Access, Data Processing, Spark and Visualization)

In [None]:
# Interagir avec Google Drive
import os
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Traitement des données
import pandas as pd
from glob import glob

# Traitement distribué des données
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, avg, first, expr, stddev
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, TimestampType, BooleanType

from glob import glob
import os
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialisez un DataFrame Spark vide
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, TimestampType, BooleanType

# Visualisation des données
import matplotlib.pyplot as plt
import seaborn as sns

# Manipulation des données Spark
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Accéder aux fichiers Google Drive
from google.colab import drive

# Imprimer les données
import pprint




### Downloading Files from Google Drive

In [None]:

# Authenticate the Colab user and build a PyDrive client on top of those credentials
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Local directory that receives the downloaded input files.
# exist_ok=True only tolerates "already exists"; a bare `except: pass` would also hide
# genuine failures such as permission errors.
folder = os.path.expanduser('~/my-directory')
os.makedirs(folder, exist_ok=True)

# List the files of the input Drive folder.
# - trashed=false: files moved to the bin are still "in parents" and would otherwise be
#   downloaded and silently mixed into the input data.
# - folders are excluded because GetContentFile cannot download a folder.
file_list = drive.ListFile({
    'q': "'1IBNfVfLCinWTFpSgku8x9--evi1q-o6y' in parents"
         " and trashed=false"
         " and mimeType != 'application/vnd.google-apps.folder'"
}).GetList()

for file in file_list:
    name = os.path.join(folder, file['title'])
    print('Downloading {}'.format(name))
    new_file = drive.CreateFile({'id': file['id']})
    new_file.GetContentFile(name)


In [None]:
!ls -lha /root/my-directory/prepared0107.csv

### Spark Initialization

In [None]:
# Local Spark session shared by every cell below; getOrCreate() returns the existing
# session when the cell is re-run.
# NOTE(review): memory settings only take effect if applied before the JVM starts (first
# run). Executor memory is presumably irrelevant in local mode. Confirm against the runtime's RAM.
spark_settings = {
    "spark.driver.memory": "15g",
    "spark.executor.memory": "15g",
}

spark_builder = SparkSession.builder.appName("example")
for setting_key, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()


### Reading CSV Files and Creating an Input Spark DataFrame



In [None]:
# Local directory the input CSVs were downloaded into
dossier = '/root/my-directory'  # Modify the path accordingly

# Sorted so that the set and order of files are reproducible from run to run
fichiers_csv = sorted(glob(os.path.join(dossier, '*.csv')))
if not fichiers_csv:
    # Fail loudly here instead of with an AttributeError on None further down
    raise FileNotFoundError(f"No CSV files found in {dossier}")

# Explicit input schema (no inference pass needed).
# NOTE(review): with an explicit schema Spark maps CSV columns by position (enforceSchema
# defaults to true). Confirm that every file uses this column order.
schema_input = StructType([
    StructField("TIMESTAMP", TimestampType(), True),
    StructField("BATCH_ID", LongType(), True),
    StructField("INPUT_MAPPING_Y", IntegerType(), True),
    StructField("INPUT_MAPPING_X", IntegerType(), True),
    StructField("ZONE", IntegerType(), True),
    StructField("PHASE", StringType(), True),
    StructField("PARAM_1", DoubleType(), True),
    StructField("PARAM_2", DoubleType(), True),
    StructField("PARAM_3", DoubleType(), True),
    StructField("PARAM_4", DoubleType(), True)
])

# DataFrameReader.csv accepts a list of paths, so all files are read in a single scan.
# This is equivalent to unioning one DataFrame per file, without the long union chain.
dataframe_global = spark.read.csv(fichiers_csv, header=True, schema=schema_input)

# Show the global DataFrame
dataframe_global.show()


In [None]:

# Size of the raw input DataFrame, before any BATCH_ID filtering is applied
nombre_de_lignes_in = dataframe_global.count()
nombre_de_colonnes_in = len(dataframe_global.columns)

# Labels describe the unfiltered input (this frame has not been filtered at this point)
print("Number of input rows:", nombre_de_lignes_in)
print("Number of input columns:", nombre_de_colonnes_in)


### Authentication and Downloading Files from Google Drive (output)

In [None]:
# Authenticate the Colab user and build a PyDrive client on top of those credentials
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Local directory that receives the downloaded output files.
# exist_ok=True only tolerates "already exists"; a bare `except: pass` would also hide
# genuine failures such as permission errors.
directory = os.path.expanduser('~/mon-repertoire_output')
os.makedirs(directory, exist_ok=True)

# List the files of the output Drive folder.
# - trashed=false: files moved to the bin are still "in parents" and would otherwise be downloaded.
# - folders are excluded because GetContentFile cannot download a folder.
file_list = drive.ListFile({
    'q': "'1otDaK3HIoawL_wVb55OBBRNUuAGZ-JUh' in parents"
         " and trashed=false"
         " and mimeType != 'application/vnd.google-apps.folder'"
}).GetList()

for file in file_list:
    name = os.path.join(directory, file['title'])
    print('Downloading {}'.format(name))
    new_file = drive.CreateFile({'id': file['id']})
    new_file.GetContentFile(name)


Downloading /root/mon-repertoire_output/df_results.xlsx
Downloading /root/mon-repertoire_output/data_final_A.xlsx
Downloading /root/mon-repertoire_output/lot_strip_relation_joined_windows_prepared.csv


### Spark Initialization and Reading the Downloaded CSV File (output)

In [None]:
# Path of the prepared output CSV downloaded into the output directory
file_path = '/root/mon-repertoire_output/lot_strip_relation_joined_windows_prepared.csv'

# Read the CSV and let Spark infer the column types (inference costs an extra pass over the file).
# No placeholder initialisation is needed: both names are bound directly here.
dataframe_global_output = spark.read.csv(file_path, header=True, inferSchema=True)
# Keep the inferred schema for inspection
schema = dataframe_global_output.schema

# Show the global DataFrame
dataframe_global_output.show()


In [None]:
# Shape of the output DataFrame as (rows, columns)
nombre_de_lignes_out, nombre_de_colonnes_out = (
    dataframe_global_output.count(),
    len(dataframe_global_output.columns),
)

for label, value in (("Number of output rows:", nombre_de_lignes_out),
                     ("Number of output columns:", nombre_de_colonnes_out)):
    print(label, value)


### Filtering the Input DataFrame Based on BATCH_IDs Present in the Output DataFrame

In [None]:
# Keep only the input rows whose BATCH_ID also appears in the output data.
# A left-semi join runs on the executors. Collecting every distinct BATCH_ID to the driver
# and building a huge IN-list does not scale.
batch_ids_output = dataframe_global_output.select('BATCH_ID').distinct()
dataframe_filtered_input = (
    dataframe_global
    .join(batch_ids_output, on='BATCH_ID', how='left_semi')
    # pin the input's original column order regardless of how the join arranges them
    .select(*dataframe_global.columns)
)

# Show the filtered DataFrame of the input database
dataframe_filtered_input.show()


### Calculating the Mean of Numeric Columns Grouped by "PHASE"

In [None]:
# Numeric columns to summarise
numeric_columns = ['PARAM_1', 'PARAM_2', 'PARAM_3', 'PARAM_4']

# Mean of every numeric column per PHASE. The aliases avg_p1..avg_p4 follow the order of
# numeric_columns. The result is sorted by PHASE because groupBy output order is not deterministic.
(
    dataframe_filtered_input
    .groupBy("PHASE")
    .agg(*[mean(column).alias(f"avg_p{i}") for i, column in enumerate(numeric_columns, start=1)])
    .orderBy("PHASE")
    .show(truncate=False)
)


### Filtering and Displaying Data for Phase A

In [None]:
# Restrict the BATCH_ID-filtered input to the rows of phase "PHASE_A"
is_phase_a = col("PHASE") == "PHASE_A"
data_input_A = dataframe_filtered_input.where(is_phase_a)

# Display the PHASE_A rows
data_input_A.show()


### Correlation Between Numeric Columns

In [None]:
# The four parameters whose pairwise correlation (Correlation.corr default: Pearson) is plotted
selected_columns = ['PARAM_1', 'PARAM_2', 'PARAM_3', 'PARAM_4']
# VectorAssembler rejects null values by default, so incomplete rows are dropped first
selected_data = data_input_A.select(selected_columns).na.drop()

# Pack the parameters into one vector column, as required by Correlation.corr
assembled_data = VectorAssembler(inputCols=selected_columns, outputCol="features").transform(selected_data)

# Correlation.corr returns a one-row DataFrame whose single cell holds the correlation matrix
dense_matrix = Correlation.corr(assembled_data, "features").head()[0]

# Wrap the matrix in a labelled pandas DataFrame so the heatmap shows the column names
corr_df = pd.DataFrame(
    dense_matrix.toArray().tolist(),
    index=selected_columns,
    columns=selected_columns,
)

# Annotated heatmap of the correlation coefficients
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(corr_df, annot=True, cmap="coolwarm", fmt=".2f", linewidths=.5, ax=ax)
ax.set_title("Matrice de corrélation")
plt.show()

In [None]:
# Chronological order (earliest TIMESTAMP first) for display
data_input_A = data_input_A.sort(col("TIMESTAMP").asc())

# Display the time-ordered PHASE_A rows
data_input_A.show()


### Grouping by BATCH_ID, PHASE, INPUT_MAPPING_X, and INPUT_MAPPING_Y with Statistics Calculation for Each Parameter

In [None]:
# For each (BATCH_ID, PHASE, INPUT_MAPPING_X, INPUT_MAPPING_Y) cell, compute the earliest
# timestamp plus, for each parameter, its mean, exact quartiles and sample standard deviation.
# NOTE: this rebinds data_input_A to its aggregated form, so re-running this cell on its own
# fails (PARAM_* no longer exist). Re-run the PHASE_A filter cell first.
param_columns = ['PARAM_1', 'PARAM_2', 'PARAM_3', 'PARAM_4']

# Per parameter, in this column order: avg, q1, q2 (median), q3, stddev
param_stats = []
for param in param_columns:
    suffix = param.lower()  # 'PARAM_1' -> 'param_1'
    param_stats += [
        avg(param).alias(f"avg_{suffix}"),
        expr(f"percentile({param}, 0.25)").alias(f"q1_{suffix}"),
        expr(f"percentile({param}, 0.5)").alias(f"q2_{suffix}"),
        expr(f"percentile({param}, 0.75)").alias(f"q3_{suffix}"),
        stddev(param).alias(f"stddev_{suffix}"),
    ]

data_input_A = data_input_A.groupBy("BATCH_ID", "PHASE", "INPUT_MAPPING_X", "INPUT_MAPPING_Y") \
    .agg(
        # min() yields the earliest timestamp deterministically. first() depends on row order,
        # which the earlier orderBy does not guarantee once groupBy shuffles the data.
        expr("min(TIMESTAMP)").alias("date_input"),
        *param_stats,
    )

# Display the result
data_input_A.show()


In [None]:
# Sort the aggregated rows by their grouping keys, then by date
data_input_A = data_input_A.orderBy("BATCH_ID", "PHASE", "INPUT_MAPPING_X", "INPUT_MAPPING_Y", "date_input")

# data_input_A holds one row per (BATCH_ID, PHASE, X, Y) group, so its row count is not the
# number of batches. Report both quantities explicitly.
number_of_groups = data_input_A.count()
number_of_distinct_batch_ids = data_input_A.select("BATCH_ID").distinct().count()

print("Number of (BATCH_ID, PHASE, INPUT_MAPPING_X, INPUT_MAPPING_Y) groups:", number_of_groups)
print("Number of distinct batch IDs:", number_of_distinct_batch_ids)


Number of distinct batch IDs: 55555


### Adding OUTPUT_MAPPING_X and OUTPUT_MAPPING_Y Columns

In [None]:
# Derive output-grid coordinates from input-grid coordinates:
#   OUTPUT_MAPPING_X = 16 - INPUT_MAPPING_X
#   OUTPUT_MAPPING_Y = 4  - INPUT_MAPPING_Y
# NOTE(review): 16 and 4 look like mirrored grid extents. Confirm their meaning.
data_input_A = (
    data_input_A
    .withColumn("OUTPUT_MAPPING_X", 16 - col("INPUT_MAPPING_X"))
    .withColumn("OUTPUT_MAPPING_Y", 4 - col("INPUT_MAPPING_Y"))
)

# Display the frame with the new coordinate columns
data_input_A.show()


In [None]:
# Order the output rows by batch, then by output-grid position (X, then Y)
output_sort_keys = ["BATCH_ID", "OUTPUT_MAPPING_X", "OUTPUT_MAPPING_Y"]
dataframe_global_output = dataframe_global_output.sort(*output_sort_keys)

# Display the sorted output rows
dataframe_global_output.show()


In [None]:
# Number of output rows recorded for each BATCH_ID
batch_groups = dataframe_global_output.groupBy('BATCH_ID')
nb_by_batch_id = batch_groups.count()
nb_by_batch_id.show()


### Joining Input and Output DataFrames for Displaying the Final DataFrame

In [None]:
# Attach each aggregated input cell to its output row, matching on batch and output-grid position.
# Inner join: cells without a counterpart on the other side are dropped.
join_keys = ['BATCH_ID', 'OUTPUT_MAPPING_X', 'OUTPUT_MAPPING_Y']
data_final_A = data_input_A.join(dataframe_global_output, join_keys, 'inner')

# Display the joined result
data_final_A.show()


In [None]:
# data_final_A holds one row per joined input/output cell, so its row count is not a count
# of batches. Report rows and distinct BATCH_IDs separately.
nombre_de_ligne_A = data_final_A.count()
nombre_de_batch_A = data_final_A.select("BATCH_ID").distinct().count()

print("Number of rows in data_final_A:", nombre_de_ligne_A)
print("Number of distinct batch IDs in data_final_A:", nombre_de_batch_A)


### Grouping by "OUTPUT" with Statistics Calculation for Each Parameter

In [None]:
# For each OUTPUT value, average every per-cell statistic (avg, q1, q2, q3, stddev) of each parameter.
# Generated list: columns are named avg_<stat>_param_<n> and ordered parameter by parameter,
# with the statistics in the order listed in stat_prefixes.
stat_prefixes = ['avg', 'q1', 'q2', 'q3', 'stddev']
output_aggs = [
    avg(f"{stat}_param_{n}").alias(f"avg_{stat}_param_{n}")
    for n in range(1, 5)
    for stat in stat_prefixes
]

# Sorted by OUTPUT because groupBy output order is not deterministic
sum_output_A = data_final_A.groupBy("OUTPUT").agg(*output_aggs).orderBy("OUTPUT")

# Show the result
sum_output_A.show()


### Converting the Final DataFrame to a pandas DataFrame and Saving It as an Excel File for Future Analysis

In [None]:
# Collect the joined Spark result to the driver as a pandas DataFrame.
# Every row is brought into driver memory.
data_final_A_pd = data_final_A.toPandas()

# Preview the first five rows
data_final_A_pd.head(5)


In [None]:
# The bare name `drive` was rebound to the PyDrive client in an earlier cell, so Colab's
# Drive-mount helper is imported here under an explicit alias.
from google.colab import drive as colab_drive

# /content/drive only exists after Google Drive has been mounted in this runtime
colab_drive.mount('/content/drive')

# Specify the path where you want to save the Excel file
excel_file_path = '/content/drive/MyDrive/output/data_final_A.xlsx'
# to_excel does not create missing parent directories
os.makedirs(os.path.dirname(excel_file_path), exist_ok=True)

# Save the DataFrame to Excel
data_final_A_pd.to_excel(excel_file_path, index=False)
