In [1]:
import nibabel as nib
import numpy as np
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.padding import PKCS7
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import logging
from nibabel.filebasedimages import FileBasedImage
import cv2
import pandas as pd
import kagglehub
from pyspark.rdd import RDD
import boto3
from io import BytesIO
# Configure the root logger so INFO-level (and above) records show up in cell output.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by the ETL helper functions defined in this notebook.
logger = logging.getLogger(__name__)

In [2]:
from dotenv import load_dotenv

# Locations of the raw dataset folders, relative to the notebook.
CT_SCANS = "../rawdata/ct_scans"
INFECTION_MASKS = "../rawdata/infection_mask"
LUNGANDINFECTION_MASKS = "../rawdata/lung_and_infection_mask"
LUNG_MASKS = "../rawdata/lung_mask"
OUTPUT_DIR = "../rawdata/"

# Load environment variables
load_dotenv()
# Load Minio credentials
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
# Never echo the credential values: notebook outputs are saved and shared with the file.
# Only report whether each value was found.
print("S3_ACCESS_KEY loaded:", bool(S3_ACCESS_KEY))
print("S3_SECRET_KEY loaded:", bool(S3_SECRET_KEY))

[redacted: S3 access key]
[redacted: S3 secret key]


In [3]:
# Open one CT volume and keep its header, affine matrix and voxel array for inspection.
ct_nii: FileBasedImage = nib.load(f"{CT_SCANS}/coronacases_org_001.nii")
header, affine = ct_nii.header, ct_nii.affine
data = np.array(ct_nii.get_fdata())
# Show the first slab along axis 0, the value range and the array shape.
data[0], np.min(data), np.max(data), data.shape

(array([[-998., -998., -999., ..., -998., -998., -998.],
        [-998., -998., -999., ..., -998., -998., -998.],
        [-998., -998., -999., ..., -998., -998., -998.],
        ...,
        [-998., -999., -998., ..., -998., -998., -998.],
        [-998., -999., -998., ..., -998., -998., -998.],
        [-998., -999., -998., ..., -998., -998., -998.]]),
 np.float64(-1021.0),
 np.float64(2996.0),
 (512, 512, 301))

In [4]:
# Build (or reuse) the Spark session, passing the S3 credentials and endpoint to the s3a connector.
spark = (
    SparkSession.builder
    .appName("NIfTI_Processing")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "10g")
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", "https://s3.captechvn.com")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)
# SparkContext handle used to create RDDs in the extract step.
sc = spark.sparkContext

KeyboardInterrupt: 

In [8]:
# NOTE(review): this cell carries execution count 8 but sits before the pipeline cells;
# on a top-to-bottom run it stops the session before `sc` is used - confirm it belongs at the end.
spark.stop()

## Load into RDDs

In [5]:
##### Extract #####
def download_data():
    """Download the COVID-19 CT scan dataset from Kaggle into ``OUTPUT_DIR``.

    Does nothing when ``OUTPUT_DIR`` already exists. Otherwise the dataset is
    fetched with ``kagglehub`` and the downloaded folder is moved so that its
    contents (e.g. ``ct_scans``, ``metadata.csv``) live directly under
    ``OUTPUT_DIR``.
    """
    import shutil

    if os.path.exists(OUTPUT_DIR):
        return

    # Create only the parent of the target: the target itself must not exist yet,
    # otherwise shutil.move would nest the download inside it.
    target_dir = os.path.normpath(OUTPUT_DIR)
    parent_dir = os.path.dirname(target_dir)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Download latest version to rawdata folder
    path = kagglehub.dataset_download("andrewmvd/covid19-ct-scans")
    print("Path to dataset files:", path)
    # Move the downloaded folder to the rawdata directory, For example: 4/ct_scans, 4/metadata -> rawdata/ct_scans, rawdata/metadata
    # shutil.move falls back to copy + delete when source and target are on different filesystems.
    shutil.move(path, target_dir)
    print("Data downloaded and unzipped successfully!")

