In [58]:
!pip install pyspark==4.0.0



In [59]:
import os

os.makedirs('data', exist_ok=True)

In [None]:
 #!/bin/bash
# Download datasets from Kaggle using their API

!curl -L -o ./data/gdp-countries.zip \
    "https://www.kaggle.com/api/v1/datasets/download/nitishabharathi/gdp-per-capita-all-countries"

!curl -L -o ./data/marine-microplastic.zip \
    "https://www.kaggle.com/api/v1/datasets/download/william2020/marine-microplastics"

!curl -L -o ./data/food-microplastic.zip \
    "https://www.kaggle.com/api/v1/datasets/download/jayeshrmohanani/dataset-for-microplastic-consumption-in-food-items"

!curl -L -o ./data/life-exp-countries.zip \
    "https://www.kaggle.com/api/v1/datasets/download/sahirmaharajj/country-health-trends-dataset"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 44393  100 44393    0     0  54985      0 --:--:-- --:--:-- --:--:-- 54985
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 1681k  100 1681k    0     0  1083k      0  0:00:01  0:00:01 --:--:-- 3218k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  115k  100  115k    0     0   136k      0 --:--:-- --:--:-- --:--:--  262k
  % Total    % Received % Xferd  Average Speed   Tim

In [None]:
# Unzip the downloaded datasets

import zipfile
import os

def unzip_dataset(dataset_name):
    try:
        with zipfile.ZipFile(f'./data/{dataset_name}.zip', 'r') as zip_ref:
            zip_ref.extractall('./data/')
            print("Extracted marine microplastic dataset")
    except zipfile.BadZipFile:
        print(f"{dataset_name} file is not a valid zip")

unzip_dataset('marine-microplastic')
unzip_dataset('life-exp-countries')
unzip_dataset('gdp-countries')
unzip_dataset('food-microplastic')

# List all files in data directory
print("\nAll files in data directory:")
for file in os.listdir('./data/'):
    print(f"- {file}")

Extracted marine microplastic dataset
Extracted marine microplastic dataset
Extracted marine microplastic dataset
Extracted marine microplastic dataset

All files in data directory:
- train.csv
- GDP.parquet
- train.parquet
- gapminder.parquet
- marine-microplastic.zip
- processed_microplastics.parquet
- gdp-countries.zip
- food-microplastic.zip
- GDP.csv
- life-exp-countries.zip
- processed_microplastics.csv
- silver
- Marine_Microplastics.csv
- gapminder.csv
- Marine_Microplastics.parquet


In [62]:
from pyspark.sql import SparkSession

# Create Spark session≥≥≥≥
spark = SparkSession.builder \
    .appName("MicroplasticsAnalysis") \
    .getOrCreate()

25/06/13 14:17:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Create Bronze layer with explicit schemas

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Define explicit schemas for better performance and type safety
ocean_schema = StructType([
    StructField("OBJECTID", IntegerType(), True),
    StructField("Oceans", StringType(), True),
    StructField("Regions", StringType(), True),
    StructField("SubRegions", StringType(), True),
    StructField("Sampling Method", StringType(), True),
    StructField("Measurement", DoubleType(), True),
    StructField("Unit", StringType(), True),
    StructField("Density Range", StringType(), True),
    StructField("Density Class", StringType(), True),
    StructField("Short Reference", StringType(), True),
    StructField("Long Reference", StringType(), True),
    StructField("DOI", StringType(), True),
    StructField("Organization", StringType(), True),
    StructField("Keywords", StringType(), True),
    StructField("Accession Number", StringType(), True),
    StructField("Accession Link", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
    StructField("Date", StringType(), True),
    StructField("GlobalID", StringType(), True),
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True)
])

food_schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Food_Type", StringType(), True),
    StructField("Microplastic_Density", DoubleType(), True),
    StructField("Unit", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Source", StringType(), True)
])

health_trends_schema = StructType([
    StructField("Country", StringType(), True),
    StructField("LifeExpectancy", DoubleType(), True),
    StructField("FertilityRate", DoubleType(), True),
    StructField("Population", LongType(), True),
    StructField("Region", StringType(), True)
])

gdp_schema = StructType([
           StructField("Country", StringType(), True),
           StructField("Country Code", StringType(), True)
       ] + [StructField(str(year), DoubleType(), True) for year in range(1990, 2020)])

