In [1]:
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
from pathlib import Path

In [3]:
spark = SparkSession.builder.appName("HealthcareAnalysis").getOrCreate()


In [4]:
def read_csv_folders(folder_path, file_type):
    all_files = list(Path(folder_path).glob("*.csv"))
    dfs = []
    
    for file in all_files:
        pdf = pd.read_csv(file)
        pdf = pdf.loc[:, ~pdf.columns.str.contains('Unnamed')]
        pdf['year'] = file.stem
        pdf['patient_type'] = file_type
        dfs.append(pdf)
    combine = pd.concat(dfs, ignore_index=True)

    return spark.createDataFrame(combine)

In [5]:
inpatient_df = read_csv_folders('../datasets/Inpatient','inpatient')
outpatient_df = read_csv_folders('../datasets/Outpatient', 'outpatient')
mapping_df = spark.createDataFrame(pd.read_csv('../datasets/Mapping_Specialty.csv'))




In [6]:
inpatient_df.show()

+------------+--------------+--------------------+---------+-----------+-----------+-------------+-----+----------+------------+
|Archive_Date|Specialty_HIPE|      Specialty_Name|Case_Type|Adult_Child|Age_Profile|   Time_Bands|Total|      year|patient_type|
+------------+--------------+--------------------+---------+-----------+-----------+-------------+-----+----------+------------+
|  31-01-2018|             0|Small Volume Spec...|Inpatient|      Child|       0-15|   6-9 Months|    1|IN_WL 2018|   inpatient|
|  31-01-2018|             0|Small Volume Spec...|Inpatient|      Child|      16-64|  9-12 Months|    1|IN_WL 2018|   inpatient|
|  31-01-2018|           400|       Endocrinology| Day Case|      Child|       0-15|   3-6 Months|    1|IN_WL 2018|   inpatient|
|  31-01-2018|           400|       Endocrinology| Day Case|      Child|       0-15| 12-15 Months|    1|IN_WL 2018|   inpatient|
|  31-01-2018|           600|Otolaryngology (ENT)| Day Case|      Child|       0-15|   0-3 Months

In [7]:
inpatient_df.head(3)

[Row(Archive_Date='31-01-2018', Specialty_HIPE=0, Specialty_Name='Small Volume Specialities', Case_Type='Inpatient', Adult_Child='Child', Age_Profile='0-15', Time_Bands='  6-9 Months', Total=1, year='IN_WL 2018', patient_type='inpatient'),
 Row(Archive_Date='31-01-2018', Specialty_HIPE=0, Specialty_Name='Small Volume Specialities', Case_Type='Inpatient', Adult_Child='Child', Age_Profile='16-64', Time_Bands='  9-12 Months', Total=1, year='IN_WL 2018', patient_type='inpatient'),
 Row(Archive_Date='31-01-2018', Specialty_HIPE=400, Specialty_Name='Endocrinology', Case_Type='Day Case', Adult_Child='Child', Age_Profile='0-15', Time_Bands='  3-6 Months', Total=1, year='IN_WL 2018', patient_type='inpatient')]

In [8]:
inpatient_df.printSchema()

root
 |-- Archive_Date: string (nullable = true)
 |-- Specialty_HIPE: long (nullable = true)
 |-- Specialty_Name: string (nullable = true)
 |-- Case_Type: string (nullable = true)
 |-- Adult_Child: string (nullable = true)
 |-- Age_Profile: string (nullable = true)
 |-- Time_Bands: string (nullable = true)
 |-- Total: long (nullable = true)
 |-- year: string (nullable = true)
 |-- patient_type: string (nullable = true)



In [9]:
outpatient_df.printSchema()

root
 |-- Archive_Date: string (nullable = true)
 |-- Specialty_HIPE: double (nullable = true)
 |-- Speciality: string (nullable = true)
 |-- Adult_Child: string (nullable = true)
 |-- Age_Profile: string (nullable = true)
 |-- Time_Bands: string (nullable = true)
 |-- Total: long (nullable = true)
 |-- year: string (nullable = true)
 |-- patient_type: string (nullable = true)



In [10]:
#thing to remeber that out and in are diffrent in term of number of column
# the coulumn 'Case type' is missing in the out
# that an issue only if i want to union these tables because i'll need to drop this coulum
# and also unite the names of the columns like 'Speciality' and 'Speciality_name'

In [11]:
outpatient_df[['year','Speciality']].show()

+----------+-----------+
|      year| Speciality|
+----------+-----------+
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018| Cardiology|
|Op_WL 2018|Dermatology|
|Op_WL 2018|Dermatology|
|Op_WL 2018|Dermatology|
|Op_WL 2018|Dermatology|
|Op_WL 2018|Dermatology|
|Op_WL 2018|Dermatology|
+----------+-----------+
only showing top 20 rows


In [12]:
inpatient_df.select('case_type','Total').show()

