<a href="https://colab.research.google.com/github/06Nandhini/RDD-analysis-in-Pyspark/blob/main/Pyspark_SQL_Dataframe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import random
import numpy as np

In [None]:
from pyspark.sql import SparkSession

# Build the Spark session used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("DiabetesAnalysis")
    .getOrCreate()
)


In [None]:
# Load the CSV into a DataFrame: the first line supplies the column names and
# Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("diabetes_dataset.csv")
)

# Preview the first rows, then print the inferred schema
df.show(5)
df.printSchema()

+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|year|gender| age|location|race:AfricanAmerican|race:Asian|race:Caucasian|race:Hispanic|race:Other|hypertension|heart_disease|smoking_history|  bmi|hbA1c_level|blood_glucose_level|diabetes|
+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|2020|Female|32.0| Alabama|                   0|         0|             0|            0|         1|           0|            0|          never|27.32|        5.0|                100|       0|
|2015|Female|29.0| Alabama|                   0|         1|             0|            0|         0|           0|            0|          never|19.95|        5.0|                 90|       0|
|2015|  Male|18.0| Alabama|                   0|  

In [None]:
# Total number of rows in the dataset
total_records = df.count()
print("Total records:", total_records)

# Keep only the patients flagged as diabetic and preview them
diabetes_df = df.where(df["diabetes"] == 1)
diabetes_df.show(5)

# Mean BMI for each gender
bmi_by_gender = df.groupBy("gender").avg("bmi")
bmi_by_gender.show()

# Number of patients in each smoking-history category
smoking_counts = df.groupBy("smoking_history").count()
smoking_counts.show()


Total records: 100000
+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|year|gender| age|location|race:AfricanAmerican|race:Asian|race:Caucasian|race:Hispanic|race:Other|hypertension|heart_disease|smoking_history|  bmi|hbA1c_level|blood_glucose_level|diabetes|
+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|2016|Female|64.0| Alabama|                   0|         0|             0|            0|         1|           0|            0|           ever|49.27|        8.2|                140|       1|
|2016|  Male|80.0| Alabama|                   1|         0|             0|            0|         0|           0|            0|         former|29.16|        8.8|                140|       1|
|2016|Female|42.0| Alabama| 

In [None]:
# Register the DataFrame under the name "patients" so it can be queried with SQL
df.createOrReplaceTempView("patients")

# Average blood glucose per location, highest average first
avg_glucose_by_location = spark.sql("""
    SELECT location, AVG(blood_glucose_level) AS avg_glucose
    FROM patients
    GROUP BY location
    ORDER BY avg_glucose DESC
""")
avg_glucose_by_location.show()


+--------------------+------------------+
|            location|       avg_glucose|
+--------------------+------------------+
|      Virgin Islands|139.95806028833553|
|             Montana| 139.8908017707821|
|        Rhode Island| 139.6874692874693|
|              Kansas| 139.6704322200393|
|            Arkansas|  139.379970544919|
|            Virginia|139.34814814814814|
|             Vermont| 139.3101644245142|
|        North Dakota| 139.1990171990172|
|       West Virginia|139.09540636042402|
|            Missouri|139.03882063882065|
|            Kentucky|139.01668302257116|
|                Utah|138.95364238410596|
|            Maryland| 138.8918918918919|
|            Illinois| 138.8467583497053|
|            Michigan|138.80451866404715|
|          New Mexico|138.74028529267093|
|         Puerto Rico|138.54208494208495|
|           Tennessee| 138.5171537484117|
|             Indiana| 138.3975842979366|
|District of Columbia| 138.3713163064833|
+--------------------+------------

In [None]:
# Work on the DataFrame's underlying RDD of Row objects
rdd = df.rdd


def _race_label(row):
    """Return the race label for a row, based on the first race flag equal to 1.

    Flags are checked in the order AfricanAmerican, Asian, Caucasian, Hispanic;
    a row with none of those set is labelled "Other".
    """
    if row['race:AfricanAmerican'] == 1:
        return "AfricanAmerican"
    if row['race:Asian'] == 1:
        return "Asian"
    if row['race:Caucasian'] == 1:
        return "Caucasian"
    if row['race:Hispanic'] == 1:
        return "Hispanic"
    return "Other"


# Count patients per race: emit (label, 1) per row, then sum the ones per label
race_counts = (
    rdd.map(lambda row: (_race_label(row), 1))
       .reduceByKey(lambda left, right: left + right)
)

print("Patients per race:")
print(race_counts.collect())


Patients per race:
[('Other', 19998), ('Asian', 20015), ('Caucasian', 19876), ('AfricanAmerican', 20223), ('Hispanic', 19888)]


In [None]:
# Read the CSV as an RDD of raw text lines
raw_lines = spark.sparkContext.textFile("diabetes_dataset.csv")

# The first line is the header; remember it so it can be filtered out
header = raw_lines.first()

# Drop every line equal to the header, then split each remaining line on commas.
# NOTE: a plain split(",") does not understand quoted fields, and every
# resulting value is a string.
cases_rdd = (
    raw_lines
    .filter(lambda line: line != header)
    .map(lambda line: line.split(","))
)


In [None]:
# Column names for the split records, in file order
columns = [
    'year',
    'gender',
    'age',
    'location',
    'race_AfricanAmerican',
    'race_Asian',
    'race_Caucasian',
    'race_Hispanic',
    'race_Other',
    'hypertension',
    'heart_disease',
    'smoking_history',
    'bmi',
    'hbA1c_level',
    'blood_glucose_level',
    'diabetes',
]

