# **HYBRID MODEL FOR CHURN PREDICTION WITH EXPLAINABLE AI**

In [None]:
# import libraries
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_color_codes("pastel")
sns.set_style("whitegrid")
%matplotlib inline

from pyspark.sql import SparkSession, Window

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import min as Fmin
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import avg, col, concat, count, desc, asc, explode, lit, split, stddev, udf, isnan, when, rank, from_unixtime

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("Sparkify")
    .getOrCreate()
)

In [None]:
# Read the mini Sparkify event log (JSON) into a Spark DataFrame.
# NOTE(review): the path is absolute ("/content/..."); presumably a Colab upload
# location - confirm before running in another environment.
event_data = "/content/mini_sparkify_event_data.json"
df = spark.read.json(event_data)

# Load and Clean Dataset

In this section, the file `mini_sparkify_event_data.json` is loaded and cleaned, including the handling of invalid or missing values.

The first five rows of the dataset.

In [None]:
df.head(5)

Schema information

* artist: Artist name (ex. Daft Punk)
* auth: User authentication status (ex. Logged)
* firstName: User first name (ex. Colin)
* gender: Gender (ex. F or M)
* itemInSession: Item count in a session (ex. 52)
* lastName: User last name (ex. Freeman)
* length: Length of song (ex. 223.60771)
* level: User plan (ex. paid)
* location: User's location (ex. Bakersfield)
* method: HTTP method (ex. PUT)
* page: Page name (ex. NextSong)
* registration: Registration timestamp (unix timestamp) (ex. 1538173362000)
* sessionId: Session ID (ex. 29)
* song: Song (ex. Harder Better Faster Stronger)
* status: HTTP status (ex. 200)
* ts: Event timestamp(unix timestamp) (ex. 1538352676000)
* userAgent: User's browser agent (ex. Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0)
* userId: User ID (ex. 30)

In [None]:
df.printSchema()

## Statistics

Statistics of the whole dataset

In [None]:
df.describe().show()

Statistics of the `artist` column

In [None]:
df.describe('artist').show()

Statistics of the `sessionId` column

In [None]:
df.describe('sessionId').show()

Statistics of the `userId` column

In [None]:
df.describe('userId').show()

Total rows: 286,500

In [None]:
df.count()

All the `page` events in the dataset:

- About
- Add Friend
- Add to Playlist
- Cancel
- Cancellation Confirmation: **This event will be used as a flag of churn**
- Downgrade
- Error
- Help
- Home
- Login
- Logout
- NextSong
- Register
- Roll Advert
- Save Settings
- Settings
- Submit Downgrade
- Submit Registration
- Submit Upgrade
- Thumbs Down
- Thumbs Up
- Upgrade

`page` kind

In [None]:
df.select("page").dropDuplicates().sort("page").show()

## missing values

In [None]:
def count_missing(df, col):
    """
    Count the rows of `df` whose value in column `col` is missing.

    A value is treated as missing when it is:

    1. Null (any column type)
    2. NaN (float / double columns only)
    3. "" (empty string, string columns only)

    The NaN and empty-string checks are applied only to the column types where
    they are meaningful. Applying them to every column relies on implicit
    string <-> number casts, which can count a literal "NaN" string as missing
    and can raise under strict (ANSI) cast rules.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        The dataset to inspect.
    col : str
        Name of the column to check.

    Returns
    -------
    int
        Number of rows with a missing value in `col`.
    """
    # df.dtypes is a list of (column name, simple type string) pairs.
    column_type = dict(df.dtypes)[col]

    is_missing = df[col].isNull()
    if column_type in ("float", "double"):
        is_missing = is_missing | isnan(df[col])
    elif column_type == "string":
        is_missing = is_missing | (df[col] == "")

    return df.filter(is_missing).count()

Check how many missing values in each column

In [None]:
print("[missing values]\n")
# The loop variable is deliberately NOT called `col`: at module scope that would
# rebind the `col` function imported from pyspark.sql.functions to a plain string
# and break any later cell that calls col(...).
for column_name in df.columns:
    missing_count = count_missing(df, column_name)
    if missing_count > 0:
        print("{}: {}".format(column_name, missing_count))

`userId` and `sessionId`

If the below Ids are null or empty, delete those rows:

* userId
* sessionId

In [None]:
# Keep only rows that carry both ids: drop null userId / sessionId first,
# then discard rows whose userId is the empty string.
df_without_missing_id = (
    df.dropna(how="any", subset=["userId", "sessionId"])
    .filter("userId != ''")
)

In [None]:
# Every .count() launches a Spark job, so compute each total once and reuse it.
total_rows = df.count()
rows_with_ids = df_without_missing_id.count()

print("df:                    {}".format(total_rows))
print("df_without_missing_id: {}".format(rows_with_ids))

if total_rows == rows_with_ids:
    print("No missing values with userId and sessionId")
else:
    print("{} rows have been removed.".format(total_rows - rows_with_ids))

# Exploratory Data Analysis

Detect number columns and category columns.

* num_cols: Number columns (Long or Double)
* cat_cols: Category columns (String)

In [None]:
# Split the schema into numeric and categorical column names.
# typeName() yields a stable lowercase name ("string", "long", "double").
# Comparing str(dataType) is version-dependent: it renders as "LongType" on older
# PySpark but "LongType()" on newer releases, which would leave both lists empty.
num_cols = []
cat_cols = []

for field in df.schema:
    type_name = field.dataType.typeName()
    if type_name == "string":
        cat_cols.append(field.name)
    elif type_name in ("long", "double"):
        num_cols.append(field.name)

In [None]:
num_cols

In [None]:
cat_cols

## Number columns

In [None]:
df_without_missing_id.describe(num_cols).show()

There are three HTTP status codes:

* 307: Temporary Redirect
* 404: Not Found
* 200: OK

In [None]:
df_without_missing_id.select("status").dropDuplicates().show()

### Category columns

auth

In [None]:
df_without_missing_id.select("auth").dropDuplicates().show()

gender

In [None]:
df_without_missing_id.select("gender").dropDuplicates().show()

level

In [None]:
df_without_missing_id.select("level").dropDuplicates().show()

location (only showing top 10)

In [None]:
df_without_missing_id.select("location").dropDuplicates().show(10)

method

In [None]:
df_without_missing_id.select("method").dropDuplicates().show()

page

In [None]:
df_without_missing_id.select("page").dropDuplicates().show()

userAgent (only showing top 10)

In [None]:
df_without_missing_id.select("userAgent").dropDuplicates().show(10)

### Define Churn

Churn will be defined as when `Cancellation Confirmation` events happen, and users with the events are churned users in this analysis.

churn: `Cancellation Confirmation`

In [None]:
df_without_missing_id.filter("page = 'Cancellation Confirmation'").show(10)

In [None]:
def _is_cancellation(page):
    """Return 1 for the `Cancellation Confirmation` page and 0 for any other value."""
    if page == "Cancellation Confirmation":
        return 1
    return 0


# Event-level churn marker: 1 only on the row where the cancellation is confirmed.
flag_churned_event = udf(_is_cancellation, IntegerType())
df_churned = df_without_missing_id.withColumn("churned", flag_churned_event("page"))

churned rate (average over users of their number of `Cancellation Confirmation` events, i.e. the share of users who churned)

In [None]:
# Sum the event-level flag per user, then average those per-user sums.
churn_events_per_user = df_churned.groupby("userId").agg({"churned": "sum"})
average_row = churn_events_per_user.select(avg("sum(churned)")).collect()[0]
churned_rate = average_row["avg(sum(churned))"]
print("churned: {:.2f}%".format(churned_rate * 100))

In [None]:
df_churned.select(["userId", "gender", "level", "page", "status", "ts", "churned"]).show(30)

In [None]:
# Per-user window ordered by event time; the frame runs from the start of the
# partition up to the current row's `ts` value (a range frame, so rows sharing
# the same `ts` fall in the same frame).
windowval = Window.partitionBy("userId").orderBy(asc("ts")).rangeBetween(Window.unboundedPreceding, 0)
# phase: running sum of the `churned` flag over that window, i.e. it stays 0
# until the user's cancellation row and is >= 1 from that row onwards.
df_phase = df_churned.withColumn("phase", Fsum('churned').over(windowval))
# churn: user-level label - the max of `churned` over ALL of the user's rows,
# so every event of a user who ever cancelled carries churn = 1.
df_churn = df_phase.withColumn("churn", Fmax('churned').over(Window.partitionBy("userId")))