+---------+-----+
|case_type|Total|
+---------+-----+
|Inpatient|    1|
|Inpatient|    1|
| Day Case|    1|
| Day Case|    1|
| Day Case|   14|
| Day Case|    2|
| Day Case|    1|
| Day Case|    2|
| Day Case|    2|
|Inpatient|   44|
|Inpatient|   12|
|Inpatient|    5|
|Inpatient|    2|
|Inpatient|    1|
| Day Case|    1|
|Inpatient|    1|
|Inpatient|    1|
|Inpatient|    1|
| Day Case|    1|
|Inpatient|    2|
+---------+-----+
only showing top 20 rows


In [13]:
outpatient_df.select('Total','Specialty_HIPE').describe().show()

+-------+------------------+--------------+
|summary|             Total|Specialty_HIPE|
+-------+------------------+--------------+
|  count|            270983|        270983|
|   mean| 80.21071063498448|           NaN|
| stddev|148.61537343757712|           NaN|
|    min|                 1|           0.0|
|    max|              4239|           NaN|
+-------+------------------+--------------+



In [14]:
outpatient_df[outpatient_df['Specialty_HIPE']> 500].show()

+------------+--------------+--------------------+-----------+-----------+------------+-----+----------+------------+
|Archive_Date|Specialty_HIPE|          Speciality|Adult_Child|Age_Profile|  Time_Bands|Total|      year|patient_type|
+------------+--------------+--------------------+-----------+-----------+------------+-----+----------+------------+
|  31-01-2018|         600.0|Otolaryngology (ENT)|      Child|       0-15|  0-3 Months|  467|Op_WL 2018|  outpatient|
|  31-01-2018|         600.0|Otolaryngology (ENT)|      Child|       0-15|  3-6 Months|  365|Op_WL 2018|  outpatient|
|  31-01-2018|         600.0|Otolaryngology (ENT)|      Child|       0-15|  6-9 Months|  443|Op_WL 2018|  outpatient|
|  31-01-2018|         600.0|Otolaryngology (ENT)|      Child|       0-15| 9-12 Months|  486|Op_WL 2018|  outpatient|
|  31-01-2018|         600.0|Otolaryngology (ENT)|      Child|       0-15|12-15 Months|  364|Op_WL 2018|  outpatient|
|  31-01-2018|         600.0|Otolaryngology (ENT)|      

In [15]:
from pyspark.sql.functions import col, isnan, when, count

In [16]:
outpatient_df.select([
    count(when(isnan(c) | col(c).isNull(), c )).alias(c)
    for c in ['Total','Specialty_HIPE']
]).show()

+-----+--------------+
|Total|Specialty_HIPE|
+-----+--------------+
|    0|           191|
+-----+--------------+



In [17]:
for c in outpatient_df['Total','Specialty_HIPE']:
    print(type(c))

<class 'pyspark.sql.classic.column.Column'>
<class 'pyspark.sql.classic.column.Column'>


In [18]:
panage = outpatient_df.select('Age_Profile')

In [19]:
panage = panage.toPandas()

In [20]:
panage = panage['Age_Profile'].tolist()

In [21]:
panage = set(panage)

In [22]:
panage
#here we have extra space we have to left trim it

{' 0-15', '0-15', '16-64', '65+', 'nan'}

In [23]:
outpatient_df[outpatient_df['Age_Profile'] == 'nan' ].count()

175

In [24]:
testingADD = outpatient_df

In [25]:
testingADD = testingADD.withColumn('specialaty plus',col('Specialty_HIPE') + 5000)

In [26]:
testingADD.select('specialaty plus').show()

+---------------+
|specialaty plus|
+---------------+
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5100.0|
|         5300.0|
|         5300.0|
|         5300.0|
|         5300.0|
|         5300.0|
|         5300.0|
+---------------+
only showing top 20 rows


In [27]:
outpatient_df.select('Adult_child').distinct().show()

+-----------+
|Adult_child|
+-----------+
|      Adult|
|      Child|
|           |
+-----------+



In [28]:
testingADD.printSchema()

root
 |-- Archive_Date: string (nullable = true)
 |-- Specialty_HIPE: double (nullable = true)
 |-- Speciality: string (nullable = true)
 |-- Adult_Child: string (nullable = true)
 |-- Age_Profile: string (nullable = true)
 |-- Time_Bands: string (nullable = true)
 |-- Total: long (nullable = true)
 |-- year: string (nullable = true)
 |-- patient_type: string (nullable = true)
 |-- specialaty plus: double (nullable = true)



In [29]:
testingADD = testingADD.drop('specialaty plus')

In [30]:
testingADD.printSchema()

root
 |-- Archive_Date: string (nullable = true)
 |-- Specialty_HIPE: double (nullable = true)
 |-- Speciality: string (nullable = true)
 |-- Adult_Child: string (nullable = true)
 |-- Age_Profile: string (nullable = true)
 |-- Time_Bands: string (nullable = true)
 |-- Total: long (nullable = true)
 |-- year: string (nullable = true)
 |-- patient_type: string (nullable = true)



In [31]:
testingADD = testingADD.withColumnRenamed('Specialty_HIPE','Specialty')

