In [1]:
import numpy as np
import tensorflow as tf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, IntegerType
import pandas as pd


# Fetch CIFAR-10 through Keras; load_data() yields a (train, test) pair,
# each of which is itself an (images, labels) pair.
train_split, test_split = tf.keras.datasets.cifar10.load_data()
x_train, y_train = train_split
x_test, y_test = test_split

# Collapse both label arrays to one dimension so each entry is a scalar label.
y_train, y_test = y_train.flatten(), y_test.flatten()


2024-05-19 17:35:21.834107: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-19 17:35:21.837907: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-19 17:35:21.880727: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
def _normalize_batch_with_spark(spark, images, labels):
    """Normalize one batch of images through a Spark DataFrame and a UDF.

    Args:
        spark: An active SparkSession used to build the DataFrame.
        images: Array of images indexed along its first axis. The schema below
            declares three levels of integer nesting per image, so each image
            is expected to be 3-dimensional with integer pixel values.
        labels: Sequence of labels; ``labels[j]`` must be convertible with
            ``int()``. Labels are stored in the DataFrame but are not part of
            the returned array.

    Returns:
        A NumPy array of the normalized images, reshaped to
        ``(images.shape[0],) + images.shape[1:]``.
    """
    # Spark cannot ingest NumPy arrays directly, so convert each image to
    # nested Python lists and each label to a plain int.
    rows = [(images[j].tolist(), int(labels[j])) for j in range(len(labels))]

    schema = StructType([
        StructField('image', ArrayType(ArrayType(ArrayType(IntegerType()))), False),
        StructField('label', IntegerType(), False)
    ])
    batch_df = spark.createDataFrame(rows, schema)

    def normalize_images(image):
        # Divide every pixel by 255.0 after casting to float32
        # (presumably maps 8-bit pixel values into [0, 1] - confirm input range).
        image = np.array(image).astype('float32') / 255.0
        return image.tolist()

    normalize_udf = udf(normalize_images, ArrayType(ArrayType(ArrayType(FloatType()))))
    batch_df = batch_df.withColumn('image', normalize_udf(batch_df['image']))

    # collect() returns one Row per image; the reshape drops the extra
    # single-column axis introduced by wrapping each image in a Row.
    collected = np.array(batch_df.select('image').collect())
    return collected.reshape((images.shape[0],) + tuple(images.shape[1:]))


def preprocessing_using_pyspark(x_train_batch, y_train_batch, x_test_batch, y_test_batch):
    """Normalize a train batch and a test batch of images using PySpark.

    A Spark session is obtained with ``getOrCreate()``, both batches are run
    through the same normalization pipeline (pixel values divided by 255.0),
    and the session is stopped before returning - also when an error occurs,
    so a failed batch does not leave the session running.

    Args:
        x_train_batch: Array of training images, indexed along the first axis.
        y_train_batch: Labels matching ``x_train_batch``.
        x_test_batch: Array of test images, indexed along the first axis.
        y_test_batch: Labels matching ``x_test_batch``.

    Returns:
        A two-element list ``[train_numpy, test_numpy]`` holding the normalized
        images, each with the same shape as the corresponding input batch.
    """
    # Initialize Spark session
    spark = SparkSession.builder.appName("CIFAR10Preprocessing").getOrCreate()
    try:
        train_numpy = _normalize_batch_with_spark(spark, x_train_batch, y_train_batch)
        test_numpy = _normalize_batch_with_spark(spark, x_test_batch, y_test_batch)
    finally:
        # Always release the session, even if conversion or the UDF fails.
        spark.stop()

    return [train_numpy, test_numpy]

In [3]:
# Batch-wise data processing
# The data is pushed through Spark in NUM_BATCHES slices. Every batch except
# the last has a fixed size; the last batch takes whatever remains of each
# split (so the final test batch is larger than the earlier ones).
NUM_BATCHES = 7
TRAIN_BATCH_SIZE = 8000
TEST_BATCH_SIZE = 1000

train_batches = []
test_batches = []
train_count = 0
test_count = 0

