In [None]:
# Analyzing Suicide Trends in the Philippines using PySpark (1979-2016)
# Kaggle Data Set: WHO Suicide Statistics: https://www.kaggle.com/datasets/szamil/who-suicide-statistics/data
# Basic historical (1979-2016) data by country, year and demographic groups 

In [None]:
# Import libraries
from pyspark.sql import SparkSession
import csv
from io import StringIO
from pyspark.sql.functions import col, when, format_number, desc
from pyspark.sql.functions import sum, count, avg, round, regexp_extract

In [None]:
# Start (or reuse) the Spark session for this analysis, then expose its
# SparkContext for the RDD-based cells further down.
spark = (
    SparkSession.builder
    .appName("Philippine_Suicide_Data_Analysis")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# DATA DICTIONARY
# Load the raw CSV with its first line as the header and inferred column types,
# then report the schema, the overall shape and the null count of every column.
data_df = spark.read.csv("who_suicide_statistics.csv", header=True, inferSchema=True)
data_df.printSchema()

num_rows = data_df.count()
columns = data_df.columns
num_columns = len(columns)

print("Data Dictionary")
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

# Count the nulls of all columns in ONE aggregation instead of running a
# separate filter+count job per column: when() yields a non-null marker only
# for null cells, and count() counts just those markers (0 when there are none).
# NOTE: `count` here is pyspark.sql.functions.count from the import cell.
null_counts = data_df.select(
    [count(when(col(col_name).isNull(), 1)).alias(col_name) for col_name in columns]
).first().asDict()
for col_name, null_count in null_counts.items():
    print(f"Column '{col_name}': {null_count} null values")

In [None]:
# RDD Data Manipulation
# Read the CSV as an RDD of raw text lines (the header line is still included),
# then peek at the first line and the total line count.
rdd = sc.textFile("who_suicide_statistics.csv")
print("first elements of the rdd:", rdd.first())
print("number of elements of the rdd:", rdd.count())

In [None]:
# Remove header
# The first line of the file is the header; drop every line equal to it so
# that only data lines remain.
header = rdd.first()
data_rdd = rdd.filter(lambda line: line != header)
print("first elements of the rdd after removing the header:", data_rdd.first())
print("number of elements of the rdd after removing the header:", data_rdd.count())

In [None]:
# Helper function to parse a CSV row.
def parse_csv(row, fieldnames=None):
    """Parse one CSV text line into a dict keyed by column name.

    Args:
        row: A single CSV line (no trailing newline needed).
        fieldnames: Optional list of column names. When omitted, the names are
            taken from the notebook-level ``header`` line.

    Returns:
        A dict mapping each column name to the (string) field value. Quoted
        fields containing commas are handled by the csv module; columns missing
        from a short row map to None.

    Raises:
        StopIteration: if ``row`` is blank (csv.DictReader yields nothing).
    """
    if fieldnames is None:
        # Parse the header with the csv module as well, so quoted column names
        # are unquoted the same way the data fields are.
        fieldnames = next(csv.reader([header]))
    reader = csv.DictReader(StringIO(row), fieldnames=fieldnames)
    return next(reader)

# Turn every raw text line into a dict keyed by column name
parsed_rdd = data_rdd.map(parse_csv)

# Keep only the records whose country field is exactly 'Philippines'
philippines_rdd = parsed_rdd.filter(
    lambda record: record['country'] == 'Philippines'
)

In [None]:
# Using collect() method
# collect() brings every element of philippines_rdd back to the driver as a
# Python list; the whole list is printed here.
print(philippines_rdd.collect())

In [None]:
# Using take() method
# take(10) returns at most the first 10 elements of philippines_rdd as a list,
# a cheaper preview than collecting everything.
print(philippines_rdd.take(10))

In [None]:
# Empty suicides number rows
# Count the Philippine records whose 'suicides_no' field is an empty string.
empty_suicides_no_rdd = philippines_rdd.filter(
    lambda record: record['suicides_no'] == ''
)
print("empty suicide number rows count:", empty_suicides_no_rdd.count())

In [None]:
# Filter rows with no null or empty string values
# A record is dropped as soon as any of its values is None or ''.
philippines_rdd = philippines_rdd.filter(
    lambda record: not any(value in (None, '') for value in record.values())
)

# Re-run the empty 'suicides_no' check on the cleaned RDD to confirm the result.
empty_suicides_no_rdd = philippines_rdd.filter(
    lambda record: record['suicides_no'] == ''
)
print("empty suicide number rows count:", empty_suicides_no_rdd.count())

In [None]:
# Group By
# Bucket the Philippine records by age bracket, materialise each bucket as a
# list, then flatten back to (age, record) pairs so every record can be turned
# into its own rate below.
grouped_by_age_rdd = philippines_rdd.groupBy(lambda row: row['age'])
grouped_by_age_rdd = grouped_by_age_rdd.mapValues(list)
flattened_rdd = grouped_by_age_rdd.flatMap(lambda x: [(x[0], entry) for entry in x[1]])

# Compute suicide rates
# Per record: rate = suicides_no / population * 100,000 (fields are strings, hence float()).
suicide_rates_rdd = flattened_rdd.map(lambda x: (
    x[0],  # age group
    float(x[1]['suicides_no']) / float(x[1]['population']) * 100000  # suicide rate per 100,000
))

# Aggregate by age group: the average suicide rate is the MEAN of the per-record
# rates computed above (sum of rates / number of records), not total suicides
# divided by total population of the age group.
# The accumulator is a (sum_of_rates, record_count) pair.
aggregated_rdd = suicide_rates_rdd.combineByKey(
    lambda value: (value, 1),                                   # first rate seen for a key
    lambda acc, value: (acc[0] + value, acc[1] + 1),            # fold another rate into a partition-local acc
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])   # merge accs across partitions
).map(lambda x: (x[0], x[1][0] / x[1][1]))

