## Step 1: Data Ingestion
Ingest data from the two provided CSV files. One contains patient details, and the other contains appointment data.

In [9]:
from pyspark.sql import SparkSession

try:
    # Create Spark session
    spark = SparkSession.builder \
        .appName("hello_heart_data") \
        .getOrCreate()
    
    # Read the CSV files
    appointments_df = spark.read.csv('sample_data/appointment_data.csv', header=True, inferSchema=True)
    patients_df = spark.read.csv('sample_data/patient_data.csv', header=True, inferSchema=True)
    
    # Get row counts
    print(f"\nNumber of appointments: {appointments_df.count()}")
    print(f"Number of patients: {patients_df.count()}")
    
    print("\nSample Appointments:")
    appointments_df.show(5, truncate=False)
    
    print("\nSample Patients:")
    patients_df.show(25, truncate=False)
    patients_df.select("address").show(50, truncate=False)
except Exception as e:
    raise Exception("Error while loading files into dataframes: ", str(e))


Number of appointments: 1000
Number of patients: 1000

Sample Appointments:
+----------+----------------+--------------+
|patient_id|appointment_date|doctor        |
+----------+----------------+--------------+
|236       |2024-05-17      |Morgan Baker  |
|225       |2024-08-03      |Vincent Wright|
|831       |2024-07-19      |Joshua Ford   |
|116       |2024-03-02      |Michelle Hill |
|433       |2024-01-17      |Kari Morse    |
+----------+----------------+--------------+
only showing top 5 rows


Sample Patients:
+----------+-----------------+---+-------------------------------------------------------+---------------------+-------------+
|patient_id|name             |age|address                                                |phone_number         |diagnosis    |
+----------+-----------------+---+-------------------------------------------------------+---------------------+-------------+
|1         |Nicole Taylor    |36 |6377 Jennifer Trail Apt. 075, Calebside, NY 22906      |080.

## Step 2: Data Transformation (Should occur before de-identification)
Clean and transform the data. Ensure that:
- Phone numbers and addresses are in a consistent format.
- Data is deduplicated based on patient_id.
- Join the two datasets using patient_id to create a single view of the patient and their appointment history.

In [10]:
'''
Problems with patients data:
    - Phone number
        - Inconsistent use of country code
        - Inconsistent use of extension
        - Inconsistent format of country code (001 and +1)
        - Inconsistent use of spaces, dots, dashes, and parenthesis
    - Address 
        - Leading zeroes on house, apartment, and suite number which should be stripped
        - Leading zeroes on box number and zip code which should be preserved
'''

from pyspark.sql.functions import regexp_replace, concat_ws, col, when, split, length, count, countDistinct, trim

try:
    # Clean the original column to remove non-numeric characters
    patients_df = patients_df.withColumn("phone_number", regexp_replace(col("phone_number"), r"[^0-9x]", ""))
    
    # Split phone_number into core_number and extension
    split_col = split(col("phone_number"), "x")
    patients_df = patients_df.withColumn("core_number", split_col.getItem(0)) \
                             .withColumn("extension", split_col.getItem(1))
    
    # Strip country code based on the length of core_number
    patients_df = patients_df.withColumn(
        "core_number",
        when((length(col("core_number")) == 11), col("core_number").substr(2, 10))
        .when((length(col("core_number")) == 12), col("core_number").substr(3, 10))
        .when((length(col("core_number")) == 13), col("core_number").substr(4, 10))
        .otherwise(col("core_number"))
    )
    
    # Recombine core phone number and extension
    patients_df = patients_df.withColumn(
        "phone_number",
        when(col("extension") != "", concat_ws("x", col("core_number"), col("extension")))
        .otherwise(col("core_number"))
    )
    
    # Drop intermediary columns
    patients_df = patients_df.drop("core_number", "extension")
    
    # Normalize addresses
    patients_df = patients_df \
        .withColumn("address", trim(col("address"))) \
        .withColumn("address", regexp_replace(col("address"), r"\.", "")) \
        .withColumn("address", regexp_replace(col("address"), r"\bApartment\b", "Apt")) \
        .withColumn("address", regexp_replace(col("address"), r"\bStreet\b", "St")) \
        .withColumn("address", regexp_replace(col("address"), r"\bCircle\b", "Cir")) \
        .withColumn("address", regexp_replace(col("address"), r"\bCourt\b", "Ct")) \
        .withColumn("address", regexp_replace(col("address"), r"\bSuite\b", "Ste")) \
        .withColumn("address", regexp_replace(col("address"), r"\bTrail\b", "Trl")) \
        .withColumn("address", regexp_replace(col("address"), r"\bBypass\b", "Byp")) \
        .withColumn("address", regexp_replace(col("address"), r"(?<=^|,|\sApt\s|\sSte\s)0+(\d+)", r"\1")) \
        .withColumn("address", regexp_replace(col("address"), r",", ", ")) \
        .withColumn("address", regexp_replace(col("address"), r"\s+", " "))
    
    
    # Show normalized df
    patients_df.select("address").show(100, truncate=False)
    
    # No duplicates found in this file, but dupes could be found in future files.
    patients_df.select(
        count("patient_id").alias("total_patient_id"),
        countDistinct("patient_id").alias("distinct_patient_id")
    ).show()
    
    # Drop duplicates based on patient_id
    patients_df = patients_df.dropDuplicates(["patient_id"])
    
    # Join patients to appointments and create a view
    appointment_history_df = patients_df.join(appointments_df, on="patient_id", how="inner")
    appointment_history_df.createOrReplaceTempView("appointment_history")
    appointment_history_df.show(10, truncate=False)
