# Pyspark Code

In [15]:
from pyspark.sql.functions import udf, regexp_extract
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
import cv2
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GenderDetectionPipeline") \
    .getOrCreate()

@udf(returnType=VectorUDT())
def preprocess_image(content):
    img = cv2.imdecode(np.frombuffer(content, np.uint8), cv2.IMREAD_COLOR)
    resized = cv2.resize(img, (128, 128))
    gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
    return Vectors.dense(gray.flatten().tolist())

images_df = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load("dataset_small/{female,male}/*.jpg")

preprocessed_df = images_df.withColumn("features", preprocess_image("content"))
preprocessed_df = preprocessed_df.withColumn("gender", regexp_extract("path", "dataset_small/([^/]+)", 1))


# Save preprocessed images to a paraquet file
preprocessed_df.write.parquet("Preprocessed")

spark.stop()

# Airflow Code

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'gender_detection_pipeline',
    default_args=default_args,
    description='Gender Detection Pipeline',
    schedule_interval=timedelta(days=1),
)

def check_folder_exists(folder_path):
    if not os.path.exists(folder_path):
        raise ValueError(f"Folder {folder_path} does not exist")

check_folder_task = PythonOperator(
    task_id='check_folder_exists',
    python_callable=check_folder_exists,
    op_kwargs={'folder_path': 'include/dataset/{female,male}'},
    dag=dag,
)

preprocess_images_task = SparkSubmitOperator(
    task_id='preprocess_images',
    application='preprocess_images.py',
    conn_id='spark_default',
    dag=dag,
)

check_folder_task >> preprocess_images_task


# MLFlow

In [12]:
import mlflow
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import mlflow
import mlflow.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
import random
random.seed(1234)

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CNNExperiment") \
    .getOrCreate()

# Read the preprocessed Parquet data
preprocessed_data = spark.read.parquet("Preprocessed")

# Adding Label
preprocessed_df = preprocessed_data.withColumn("label",
                                        when(preprocessed_data.gender == "female", 0)
                                        .when(preprocessed_data.gender == "male", 1)
                                        .otherwise(None))
preprocessed_df = preprocessed_df.select("features", "label")

# Split the data into training and testing sets
(train_data, test_data) = preprocessed_df.randomSplit([0.8, 0.2], seed=42)
# Set up MLflow tracking
mlflow.set_tracking_uri("http://localhost:5000")  # Replace with your MLflow tracking server URI
mlflow.set_experiment("CNN_Experiment")

# Define the layer configurations to try
layer_configs = [
    [preprocessed_df.select("features").first()[0].size, 64, 32, 2],
    [preprocessed_df.select("features").first()[0].size, 32, 16, 2],
    [preprocessed_df.select("features").first()[0].size, 32, 32, 32, 2]
]

# Iterate over activation functions and layer configurations
for layers in layer_configs:
        # Create the CNN model
        stepSize = (random.random()+0.01)*0.1
        cnn = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234, solver="gd", stepSize=stepSize)
        
        # Train the model
        with mlflow.start_run() as run:
            # Log parameters
            mlflow.log_param("layers", layers)
            mlflow.log_param("maxIter", 100)
            mlflow.log_param("Step_size",stepSize)
            
            # Train the model
            model = cnn.fit(train_data)
            
            # Evaluate the model
            predictions = model.transform(test_data)
            evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
            accuracy = evaluator.evaluate(predictions)
            
            # Log metrics
            mlflow.log_metric("accuracy", accuracy)
            
            # Save the model to a file
            model_name = f"model_{layers[1]}_{len(layers)}_layers"
            model_path = f"model/{model_name}"
            model.write().overwrite().save(model_path)
            mlflow.spark.log_model(model, "model", registered_model_name=model_name)
            
            # Print the run ID for reference
            print(f"Run ID for {model_name}: {run.info.run_id}")

# Stop the SparkSession
spark.stop()


Registered model 'model_64_4_layers' already exists. Creating a new version of this model...
2024/05/16 09:56:40 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: model_64_4_layers, version 2
Created version '2' of model 'model_64_4_layers'.


Run ID for model_64_4_layers: 981b2e9f643f4511bc58d1372897dd27


Registered model 'model_32_4_layers' already exists. Creating a new version of this model...
2024/05/16 09:57:04 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: model_32_4_layers, version 2
Created version '2' of model 'model_32_4_layers'.


Run ID for model_32_4_layers: fd853578b93344beacbe6f053d4bd0d3


Registered model 'model_32_5_layers' already exists. Creating a new version of this model...
2024/05/16 09:57:29 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: model_32_5_layers, version 2
Created version '2' of model 'model_32_5_layers'.


Run ID for model_32_5_layers: 115c37b3fe6248b4b17b895eaeb076a3


In [13]:
import mlflow
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten

random.seed(1234)

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CNNExperiment") \
    .getOrCreate()

# Read the preprocessed Parquet data
preprocessed_data = spark.read.parquet("Preprocessed")

# Adding Label
preprocessed_df = preprocessed_data.withColumn("label",
                                        when(preprocessed_data.gender == "female", 0)
                                        .when(preprocessed_data.gender == "male", 1)
                                        .otherwise(None))