def extract_metadata():
    """Read ``metadata.csv`` from ``OUTPUT_DIR`` and strip the Kaggle input prefix.

    Returns:
        pd.DataFrame: the metadata table with every occurrence of
        ``../input/covid19-ct-scans/`` removed from its string cells.
    """
    import re

    # Extract metadata from the downloaded dataset
    path = os.path.join(OUTPUT_DIR, "metadata.csv")
    metadata = pd.read_csv(path)
    logger.info("Metadata extracted successfully!")
    # regex=True is required for substring replacement; escape the prefix so its
    # dots are matched literally instead of as "any character".
    kaggle_prefix = re.escape("../input/covid19-ct-scans/")
    return metadata.replace(kaggle_prefix, "", regex=True)

def extract_image_data(metadata: pd.DataFrame) -> RDD:
    """Build an RDD of NIfTI volumes from the paths listed in ``metadata``.

    Every cell of ``metadata`` is treated as a path relative to ``OUTPUT_DIR``.

    Returns:
        RDD: records of ``(image_data, header, affine, relative_path)``.
    """
    def _read_nifti(relative_path):
        # Load one file and return its voxels together with its spatial metadata.
        volume = nib.load(os.path.join(OUTPUT_DIR, relative_path))
        voxels = np.array(volume.get_fdata())
        return voxels, volume.header, volume.affine, relative_path

    # Flatten the whole table into a 1-D array of paths.
    relative_paths: np.ndarray = metadata.to_numpy().flatten()
    return sc.parallelize(relative_paths).map(_read_nifti)

In [None]:
##### Transform #####

# Resize the image's x and y dimensions to 256x256
def resize_image(image_rdds: RDD):
    """Map every record through ``cv2.resize`` with a target size of (256, 256).

    Args:
        image_rdds: RDD of ``(image_data, header, affine, file_name)`` records.

    Returns:
        RDD with the same record layout, carrying the resized image data.
    """
    def _resize_record(record):
        volume, header, affine, file_name = record
        return cv2.resize(volume, (256, 256)), header, affine, file_name

    resized_image_rdds = image_rdds.map(_resize_record)
    logger.info("Image resized successfully!")
    return resized_image_rdds

# Normalize the image data
def normalize_image(image_rdds: RDD):
    """Min-max normalize every record's image data to [0, 1] as 32-bit floats.

    Args:
        image_rdds: RDD of ``(image_data, header, affine, file_name)`` records.

    Returns:
        RDD with the same record layout, carrying the normalized image data.
    """
    def _normalize_record(record):
        volume, header, affine, file_name = record
        scaled = cv2.normalize(
            volume,
            None,
            alpha=0,
            beta=1,
            norm_type=cv2.NORM_MINMAX,
            dtype=cv2.CV_32F,
        )
        return scaled, header, affine, file_name

    normalized_image_rdds = image_rdds.map(_normalize_record)
    logger.info("Image normalized successfully!")
    return normalized_image_rdds

# Noise reduction
def denoise_image(image_rdds: RDD):
    """Map every record through ``cv2.fastNlMeansDenoising``.

    NOTE(review): OpenCV documents this function for 8-bit input; float image
    data may be rejected - confirm the expected dtype before using this step.

    Args:
        image_rdds: RDD of ``(image_data, header, affine, file_name)`` records.

    Returns:
        RDD with the same record layout, carrying the denoised image data.
    """
    def _denoise_record(record):
        volume, header, affine, file_name = record
        cleaned = cv2.fastNlMeansDenoising(volume, None, 10, 7, 21)
        return cleaned, header, affine, file_name

    denoised_image_rdds = image_rdds.map(_denoise_record)
    logger.info("Image denoised successfully!")
    return denoised_image_rdds

