In [1]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Create Delta Table') \
    .config("spark.jars", "/home/ubuntu/Downloads/postgresql-42.6.0.jar") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()



23/11/14 11:41:58 WARN Utils: Your hostname, ubuntu-ThinkPad-T480 resolves to a loopback address: 127.0.1.1; using 192.168.1.111 instead (on interface wlp3s0)
23/11/14 11:41:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d0a7a168-9dda-4af4-9fe0-0dc6e9102fad;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 192ms :: artifacts dl 11ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   

In [None]:

from pyspark.sql.types import StructType, StructField, StringType, DateType, BooleanType

# Define the schema for the data
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("date_of_birth", DateType(), True),
    StructField("gender", StringType(), True),
    StructField("address", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("email", StringType(), True),
    StructField("diagnosis", StringType(), True),
    StructField("admission_date", DateType(), True),
    StructField("discharged", BooleanType(), True)
])
patient_df = spark.read.format("csv").option("delimeter", ",").option("header", True).schema(schema).load("data.csv")

In [None]:
patient_df.repartition(1000).rdd.getNumPartitions()

In [None]:
from pyspark.sql.functions import dayofmonth, month, year

patient_df = patient_df.withColumn("month", month("admission_date"))
patient_df = patient_df.withColumn("year", year("admission_date"))
patient_df = patient_df.withColumn("day", dayofmonth("admission_date"))

In [None]:
import random
from pyspark.sql.functions import udf

random_id_gen = udf(lambda: "PRAC"+str(random.randint(10000,11111)))
patient_df = patient_df.withColumn("practitioner", random_id_gen())
patient_df.repartition(1000).repartition("year", "month", "day", "diagnosis", "gender").write.mode(saveMode="overwrite").format("delta").option("overwriteSchema", "true").save("delta-partition/patients")


In [2]:
delta_patient = spark.read.format("delta").load("delta-partition-1000/patients")
delta_patient.schema

                                                                                

StructType([StructField('first_name', StringType(), True), StructField('last_name', StringType(), True), StructField('date_of_birth', DateType(), True), StructField('gender', StringType(), True), StructField('address', StringType(), True), StructField('phone_number', StringType(), True), StructField('email', StringType(), True), StructField('diagnosis', StringType(), True), StructField('admission_date', DateType(), True), StructField('discharged', BooleanType(), True), StructField('month', IntegerType(), True), StructField('year', IntegerType(), True), StructField('day', IntegerType(), True), StructField('practitioner', StringType(), True)])

In [6]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(spark)
sqlContext.registerDataFrameAsTable(delta_patient, "patients")

In [None]:
from pyspark.sql.types import FloatType
from faker import Faker

fake = Faker()