In [None]:
df_churn.select(["userId", "gender", "level", "page", "status", "ts", "churned", "phase", "churn"]).show(20)

In [None]:
df_churn.filter(df_churn["churn"] == 1).select(["userId", "gender", "level", "page", "status", "ts", "churned", "phase", "churn"]).show(20)

52 userIds were churned

In [None]:
# Both figures are counted in distinct users. df_churn.count() is the number of
# event rows, so using it as the denominator would compare users against events.
churned_user_count = df_churn.filter(df_churn["churn"] == 1).select("userId").dropDuplicates().count()
total_user_count = df_churn.select("userId").dropDuplicates().count()
print("churned user count: {} (total: {})".format(churned_user_count, total_user_count))
print("churned user rate: {:.2f}%".format(churned_user_count / total_user_count * 100))

### Explore Data

In this section, data exploration will be done comparing churned users with not churned users, inspecting if there are any big differences between the two groups.

The below columns will be examined:

* artist
  * [x] the number of artist
* [x] gender: 0 or 1
* length
  * [x] the total length
* [x] level: 0 or 1
* page
  * [x] the number of `Thumbs Up`
  * [x] the number of `Thumbs Down`
* song
  * [x] the number of song

Define a common function to convert churn value (0 or 1) to `Not Churn` or `Churn`

Both matplotlib and seaborn plot libraries require pandas dataframe, not pyspark dataframe, so I need to convert the pyspark dataframe to pandas one. I do this conversion every time for a small subset of the dataset because if I do this conversion for all the dataset, it takes time and causes an error.

In [None]:
def _churn_label(flag):
    """Map the numeric churn flag to the label shown in plots."""
    if flag == 1:
        return 'Churn'
    return 'Not Churn'


func_churn_label = udf(_churn_label)

In [None]:
# One row per user: userId plus the user's churn label (max of the per-row flag).
df_churn_user = df_churn.groupby("userId").agg(Fmax("churn").alias("churn"))

gender

