In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from google.cloud import storage

# Build (or reuse) the Spark session; the GCS connector jar is pulled in via spark.jars
spark = (
    SparkSession.builder
    .appName('pyspark-run-with-local-folder')
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

# Hadoop settings for the gs:// filesystem: service-account key file,
# filesystem implementation class and project id.
# NOTE(review): the key file path is absolute and machine-specific.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", "/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.project.id", "filippo-boni-project")

# Load the images stored under the GCS folder with the binaryFile reader
image_df = (
    spark.read
    .format("binaryFile")
    .load("gs://breast-cancer-images-raw/raw_breast_images/Dataset_BUSI_with_GT/benign/")
)

# Print schema to understand the data structure
image_df.printSchema()

# Fetch a few rows to verify that the data loaded
rows = image_df.select("content").limit(3).collect()

# Print the first 100 bytes of every sampled image (for inspection).
# Each iteration must read its own row; indexing rows[0] would print the
# first image repeatedly.
for row in rows:
    image_data = row['content']
    print(image_data[:100])

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
import cv2
import numpy as np
import io

# Define the UDF to resize images
def resize_image(image_data, width=100, height=100):
    """Resize an encoded image and return it re-encoded as PNG bytes.

    Args:
        image_data: Encoded image as a bytes-like object, or None.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        PNG-encoded bytes of the resized image. When the input is None or
        empty, cannot be decoded, or cannot be re-encoded, the input is
        returned unchanged.
    """
    # Null or empty payloads cannot be decoded; hand them back untouched
    # instead of letting NumPy/OpenCV raise inside the Spark UDF.
    if image_data is None or len(image_data) == 0:
        return image_data

    # Convert binary data to a NumPy buffer and decode it
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        # Return original data if decoding failed
        return image_data

    resized_img = cv2.resize(img, (width, height))

    # imencode reports success through its first return value
    encoded_ok, img_encoded = cv2.imencode('.png', resized_img)
    if not encoded_ok:
        return image_data

    return img_encoded.tobytes()

# Spark UDF wrapper around resize_image: fixed 100x100 target, binary result
resize_image_udf = udf(
    lambda raw_bytes: resize_image(raw_bytes, width=100, height=100),
    BinaryType(),
)


In [None]:
# Add a "resized_content" column by applying the resizing UDF to the raw file bytes
resized_image_df = image_df.withColumn("resized_content", resize_image_udf("content"))

In [None]:
# Print the column layout of the resized DataFrame and preview its rows
# (long cell values are truncated in the preview)
resized_image_df.printSchema()
resized_image_df.show(truncate=True)

In [None]:
import matplotlib.pyplot as plt

# Pull a small sample of the original images onto the driver
rows = image_df.select("content").limit(3).collect()

# Decode and display each sampled image
for i, row in enumerate(rows):
    image_data = row['content']

    # Encoded bytes -> uint8 buffer -> decoded image (None if decoding fails)
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    if img is None:
        print(f'Failed to decode image {i + 1}')
        continue

    # OpenCV yields BGR channel order; matplotlib expects RGB
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    plt.figure(figsize=(6, 6))
    plt.imshow(img_rgb)
    plt.title(f'Image {i + 1}')
    plt.axis('off')  # hide axis numbers and ticks
    plt.show()

In [None]:
import matplotlib.pyplot as plt

# Pull a small sample of the resized images onto the driver
rows = resized_image_df.select("resized_content").limit(3).collect()

# Decode and display each sampled image
for i, row in enumerate(rows):
    image_data = row['resized_content']

    # Encoded bytes -> uint8 buffer -> decoded image (None if decoding fails)
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    if img is None:
        print(f'Failed to decode image {i + 1}')
        continue

    # OpenCV yields BGR channel order; matplotlib expects RGB
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    print(np.shape(img_rgb))  # array shape of the decoded, resized image

    plt.figure(figsize=(6, 6))
    plt.imshow(img_rgb)
    plt.title(f'Image {i + 1}')
    plt.axis('off')  # hide axis numbers and ticks
    plt.show()

In [None]:
# Stop the SparkSession.
# NOTE(review): the cells that follow still operate on resized_image_df, which
# needs a live session — confirm the intended execution order.
spark.stop()


In [None]:
# Keep only the resized image column for saving.
# Note: this rebinds resized_image_df, so the other columns are no longer
# reachable through this name afterwards.
resized_image_df = resized_image_df.select("resized_content")

In [None]:
# Collect the resized images to the driver node as a list of rows
# (every image's bytes are held in driver memory at once)
resized_images = resized_image_df.select("resized_content").collect()

In [None]:
from google.cloud import storage
from google.oauth2 import service_account
import os
import zipfile
import io



# Path to your service account key file
# NOTE(review): absolute, machine-specific path.
key_path = '/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json'

# Authenticated GCS client built from the service-account key
credentials = service_account.Credentials.from_service_account_file(key_path)
client = storage.Client(credentials=credentials)

# Destination bucket and object-name prefix
bucket_name = 'breast-images-resized'
bucket = client.bucket(bucket_name)
output_folder = 'resized_images/images/'

# Upload each resized image to GCS straight from memory, so no temporary
# files are left behind in the working directory.
for idx, row in enumerate(resized_images):
    image_data = row['resized_content']
    if image_data is None:
        # Nothing to upload for this row
        continue

    # The resize step encodes with '.png', so name and label the object as PNG
    file_name = f"image_{idx}.png"

    blob = bucket.blob(output_folder + file_name)
    # bytes(...) normalises a bytearray payload before handing it to the client
    blob.upload_from_string(bytes(image_data), content_type='image/png')

print(f"Resized images uploaded to gs://{bucket_name}/{output_folder}")

In [None]:
from pyspark.sql.functions import udf, col, regexp_extract

# Add a "filename" column holding the last path segment (the characters after the final "/")
image_df = image_df.withColumn("filename", regexp_extract(col("path"), r"([^/]+)$", 1))

In [None]:
# Print the column layout of image_df and preview its rows
# (long cell values are truncated in the preview)
image_df.printSchema()
image_df.show(truncate=True)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr


# Leading part of each GCS path that is dropped to obtain the relative subpath
prefix = "gs://breast-cancer-images-raw/raw_breast_images/"

# "subpath" = path with its first len(prefix) characters removed
# (SQL substring positions are 1-based, hence the + 1)
image_df = image_df.withColumn("subpath", expr(f"substring(path, length('{prefix}') + 1, length(path))"))

# Show the full path next to the derived subpath, without truncation
image_df.select("path", "subpath").show(truncate=False)



In [2]:
# Stop the SparkSession; a new session is created in the next cell
spark.stop()


In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from google.cloud import storage

# Build (or reuse) the Spark session; the GCS connector jar is pulled in via spark.jars
spark = (
    SparkSession.builder
    .appName('pyspark-run-with-local-folder')
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

# Hadoop settings for the gs:// filesystem: service-account key file,
# filesystem implementation class and project id.
# NOTE(review): the key file path is absolute and machine-specific.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", "/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.project.id", "filippo-boni-project")

# Load the images stored under the GCS folder with the binaryFile reader
image_df = (
    spark.read
    .format("binaryFile")
    .load("gs://breast-cancer-images-raw/raw_breast_images/Dataset_BUSI_with_GT/benign/")
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/25 13:56:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/08/25 13:56:47 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr
from pyspark.sql.types import BinaryType, StringType
import cv2
import numpy as np
from google.cloud import storage
from google.oauth2 import service_account

# Service-account key file from which the GCS credentials are built.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
key_path = '/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json'

# Create the credentials object from the key file
credentials = service_account.Credentials.from_service_account_file(key_path)


# Define the UDF to resize images
def resize_image(image_data, width=100, height=100):
    """Resize an encoded image and return it re-encoded as PNG bytes.

    Args:
        image_data: Encoded image as a bytes-like object, or None.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        PNG-encoded bytes of the resized image, or None when the input is
        None or empty, cannot be decoded, or cannot be re-encoded.
    """
    # Null or empty payloads cannot be decoded; report them as None instead
    # of letting NumPy/OpenCV raise inside the Spark UDF.
    if image_data is None or len(image_data) == 0:
        return None

    # Convert binary data to a NumPy buffer and decode it
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        return None

    resized_img = cv2.resize(img, (width, height))

    # imencode reports success through its first return value
    encoded_ok, img_encoded = cv2.imencode('.png', resized_img)
    if not encoded_ok:
        return None

    return img_encoded.tobytes()

# Spark UDF wrapper around resize_image: fixed 100x100 target, binary result
resize_image_udf = udf(
    lambda raw_bytes: resize_image(raw_bytes, width=100, height=100),
    BinaryType(),
)



In [None]:
# Leading part of each GCS path that is dropped to obtain the relative subpath
prefix = "gs://breast-cancer-images-raw/raw_breast_images/"

# "subpath" = path with its first len(prefix) characters removed
# (SQL substring positions are 1-based, hence the + 1)
image_df = image_df.withColumn("subpath", expr(f"substring(path, length('{prefix}') + 1, length(path))"))

# Uncomment to preview the result
#image_df.show(truncate=True)

In [None]:
# Add a "resized_content" column by applying the resizing UDF to the raw file bytes
resized_image_df = image_df.withColumn("resized_content", resize_image_udf("content"))

# Uncomment to preview the resized DataFrame
#resized_image_df.show(truncate=True)

In [None]:

# Define the UDF to upload images to GCS with the extracted subpath
def upload_image_to_gcs(image_data, subpath, bucket_name):
    """Upload one PNG image to GCS and return the gs:// URI it was written to.

    Args:
        image_data: PNG bytes (bytes or bytearray), or None.
        subpath: Relative object path; it is appended to the
            "breast-images-resized/" object prefix.
        bucket_name: Name of the destination bucket.

    Returns:
        The gs:// URI of the uploaded object, or None when image_data is None
        (nothing is uploaded in that case).
    """
    if image_data is None:
        return None

    # Object name inside the bucket: fixed prefix + original subpath
    object_name = "breast-images-resized/" + subpath

    # Uses the module-level `credentials` object
    client = storage.Client(credentials=credentials)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(object_name)

    # Hand the client plain bytes rather than a bytearray
    if isinstance(image_data, bytearray):
        image_data = bytes(image_data)

    blob.upload_from_string(image_data, content_type='image/png')

    # Report the location that was actually written, including the prefix
    return f"gs://{bucket_name}/{object_name}"

# Spark UDF wrapper around upload_image_to_gcs with the destination bucket
# fixed to "breast-images-resized"; it yields a string column
upload_image_udf = udf(
    lambda payload, rel_path: upload_image_to_gcs(payload, rel_path, "breast-images-resized"),
    StringType(),
)

In [None]:
# "gcs_path" = value returned by the upload UDF for each row's resized bytes and subpath
result_df = resized_image_df.withColumn(
    "gcs_path",
    upload_image_udf("resized_content", "subpath"),
)

# Display the returned paths in full; showing the column is what triggers the uploads
result_df.select("gcs_path").show(truncate=False)

In [29]:
# Stop the SparkSession; a new session is created in the next cell
spark.stop()

In [35]:
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from google.cloud import storage

# Build (or reuse) the Spark session; the GCS connector jar is pulled in via spark.jars
spark = (
    SparkSession.builder
    .appName('pyspark-run-with-local-folder')
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

# Hadoop settings for the gs:// filesystem: service-account key file,
# filesystem implementation class and project id.
# NOTE(review): the key file path is absolute and machine-specific.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", "/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.project.id", "filippo-boni-project")

# Load the images stored under the GCS folder with the binaryFile reader
image_df = (
    spark.read
    .format("binaryFile")
    .load("gs://breast-cancer-images-raw/raw_breast_images/Dataset_BUSI_with_GT/benign/")
)

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr
from pyspark.sql.types import BinaryType, StringType
import cv2
import numpy as np
from google.cloud import storage
from google.oauth2 import service_account


# Build the service-account credentials from the key file
# NOTE(review): absolute, machine-specific path.
key_path = '/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json'
credentials = service_account.Credentials.from_service_account_file(key_path)

# SparkContext of the active session
sc = spark.sparkContext

# Wrap the credentials in a broadcast variable so that partition functions
# can read them through credentials_broadcast.value
credentials_broadcast = sc.broadcast(credentials)


# Define the UDF to resize images
def resize_image(image_data, width=100, height=100):
    """Resize an encoded image and return it re-encoded as PNG bytes.

    Args:
        image_data: Encoded image as a bytes-like object, or None.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        PNG-encoded bytes of the resized image, or None when the input is
        None or empty, cannot be decoded, or cannot be re-encoded.
    """
    # Null or empty payloads cannot be decoded; report them as None instead
    # of letting NumPy/OpenCV raise inside the Spark UDF.
    if image_data is None or len(image_data) == 0:
        return None

    # Convert binary data to a NumPy buffer and decode it
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        return None

    resized_img = cv2.resize(img, (width, height))

    # imencode reports success through its first return value
    encoded_ok, img_encoded = cv2.imencode('.png', resized_img)
    if not encoded_ok:
        return None

    return img_encoded.tobytes()

# Spark UDF wrapper around resize_image: fixed 100x100 target, binary result
resize_image_udf = udf(
    lambda raw_bytes: resize_image(raw_bytes, width=100, height=100),
    BinaryType(),
)

# Leading part of each GCS path that is dropped to obtain the relative subpath
prefix = "gs://breast-cancer-images-raw/raw_breast_images/"

# "subpath" = path with its first len(prefix) characters removed
# (SQL substring positions are 1-based, hence the + 1)
image_df = image_df.withColumn("subpath", expr(f"substring(path, length('{prefix}') + 1, length(path))"))

# "resized_content" = output of the resizing UDF applied to the raw file bytes
resized_image_df = image_df.withColumn("resized_content", resize_image_udf("content"))

# The per-row upload UDF below is disabled: it is wrapped in a string literal,
# so nothing in it is defined or executed.
"""
def upload_image_to_gcs(image_data, subpath, bucket_name):
    if image_data is not None:
    
        # Upload to GCS with the original subpath
        client = storage.Client(credentials=credentials)
        bucket = client.bucket(bucket_name)
        blob = bucket.blob("breast-images-resized/" + subpath)
        #print(blob)
        if isinstance(image_data, bytearray):
            image_data = bytes(image_data)

        blob.upload_from_string(image_data, content_type='image/png')
        
        # Return the GCS path as confirmation
        return f"gs://{bucket_name}/{subpath}"
    else:
        return None

# Register the upload UDF
upload_image_udf = udf(
    lambda data, subpath: upload_image_to_gcs(data, subpath, "breast-images-resized"), 
    StringType()
)

# Apply the upload UDF, using the full subpath in the destination
result_df = resized_image_df.withColumn("gcs_path", upload_image_udf("resized_content", "subpath"))
"""

'\ndef upload_image_to_gcs(image_data, subpath, bucket_name):\n    if image_data is not None:\n    \n        # Upload to GCS with the original subpath\n        client = storage.Client(credentials=credentials)\n        bucket = client.bucket(bucket_name)\n        blob = bucket.blob("breast-images-resized/" + subpath)\n        #print(blob)\n        if isinstance(image_data, bytearray):\n            image_data = bytes(image_data)\n\n        blob.upload_from_string(image_data, content_type=\'image/png\')\n        \n        # Return the GCS path as confirmation\n        return f"gs://{bucket_name}/{subpath}"\n    else:\n        return None\n\n# Register the upload UDF\nupload_image_udf = udf(\n    lambda data, subpath: upload_image_to_gcs(data, subpath, "breast-images-resized"), \n    StringType()\n)\n\n# Apply the upload UDF, using the full subpath in the destination\nresult_df = resized_image_df.withColumn("gcs_path", upload_image_udf("resized_content", "subpath"))\n'

In [36]:
def process_partition(iterator):
    """Upload the resized images of one partition to GCS.

    Args:
        iterator: Iterable of rows that expose 'resized_content' (image bytes
            or None) and 'subpath' (relative object path) by key.
    """
    # One client and bucket handle are shared by every row of the partition
    gcs_client = storage.Client(credentials=credentials_broadcast.value)
    target_bucket = gcs_client.bucket("breast-images-resized")

    for record in iterator:
        payload = record['resized_content']
        subpath = record['subpath']

        # Rows without resized content are skipped
        if payload is None:
            continue

        # Hand the client plain bytes rather than a bytearray
        if isinstance(payload, bytearray):
            payload = bytes(payload)

        target_blob = target_bucket.blob(f"breast-images-resized/{subpath}")
        target_blob.upload_from_string(payload, content_type='image/png')

    print("Partition processed successfully.")

In [37]:
# Default parallelism reported by the SparkContext
sc.defaultParallelism

10

In [38]:
# Spread the rows over four partitions per unit of default parallelism
num_partitions = 4 * sc.defaultParallelism
resized_image_df = resized_image_df.repartition(
    num_partitions
)

In [39]:
# Run process_partition once for each partition of the DataFrame's underlying RDD
resized_image_df.rdd.foreachPartition(process_partition)

Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.
Partition proc

In [40]:
# Preview the rows of the resized DataFrame
resized_image_df.show()

+----+----------------+------+-------+-------+---------------+
|path|modificationTime|length|content|subpath|resized_content|
+----+----------------+------+-------+-------+---------------+
+----+----------------+------+-------+-------+---------------+



In [51]:
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from google.cloud import storage

# Build (or reuse) the Spark session; the GCS connector jar is pulled in via spark.jars
spark = (
    SparkSession.builder
    .appName('pyspark-run-with-local-folder')
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

# Hadoop settings for the gs:// filesystem: service-account key file,
# filesystem implementation class and project id.
# NOTE(review): the key file path is absolute and machine-specific.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", "/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.project.id", "filippo-boni-project")

# Load the images matched by the GCS glob with the binaryFile reader
image_df = (
    spark.read
    .format("binaryFile")
    .load("gs://breast-cancer-images-raw/raw_breast_images/Dataset_BUSI_with_GT/*")
)

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr
from pyspark.sql.types import BinaryType, StringType
import cv2
import numpy as np
from google.cloud import storage
from google.oauth2 import service_account


# Build the service-account credentials from the key file
# NOTE(review): absolute, machine-specific path.
key_path = '/Users/filippoboni/Desktop/MLOPS/move_images_to_gcp/filippo-boni-project-f61113b1f868.json'
credentials = service_account.Credentials.from_service_account_file(key_path)

# SparkContext of the active session
sc = spark.sparkContext

# Wrap the credentials in a broadcast variable so that partition functions
# can read them through credentials_broadcast.value
credentials_broadcast = sc.broadcast(credentials)


# Define the UDF to resize images
def resize_image(image_data, width=100, height=100):
    """Resize an encoded image and return it re-encoded as PNG bytes.

    Args:
        image_data: Encoded image as a bytes-like object, or None.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        PNG-encoded bytes of the resized image, or None when the input is
        None or empty, cannot be decoded, or cannot be re-encoded.
    """
    # Null or empty payloads cannot be decoded; report them as None instead
    # of letting NumPy/OpenCV raise inside the Spark UDF.
    if image_data is None or len(image_data) == 0:
        return None

    # Convert binary data to a NumPy buffer and decode it
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        return None

    resized_img = cv2.resize(img, (width, height))

    # imencode reports success through its first return value
    encoded_ok, img_encoded = cv2.imencode('.png', resized_img)
    if not encoded_ok:
        return None

    return img_encoded.tobytes()

# Spark UDF wrapper around resize_image: fixed 100x100 target, binary result
resize_image_udf = udf(
    lambda raw_bytes: resize_image(raw_bytes, width=100, height=100),
    BinaryType(),
)

# Leading part of each GCS path that is dropped to obtain the relative subpath
prefix = "gs://breast-cancer-images-raw/raw_breast_images/"

# "subpath" = path with its first len(prefix) characters removed
# (SQL substring positions are 1-based, hence the + 1)
image_df = image_df.withColumn("subpath", expr(f"substring(path, length('{prefix}') + 1, length(path))"))

# "resized_content" = output of the resizing UDF applied to the raw file bytes
resized_image_df = image_df.withColumn("resized_content", resize_image_udf("content"))

def process_partition(iterator):
    """Upload the resized images of one partition to GCS.

    Args:
        iterator: Iterable of rows that expose 'resized_content' (image bytes
            or None) and 'subpath' (relative object path) by key.
    """
    # One client and bucket handle are shared by every row of the partition.
    # NOTE(review): the target is the bucket the raw images are read from;
    # objects go under the "breast-images-resized/" prefix — confirm intended.
    gcs_client = storage.Client(credentials=credentials_broadcast.value)
    target_bucket = gcs_client.bucket("breast-cancer-images-raw")

    for record in iterator:
        payload = record['resized_content']
        subpath = record['subpath']

        # Rows without resized content are skipped
        if payload is None:
            continue

        # Hand the client plain bytes rather than a bytearray
        if isinstance(payload, bytearray):
            payload = bytes(payload)

        target_blob = target_bucket.blob(f"breast-images-resized/{subpath}")
        target_blob.upload_from_string(payload, content_type='image/png')

    print("Partition processed successfully.")

# Spread the rows over four partitions per unit of default parallelism
num_partitions = 4 * sc.defaultParallelism
resized_image_df = resized_image_df.repartition(
    num_partitions
)

In [52]:
# Run process_partition once for each partition of the DataFrame's underlying RDD
resized_image_df.rdd.foreachPartition(process_partition)

Partition processed successfully.                                 (0 + 10) / 40]
Partition processed successfully.
Partition processed successfully.                                 (2 + 10) / 40]
Partition processed successfully.                                 (3 + 10) / 40]
Partition processed successfully.
Partition processed successfully.
Partition processed successfully.                                 (6 + 10) / 40]
Partition processed successfully.                                 (7 + 10) / 40]
Partition processed successfully.
Partition processed successfully.                                 (9 + 10) / 40]
Partition processed successfully.                                (10 + 10) / 40]
Partition processed successfully.                                (11 + 10) / 40]
Partition processed successfully.                                (12 + 10) / 40]
Partition processed successfully.                                (13 + 10) / 40]
Partition processed successfully.
Partition processed 