practitioner_schema = StructType([
    StructField("practitioner_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("specialty", StringType(), True),
    StructField("experience_years", FloatType(), True),
    StructField("location", StringType(), True),
    StructField("email", StringType(), True)
])

# Generate Fake Practitioners
num_practitioners = 10000

practitioners = []
for id,_ in enumerate(range(num_practitioners)):
    practitioner_id = "PRAC" + str(num_practitioners + id)
    name = fake.name()
    specialty = fake.random.choice(["Cardiologist", "Dermatologist", "Pediatrician", "Orthopedic Surgeon", "Psychiatrist"])
    experience_years = float(fake.random.randrange(1,20))
    address = fake.address().replace('\n', '').replace(',', '')
    email = fake.email()

    practitioners.append((practitioner_id, name, specialty, experience_years, address, email))

# Create DataFrame from practitioners
practitioner_df = spark.createDataFrame(practitioners, schema=practitioner_schema)
practitioner_df.repartition(100).write.mode(saveMode="overwrite").format("delta").save("delta-partition-1000/practitioners")

In [4]:

delta_practitioner = spark.read.format("delta").load("delta-partition-1000/practitioners")
delta_practitioner.schema

StructType([StructField('practitioner_id', StringType(), True), StructField('name', StringType(), True), StructField('specialty', StringType(), True), StructField('experience_years', FloatType(), True), StructField('location', StringType(), True), StructField('email', StringType(), True)])

In [13]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(spark)
sqlContext.registerDataFrameAsTable(delta_practitioner, "practitioners")

In [7]:
import time 

s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE admission_date REGEXP '01-21'")
print(time.time() - s)

0.18877696990966797


In [8]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE admission_date REGEXP '2020-01-21' and diagnosis='Fever' and gender='Male' and first_name='Steven'")
print(time.time() - s)

0.07281756401062012


In [9]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE admission_date REGEXP '01-21' and diagnosis='Fever' and gender='Male'")
print(time.time() - s)

0.024633169174194336


In [10]:
s = time.time()
sqlContext.sql("SELECT diagnosis, COUNT(diagnosis) as diagnosis_count FROM patients GROUP BY diagnosis")
print(time.time() - s)

0.07954192161560059


In [11]:
s = time.time()
spark.sql("SELECT day, COUNT(day) as day_count FROM patients GROUP BY day")
print(time.time() - s)

0.029010295867919922


In [14]:
s = time.time()
sqlContext.sql("SELECT * FROM patients INNER JOIN practitioners ON practitioners.practitioner_id=patients.practitioner")
print(time.time() - s)

0.03674674034118652


In [15]:
s = time.time()
sqlContext.sql("SELECT * FROM patients;")
print(time.time() - s)

0.015388727188110352


In [16]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners;")
print(time.time() - s)

0.01124429702758789


In [17]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE diagnosis = 'Headache';")
print(time.time() - s)

0.021991968154907227


In [18]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE experience_years > 10;")
print(time.time() - s)

0.02054286003112793


In [19]:
s = time.time()
sqlContext.sql("SELECT gender, COUNT(*) FROM patients GROUP BY gender;")
print(time.time() - s)

0.022705793380737305


In [20]:
s = time.time()
sqlContext.sql("SELECT specialty, AVG(experience_years) FROM practitioners GROUP BY specialty;")
print(time.time() - s)

0.02323174476623535


In [21]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE admission_date < '2023-01-01';")
print(time.time() - s)

0.01984429359436035


In [22]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE location LIKE '%City%';")
print(time.time() - s)

0.02972102165222168


In [23]:
s = time.time()
sqlContext.sql("SELECT diagnosis, COUNT(*) FROM patients GROUP BY diagnosis;")
print(time.time() - s)

0.011874198913574219


In [24]:
s = time.time()
sqlContext.sql("SELECT * FROM patients p JOIN practitioners pr ON p.address = pr.location;")
print(time.time() - s)

0.023538589477539062


In [25]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE MONTH(admission_date) = 5;")
print(time.time() - s)

0.030556201934814453


In [26]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE experience_years BETWEEN 5 AND 15;")
print(time.time() - s)

0.01771998405456543


In [27]:
s = time.time()
sqlContext.sql("SELECT * FROM patients p JOIN practitioners pr ON p.diagnosis = pr.specialty;")
print(time.time() - s)

0.018644332885742188


In [30]:
s = time.time()
sqlContext.sql("SELECT YEAR(admission_date), COUNT(*) FROM patients GROUP BY admission_date;")
print(time.time() - s)

0.030059337615966797


In [31]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners ORDER BY experience_years DESC LIMIT 1;")
print(time.time() - s)

0.034102678298950195


In [32]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE discharged = false;")
print(time.time() - s)

0.01971578598022461


In [33]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE email LIKE '%.com';")
print(time.time() - s)

0.017071962356567383


In [35]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE practitioner = '123';")
print(time.time() - s)

0.015531063079833984


In [36]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE specialty = 'Cardiologist' AND location = 'Hospital A';")
print(time.time() - s)

0.038361310958862305


In [37]:
s = time.time()
sqlContext.sql("SELECT * FROM patients p JOIN practitioners pr ON p.gender = pr.name;")
print(time.time() - s)

0.017200231552124023


In [38]:
s = time.time()
sqlContext.sql("SELECT * FROM patients INNER JOIN practitioners ON patients.practitioner = practitioners.practitioner_id;")
print(time.time() - s)

0.017252445220947266


In [None]:
s = time.time()
sqlContext.sql("SELECT * FROM patients RIGHT JOIN practitioners ON patients.practitioner_id = practitioners.practitioner_id;")
print(time.time() - s)

In [39]:
s = time.time()
sqlContext.sql("SELECT * FROM patients FULL OUTER JOIN practitioners ON patients.practitioner = practitioners.practitioner_id;")
print(time.time() - s)

0.018781185150146484


In [40]:
s = time.time()
sqlContext.sql("SELECT * FROM patients CROSS JOIN practitioners;")
print(time.time() - s)

0.016555309295654297


In [41]:
s = time.time()
sqlContext.sql("SELECT * FROM patients p1 INNER JOIN patients p2 ON p1.practitioner = p2.practitioner AND p1.first_name != p2.first_name;")
print(time.time() - s)

0.02830195426940918


In [43]:
s = time.time()
sqlContext.sql("SELECT * FROM patients JOIN practitioners ON patients.practitioner = practitioners.practitioner_id WHERE patients.first_name = practitioners.name;")
print(time.time() - s)

0.020437955856323242


In [44]:
s = time.time()
sqlContext.sql("SELECT practitioners.practitioner_id, practitioners.name, COUNT(*) AS patient_count FROM practitioners LEFT JOIN patients ON practitioners.practitioner_id = patients.practitioner GROUP BY practitioners.practitioner_id, practitioners.name;")
print(time.time() - s)

0.017418384552001953


In [45]:
s = time.time()
sqlContext.sql("SELECT * FROM patients JOIN practitioners ON patients.practitioner = practitioners.practitioner_id WHERE patients.address = practitioners.location AND patients.diagnosis = practitioners.specialty;")
print(time.time() - s)

0.022181034088134766


In [46]:
s = time.time()
sqlContext.sql("SELECT * FROM patients LEFT JOIN practitioners ON patients.practitioner = practitioners.practitioner_id WHERE practitioners.practitioner_id IS NULL;")
print(time.time() - s)

0.014779090881347656


In [47]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE diagnosis = 'Headache' AND admission_date > '2023-01-01';")
print(time.time() - s)

0.017317533493041992


In [48]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE location = 'City A' AND experience_years > 10;")
print(time.time() - s)

0.008323907852172852


In [49]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE gender = 'Male' AND diagnosis = 'Back Pain';")
print(time.time() - s)

0.008826017379760742


In [50]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE specialty = 'Cardiologist' AND experience_years BETWEEN 5 AND 15;")
print(time.time() - s)

0.013671636581420898


In [51]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE MONTH(admission_date) = 3 AND discharged = true;")
print(time.time() - s)

0.01461338996887207


In [52]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE email LIKE '%example.com' AND location = 'Hospital B';")
print(time.time() - s)

0.016833066940307617


In [53]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE diagnosis = 'Cough' OR admission_date > '2023-02-01';")
print(time.time() - s)

0.01139068603515625


In [54]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE specialty = 'Orthopedic Surgeon' OR experience_years > 20;")
print(time.time() - s)

0.010317087173461914


In [56]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE diagnosis = 'Fever' AND practitioner = '123';")
print(time.time() - s)

0.013528823852539062


In [57]:
s = time.time()
sqlContext.sql("SELECT * FROM practitioners WHERE experience_years > 10 AND location = 'Hospital C';")
print(time.time() - s)

0.011412620544433594


In [59]:
s = time.time()
sqlContext.sql("SELECT * FROM patients WHERE diagnosis REGEXP 'Cough' OR practitioner IN (SELECT practitioner_id FROM practitioners WHERE specialty REGEXP 'Orthopedic');")
print(time.time() - s)

0.03357577323913574