# Create Bronze layer with explicit schemas
df_ocean = spark.read.csv("data/Marine_Microplastics.csv",
                        header=True,
                        schema=ocean_schema)

df_food = spark.read.csv("data/processed_microplastics.csv",
                        header=True,
                        schema=food_schema)

df_health_trends = spark.read.csv("data/gapminder.csv",
                        header=True,
                        schema=health_trends_schema)

df_gdp = spark.read.csv("data/GDP.csv",
                        header=True,
                        schema=gdp_schema)


In [64]:
# Transform each CSV file to Parquet format

data_dir = 'data'
csv_files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]
print(f"Found CSV files: {csv_files}")
print(f"Spark version: {spark.version}")

# Convert each CSV file to Parquet using Spark
for csv_file in csv_files:
    csv_path = os.path.join(data_dir, csv_file)
    parquet_file = csv_file.replace('.csv', '.parquet')
    parquet_path = os.path.join(data_dir, parquet_file)
    
    print(f"Converting {csv_file} to {parquet_file}...")
    
    # Read CSV with Spark
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)
    
    # Write as Parquet (overwrite existing)
    df.write.mode("overwrite").parquet(parquet_path)
    
    row_count = df.count()
    col_count = len(df.columns)
    print(f"✓ Converted {csv_file} ({row_count} rows, {col_count} columns)")

print("\nAll CSV files converted to Parquet format using Spark!")

Found CSV files: ['train.csv', 'GDP.csv', 'processed_microplastics.csv', 'Marine_Microplastics.csv', 'gapminder.csv']
Spark version: 4.0.0
Converting train.csv to train.parquet...
✓ Converted train.csv (723 rows, 20 columns)
Converting GDP.csv to GDP.parquet...
✓ Converted GDP.csv (260 rows, 32 columns)
Converting processed_microplastics.csv to processed_microplastics.parquet...
✓ Converted processed_microplastics.csv (723 rows, 21 columns)
Converting Marine_Microplastics.csv to Marine_Microplastics.parquet...
✓ Converted Marine_Microplastics.csv (20425 rows, 22 columns)
Converting gapminder.csv to gapminder.parquet...
✓ Converted gapminder.csv (191 rows, 5 columns)

All CSV files converted to Parquet format using Spark!


In [None]:
# Load the Parquet files

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Load the parquet files to understand the structure
food_df = spark.read.parquet("data/train.parquet")
marine_df = spark.read.parquet("data/Marine_Microplastics.parquet") 
gdp_df = spark.read.parquet("data/GDP.parquet")
gapminder_df = spark.read.parquet("data/gapminder.parquet")

print("=== Current Data Schemas ===")
print("\n1. Food Microplastics:")
food_df.printSchema()
print(f"Sample data:")
food_df.show(3)

print("\n2. Marine Microplastics:")
marine_df.printSchema()

print("\n3. GDP Data:")
gdp_df.printSchema()

print("\n4. Gapminder (Life expectancy, fertility):")
gapminder_df.printSchema()
gapminder_df.show(3)

=== Current Data Schemas ===

1. Food Microplastics:
root
 |-- Year: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Cheese: double (nullable = true)
 |-- Yoghurt: double (nullable = true)
 |-- Total Milk: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Refined Grains: double (nullable = true)
 |-- Whole Grains: double (nullable = true)
 |-- Nuts And Seeds: double (nullable = true)
 |-- Total Processed Meats: double (nullable = true)
 |-- Unprocessed Red Meats: double (nullable = true)
 |-- Fish: double (nullable = true)
 |-- Shellfish: double (nullable = true)
 |-- Eggs: double (nullable = true)
 |-- Total Salt: double (nullable = true)
 |-- Added Sugars: double (nullable = true)
 |-- Non-Starchy Vegetables: double (nullable = true)
 |-- Potatoes: double (nullable = true)
 |-- Other Starchy Vegetables: double (nullable = true)
 |-- Beans And Legumes: double (nullable = true)

Sample data:
+----+------------+-----------+-----------+-----------+

In [70]:
print("=== SILVER LAYER TRANSFORMATION DESIGN ===")

# 1. Food Microplastics Silver Layer
print("1. FOOD_MICROPLASTICS_SILVER")
print("   Purpose: Normalize food microplastic data from wide to long format")
print("   Transformations:")
print("   - Unpivot food columns to create food_type and microplastic_density")
print("   - Calculate total microplastic exposure per country/year")
print("   - Add contamination level categories")