# Build a DataFrame from the RDD of string lists. Only names are supplied and
# the values come from str.split, so every column holds strings.
cases_df = spark.createDataFrame(cases_rdd, schema=columns)

# Preview the first rows and the resulting schema
cases_df.show(5)
cases_df.printSchema()


+----+------+---+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|year|gender|age|location|race_AfricanAmerican|race_Asian|race_Caucasian|race_Hispanic|race_Other|hypertension|heart_disease|smoking_history|  bmi|hbA1c_level|blood_glucose_level|diabetes|
+----+------+---+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|2020|Female| 32| Alabama|                   0|         0|             0|            0|         1|           0|            0|          never|27.32|          5|                100|       0|
|2015|Female| 29| Alabama|                   0|         1|             0|            0|         0|           0|            0|          never|19.95|          5|                 90|       0|
|2015|  Male| 18| Alabama|                   0|        

In [None]:
# Convert back to RDD
# Round trip: take the DataFrame built from cases_rdd and expose it as an RDD
# again through its .rdd attribute.
cases_rdd_again = cases_df.rdd


In [None]:
# Narrow the DataFrame to the demographic and outcome columns of interest
cases_df_selected = cases_df.select(
    ["location", "gender", "age", "bmi", "diabetes"]
)

# Preview the reduced DataFrame
cases_df_selected.show(5)


+--------+------+---+-----+--------+
|location|gender|age|  bmi|diabetes|
+--------+------+---+-----+--------+
| Alabama|Female| 32|27.32|       0|
| Alabama|Female| 29|19.95|       0|
| Alabama|  Male| 18|23.76|       0|
| Alabama|  Male| 41|27.32|       0|
| Alabama|Female| 52|23.75|       0|
+--------+------+---+-----+--------+
only showing top 5 rows



In [None]:
from pyspark.sql import functions as F

# Sort patients by blood glucose level descending.
# cases_df was built from comma-split text lines, so every column is a string.
# Sorting the raw column orders it lexicographically ("90" ranks above "300"),
# so cast to int in the sort key to get a numeric ordering. The cast applies
# only to the ordering; the displayed columns are unchanged.
cases_df.sort(F.col("blood_glucose_level").cast("int").desc()).show(5)


+----+------+---+-----------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|year|gender|age|   location|race_AfricanAmerican|race_Asian|race_Caucasian|race_Hispanic|race_Other|hypertension|heart_disease|smoking_history|  bmi|hbA1c_level|blood_glucose_level|diabetes|
+----+------+---+-----------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+
|2019|Female| 15|Mississippi|                   0|         1|             0|            0|         0|           0|            0|          never|16.56|        6.1|                 90|       0|
|2016|Female| 47|    Alabama|                   1|         0|             0|            0|         0|           0|            0|        No Info|22.61|        6.6|                 90|       0|
|2019|Female| 10|Mississippi|           

In [None]:
# Diabetic patients ordered by BMI, highest first.
# bmi is a string column in cases_df (values come from a comma-split text
# line), so sort on a double cast: a plain string sort is only correct while
# every value has the same number of integer digits (e.g. "9.5" would rank
# above "88.72"). The cast applies only to the ordering, not to the output.
cases_df.filter(cases_df.diabetes == 1) \
        .sort(F.col("bmi").cast("double").desc()) \
        .select("gender", "age", "bmi", "diabetes") \
        .show(5)


+------+---+-----+--------+
|gender|age|  bmi|diabetes|
+------+---+-----+--------+
|Female| 45|88.72|       1|
|  Male| 49|83.74|       1|
|Female| 48|81.73|       1|
|Female| 36|79.46|       1|
|Female| 42|72.89|       1|
+------+---+-----+--------+
only showing top 5 rows



In [None]:
# Convert 10 rows of the PySpark DataFrame to a pandas DataFrame.
# limit(10) is applied before toPandas() so only a small sample is converted.
# No ordering is applied here, so "top" just means the rows limit() returns.
cases_df.limit(10).toPandas()


Unnamed: 0,year,gender,age,location,race_AfricanAmerican,race_Asian,race_Caucasian,race_Hispanic,race_Other,hypertension,heart_disease,smoking_history,bmi,hbA1c_level,blood_glucose_level,diabetes
0,2020,Female,32,Alabama,0,0,0,0,1,0,0,never,27.32,5.0,100,0
1,2015,Female,29,Alabama,0,1,0,0,0,0,0,never,19.95,5.0,90,0
2,2015,Male,18,Alabama,0,0,0,0,1,0,0,never,23.76,4.8,160,0
3,2015,Male,41,Alabama,0,0,1,0,0,0,0,never,27.32,4.0,159,0
4,2016,Female,52,Alabama,1,0,0,0,0,0,0,never,23.75,6.5,90,0
5,2016,Male,66,Alabama,0,0,1,0,0,0,0,not current,27.32,5.7,159,0
6,2015,Female,49,Alabama,0,0,1,0,0,0,0,current,24.34,5.7,80,0
7,2016,Female,15,Alabama,0,0,0,0,1,0,0,No Info,20.98,5.0,155,0
8,2016,Male,51,Alabama,1,0,0,0,0,0,0,never,38.14,6.0,100,0
9,2015,Male,42,Alabama,0,0,1,0,0,0,0,No Info,27.32,5.7,160,0