In [None]:
# Deduplicate before converting to pandas so that the count plot compares users
# rather than events (heavy listeners would otherwise dominate the bars), and so
# that the whole event log is not pulled onto the driver.
pd_gender = (
    df_churn.select(["userId", "gender", "churn"])
    .dropDuplicates()
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_gender.head()

In [None]:
sns.countplot(x="gender", hue="churn", data=pd_gender);

level

In [None]:
# Deduplicate before converting to pandas so that the count plot compares users
# rather than events and only a small frame reaches the driver.
# A user who switched plans appears once per plan they were on.
pd_level = (
    df_churn.select(["userId", "level", "churn"])
    .dropDuplicates()
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_level.head()

In [None]:
sns.countplot(x="level", hue="churn", data=pd_level);

artist

In [None]:
# Per-user number of rows with a non-null artist, joined to the user's churn label.
artist_per_user = (
    df_churn.groupby("userId")
    .agg({"artist": "count"})
    .withColumnRenamed("count(artist)", "artist_count")
)
pd_artist = (
    df_churn_user.join(artist_per_user, ["userId"])
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_artist.head()

In [None]:
sns.boxplot(x="churn", y="artist_count", data=pd_artist);

song

In [None]:
# Per-user number of rows with a non-null song, joined to the user's churn label.
song_per_user = (
    df_churn.groupby("userId")
    .agg({"song": "count"})
    .withColumnRenamed("count(song)", "song_count")
)
pd_song = (
    df_churn_user.join(song_per_user, ["userId"])
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_song.head()

In [None]:
sns.boxplot(x="churn", y="song_count", data=pd_song);

length

In [None]:
# Per-user sum of the `length` column, joined to the user's churn label.
length_per_user = (
    df_churn.groupby("userId")
    .agg({"length": "sum"})
    .withColumnRenamed("sum(length)", "total_length")
)
pd_length = (
    df_churn_user.join(length_per_user, ["userId"])
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_length.head()

In [None]:
sns.boxplot(x="churn", y="total_length", data=pd_length);

page: total visits

In [None]:
# Per-user number of logged events (any page), joined to the user's churn label.
visits_per_user = (
    df_churn.groupby("userId")
    .count()
    .withColumnRenamed("count", "visit_count")
)
pd_visit = (
    df_churn_user.join(visits_per_user, ["userId"])
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_visit.head()

In [None]:
sns.boxplot(x="churn", y="visit_count", data=pd_visit);

page: Thumbs Up / Thumbs Down

up

In [None]:
# Per-user number of `Thumbs Up` events.
up_per_user = (
    df_churn.filter(df_churn["page"] == 'Thumbs Up')
    .groupby("userId")
    .count()
    .withColumnRenamed("count", "up_count")
)
# Left join + fill with 0: users who never pressed Thumbs Up have no row in
# up_per_user, and an inner join would silently remove them from the comparison.
pd_up = (
    df_churn_user.join(up_per_user, ["userId"], "left")
    .fillna(0, subset=["up_count"])
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_up.head()

In [None]:
sns.boxplot(x="churn", y="up_count", data=pd_up);

down

In [None]:
# Per-user number of `Thumbs Down` events.
down_per_user = (
    df_churn.filter(df_churn["page"] == 'Thumbs Down')
    .groupby("userId")
    .count()
    .withColumnRenamed("count", "down_count")
)
# Left join + fill with 0: users who never pressed Thumbs Down have no row in
# down_per_user, and an inner join would silently remove them from the comparison.
pd_down = (
    df_churn_user.join(down_per_user, ["userId"], "left")
    .fillna(0, subset=["down_count"])
    .withColumn("churn", func_churn_label("churn"))
    .toPandas()
)
pd_down.head()

In [None]:
sns.boxplot(x="churn", y="down_count", data=pd_down);

# Feature Engineering

### Feature Engineering Ideas

* artist
  * [x] the number of artist
* [x] gender: 0 or 1
* length
  * [x] the total length
* [x] level: 0 or 1
* page
  * [x] the number of `Thumbs Up`
  * [x] the number of `Thumbs Down`
* song
  * [x] the number of song

In [None]:
df_churn.show(1)

Original dataframe to be merged later

In [None]:
# Base frame for the feature joins: one row per user with the churn label as `target`.
df_original = df_churn.groupby('userId').agg(Fmax("churn").alias("target"))

In [None]:
df_original.show(10)

artist count per userId

In [None]:
# Number of rows with a non-null artist per user.
# NOTE(review): this counts rows, not distinct artists - confirm that is the
# intended meaning of "the number of artist".
artist_rows_per_user = df_churn.groupby("userId").agg({"artist": "count"})
user_artist = artist_rows_per_user.withColumnRenamed("count(artist)", "artist_count")
user_artist.show(5)

gender

In [None]:
def _gender_to_int(value):
    """Encode gender as an integer: 1 for "F", 0 for every other value."""
    if value == "F":
        return 1
    return 0


flag_gender = udf(_gender_to_int, IntegerType())
df_churn_with_gender = df_churn.withColumn("gender", flag_gender("gender"))
df_churn_with_gender.show(3)

In [None]:
# Collapse the encoded gender to one value per user (max over the user's rows).
user_gender = df_churn_with_gender.groupby('userId').agg(Fmax("gender").alias("gender"))
user_gender.show(5)

length

In [None]:
# Sum of the `length` column per user.
user_length = df_churn.groupby('userId').agg(Fsum("length").alias("length"))
user_length.show(5)

Page

* Thumbs Up
* Thumbs Down

In [None]:
# Number of `Thumbs Up` events per user (users without any such event get no row).
thumbs_up_events = df_churn.filter(df_churn["page"] == 'Thumbs Up')
user_thumbs_up = (
    thumbs_up_events.groupby('userId')
    .count()
    .withColumnRenamed("count", "thumb_up")
)
user_thumbs_up.show(5)

In [None]:
# Number of `Thumbs Down` events per user (users without any such event get no row).
thumbs_down_events = df_churn.filter(df_churn["page"] == 'Thumbs Down')
user_thumbs_down = (
    thumbs_down_events.groupby('userId')
    .count()
    .withColumnRenamed("count", "thumb_down")
)
user_thumbs_down.show(5)

level

In [None]:
def _level_to_int(value):
    """Encode the plan as an integer: 1 for "paid", 0 for every other value."""
    if value == "paid":
        return 1
    return 0


flag_level = udf(_level_to_int, IntegerType())
df_churn_with_level = df_churn.withColumn("level", flag_level("level"))
df_churn_with_level.show(1)

In [None]:
# One value per user: 1 if any of the user's events was on the paid plan, else 0.
user_level = df_churn_with_level.groupby('userId').agg(Fmax("level").alias("level"))
user_level.show(5)

song count per userId

In [None]:
# Number of rows with a non-null song per user.
song_rows_per_user = df_churn.groupby("userId").agg({"song": "count"})
user_song = song_rows_per_user.withColumnRenamed("count(song)", "song_count")
user_song.show(5)

Join all the features

In [None]:
# Assemble the per-user feature table.
# The thumbs tables only contain users with at least one such event, so they are
# LEFT-joined; an inner join would silently drop every user (and label) without
# a Thumbs Up or a Thumbs Down. Missing counts are then filled with 0, as is
# `length`, whose per-user sum is null when the user has no non-null lengths.
merged_df = (
    df_original
    .join(user_artist, ['userId'])
    .join(user_gender, ['userId'])
    .join(user_length, ['userId'])
    .join(user_level, ['userId'])
    .join(user_thumbs_up, ['userId'], 'left')
    .join(user_thumbs_down, ['userId'], 'left')
    .join(user_song, ['userId'])
    .fillna(0, subset=['length', 'thumb_up', 'thumb_down'])
)

In [None]:
merged_df.show(20)

In [None]:
# Display the schema of merged_df, which includes column names and data types
merged_df.printSchema()


In [None]:
from pyspark.sql.types import IntegerType

# Convert the userId column to a 32-bit integer column
merged_df = merged_df.withColumn("userId", merged_df["userId"].cast("int"))

# Print the schema again so the new column type is visible
merged_df.printSchema()


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import ChiSquareTest

# Candidate predictors for the Chi-Square test.
# The label ('target') and the row identifier ('userId') are left out on purpose:
# testing the label against itself is trivially significant, and an ID is not a feature.
feature_columns = ['artist_count', 'gender', 'length', 'level', 'thumb_up', 'thumb_down', 'song_count']

# Assemble the predictors into a single vector column; 'target' stays available
# on the frame as the label column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_vector = assembler.transform(merged_df)


In [None]:
import pandas as pd

# Run the Chi-Square independence test of every assembled feature against the label
chi_square_test = ChiSquareTest.test(df_vector, 'features', 'target')
chi_square_results = chi_square_test.head()  # the result frame holds a single row

# The statistics and p-values come back as Spark ML vectors. Convert them to
# NumPy arrays (and the degrees of freedom to a plain list) explicitly so pandas
# builds exactly one row per feature instead of depending on how it interprets
# a Spark vector object.
chi_square_data = {
    'Feature': feature_columns,
    'Chi-Square Statistic': chi_square_results['statistics'].toArray(),
    'p-value': chi_square_results['pValues'].toArray(),
    'Degrees of Freedom': list(chi_square_results['degreesOfFreedom'])
}

# Tabular view: one row per feature
chi_square_df = pd.DataFrame(chi_square_data)

# Display the results
print(chi_square_df)


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Logistic Regression used as the scoring model during feature selection
lr = LogisticRegression(labelCol='target', featuresCol='features')

# Area under the ROC curve must be computed from continuous scores.
# Feeding the hard 0/1 'prediction' column collapses the curve to a single
# operating point, so the evaluator reads the model's 'rawPrediction' column.
evaluator = BinaryClassificationEvaluator(labelCol='target', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

# Function to perform SFS
def sequential_feature_selection(data, feature_columns, target_col, max_features):
    """
    Greedy forward feature selection.

    In every round each remaining candidate is added in turn to the features
    selected so far, the module-level estimator `lr` is fitted on the resulting
    vector, and the module-level `evaluator` scores the fitted model. The
    candidate with the highest score is kept. Scores are computed on the same
    rows the model was fitted on (no hold-out split is made here).

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Frame containing the candidate feature columns and the label column.
    feature_columns : sequence of str
        Candidate feature column names. The sequence is not modified.
    target_col : str
        Name of the label column; it overrides the label column configured on
        `lr` and `evaluator` for the duration of each fit / evaluation.
    max_features : int
        Upper bound on the number of features to select. Values larger than
        the number of candidates simply select every candidate.

    Returns
    -------
    list of str
        Selected feature names, in the order they were picked.
    """
    selected_features = []
    remaining_features = list(feature_columns)

    # Per-call overrides so the caller's label column is actually honoured
    # without mutating the shared estimator / evaluator objects.
    fit_params = {lr.labelCol: target_col}
    eval_params = {evaluator.labelCol: target_col}

    # No point iterating once every candidate has been consumed.
    rounds = min(max_features, len(remaining_features))

    for _ in range(rounds):
        best_feature = None
        # Start below any attainable score so a candidate scoring exactly 0 can still win.
        best_score = float("-inf")

        for feature in remaining_features:
            # Create the feature vector
            current_features = selected_features + [feature]
            assembler = VectorAssembler(inputCols=current_features, outputCol='features')
            df_vector = assembler.transform(data)

            # Train the model
            model = lr.fit(df_vector, fit_params)
            predictions = model.transform(df_vector)

            # Evaluate the model
            score = evaluator.evaluate(predictions, eval_params)

            # Select the best feature based on the score
            if score > best_score:
                best_score = score
                best_feature = feature

        # Nothing could be scored (e.g. every score was NaN): stop early
        if best_feature is None:
            break

        # Update selected and remaining features
        selected_features.append(best_feature)
        remaining_features.remove(best_feature)
        print(f'Selected feature: {best_feature}, Score: {best_score}')

    return selected_features

# Define your feature columns and target column
feature_columns = [ 'artist_count', 'gender', 'length', 'level', 'thumb_up', 'thumb_down', 'song_count']
target_col = 'target'

# Run SFS until every candidate has been ranked: the limit is tied to the number
# of candidates instead of a literal that can drift out of sync with the list.
best_features = sequential_feature_selection(merged_df, feature_columns, target_col, max_features=len(feature_columns))

print("Best Features Selected: ", best_features)


In [None]:
import numpy as np
import pandas as pd
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Convert the per-user PySpark feature table to a pandas DataFrame
merged_df_pd = merged_df.toPandas()

# Prepare the data
features = ['artist_count', 'gender', 'length', 'level', 'thumb_up', 'thumb_down', 'song_count']
target = 'target'

X = merged_df_pd[features].values
y = merged_df_pd[target].values

# Split the data into training and test sets (70% train, 30% test).
# stratify=y keeps the churn / non-churn ratio the same in both parts; with an
# imbalanced label a plain random split can leave very few churned users in one side.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Scale the features (statistics are learned on the training part only)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Reshape the data for LSTM [samples, time steps, features].
# Each of the scalar input columns becomes one time step carrying a single value,
# i.e. the shape is (n_samples, n_columns, 1).
X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], X_train_scaled.shape[1], 1))
X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], X_test_scaled.shape[1], 1))

# Define the LSTM model: two stacked LSTM layers with dropout and a sigmoid output unit
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(X_train_reshaped.shape[1], 1)))
model.add(Dropout(0.4))
model.add(LSTM(50))
model.add(Dropout(0.4))
model.add(Dense(1, activation='sigmoid'))  # binary classification

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model (the last 20% of the training rows are held out for validation)
model.fit(X_train_reshaped, y_train, epochs=50, batch_size=32, validation_split=0.2)

