# Image to Parquet
Convert images to binary and save them into a parquet file.

In [1]:
import site
from pathlib import Path
from pyspark.sql import SparkSession

%load_ext autoreload
%autoreload 2

In [2]:
def get_gcs_connector_jar() -> str:
    """Locate the GCS Hadoop connector JAR shipped with the user-site PySpark.

    The JAR is looked up under ``<user site-packages>/pyspark/jars`` using the
    pattern ``gcs-connector-hadoop3-*.jar``.

    :return: Path of the connector JAR as a string. If several files match,
        the one whose path sorts first is returned so the choice is stable
        between runs.
    :raises FileNotFoundError: If no matching JAR is found.
    """
    # Assuming the JAR is installed in the user site-packages of PySpark
    jars_dir = Path(site.getusersitepackages()) / "pyspark" / "jars"
    # glob() gives no ordering guarantee, so sort before picking one
    candidates = sorted(jars_dir.glob("gcs-connector-hadoop3-*.jar"))
    if not candidates:
        # Fail with an actionable message instead of an IndexError on [0]
        raise FileNotFoundError(
            f"No gcs-connector-hadoop3-*.jar found in {jars_dir}"
        )
    return str(candidates[0])


# Resolve the connector JAR first so it can be handed to Spark via spark.jars
gcs_connector_jar = get_gcs_connector_jar()

# Build (or reuse) a local Spark session named "Image2Parquet"
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Image2Parquet")
    .config("spark.jars", gcs_connector_jar)
    .getOrCreate()
)
print(spark)

24/02/03 22:47:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<pyspark.sql.session.SparkSession object at 0x7f4e283fe170>


In [3]:
# Register the GCS connector classes for gs:// on the Hadoop configuration
sc = spark.sparkContext
for conf_key, conf_value in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    sc._jsc.hadoopConfiguration().set(conf_key, conf_value)

In [None]:
import os
import logging

# Emit log records at INFO level and above
logging.basicConfig(level=logging.INFO)

# How many species sub-folders to process
num_folders = 20

# Paths are anchored two levels above the current working directory
curr_dir = Path(os.getcwd())
base_dir = curr_dir.parents[1]
data_dir = base_dir.joinpath("data", "SnakeCLEF2023-small_size", "2023")

# Fail fast when the image directory is missing
if not data_dir.is_dir():
    raise FileNotFoundError(f"Data directory {data_dir} does not exist.")

# Names of the first `num_folders` sub-directories, in sorted order
subfolders = sorted(entry.name for entry in data_dir.iterdir() if entry.is_dir())[
    :num_folders
]

In [None]:
# Schema for the DataFrame
from pyspark.sql.types import StructType, StructField, BinaryType, StringType

# Five nullable string columns derived from the path, then the nullable binary payload
schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("path", "folder_name", "year", "binomial_name", "file_name")
    ]
    + [StructField("image_binary_data", BinaryType(), True)]
)


def image_to_binary(image_path):
    """Read the file at ``image_path`` in binary mode and return its bytes."""
    with open(image_path, mode="rb") as image_file:
        payload = image_file.read()
    return payload

In [None]:
# Gather one record per image on the driver, then distribute them with a single
# parallelize() call. Chaining one union() per image builds an RDD whose lineage
# and partition count grow with every file.
image_records = []

# Loop through subfolders and process images
for folder in subfolders:
    folder_path = data_dir / folder
    for img_name in sorted(os.listdir(folder_path)):
        img_path = folder_path / img_name
        if not img_path.is_file():
            # Nested directories cannot be read as image bytes
            continue
        # Path relative to the "data" directory:
        # <folder_name>/<year>/<binomial_name>/<file_name>
        # (relative_to avoids mis-splitting when "data/" appears again in the path)
        relative_path = img_path.relative_to(base_dir / "data").as_posix()
        folder_name, year, binomial_name = relative_path.split("/")[:3]
        file_name = img_path.name
        binary_data = image_to_binary(str(img_path))
        image_records.append(
            (relative_path, folder_name, year, binomial_name, file_name, binary_data)
        )

