### Load Data from Blob Storage

In [None]:
containerName = "YOUR_BLOB_CONTAINER_NAME"
storageAccountName = "YOUR_STORAGE_ACCOUNT_NAME"
# mountName is the parent folder path of this notebook. Run the %scala cell below to get the path of the
# current notebook, then use that path without the trailing "CSV-FHIR-Converter-Final".
mountName = "PARENT_FOLDER_PATH_FOR_THIS_NOTEBOOK"
# Key used in extra_configs when mounting; its value is the storage account key.
confKey = f"fs.azure.account.key.{storageAccountName}.blob.core.windows.net"
# scopeName and keyName identify the Databricks secret that holds the storage account key.
# Follow the instructions at https://docs.databricks.com/security/secrets/example-secret-workflow.html
scopeName = "DATABRICKS_SECRET_SCOPE"
keyName = "DATABRICKS_SECRET_NAME"

In [None]:
%scala
// Get the path of the current notebook; mountName is derived from it (see the comment on mountName)
dbutils.notebook.getContext.notebookPath

In [None]:
# Mount blob storage. The mount is skipped when the mount point already exists,
# so this cell can be re-run without failing on an existing mount.
mountPoint = f"/mnt/{mountName}"
if any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
    print(f"{mountPoint} is already mounted")
else:
    dbutils.fs.mount(
        source=f"wasbs://{containerName}@{storageAccountName}.blob.core.windows.net",
        mount_point=mountPoint,
        # The storage account key is read from a Databricks secret instead of being hard-coded here.
        extra_configs={confKey: dbutils.secrets.get(scope=scopeName, key=keyName)},
    )

In [None]:
# Name of the CSV file, relative to the mount point
fileName = "FILENAME"
# header=True uses the first line of the file as column names; later cells read the columns
# DATE_OF_SURGERY, AGE_AT_SURGERY, PAT_MRN_ID, PAT_LAST_NAME, PAT_FIRST_NAME and GENDER.
df = spark.read.csv(f"/mnt/{mountName}/{fileName}", header=True)

### Explore Data

In [None]:
# Show the DataFrame loaded from the CSV file
display(df)

In [None]:
# Print the dimensions of the data as a (row count, column count) tuple
print((df.count(), len(df.columns)))

In [None]:
# Print the schema (column names and types) of the DataFrame
df.printSchema()

In [None]:
# Compute and print basic summary statistics for the DataFrame's columns
df.describe().show()

### FHIR

#### FHIR Set up and Configuration