except Exception as e:
    raise Exception("Error while normalizing and deduplicating dataframes: ", str(e))

+-----------------------------------------------------+
|address                                              |
+-----------------------------------------------------+
|6377 Jennifer Trl Apt 1, Calebside, NY 22906         |
|1 Thompson Park Ste 212, West Anitaport, NY 11534    |
|792 Mark Wells, Jaclynport, TN 90027                 |
|1 Harris Inlet, Nealton, MS 89986                    |
|150 Miranda Unions, Bradburgh, FL 64440              |
|2523 Flores Radial Ste 462, South Hannah, ND 92740   |
|3040 Eddie Byp, Lake Andrew, RI 07138                |
|USNS Thompson, FPO AP 63179                          |
|184 Soto Pines Apt 875, West Dakota, GA 96278        |
|1 Jake Locks Ste 921, Lewishaven, MT 10958           |
|18331 Mendez Pike, Reidhaven, GA 29116               |
|56674 Edwards Divide Apt 1, Kimland, MS 78734        |
|2972 Erika Rapid, Knightborough, AK 93261            |
|5790 Gregory Ville Ste 343, Lake Ryanland, NJ 30897  |
|968 Christopher Fields Ste 713, Bradleyside, MI

## Step 3: De-identification (Should occur after transformation)
De-identify sensitive patient data such as name, address, and phone_number using an anonymization technique (e.g., hashing).
Ensure that the data can still be linked across the two datasets via a secure common identifier (e.g., patient_id).

In [18]:
from pyspark.sql.functions import sha2, rand, concat, lit

try:
    # PII columns to hash
    pii_columns_patients = ['name', 'address', 'phone_number']
    
    # Add salt to original value before hashing to enhance security
    appointment_history_anon = appointment_history_df.select(
        *[
            sha2(concat(col, lit('_'), rand()), 256).alias(col) if col in pii_columns_patients 
            else col 
            for col in appointment_history_df.columns
        ]
    )
    
    print("\nSample of anonymized patients:")
    appointment_history_anon.show(10, truncate=False)
    
    # Overwrite the original df with the hashed columns
    appointment_history_df = appointment_history_anon
    print(appointment_history_df.count())
except Exception as e:
    raise Exception("Error while hashing PII columns: ", str(e))


Sample of anonymized patients:
+----------+----------------------------------------------------------------+---+----------------------------------------------------------------+----------------------------------------------------------------+------------+----------------+------------------+
|patient_id|name                                                            |age|address                                                         |phone_number                                                    |diagnosis   |appointment_date|doctor            |
+----------+----------------------------------------------------------------+---+----------------------------------------------------------------+----------------------------------------------------------------+------------+----------------+------------------+
|3         |7a2a8e1e2dfeffb512cdcd310122e9ddd8bee11f60dac80ca9381864917de285|61 |fcd1eeb21bb96f5742a5f485aef55a9db134d9a3fc24f2cf68e494887b813b0a|8bbde97c44704518d1c68b743672669d8603aec

## Step 4: Data Storage
- Set up LocalStack to simulate an S3 environment.
- Store the de-identified and transformed data as Parquet files in the LocalStack S3 bucket.
- Ensure your solution uses Docker to run the pipeline in a containerized environment.

In [19]:
import requests

# Test connection to LocalStack S3 service
url = "http://localstack:4566"
try:
    response = requests.get(url)
    if response.status_code == 200:
        print("Successfully connected to LocalStack:", response.text)
    else:
        print("Failed to connect to LocalStack. Status code:", response.status_code)
except requests.exceptions.RequestException as e:
    print("Error connecting to LocalStack: ", str(e))


Successfully connected to LocalStack: 


In [5]:
import boto3

try:
    # Connect to LocalStack S3
    s3 = boto3.client(
        "s3",
        endpoint_url="http://localstack:4566",
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1"
    )
    
    # Create the bucket
    bucket_name = "my-test-bucket"
    s3.create_bucket(Bucket=bucket_name)
    print(f"Bucket '{bucket_name}' created successfully.")