# Collect and print results
# The value is a rate per 100,000 population, so it must not be labelled as a percentage.
results = aggregated_rdd.collect()
for result in results:
    print(f"Age Group: {result[0]}, Average Suicide Rate: {result[1]:.2f} per 100,000")

In [None]:
# Sort by suicide rate in descending order
sorted_rdd = aggregated_rdd.sortBy(lambda x: x[1], ascending=False)
results = sorted_rdd.collect()
# The value is a rate per 100,000 population, so it must not be labelled as a percentage.
for result in results:
    print(f"Age Group: {result[0]}, Average Suicide Rate: {result[1]:.2f} per 100,000")

In [None]:
# Dataframe Data Manipulation
# Load the same CSV as a DataFrame: first line is the header, column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("who_suicide_statistics.csv")
)
df.printSchema()

In [None]:
# Show the first few rows of the DataFrame
# This is the raw frame as loaded, i.e. rows with null values are still present.
print("Before dropping nulls:")
df.show()

In [None]:
# Drop every row that has a null in any column (all columns are checked)
columns_to_check = df.columns
cleaned_df = df.na.drop(how="any", subset=columns_to_check)
print("After dropping nulls:")
cleaned_df.show()

In [None]:
# Show the first few rows of the DataFrame before filtering
# cleaned_df still contains every country at this point.
print("Before filtering:")
cleaned_df.show()

In [None]:
# Filter
# Keep only the rows where the country is "Philippines", then remove exact
# duplicate rows. Filtering first means the de-duplication only has to process
# the Philippine rows instead of the whole dataset; the resulting rows are the same.
# The column is referenced by name with col() so the predicate resolves against
# the frame being filtered rather than against the separate `df` object.
filtered_df = cleaned_df.filter(col('country') == 'Philippines').dropDuplicates()
print("After filtering for Philippines:")
filtered_df.show()

In [None]:
# Add column called suicide rate
# using formula: Suicide rate = ( suicides number / population ) × 100,000
#
# The rate is rounded numerically to two decimals and stays a double.
# (format_number would turn it into a display STRING with thousands separators,
# e.g. "1,234.56", which can become null when later cast back to double.)
# NOTE: `round` here is pyspark.sql.functions.round from the import cell.
filtered_df = filtered_df.withColumn(
    "suicide_rate",
    round((col("suicides_no") / col("population")) * 100000, 2),
)
filtered_df.show()

In [None]:
# Select Distinct Age Groups
# Project the 'age' column only and keep one row per distinct age bracket.
select_age_groups_df = (
    filtered_df
    .select(col("age"))
    .distinct()
)
select_age_groups_df.show()

In [None]:
# Group By Age

# Make sure the count column is an integer and the rate column is a double
# before aggregating.
filtered_df = (
    filtered_df
    .withColumn('suicides_no', col('suicides_no').cast('integer'))
    .withColumn('suicide_rate', col('suicide_rate').cast('double'))
)