1. Sign in as **Global administrator**.
2. Follow the steps in [Register a public client application in Azure Active Directory](https://docs.microsoft.com/en-us/azure/healthcare-apis/register-public-azure-ad-client-app)
   1. Use "https://www.getpostman.com/oauth2/callback" as the redirect URL. This will enable you to test the app with Postman.
3. Follow the steps in [Register a confidential client application in Azure Active Directory](https://docs.microsoft.com/en-us/azure/healthcare-apis/register-confidential-azure-ad-client-app)
4. Grant API permissions
   1. Go to the Azure API for FHIR management portal.
   2. Click **Access Control (IAM)** and add a role assignment.
   3. Select **FHIR Data Writer** as the role, and assign access to **Azure AD user, group, or service principal**.
   4. Search for the app you registered in Step 2, and click **Save**.

#### FHIR Exploration

In [None]:
import requests
import pprint
import datetime
import json

In [None]:
# Name of the app registered in Step 2
azureADAppName = "REGISTERED_AZURE_AD_APPNAME"

# Both IDs can be found on the app's page in Azure AD
applicationClientId = "APPLICATION_CLIENT_ID"
directoryTenantId = "DIRECTORY_TENANT_ID"

# Name of the secret created for the app in Step 3
azureADAppSecretName = "REGISTERED_APP_SECRET_NAME"
# Storing this secret as a Databricks secret as well (see the storage-account instructions above) is
# encouraged, rather than pasting the value here.
azureADAppSecret = "REGISTERED_APP_SECRET_VALUE"

# FHIR endpoint without /metadata. It is sent as the token 'resource' and is also the prefix that
# "/Patient" is appended to — presumably it should not end with a trailing slash; confirm.
fhirEndPoint = "FHIR_END_POINT"

##### Functions

In [None]:
def get_access_token():
    """Request an Azure AD access token for the FHIR service.

    Uses the OAuth2 client-credentials grant with the module-level
    ``directoryTenantId``, ``applicationClientId``, ``azureADAppSecret`` and
    ``fhirEndPoint`` (sent as the ``resource``).

    Returns:
        The ``access_token`` value from the token endpoint's JSON response.

    Raises:
        Exception: If the token endpoint does not answer with HTTP 200. The
            message carries the status code and response body.
    """
    token_url = f"https://login.microsoftonline.com/{directoryTenantId}/oauth2/token"

    token_data = {
        'grant_type': 'client_credentials',
        'client_id': applicationClientId,
        'client_secret': azureADAppSecret,
        'resource': fhirEndPoint,
    }

    # Announce the request before it is sent so the output order matches what happens.
    print("Retrieving Access Token")
    token_r = requests.post(token_url, data=token_data)

    if token_r.status_code != 200:
        # Keep the status and body in the error so the cause of the failure is not lost.
        raise Exception(
            f"Error retrieving access token (HTTP {token_r.status_code}): {token_r.text}"
        )

    print("Access Token Retrieved Successfully")
    return token_r.json()["access_token"]

In [None]:
def insert_patient_record(payload, token):
    """POST one Patient resource to the FHIR endpoint.

    Args:
        payload: JSON string of the Patient resource, sent as the request body.
        token: Access token placed in the ``Authorization: Bearer`` header.

    Returns:
        The status code (201) when the server answers 201, otherwise 0.
    """
    print("Inserting Patient Record")
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    patient_url = fhirEndPoint + "/Patient"
    response = requests.post(patient_url, headers=request_headers, data=payload)
    print("Response Code: ", response.status_code)

    # Anything other than 201 is reported and signalled to the caller as 0.
    if response.status_code != 201:
        print("Error inserting patient record")
        return 0

    print("Patient Record inserted Successfully")
    return response.status_code

In [None]:
def get_all_patients(token):
    """GET the Patient search result from the FHIR endpoint.

    Args:
        token: Access token placed in the ``Authorization: Bearer`` header.

    Returns:
        The raw response body text when the server answers 200.

    Raises:
        Exception: If the server answers with any status other than 200.
    """
    print("Retrieving Patient Record")
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    patient_url = fhirEndPoint + "/Patient"
    response = requests.get(patient_url, headers=request_headers)
    print("Response Code: ", response.status_code)

    if response.status_code != 200:
        raise Exception("Error retrieving all patient records")

    print("Retrieved all patients")
    return response.text

In [None]:
def delete_patient(resource_id, token):
    """DELETE a single Patient resource from the FHIR endpoint.

    Args:
        resource_id: Id of the Patient resource, appended to ``/Patient/``.
        token: Access token placed in the ``Authorization: Bearer`` header.

    Returns:
        The response object when the server answers 204.

    Raises:
        Exception: If the server answers with any status other than 204.
    """
    print("Deleting Patient Record")
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    patient_url = fhirEndPoint + "/Patient/" + resource_id
    response = requests.delete(patient_url, headers=request_headers)
    print("Response Code: ", response.status_code)

    if response.status_code != 204:
        print(response.status_code)
        raise Exception("Error deleting patient records")

    print("Deleted patient")
    return response

In [None]:
def delete_all_patients(patients, token):
    """Delete every patient listed in a search-result JSON string.

    Args:
        patients: JSON text whose top-level ``entry`` list holds items with
            a ``resource.id`` value.
        token: Access token forwarded to ``delete_patient``.

    Raises:
        KeyError: If the parsed JSON has no ``entry`` key (or an entry lacks
            ``resource``/``id``).
    """
    bundle = json.loads(patients)
    entries = bundle['entry']
    for item in entries:
        patient_id = item['resource']['id']
        delete_patient(patient_id, token)

In [None]:
def get_patient_count(token):
    """GET the Patient count summary (``_summary=count``) from the FHIR endpoint.

    Args:
        token: Access token placed in the ``Authorization: Bearer`` header.

    Returns:
        The raw response body text when the server answers 200.

    Raises:
        Exception: If the server answers with any status other than 200.
    """
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    count_url = fhirEndPoint + "/Patient?_summary=count"
    response = requests.get(count_url, headers=request_headers)
    print("Response Code: ", response.status_code)
    # The body is printed on success and on failure alike.
    print("Response Text: ", response.text)

    if response.status_code != 200:
        raise Exception("Error getting patient count")
    return response.text

##### Patient Example

In [None]:
def insert_all_patients_spark(DATE_OF_SURGERY, AGE_AT_SURGERY, PAT_MRN_ID, PAT_LAST_NAME, PAT_FIRST_NAME, GENDER):
    """Build a FHIR Patient from one row's column values and insert it.

    Intended to be wrapped as a Spark UDF, so each argument is a single
    column value. Reads the module-level ``token`` for authorization.

    Args:
        DATE_OF_SURGERY: Surgery timestamp string in ``%m/%d/%Y %H:%M`` format.
        AGE_AT_SURGERY: Age at surgery; anything ``int()`` accepts.
        PAT_MRN_ID: Value stored as the patient's official identifier.
        PAT_LAST_NAME: Family name.
        PAT_FIRST_NAME: Given name.
        GENDER: Gender string; lower-cased before sending.

    Returns:
        The integer returned by ``insert_patient_record`` (201 on success,
        0 otherwise).
    """
    # Birth year is approximated as the surgery year minus the age at surgery.
    date_of_surgery = datetime.datetime.strptime(DATE_OF_SURGERY, "%m/%d/%Y %H:%M")
    birth_year = date_of_surgery.year - int(AGE_AT_SURGERY)
    patient_record = {
        "resourceType": "Patient",
        "active": True,
        "identifier": [
            {
                "use": "official",
                "value": PAT_MRN_ID,
            }
        ],
        "name": [
            {
                "use": "official",
                "family": PAT_LAST_NAME,
                "given": [PAT_FIRST_NAME],
            }
        ],
        "gender": GENDER.lower(),
        # FHIR dates are JSON strings; a year-only date needs four digits.
        "birthDate": f"{birth_year:04d}",
    }
    return insert_patient_record(json.dumps(patient_record), token)

In [None]:
def insert_all_patients(row):
    """Build a FHIR Patient from one row and insert it.

    Reads the module-level ``token`` for authorization.

    Args:
        row: Row-like object supporting ``row[0]`` and lookup by the column
            names ``DATE_OF_SURGERY`` (``%m/%d/%Y %H:%M`` string),
            ``AGE_AT_SURGERY``, ``PAT_MRN_ID``, ``PAT_LAST_NAME``,
            ``PAT_FIRST_NAME`` and ``GENDER``.

    Returns:
        The integer returned by ``insert_patient_record`` (201 on success,
        0 otherwise).
    """
    print(row[0])
    # Birth year is approximated as the surgery year minus the age at surgery.
    date_of_surgery = datetime.datetime.strptime(row["DATE_OF_SURGERY"], "%m/%d/%Y %H:%M")
    birth_year = date_of_surgery.year - int(row["AGE_AT_SURGERY"])
    patient_record = {
        "resourceType": "Patient",
        "active": True,
        "identifier": [
            {
                "use": "official",
                "value": row["PAT_MRN_ID"],
            }
        ],
        "name": [
            {
                "use": "official",
                "family": row["PAT_LAST_NAME"],
                "given": [row["PAT_FIRST_NAME"]],
            }
        ],
        "gender": row["GENDER"].lower(),
        # FHIR dates are JSON strings; a year-only date needs four digits.
        "birthDate": f"{birth_year:04d}",
    }
    # insert_patient_record already returns the integer status (or 0), so it
    # is returned as-is rather than dereferenced for .status_code.
    return insert_patient_record(json.dumps(patient_record), token)

In [None]:
# Fetch the access token; insert_all_patients_spark and insert_all_patients read it as the global `token`
token = get_access_token()

In [None]:
# Query the FHIR endpoint for the patient count summary; the response text is printed and returned
get_patient_count(token)

##### Parallel Option

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Wrap the per-row insert function as a Spark UDF; IntegerType matches the integer status it returns.
insert_all_patients_spark_udf = udf(insert_all_patients_spark, IntegerType())
# NOTE(review): withColumn is lazy, so no requests are sent until an action evaluates the "results"
# column; re-evaluating df_with_result would presumably POST the same patients again — confirm.
df_with_result = df.withColumn("results", insert_all_patients_spark_udf(df["DATE_OF_SURGERY"], df["AGE_AT_SURGERY"], df["PAT_MRN_ID"], df["PAT_LAST_NAME"], df["PAT_FIRST_NAME"], df["GENDER"]))

In [None]:
# Show the rows together with the "results" column produced by the insert UDF
# NOTE(review): this is presumably the action that actually triggers the inserts — confirm every row is evaluated.
display(df_with_result)

##### Non-parallel option

In [None]:
token = get_access_token()
# collect() returns all rows as a local list; each row is then inserted one at a time.
for row in df.rdd.collect():
    # Birth year is approximated as the surgery year minus the age at surgery.
    date_of_surgery = datetime.datetime.strptime(row["DATE_OF_SURGERY"], "%m/%d/%Y %H:%M")
    birthYear = date_of_surgery.year - int(row["AGE_AT_SURGERY"])
    patientRecord = {
        "resourceType": "Patient",
        "active": True,
        "identifier": [
            {
                "use": "official",
                "value": row["PAT_MRN_ID"],
            }
        ],
        "name": [
            {
                "use": "official",
                "family": row["PAT_LAST_NAME"],
                "given": [row["PAT_FIRST_NAME"]],
            }
        ],
        "gender": row["GENDER"].lower(),
        # FHIR dates are JSON strings; a year-only date needs four digits.
        "birthDate": f"{birthYear:04d}",
    }
    print(patientRecord)
    patientRecordStr = json.dumps(patientRecord)
    insert_patient_record(patientRecordStr, token)

##### Testing and clean up

In [None]:
# Fetch the Patient search result as raw JSON text
patients = get_all_patients(token)

In [None]:
# Parse the JSON text and pretty-print it for inspection
pprint.pprint(json.loads(patients))

In [None]:
# Repeatedly fetch and delete patients until a search returns no entries.
while True:
    patients = get_all_patients(token)
    # Check for entries explicitly: a missing or empty "entry" list ends the loop, while a
    # KeyError raised for a malformed entry during deletion is no longer reported as completion.
    if not json.loads(patients).get("entry"):
        print("Deletion complete")
        break
    delete_all_patients(patients, token)

### Appendix

Useful sites:

1. https://docs.microsoft.com/en-us/azure/healthcare-apis/tutorial-web-app-fhir-server

2. https://stackoverflow.com/questions/61299696/get-access-token-oauth2-azure-api-for-fhir-python

3. https://cuteprogramming.wordpress.com/2020/07/06/authentication-of-azure-api-for-fhir-and-the-import-of-patient-info-with-azure-function/

4. https://docs.microsoft.com/en-us/azure/healthcare-apis/register-service-azure-ad-client-app

5. https://stackoverflow.com/questions/61323003/403-authorization-error-oauth2-0-access-token-azure-api-for-fhir?noredirect=1&lq=1

6. https://docs.microsoft.com/en-us/azure/healthcare-apis/find-identity-object-ids