SETUP

In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive

Mounted at /content/drive


## MERGING CSV FILES

In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Load event info
event_info = pd.read_csv('Wind Farm C/comma_event_info.csv')

# Get a list of CSV files (limit to 10 files)
csv_files = [file for file in os.listdir('Wind Farm C/datasets') if file.endswith('.csv') and file.startswith('comma_')][:10]

# Initialize an empty list to store DataFrames
dataframes = []

for file in csv_files:
    file_path = os.path.join('Wind Farm C/datasets', file)  # Full file path
    df = pd.read_csv(file_path)  # Read the CSV file

    # Extract `event_id` from the file name (e.g., 'comma_0' -> 0)
    event_id = int(file.split('_')[1].split('.')[0])
    df['event_id'] = event_id  # Add event ID column

    dataframes.append(df)  # Append the DataFrame to the list

# Concatenate all DataFrames
wind_farm_c = pd.concat(dataframes, ignore_index=True)

# Save combined DataFrame to CSV
#wind_farm_c.to_csv('wind_farm_c.csv', index=False)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Wind Turbine Fault Detection") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

# Load the combined CSV into a Spark DataFrame
path = '/content/drive/MyDrive/wind_farm_c.csv'
df = spark.read.csv(path, header=True, inferSchema=True)

wind_farm_c_spark = df

# Check missing values for each column
wind_farm_c_spark.select(
    [(sum(col(c).isNull().cast("int"))).alias(c) for c in wind_farm_c_spark.columns]
).show()


+----------+--------+---+----------+--------------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+------------+------------+----------

In [None]:
#wind_farm_c_spark.coalesce(1).write.csv("wind_farm_c_spark.csv", header=True, mode="overwrite")

In [None]:
event_info_spark=spark.createDataFrame(event_info)
wind_farm_c_spark_joined = wind_farm_c_spark.join(
    event_info_spark.select("event_id", "event_label", "event_description"),
    on="event_id",
    how="left")

In [None]:
wind_farm_c_spark_joined.show()

+--------+-------------------+--------+---+----------+--------------+------------+------------+------------+------------+------------+------------+------------+------------+------------------+-----------+------------------+------------------+------------+------------+------------+------------+------------+------------+------------+------------+------------------+-----------+-----------+------------------+------------------+-----------+-----------+------------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+----------

In [None]:
# CSV dosyasını tek bir dosya olarak kaydetme
#wind_farm_c_spark_joined.coalesce(1).write.option("header", "true").mode("overwrite").csv("/content/drive/MyDrive/wind_farm_c_joined.csv")

## LOAD JOINED CSV FROM DISK

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Wind Turbine Fault Detection").config("spark.executor.memory", "4g").config("spark.driver.memory", "4g").config("spark.executor.instances", "2").config("spark.executor.cores", "2").getOrCreate()
wind_farm_c_spark_joined = spark.read.csv('/content/drive/MyDrive/wind_farm_c_joined.csv/part-00000-6f226709-d6df-4e2b-9e27-9ad280bce88d-c000.csv', header=True, inferSchema=True)

In [None]:
from pyspark.sql.functions import col, sum, mean, hour, dayofweek

wind_farm_c_spark_joined = wind_farm_c_spark_joined.withColumn("hour", hour("time_stamp"))
wind_farm_c_spark_joined = wind_farm_c_spark_joined.withColumn("day_of_week", dayofweek("time_stamp"))

## RANDOM FOREST CLASSIFIER

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 'train_test' sütununa göre veri setini ayırıyoruz
train_df = wind_farm_c_spark_joined.filter(wind_farm_c_spark_joined['train_test'] == 'train')
test_df = wind_farm_c_spark_joined.filter(wind_farm_c_spark_joined['train_test'] == 'prediction')

In [None]:
print('train:', train_df.count(), 'test:', test_df.count())

train: 519552 test: 28305


In [None]:
# Adım 2: Etiketlerin sayısal verilere dönüştürülmesi
# 'event_label' sütununu sayısal hale getirelim: 1 -> 'normal', 0 -> 'anomaly'
train_df = train_df.withColumn("label", F.when(train_df['event_label'] == 'normal', 1)
                                           .otherwise(0))
test_df = test_df.withColumn("label", F.when(test_df['event_label'] == 'normal', 1)
                                        .otherwise(0))

In [None]:
cols=train_df.columns

