In [None]:
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

In [None]:
# Obtain the Spark session for this notebook (an existing session is reused
# if one is already running, otherwise a new one is started).
spark = (
    SparkSession.builder
    .appName("Image Classification")
    .getOrCreate()
)

In [None]:
# Base HDFS URI; the image-loading cell further down also prefixes file names with it.
hdfs_path = "hdfs://namenode:9000"

# Read the location as plain text: one row per line in a single string column "value".
# NOTE(review): presumably each line is the path/name of one image file — confirm
# what actually lives at this URI.
image_files = spark.read.text(hdfs_path)


In [None]:
# Keep only the base name of each listed path.
# Taking the FIRST "/"-separated piece (the previous behaviour) yields an empty
# string or a directory name as soon as a line contains a path; the metadata
# parsing that follows needs the actual file name, i.e. the text after the last "/".
# substring_index(value, '/', -1) returns the whole string when there is no "/",
# so bare file names are handled exactly as before.
image_df = image_files.selectExpr("substring_index(value, '/', -1) AS filename")


In [None]:
# Derive the label columns from the file name, which is split on "_":
# piece 0 -> age (cast to int), piece 1 -> gender, piece 2 -> race.
name_parts = split(image_df.filename, "_")
image_df = (
    image_df
    .withColumn("age", name_parts.getItem(0).cast("int"))
    .withColumn("gender", name_parts.getItem(1))
    .withColumn("race", name_parts.getItem(2))
)


In [None]:
# Sanity check: print the first rows (20 is the default row count) of the parsed metadata.
image_df.show(n=20)

In [None]:
# One StringIndexer per categorical column; each writes "<column>_index".
gender_indexer, race_indexer = (
    StringIndexer(inputCol=column, outputCol=f"{column}_index")
    for column in ("gender", "race")
)


In [None]:
# Random forest that predicts the indexed gender label from the "features" vector column.
# A fixed seed makes training reproducible across kernel restarts; 42 matches the
# random_state used by the scikit-learn cells later in this notebook.
classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="gender_index",
    seed=42,
)


In [None]:
# The DataFrame this pipeline is fitted on carries only two columns, "features"
# and "label" (the gender string). Indexers that read "gender" / "race" would
# fail on it with a missing-column error, so the label is indexed from "label"
# into "gender_index", which is the column the classifier is configured to learn.
label_indexer = StringIndexer(inputCol="label", outputCol="gender_index")
pipeline = Pipeline(stages=[label_indexer, classifier])


In [None]:
def read_image(image_path, size=(128, 128)):
    """Load an image, convert it to RGB, resize it and return it flattened.

    Args:
        image_path: Path handed to ``cv2.imread``.
            NOTE(review): OpenCV opens ordinary file paths; confirm that the
            ``hdfs://`` URIs built elsewhere in this notebook are readable here.
        size: Target ``(width, height)`` passed to ``cv2.resize``.
            Defaults to ``(128, 128)``.

    Returns:
        1-D array holding the resized RGB pixels
        (``width * height * 3`` values for a 3-channel image).

    Raises:
        OSError: If OpenCV cannot read or decode the image.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the path rather than with an opaque error in cvtColor.
        raise OSError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
    image = cv2.resize(image, size)  # Resize image
    return image.flatten()


In [None]:
# Load every listed image on the driver and keep its pixel vector with its label.
# collect() brings all metadata rows to the driver, so this assumes a small listing.
features = []
labels = []
base_uri = hdfs_path.rstrip("/")
for row in image_df.collect():
    # Join with "/" explicitly: os.path.join uses the OS separator (a backslash
    # on Windows) and silently drops the base when the name starts with "/",
    # neither of which is valid for a URI.
    img_path = f"{base_uri}/{row.filename.lstrip('/')}"
    image_data = read_image(img_path)
    features.append(image_data)
    labels.append(row.gender)  # Or row.race based on what you're classifying


In [None]:
# Build the training DataFrame with columns "features" and "label".
# Spark ML estimators need an ML vector column, not raw numpy arrays, so each
# flattened image is wrapped in a DenseVector (values are stored as float64).
feature_df = spark.createDataFrame(
    [(Vectors.dense(pixels), label) for pixels, label in zip(features, labels)],
    ["features", "label"],
)


In [None]:
# Fit all pipeline stages on the image feature DataFrame.
model = pipeline.fit(dataset=feature_df)


In [None]:
# Score the same DataFrame the model was fitted on, then print the original
# label next to the predicted value for a quick visual check.
predictions = model.transform(dataset=feature_df)
(
    predictions
    .select("label", "prediction")
    .show()
)


In [None]:
# Show the first loaded image together with the prediction of the first row.
sample_image = features[0].reshape(128, 128, 3)  # Adjust based on your resize dimensions

# Fetch only the first row's "prediction" value; collecting the whole DataFrame
# would ship every row (including its full pixel vector) to the driver for one number.
first_prediction = predictions.select("prediction").first()[0]

plt.imshow(sample_image)
plt.title(f"Predicted Gender: {first_prediction}")
plt.axis('off')
plt.show()


In [None]:
# Separate the regression target ('Age') from the remaining columns.
# NOTE(review): `data` is not defined in any cell visible above — confirm it is
# loaded before this cell runs on a fresh kernel.
y = data['Age']
X = data.drop(columns=['Age'])  # Replace with your feature columns


In [None]:
# Hold out 20% of the rows for evaluation; the fixed random_state keeps the split repeatable.
# NOTE(review): `train_test_split` is not imported in the visible import cell — confirm.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)


In [None]:
# Fit the scaler on the training split only, then apply the same scaling to both splits.
# NOTE(review): `StandardScaler` is not imported in the visible import cell — confirm.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)


In [None]:
# Train a 100-tree random forest regressor on the scaled training split.
# Note: this rebinds `model`, which previously held the fitted Spark pipeline.
# NOTE(review): `RandomForestRegressor` is not imported in the visible import cell — confirm.
model = RandomForestRegressor(n_estimators=100, random_state=42).fit(X_train, y_train)


In [None]:
# Evaluate on the held-out split and report the mean squared error.
# NOTE(review): `mean_squared_error` is not imported in the visible import cell — confirm.
y_pred = model.predict(X_test)
mse = mean_squared_error(y_true=y_test, y_pred=y_pred)
print(f"Mean Squared Error: {mse}")


In [None]:
# Predict the age for one new sample: scale it with the already-fitted `scaler`,
# then pass the scaled row to `model`.
# NOTE(review): feature1_value, feature2_value and the literal `...` are template
# placeholders and are not defined anywhere in the visible cells — substitute
# one value per feature column before running this cell.
new_person_data = scaler.transform([[feature1_value, feature2_value, ...]])  # Replace with new data
predicted_age = model.predict(new_person_data)
print(f"Predicted Age: {predicted_age}")