In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.types import *

In [2]:
credentials_location = './google_credentials.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName("Obesity Data Processing") \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar,./lib-bigquery/spark-3.3-bigquery-0.38.0.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)


In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

24/05/14 21:19:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/05/14 21:19:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [5]:
bucket_name="obesity-analysis-bucket"

In [6]:
death_rate_obesity = spark.read.csv(f"gs://{bucket_name}/death-rate-from-obesity.csv", header=True, inferSchema=True)
mean_bmi_men = spark.read.csv(f"gs://{bucket_name}/mean-body-mass-index-bmi-in-adult-males.csv", header=True, inferSchema=True)
mean_bmi_women = spark.read.csv(f"gs://{bucket_name}/mean-body-mass-index-bmi-in-adult-women.csv", header=True, inferSchema=True)
obesity_adults = spark.read.csv(f"gs://{bucket_name}/share-of-adults-defined-as-obese.csv", header=True, inferSchema=True)
deaths_by_risk_factor = spark.read.csv(f"gs://{bucket_name}/number-of-deaths-by-risk-factor.csv", header=True, inferSchema=True)

                                                                                

In [7]:
def sort_and_filter(df, sort_column, filter_columns):
    filter_condition = col(sort_column).isNotNull()
    for column in filter_columns:
        filter_condition &= col(column).isNotNull()
    return df.orderBy(sort_column).filter(filter_condition)

In [8]:
def print_nonzero_blanks(df, df_name):
    blank_found = False
    for col_name in df.columns:
        blank_count = df.filter((col(col_name).isNull()) | (col(col_name) == "")).count()
        if blank_count != 0:
            blank_found = True
            print(f"DataFrame: {df_name}, Column: {col_name} - {blank_count} blank values")
    if not blank_found:
        print(f"No blank values found in DataFrame: {df_name}")

In [9]:
def print_duplicates(df, df_name):
    duplicate_count = df.count() - df.dropDuplicates().count()
    if duplicate_count != 0:
        print(f"DataFrame: {df_name} - {duplicate_count} duplicates found")
        df.groupBy(df.columns).count().where(col('count') > 1).show(truncate=False)
    else:
        print(f"No duplicates found in DataFrame: {df_name}")

In [10]:
def remove_duplicates(df, df_name):
    initial_count = df.count()
    df_no_duplicates = df.dropDuplicates()
    final_count = df_no_duplicates.count()
    
    if initial_count != final_count:
        print(f"Removed {initial_count - final_count} duplicates from DataFrame: {df_name}")
    else:
        print(f"No duplicates found in DataFrame: {df_name}")
    
    return df_no_duplicates

In [11]:
def process_data(df, df_name, sort_column, filter_columns, date_col=None):
    df = sort_and_filter(df, sort_column, filter_columns)
    
    print_nonzero_blanks(df, df_name)

    print_duplicates(df, df_name)
    df = remove_duplicates(df, df_name)
    
    return df

In [12]:
from pyspark.sql.functions import when, col, regexp_replace, udf

deaths_by_risk_factor_processed = process_data(deaths_by_risk_factor, "Deaths by Risk Factor", "Year", ["Year", "Entity"])
mean_bmi_women_processed = process_data(mean_bmi_women, "Mean BMI Women", "Year", ["Year", "Entity"], "Year")
mean_bmi_men_processed = process_data(mean_bmi_men, "Mean BMI Men", "Year", ["Year", "Entity"], "Year")
obesity_adults_processed = process_data(obesity_adults, "Obesity Adults", "Year", ["Year", "Entity"], "Year")
death_rate_obesity_processed = process_data(death_rate_obesity, "Death Rate Obesity", "Year", ["Year", "Entity"], "Year")

                                                                                

DataFrame: Deaths by Risk Factor, Column: Code - 690 blank values
24/05/14 21:20:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

No duplicates found in DataFrame: Deaths by Risk Factor
No duplicates found in DataFrame: Deaths by Risk Factor
DataFrame: Mean BMI Women, Column: Code - 378 blank values
No duplicates found in DataFrame: Mean BMI Women
No duplicates found in DataFrame: Mean BMI Women
DataFrame: Mean BMI Men, Column: Code - 378 blank values
No duplicates found in DataFrame: Mean BMI Men
No duplicates found in DataFrame: Mean BMI Men
DataFrame: Obesity Adults, Column: Code - 294 blank values
No duplicates found in DataFrame: Obesity Adults
No duplicates found in DataFrame: Obesity Adults
DataFrame: Death Rate Obesity, Column: Code - 690 blank values
No duplicates found in DataFrame: Death Rate Obesity
No duplicates found in DataFrame: Death Rate Obesity