# Per age bracket: total number of suicides and the mean of the per-row
# suicide rates, rounded to two decimals.
# (`sum`, `avg` and `round` are the pyspark.sql.functions versions.)
grouped_by_age_df = (
    filtered_df
    .groupBy('age')
    .agg(
        sum('suicides_no').alias('total_suicides_no'),
        round(avg('suicide_rate'), 2).alias('average_suicide_rate'),
    )
)

grouped_by_age_df.show()

In [None]:
# Order By "total_suicides_no" descending
# Age brackets with the most suicides come first.
grouped_by_age_df = grouped_by_age_df.orderBy(desc("total_suicides_no"))
print("Ordered age groups by most number of suicides")
grouped_by_age_df.show()

In [None]:
# Order By "average_suicide_rate" descending
# Age brackets with the highest mean rate come first.
grouped_by_age_df = grouped_by_age_df.orderBy(desc("average_suicide_rate"))
print("Ordered age groups by average suicide rate per 100 000 population")
grouped_by_age_df.show()

In [None]:
# Group By sex

# Per sex: total number of suicides and the mean of the per-row suicide rates,
# rounded to two decimals.
# (`sum`, `avg` and `round` are the pyspark.sql.functions versions.)
grouped_by_sex_df = (
    filtered_df
    .groupBy('sex')
    .agg(
        sum('suicides_no').alias('total_suicides_no'),
        round(avg('suicide_rate'), 2).alias('average_suicide_rate'),
    )
)

# Display the aggregated result
grouped_by_sex_df.show()

In [None]:
# Order By "average_suicide_rate" descending
# The sex with the higher mean rate comes first.
grouped_by_sex_df = grouped_by_sex_df.orderBy('average_suicide_rate', ascending=False)
grouped_by_sex_df.show()

In [None]:
# Bar plot to compare suicide counts across age groups and genders
import seaborn as sns
import matplotlib.pyplot as plt

# Define a function to extract the lower bound of the age range
def extract_lower_bound(age_range):
    """Return the numeric lower bound of an age-range label.

    Args:
        age_range: A label such as '5-14', or the open-ended bracket '75+'.

    Returns:
        The integer before the first '-' (e.g. 5 for '5-14'); 75 for '75+'.

    Raises:
        ValueError: if the part before the first '-' is not an integer.
    """
    # The open-ended top bracket has no '-' separator, so it is special-cased.
    return 75 if age_range == '75+' else int(age_range.split('-')[0])
    
# Bring the Philippine rows to pandas for plotting
filtered_df_age_ordered = filtered_df.toPandas()

# Derive a numeric sort key from the age label (the first whitespace-separated
# token of the label is handed to extract_lower_bound), then order the rows by
# that key in ascending order so the age groups appear youngest to oldest.
filtered_df_age_ordered = (
    filtered_df_age_ordered
    .assign(
        age_numeric=filtered_df_age_ordered['age'].map(
            lambda label: extract_lower_bound(label.split()[0])
        )
    )
    .sort_values(by='age_numeric')
)

# Grouped bars: one group per age bracket, one bar per sex
sns.barplot(
    data=filtered_df_age_ordered,
    x='age',
    y='suicide_rate',
    hue='sex',
)
plt.title('Suicide Rate by Age Group and Gender in the Philippines from 1992-2011')
plt.xlabel('Age Group')
plt.xticks(rotation=45)
plt.ylabel('Suicide Rate Standardized by 100,000')
plt.show()

In [None]:
# Group By year

# Per year: total number of suicides and the mean of the per-row suicide rates,
# rounded to two decimals.
# (`sum`, `avg` and `round` are the pyspark.sql.functions versions.)
grouped_by_year_df = (
    filtered_df
    .groupBy('year')
    .agg(
        sum('suicides_no').alias('total_suicides_no'),
        round(avg('suicide_rate'), 2).alias('average_suicide_rate'),
    )
)

# Display the aggregated result
grouped_by_year_df.show()

In [None]:
# Order By year ascending
# Sort chronologically (earliest year first) so the table reads as a timeline;
# the previous code sorted descending, contradicting the stated intent.
grouped_by_year_df = grouped_by_year_df.orderBy(col('year').asc())
grouped_by_year_df.show()

In [None]:
# Line plot
# One line per age group showing suicide_rate against year, using the pandas
# frame (filtered_df_age_ordered) built in the bar-plot cell.
sns.lineplot(data=filtered_df_age_ordered, x='year', y='suicide_rate', hue='age')
# Put a tick on every year present in the data and tilt the labels for legibility
plt.xticks(filtered_df_age_ordered['year'].unique(), rotation=45)
plt.title('Suicide Trends by Age Group in the Philippines Over Years')
plt.xlabel('Year')
plt.ylabel('Suicide Rate Standardized by 100,000')
plt.show()