for i in range(NUM_BATCHES):
    print('Batch',i)

    is_last_batch = i == NUM_BATCHES - 1
    train_start = i * TRAIN_BATCH_SIZE
    test_start = i * TEST_BATCH_SIZE
    train_stop = len(x_train) if is_last_batch else train_start + TRAIN_BATCH_SIZE
    test_stop = len(x_test) if is_last_batch else test_start + TEST_BATCH_SIZE

    train_batch_numpy, test_batch_numpy = preprocessing_using_pyspark(
        x_train[train_start:train_stop],
        y_train[train_start:train_stop],
        x_test[test_start:test_stop],
        y_test[test_start:test_stop],
    )

    # Keep the batches in lists and join them once after the loop; calling
    # np.concatenate on the running result would re-copy it every iteration.
    train_batches.append(train_batch_numpy)
    test_batches.append(test_batch_numpy)
    train_count += train_batch_numpy.shape[0]
    test_count += test_batch_numpy.shape[0]

    # Report the shape the accumulated arrays have so far.
    print('Current shape of train data:',(train_count,) + train_batch_numpy.shape[1:])
    print('Current shape of test data:',(test_count,) + test_batch_numpy.shape[1:])

x_train_normalized = np.concatenate(train_batches, axis=0)
x_test_normalized = np.concatenate(test_batches, axis=0)

Batch 0


24/05/19 17:35:39 WARN Utils: Your hostname, dell-Inspiron-5584 resolves to a loopback address: 127.0.1.1; using 192.168.38.254 instead (on interface wlp2s0)
24/05/19 17:35:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/19 17:35:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/19 17:36:05 WARN TaskSetManager: Stage 0 contains a task of very large size (10141 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:36:43 WARN TaskSetManager: Stage 1 contains a task of very large size (1274 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (8000, 32, 32, 3)
Current shape of test data: (1000, 32, 32, 3)
Batch 1


24/05/19 17:37:10 WARN TaskSetManager: Stage 0 contains a task of very large size (10141 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:37:49 WARN TaskSetManager: Stage 1 contains a task of very large size (1274 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (16000, 32, 32, 3)
Current shape of test data: (2000, 32, 32, 3)
Batch 2


24/05/19 17:38:14 WARN TaskSetManager: Stage 0 contains a task of very large size (10141 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:38:53 WARN TaskSetManager: Stage 1 contains a task of very large size (1274 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (24000, 32, 32, 3)
Current shape of test data: (3000, 32, 32, 3)
Batch 3


24/05/19 17:39:19 WARN TaskSetManager: Stage 0 contains a task of very large size (10141 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:39:57 WARN TaskSetManager: Stage 1 contains a task of very large size (1274 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (32000, 32, 32, 3)
Current shape of test data: (4000, 32, 32, 3)
Batch 4


24/05/19 17:40:24 WARN TaskSetManager: Stage 0 contains a task of very large size (10141 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:41:02 WARN TaskSetManager: Stage 1 contains a task of very large size (1274 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (40000, 32, 32, 3)
Current shape of test data: (5000, 32, 32, 3)
Batch 5


24/05/19 17:41:31 WARN TaskSetManager: Stage 0 contains a task of very large size (10141 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:42:08 WARN TaskSetManager: Stage 1 contains a task of very large size (1274 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (48000, 32, 32, 3)
Current shape of test data: (6000, 32, 32, 3)
Batch 6


24/05/19 17:42:29 WARN TaskSetManager: Stage 0 contains a task of very large size (2540 KiB). The maximum recommended task size is 1000 KiB.
24/05/19 17:42:38 WARN TaskSetManager: Stage 1 contains a task of very large size (5074 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Current shape of train data: (50000, 32, 32, 3)
Current shape of test data: (10000, 32, 32, 3)


In [4]:
# Write the normalized images and their labels to one uncompressed .npz
# archive; each dictionary key becomes the array's name inside the archive.
processed_arrays = {
    'x_train': x_train_normalized,
    'y_train': y_train,
    'x_test': x_test_normalized,
    'y_test': y_test,
}
np.savez('cifar_dataset_processed.npz', **processed_arrays)