preprocessed_df = preprocessed_df.select("features", "label")

# Split the data into training and testing sets
(train_data, test_data) = preprocessed_df.randomSplit([0.8, 0.2], seed=42)

# Set up MLflow tracking
mlflow.set_tracking_uri("http://localhost:5000")  # Replace with your MLflow tracking server URI
mlflow.set_experiment("CNN_Experiment")

# Define the CNN model architecture
def create_cnn_model(input_shape,config):
    model = Sequential()
    model.add(Conv2D(config['filters'][0], (3, 3), activation='relu', input_shape=input_shape))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(config['filters'][1], (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(config['filters'][2], (3, 3), activation='relu'))
    model.add(Flatten())
    model.add(Dense(config['dense_units'], activation='relu'))
    model.add(Dense(2, activation='softmax'))
    return model

# Iterate over different CNN configurations
cnn_configs = [
    {'filters': [32, 64, 64], 'dense_units': 64},
    {'filters': [16, 32, 64], 'dense_units': 32},
    {'filters': [8, 16, 32], 'dense_units': 16}
]

# Get the input shape from the preprocessed data
input_shape = (128, 128, 1)
i = 1

for config in cnn_configs:
    with mlflow.start_run() as run:
        # Create the CNN model
        model = create_cnn_model(input_shape,config)
        
        # Compile the model
        model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
        
        # Log parameters
        mlflow.log_param("filters", config['filters'])
        mlflow.log_param("dense_units", config['dense_units'])
        
        # Convert Spark DataFrame to NumPy arrays
        train_data_np = np.array(train_data.select("features").collect())
        test_data_np = np.array(test_data.select("features").collect())
        train_data_np_lab = np.array(train_data.select("label").collect())
        test_data_np_lab = np.array(test_data.select("label").collect())
        
        X_train = train_data_np[:, 0].reshape(-1, input_shape[0], input_shape[1], 1)
        y_train = train_data_np_lab
        X_test = test_data_np[:, 0].reshape(-1, input_shape[0], input_shape[1], 1)
        y_test = test_data_np_lab
        
        # Train the model
        model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))
        
        # Evaluate the model
        _, accuracy = model.evaluate(X_test, y_test)
        
        # Log metrics
        mlflow.log_metric("accuracy", accuracy)
        
        # Save the model
        model_name = f"cnn_model_{i}_{config['filters']}_{config['dense_units']}"
        mlflow.keras.log_model(model, "model", registered_model_name=model_name)
        model.save(f"model/cnn_model_{i}.keras")
        i+=1
        
        # Print the run ID for reference
        print(f"Run ID for {model_name}: {run.info.run_id}")