except Exception as e:
    print("Error creating bucket:", str(e))


Bucket 'my-test-bucket' created successfully.


In [6]:
import shutil
import os

try:
    # Local directory where the files are stored
    local_temp_path = "/tmp/data/"
    
    # Delete everything in /tmp/data/
    if os.path.exists(local_temp_path):
        shutil.rmtree(local_temp_path)
    
    # Write appointment history to tmp folder as parquet
    appointment_history_df.write.parquet(local_temp_path, mode='overwrite')
    
    # Get the list of parquet part files
    parquet_files = [f for f in os.listdir(local_temp_path) if f.endswith('.parquet')]
    
    # Upload each part file to S3
    for part_file in parquet_files:
        part_file_path = os.path.join(local_temp_path, part_file)
    
        # Upload the part file to S3
        s3.upload_file(part_file_path, bucket_name, part_file)
        print(f"Uploaded {part_file} to S3 bucket '{bucket_name}' with key '{part_file}'.")
    
    print(f"All part files have been successfully uploaded to S3 bucket '{bucket_name}'.")
except Exception as e:
    raise Exception("Error while uploading parquet files to LocalStack: ", str(e))

Uploaded part-00000-64463bd7-7689-45f0-a528-045e18f91de7-c000.snappy.parquet to S3 bucket 'my-test-bucket' with key 'part-00000-64463bd7-7689-45f0-a528-045e18f91de7-c000.snappy.parquet'.
All part files have been successfully uploaded to S3 bucket 'my-test-bucket'.


## Step 5: Data Join with PySpark
- Use PySpark to load the two Parquet tables (patient data and appointment data) from the LocalStack S3 bucket.
- Join the two tables on patient_id and print the resulting joined dataset.

In [7]:
# Data is already joined, pulling down from Localstack bucket and displaying...

try:
    # List all files in the S3 bucket
    response = s3.list_objects_v2(Bucket=bucket_name)
    print(response)
    
    # Filter the list of files to only include Parquet files
    parquet_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('.parquet')]
    
    # Temporary directory to store the files
    temp_dir = '/tmp/parquet_files/'
    
    # Delete everything in tmp
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    
    # Ensure the directory exists
    os.makedirs(temp_dir, exist_ok=True)
    
    # Download each Parquet file from S3 and save it locally
    for file_key in parquet_files:
        # Download the file
        file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
        file_data = file_obj['Body'].read()
        
        # Write the file data to a local file
        local_path = os.path.join(temp_dir, os.path.basename(file_key))
        with open(local_path, 'wb') as f:
            f.write(file_data)
    
    # Now read all the Parquet files from the local directory into a single Spark DataFrame
    df = spark.read.parquet(temp_dir)
    
    # Show the DataFrame
    df.show()
    print(df.count())
    df.select("patient_id").where("patient_id = 3").show()
except Exception as e:
    raise Exception("Error while downloading parquet files from LocalStack: ", str(e))

{'ResponseMetadata': {'RequestId': '4ffa2424-e408-44a3-b26e-2a537158235a', 'HostId': 's9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'TwistedWeb/24.3.0', 'date': 'Mon, 25 Nov 2024 17:04:32 GMT', 'content-type': 'application/xml', 'content-length': '526', 'x-amz-request-id': '4ffa2424-e408-44a3-b26e-2a537158235a', 'x-amz-id-2': 's9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234='}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Contents': [{'Key': 'part-00000-64463bd7-7689-45f0-a528-045e18f91de7-c000.snappy.parquet', 'LastModified': datetime.datetime(2024, 11, 25, 17, 4, 32, tzinfo=tzlocal()), 'ETag': '"88cb04c11e3d29c5edefc7554f90c58d"', 'Size': 154736, 'StorageClass': 'STANDARD'}], 'Name': 'my-test-bucket', 'Prefix': '', 'MaxKeys': 1000, 'EncodingType': 'url', 'KeyCount': 1}
+----------+--------------------+---+--------------------+--------------------+-------------+----------------

# Cleanup

In [8]:
try:
    # Create an S3 resource pointing to LocalStack
    s3_resource = boto3.resource(
        's3',
        endpoint_url='http://localstack:4566',  # LocalStack S3 endpoint
        aws_access_key_id='test',  # Dummy credentials for LocalStack
        aws_secret_access_key='test'
    )
    
    # Reference the bucket
    bucket = s3_resource.Bucket(bucket_name)
    
    # Delete all objects
    bucket.objects.all().delete()
    
    # If versioning is enabled, delete all versions
    bucket.object_versions.all().delete()
except Exception as e:
    raise Exception("Error while truncating LocalStack bucket: ", str(e))