In [16]:
from pyspark.sql.types import StringType

codes_mapping = {
    "African Region": "WHO", "Eastern Mediterranean Region": "WHO", "Region of the Americas": "WHO", 
    "European Region": "WHO", "Western Pacific Region": "WHO", 
    "East Asia & Pacific": "WB", "Europe & Central Asia": "WB", "High Income": "WB", 
    "Latin America & Caribbean": "WB", "Low Income": "WB", "Lower Middle Income": "WB", 
    "Middle East & North Africa": "WB", "North America": "WB", "South Asia": "WB", "Sub-Saharan Africa": "WB",
    "England": "GBR", "Northern Ireland": "GBR", "Scotland": "GBR", "Wales": "GBR",
    "OECD": "USA", "Bahrain": "BHR", "Belgium": "BEL", "Bolivia": "BOL", "Brazil": "BRA",
    "Burkina Faso": "BFA", "Malaysia": "MYS", "Luxembourg": "LUX", "Lithuania": "LTU", "Lebanon": "LBN",
    "Gabon": "GAB", "France": "FRA", "Equatorial Guinea": "GNQ", "Dominican Republic": "DOM", "Dominica": "DMA",
    "Cook Islands": "COK", "Colombia": "COL", "Central African Republic": "CAF", "Cape Verde": "CPV", "Canada": "CAN"
}

def map_code(entity):
    return codes_mapping.get(entity, "UNMATCHED")

map_code_udf = udf(map_code, StringType())

def update_codes_if_empty_code(df):
    df = df.withColumn("Entity", regexp_replace(col("Entity"), r"\s+\(.*?\)", ""))
    
    return df.withColumn("Code", when(col("Code") == "", map_code_udf(col("Entity")))
                          .otherwise(map_code_udf(col("Entity"))))

In [17]:
from pyspark.sql.functions import regexp_replace, coalesce


deaths_by_risk_factor_no_blank = update_codes_if_empty_code(deaths_by_risk_factor_processed)
mean_bmi_women_no_blank = update_codes_if_empty_code(mean_bmi_women_processed)
mean_bmi_men_no_blank = update_codes_if_empty_code(mean_bmi_men_processed)
obesity_adults_no_blank = update_codes_if_empty_code(obesity_adults_processed)
death_rate_obesity_no_blank = update_codes_if_empty_code(death_rate_obesity_processed)

In [19]:
deaths_by_risk_factor_processed = process_data(deaths_by_risk_factor_no_blank, "Deaths by Risk Factor", "Year", ["Year", "Entity"])
mean_bmi_women_processed = process_data(mean_bmi_women_no_blank, "Mean BMI Women", "Year", ["Year", "Entity"], "Year")
mean_bmi_men_processed = process_data(mean_bmi_men_no_blank, "Mean BMI Men", "Year", ["Year", "Entity"], "Year")
obesity_adults_processed = process_data(obesity_adults_no_blank, "Obesity Adults", "Year", ["Year", "Entity"], "Year")
death_rate_obesity_processed = process_data(death_rate_obesity_no_blank, "Death Rate Obesity", "Year", ["Year", "Entity"], "Year")

No blank values found in DataFrame: Deaths by Risk Factor
No duplicates found in DataFrame: Deaths by Risk Factor
No duplicates found in DataFrame: Deaths by Risk Factor
No blank values found in DataFrame: Mean BMI Women
No duplicates found in DataFrame: Mean BMI Women
No duplicates found in DataFrame: Mean BMI Women
No blank values found in DataFrame: Mean BMI Men
No duplicates found in DataFrame: Mean BMI Men
No duplicates found in DataFrame: Mean BMI Men
No blank values found in DataFrame: Obesity Adults
No duplicates found in DataFrame: Obesity Adults
No duplicates found in DataFrame: Obesity Adults
No blank values found in DataFrame: Death Rate Obesity
No duplicates found in DataFrame: Death Rate Obesity
No duplicates found in DataFrame: Death Rate Obesity


In [20]:
deaths_by_risk_factor_processed