# Stop the SparkSession
spark.stop()

  super().__init__(


Epoch 1/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 167ms/step - accuracy: 0.4998 - loss: 183.6264 - val_accuracy: 0.5096 - val_loss: 0.6951
Epoch 2/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 154ms/step - accuracy: 0.6017 - loss: 0.6879 - val_accuracy: 0.6178 - val_loss: 0.6778
Epoch 3/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 152ms/step - accuracy: 0.6754 - loss: 0.6100 - val_accuracy: 0.6369 - val_loss: 0.6461
Epoch 4/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 145ms/step - accuracy: 0.7227 - loss: 0.5269 - val_accuracy: 0.7134 - val_loss: 0.7413
Epoch 5/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 144ms/step - accuracy: 0.8137 - loss: 0.4627 - val_accuracy: 0.7261 - val_loss: 0.5616
Epoch 6/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 149ms/step - accuracy: 0.8940 - loss: 0.3135 - val_accuracy: 0.7580 - val_loss: 0.6424
Epoch 7/10
[1m21/21[0m 

Registered model 'cnn_model_1_[32, 64, 64]_64' already exists. Creating a new version of this model...
2024/05/17 08:46:39 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: cnn_model_1_[32, 64, 64]_64, version 2
Created version '2' of model 'cnn_model_1_[32, 64, 64]_64'.


Run ID for cnn_model_1_[32, 64, 64]_64: 81d5764a961e476eba9c72e1fc076b0f
Epoch 1/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 73ms/step - accuracy: 0.5100 - loss: 85.1658 - val_accuracy: 0.6115 - val_loss: 0.7678
Epoch 2/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 63ms/step - accuracy: 0.7896 - loss: 0.4571 - val_accuracy: 0.7389 - val_loss: 0.5261
Epoch 3/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 65ms/step - accuracy: 0.8881 - loss: 0.3055 - val_accuracy: 0.8344 - val_loss: 0.4668
Epoch 4/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 61ms/step - accuracy: 0.9421 - loss: 0.1785 - val_accuracy: 0.8280 - val_loss: 0.4853
Epoch 5/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 61ms/step - accuracy: 0.9533 - loss: 0.1421 - val_accuracy: 0.8471 - val_loss: 0.4637
Epoch 6/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 62ms/step - accuracy: 0.9757 - loss: 0.0786 - 

Successfully registered model 'cnn_model_2_[16, 32, 64]_32'.
2024/05/17 08:47:02 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: cnn_model_2_[16, 32, 64]_32, version 1
Created version '1' of model 'cnn_model_2_[16, 32, 64]_32'.


Run ID for cnn_model_2_[16, 32, 64]_32: 18a3e7d2ff294b9ab06861d17a2bb14d
Epoch 1/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 31ms/step - accuracy: 0.5484 - loss: 9.6225 - val_accuracy: 0.5159 - val_loss: 0.6924
Epoch 2/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 23ms/step - accuracy: 0.4829 - loss: 1.3847 - val_accuracy: 0.5096 - val_loss: 0.6931
Epoch 3/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - accuracy: 0.5067 - loss: 0.6931 - val_accuracy: 0.5096 - val_loss: 0.6931
Epoch 4/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 23ms/step - accuracy: 0.4946 - loss: 0.6932 - val_accuracy: 0.5096 - val_loss: 0.6931
Epoch 5/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 23ms/step - accuracy: 0.5002 - loss: 0.6932 - val_accuracy: 0.5096 - val_loss: 0.6931
Epoch 6/10
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 23ms/step - accuracy: 0.4828 - loss: 0.6933 - v

Successfully registered model 'cnn_model_3_[8, 16, 32]_16'.
2024/05/17 08:47:17 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: cnn_model_3_[8, 16, 32]_16, version 1
Created version '1' of model 'cnn_model_3_[8, 16, 32]_16'.


Run ID for cnn_model_3_[8, 16, 32]_16: 486f57494ed94b6c99959ff99dc0fadb


Fast API

In [None]:
import cv2
import numpy as np
from fastapi import FastAPI, Request, File, UploadFile
from fastapi.responses import JSONResponse
from tensorflow.keras.models import load_model
from PIL import Image
import uvicorn
from starlette.middleware.base import BaseHTTPMiddleware
from collections import defaultdict
from datetime import datetime, timedelta
from prometheus_client import Counter, Histogram, CollectorRegistry, generate_latest
from starlette.responses import Response

REQUESTS = Counter('http_requests_total', 'Total HTTP Requests', ['method', 'endpoint', 'status_code'])
REQUEST_LATENCY = Histogram('http_request_latency_seconds', 'HTTP Request Latency', ['method', 'endpoint'])


app = FastAPI()

# Load the trained Keras model
model_path = "model/cnn_model_2.keras"
model = load_model(model_path)

def preprocess_image(image):
    resized = cv2.resize(image, (128, 128))
    gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
    normalized = gray / 255.0
    reshaped = np.reshape(normalized, (1, 128, 128, 1))
    return reshaped

class RateLimitingMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, limit=1000, duration=60):
        super().__init__(app)
        self.limit = limit
        self.duration = duration
        self.request_counts = defaultdict(lambda: (0, datetime.now()))

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host
        count, last_request_time = self.request_counts[client_ip]

        if count >= self.limit and (datetime.now() - last_request_time).total_seconds() < self.duration:
            return JSONResponse(
                status_code=429,
                content={"error": f"Rate limit exceeded for IP {client_ip}. Please try again later."}
            )

        self.request_counts[client_ip] = (count + 1, datetime.now())
        response = await call_next(request)
        return response

app.add_middleware(RateLimitingMiddleware)

@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
    method = request.method
    path = request.url.path

    with REQUEST_LATENCY.labels(method=method, endpoint=path).time():
        response = await call_next(request)

    status_code = response.status_code
    REQUESTS.labels(method=method, endpoint=path, status_code=status_code).inc()

    return response

@app.get("/metrics")
def metrics():
    registry = CollectorRegistry()
    registry.register(REQUESTS)
    registry.register(REQUEST_LATENCY)
    return Response(generate_latest(registry), media_type="text/plain")

@app.post("/predict")
async def predict_gender(request: Request, file: UploadFile = File(...)):
    # Read the uploaded image file
    img = Image.open(file.file)
    img = np.array(img)

    # Preprocess the image
    features = preprocess_image(img)

    # Make predictions using the loaded model
    prediction = model.predict(features)

    # Map the prediction to gender
    gender = 'female' if prediction[0][0] < 0.5 else 'male'

    # Get the client IP address
    client_ip = request.client.host

    return {"gender": gender}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

In [24]:
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
import sys
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GenderDetectionAPI") \
    .getOrCreate()

model_path = "model\\model_32_4_layers"
model_2 = MultilayerPerceptronClassificationModel.load(model_path)
os.path.join(sys.argv[1],model_path)

Py4JJavaError: An error occurred while calling o3818.load.
: java.lang.UnsatisfiedLinkError: 'org.apache.hadoop.io.nativeio.NativeIO$POSIX$Stat org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(java.lang.String)'
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:608)
	at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:934)
	at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:848)
	at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:816)
	at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
	at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2199)
	at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2179)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1471)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1465)
	at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1506)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1506)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel$MultilayerPerceptronClassificationModelReader.load(MultilayerPerceptronClassifier.scala:383)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel$MultilayerPerceptronClassificationModelReader.load(MultilayerPerceptronClassifier.scala:376)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