# Keep an empty RDD when nothing was found, as the previous implementation did
image_rdd = (
    spark.sparkContext.parallelize(image_records)
    if image_records
    else spark.sparkContext.emptyRDD()
)

In [None]:
# Apply the explicit schema while turning the RDD of tuples into a DataFrame
image_df = spark.createDataFrame(image_rdd, schema=schema)

# Preview the first five rows
image_df.show(5)

In [None]:
# Output directory for Parquet files: two levels above the working directory,
# under data/parquet_files. Built without rebinding `data_dir`, which an
# earlier cell points at the image folders; overwriting it here would break
# re-running the image loading loop.
parquet_dir = Path(os.getcwd()).parents[1] / "data" / "parquet_files"

# Create "parquet_files" directory (and missing parents) if it doesn't exist
parquet_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Destination of the local Parquet output
parquet_file_path = parquet_dir.joinpath("images_data.parquet")

# Write the DataFrame there, replacing any existing output
image_df.write.parquet(str(parquet_file_path), mode="overwrite")

In [None]:
# Print the schema of image_df
image_df.printSchema()

In [None]:
def get_size_of_parquet(dir_path):
    """Return the combined size in bytes of every file found under ``dir_path``.

    The directory tree is walked recursively with ``os.walk``.
    """
    return sum(
        os.path.getsize(os.path.join(current_dir, file_name))
        for current_dir, _subdirs, file_names in os.walk(dir_path)
        for file_name in file_names
    )


# Get the size of the Parquet file (directory)
# The path is walked recursively, so the total covers every file inside it
parquet_size = get_size_of_parquet(parquet_file_path)
print(f"Size of Parquet file: {parquet_size} bytes")

In [None]:
# Target location in the GCS bucket
gcs_path = "gs://dsgt-clef-snakeclef-2024/data/parquet_files/image_data"

# Upload the DataFrame as Parquet, replacing any existing output
image_df.write.parquet(gcs_path, mode="overwrite")

## Load DataFrame from GCS

In [None]:
from snakeclef.utils import get_spark

# Rebind `spark` to the session returned by the project's get_spark() helper
# NOTE(review): presumably that session is already configured for gs:// access — confirm in snakeclef.utils
spark = get_spark()
display(spark)

In [None]:
# Location of the Parquet data in the bucket
gcs_parquet_path = "gs://dsgt-clef-snakeclef-2024/data/parquet_files/image_data"

# Load it back into a DataFrame
df = spark.read.parquet(gcs_parquet_path)

# Preview three rows
df.show(3)

In [None]:
import io
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image


def display_images_from_binary(
    image_data_list, binomial_names, grid_size=(3, 3), figsize=(12, 12), dpi=80
):
    """
    Display images in a grid with binomial names as labels.

    Images fill the grid row by row. Iteration stops at the shortest of the
    grid, ``image_data_list`` and ``binomial_names``; grid cells that receive
    no image are hidden rather than drawn as empty boxes.

    :param image_data_list: List of binary image data.
    :param binomial_names: List of binomial names corresponding to each image.
        Underscores are displayed as spaces.
    :param grid_size: Tuple (rows, cols) representing the grid size.
    :param figsize: Figure size passed to ``plt.subplots``.
    :param dpi: Figure resolution passed to ``plt.subplots``.
    """
    # Unpack the number of rows and columns for the grid
    rows, cols = grid_size

    # squeeze=False always yields a 2D array of axes, even for a 1x1 grid,
    # so it can be flattened unconditionally
    fig, axes = plt.subplots(rows, cols, figsize=figsize, dpi=dpi, squeeze=False)
    axes = axes.ravel()

    shown = 0
    for ax, binary_data, name in zip(axes, image_data_list, binomial_names):
        # Convert binary data to an image and display it
        image = Image.open(io.BytesIO(binary_data))
        ax.imshow(image)
        # Set the binomial name as xlabel, with underscores shown as spaces
        ax.set_xlabel(name.replace("_", " "), fontsize=14)
        ax.set_xticks([])
        ax.set_yticks([])
        shown += 1

    # Hide the cells that did not receive an image
    for unused_ax in axes[shown:]:
        unused_ax.set_axis_off()

    plt.tight_layout()
    plt.show()