DataFrame[Entity: string, Code: string, Year: int, Deaths that are from all causes attributed to high systolic blood pressure, in both sexes aged all ages: double, Deaths that are from all causes attributed to diet high in sodium, in both sexes aged all ages: double, Deaths that are from all causes attributed to diet low in whole grains, in both sexes aged all ages: double, Deaths that are from all causes attributed to alcohol use, in both sexes aged all ages: double, Deaths that are from all causes attributed to diet low in fruits, in both sexes aged all ages: double, Deaths that are from all causes attributed to unsafe water source, in both sexes aged all ages: double, Deaths that are from all causes attributed to secondhand smoke, in both sexes aged all ages: double, Deaths that are from all causes attributed to low birth weight, in both sexes aged all ages: double, Deaths that are from all causes attributed to child wasting, in both sexes aged all ages: double, Deaths that are from

In [21]:
deaths_by_risk_factor_processed = deaths_by_risk_factor_processed.withColumnRenamed("Deaths that are from all causes attributed to high systolic blood pressure, in both sexes aged all ages", "SBP_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to diet high in sodium, in both sexes aged all ages", "Sodium_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to diet low in whole grains, in both sexes aged all ages", "Whole_Grains_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to alcohol use, in both sexes aged all ages", "Alcohol_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to diet low in fruits, in both sexes aged all ages", "Fruit_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to unsafe water source, in both sexes aged all ages", "Water_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to secondhand smoke, in both sexes aged all ages", "SHSmoke_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to low birth weight, in both sexes aged all ages", "LBW_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to child wasting, in both sexes aged all ages", "Wasting_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to unsafe sex, in both sexes aged all ages", "Unsafe_Sex_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to diet low in nuts and seeds, in both sexes aged all ages", "Nuts_Seeds_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to household air pollution from solid fuels, in both sexes aged all ages", "HAP_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to diet low in vegetables, in both sexes aged all ages", "Vegetable_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to smoking, in both sexes aged all ages", "Smoking_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to high fasting plasma glucose, in both sexes aged all ages", "FPG_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to air pollution, in both sexes aged all ages", "Air_Pollution_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to high body-mass index, in both sexes aged all ages", "BMI_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to unsafe sanitation, in both sexes aged all ages", "Sanitation_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to drug use, in both sexes aged all ages", "Drug_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to low bone mineral density, in both sexes aged all ages", "BMD_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to vitamin a deficiency, in both sexes aged all ages", "Vitamin_A_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to child stunting, in both sexes aged all ages", "Stunting_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to non-exclusive breastfeeding, in both sexes aged all ages", "Breast_feeding_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to iron deficiency, in both sexes aged all ages", "Iron_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to ambient particulate matter pollution, in both sexes aged all ages", "PM_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to low physical activity, in both sexes aged all ages", "Activity_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to no access to handwashing facility, in both sexes aged all ages", "Hand_washing_Deaths") \
       .withColumnRenamed("Deaths that are from all causes attributed to high ldl cholesterol, in both sexes aged all ages", "LDL_Deaths")


In [22]:
deaths_by_risk_factor_processed

DataFrame[Entity: string, Code: string, Year: int, SBP_Deaths: double, Sodium_Deaths: double, Whole_Grains_Deaths: double, Alcohol_Deaths: double, Fruit_Deaths: double, Water_Deaths: double, SHSmoke_Deaths: double, LBW_Deaths: double, Wasting_Deaths: double, Unsafe_Sex_Deaths: double, Nuts_Seeds_Deaths: double, HAP_Deaths: double, Vegetable_Deaths: double, Smoking_Deaths: double, FPG_Deaths: double, Air_Pollution_Deaths: double, BMI_Deaths: double, Sanitation_Deaths: double, Drug_Deaths: double, BMD_Deaths: double, Vitamin_A_Deaths: double, Stunting_Deaths: double, Breast_feeding_Deaths: double, Iron_Deaths: double, PM_Deaths: double, Activity_Deaths: double, Hand_washing_Deaths: double, LDL_Deaths: double]

In [23]:
import re

def camel_to_snake(name):
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

deaths_by_risk_factor_processed = deaths_by_risk_factor_processed \
    .select([col(c).alias(camel_to_snake(c)) for c in deaths_by_risk_factor_processed.columns])

mean_bmi_women_processed = mean_bmi_women_processed \
    .select([col(c).alias(camel_to_snake(c)) for c in mean_bmi_women_processed.columns])

mean_bmi_men_processed = mean_bmi_men_processed \
    .select([col(c).alias(camel_to_snake(c)) for c in mean_bmi_men_processed.columns])

obesity_adults_processed = obesity_adults_processed \
    .select([col(c).alias(camel_to_snake(c)) for c in obesity_adults_processed.columns])

death_rate_obesity_processed = death_rate_obesity_processed \
    .select([col(c).alias(camel_to_snake(c)) for c in death_rate_obesity_processed.columns])

In [24]:
mean_bmi_women_processed = mean_bmi_women_processed.withColumn("year", col("year").cast(TimestampType()))
mean_bmi_women_processed = mean_bmi_women_processed.withColumnRenamed("Mean BMI (female)", "mean_bmi_female")
mean_bmi_women_processed.show()

+--------------------+---------+-------------------+---------------+
|              entity|     code|               year|mean_bmi_female|
+--------------------+---------+-------------------+---------------+
|              Brazil|      BRA|1970-01-01 00:32:55|    22.91392441|
|            Malaysia|      MYS|1970-01-01 00:33:06|    22.33452108|
|             Myanmar|UNMATCHED|1970-01-01 00:33:26|    22.28340314|
|            Mongolia|UNMATCHED|1970-01-01 00:32:57|    22.74688401|
|          Mauritania|UNMATCHED|1970-01-01 00:33:20|     25.2055842|
|               Libya|UNMATCHED|1970-01-01 00:33:06|    27.21854788|
| Trinidad and Tobago|UNMATCHED|1970-01-01 00:33:18|    26.55626557|
|             Croatia|UNMATCHED|1970-01-01 00:33:36|    27.03333448|
|               Libya|UNMATCHED|1970-01-01 00:33:15|    28.01256469|
|        Turkmenistan|UNMATCHED|1970-01-01 00:33:13|    24.02612442|
|            Barbados|UNMATCHED|1970-01-01 00:33:11|    27.27673603|
|               Yemen|UNMATCHED|19

In [25]:
mean_bmi_men_processed = mean_bmi_men_processed.withColumn("year", col("Year").cast(TimestampType()))
mean_bmi_men_processed = mean_bmi_men_processed.withColumnRenamed("Mean BMI (male)", "mean_bmi_male")
mean_bmi_men_processed

DataFrame[entity: string, code: string, year: timestamp, mean_bmi_male: double]

In [26]:
obesity_adults_processed = obesity_adults_processed.withColumn("year", col("Year").cast(TimestampType()))
obesity_adults_processed = obesity_adults_processed.withColumnRenamed("prevalence of obesity among adults, bmi >= 30 (crude estimate) (%) - _sex: both sexes - _age group: 18+  years", "adult_obesity_prevalence_bmi_30_plus_pct_both_sexes_18_plus")
obesity_adults_processed

DataFrame[entity: string, code: string, year: timestamp, adult_obesity_prevalence_bmi_30_plus_pct_both_sexes_18_plus: double]

In [27]:
death_rate_obesity_processed = death_rate_obesity_processed.withColumn("year", col("Year").cast(TimestampType()))
death_rate_obesity_processed = death_rate_obesity_processed.withColumnRenamed("deaths that are from all causes attributed to high body-mass index per 100,000 people, in both sexes aged age-standardized", "high_bmi_death_rate_per_100k_both_sexes_age_std")
death_rate_obesity_processed

DataFrame[entity: string, code: string, year: timestamp, high_bmi_death_rate_per_100k_both_sexes_age_std: double]

In [28]:
obesity_adults_processed = obesity_adults_processed.withColumn("year", col("Year").cast(TimestampType()))
obesity_adults_processed

DataFrame[entity: string, code: string, year: timestamp, adult_obesity_prevalence_bmi_30_plus_pct_both_sexes_18_plus: double]

In [29]:
dataframes_info = {
    deaths_by_risk_factor_processed: "deaths_by_risk_factor",
    mean_bmi_women_processed: "mean_bmi_women",
    mean_bmi_men_processed: "mean_bmi_men",
    obesity_adults_processed: "obesity_adults",
    death_rate_obesity_processed: "death_rate_obesity",
}

In [30]:
def save_to_bigquery(df, table_name):
    temp_bucket = "obesity-analysis-temp-bucket"
    df.write \
      .format("bigquery") \
      .option("table", f"useful-monitor-415110.obesity_analysis_dataset.{table_name}") \
      .option("temporaryGcsBucket", temp_bucket) \
      .mode('overwrite') \
      .save()


In [31]:
for df, name in dataframes_info.items():
    save_to_bigquery(df, name)


                                                                                

In [32]:
spark.stop()