In [None]:
# prompt: filter cols if they contain "avg"

avg_cols = [col for col in cols if 'avg' in col]
avg_cols.append("hour")
avg_cols.append("day_of_week")
feature_columns=avg_cols
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [None]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(train_df)
train_predictions = model.transform(train_df)
evaluator = BinaryClassificationEvaluator(labelCol="label")
train_accuracy = evaluator.evaluate(train_predictions)
print(f"Training Accuracy: {train_accuracy}")

Training Accuracy: 0.9780074033881659


### Evaluation

In [None]:
# Confusion Matrix bileşenlerini hesapla
predictions = model.transform(test_df)
true_positives = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
true_negatives = predictions.filter((predictions.label == 0) & (predictions.prediction == 0)).count()
false_positives = predictions.filter((predictions.label == 0) & (predictions.prediction == 1)).count()
false_negatives = predictions.filter((predictions.label == 1) & (predictions.prediction == 0)).count()

print(f"TP: {true_positives}, TN: {true_negatives}, FP: {false_positives}, FN: {false_negatives}")

TP: 4905, TN: 21222, FP: 24, FN: 2154


In [None]:
test_accuracy = evaluator.evaluate(predictions)


In [None]:
test_accuracy

0.9795648741700596

In [None]:
# prompt: print stats for test predictions

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# ... (your existing code) ...

# Evaluate the model using MulticlassClassificationEvaluator
evaluator_multi = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
test_accuracy_multi = evaluator_multi.evaluate(predictions)
print(f"Test Accuracy (Multiclass): {test_accuracy_multi}")

# Calculate precision, recall and F1-score
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
test_precision = evaluator_precision.evaluate(predictions)
print(f"Test Precision (Weighted): {test_precision}")

evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
test_recall = evaluator_recall.evaluate(predictions)
print(f"Test Recall (Weighted): {test_recall}")

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
test_f1 = evaluator_f1.evaluate(predictions)
print(f"Test F1-score: {test_f1}")

Test Accuracy (Multiclass): 0.9230524642289348
Test Precision (Weighted): 0.929620182364162
Test Recall (Weighted): 0.9230524642289348
Test F1-score: 0.9180530529342907


### Save the model
* skip this step

In [None]:
# prompt: i saved the model, now i want to load it from path /content/drive/MyDrive/wind_farm_c_model.h/stages/1_RandomForestClassifier_6a785af8fd60

from pyspark.ml import PipelineModel

# Load the saved model
model = PipelineModel.load("/content/drive/MyDrive/wind_farm_c_model.h")

# Now you can use the loaded model for predictions
# Example:
# predictions = model.transform(your_test_data)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="label")

predictions = model.transform(test_df)
test_accuracy=evaluator.evaluate(predictions)

In [None]:
test_accuracy

0.9795670011839399

### Normal and anomalous events

In [None]:
# Normal ve anormal olaylar için temel istatistiksel özet
normal_events = wind_farm_c_spark_joined.filter(wind_farm_c_spark_joined['event_label'] == 'normal')
anomalous_events = wind_farm_c_spark_joined.filter(wind_farm_c_spark_joined['event_label'] == 'anomaly')

In [None]:
normal_stats = normal_events.describe().show()

+-------+------------------+------------------+------------------+----------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+------------+------------+------------+------------+-----------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--

In [None]:
anomalous_stats = anomalous_events.describe().show()

+-------+------------------+------------------+------------------+----------+------------------+------------------+------------------+------------------+-----------------+--------------------+------------------+-------------------+-------------------+-------------------+-------------------+-----------------+--------------------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+------------------+-------------------+--------------------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+-

## LSTM

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import when

# Label sütununu oluştur
wind_farm_c_spark_joined = wind_farm_c_spark_joined.withColumn(
    "label", when(wind_farm_c_spark_joined["event_label"] == "normal", 1).otherwise(0)
)

# PySpark DataFrame'i Pandas'a dönüştür
df = wind_farm_c_spark_joined.select(
    "time_stamp", 'sensor_217_avg',
'sensor_226_avg',
'sensor_218_avg',
'sensor_230_avg',
'sensor_45_avg',
'sensor_112_avg',
'sensor_111_avg',
'sensor_227_avg',
'sensor_74_avg',
'sensor_90_avg',
'sensor_93_avg',
'sensor_213_avg',
'sensor_14_avg',
'sensor_91_avg',
'sensor_16_avg', "day_of_week", "label"
).orderBy("time_stamp").toPandas()

