In [1]:
# Start (or reuse) the SparkSession that the following cells use as `spark`.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

23/11/25 17:10:57 WARN Utils: Your hostname, ubuntu-ThinkPad-T480 resolves to a loopback address: 127.0.1.1; using 192.168.1.111 instead (on interface wlp3s0)
23/11/25 17:10:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 17:10:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, BooleanType, LongType, FloatType

def _nullable_struct(columns):
    """Build a StructType from (column_name, data_type) pairs.

    Every field is created with nullable=True, matching how all schemas in
    this notebook are declared. Field order follows the order of `columns`.
    """
    return StructType([StructField(name, dtype, True) for name, dtype in columns])


# Schema applied when loading patient.csv.
# NOTE(review): phone_number is declared LongType here and in the other
# schemas; values containing "+", "-", spaces or leading zeros cannot be
# represented as a long -- confirm the files hold plain digits only.
patient_schema = _nullable_struct([
    ("patient_id", StringType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("date_of_birth", DateType()),
    ("gender", StringType()),
    ("address", StringType()),
    ("phone_number", LongType()),
    ("email", StringType()),
    ("diagnosis", StringType()),
    ("admission_date", DateType()),
    ("discharged", BooleanType()),
    ("organization", StringType()),
    ("practitioner", StringType()),
])

# Schema applied when loading organization.csv.
organization_schema = _nullable_struct([
    ("organization_id", StringType()),
    ("name", StringType()),
    ("phone_number", LongType()),
    ("email", StringType()),
    ("active_status", BooleanType()),
    ("address", StringType()),
])

# Schema applied when loading practitioner.csv.
practitioner_schema = _nullable_struct([
    ("organization", StringType()),
    ("practitioner_id", StringType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("date_of_birth", DateType()),
    ("gender", StringType()),
    ("address", StringType()),
    ("phone_number", LongType()),
    ("email", StringType()),
    ("active_status", BooleanType()),
])

# Schema applied when loading address.csv.
# NOTE(review): zip_code as IntegerType cannot retain leading zeros
# (e.g. "02134") -- confirm against the data before relying on it.
address_schema = _nullable_struct([
    ("address_id", StringType()),
    ("street", StringType()),
    ("city", StringType()),
    ("state", StringType()),
    ("zip_code", IntegerType()),
    ("country", StringType()),
    ("type", StringType()),
])

# Schema applied when loading encounter.csv.
encounter_schema = _nullable_struct([
    ("encounter_id", StringType()),
    ("encounter_date", DateType()),
    ("encounter_type", StringType()),
    ("prescription", StringType()),
    ("practitioner", StringType()),
    ("patient", StringType()),
])

# Schema applied when loading diagnosis.csv.
diagnosis_schema = _nullable_struct([
    ("diagnosis_id", StringType()),
    ("patient", StringType()),
    ("encounter", StringType()),
    ("practitioner", StringType()),
    ("diagnosis_date", DateType()),
    ("diagnosis_type", StringType()),
    ("diagnosis_result", StringType()),
])

# Schema applied when loading medicine.csv.
medicine_schema = _nullable_struct([
    ("medicine_id", StringType()),
    ("medicine_name", StringType()),
    ("manufacturer", StringType()),
    ("dosage_form", StringType()),
    ("strength", FloatType()),
    ("unit", StringType()),
    ("quantity", IntegerType()),
    ("price", FloatType()),
    ("expiry_date", DateType()),
])


def _read_csv_with_schema(path, schema):
    """Return a DataFrame for the CSV at `path`, read with a header row and the given explicit schema."""
    reader = spark.read.format("csv").option("header", True).schema(schema)
    return reader.load(path)


# One DataFrame per source file, each paired with its schema declared above.
patient_df = _read_csv_with_schema("patient.csv", patient_schema)
organization_df = _read_csv_with_schema("organization.csv", organization_schema)
practitioner_df = _read_csv_with_schema("practitioner.csv", practitioner_schema)
encounter_df = _read_csv_with_schema("encounter.csv", encounter_schema)
address_df = _read_csv_with_schema("address.csv", address_schema)
diagnosis_df = _read_csv_with_schema("diagnosis.csv", diagnosis_schema)
medicine_df = _read_csv_with_schema("medicine.csv", medicine_schema)


In [3]:
def _dedupe_on(frame, key_column):
    """Return `frame` with rows de-duplicated on the single column `key_column`."""
    return frame.dropDuplicates([key_column])


# Keep one row per identifier in every table. The names are rebound in place,
# so later cells see the de-duplicated frames.
# NOTE(review): this call does not choose which of several duplicate rows
# is kept -- confirm that any surviving row is acceptable.
patient_df = _dedupe_on(patient_df, "patient_id")
organization_df = _dedupe_on(organization_df, "organization_id")
practitioner_df = _dedupe_on(practitioner_df, "practitioner_id")
encounter_df = _dedupe_on(encounter_df, "encounter_id")
address_df = _dedupe_on(address_df, "address_id")
diagnosis_df = _dedupe_on(diagnosis_df, "diagnosis_id")
medicine_df = _dedupe_on(medicine_df, "medicine_id")

In [4]:
# Persist every DataFrame as a CSV directory with a header row.
#
# The save mode is "overwrite" rather than "append": with "append", every
# re-run of this cell (or of the whole notebook) adds another full copy of
# the same rows to the output directories, so the saved data ends up holding
# duplicate identifiers again. "overwrite" makes the cell idempotent.
# NOTE: "overwrite" replaces whatever is already stored in each output
# directory -- switch back to "append" only if incremental loading is intended.
frames_by_output_dir = {
    "patient": patient_df,
    "organization": organization_df,
    "practitioner": practitioner_df,
    "encounter": encounter_df,
    "address": address_df,
    "diagnosis": diagnosis_df,
    "medicine": medicine_df,
}

for output_dir, frame in frames_by_output_dir.items():
    frame.write.format("csv").mode("overwrite").option("header", True).save(output_dir)

                                                                                

In [7]:
from pyspark.sql import functions as f

# Patients whose gender column equals "Male".
# Preview with: male_pat.show(truncate=False)
male_pat = patient_df.filter(patient_df.gender == "Male")

# Organizations whose active_status flag is true.
# Preview with: active_org.collect()  (returns a list of Row objects)
active_org = organization_df.where(organization_df.active_status == True)

# Practitioners whose e-mail address ends with ".com".
# The earlier pattern like("%.com%") was a SQL LIKE "contains" match (not a
# regex) and therefore also accepted addresses such as "a.company@x.org";
# endswith() checks the suffix only.
# Preview with: email_com.head(10)
email_com = practitioner_df.filter(practitioner_df.email.endswith(".com"))

# Encounters whose encounter_date falls in calendar year 2020.
# Preview with: enc_year.tail(10)
enc_year = encounter_df.where(f.year(encounter_df.encounter_date) == 2020)

[Row(encounter_id='ENC29945', encounter_date=datetime.date(2020, 11, 28), encounter_type='Follow-up', prescription='Strategy answer trip half way American off.', practitioner='PRAC1805', patient='PAT18849'),
 Row(encounter_id='ENC29949', encounter_date=datetime.date(2020, 8, 9), encounter_type='Emergency', prescription='Seven statement create fill tree choose other.', practitioner='PRAC1194', patient='PAT17205'),
 Row(encounter_id='ENC29965', encounter_date=datetime.date(2020, 11, 4), encounter_type='Follow-up', prescription='Identify shoulder group cover.', practitioner='PRAC1810', patient='PAT14148'),
 Row(encounter_id='ENC29972', encounter_date=datetime.date(2020, 4, 10), encounter_type='Emergency', prescription='Bill relate possible clear feeling although everybody.', practitioner='PRAC1021', patient='PAT15524'),
 Row(encounter_id='ENC29978', encounter_date=datetime.date(2020, 12, 10), encounter_type='Follow-up', prescription='Another natural public know effect increase.', practiti