In [None]:
# Collect the sample rows once so each image stays paired with its own name.
# Two separate limit().collect() calls run as independent Spark jobs and,
# without an explicit ordering, are not guaranteed to return the same rows.
rows, cols = 3, 3
sample_rows = (
    df.select("image_binary_data", "binomial_name").limit(rows * cols).collect()
)
image_data_list = [row["image_binary_data"] for row in sample_rows]
binomial_names = [row["binomial_name"] for row in sample_rows]

# Display the images in a grid with binomial names
display_images_from_binary(image_data_list, binomial_names, grid_size=(rows, cols))

## Join image DataFrame with Metadata files

In [4]:
import os
from pyspark.sql.functions import lit, regexp_extract, regexp_replace

In [5]:
# Base directory using pathlib
curr_dir = Path(os.getcwd())
base_dir = curr_dir.parents[1]
base_dir = base_dir / "data" / "SnakeCLEF2023-small_size"

# Load all .jpg files below the base directory as binary data
# Convert Path object to string when passing to PySpark
image_df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(str(base_dir))
)

# Construct the string to be replaced - adjust this based on your actual base path
to_remove = "file:" + str(base_dir.parents[0])

# regexp_replace interprets its pattern as a Java regular expression, so the
# literal prefix is wrapped in \Q...\E to keep characters such as "+", "(" or
# "[" in the directory name from being treated as regex syntax.
to_remove_pattern = "\\Q" + to_remove + "\\E"

# Extract metadata from the file path, then select and rename columns to fit
# the target schema, including renaming 'content' to 'image_binary_data'
image_final_df = (
    image_df.withColumn("path", regexp_replace("path", to_remove_pattern, ""))
    .withColumn("folder_name", lit("SnakeCLEF2023-small_size"))
    .withColumn("year", regexp_extract("path", ".*/(\\d{4})/.*", 1))
    .withColumn("binomial_name", regexp_extract("path", ".*/(\\d{4})/(.*)/.*", 2))
    .withColumn("file_name", regexp_extract("path", ".*/([^/]+)$", 1))
    .withColumnRenamed("content", "image_binary_data")
    .select(
        "path",
        "folder_name",
        "year",
        "binomial_name",
        "file_name",
        "image_binary_data",
    )
)

# Print Schema
image_final_df.printSchema()

24/02/03 22:47:26 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.


root
 |-- path: string (nullable = true)
 |-- folder_name: string (nullable = false)
 |-- year: string (nullable = true)
 |-- binomial_name: string (nullable = true)
 |-- file_name: string (nullable = true)
 |-- image_binary_data: binary (nullable = true)



In [6]:
image_final_df.show(n=3, truncate=100)

                                                                                