In [32]:
Noofnull = testingADD.select([count(when(isnan('Specialty') | col('Specialty').isNull(), col('Specialty'))).alias('Specialty')
                   ])

In [33]:
Noofnull.show()

+---------+
|Specialty|
+---------+
|      191|
+---------+



In [34]:
Noofnull.show()

+---------+
|Specialty|
+---------+
|      191|
+---------+



In [35]:
retrived = Noofnull.collect()

In [36]:
type(retrived[0][0])

int

In [37]:
allv = testingADD.count()

In [38]:
nanPer = float(retrived[0][0]) / allv

In [39]:
nanPer = f'{nanPer:.3%}'

In [40]:
nanPer
#so for the nulls in one column it represent 0.07% from the entire dataset
# i will collect all the nulls in other columns and combine it to see together how much they represent


'0.070%'

In [41]:
testingADD.show(1)

+------------+---------+----------+-----------+-----------+-----------+-----+----------+------------+
|Archive_Date|Specialty|Speciality|Adult_Child|Age_Profile| Time_Bands|Total|      year|patient_type|
+------------+---------+----------+-----------+-----------+-----------+-----+----------+------------+
|  31-01-2018|    100.0|Cardiology|      Child|       0-15| 0-3 Months|  167|Op_WL 2018|  outpatient|
+------------+---------+----------+-----------+-----------+-----------+-----+----------+------------+
only showing top 1 row


In [45]:
testingADD.select([
    count(when(isnan(c) | col(c).isNull(), c )).alias(c)
    for c in ['Speciality','Archive_Date','Total','Specialty','Adult_Child','Age_Profile','Time_Bands','year','patient_type']
]).show()

{"ts": "2026-02-11 13:22:39.311", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[CAST_INVALID_INPUT] The value 'Rheumatology' of the type \"STRING\" cannot be cast to \"DOUBLE\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018", "context": {"file": "java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java", "line": "104)", "fragment": "isnan", "errorClass": "CAST_INVALID_INPUT"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o652.showString.\n: org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'Rheumatology' of the type \"STRING\" cannot be cast to \"DOUBLE\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018\n== DataFrame ==

NumberFormatException: [CAST_INVALID_INPUT] The value 'Rheumatology' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"isnan" was called from
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)


In [54]:
testingADD.select('*',isnan('Speciality')).show()

{"ts": "2026-02-11 13:29:06.856", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[CAST_INVALID_INPUT] The value 'Cardiology' of the type \"STRING\" cannot be cast to \"DOUBLE\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018", "context": {"file": "java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java", "line": "104)", "fragment": "isnan", "errorClass": "CAST_INVALID_INPUT"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o860.showString.\n: org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'Cardiology' of the type \"STRING\" cannot be cast to \"DOUBLE\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018\n== DataFrame ==\n\"

NumberFormatException: [CAST_INVALID_INPUT] The value 'Cardiology' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"isnan" was called from
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)


In [None]:
# so i have to check first if the column is string or other because if it's string only null can be used other than tha

In [56]:
pdtest = testingADD.toPandas()

In [59]:
pdtest['Speciality'].isna().unique()

array([False])

In [67]:
pdtest[pdtest.isnull().any(axis=1)]

Unnamed: 0,Archive_Date,Specialty,Speciality,Adult_Child,Age_Profile,Time_Bands,Total,year,patient_type
1281,31-01-2018,,Other,Adult,16-64,0-3 Months,212,Op_WL 2018,outpatient
1282,31-01-2018,,Other,Adult,16-64,3-6 Months,181,Op_WL 2018,outpatient
1283,31-01-2018,,Other,Adult,16-64,6-9 Months,26,Op_WL 2018,outpatient
1284,31-01-2018,,Other,Adult,16-64,15-18 Months,1,Op_WL 2018,outpatient
1285,31-01-2018,,Other,Adult,65+,0-3 Months,76,Op_WL 2018,outpatient
...,...,...,...,...,...,...,...,...,...
70044,30-11-2018,,Other,Adult,65+,0-3 Months,1,Op_WL 2018,outpatient
75096,30-11-2018,,Other,Adult,16-64,0-3 Months,1,Op_WL 2018,outpatient
76914,31-12-2018,,Other,Adult,65+,0-3 Months,1,Op_WL 2018,outpatient
79742,31-12-2018,,Other,Adult,16-64,0-3 Months,1,Op_WL 2018,outpatient


In [68]:
pdtest.dropna(inplace=True)

In [71]:
pdtest[pdtest.isnull().any(axis=1)]

Unnamed: 0,Archive_Date,Specialty,Speciality,Adult_Child,Age_Profile,Time_Bands,Total,year,patient_type


In [74]:
pdtest['Age_Profile'] = pdtest['Age_Profile'].str.lstrip()

In [75]:
pdtest['Age_Profile'].unique()

<ArrowStringArray>
['0-15', '16-64', 'nan', '65+']
Length: 4, dtype: str

In [85]:
pdtest['Age_Profile'] = pdtest[pdtest['Age_Profile'] == 'nan']['Age_Profile'].astype(float)



In [88]:
spark.stop()