# Data Extraction

The main purpose of this module is to retrieve, transform, clean, and load data from medical CSV files, which will serve as the initial dataset for our healthcare support system.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import json

# Path to the JSON file, saved on the collaborators' Google Drive
json_path = '/content/drive/MyDrive/Colab Notebooks/Big Data/Final_Project/secret.json'

# Loading the json file
with open(json_path) as f:
  secrets = json.load(f)

# Secret info from json
#mongo_uri = secrets["MONGO_BASE_URI"]
mongo_uri = secrets["MONGO_M10_URI"]
collection_string_list = secrets["COLLECTION_STRING_LIST"]
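
The cell above assumes that `secret.json` contains both `MONGO_M10_URI` and `COLLECTION_STRING_LIST`. A minimal sketch of a loader that fails early with a readable message when a key is missing could look like this; `load_secrets` and `REQUIRED_KEYS` are hypothetical names, not part of the original notebook.

```python
import json

# Keys the notebook reads from secret.json (taken from the cell above).
REQUIRED_KEYS = ("MONGO_M10_URI", "COLLECTION_STRING_LIST")


def load_secrets(path, required_keys=REQUIRED_KEYS):
    """Load a JSON secrets file and raise a clear error if a key is missing.

    Hypothetical helper: the original notebook indexes the dictionary directly.
    """
    with open(path, encoding="utf-8") as f:
        secrets = json.load(f)
    missing = [key for key in required_keys if key not in secrets]
    if missing:
        raise KeyError(f"Missing keys in {path}: {', '.join(missing)}")
    return secrets
```

With this helper, `secrets = load_secrets(json_path)` would replace the `open`/`json.load` pair.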

In [3]:
import os
import pandas as pd
dataset_path = '/content/drive/MyDrive/Colab Notebooks/Big Data/Final_Project/Dataset/'

## Data Retrieval


First, we need to read all the data from the CSV files that will populate our database. The data must then be cleaned, and for that we want a framework that scales horizontally across a cluster. Let's use Spark!

In [4]:
#Install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download spark3.4.4 (list of mirrors)
#!wget -q https://apache.osuosl.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz
#!wget -q https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz
!wget -q https://archive.apache.org/dist/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz

# unzip it
!tar xf spark-3.4.4-bin-hadoop3.tgz

# install findspark
!pip install -q findspark

# Download the MongoDB Spark connector and its driver dependencies
!wget -q https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.4.1/mongo-spark-connector_2.12-10.4.1.jar
!wget -q https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.10.2/mongodb-driver-sync-4.10.2.jar
!wget -q https://repo1.maven.org/maven2/org/mongodb/bson/4.10.2/bson-4.10.2.jar
!wget -q https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-core/4.10.2/mongodb-driver-core-4.10.2.jar



In the second part of this notebook, we will load all the cleaned data into MongoDB Atlas through its connection URI. To do this directly from PySpark, we need a dedicated connector: the MongoDB Spark Connector downloaded above. The following cells configure it.

In [5]:
import os

# Environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.4-bin-hadoop3"
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /content/mongo-spark-connector_2.12-10.4.1.jar,'
    '/content/mongodb-driver-sync-4.10.2.jar,'
    '/content/bson-4.10.2.jar,'
    '/content/mongodb-driver-core-4.10.2.jar pyspark-shell'
)
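
The `--jars` value above is a single comma-separated string, which is easy to break when a JAR is added or removed. As a sketch, the same value can be assembled from a list; `jar_paths` and `build_submit_args` are hypothetical names introduced here, while the paths themselves are copied from the cell above.

```python
import os

# JAR locations copied from the cell above.
jar_paths = [
    "/content/mongo-spark-connector_2.12-10.4.1.jar",
    "/content/mongodb-driver-sync-4.10.2.jar",
    "/content/bson-4.10.2.jar",
    "/content/mongodb-driver-core-4.10.2.jar",
]


def build_submit_args(jars):
    """Return a PYSPARK_SUBMIT_ARGS value with a comma-separated --jars list."""
    return f"--jars {','.join(jars)} pyspark-shell"


submit_args = build_submit_args(jar_paths)
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args

# JARs that are not on disk (for example, if a download failed silently).
missing_jars = [path for path in jar_paths if not os.path.exists(path)]
```

Checking `missing_jars` before starting the session is useful here because `wget -q` does not print anything when a download fails.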

In [6]:
import findspark
findspark.init()

In [7]:
# Libraries for SQL Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import split, explode, trim, count, sum, col, current_date, lower, regexp_replace
import time

# Spark Session configuration
spark = SparkSession.builder \
    .appName("MongoDBAtlasConnection") \
    .config("spark.mongodb.read.connection.uri", mongo_uri) \
    .config("spark.mongodb.write.connection.uri", mongo_uri) \
    .config("spark.jars", "/content/mongo-spark-connector_2.12-10.4.1.jar") \
    .getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f23af664050>


In [8]:
# Check that the connector JAR is registered in the Spark configuration
print(spark.sparkContext.getConf().get("spark.jars"))

/content/mongo-spark-connector_2.12-10.4.1.jar


In [9]:
# Try to connect to MongoDB Atlas
try:
    df = spark.read \
        .format("mongodb") \
        .option("database", "CAMPANIA_SALUTE") \
        .option("collection", "ANAGRAFICA") \
        .load()
    print("Connection successful! Here are the first 5 documents:")
    df.show(5)
except Exception as e:
    print("Connection error:", str(e))

Connection successful! Here are the first 5 documents:
+------------------------+----------------+------+------------+-----------------+-------------------+-------------------+----------------+------+--------------------+---------+-----+-------+--------------------+
|CODICE_COMUNE_DI_NASCITA|  CODICE_FISCALE|CODPAZ|     COGNOME|COMUNE_DI_NASCITA|      DATADINASCITA|       DATA_DECESSO|GATE_DI_INGRESSO|ID_PAZ|      MOTIVO_DECESSO|  NOMEPAZ|SESSO|SEZIONE|                 _id|
+------------------------+----------------+------+------------+-----------------+-------------------+-------------------+----------------+------+--------------------+---------+-----+-------+--------------------+
|                    null|---NLL37B56-----|     1|           A|             null|1937-02-16 00:00:00|               null|         Esterno|   1_1|                null|    NELLO|    F|      1|6837054b383892705...|
|                    A024|NVSMDL37A58A024E|     7|       NAVAS|           ACERRA|1937-01-18 00:00:00|201

We need to load a large number of tables (collections in MongoDB). To keep the workflow manageable, we first map each collection name to its CSV path and then wrap loading, cleaning, and saving in a small manager class.

In [10]:
# Dictionary of all csv paths
csvPaths = {}
healthDB_path = os.path.join(dataset_path, '2024-05-05-DATABASE')
for collection in collection_string_list:
  csvPaths[collection] = os.path.join(healthDB_path,collection + '.csv')

In [11]:
print(csvPaths['ANAGRAFICA'])

/content/drive/MyDrive/Colab Notebooks/Big Data/Final_Project/Dataset/2024-05-05-DATABASE/ANAGRAFICA.csv
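
Because the paths are built from names stored in `secret.json`, a typo in a collection name only shows up later as a read error. A small sketch of an early check follows; `find_missing_files` is a hypothetical helper, not part of the original notebook.

```python
import os


def find_missing_files(paths):
    """Return the names whose path does not point to an existing file.

    `paths` is a dict that maps a collection name to a file path,
    like the csvPaths dictionary built above.
    """
    return [name for name, path in paths.items() if not os.path.isfile(path)]
```

Calling `find_missing_files(csvPaths)` before loading should return an empty list when every CSV is in place.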


In [12]:
!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.4/1.4 MB 25.8 MB/s eta 0:00:00
Downloading dnspython-2.7.0-py3-none-any.whl (313 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 313.6/313.6 kB 20.9 MB/s eta 0:00:00
Installing collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.13.0


In [13]:
import csv
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, unix_timestamp, col, substring, lit, when, length, regexp_replace
from pymongo import MongoClient

class CSVLoaderManager:
    def __init__(self, spark: SparkSession, mongo_uri: str = None):
        """
        The constructor initializes the CSVLoaderManager with a SparkSession and an optional MongoDB URI.
        """
        self.spark = spark
        self.mongo_uri = mongo_uri
        self.datasets = {}

    def _detect_delimiter(self, name, file_path: str, sample_size: int = 2048) -> str:
        """
        Reads the beginning of a file to infer its delimiter (e.g. comma or tab).
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            # Read a sample from the start of the file
            sample = f.read(sample_size)
            sniffer = csv.Sniffer()
            try:
                # Let the sniffer infer the delimiter from the sample
                dialect = sniffer.sniff(sample)
                print(f"Name: {name}; Delimiter: {dialect.delimiter}")
                return dialect.delimiter
            except csv.Error:
                raise ValueError(f"Unable to automatically detect the file delimiter format of: {file_path}")


    def load_csv(self, name: str, file_path: str) -> None:
        """
        Loads a CSV file into a Spark DataFrame
        and adds the DataFrame to the manager's datasets.
        """
        delimiter = self._detect_delimiter(name, file_path)
        ds = self.spark.read \
          .option("delimiter",delimiter) \
          .option("inferSchema", "true") \
          .option("header", "true") \
          .option("multiline", "true") \
          .option("quote", "\"") \
          .option("escape", "\"") \
          .csv(file_path)
        self.datasets[name] = ds
        ds.printSchema()

    def load_many(self, files) -> None:
        """
        This function loads multiple CSV files into Spark DataFrames
        """
        for name, path in files.items():
            self.load_csv(name, path)

    def get(self, name: str):
        return self.datasets.get(name)

    def set(self, name: str, df, overwrite: bool=True):
      """
      Stores a DataFrame in the manager's datasets.
      If a dataset with the same name already exists, it is replaced only when 'overwrite' is True.
      """
      if not hasattr(df, 'schema'):
        raise TypeError("The given value is not a Spark dataframe.")
      if name in self.datasets and not overwrite:
        raise ValueError(f"The dataset '{name}' already exists. Use overwrite=True.")
      self.datasets[name] = df

    def list_datasets(self):
      """
      This function returns a list of the names of the datasets in the manager
      """
      return list(self.datasets.keys())

    def save_to_mongo(self, name: str, database: str, mode: str = "ignore") -> None:
      """
      This function saves a DataFrame to MongoDB ATLAS
      """
      if name not in self.datasets:
          raise ValueError(f"Dataset '{name}' not found.")
      if not self.mongo_uri:
          raise ValueError("Mongo URI not configured.")

      df = self.datasets[name]
      df.write \
          .format("mongodb") \
          .mode(mode) \
          .option("database", database) \
          .option("collection", name) \
          .save()

    def drop_collections(self, database: str, collections: list) -> None:
      """
      Drops specified collections from the given MongoDB database.
      """
      if not self.mongo_uri:
          raise ValueError("Mongo URI not configured.")
      client = MongoClient(self.mongo_uri)
      db = client[database]
      for collection in collections:
        if collection in db.list_collection_names():
          db.drop_collection(collection)
          print(f"Collection '{collection}' dropped successfully.")
        else:
          print(f"Collection '{collection}' not found in the database.")
      client.close()

    def anonymize_columns(self, dataset_name: str, columns_to_anonymize: list) -> None:
        """
        Anonymizes the specified columns of a Spark DataFrame. Each value is
        replaced with its first character, except for CODICE_FISCALE, where
        every letter is replaced with "-".

        Args:
            dataset_name (str): Name of the dataset in the manager to anonymize.
            columns_to_anonymize (list): Names of the columns to anonymize.
        """
        if dataset_name not in self.datasets:
            raise ValueError(f"Dataset '{dataset_name}' not found for anonymization.")

        df = self.datasets[dataset_name]

        # Start from the original DataFrame
        df_anon = df

        for col_name in columns_to_anonymize:
            if col_name in df.columns:
              if col_name != 'CODICE_FISCALE':
                df_anon = df_anon.withColumn(
                    col_name,
                    when(
                        col(col_name).isNull(),
                        lit(None) # Null stays null
                    ).when(
                        length(col(col_name)) == 0,
                        lit("") # An empty string stays empty
                    ).otherwise(
                        substring(col(col_name), 1, 1) # Keep only the first character
                    )
                )
              else:
                df_anon = df_anon.withColumn(
                    col_name,
                    when(
                        col(col_name).isNull(),
                        lit(None) # Null stays null
                    ).when(
                        length(col(col_name)) == 0,
                        lit("") # An empty string stays empty
                    ).otherwise(
                        regexp_replace(col(col_name), "[A-Za-z]", "-") # Replace every letter with "-"
                    )
                )
            else:
                print(f"Warning: column '{col_name}' does not exist in dataset '{dataset_name}'. It will be ignored.")

        # Update the dataset in the manager with the anonymized DataFrame
        self.set(dataset_name, df_anon, overwrite=True)
        print(f"Dataset '{dataset_name}' anonimizzato per le colonne: {', '.join(columns_to_anonymize)}. Ora contengono solo la prima lettera.")
        df_anon.printSchema() # Print the schema to verify the result


Now we can load all the collections into DataFrames and manipulate them.

In [14]:
manager = CSVLoaderManager(spark, mongo_uri)
manager.load_many(csvPaths)
manager.list_datasets()

Name: ANAGRAFICA; Delimiter: ,
root
 |-- SEZIONE: integer (nullable = true)
 |-- CODPAZ: integer (nullable = true)
 |-- COGNOME: string (nullable = true)
 |-- NOMEPAZ: string (nullable = true)
 |-- DATADINASCITA: date (nullable = true)
 |-- SESSO: string (nullable = true)
 |-- COMUNE_DI_NASCITA: string (nullable = true)
 |-- CODICE_COMUNE_DI_NASCITA: string (nullable = true)
 |-- CODICE_FISCALE: string (nullable = true)
 |-- GATE_DI_INGRESSO: string (nullable = true)
 |-- MOTIVO_DECESSO: string (nullable = true)
 |-- DATA_DECESSO: date (nullable = true)

Name: ANAMNESI; Delimiter: 	
root
 |-- SEZIONE: integer (nullable = true)
 |-- CODPAZ: integer (nullable = true)
 |-- DATA: date (nullable = true)
 |-- NUM_PROGRESSIVO: integer (nullable = true)
 |-- DIABETE: string (nullable = true)
 |-- DISLIPIDEMIA: string (nullable = true)
 |-- DISLIPIDEMIA_IPERCOLESTEROLEMIA: string (nullable = true)
 |-- DISLIPIDEMIA_IPERTRIGLICERIDEMIA: string (nullable = true)
 |-- DISLIPIDEMIA_MISTA: string (n

['ANAGRAFICA',
 'ANAMNESI',
 'CORONAROGRAFIA_PTCA',
 'ECOCARDIO_DATI',
 'ECOCAROTIDI',
 'ESAMI_LABORATORIO',
 'ESAMI_SPECIALISTICI',
 'ESAMI_STRUMENTALI_CARDIO',
 'LISTA_EVENTI',
 'PREVALENT',
 'RICOVERO_OSPEDALIERO',
 'VISITA_CONTROLLO_ECG']
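
The output above shows that the files do not share one delimiter: `ANAGRAFICA` is comma-separated while `ANAMNESI` is tab-separated. The detection relies on `csv.Sniffer` from the standard library. The sketch below isolates that step on two made-up samples; the sample rows and the `candidates` restriction are assumptions added for illustration, since the notebook calls `sniff` without restricting the candidate delimiters.

```python
import csv


def detect_delimiter(sample, candidates=",\t;"):
    """Infer the delimiter of a text sample, considering only `candidates`."""
    dialect = csv.Sniffer().sniff(sample, delimiters=candidates)
    return dialect.delimiter


# Made-up rows that only mimic the column layout of the real files.
comma_sample = "SEZIONE,CODPAZ,SESSO\n1,1,F\n1,7,F\n1,17,M\n"
tab_sample = "SEZIONE\tCODPAZ\tDATA\n1\t1\t2001-01-10\n1\t7\t2002-03-05\n"

comma_delimiter = detect_delimiter(comma_sample)
tab_delimiter = detect_delimiter(tab_sample)
```

Restricting the candidates keeps the sniffer from picking a character such as `-` when a sample consists mostly of dates.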

In [15]:
manager.get('LISTA_EVENTI').printSchema()

root
 |-- SEZIONE: integer (nullable = true)
 |-- CODPAZ: integer (nullable = true)
 |-- DATA: date (nullable = true)
 |-- NUM_PROGRESSIVO: integer (nullable = true)
 |-- TIPO_EVENTO: string (nullable = true)
 |-- NUM_PROGRESSIVO_GLOBALE: integer (nullable = true)



In general, the datasets are already fairly clean, but a few details could cause trouble later. For example, some columns contain the same value written sometimes in lower case and sometimes in upper case.

In [16]:
# Let's uppercase the columns that may cause problems during query generation
from pyspark.sql.functions import col, upper

df = manager.get('ANAGRAFICA')

upper_df = df \
      .withColumn("COGNOME", upper(col("COGNOME"))) \
      .withColumn("NOMEPAZ", upper(col("NOMEPAZ"))) \
      .withColumn("COMUNE_DI_NASCITA", upper(col("COMUNE_DI_NASCITA")))

manager.set('ANAGRAFICA', upper_df)
manager.get('ANAGRAFICA').show(10)

+-------+------+------------+----------+-------------+-----+-----------------+------------------------+----------------+----------------+--------------------+------------+
|SEZIONE|CODPAZ|     COGNOME|   NOMEPAZ|DATADINASCITA|SESSO|COMUNE_DI_NASCITA|CODICE_COMUNE_DI_NASCITA|  CODICE_FISCALE|GATE_DI_INGRESSO|      MOTIVO_DECESSO|DATA_DECESSO|
+-------+------+------------+----------+-------------+-----+-----------------+------------------------+----------------+----------------+--------------------+------------+
|      1|     1|           A|     NELLO|   1937-02-16|    F|                -|                    null|---NLL37B56-----|         Esterno|                null|        null|
|      1|     7|       NAVAS| MADDALENA|   1937-01-18|    F|           ACERRA|                    A024|NVSMDL37A58A024E|    Ipertensione|Causa extracardio...|  2019-07-02|
|      1|    17|D`ALESSANDRO|  RAFFAELE|   1932-10-24|    M|                 |                        |DLSRFL32R24-----|    Ipertensione|   

In [17]:
# Anonymizing the dataset
df = manager.get('ANAGRAFICA')
columns_to_anonymize = ['COGNOME', 'NOMEPAZ', 'CODICE_FISCALE']
manager.anonymize_columns('ANAGRAFICA', columns_to_anonymize)


manager.get('ANAGRAFICA').show(10)


Dataset 'ANAGRAFICA' anonimizzato per le colonne: COGNOME, NOMEPAZ, CODICE_FISCALE. Ora contengono solo la prima lettera.
root
 |-- SEZIONE: integer (nullable = true)
 |-- CODPAZ: integer (nullable = true)
 |-- COGNOME: string (nullable = true)
 |-- NOMEPAZ: string (nullable = true)
 |-- DATADINASCITA: date (nullable = true)
 |-- SESSO: string (nullable = true)
 |-- COMUNE_DI_NASCITA: string (nullable = true)
 |-- CODICE_COMUNE_DI_NASCITA: string (nullable = true)
 |-- CODICE_FISCALE: string (nullable = true)
 |-- GATE_DI_INGRESSO: string (nullable = true)
 |-- MOTIVO_DECESSO: string (nullable = true)
 |-- DATA_DECESSO: date (nullable = true)

+-------+------+-------+-------+-------------+-----+-----------------+------------------------+----------------+----------------+--------------------+------------+
|SEZIONE|CODPAZ|COGNOME|NOMEPAZ|DATADINASCITA|SESSO|COMUNE_DI_NASCITA|CODICE_COMUNE_DI_NASCITA|  CODICE_FISCALE|GATE_DI_INGRESSO|      MOTIVO_DECESSO|DATA_DECESSO|
+-------+------+----
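
To make the masking rule easier to check by hand, here is a plain-Python sketch of what `anonymize_columns` does to a single value. `mask_value` is a hypothetical helper that mirrors the Spark expressions (`substring` for names, `regexp_replace` for `CODICE_FISCALE`); it is not used by the notebook itself.

```python
import re


def mask_value(column, value):
    """Mask one value the same way anonymize_columns masks a column."""
    # Null and empty values are left untouched.
    if value is None or value == "":
        return value
    if column == "CODICE_FISCALE":
        # Replace every letter with "-", keeping digits in place.
        return re.sub(r"[A-Za-z]", "-", value)
    # Any other column keeps only its first character.
    return value[:1]


masked_surname = mask_value("COGNOME", "NAVAS")                    # "N"
masked_code = mask_value("CODICE_FISCALE", "NVSMDL37A58A024E")     # "------37-58-024-"
```

Note that the digits left in `CODICE_FISCALE` still encode the year and day of birth, which is also available in `DATADINASCITA`.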

Another problem is that missing values are represented in several different ways (empty strings, "null", "-", and so on) in some columns.

In [18]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DataType
from typing import Any

def normalize_null(value):
    """
    Maps null-like or missing values to None.
    """
    null_equivalents = {"", " ", "  ", "null", "None", "N/A", "na", "-", "--", "NaN"}

    if isinstance(value, str):
        value = value.strip()
    return None if value in null_equivalents else value

def create_normalize_udf(return_type: DataType) -> Any:
    """
    Creates a UDF that normalizes null values and preserves the specified data type.

    Args:
        return_type: The Spark DataType to preserve, or a type string such as those returned by `df.dtypes`.

    Returns:
        A PySpark UDF.
    """
    def _normalize(value: Any) -> Any:
        return normalize_null(value)
    return udf(_normalize, return_type)
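
Since `normalize_null` is plain Python, its behavior can be checked without Spark. The sketch below repeats the function so that it runs on its own and applies it to a few made-up values. Two details are worth noticing: the comparison is case-sensitive, so `"NULL"` is kept, and surviving strings come back stripped.

```python
# Same set as in the cell above, minus the whitespace-only entries,
# which strip() already reduces to "".
NULL_EQUIVALENTS = {"", "null", "None", "N/A", "na", "-", "--", "NaN"}


def normalize_null(value):
    """Map null-like strings to None; leave every other value unchanged."""
    if isinstance(value, str):
        value = value.strip()
    return None if value in NULL_EQUIVALENTS else value


samples = ["  ", "N/A", " - ", " ACERRA ", "NULL", 0]
normalized = [normalize_null(v) for v in samples]
# normalized == [None, None, None, "ACERRA", "NULL", 0]
```

If upper-case markers such as `"NULL"` or `"NA"` occur in the data, they would need to be added to the set or compared case-insensitively.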


In [19]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

for name in collection_string_list:
  df = manager.get(name)

  for column_name, column_type in df.dtypes:
    norm_udf = create_normalize_udf(column_type)
    df = df.withColumn(column_name, norm_udf(col(column_name)))

  manager.set(name, df)


Finally, we create a patient identifier (`ID_PAZ`) that is shared by all collections and can act as a foreign key. It's not strictly mandatory, but we believe it could be useful for several reasons:

* Better Indexing
* Simpler Join Queries
* Clearer Data Model

In [20]:
from pyspark.sql.functions import col, concat_ws

# If the collection has both columns, add a unique patient ID column
for collection in collection_string_list:
  if 'CODPAZ' in manager.get(collection).columns and 'SEZIONE' in manager.get(collection).columns:
    df = manager.get(collection)
    df = df.withColumn('ID_PAZ', concat_ws("_", col('SEZIONE'), col('CODPAZ')))

    # List of existing columns
    existing_columns = [c for c in df.columns if c != 'ID_PAZ']
    new_order = ['ID_PAZ'] + existing_columns

    # Set ID_PAZ as first column
    df = df.select(new_order)

    manager.set(collection, df)
    print(collection)
    manager.get(collection).show(5)


ANAGRAFICA
+------+-------+------+-------+-------+-------------+-----+-----------------+------------------------+----------------+----------------+--------------------+------------+
|ID_PAZ|SEZIONE|CODPAZ|COGNOME|NOMEPAZ|DATADINASCITA|SESSO|COMUNE_DI_NASCITA|CODICE_COMUNE_DI_NASCITA|  CODICE_FISCALE|GATE_DI_INGRESSO|      MOTIVO_DECESSO|DATA_DECESSO|
+------+-------+------+-------+-------+-------------+-----+-----------------+------------------------+----------------+----------------+--------------------+------------+
|   1_1|      1|     1|      A|      N|   1937-02-16|    F|             null|                    null|------37-56-----|         Esterno|                null|        null|
|   1_7|      1|     7|      N|      M|   1937-01-18|    F|           ACERRA|                    A024|------37-58-024-|    Ipertensione|Causa extracardio...|  2019-07-02|
|  1_17|      1|    17|      D|      R|   1932-10-24|    M|             null|                    null|------32-24-----|    Ipertension
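
The `ID_PAZ` values shown above are `SEZIONE` and `CODPAZ` joined by an underscore. The plain-Python sketch below mirrors that rule, including the fact that Spark's `concat_ws` skips null inputs instead of returning null; `make_patient_id` is a hypothetical helper used only for illustration.

```python
def make_patient_id(sezione, codpaz, sep="_"):
    """Join the non-null parts with `sep`, as concat_ws does in Spark."""
    parts = [str(part) for part in (sezione, codpaz) if part is not None]
    return sep.join(parts)


full_id = make_patient_id(1, 7)        # "1_7"
partial_id = make_patient_id(1, None)  # "1": the null part is skipped
```

Because a row with a null `CODPAZ` would silently get an incomplete key such as `"1"`, it may be worth checking that both columns are non-null before relying on `ID_PAZ` in joins.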

### Drop Collection 🚩 ***Run this only if you want to remove collections from MongoDB Atlas***

In [None]:
manager.drop_collections("CAMPANIA_SALUTE", ["PREVALENT"])

Collection 'PREVALENT' dropped successfully.


## Data Loading in MongoDB

We now load the processed datasets into MongoDB Atlas through the MongoDB Spark Connector configured earlier. Each DataFrame is written to the collection with the same name in the `CAMPANIA_SALUTE` database; with `mode="overwrite"`, the existing contents of that collection are replaced.

In [21]:
manager.save_to_mongo('ANAGRAFICA','CAMPANIA_SALUTE', mode="overwrite")
#manager.save_to_mongo('LISTA_EVENTI', 'CAMPANIA_SALUTE', mode="overwrite")
#manager.save_to_mongo('ANAMNESI', 'CAMPANIA_SALUTE', mode = "overwrite")

In [None]:
for name in collection_string_list:
  manager.save_to_mongo(name, "CAMPANIA_SALUTE", mode="overwrite")
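
In the loop above, an error on one collection stops the whole upload. A possible variation, sketched here under the assumption that partial uploads are acceptable, records the failures and keeps going. `save_all` is a hypothetical helper; the save function is passed in as an argument so the sketch does not depend on Spark.

```python
def save_all(names, save_fn):
    """Call save_fn(name) for every name and collect failures instead of stopping.

    Returns a dict that maps each failed name to its error message.
    """
    failures = {}
    for name in names:
        try:
            save_fn(name)
        except Exception as exc:  # broad on purpose: any failure is recorded
            failures[name] = str(exc)
    return failures
```

In this notebook it could be called as `save_all(collection_string_list, lambda n: manager.save_to_mongo(n, "CAMPANIA_SALUTE", mode="overwrite"))`, and an empty result would mean every collection was written.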