In [1]:
# RUN the following 2 installation lines only one time
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('Basics').getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

#Creating spark context-Its like connecting to spark cluster
from pyspark import SparkConf
from pyspark.context import SparkContext


sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# RUN the following line one by one and make sure you understand what
# each command is doing, i.e., what the elements of the rdd contains

In [5]:
# Load CSV as RDD
rdd = sc.textFile("healthcare-dataset-stroke-data.csv")
print(rdd.count())

5111


In [6]:
# Remove header
header = rdd.first()
data = rdd.filter(lambda line: line != header)

# Split each row into a list
rows = data.map(lambda line: line.split(","))

## TASK 1: Stroke Rate per Work Type

In [7]:
# Select (work_type, stroke)
work_stroke = rows.map(lambda x: (x[6], int(x[11])))

# Total people per work type
people_per_work = work_stroke.mapValues(lambda _: 1) \
                             .reduceByKey(lambda a, b: a + b)

# Stroke cases per work type
stroke_cases = work_stroke.filter(lambda x: x[1] == 1) \
                          .mapValues(lambda _: 1) \
                          .reduceByKey(lambda a, b: a + b)

# Join results: (work_type, (stroke_count, total_people))
joined = stroke_cases.join(people_per_work)

# Calculate stroke rate
stroke_rate_work = joined.map(
    lambda x: (x[0], round((x[1][0] / x[1][1]) * 100, 2))
)

print("\n--- Stroke Rate by Work Type ---")
for work, rate in stroke_rate_work.collect():
    print(f"{work}: {rate}%")

# Extra: stroke count for "Never_worked"
never_worked_stroke = work_stroke \
    .filter(lambda x: x[0] == "Never_worked" and x[1] == 1) \
    .count()

print(f"\nStroke cases for people who never worked: {never_worked_stroke}")


--- Stroke Rate by Work Type ---
Private: 5.09%
Govt_job: 5.02%
children: 0.29%
Self-employed: 7.94%

Stroke cases for people who never worked: 0


## TASK 2: Stroke Rate by (Residence Type Ã— Ever Married)

In [8]:
# Key: (Residence_type, Ever_married), Value: stroke
residence_married = rows.map(lambda x: ((x[7], x[5]), int(x[11])))

# Total people per group
people_per_group = residence_married.mapValues(lambda _: 1) \
                                    .reduceByKey(lambda a, b: a + b)

# Stroke cases per group
stroke_per_group = residence_married.filter(lambda x: x[1] == 1) \
                                    .mapValues(lambda _: 1) \
                                    .reduceByKey(lambda a, b: a + b)

# Join + calculate rate
combined = stroke_per_group.join(people_per_group)

stroke_rate_group = combined.map(
    lambda x: (x[0], round((x[1][0] / x[1][1]) * 100, 2))
)

print("\n--- Stroke Rate by Residence Type and Marital Status ---")
for (residence, married), rate in stroke_rate_group.collect():
    print(f"{residence}, Married={married}: {rate}%")


--- Stroke Rate by Residence Type and Marital Status ---
Urban, Married=Yes: 6.66%
Rural, Married=Yes: 6.46%
Urban, Married=No: 2.37%
Rural, Married=No: 0.92%


## TASK 3: Stroke Rate for Patients Aged 65+ (by Smoking & Gender)

In [11]:
# Filter rows for age >= 65
older_patients = rows.filter(lambda x: float(x[2]) >= 65)

# Only keep the values that are important to this task:
# gender / smoking_status / stroke
# stroke is either 0 or 1, so make it into an integer.
filtered_rdd = older_patients.map(lambda x: (x[1], x[-2], int(x[-1])))

# Take out "Unknown" of smoking_status.
smoking_filtered = filtered_rdd.filter(lambda x: x[1] != "Unknown")

# Create key-value pairs via smoking_status and gender.
key_value = smoking_filtered.map(lambda x: ((x[1], x[0]), x[2]))

# Add the number 1 to stroke (for the calculations),
# because each tuple is one patient: (stroke, 1).
stroke_rdd = key_value.map(lambda x: (x[0], (x[1], 1)))

# Per key/group, create a new tuple: (number of strokes, number of patients).
grouped = stroke_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Sort keys alphabetically.
sorted_keys = grouped.sortByKey(ascending=False)

# Divide the number of strokes by the number of patients per key/group.
percents = sorted_keys.map(lambda x: (x[0], round(((x[1][0] / x[1][1]) * 100), 2)))

# Print the outcome.
outcome = percents.collect()
print("Patients with an age >= 65: \n")
for (group, percent) in outcome:
 print(f"smoking_status: {group[0]:<15} | gender: {group[1]:<6} | stroke percentage: {percent}%")

Patients with an age >= 65: 

smoking_status: smokes          | gender: Male   | stroke percentage: 15.87%
smoking_status: smokes          | gender: Female | stroke percentage: 10.45%
smoking_status: never smoked    | gender: Male   | stroke percentage: 15.91%
smoking_status: never smoked    | gender: Female | stroke percentage: 16.36%
smoking_status: formerly smoked | gender: Male   | stroke percentage: 16.44%
smoking_status: formerly smoked | gender: Female | stroke percentage: 15.69%