# Encrypt the image data
def encrypt_image(image_rdds: RDD):
    """Encrypt the image bytes of every record with AES-256 in CTR mode.

    The key is derived once with PBKDF2-HMAC-SHA256. Each record gets its own
    random 16-byte nonce, and that same nonce is prepended to the ciphertext so
    the payload can be decrypted later.

    Args:
        image_rdds: RDD of ``(image_data, header, affine, file_name)`` records.

    Returns:
        RDD of ``(nonce + ciphertext, header, affine, file_name)`` records. The
        array's dtype and shape are not stored in the payload.
    """
    # SECURITY NOTE(review): the password and salt are hard-coded; move them to a
    # secret store / environment variable before protecting real data.
    salt = b"this is a salt"
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend()
    )
    key = kdf.derive(b"123123")

    def encrypt_image_record(record):
        """
        Encrypts a single image record.

        Args:
            record (tuple): A tuple containing (image_data, header, affine, file_name)

        Returns:
            tuple: A tuple containing (encrypted_data, header, affine, file_name)
        """
        image_data, header, affine, file_name = record

        # Generate a unique nonce for AES-CTR
        nonce = os.urandom(16)

        # Build the cipher from THIS nonce. Reusing one cipher/nonce for all records
        # would reuse the CTR keystream and would not match the nonce stored below.
        cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
        encryptor = cipher.encryptor()

        # Encrypt the image data
        ct = encryptor.update(image_data.tobytes()) + encryptor.finalize()

        # Prepend nonce to ciphertext for use in decryption
        encrypted_data = nonce + ct

        return encrypted_data, header, affine, file_name

    encrypted_image_rdds = image_rdds.map(encrypt_image_record)
    logger.info("Image encrypted successfully!")
    return encrypted_image_rdds


In [13]:
##### Load #####
# Save to disk
def save_data(image_rdds: RDD):
    """Write every record's encrypted bytes to ``output/<file_name>.npy``.

    The bytes are written as-is (no NumPy header is added). The folder is taken
    from the part of ``file_name`` before the first ``/``.
    """
    def _write_record(record):
        payload, _header, _affine, file_name = record
        print(f"Saving image: {file_name}")
        # Creating the sub-folder also creates the top-level "output" directory.
        top_folder = file_name.partition("/")[0]
        os.makedirs(f"output/{top_folder}", exist_ok=True)
        with open(f"output/{file_name}.npy", "wb") as out_file:
            out_file.write(payload)
        print(f"Saved image: {file_name}")

    image_rdds.foreach(_write_record)

# Save to minio bucket
def save_to_bucket(image_rdds: RDD):
    """Upload every record's encrypted bytes to the ``etl-mi`` bucket.

    Each object is stored under ``<file_name>.enc``. Upload errors are printed
    and skipped (best effort), so the final log line does not guarantee that
    every object was written.

    Args:
        image_rdds: RDD of ``(encrypted_bytes, header, affine, file_name)`` records.
    """
    # Save image data to minio bucket: https://s3.captechvn.com/etl-mi
    def upload_partition(records):
        # One client per partition instead of one per record.
        # SECURITY NOTE(review): verify=False disables TLS certificate checks;
        # kept from the original - confirm it is really required for this endpoint.
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET_KEY,
            endpoint_url="https://s3.captechvn.com",
            verify=False
        )
        for record in records:
            # Unpack before the try block so the object key is always defined
            # when the error handler formats its message.
            encrypted_bytes, _header, _affine, file_name = record
            object_key = f"{file_name}.enc"
            try:
                # Upload to MinIO
                s3_client.put_object(
                    Bucket="etl-mi",
                    Key=object_key,
                    Body=encrypted_bytes,
                    ContentType="application/octet-stream"
                )
                print(f"Uploaded {object_key} to MinIO")
            except Exception as e:
                print(f"Error uploading {object_key}: {e}")

    image_rdds.foreachPartition(upload_partition)
    logger.info("Image saved to minio bucket successfully!")

