In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=b574f157168c2235747a08fbf2be84f91351b41d5b1d44f9e0fb69f1ca890896
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import pandas as pd
import numpy as np
from PIL import Image, ImageOps
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Flatten, Dense, Dropout
from tensorflow.keras.models import Model
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Define image directory and label file paths
image_dir = "/kaggle/input/diabetic-retinopathy-resized/resized_train_cropped/resized_train_cropped"
label_file = "/kaggle/input/diabetic-retinopathy-resized/trainLabels_cropped.csv"


In [None]:

# Load label data
labels_df = pd.read_csv(label_file)

# Group the labels by level and sample 400 samples from each level
labels_grouped = labels_df.groupby('level').apply(lambda x: x.sample(min(300, len(x))))

# Define data augmentation for level 0 images
data_augmentation = ImageDataGenerator(
    rotation_range=45,  # Random rotation up to 45 degrees
    width_shift_range=0.3,  # Random width shift
    height_shift_range=0.3,  # Random height shift
    shear_range=0.3,  # Shear intensity
    zoom_range=0.3,  # Random zoom
    horizontal_flip=True,  # Random horizontal flip
    fill_mode='nearest'  # Fill mode for new pixels
)

In [None]:
# Load and preprocess images with data augmentation
images = []
labels = []
for index, row in labels_grouped.iterrows():
    image_path = os.path.join(image_dir, f"{row['image']}.jpeg")
    img = Image.open(image_path)
    img = img.resize((256, 256))
    img_array = np.array(img) / 255.0
    
    # Apply data augmentation for level 0 images
    if row['level'] == 0:
        img_array = img_array.reshape((1,) + img_array.shape)  # Reshape for flow method
        for _ in range(10):  # Augment 10 additional images for level 0
            for batch in data_augmentation.flow(img_array, batch_size=1):
                augmented_img = batch[0]
                images.append(augmented_img)
                labels.append(row['level'])
                break  # Break after one iteration to avoid infinite loop
    else:
        images.append(img_array)
        labels.append(row['level'])

images = np.array(images)
labels = np.array(labels)

In [None]:
# Encode labels
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# Load pre-trained VGG16 model
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(256, 256, 3))

# Unfreeze some top layers for fine-tuning
for layer in base_model.layers[:-4]:
    layer.trainable = False

# Add custom classifier layers
x = Flatten()(base_model.output)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)  # Add dropout for regularization
predictions = Dense(5, activation='softmax')(x)

# Create model
model = Model(inputs=base_model.input, outputs=predictions)

# Compile model with lower learning rate
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Train model with more epochs
model.fit(X_train, y_train, epochs=30, batch_size=32, validation_split=0.2)

# Extract features using the trained model
features_train = model.predict(X_train)
features_test = model.predict(X_test)

In [2]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("ImageClassification") \
    .getOrCreate()

# Create Spark DataFrame for train and test data
train_df = pd.DataFrame(features_train)
train_df['label'] = y_train
train_spark_df = spark.createDataFrame(train_df)

test_df = pd.DataFrame(features_test)
test_df['label'] = y_test
test_spark_df = spark.createDataFrame(test_df)

# Define vector assembler
assembler = VectorAssembler(inputCols=train_spark_df.columns[:-1], outputCol='features')

# Transform train and test data
train_spark_df = assembler.transform(train_spark_df)
test_spark_df = assembler.transform(test_spark_df)

# Define MLP model with tuned hyperparameters
layers = [train_spark_df.schema['features'].metadata["ml_attr"]["num_attrs"], 256, 128, 64, 5]
mlp = MultilayerPerceptronClassifier(layers=layers, seed=42, blockSize=128, maxIter=100, stepSize=0.03)

# Train MLP model with more iterations
mlp_model = mlp.fit(train_spark_df)

# Make predictions
predictions = mlp_model.transform(test_spark_df)

# Evaluate model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy:", accuracy)

# Print the number of images
print("Number of images:", len(images))


2024-05-11 16:39:26.293586: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-11 16:39:26.293681: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-11 16:39:26.423365: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
  labels_grouped = labels_df.groupby('level').apply(lambda x: x.sample(min(300, len(x))))


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m58889256/58889256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 0us/step
Epoch 1/30


2024-05-11 16:41:08.741036: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65536: 3.31777, expected 2.61172
2024-05-11 16:41:08.741090: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65537: 5.10329, expected 4.39724
2024-05-11 16:41:08.741099: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65538: 4.80913, expected 4.10308
2024-05-11 16:41:08.741107: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65544: 5.33366, expected 4.62762
2024-05-11 16:41:08.741114: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65545: 5.00267, expected 4.29663
2024-05-11 16:41:08.741122: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65546: 4.93559, expected 4.22954
2024-05-11 16:41:08.741130: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 65547: 5.39283, expected 4.68678
2024-05-11 16:41:08.741137:

[1m 1/84[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m38:22[0m 28s/step - accuracy: 0.4062 - loss: 1.7002

I0000 00:00:1715445691.955539     116 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 130ms/step - accuracy: 0.6884 - loss: 1.3098 - val_accuracy: 0.7723 - val_loss: 0.4765
Epoch 2/30
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 116ms/step - accuracy: 0.7622 - loss: 0.4714 - val_accuracy: 0.7812 - val_loss: 0.4460
Epoch 3/30
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 117ms/step - accuracy: 0.7757 - loss: 0.4559 - val_accuracy: 0.7812 - val_loss: 0.4550
Epoch 4/30
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 116ms/step - accuracy: 0.7892 - loss: 0.4300 - val_accuracy: 0.7932 - val_loss: 0.4238
Epoch 5/30
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 116ms/step - accuracy: 0.7890 - loss: 0.4314 - val_accuracy: 0.7738 - val_loss: 0.6843
Epoch 6/30
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 117ms/step - accuracy: 0.7749 - loss: 0.4788 - val_accuracy: 0.7768 - val_loss: 0.4625
Epoch 7/30
[1m84/84[0m [32m━━━

2024-05-11 16:46:47.369141: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 0: 3.72144, expected 2.89031
2024-05-11 16:46:47.369203: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 1: 5.65806, expected 4.82693
2024-05-11 16:46:47.369219: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 2: 5.98376, expected 5.15263
2024-05-11 16:46:47.369237: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 3: 6.6283, expected 5.79717
2024-05-11 16:46:47.369249: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 6: 6.90088, expected 6.06975
2024-05-11 16:46:47.369260: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 7: 6.70007, expected 5.86894
2024-05-11 16:46:47.369271: E external/local_xla/xla/service/gpu/buffer_comparator.cc:1137] Difference at 8: 5.75339, expected 4.92226
2024-05-11 16:46:47.369282: E external/local_xla/xla/ser

[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 353ms/step


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/11 16:46:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/11 16:47:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/11 16:47:27 WARN BlockManager: Asked to remove block broadcast_167, which does not exist


Test Accuracy: 0.844047619047619
Number of images: 4200


                                                                                

In [4]:
# Make predictions on the training set
train_predictions = mlp_model.transform(train_spark_df)

# Evaluate train accuracy
train_accuracy = evaluator.evaluate(train_predictions)
print("Train Accuracy:",train_accuracy)

[Stage 117:>                                                        (0 + 4) / 4]

Train Accuracy: 0.9139880952380952


                                                                                