In [None]:
from pyspark.sql import SparkSession
import cv2
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType, StringType, StructType, StructField
import os
import pandas as pd

In [None]:
# Obtain the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("ResizeImages")
    .getOrCreate()
)

In [None]:
# Root folder of the dataset; it holds the "true" and "fake" label subdirectories
input_directory = "/dbfs/mnt/projectteam5/New_Dataset/"

# Destination folder for the resized images (addressed as a local file system path)
output_directory = "/dbfs/mnt/projectteam5/Pyspark_output/"

In [None]:
# Define the resize function
def resize_image(image_path, width=250, height=250):
    """Load an image from disk, resize it and return it JPEG-encoded.

    Args:
        image_path: Path of the image file, handed to ``cv2.imread``.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        bytes: The resized image encoded as JPEG.

    Raises:
        ValueError: If the file cannot be read as an image, or if the
            resized image cannot be JPEG-encoded.
    """
    # cv2.imread reports failure by returning None rather than raising; without
    # this check the failure surfaces later as an OpenCV assertion inside
    # cv2.resize that does not say which file was at fault.
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Could not read image: {image_path!r}")

    # The image is stretched to exactly (width, height); aspect ratio is not kept.
    resized_img = cv2.resize(img, (width, height))

    # cv2.imencode returns a success flag alongside the encoded buffer.
    encoded_ok, buffer = cv2.imencode('.jpg', resized_img)
    if not encoded_ok:
        raise ValueError(f"Could not encode resized image as JPEG: {image_path!r}")

    return buffer.tobytes()

# Wrap resize_image as a Spark UDF (User Defined Function) returning binary data
resize_image_udf = udf(f=resize_image, returnType=BinaryType())

In [None]:
# Collect the image file paths found in each label subdirectory of the input directory
image_paths = []
# os.listdir returns entries in arbitrary order; sorting keeps the listing
# reproducible from run to run.
for label in sorted(os.listdir(input_directory)):
    label_dir = os.path.join(input_directory, label)
    # Only subdirectories are treated as labels; stray top-level files are ignored.
    if not os.path.isdir(label_dir):
        continue
    for file_name in sorted(os.listdir(label_dir)):
        file_path = os.path.join(label_dir, file_name)
        # Skip nested directories: only regular files can be read as images.
        if os.path.isfile(file_path):
            image_paths.append(file_path)

In [None]:
# Single-column schema: one nullable string field holding the image file path
schema = StructType(
    [
        StructField(name="image_path", dataType=StringType(), nullable=True),
    ]
)

In [None]:
# Wrap every path in a 1-tuple so each one becomes a single-column row
image_paths_df = spark.createDataFrame(
    data=[(image_path,) for image_path in image_paths],
    schema=schema,
)

In [None]:
# Add a binary column with the output of the resize UDF for each image path
resized_images_df = image_paths_df.withColumn(
    "resized_image",
    resize_image_udf(image_paths_df.image_path),
)

In [None]:
# Bring the path and resized-image columns into a pandas DataFrame
pandas_df = resized_images_df.select("image_path", "resized_image").toPandas()

In [None]:
# Save the resized images to the output directory.
# The 'resized_image' column already holds JPEG-encoded bytes, so they are
# written to disk as-is. Decoding them and re-encoding through cv2.imwrite
# would apply lossy JPEG compression a second time, and cv2.imwrite only
# returns False (it does not raise) when the target cannot be written.
os.makedirs(output_directory, exist_ok=True)
for index, row in pandas_df.iterrows():
    # The label is the name of the folder that contained the source image.
    label = os.path.basename(os.path.dirname(row['image_path']))
    output_path = os.path.join(output_directory, f"{label}_resized_image_{index}.jpg")
    with open(output_path, 'wb') as output_file:
        output_file.write(row['resized_image'])

In [None]:
# Preview the first 20 rows, with long cell values truncated
resized_images_df.show(n=20, truncate=True)

In [None]:
# Shut down the Spark session now that the pipeline has finished.
# NOTE(review): the /dbfs/... paths used in this notebook suggest a Databricks
# cluster, where the session is typically shared and stopping it from a
# notebook is discouraged — confirm whether this call should stay.
spark.stop()