# Evaluate the model on the untouched test split
loss, accuracy = model.evaluate(X_test_reshaped, y_test)
print(f'Test Accuracy: {accuracy:.4f}')


In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, GRU, Dense, Dropout, Input
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Generate synthetic data for demonstration
def generate_data(num_samples, timesteps, features, seed=None):
    """
    Build a random demonstration dataset.

    Parameters
    ----------
    num_samples : int
        Number of sequences to generate.
    timesteps : int
        Number of time steps per sequence.
    features : int
        Number of values per time step.
    seed : int, optional
        When given, a dedicated generator seeded with this value is used so the
        same data is returned on every call. When omitted, NumPy's global random
        state is used, exactly as before.

    Returns
    -------
    X : numpy.ndarray
        Array of shape (num_samples, timesteps, features), uniform in [0, 1).
    y : numpy.ndarray
        Array of shape (num_samples,) holding random binary labels (0 or 1).
    """
    rng = np.random if seed is None else np.random.RandomState(seed)
    X = rng.rand(num_samples, timesteps, features)
    y = rng.randint(0, 2, num_samples)  # Binary labels
    return X, y

# Parameters
num_samples = 1000  # Number of samples
timesteps = 10      # Number of time steps in each sample
features = 5        # Number of features at each time step

# Fix NumPy's global random state so the synthetic dataset (and therefore the
# reported numbers) is the same on every run of this cell.
np.random.seed(42)