# Zaman sütununu datetime formatına çevir
df['time_stamp'] = pd.to_datetime(df['time_stamp'])
df = df.sort_values(by='time_stamp')

# Sensör ve zaman temelli özellikleri hazırlama
features = df[['sensor_217_avg',
'sensor_226_avg',
'sensor_218_avg',
'sensor_230_avg',
'sensor_45_avg',
'sensor_112_avg',
'sensor_111_avg',
'sensor_227_avg',
'sensor_74_avg',
'sensor_90_avg',
'sensor_93_avg',
'sensor_213_avg',
'sensor_14_avg',
'sensor_91_avg',
'sensor_16_avg', "day_of_week"]].values
labels = df["label"].values

# Sliding window oluşturma
def create_sequences(features, labels, window_size=10):
    X, y = [], []
    for i in range(len(features) - window_size):
        X.append(features[i:i+window_size])
        y.append(labels[i+window_size])
    return np.array(X), np.array(y)

# Sliding window ile veri seti oluştur
X, y = create_sequences(features, labels, window_size=10)

# Eğitim ve test setine ayır
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print("Eğitim verisi boyutu:", X_train.shape)
print("Test verisi boyutu:", X_test.shape)


Eğitim verisi boyutu: (438277, 10, 16)
Test verisi boyutu: (109570, 10, 16)


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

# LSTM Modeli
model = Sequential([
    LSTM(64, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True),
    Dropout(0.2),
    LSTM(32, return_sequences=False),
    Dropout(0.2),
    Dense(1, activation='sigmoid')  # Binary classification için sigmoid aktivasyon
])

# Modeli derle
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Modeli eğit
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=4,
    batch_size=64
)


  super().__init__(**kwargs)