In [14]:
# Run the ETL pipeline end to end: download -> metadata -> load -> resize -> normalize -> encrypt -> upload.
download_data()
metadata = extract_metadata()
rdds = extract_image_data(metadata)
rdds = resize_image(rdds)
rdds = normalize_image(rdds)
encrypted = encrypt_image(rdds)
# Alternative sink: write the encrypted records to local disk instead of the bucket.
# save_data(encrypted)
save_to_bucket(encrypted)
# NOTE(review): no `save_encrypted_image` is defined in the visible cells - confirm before re-enabling.
# save_encrypted_image(encrypted)

INFO:__main__:Metadata extracted successfully!
INFO:__main__:Image resized successfully!
INFO:__main__:Image normalized successfully!
INFO:__main__:Image encrypted successfully!
ERROR:root:KeyboardInterrupt while sending command.               (0 + 72) / 72]
Traceback (most recent call last):
  File "/home/aphuc/web/data_science/pytorch-notebook/ETL-MI/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aphuc/web/data_science/pytorch-notebook/ETL-MI/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aphuc/miniconda3/lib/python3.12/socket.py", line 708, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
INFO:py4j.clientserver:Closing down clientserve

KeyboardInterrupt: 

In [None]:
# Load a sample from the saved data
# Load a sample from the saved data
# NOTE(review): save_data writes raw encrypted bytes to `output/<file_name>.npy`, so this
# bare file name and np.load format presumably do not match what was saved - confirm.
loaded_data = np.load("coronacases_org_001.nii.npy")
import matplotlib.pyplot as plt

# Display the slice halfway along the third axis.
middle_slice = loaded_data.shape[2] // 2
plt.imshow(loaded_data[:,:,middle_slice], cmap="gray")
plt.axis("off")
plt.show()

FileNotFoundError: [Errno 2] No such file or directory: 'coronacases_org_001.nii.npy'

In [None]:
def decrypt_image(file_path: str, dtype=np.uint8, shape=None, s3_client=None):
    """Download an encrypted object from the ``etl-mi`` bucket and decrypt it.

    The payload is expected to be a 16-byte nonce followed by AES-CTR
    ciphertext. The key is derived with the same PBKDF2 parameters, password
    and salt that are hard-coded here.

    Args:
        file_path: Object key inside the ``etl-mi`` bucket.
        dtype: NumPy dtype used to interpret the decrypted bytes. The payload
            does not record it, so pass the dtype the array had when encrypted.
        shape: Optional shape to reshape the flat array to; ``None`` returns
            the 1-D buffer.
        s3_client: Optional boto3 S3 client. When omitted, a client is created
            from ``S3_ACCESS_KEY`` / ``S3_SECRET_KEY`` for
            ``https://s3.captechvn.com`` with default TLS verification.

    Returns:
        np.ndarray: read-only array view over the decrypted bytes.

    Raises:
        ValueError: if the payload is shorter than the 16-byte nonce, or the
            decrypted bytes do not fit ``dtype`` / ``shape``.
    """
    # Create the client here instead of relying on a notebook-level `s3` variable.
    if s3_client is None:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET_KEY,
            endpoint_url="https://s3.captechvn.com",
        )

    backend = default_backend()
    # SECURITY NOTE(review): hard-coded password and salt; move to a secret store.
    salt = b"this is a salt"
    password = b"123123"
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=backend
    )
    key = kdf.derive(password)

    response = s3_client.get_object(Bucket="etl-mi", Key=file_path)
    encrypted_data = response["Body"].read()
    if len(encrypted_data) < 16:
        raise ValueError(f"Encrypted payload for {file_path} is shorter than the 16-byte nonce")

    # Extract nonce and ciphertext
    nonce = encrypted_data[:16]
    ct = encrypted_data[16:]
    cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=backend)
    decryptor = cipher.decryptor()
    decrypted_bytes = decryptor.update(ct) + decryptor.finalize()

    # Convert bytes back to the original image data format
    image_data = np.frombuffer(decrypted_bytes, dtype=dtype)
    if shape is not None:
        image_data = image_data.reshape(shape)
    return image_data



ValueError: Invalid padding bytes.

## Encrypted Data