# Hospital Incident Data Analysis with PySpark Structured Streaming
## Project Overview
This notebook demonstrates how to analyze streaming hospital incident data with PySpark's Structured Streaming. Hospitals generate large volumes of data every day, including records of incidents across departments. Analyzing this data in real time can help hospital management quickly identify departments with many incidents, take preventive action, and allocate resources more efficiently.

## Objectives
In this analysis, we aim to:

* Continuously monitor and display the number of incidents by department (e.g., Cardiology, Neurology).
* Identify and display the two years with the highest number of incidents, providing insights into trends over time.

## Data Format
The data is streamed as CSV files with the following structure:

* Id: Unique integer identifier for each incident.
* titre: Brief title of the incident.
* description: Detailed description of the incident.
* service: Hospital department where the incident occurred.
* date: Date of the incident, in YYYY-MM-DD format.
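
A quick header check can catch a malformed file before it reaches Spark. This matters because, with an explicit schema and Spark's default CSV settings, the header row is skipped rather than validated. The sketch below is illustrative only: `EXPECTED_COLUMNS` mirrors the column names written by the sample-data cell later in this notebook (`Id`, `titre`, `description`, `service`, `date`), and `has_expected_header` is a hypothetical helper name.

```python
import csv
import io

# Assumption: column names as written by the sample-data cell later in this notebook.
EXPECTED_COLUMNS = ["Id", "titre", "description", "service", "date"]

def has_expected_header(csv_text, expected=EXPECTED_COLUMNS):
    """Return True if the first row of the CSV text matches the expected column names."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, None)  # None when the text is empty
    return header == list(expected)

sample = "Id,titre,description,service,date\n1,Incident 1,Description 1,Emergency,2023-01-01\n"
print(has_expected_header(sample))                # True
print(has_expected_header("Id,Title,Service\n"))  # False
```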

# Initialize Spark Session and Set Up Streaming Source


In [3]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, col, count


In [4]:
# Create the Spark session and reduce log noise
spark = SparkSession.builder.appName("HospitalIncidents").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


In [14]:
import os
import pandas as pd

# Ensure the static data folder exists
static_data_folder = "../data/static/"
os.makedirs(static_data_folder, exist_ok=True)

# Sample data for the CSV file
data = {
    "Id": [1, 2, 3, 4, 5],
    "titre": ["Incident 1", "Incident 2", "Incident 3", "Incident 4", "Incident 5"],
    "description": ["Description 1", "Description 2", "Description 3", "Description 4", "Description 5"],
    "service": ["Emergency", "Pediatrics", "Radiology", "Emergency", "Pediatrics"],
    "date": ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04", "2023-01-05"]
}

# Create a DataFrame and save it as a CSV file
df = pd.DataFrame(data)
df.to_csv(os.path.join(static_data_folder, "incoming.csv"), index=False)
print("Static CSV file created successfully.")


Static CSV file created successfully.


# Calculate Incidents by Service in Real-Time

Use a Python script or notebook cell to copy incoming.csv into the ../data/incoming/ folder at intervals, under a new name each time, so that Spark detects every copy as a new file.
This way, the Structured Streaming application keeps processing each copy as if it contained fresh records.

In [26]:
import shutil
import uuid
import os
import time

# Define the source and target paths
source_file = "../data/static/incoming.csv"  # Your static CSV file
target_folder = "../data/incoming/"  # Folder to copy files to



# Check if the source file exists
if not os.path.exists(source_file):
    print(f"Source file does not exist: {source_file}")
else:
    print(f"Source file exists: {source_file}")

# Ensure the target folder exists, creating it if necessary
os.makedirs(target_folder, exist_ok=True)
print(f"Target folder exists: {target_folder}")

# Copy files to the target directory
for _ in range(10):  # Adjust the range for more iterations if needed
    target_file = os.path.join(target_folder, f"incoming_{uuid.uuid4()}.csv")
    try:
        shutil.copy(source_file, target_file)
        print(f"Copied file to {target_file}")
        
        # Check if the file exists after copying
        if os.path.exists(target_file):
            print(f"Confirmed: {target_file} exists.")
        else:
            print(f"Warning: {target_file} does not exist after copying.")
        
    except Exception as e:
        print(f"Error copying file: {e}")
    
    time.sleep(1)  # Wait 1 second before copying the next file

# Final check of files in the target directory
print("Files in target directory after copying:")
print(os.listdir(target_folder))


Source file exists: ../data/static/incoming.csv
Target folder exists: ../data/incoming/
Copied file to ../data/incoming/incoming_72bf1108-ccfa-4987-b8a9-18365efbe9c6.csv
Confirmed: ../data/incoming/incoming_72bf1108-ccfa-4987-b8a9-18365efbe9c6.csv exists.
Copied file to ../data/incoming/incoming_d5dd9553-2a1c-4544-8e51-2553de009401.csv
Confirmed: ../data/incoming/incoming_d5dd9553-2a1c-4544-8e51-2553de009401.csv exists.
Copied file to ../data/incoming/incoming_bcef30fc-736d-4934-b7fd-925a4184116e.csv
Confirmed: ../data/incoming/incoming_bcef30fc-736d-4934-b7fd-925a4184116e.csv exists.
Copied file to ../data/incoming/incoming_b19f733c-d169-42c1-b8b5-6100b1550a4f.csv
Confirmed: ../data/incoming/incoming_b19f733c-d169-42c1-b8b5-6100b1550a4f.csv exists.
Copied file to ../data/incoming/incoming_bd2949e1-d6bb-470b-85e0-ed0dedd920eb.csv
Confirmed: ../data/incoming/incoming_bd2949e1-d6bb-470b-85e0-ed0dedd920eb.csv exists.
Copied file to ../data/incoming/incoming_5079505b-cf3a-4915-a332-192a952
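
One caveat with the loop above: `shutil.copy` writes each file directly inside the watched folder, so a file source could in principle pick it up while it is still being written. The Structured Streaming guide asks for files to be placed in the directory atomically, which is usually achieved with a move. A minimal sketch, assuming a hypothetical staging folder on the same filesystem as the target (the name `drop_file_atomically` and the staging location are illustrative, not part of the original notebook):

```python
import os
import shutil
import tempfile
import uuid

def drop_file_atomically(source_file, staging_folder, target_folder):
    """Copy source_file into staging_folder, then move it into target_folder in one step."""
    os.makedirs(staging_folder, exist_ok=True)
    os.makedirs(target_folder, exist_ok=True)
    name = f"incoming_{uuid.uuid4()}.csv"
    staged_path = os.path.join(staging_folder, name)
    target_path = os.path.join(target_folder, name)
    shutil.copy(source_file, staged_path)  # the slow write happens outside the watched folder
    os.replace(staged_path, target_path)   # rename; atomic when both folders share a filesystem
    return target_path

# Demo in a throwaway directory so the sketch runs without the notebook's data folders.
with tempfile.TemporaryDirectory() as root:
    demo_source = os.path.join(root, "incoming.csv")
    with open(demo_source, "w", encoding="utf-8") as f:
        f.write("Id,titre,description,service,date\n")
    placed = drop_file_atomically(
        demo_source,
        os.path.join(root, "staging"),
        os.path.join(root, "incoming"),
    )
    print(os.path.basename(placed), os.path.exists(placed))
```

In the notebook, the call inside the copy loop would take `source_file` and `target_folder` as defined above, plus a staging path of your choice that Spark does not watch.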

In [27]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Define the schema
schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("titre", StringType(), True),
    StructField("description", StringType(), True),
    StructField("service", StringType(), True),
    StructField("date", StringType(), True)  # Read as a string; convert with to_date() later, or declare DateType() here instead
])


In [40]:
incidents_df = spark.readStream.schema(schema) \
                                 .option("header", "true") \
                                 .csv(target_folder)  # This should point to the directory


# Example aggregation: Count incidents by service
incidents_by_service = incidents_df.groupBy("service").count()

# Start the query to continuously output counts to the console
query_service_count = incidents_by_service.writeStream \
                                           .outputMode("complete") \
                                           .format("console") \
                                           .option("truncate", "false") \
                                           .start()

query_service_count.awaitTermination()


Py4JJavaError: An error occurred while calling o237.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:180)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:162)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:133)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:96)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:68)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:539)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:265)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:162)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:162)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:167)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:259)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:198)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:212)
	at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:260)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
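
The trace shows the failure occurring while Spark lists the input directory (`FileUtil.list` calling `NativeIO$Windows.access0`), which is a native call into Hadoop's Windows library. On Windows, this `UnsatisfiedLinkError` usually means that `hadoop.dll` and `winutils.exe` are missing, are not on `PATH`, or were built for a different Hadoop version than the one bundled with Spark. The helper below is a hypothetical pre-flight check (the function name and messages are illustrative). It assumes the common setup in which `HADOOP_HOME` points to a folder whose `bin` subfolder holds both files, and it does not verify version compatibility.

```python
import os

def check_hadoop_native(env=None):
    """Return a list of likely problems with the Hadoop native setup on Windows."""
    env = os.environ if env is None else env
    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        return ["HADOOP_HOME is not set"]

    problems = []
    bin_dir = os.path.join(hadoop_home, "bin")
    for name in ("winutils.exe", "hadoop.dll"):
        if not os.path.isfile(os.path.join(bin_dir, name)):
            problems.append(f"{name} not found in {bin_dir}")

    def normalize(path):
        # Make comparisons robust to case (on Windows) and redundant separators.
        return os.path.normcase(os.path.normpath(path))

    path_entries = {normalize(p) for p in env.get("PATH", "").split(os.pathsep) if p}
    if normalize(bin_dir) not in path_entries:
        problems.append(f"{bin_dir} is not on PATH")
    return problems

for problem in check_hadoop_native():
    print("Possible issue:", problem)
```

After changing these variables, the Python kernel generally needs a restart so that a fresh JVM picks them up.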


In [29]:
import os

incoming_dir = "../data/incoming/"
print("Files in incoming directory:", os.listdir(incoming_dir))


Files in incoming directory: ['incoming_08bcd605-2d92-4fc9-a7b8-80fca4c23664.csv', 'incoming_100c38b4-a5e8-4d27-975c-3ef15117d970.csv', 'incoming_1291087f-5a3b-4a53-adf9-994fef0874e0.csv', 'incoming_21de8d84-98ce-4b22-bf11-48e34826051a.csv', 'incoming_2654a857-6e95-497d-b7b7-9e74b8d5893f.csv', 'incoming_285e65cd-ce41-44d1-8e64-de06dc9a2e33.csv', 'incoming_3314742d-95dc-4314-9698-64b00d562c81.csv', 'incoming_3d1ee6aa-8d87-411c-9259-7b959e586fe9.csv', 'incoming_3ec07362-587b-4dbd-9b5a-2ee260f7202b.csv', 'incoming_3ee6ed7d-e255-427f-aec4-1d9b5e9bcd7d.csv', 'incoming_43b97eba-4989-4394-aa7a-29156472cf4a.csv', 'incoming_4f564721-c38e-45e6-bd9c-d64b22765fa4.csv', 'incoming_5079505b-cf3a-4915-a332-192a9527021a.csv', 'incoming_55db5e54-eeed-4eb2-8e35-6d719a2a1433.csv', 'incoming_55f9a30f-d75b-48bd-93b9-d9e338e35cf1.csv', 'incoming_577c068b-9843-474e-978a-3e2269010e82.csv', 'incoming_6946161a-0d35-4a94-b120-c121509dc25a.csv', 'incoming_72bf1108-ccfa-4987-b8a9-18365efbe9c6.csv', 'incoming_9dafbd

In [30]:
test_file_path = "../data/incoming/incoming_100c38b4-a5e8-4d27-975c-3ef15117d970.csv"  # Replace with a file that exists in your incoming folder
test_df = spark.read.schema(schema).option("header", "true").csv(test_file_path)
test_df.show()


+---+----------+-------------+----------+----------+
| Id|     titre|  description|   service|      date|
+---+----------+-------------+----------+----------+
|  1|Incident 1|Description 1| Emergency|2023-01-01|
|  2|Incident 2|Description 2|Pediatrics|2023-01-02|
|  3|Incident 3|Description 3| Radiology|2023-01-03|
|  4|Incident 4|Description 4| Emergency|2023-01-04|
|  5|Incident 5|Description 5|Pediatrics|2023-01-05|
+---+----------+-------------+----------+----------+
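
With a static read working, the second objective (the two years with the most incidents) can be prototyped on `test_df`. The sketch below is not from the original notebook. `top_years_reference` is a plain-Python reference that runs without Spark, and `top_years_spark` is the assumed equivalent for a batch DataFrame whose `date` column holds `yyyy-MM-dd` strings. Both names are hypothetical.

```python
from collections import Counter

def top_years_reference(dates, n=2):
    """Count ISO-formatted date strings per year; return the n busiest (year, count) pairs."""
    counts = Counter(int(d[:4]) for d in dates)
    # Sort by count, then by year, both descending, so ties resolve deterministically.
    return sorted(counts.items(), key=lambda item: (item[1], item[0]), reverse=True)[:n]

def top_years_spark(df, n=2):
    """Same computation on a batch Spark DataFrame with a string `date` column."""
    # Imported lazily so the reference function above works without PySpark installed.
    from pyspark.sql.functions import col, to_date, year

    return (
        df.withColumn("year", year(to_date(col("date"), "yyyy-MM-dd")))
          .groupBy("year")
          .count()
          .orderBy(col("count").desc(), col("year").desc())
          .limit(n)
    )

sample_dates = ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04", "2023-01-05"]
print(top_years_reference(sample_dates))  # [(2023, 5)]
```

On the sample file, `top_years_spark(test_df).show()` would be expected to list only 2023, since all five sample dates fall in that year. Running the same ranking on the streaming DataFrame would require an aggregation in complete output mode, because sorting a stream is only supported there.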