In [None]:
# Questions
# 1. What age group has the highest suicide rate in the Philippines throughout the years?
# 2. Are Filipino men more likely to commit suicide than Filipino women?
# 3. Which year from 1992-2011 had the highest recorded number of suicides in the Philippines? The lowest? (subject to gaps in the available data)
# 4. Which year from 1992-2011 had the highest measured suicide rate in the Philippines? The lowest? (subject to gaps in the available data)

In [None]:
# SQL Queries

# 1. What age group has the highest suicide rate in the Philippines throughout the years?

# Register filtered_df as a temp view so it can be queried with Spark SQL.
filtered_df.createOrReplaceTempView("temp_view1")
# Average the per-row suicide_rate within each age group (rounded to 2 decimals),
# sort from highest to lowest and keep only the top row.
query_result = spark.sql("""
    SELECT age, ROUND(AVG(suicide_rate), 2) AS avg_suicide_rate
    FROM temp_view1
    WHERE country = 'Philippines'
    GROUP BY age
    ORDER BY avg_suicide_rate DESC
    LIMIT 1
""")
query_result.show()

In [None]:
# 2. Are Filipino men more likely to commit suicide than Filipino women?

# Register filtered_df as a temp view so it can be queried with Spark SQL.
filtered_df.createOrReplaceTempView("temp_view2")
# Average the per-row suicide_rate for each sex (rounded to 2 decimals),
# highest average first.
query_result = spark.sql("""
    SELECT sex, ROUND(AVG(suicide_rate), 2) AS avg_suicide_rate
    FROM temp_view2
    WHERE country = 'Philippines'
    GROUP BY sex
    ORDER BY avg_suicide_rate DESC
""")

# Show the result
query_result.show()

In [None]:
# 3. Which year from 1992-2011 had the highest recorded number of suicides in the Philippines? The lowest? (subject to gaps in the available data)

# For the highest recorded number of suicides:
# sum suicides_no per year, sort from largest to smallest and keep the top row.
filtered_df.createOrReplaceTempView("temp_view3")
query_result_highest = spark.sql("""
    SELECT year, SUM(suicides_no) AS total_suicides
    FROM temp_view3
    WHERE country = 'Philippines' AND year BETWEEN 1992 AND 2011
    GROUP BY year
    ORDER BY total_suicides DESC
    LIMIT 1
""")

print("The highest recorded number of suicide in the Philippines from 1992-2011")
query_result_highest.show()

# For the lowest recorded number of suicides:
# same aggregation, sorted ascending. The result gets its own name so it no
# longer overwrites the "highest" result above.
filtered_df.createOrReplaceTempView("temp_view4")
query_result_lowest = spark.sql("""
    SELECT year, SUM(suicides_no) AS total_suicides
    FROM temp_view4
    WHERE country = 'Philippines' AND year BETWEEN 1992 AND 2011
    GROUP BY year
    ORDER BY total_suicides
    LIMIT 1
""")

print("The lowest recorded number of suicide in the Philippines from 1992-2011")
query_result_lowest.show()


In [None]:
# 4. In what year from 1992-2011 has the highest measured suicide rate in the Philippines? lowest? (with constraints for unavailable data)

# NOTE: this cell queries temp_view3 and temp_view4, the temp views registered
# from filtered_df in the question-3 cell, so that cell must have run first.

# For the highest recorded suicide rate:
# average the per-row suicide_rate per year, highest first, keep the top row.
query_result_highest = spark.sql("""
    SELECT year, AVG(suicide_rate) AS avg_suicide_rate
    FROM temp_view3
    WHERE country = 'Philippines' AND year BETWEEN 1992 AND 2011
    GROUP BY year
    ORDER BY avg_suicide_rate DESC
    LIMIT 1
""")

print("The year with the highest recorded suicide rate in the Philippines from 1992-2011:")
query_result_highest.show()

# For the lowest recorded suicide rate:
# same aggregation, lowest first, keep the top row.
query_result_lowest = spark.sql("""
    SELECT year, AVG(suicide_rate) AS avg_suicide_rate
    FROM temp_view4
    WHERE country = 'Philippines' AND year BETWEEN 1992 AND 2011
    GROUP BY year
    ORDER BY avg_suicide_rate ASC
    LIMIT 1
""")

print("The year with the lowest recorded suicide rate in the Philippines from 1992-2011:")
query_result_lowest.show()

In [None]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()