24/08/26 05:02:40 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3654426 ms exceeds timeout 120000 ms
24/08/26 05:02:40 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/26 05:02:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [44]:
# Preview the rows of the resized DataFrame
resized_image_df.show()

+----+----------------+------+-------+-------+---------------+
|path|modificationTime|length|content|subpath|resized_content|
+----+----------------+------+-------+-------+---------------+
+----+----------------+------+-------+-------+---------------+



In [48]:
# GCS glob matching every file in the benign folder
path = "gs://breast-cancer-images-raw/raw_breast_images/Dataset_BUSI_with_GT/benign/*"

# Read the matching files with the binaryFile reader
image_df = (
    spark.read
    .format("binaryFile")
    .load(path)
)

# Inspect the column layout and preview the first five rows
image_df.printSchema()
image_df.show(5)


                                                                                

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



                                                                                

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|gs://breast-cance...|2024-08-17 22:34:...|563959|[89 50 4E 47 0D 0...|
|gs://breast-cance...|2024-08-17 22:36:...|561388|[89 50 4E 47 0D 0...|
|gs://breast-cance...|2024-08-17 22:34:...|560410|[89 50 4E 47 0D 0...|
|gs://breast-cance...|2024-08-17 22:36:...|559366|[89 50 4E 47 0D 0...|
|gs://breast-cance...|2024-08-17 22:34:...|547795|[89 50 4E 47 0D 0...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows

