In [1]:

from pyspark.sql.types import ArrayType, FloatType, IntegerType, StringType, StructType, StructField
from pyspark.ml.feature import SQLTransformer
from pyspark.sql import SparkSession
from tensorflow.keras.models import load_model
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
from PIL import Image
import os
import cloudpickle

2024-12-23 01:24:02.479093: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-23 01:24:02.492470: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1734909842.506667   10898 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1734909842.510319   10898 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-23 01:24:02.525180: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
spark: SparkSession = SparkSession.builder.appName("Traffic Signs Classification Streaming") \
    .master("local[*]") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.memory", "12g") \
    .config("spark.python.worker.memory", "12g") \
    .config("spark.executor.pyspark.memory", "12g") \
    .config("spark.rpc.message.maxSize", "128") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.sql.streaming.checkpointLocation", "./tmp") \
    .getOrCreate()

model = load_model('best_model.keras')
serialized_model = cloudpickle.dumps(model)
broadcast_model = spark.sparkContext.broadcast(serialized_model)

dataset_path = "./dataset"
def process_image(img_path, roi_x1, roi_y1, roi_x2, roi_y2):
    try:
        with Image.open(os.path.join(dataset_path, img_path)) as img:
            cropped_img = img.crop((roi_x1, roi_y1, roi_x2, roi_y2))
            resized_img = np.array(cropped_img.resize((32, 32), resample=Image.Resampling.LANCZOS))
            return resized_img / 255.0
    except Exception as e:
        print(f"Error processing image {img_path}: {e}")
        return [0.0] * (32 * 32 * 3)

def predict_image(features):
    features = np.expand_dims(features, axis=0)
    predictions = cloudpickle.loads(broadcast_model.value).predict(features, verbose=0)
    predicted_class = np.argmax(predictions, axis=1)
    return predicted_class[0]

def image_to_vector(img_features):
    return Vectors.dense(img_features)

spark.udf.register("process_image", process_image, ArrayType(FloatType()))
spark.udf.register("predict_image", predict_image, IntegerType())
spark.udf.register("image_to_vector", image_to_vector, VectorUDT())

schema = StructType([
    StructField("Width", IntegerType(), True),
    StructField("Height", IntegerType(), True),
    StructField("Roi.X1", IntegerType(), True),
    StructField("Roi.Y1", IntegerType(), True),
    StructField("Roi.X2", IntegerType(), True),
    StructField("Roi.Y2", IntegerType(), True),
    StructField("ClassId", IntegerType(), True),
    StructField("Path", StringType(), True)
])

input_stream = spark.readStream.option("header", "true").schema(schema).csv(os.path.join(dataset_path, "streaming"))

image_transformer = SQLTransformer(statement='SELECT *, process_image(Path, `Roi.X1`, `Roi.Y1`, `Roi.X2`, `Roi.Y2`) features FROM __THIS__')
processed_stream = image_transformer.transform(input_stream)

prediction_transformer = SQLTransformer(statement='SELECT *, predict_image(features) prediction FROM __THIS__')
prediction_stream = prediction_transformer.transform(processed_stream).select("Path", "ClassId", "prediction")

output_stream = prediction_stream.writeStream.select("prediction").outputMode("append").format("parquet").option("path", "output").start()
output_stream.awaitTermination()


your 131072x1 screen size is bogus. expect trouble
24/12/23 01:24:05 WARN Utils: Your hostname, DESKTOP-SMHNFU4 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/12/23 01:24:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/23 01:24:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
I0000 00:00:1734909846.506946   10898 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 7537 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3080, pci bus id: 0000:01:00.0, compute capability: 8.6


AttributeError: 'DataStreamWriter' object has no attribute 'select'