<a href="https://colab.research.google.com/github/Aaditya140/Aaditya140-HEALTH-MONITORING-SYSTEM/blob/main/Health_Monitoring_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

IMPORTING ESSENTIAL LIBRARIES

In [None]:
import random
import json
from pyspark.sql import SparkSession
from kafka import KafkaProducer, KafkaConsumer
from pymongo import MongoClient
from textblob import TextBlob

**Initialize Spark Session**

In [None]:
spark = SparkSession.builder.appName("HealthMonitoring").getOrCreate()

def generate_patient_data(num_patients=10000):
    patients = []
    for i in range(num_patients):
        patient = {
            "id": i,
            "blood_pressure": random.randint(90, 180),
            "sugar_level": random.randint(70, 200),
            "cholesterol": random.randint(100, 300),
            "haemoglobin": round(random.uniform(10, 18), 1)
        }
        patients.append(patient)
    return patients

Store data in Hadoop HDFS (simulated by JSON file)

In [None]:
def save_to_hdfs(patients, filename="patients_data.json"):
    with open(filename, "w") as f:
        json.dump(patients, f)


**Process data using Spark**

In [None]:
def process_data(filename="patients_data.json"):
    df = spark.read.json(filename)
    stats = df.describe().toPandas()
    return stats

**Kafka Producer**

In [None]:
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def send_to_kafka(stats):
    producer.send('health_stats', stats.to_dict())


** Kafka Consumer**

In [None]:
def receive_from_kafka():
    consumer = KafkaConsumer('health_stats', bootstrap_servers='localhost:9092',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    for msg in consumer:
        print("Received:", msg.value)

** NoSQL Storage (MongoDB)**

In [None]:
client = MongoClient('localhost', 27017)
db = client.health_monitoring
feedback_collection = db.feedback

def store_feedback(patient_id, feedback):
    feedback_collection.insert_one({"patient_id": patient_id, "feedback": feedback})

**Sentiment Analysis**

In [None]:
def analyze_feedback():
    for feedback in feedback_collection.find():
        sentiment = TextBlob(feedback['feedback']).sentiment.polarity
        print(f"Patient {feedback['patient_id']} Feedback Sentiment: {'Positive' if sentiment > 0 else 'Negative'}")

**Sample Workflow**

In [None]:
if __name__ == "__main__":
    patients = generate_patient_data()
    save_to_hdfs(patients)
    stats = process_data()
    send_to_kafka(stats)
    store_feedback(1, "The test process was smooth and quick.")
    analyze_feedback()