# Create the transformation
food_silver = food_df.select(
    col("Year").alias("year"),
    col("Country").alias("country"),
    # Unpivot all food columns
    expr("""
        stack(17,
            'Cheese', Cheese,
            'Yoghurt', Yoghurt,
            'Total Milk', `Total Milk`,
            'Fruits', Fruits,
            'Refined Grains', `Refined Grains`,
            'Whole Grains', `Whole Grains`,
            'Nuts And Seeds', `Nuts And Seeds`,
            'Total Processed Meats', `Total Processed Meats`,
            'Unprocessed Red Meats', `Unprocessed Red Meats`,
            'Fish', Fish,
            'Shellfish', Shellfish,
            'Eggs', Eggs,
            'Total Salt', `Total Salt`,
            'Added Sugars', `Added Sugars`,
            'Non-Starchy Vegetables', `Non-Starchy Vegetables`,
            'Potatoes', Potatoes,
            'Other Starchy Vegetables', `Other Starchy Vegetables`
        ) as (food_type, microplastic_density)
    """)
).filter(col("microplastic_density").isNotNull() & (col("microplastic_density") > 0))

# Add contamination categories
food_silver = food_silver.withColumn(
    "contamination_level",
    when(col("microplastic_density") < 10, "Low")
    .when(col("microplastic_density") < 50, "Medium")
    .when(col("microplastic_density") < 100, "High")
    .otherwise("Very High")
)

print(f"   Resulting rows: {food_silver.count()}")
food_silver.show(5)

=== SILVER LAYER TRANSFORMATION DESIGN ===
1. FOOD_MICROPLASTICS_SILVER
   Purpose: Normalize food microplastic data from wide to long format
   Transformations:
   - Unpivot food columns to create food_type and microplastic_density
   - Calculate total microplastic exposure per country/year
   - Add contamination level categories
   Resulting rows: 12062
+----+-------+--------------+--------------------+-------------------+
|year|country|     food_type|microplastic_density|contamination_level|
+----+-------+--------------+--------------------+-------------------+
|1990| Angola|        Cheese|         0.191780822|                Low|
|1990| Angola|       Yoghurt|          54.8997394|               High|
|1990| Angola|    Total Milk|         96.60273973|               High|
|1990| Angola|        Fruits|         76.52054795|               High|
|1990| Angola|Refined Grains|         481.2971487|          Very High|
+----+-------+--------------+--------------------+-------------------+
onl

In [72]:
print("\n2. MARINE_MICROPLASTICS_SILVER")
print("   Purpose: Calculate averaged microplastic density across all oceans per year")
print("   Transformations:")
print("   - Standardize units to pieces/m³")
print("   - Extract year from date")
print("   - Calculate average microplastic density across all oceans per year")
print("   - Add pollution level categories based on averages")
print("   - Filter out invalid measurements")

# First, clean and prepare the data
marine_cleaned = marine_df.filter(
    (col("Measurement").isNotNull()) & 
    (col("Measurement") >= 0) &
    (col("Unit").isin(["pieces/m3", "pieces/cubic meter", "particles/m3"]))
).select(
    col("Measurement").alias("microplastic_density"),
    col("Date").alias("sample_date"),
    to_date(col("Date"), "M/d/yyyy h:mm:ss a").alias("parsed_date")
).withColumn(
    "year", year(col("parsed_date"))
).filter(col("year").isNotNull() & (col("year") > 1900) & (col("year") < 2030))

# Calculate averaged microplastic density across all oceans per year
marine_silver = marine_cleaned.groupBy("year").agg(
    avg("microplastic_density").alias("avg_microplastic_density")
).withColumn(
    "pollution_level",
    when(col("avg_microplastic_density") < 1, "Low")
    .when(col("avg_microplastic_density") < 10, "Medium") 
    .when(col("avg_microplastic_density") < 100, "High")
    .otherwise("Very High")
).select("year", "avg_microplastic_density", "pollution_level").orderBy("year")

print(f"   Resulting rows: {marine_silver.count()}")
marine_silver.show()


2. MARINE_MICROPLASTICS_SILVER
   Purpose: Calculate averaged microplastic density across all oceans per year
   Transformations:
   - Standardize units to pieces/m³
   - Extract year from date
   - Calculate average microplastic density across all oceans per year
   - Add pollution level categories based on averages
   - Filter out invalid measurements
   Resulting rows: 38