Epoch 1/4
[1m6849/6849[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 7ms/step - accuracy: 0.8846 - loss: 0.2392 - val_accuracy: 0.9720 - val_loss: 0.0768
Epoch 2/4
[1m6849/6849[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 7ms/step - accuracy: 0.9646 - loss: 0.0893 - val_accuracy: 0.9780 - val_loss: 0.0524
Epoch 3/4
[1m6849/6849[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 7ms/step - accuracy: 0.9734 - loss: 0.0629 - val_accuracy: 0.9799 - val_loss: 0.0460
Epoch 4/4
[1m6849/6849[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 7ms/step - accuracy: 0.9762 - loss: 0.0551 - val_accuracy: 0.9797 - val_loss: 0.0451


### Evaluation

In [None]:
# prompt: find test accuracy of model (lstm model above)

# Evaluate the LSTM model
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")

[1m3425/3425[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 3ms/step - accuracy: 0.9796 - loss: 0.0449
Test Loss: 0.04505384340882301
Test Accuracy: 0.9797298312187195


In [None]:
# prompt: show matches of labels and predictions counts for lstm model above

from sklearn.metrics import confusion_matrix
import numpy as np

# Predict on the test data
y_pred = model.predict(X_test)
y_pred = (y_pred > 0.5).astype(int) # Convert probabilities to binary predictions

# Compute the confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Print the confusion matrix
print("Confusion Matrix:")
print(cm)

# Extract TP, TN, FP, FN
TP = cm[1, 1]
TN = cm[0, 0]
FP = cm[0, 1]
FN = cm[1, 0]

print(f"\nTrue Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")

[1m3425/3425[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2ms/step
Confusion Matrix:
[[76156   954]
 [ 1267 31193]]

True Positives (TP): 31193
True Negatives (TN): 76156
False Positives (FP): 954
False Negatives (FN): 1267


### Save the model

In [None]:
# prompt: save the lstm model to path

model.save("/content/drive/MyDrive/wind_farm_c_lstm_model.h5")



## STREAMING
### STREAMING TRIAL WITH TEST DATA

In [None]:
import numpy as np
import time
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Suppress TensorFlow INFO and WARNING logs

import tensorflow as tf
from sklearn.metrics import accuracy_score

# Load TensorFlow model
#model = tf.keras.models.load_model("lstm_trained_model.h5")

# Use X_test for streaming (loaded from your previous code)
print("X_test shape:", X_test.shape)

# Define streaming simulation from X_test
def stream_from_test_data(X_test, batch_size=10, interval=1):
    """Simulate streaming using X_test data."""
    for i in range(0, len(X_test), batch_size):
        batch = X_test[i:i + batch_size]
        yield batch, i  # Return batch and its starting index
        time.sleep(interval)

# Stream data from X_test
data_stream = stream_from_test_data(X_test, batch_size=10, interval=1)

# Function to process each streamed batch
def process_stream(data_stream):
    total_predictions = 0
    correct_predictions = 0

    for batch, batch_start_index in data_stream:
        # Predict using TensorFlow model
        predictions = model.predict(batch)

        # Convert predictions to binary labels
        predicted_labels = (np.array(predictions) > 0.5).astype(int)

        # Compare predictions with corresponding ground truth
        ground_truth = y_test[batch_start_index:batch_start_index + len(predicted_labels)]
        batch_accuracy = accuracy_score(ground_truth, predicted_labels)

        # Update running totals for global accuracy
        total_predictions += len(ground_truth)
        correct_predictions += (predicted_labels.flatten() == ground_truth).sum()

        # Output predictions and accuracy
        print(f"Batch {batch_start_index // 10 + 1}: Accuracy: {batch_accuracy:.2f}")
        for i, pred in enumerate(predictions):
            print(f"  Sequence {i + 1}: Prediction: {pred[0]} | Actual: {ground_truth[i]}")

    # Final global accuracy
    global_accuracy = correct_predictions / total_predictions
    print(f"Global Streaming Accuracy: {global_accuracy:.2f}")

# Start processing stream
try:
    process_stream(data_stream)
except KeyboardInterrupt:
    print("Streaming stopped.")


X_test shape: (109570, 10, 16)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
Batch 1: Accuracy: 0.90
  Sequence 1: Prediction: 5.9627785958582535e-05 | Actual: 0
  Sequence 2: Prediction: 3.3821283977886196e-06 | Actual: 0
  Sequence 3: Prediction: 0.5234358906745911 | Actual: 0
  Sequence 4: Prediction: 0.001333013060502708 | Actual: 0
  Sequence 5: Prediction: 8.828578756947536e-07 | Actual: 0
  Sequence 6: Prediction: 1.7378281427227193e-06 | Actual: 0
  Sequence 7: Prediction: 5.397406539486838e-07 | Actual: 0
  Sequence 8: Prediction: 3.5140695331392635e-07 | Actual: 0
  Sequence 9: Prediction: 1.5509087916143471e-06 | Actual: 0
  Sequence 10: Prediction: 0.9945386052131653 | Actual: 1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
Batch 2: Accuracy: 1.00
  Sequence 1: Prediction: 0.9999618530273438 | Actual: 1
  Sequence 2: Prediction: 1.405944090038247e-06 | Actual: 0
  Sequence 3: Prediction: 0.0004950721049681306 | Actual: 0
 

### LSTM AND RANDOM FOREST COMPARISON

In [None]:
# prompt: i had loaded two different models. lstm model and rf model. load them, test them, compare them. but go through my previous code

from pyspark.ml import PipelineModel

# Load the saved Random Forest model
rf_model = PipelineModel.load("/content/drive/MyDrive/wind_farm_c_model.h")

# Load the saved LSTM model
lstm_model = tf.keras.models.load_model("/content/drive/MyDrive/wind_farm_c_lstm_model.h5")

# ... (Your existing code for data preparation and test data (X_test, y_test)) ...


# Make predictions using the Random Forest model
rf_predictions = rf_model.transform(test_df)

# Evaluate Random Forest model
rf_evaluator = BinaryClassificationEvaluator(labelCol="label")
rf_accuracy = rf_evaluator.evaluate(rf_predictions)
print(f"Random Forest Test Accuracy: {rf_accuracy}")


# Make predictions using the LSTM model
lstm_predictions = lstm_model.predict(X_test)
lstm_predicted_labels = (lstm_predictions > 0.5).astype(int)

# Evaluate LSTM model
lstm_accuracy = accuracy_score(y_test, lstm_predicted_labels)
print(f"LSTM Test Accuracy: {lstm_accuracy}")

# Compare the models
print("\nModel Comparison:")
print(f"Random Forest Accuracy: {rf_accuracy}")
print(f"LSTM Accuracy: {lstm_accuracy}")

if rf_accuracy > lstm_accuracy:
    print("Random Forest model performs better.")
elif lstm_accuracy > rf_accuracy:
    print("LSTM model performs better.")
else:
    print("Both models have similar performance.")



Random Forest Test Accuracy: 0.9795649441814882
[1m3425/3425[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 2ms/step
LSTM Test Accuracy: 0.9797298530619695

Model Comparison:
Random Forest Accuracy: 0.9795649441814882
LSTM Accuracy: 0.9797298530619695
LSTM model performs better.


In [None]:
# prompt: compare the above me-odels with other metrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator # Importing MulticlassClassificationEvaluator
# ... (rest of your code) ...
# ... (your existing code) ...

# Evaluate the model using MulticlassClassificationEvaluator
evaluator_multi = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
test_accuracy_multi = evaluator_multi.evaluate(predictions)
print(f"Test Accuracy (Multiclass): {test_accuracy_multi}")

# Calculate precision, recall and F1-score
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
test_precision = evaluator_precision.evaluate(predictions)
print(f"Test Precision (Weighted): {test_precision}")

evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
test_recall = evaluator_recall.evaluate(predictions)
print(f"Test Recall (Weighted): {test_recall}")

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
test_f1 = evaluator_f1.evaluate(predictions)
print(f"Test F1-score: {test_f1}")

# ... (rest of your code) ...

# Load the saved Random Forest model
rf_model = PipelineModel.load("/content/drive/MyDrive/wind_farm_c_model.h")

# Load the saved LSTM model
lstm_model = tf.keras.models.load_model("/content/drive/MyDrive/wind_farm_c_lstm_model.h5")

# ... (Your existing code for data preparation and test data (X_test, y_test)) ...

# Make predictions using the Random Forest model
rf_predictions = rf_model.transform(test_df)

# Evaluate Random Forest model using multiple metrics
rf_evaluator_accuracy = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC") #Example metric, change as needed
rf_accuracy = rf_evaluator_accuracy.evaluate(rf_predictions)
print(f"Random Forest Test Accuracy (Area Under ROC): {rf_accuracy}")

rf_evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
rf_precision = rf_evaluator_precision.evaluate(rf_predictions)
print(f"Random Forest Test Precision: {rf_precision}")


rf_evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
rf_recall = rf_evaluator_recall.evaluate(rf_predictions)
print(f"Random Forest Test Recall: {rf_recall}")

rf_evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
rf_f1 = rf_evaluator_f1.evaluate(rf_predictions)
print(f"Random Forest Test F1-score: {rf_f1}")


# Make predictions using the LSTM model
lstm_predictions = lstm_model.predict(X_test)
lstm_predicted_labels = (lstm_predictions > 0.5).astype(int)

# Evaluate LSTM model using multiple metrics
lstm_accuracy = accuracy_score(y_test, lstm_predicted_labels)
print(f"LSTM Test Accuracy: {lstm_accuracy}")

from sklearn.metrics import precision_score, recall_score, f1_score

lstm_precision = precision_score(y_test, lstm_predicted_labels)
print(f"LSTM Test Precision: {lstm_precision}")

lstm_recall = recall_score(y_test, lstm_predicted_labels)
print(f"LSTM Test Recall: {lstm_recall}")

lstm_f1 = f1_score(y_test, lstm_predicted_labels)
print(f"LSTM Test F1-score: {lstm_f1}")

# Compare the models based on multiple metrics (example)
print("\nModel Comparison:")
print(f"Random Forest Accuracy (AUC): {rf_accuracy}, Precision: {rf_precision}, Recall: {rf_recall}, F1: {rf_f1}")
print(f"LSTM Accuracy: {lstm_accuracy}, Precision: {lstm_precision}, Recall: {lstm_recall}, F1: {lstm_f1}")

Test Accuracy (Multiclass): 0.9230524642289348
Test Precision (Weighted): 0.929620182364162
Test Recall (Weighted): 0.9230524642289348
Test F1-score: 0.9180530529342907




Random Forest Test Accuracy (Area Under ROC): 0.9795689881749631
Random Forest Test Precision: 0.929620182364162
Random Forest Test Recall: 0.9230524642289348
Random Forest Test F1-score: 0.9180530529342907
[1m3425/3425[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2ms/step
LSTM Test Accuracy: 0.9797298530619695
LSTM Test Precision: 0.9703238249292313
LSTM Test Recall: 0.9609673444239063
LSTM Test F1-score: 0.9656229201170152

Model Comparison:
Random Forest Accuracy (AUC): 0.9795689881749631, Precision: 0.929620182364162, Recall: 0.9230524642289348, F1: 0.9180530529342907
LSTM Accuracy: 0.9797298530619695, Precision: 0.9703238249292313, Recall: 0.9609673444239063, F1: 0.9656229201170152


### Streaming with additional data

#### Prep the data

In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Load event info
event_info = pd.read_csv('Wind Farm C/comma_event_info.csv')
# Get a list of CSV files (limit to 10 files)
csv_files = ['comma_67.csv','comma_89.csv']

# Initialize an empty list to store DataFrames
dataframes = []

for file in csv_files:
    file_path = os.path.join('Wind Farm C/datasets', file)  # Full file path
    df = pd.read_csv(file_path)  # Read the CSV file

    # Extract `event_id` from the file name (e.g., 'comma_0' -> 0)
    event_id = int(file.split('_')[1].split('.')[0])
    df['event_id'] = event_id  # Add event ID column

    dataframes.append(df)  # Append the DataFrame to the list

# Concatenate all DataFrames
wind_farm_c_s = pd.concat(dataframes, ignore_index=True)

# Save combined DataFrame to CSV
wind_farm_c_s.to_csv('wind_farm_c_s.csv', index=False)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Wind Turbine Fault Detection") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

# Load the combined CSV into a Spark DataFrame
path = '/content/drive/MyDrive/wind_farm_c_s.csv'
streaming_df = spark.read.csv(path, header=True, inferSchema=True)

wind_farm_c_spark_streaming = streaming_df

# Check missing values for each column
wind_farm_c_spark_streaming.select(
    [(sum(col(c).isNull().cast("int"))).alias(c) for c in wind_farm_c_spark_streaming.columns]
).show()

wind_farm_c_spark_streaming.coalesce(1).write.csv("wind_farm_c_spark_streaming.csv", header=True, mode="overwrite")
event_info_spark=spark.createDataFrame(event_info)
wind_farm_c_spark_streaming_joined = wind_farm_c_spark_streaming.join(
    event_info_spark.select("event_id", "event_label", "event_description"),
    on="event_id",
    how="left")
# CSV dosyasını tek bir dosya olarak kaydetme
wind_farm_c_spark_streaming_joined.coalesce(1).write.option("header", "true").mode("overwrite").csv("/content/drive/MyDrive/wind_farm_c_joined_streaming.csv")

+----------+--------+---+----------+--------------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+------------+------------+----------

In [None]:
from pyspark.sql.functions import col, sum, mean, hour, dayofweek

wind_farm_c_spark_streaming_joined = wind_farm_c_spark_streaming_joined.withColumn("hour", hour("time_stamp"))
wind_farm_c_spark_streaming_joined = wind_farm_c_spark_streaming_joined.withColumn("day_of_week", dayofweek("time_stamp"))

+--------+-------------------+--------+---+----------+--------------+------------+------------+------------+------------+------------+------------+------------+------------+------------------+------------------+------------------+------------------+------------+------------+------------+------------+------------+------------+------------+------------+------------------+-----------+-----------+------------------+------------------+-----------+-----------+------------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+---

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import when

# Label sütununu oluştur
wind_farm_c_spark_streaming_joined = wind_farm_c_spark_streaming_joined.withColumn(
    "label", when(wind_farm_c_spark_streaming_joined["event_label"] == "normal", 1).otherwise(0)
)

# PySpark DataFrame'i Pandas'a dönüştür
df = wind_farm_c_spark_streaming_joined.select(
    "time_stamp", 'sensor_217_avg',
'sensor_226_avg',
'sensor_218_avg',
'sensor_230_avg',
'sensor_45_avg',
'sensor_112_avg',
'sensor_111_avg',
'sensor_227_avg',
'sensor_74_avg',
'sensor_90_avg',
'sensor_93_avg',
'sensor_213_avg',
'sensor_14_avg',
'sensor_91_avg',
'sensor_16_avg', "day_of_week", "label"
).orderBy("time_stamp").toPandas()

# Zaman sütununu datetime formatına çevir
df['time_stamp'] = pd.to_datetime(df['time_stamp'])
df = df.sort_values(by='time_stamp')

# Sensör ve zaman temelli özellikleri hazırlama
features = df[['sensor_217_avg',
'sensor_226_avg',
'sensor_218_avg',
'sensor_230_avg',
'sensor_45_avg',
'sensor_112_avg',
'sensor_111_avg',
'sensor_227_avg',
'sensor_74_avg',
'sensor_90_avg',
'sensor_93_avg',
'sensor_213_avg',
'sensor_14_avg',
'sensor_91_avg',
'sensor_16_avg', "day_of_week"]].values
labels = df["label"].values

# Sliding window oluşturma
def create_sequences(features, labels, window_size=10):
    X, y = [], []
    for i in range(len(features) - window_size):
        X.append(features[i:i+window_size])
        y.append(labels[i+window_size])
    return np.array(X), np.array(y)

# Sliding window ile veri seti oluştur
X, y = create_sequences(features, labels, window_size=10)

# Eğitim ve test setine ayır
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.9, random_state=42)

print("Eğitim verisi boyutu:", X_train.shape)
print("Test verisi boyutu:", X_test.shape)


Eğitim verisi boyutu: (11605, 10, 16)
Test verisi boyutu: (104451, 10, 16)


#### Streaming

In [None]:
import numpy as np
import time
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Suppress TensorFlow INFO and WARNING logs

import tensorflow as tf
from sklearn.metrics import accuracy_score

# Load TensorFlow model
#model = tf.keras.models.load_model("lstm_trained_model.h5")

# Use X_test for streaming (loaded from your previous code)
print("X_test shape:", X_test.shape)

# Define streaming simulation from X_test
def stream_from_test_data(X_test, batch_size=10, interval=1):
    """Simulate streaming using X_test data."""
    for i in range(0, len(X_test), batch_size):
        batch = X_test[i:i + batch_size]
        yield batch, i  # Return batch and its starting index
        time.sleep(interval)

# Stream data from X_test
data_stream = stream_from_test_data(X_test, batch_size=10, interval=1)

# Function to process each streamed batch
def process_stream(data_stream):
    total_predictions = 0
    correct_predictions = 0

    for batch, batch_start_index in data_stream:
        # Predict using TensorFlow model
        predictions = model.predict(batch)

        # Convert predictions to binary labels
        predicted_labels = (np.array(predictions) > 0.5).astype(int)

        # Compare predictions with corresponding ground truth
        ground_truth = y_test[batch_start_index:batch_start_index + len(predicted_labels)]
        batch_accuracy = accuracy_score(ground_truth, predicted_labels)

        # Update running totals for global accuracy
        total_predictions += len(ground_truth)
        correct_predictions += (predicted_labels.flatten() == ground_truth).sum()

        # Output predictions and accuracy
        print(f"Batch {batch_start_index // 10 + 1}: Accuracy: {batch_accuracy:.2f}")
        for i, pred in enumerate(predictions):
            print(f"  Sequence {i + 1}: Prediction: {pred[0]} | Actual: {ground_truth[i]}")

    # Final global accuracy
    global_accuracy = correct_predictions / total_predictions
    print(f"Global Streaming Accuracy: {global_accuracy:.2f}")

# Start processing stream
try:
    process_stream(data_stream)
except KeyboardInterrupt:
    print("Streaming stopped.")


X_test shape: (104451, 10, 16)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 235ms/step
Batch 1: Accuracy: 1.00
  Sequence 1: Prediction: 0.9990140199661255 | Actual: 1
  Sequence 2: Prediction: 0.99935382604599 | Actual: 1
  Sequence 3: Prediction: 0.9999356269836426 | Actual: 1
  Sequence 4: Prediction: 0.00047663127770647407 | Actual: 0
  Sequence 5: Prediction: 0.001103456481359899 | Actual: 0
  Sequence 6: Prediction: 0.9989318251609802 | Actual: 1
  Sequence 7: Prediction: 0.0008966624736785889 | Actual: 0
  Sequence 8: Prediction: 0.0006182707147672772 | Actual: 0
  Sequence 9: Prediction: 0.9989946484565735 | Actual: 1
  Sequence 10: Prediction: 0.999980092048645 | Actual: 1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
Batch 2: Accuracy: 1.00
  Sequence 1: Prediction: 0.9991050362586975 | Actual: 1
  Sequence 2: Prediction: 0.998353123664856 | Actual: 1
  Sequence 3: Prediction: 0.9990894794464111 | Actual: 1
  Sequence 4: Prediction: 