# Generate data
X, y = generate_data(num_samples, timesteps, features)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Normalize the data: StandardScaler works on 2-D input, so every sequence is
# flattened to one row, scaled, and then restored to its 3-D shape.
scaler = StandardScaler()
X_train_reshaped = X_train.reshape(-1, timesteps * features)
X_test_reshaped = X_test.reshape(-1, timesteps * features)
X_train_scaled = scaler.fit_transform(X_train_reshaped).reshape(X_train.shape)
X_test_scaled = scaler.transform(X_test_reshaped).reshape(X_test.shape)

# Hybrid recurrent classifier declared in one go:
# input -> LSTM (returns the full sequence) -> GRU (returns the last state)
# -> dropout -> single sigmoid unit for binary classification.
model = Sequential([
    Input(shape=(timesteps, features)),
    LSTM(units=64, return_sequences=True),
    GRU(units=32, return_sequences=False),
    Dropout(0.2),
    Dense(units=1, activation='sigmoid'),
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# Fit, holding out 20% of the training data for validation
history = model.fit(X_train_scaled, y_train, validation_split=0.2, epochs=50, batch_size=32)

# Score on the test split
test_loss, test_accuracy = model.evaluate(X_test_scaled, y_test)
print(f'Test Accuracy: {test_accuracy:.4f}')

# Learning curves: one figure per metric, training vs. validation
import matplotlib.pyplot as plt

for metric_key, metric_name in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
    plt.plot(history.history[metric_key], label=f'Train {metric_name}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    plt.title(f'Model {metric_name}')
    plt.ylabel(metric_name)
    plt.xlabel('Epoch')
    plt.legend()
    plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, GRU, Dense, Dropout, Input
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from keras.callbacks import EarlyStopping

# Generate synthetic data for demonstration
def generate_data(num_samples, timesteps, features, seed=None):
    """
    Build a random demonstration dataset.

    Parameters
    ----------
    num_samples : int
        Number of sequences to generate.
    timesteps : int
        Number of time steps per sequence.
    features : int
        Number of values per time step.
    seed : int, optional
        When given, a dedicated generator seeded with this value is used so the
        same data is returned on every call. When omitted, NumPy's global random
        state is used, exactly as before.

    Returns
    -------
    X : numpy.ndarray
        Array of shape (num_samples, timesteps, features), uniform in [0, 1).
    y : numpy.ndarray
        Array of shape (num_samples,) holding random binary labels (0 or 1).
    """
    rng = np.random if seed is None else np.random.RandomState(seed)
    X = rng.rand(num_samples, timesteps, features)
    y = rng.randint(0, 2, num_samples)  # Binary labels
    return X, y

# Parameters
num_samples = 1000  # Number of samples
timesteps = 10      # Number of time steps in each sample
features = 5        # Number of features at each time step

# Fix NumPy's global random state so the synthetic dataset (and therefore the
# reported numbers) is the same on every run of this cell.
np.random.seed(42)

# Generate data
X, y = generate_data(num_samples, timesteps, features)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Normalize the data: StandardScaler works on 2-D input, so every sequence is
# flattened to one row, scaled, and then restored to its 3-D shape.
scaler = StandardScaler()
X_train_reshaped = X_train.reshape(-1, timesteps * features)
X_test_reshaped = X_test.reshape(-1, timesteps * features)
X_train_scaled = scaler.fit_transform(X_train_reshaped).reshape(X_train.shape)
X_test_scaled = scaler.transform(X_test_reshaped).reshape(X_test.shape)

# Hybrid recurrent classifier declared in one go:
# input -> LSTM (full sequence) -> dropout -> GRU (last state) -> dropout
# -> single sigmoid unit for binary classification.
model = Sequential([
    Input(shape=(timesteps, features)),
    LSTM(units=64, return_sequences=True),
    Dropout(0.2),
    GRU(units=32, return_sequences=False),
    Dropout(0.2),
    Dense(units=1, activation='sigmoid'),
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# Stop once validation loss has not improved for 5 epochs and roll back to the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Fit, holding out 20% of the training data for validation
history = model.fit(
    X_train_scaled,
    y_train,
    validation_split=0.2,
    epochs=50,
    batch_size=32,
    callbacks=[early_stopping],
)

# Score on the test split
test_loss, test_accuracy = model.evaluate(X_test_scaled, y_test)
print(f'Test Accuracy: {test_accuracy:.4f}')

# Learning curves side by side: accuracy on the left, loss on the right
plt.figure(figsize=(12, 5))

panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))
for panel_index, (metric_key, metric_name) in enumerate(panels, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(history.history[metric_key], label=f'Train {metric_name}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    plt.title(f'Model {metric_name}')
    plt.ylabel(metric_name)
    plt.xlabel('Epoch')
    plt.legend()

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, GRU, Dense, Dropout, Input, Bidirectional
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from keras.callbacks import EarlyStopping

# Generate synthetic data for demonstration
def generate_data(num_samples, timesteps, features, seed=None):
    """
    Build a random demonstration dataset.

    Parameters
    ----------
    num_samples : int
        Number of sequences to generate.
    timesteps : int
        Number of time steps per sequence.
    features : int
        Number of values per time step.
    seed : int, optional
        When given, a dedicated generator seeded with this value is used so the
        same data is returned on every call. When omitted, NumPy's global random
        state is used, exactly as before.

    Returns
    -------
    X : numpy.ndarray
        Array of shape (num_samples, timesteps, features), uniform in [0, 1).
    y : numpy.ndarray
        Array of shape (num_samples,) holding random binary labels (0 or 1).
    """
    rng = np.random if seed is None else np.random.RandomState(seed)
    X = rng.rand(num_samples, timesteps, features)
    y = rng.randint(0, 2, num_samples)  # Binary labels
    return X, y

# Parameters
num_samples = 1000
timesteps = 10
features = 5

# Fix NumPy's global random state so the synthetic dataset (and therefore the
# reported numbers) is the same on every run of this cell.
np.random.seed(42)

# Generate data
X, y = generate_data(num_samples, timesteps, features)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Normalize the data: StandardScaler works on 2-D input, so every sequence is
# flattened to one row, scaled, and then restored to its 3-D shape.
scaler = StandardScaler()
X_train_reshaped = X_train.reshape(-1, timesteps * features)
X_test_reshaped = X_test.reshape(-1, timesteps * features)
X_train_scaled = scaler.fit_transform(X_train_reshaped).reshape(X_train.shape)
X_test_scaled = scaler.transform(X_test_reshaped).reshape(X_test.shape)

# Bidirectional hybrid classifier declared in one go:
# input -> Bi-LSTM (full sequence) -> dropout -> Bi-GRU (last state) -> dropout
# -> single sigmoid unit for binary classification.
model = Sequential([
    Input(shape=(timesteps, features)),
    Bidirectional(LSTM(units=64, return_sequences=True)),
    Dropout(0.3),
    Bidirectional(GRU(units=32, return_sequences=False)),
    Dropout(0.3),
    Dense(units=1, activation='sigmoid'),
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Stop once validation loss has not improved for 5 epochs and roll back to the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Fit, holding out 20% of the training data for validation
history = model.fit(
    X_train_scaled,
    y_train,
    validation_split=0.2,
    epochs=50,
    batch_size=32,
    callbacks=[early_stopping],
)

# Score on the test split
test_loss, test_accuracy = model.evaluate(X_test_scaled, y_test)
print(f'Test Accuracy: {test_accuracy:.4f}')

# Learning curves side by side: accuracy on the left, loss on the right
plt.figure(figsize=(12, 5))

panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))
for panel_index, (metric_key, metric_name) in enumerate(panels, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(history.history[metric_key], label=f'Train {metric_name}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    plt.title(f'Model {metric_name}')
    plt.ylabel(metric_name)
    plt.xlabel('Epoch')
    plt.legend()

plt.tight_layout()
plt.show()