+----+------------------------+---------------+
|year|avg_microplastic_density|pollution_level|
+----+------------------------+---------------+
|1972|     0.01641935483870968|            Low|
|1973|    0.022285714285714287|            Low|
|1986|               0.0272914|            Low|
|1987|     0.02339903105590062|            Low|
|1989|    0.024940955555555557|            Low|
|1990|    0.025754363636363645|            Low|
|1991|     0.01906323809523809|            Low|
|1992|    0.017569849129593806|            Low|
|1993|      0.0205208729281768|            Low|
|1994|    0.011186407407407409|            Lo

In [74]:
# Transform GDP from wide to long format
gdp_years = [str(year) for year in range(1990, 2019)]  # Exclude 2019 as it's string type

gdp_silver = gdp_df.select(
    col("Country ").alias("country"),
    col("Country Code").alias("country_code"),
    expr(f"""
        stack({len(gdp_years)}, 
            {', '.join([f"'{year}', `{year}`" for year in gdp_years])})
        as (year, gdp_per_capita)
    """)
).filter(col("gdp_per_capita").isNotNull())

# Add development categories
gdp_silver = gdp_silver.withColumn(
    "development_level",
    when(col("gdp_per_capita") < 1000, "Low Income")
    .when(col("gdp_per_capita") < 4000, "Lower Middle Income")
    .when(col("gdp_per_capita") < 12000, "Upper Middle Income")
    .otherwise("High Income")
).withColumn("year", col("year").cast("int"))

print(f"   GDP silver layer rows: {gdp_silver.count()}")
gdp_silver.show(3)


   GDP silver layer rows: 6726
+-------+------------+----+--------------+-----------------+
|country|country_code|year|gdp_per_capita|development_level|
+-------+------------+----+--------------+-----------------+
|  Aruba|         ABW|1990|   24101.10943|      High Income|
|  Aruba|         ABW|1991|   25870.75594|      High Income|
|  Aruba|         ABW|1992|    26533.3439|      High Income|
+-------+------------+----+--------------+-----------------+
only showing top 3 rows


In [75]:
print("\n4. HEALTH_DEMOGRAPHICS_SILVER")
print("   Purpose: Enrich gapminder data with regional analysis")
print("   Transformations:")
print("   - Clean population data (remove non-numeric values)")
print("   - Add demographic categories")
print("   - Standardize country names")

health_silver = gapminder_df.select(
    col("Country").alias("country"),
    col("LifeExpectancy").alias("life_expectancy"),
    col("FertilityRate").alias("fertility_rate"),
    # Clean population - convert to numeric
    regexp_replace(col("Population"), "[^0-9]", "").cast("long").alias("population"),
    col("Region").alias("region")
).filter(
    (col("life_expectancy").isNotNull()) &
    (col("fertility_rate").isNotNull())
).withColumn(
    "population_category",
    when(col("population") < 1000000, "Small")
    .when(col("population") < 10000000, "Medium")
    .when(col("population") < 100000000, "Large")
    .otherwise("Very Large")
).withColumn(
    "life_expectancy_category",
    when(col("life_expectancy") < 60, "Low")
    .when(col("life_expectancy") < 70, "Medium")
    .when(col("life_expectancy") < 80, "High")
    .otherwise("Very High")
)

print(f"   Health demographics silver layer rows: {health_silver.count()}")
health_silver.show(3)


4. HEALTH_DEMOGRAPHICS_SILVER
   Purpose: Enrich gapminder data with regional analysis
   Transformations:
   - Clean population data (remove non-numeric values)
   - Add demographic categories
   - Standardize country names
   Health demographics silver layer rows: 191
+-----------+---------------+--------------+----------+--------------------+-------------------+------------------------+
|    country|life_expectancy|fertility_rate|population|              region|population_category|life_expectancy_category|
+-----------+---------------+--------------+----------+--------------------+-------------------+------------------------+
|Afghanistan|           51.0|          7.81|  19701940|          South Asia|              Large|                     Low|
|    Albania|           74.2|          2.47|   3121965|Europe & Central ...|             Medium|                    High|
|    Algeria|           73.2|          2.63|  31183658|Middle East & Nor...|              Large|                    Hi