+--------------------------------------------------------------------+------------------------+----+-------------------------+------------+----------------------------------------------------------------------------------------------------+
|                                                                path|             folder_name|year|            binomial_name|   file_name|                                                                                   image_binary_data|
+--------------------------------------------------------------------+------------------------+----+-------------------------+------------+----------------------------------------------------------------------------------------------------+
|     /SnakeCLEF2023-small_size/2010/Dendrelaphis_pictus/49119484.jpg|SnakeCLEF2023-small_size|2010|      Dendrelaphis_pictus|49119484.jpg|[FF D8 FF E0 00 10 4A 46 49 46 00 01 01 00 00 01 00 01 00 00 FF E2 FF FF 49 43 43 5F 50 52 4F 46 ...|
|   /SnakeCLEF2023-small_size/2017/L

### Join image_df with metadata from GCS

In [7]:
# Get list of stored files in cloud bucket
! gcloud storage ls gs://dsgt-clef-snakeclef-2024/raw

gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-PubTestMetadata.csv
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-TrainMetadata-HM.csv
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-TrainMetadata-iNat.csv
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-ValMetadata.csv
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-pubtest.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-train-large_size.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-train-medium_size.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-train-small_size.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-trainHMP.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-val-large_size.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-val-medium_size.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/SnakeCLEF2023-val-small_size.tar.gz
gs://dsgt-clef-snakeclef-2024/raw/urls.txt


In [8]:
from pyspark.sql import functions as F

# Root directory (ends with a slash, so file names are appended directly;
# adding another "/" would produce "raw//..." URIs)
raw_root = "gs://dsgt-clef-snakeclef-2024/raw/"

# Read both training metadata CSVs with a header row and inferred column types
train_meta_hm = spark.read.csv(
    f"{raw_root}SnakeCLEF2023-TrainMetadata-HM.csv", header=True, inferSchema=True
)
train_meta_inat = spark.read.csv(
    f"{raw_root}SnakeCLEF2023-TrainMetadata-iNat.csv", header=True, inferSchema=True
)

train_meta_hm.printSchema()
train_meta_inat.printSchema()



root
 |-- observation_id: string (nullable = true)
 |-- binomial_name: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- subset: string (nullable = true)
 |-- code: string (nullable = true)
 |-- endemic: boolean (nullable = true)
 |-- class_id: integer (nullable = true)

root
 |-- observation_id: integer (nullable = true)
 |-- endemic: boolean (nullable = true)
 |-- binomial_name: string (nullable = true)
 |-- code: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- class_id: integer (nullable = true)
 |-- subset: string (nullable = true)



                                                                                

In [9]:
# Make the HM table consistent with the iNat one before stacking them:
# observation_id becomes the second space-separated token cast to int, and the
# columns are put in the iNat order. Then de-duplicate, collapse to a single
# partition and cache.
meta_df = (
    train_meta_hm
    .withColumn(
        "observation_id", F.split("observation_id", " ").getItem(1).cast("int")
    )
    .select(*train_meta_inat.columns)
    .union(train_meta_inat)
    .dropDuplicates()
    .repartition(1)
    .cache()
)
meta_df.show()

[Stage 11:>                                                         (0 + 1) / 1]

+--------------+-------+--------------------+----+--------------------+--------+--------+
|observation_id|endemic|       binomial_name|code|          image_path|class_id|  subset|
+--------------+-------+--------------------+----+--------------------+--------+--------+
|         42265|   true|     Liodytes pygaea|  US|HMP/Liodytes_pyga...|     933|train-hm|
|        317846|   true|     Liodytes pygaea|  US|HMP/Liodytes_pyga...|     933|train-hm|
|        325731|   true|       Crotalus enyo|  MX|HMP/Crotalus_enyo...|     414|train-hm|
|        114227|  false|  Contia longicaudae|  US|HMP/Contia_longic...|     378|train-hm|
|        128559|  false|  Contia longicaudae|  US|HMP/Contia_longic...|     378|train-hm|
|        375354|  false|  Contia longicaudae|  US|HMP/Contia_longic...|     378|train-hm|
|        127773|   true|      Micrurus tener|  US|HMP/Micrurus_tene...|    1082|train-hm|
|        280822|  false| Dendrelaphis pictus|  ID|HMP/Dendrelaphis_...|     499|train-hm|
|        3

                                                                                

In [10]:
# total row count, then the number of observation ids that appear on more than one row
print(meta_df.count())
print(meta_df.groupBy("observation_id").count().filter("count > 1").count())

# image paths that appear more than once
meta_df.groupBy("image_path").count().filter("count > 1").show()

168139
34847
+----------+-----+
|image_path|count|
+----------+-----+
+----------+-----+



In [11]:
# every .jpg below the training root, sorted, as paths relative to that root
train_root = Path("/mnt/data/SnakeCLEF2023-small_size")
paths = sorted(jpg.relative_to(train_root) for jpg in train_root.rglob("*.jpg"))
len(paths), paths[:10]

(68495,
 [PosixPath('1990/Amphiesma_stolatum/59067968.jpg'),
  PosixPath('1990/Elaphe_schrenckii/57902708.jpg'),
  PosixPath('1990/Gloydius_intermedius/57909018.jpg'),
  PosixPath('1990/Lachesis_muta/42704088.jpg'),
  PosixPath('1990/Lampropeltis_annulata/7437403.jpg'),
  PosixPath('1990/Oligodon_taeniatus/59067926.jpg'),
  PosixPath('1990/Xenoxybelis_argenteus/113910655.jpg'),
  PosixPath('1990/Xenoxybelis_argenteus/113910659.jpg'),
  PosixPath('1990/Zamenis_lineatus/3001242.jpg'),
  PosixPath('1991/Bitis_arietans/1250455.jpg')])

In [12]:
from pyspark.sql import Row

# one single-column row per relative image path, written with forward slashes
path_df = spark.createDataFrame([Row(path=rel.as_posix()) for rel in paths])
path_df.show(5, truncate=100)

[Stage 24:>                                                         (0 + 1) / 1]

+--------------------------------------+
|                                  path|
+--------------------------------------+
|  1990/Amphiesma_stolatum/59067968.jpg|
|   1990/Elaphe_schrenckii/57902708.jpg|
|1990/Gloydius_intermedius/57909018.jpg|
|       1990/Lachesis_muta/42704088.jpg|
|1990/Lampropeltis_annulata/7437403.jpg|
+--------------------------------------+
only showing top 5 rows



                                                                                

In [13]:
# look at the untruncated image_path values in the metadata
meta_df.select("image_path").show(n=3, truncate=False)


def remove_leading_parent(col):
    """remove the leading parent directory from the path

    e.g. 1992/Lampropeltis_annulata/70994554.jpg turns into Lampropeltis_annulata/70994554.jpg

    :param col: column (or column name) holding the path.
    :return: column expression with everything up to and including the first
        "/" after at least one character removed.
    """
    # Raw string: "\/" in a normal string literal is an invalid escape sequence.
    # A plain "/" needs no escaping in the regex, so the pattern matches the same text.
    return F.regexp_replace(col, r"^(.+?)/", "")


# join metadata to the local image paths on the path without its leading
# directory; the right join keeps every row of path_df
joined_meta_df = (
    meta_df
    .withColumn("path", remove_leading_parent("image_path"))
    .join(
        path_df.withColumn("path", remove_leading_parent("path")),
        on="path",
        how="right",
    )
)

joined_meta_df.show(5)

+------------------------------+
|image_path                    |
+------------------------------+
|HMP/Liodytes_pygaea/63985.jpg |
|HMP/Liodytes_pygaea/436163.jpg|
|HMP/Crotalus_enyo/448622.jpg  |
+------------------------------+
only showing top 3 rows



[Stage 31:>                                                         (0 + 1) / 1]

+--------------------+--------------+-------+--------------------+----+--------------------+--------+------+
|                path|observation_id|endemic|       binomial_name|code|          image_path|class_id|subset|
+--------------------+--------------+-------+--------------------+----+--------------------+--------+------+
|Amphiesma_stolatu...|      37280129|  false|  Amphiesma stolatum|  TH|1990/Amphiesma_st...|      66| train|
|Gloydius_intermed...|      36619782|  false|Gloydius intermedius|  RU|1990/Gloydius_int...|     741| train|
|Lampropeltis_annu...|       5954637|  false|Lampropeltis annu...|  US|1990/Lampropeltis...|     861| train|
|Vermicella_annula...|       8160866|   true| Vermicella annulata|  AU|2017/Vermicella_a...|    1732| train|
|Vipera_ammodytes/...|      10925624|  false|    Vipera ammodytes|  RS|2017/Vipera_ammod...|    1734| train|
+--------------------+--------------+-------+--------------------+----+--------------------+--------+------+
only showing top 5 

                                                                                

In [14]:
# any missing rows on the left will be filled with nulls, so compare the number
# of joined rows that carry metadata with the number of local image paths
joined_meta_df.where("image_path is not null").count(), path_df.count()

                                                                                

(68495, 68495)