## Importing the required libraries.

In [None]:
!pip install --upgrade google-cloud-storage
!pip install pymongo
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, struct, when, lit, explode, isnan, count, avg, sum, mean, coalesce
from functools import reduce
from pyspark.sql.window import Window
from pyspark import StorageLevel



In [None]:
# Getting the data using API
!wget -O owid_emission_data.json https://nyc3.digitaloceanspaces.com/owid-public/data/co2/owid-co2-data.json

--2025-08-08 01:46:50--  https://nyc3.digitaloceanspaces.com/owid-public/data/co2/owid-co2-data.json
Resolving nyc3.digitaloceanspaces.com (nyc3.digitaloceanspaces.com)... 162.243.189.2
Connecting to nyc3.digitaloceanspaces.com (nyc3.digitaloceanspaces.com)|162.243.189.2|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 104417664 (100M) [application/json]
Saving to: ‘owid_emission_data.json’


2025-08-08 01:46:53 (37.1 MB/s) - ‘owid_emission_data.json’ saved [104417664/104417664]



In [None]:
from google.colab import files
uploaded = files.upload()  # Uploading the `gcs_key.json` required for connecting to google cloud storage

Saving gcs_key.json to gcs_key.json


In [None]:
import os
from google.cloud import storage

# Setting environment variable for authentication
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gcs_key.json"
client = storage.Client() # Initializing the GCS client
bucket_name = "aquastat-data-2025" # The bucket name in GCS
bucket = client.bucket(bucket_name)

In [None]:
# Uploading the file to GCS
blob = bucket.blob("owid_emission_data.json")  # Destination path in bucket
blob.upload_from_filename("owid_emission_data.json")  # Specifying the name of the file

print("JSON file uploaded to GCS")

JSON file uploaded to GCS


In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null # Java-11 required by PySpark 3.4.1 on Google Colab

In [None]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gcs_key.json"        # Security key for connecting to Google Cloud Storage
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"       # Setting JAVA_HOME

In [None]:
# Getting the Hadoop version required by gcs connector
!wget https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.5/gcs-connector-hadoop3-2.2.5-shaded.jar -O /tmp/gcs-connector-hadoop3-2.2.5-shaded.jar

--2025-08-08 01:47:17--  https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.5/gcs-connector-hadoop3-2.2.5-shaded.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 31607894 (30M) [application/java-archive]
Saving to: ‘/tmp/gcs-connector-hadoop3-2.2.5-shaded.jar’


2025-08-08 01:47:17 (95.4 MB/s) - ‘/tmp/gcs-connector-hadoop3-2.2.5-shaded.jar’ saved [31607894/31607894]



In [None]:
# Initializing the Spark session by alloacating sufficient memory, required hadoop jar, and the security key required to connect to GCS.

spark = SparkSession.builder \
    .appName("Optimized GCS Read") \
    .master("local[*]") \
    .config("spark.jars", "/tmp/gcs-connector-hadoop3-2.2.5-shaded.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "gcs_key.json") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

In [None]:
# Reading the .JSON file from Google Cloud Storage using PySpark
df_raw = spark.read \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .option("samplingRatio", 0.1) \
    .json("gs://aquastat-data-2025/owid_emission_data.json")

df_raw.cache()  # Caching in memory for reuse of this DataFrame multiple times
df_raw.show(5)  # Displaying the country-wise top 5 entries

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------------+----------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------+--------------------+-----

In [None]:
# Using printSchema to get an idea of the data. Similar to df.info()
df_raw.printSchema()
print("Total number of root keys (countries):", len(df_raw.columns))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
 |    |    |    |-- co2_including_luc_per_gdp: double (nullable = true)
 |    |    |    |-- co2_including_luc_per_unit_energy: double (nullable = true)
 |    |    |    |-- co2_per_capita: double (nullable = true)
 |    |    |    |-- co2_per_gdp: double (nullable = true)
 |    |    |    |-- co2_per_unit_energy: double (nullable = true)
 |    |    |    |-- coal_co2: double (nullable = true)
 |    |    |    |-- coal_co2_per_capita: double (nullable = true)
 |    |    |    |-- consumption_co2: double (nullable = true)
 |    |    |    |-- consumption_co2_per_capita: double (nullable = true)
 |    |    |    |-- consumption_co2_per_gdp: double (nullable = true)
 |    |    |    |-- cumulative_cement_co2: double (nullable = true)
 |    |    |    |-- cumulative_co2: double (nullable = true)
 |    |    |    |-- cumulative_co2_including_luc: double (nullable = true)
 |    |    |    |-- cumulative_coal_co2: double (nullable = true)
 |

#### From the above structure, it becomes clear that the data is country-wise and each country has an assigned iso_code(unique identifier) and there are several features year-wise for each country. The data is in json format and needs to be processed to bring into a structured table format.

In [None]:
# Flattening the nested JSON data structure for ease of processing and analysis
#from pyspark.sql.functions import explode, col, lit
#countries = df_raw.columns
#dfs = []
#for country in countries:
#  df_country = df_raw.select(explode(col(country + ".data")).alias("data")) \
#    .withColumn("country", lit(country)) \
#    .withColumn("iso_code", lit(df_raw.select(f"{country}.iso_code").first()[0]))
#  df_country = df_country.select("country", "iso_code", "data.*")
#  dfs.append(df_country)

#### The above code gives error because all the countries don't have the iso_code mapping.
#### We will filter the countries having an iso_code. First, we will understand what are the countries that do not have an iso_code.

In [None]:
countries_missing_iso = []  # List for collecting countries with missing iso_codes

for country in df_raw.columns:
    try:
        iso_value = df_raw.select(col(f"{country}.iso_code")).first()[0]
        if iso_value is None:
            countries_missing_iso.append(country)
    except:
        countries_missing_iso.append(country)

print("Countries with missing ISO codes:", countries_missing_iso)

Countries with missing ISO codes: ['Africa', 'Africa (GCP)', 'Asia', 'Asia (GCP)', 'Asia (excl. China and India)', 'Central America (GCP)', 'Europe', 'Europe (GCP)', 'Europe (excl. EU-27)', 'Europe (excl. EU-28)', 'European Union (27)', 'European Union (28)', 'High-income countries', 'International aviation', 'International shipping', 'International transport', 'Kosovo', 'Kuwaiti Oil Fires', 'Kuwaiti Oil Fires (GCP)', 'Least developed countries (Jones et al.)', 'Low-income countries', 'Lower-middle-income countries', 'Middle East (GCP)', 'Non-OECD (GCP)', 'North America', 'North America (GCP)', 'North America (excl. USA)', 'OECD (GCP)', 'OECD (Jones et al.)', 'Oceania', 'Oceania (GCP)', 'Ryukyu Islands', 'Ryukyu Islands (GCP)', 'South America', 'South America (GCP)', 'Upper-middle-income countries', 'World']


#### From the output, it can be seen that, most of the fields missing iso_codes are continents or group of countries. We will assign iso_codes for continents and some group of countries that will be useful for analysis and we will exclude the rest.

In [None]:
# Creating a dictionary for missing codes
missing_iso_codes = {
    "Kosovo": "KSV",
    "World": "World",
    "Africa": "Africa",
    "Asia": "Asia",
    "Europe": "Europe",
    "High-income countries": "HICG",
    "Low-income countries": "LICG",
    "Lower-middle-income countries": "LMIG",
    "North America": "NOAM",
    "South America": "SOAM",
    "Upper-middle-income countries": "UMIG"
}

In [None]:
# Replacing missing iso_codes, while keeping the data as it is
for country, iso in missing_iso_codes.items():
    if country in df_raw.columns:
        df_raw = df_raw.withColumn(
            country,
            struct(
                lit(iso).alias("iso_code"),
                col(f"{country}.data").alias("data")
            )
        )

In [None]:
# Doing a check of which countries remain without iso_codes now
countries_missing_iso = []    # List for collecting countries with missing iso_codes

for country in df_raw.columns:
    try:
        iso_value = df_raw.select(col(f"{country}.iso_code")).first()[0]
        if iso_value is None:
            countries_missing_iso.append(country)
    except:
        countries_missing_iso.append(country)

print("Countries with missing ISO codes:", countries_missing_iso)

Countries with missing ISO codes: ['Africa (GCP)', 'Asia (GCP)', 'Asia (excl. China and India)', 'Central America (GCP)', 'Europe (GCP)', 'Europe (excl. EU-27)', 'Europe (excl. EU-28)', 'European Union (27)', 'European Union (28)', 'International aviation', 'International shipping', 'International transport', 'Kuwaiti Oil Fires', 'Kuwaiti Oil Fires (GCP)', 'Least developed countries (Jones et al.)', 'Middle East (GCP)', 'Non-OECD (GCP)', 'North America (GCP)', 'North America (excl. USA)', 'OECD (GCP)', 'OECD (Jones et al.)', 'Oceania', 'Oceania (GCP)', 'Ryukyu Islands', 'Ryukyu Islands (GCP)', 'South America (GCP)']


#### From the above output, it can be seen that the replacement has been successfully done and the remaining are not relevent to analysis.

## We will now structure the json data into a tabular format.

In [None]:
# Flattening the json nested data structure, excluding entries where iso_code is not present.

dfs = []     # Empty list that will store country-wise dataframes

for country in df_raw.columns:
    try:
        iso_value = df_raw.select(col(f"{country}.iso_code")).first()[0]
        if iso_value is None:
            continue

        df_country = df_raw.select(explode(col(f"{country}.data")).alias("data")) \
            .withColumn("country", lit(country)) \
            .withColumn("iso_code", lit(iso_value)) \
            .select("country", "iso_code", "data.*")

        dfs.append(df_country)

    except Exception as e:
        print(f"Skipping {country}: {e}")

Skipping Africa (GCP): [FIELD_NOT_FOUND] No such struct field `iso_code` in `data`.
Skipping Asia (GCP): [FIELD_NOT_FOUND] No such struct field `iso_code` in `data`.
Skipping Asia (excl. China and India): [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Asia (excl`.` China and India)`.`iso_code` cannot be resolved. Did you mean one of the following? [`Asia (excl. China and India)`, `Asia (GCP)`, `New Caledonia`, `Saint Kitts and Nevis`, `Sao Tome and Principe`].;
'Project ['Asia (excl. China and India).iso_code]
+- Project [Afghanistan#0, Africa#1120918, Africa (GCP)#2, Albania#3, Algeria#4, Andorra#5, Angola#6, Anguilla#7, Antarctica#8, Antigua and Barbuda#9, Argentina#10, Armenia#11, Aruba#12, Asia#1121177, Asia (GCP)#14, Asia (excl. China and India)#15, Australia#16, Austria#17, Azerbaijan#18, Bahamas#19, Bahrain#20, Bangladesh#21, Barbados#22, Belarus#23, ... 231 more fields]
   +- Project [Afghanistan#0, Africa#1120918, Africa (GCP)#2, Albania#3, Alger

In [None]:
print(df_country)

DataFrame[country: string, iso_code: string, cement_co2: double, cement_co2_per_capita: double, co2: double, co2_growth_abs: double, co2_growth_prct: double, co2_including_luc: double, co2_including_luc_growth_abs: double, co2_including_luc_growth_prct: double, co2_including_luc_per_capita: double, co2_including_luc_per_gdp: double, co2_including_luc_per_unit_energy: double, co2_per_capita: double, co2_per_gdp: double, co2_per_unit_energy: double, coal_co2: double, coal_co2_per_capita: double, consumption_co2: double, consumption_co2_per_capita: double, consumption_co2_per_gdp: double, cumulative_cement_co2: double, cumulative_co2: double, cumulative_co2_including_luc: double, cumulative_coal_co2: double, cumulative_flaring_co2: double, cumulative_gas_co2: double, cumulative_luc_co2: double, cumulative_oil_co2: double, energy_per_capita: double, energy_per_gdp: double, flaring_co2: double, flaring_co2_per_capita: double, gas_co2: double, gas_co2_per_capita: double, gdp: double, ghg_exc

In [None]:
# Combining the country-wise dataframes created and stored in dfs list earlier
df_combined = reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)

df_combined.cache()
df_combined.printSchema()

root
 |-- country: string (nullable = false)
 |-- iso_code: string (nullable = false)
 |-- cement_co2: double (nullable = true)
 |-- cement_co2_per_capita: double (nullable = true)
 |-- co2: double (nullable = true)
 |-- co2_growth_abs: double (nullable = true)
 |-- co2_growth_prct: double (nullable = true)
 |-- co2_including_luc: double (nullable = true)
 |-- co2_including_luc_growth_abs: double (nullable = true)
 |-- co2_including_luc_growth_prct: double (nullable = true)
 |-- co2_including_luc_per_capita: double (nullable = true)
 |-- co2_including_luc_per_gdp: double (nullable = true)
 |-- co2_including_luc_per_unit_energy: double (nullable = true)
 |-- co2_per_capita: double (nullable = true)
 |-- co2_per_gdp: double (nullable = true)
 |-- co2_per_unit_energy: double (nullable = true)
 |-- coal_co2: double (nullable = true)
 |-- coal_co2_per_capita: double (nullable = true)
 |-- cumulative_cement_co2: double (nullable = true)
 |-- cumulative_co2: double (nullable = true)
 |-- cumu

#### From the above output, we can see that the data is now in a structured tabular format.

In [None]:
# Moving the year column to the 2nd index
cols = df_combined.columns # Getting all the column names

cols.remove("year")
cols = cols[:2] + ["year"] + cols[2:] # Year at the 2nd index


df_combined = df_combined.select(cols) # Reordering the dataframe according to the new list of columns
df_combined.printSchema()

root
 |-- country: string (nullable = false)
 |-- iso_code: string (nullable = false)
 |-- year: long (nullable = true)
 |-- cement_co2: double (nullable = true)
 |-- cement_co2_per_capita: double (nullable = true)
 |-- co2: double (nullable = true)
 |-- co2_growth_abs: double (nullable = true)
 |-- co2_growth_prct: double (nullable = true)
 |-- co2_including_luc: double (nullable = true)
 |-- co2_including_luc_growth_abs: double (nullable = true)
 |-- co2_including_luc_growth_prct: double (nullable = true)
 |-- co2_including_luc_per_capita: double (nullable = true)
 |-- co2_including_luc_per_gdp: double (nullable = true)
 |-- co2_including_luc_per_unit_energy: double (nullable = true)
 |-- co2_per_capita: double (nullable = true)
 |-- co2_per_gdp: double (nullable = true)
 |-- co2_per_unit_energy: double (nullable = true)
 |-- coal_co2: double (nullable = true)
 |-- coal_co2_per_capita: double (nullable = true)
 |-- cumulative_cement_co2: double (nullable = true)
 |-- cumulative_co2: 

In [None]:
# We will now check for missing values per column
df_combined.cache()
# Counting missing (null or NaN) values per column
df_combined.select([
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in df_combined.columns
]).show(truncate=False)

+-------+--------+----+----------+---------------------+-----+--------------+---------------+-----------------+----------------------------+-----------------------------+----------------------------+-------------------------+---------------------------------+--------------+-----------+-------------------+--------+-------------------+---------------------+--------------+----------------------------+-------------------+----------------------+------------------+------------------+------------------+-----------------+--------------+-----------+----------------------+-------+------------------+-----+-----------------------------+--------------+-------------------+------------------------------+-------+------------------+-------------+------------------------+-------+------------------+----------+--------------------------+-----------------------+----------------+------------------------------+---------------------+----------------------------------+---------------------------+--------------

#### From the above output, it can be seen that a lot of columns have high number of missing values. These will be mostly in the very earlier days as record keeping was not very rigorous back then. We will confirm it.

In [None]:
# Grouping on the basis of years to find out amount of missing data
df_year = df_combined.groupBy("year").agg(count("*").alias("count"))

df_year.orderBy("year").show()

+----+-----+
|year|count|
+----+-----+
|1750|   49|
|1751|   49|
|1752|   49|
|1753|   49|
|1754|   49|
|1755|   49|
|1756|   49|
|1757|   49|
|1758|   49|
|1759|   49|
|1760|   49|
|1761|   49|
|1762|   49|
|1763|   49|
|1764|   49|
|1765|   49|
|1766|   49|
|1767|   49|
|1768|   49|
|1769|   49|
+----+-----+
only showing top 20 rows



In [None]:
df_year.orderBy("year", ascending=False).show()

+----+-----+
|year|count|
+----+-----+
|2023|  229|
|2022|  229|
|2021|  229|
|2020|  229|
|2019|  229|
|2018|  229|
|2017|  229|
|2016|  229|
|2015|  229|
|2014|  229|
|2013|  229|
|2012|  229|
|2011|  229|
|2010|  229|
|2009|  229|
|2008|  229|
|2007|  229|
|2006|  229|
|2005|  229|
|2004|  229|
+----+-----+
only showing top 20 rows



#### From the above two outputs, it can be said that earlier years lack data. Hence, we will consider data starting from the year 1970.

In [None]:
df_filtered = df_combined.filter((col("year") >= 1970))
df_filtered.show(10)

+-----------+--------+----+--------------------+---------------------+------------------+--------------------+------------------+------------------+----------------------------+-----------------------------+----------------------------+-------------------------+---------------------------------+-------------------+-------------------+-------------------+-------------------+--------------------+---------------------+------------------+----------------------------+-------------------+----------------------+------------------+------------------+------------------+-----------------+--------------+-------------------+----------------------+-------------------+--------------------+---------------+-----------------------------+------------------+-------------------+------------------------------+------------------+------------------+------------------+------------------------+------------------+--------------------+----------+--------------------------+-----------------------+----------------

In [None]:
# Counting the missing values in the filtered data
df_filtered.select([
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in df_filtered.columns
]).show(truncate=False)

+-------+--------+----+----------+---------------------+---+--------------+---------------+-----------------+----------------------------+-----------------------------+----------------------------+-------------------------+---------------------------------+--------------+-----------+-------------------+--------+-------------------+---------------------+--------------+----------------------------+-------------------+----------------------+------------------+------------------+------------------+-----------------+--------------+-----------+----------------------+-------+------------------+----+-----------------------------+--------------+-------------------+------------------------------+-------+------------------+-------------+------------------------+-------+------------------+----------+--------------------------+-----------------------+----------------+------------------------------+---------------------+----------------------------------+---------------------------+-----------------

#### We can see the new dataframe has comparatively less missing values. We will remove columns with more than 40% of missing values.

In [None]:
# Total number of rows in the DataFrame
total_rows = df_filtered.count()

threshold = 0.4
# Calculating the missing values for each column using previous formula
missing_per_column = df_filtered.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_filtered.columns
]).collect()[0].asDict()

# Keeping the columns with less than 40% missing values by using the set threshold of 0.4
columns_to_keep = [
    col_name for col_name, null_count in missing_per_column.items()
    if (null_count / total_rows) <= threshold
]
df_cleaned = df_filtered.select(columns_to_keep)  # Creating a DataFrame for cleaned data
print(f"Original columns: {len(df_filtered.columns)}, After cleaning: {len(df_cleaned.columns)}") # Checking the number of columns in the original and cleaned dataframes

Original columns: 79, After cleaning: 63


In [None]:
# Count of missing values per column in the new dataframe
df_cleaned.select([
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in df_cleaned.columns
]).show(truncate=False)

+-------+--------+----+----------+---------------------+---+--------------+---------------+-----------------+----------------------------+-----------------------------+----------------------------+-------------------------+---------------------------------+--------------+-----------+-------------------+--------+-------------------+---------------------+--------------+----------------------------+-------------------+----------------------+------------------+------------------+-----------------+-----------+----------------------+----+-----------------------------+--------------+-------------------+------------------------------+-------+------------------+-------------+------------------------+-------+------------------+----------+--------------------------+-----------------------+----------------+------------------------------+---------------------+----------------------------------+---------------------------+-----------------------------------------+--------------------------------+---

In [None]:
# Keeping the columns necessary for required analysis to reduce computational time
final_columns = [
    "country", "iso_code", "year",
    "cement_co2", "cement_co2_per_capita", "co2", "co2_growth_prct",
    "co2_including_luc", "co2_including_luc_growth_prct", "co2_per_capita", "co2_per_gdp",
    "coal_co2", "coal_co2_per_capita", "flaring_co2", "flaring_co2_per_capita",
    "ghg_excluding_lucf_per_capita", "ghg_per_capita",
    "land_use_change_co2", "land_use_change_co2_per_capita",
    "methane", "methane_per_capita", "nitrous_oxide", "nitrous_oxide_per_capita",
    "oil_co2", "oil_co2_per_capita", "population","gdp",
    "share_global_cement_co2", "share_global_co2", "share_global_co2_including_luc",
    "share_global_coal_co2", "share_global_flaring_co2", "share_global_luc_co2", "share_global_oil_co2",
    "temperature_change_from_ch4", "temperature_change_from_co2",
    "temperature_change_from_ghg", "temperature_change_from_n2o",
    "total_ghg", "total_ghg_excluding_lucf"
]

df_final = df_cleaned.select(*final_columns)

In [None]:
# We will replace the missing values in the selected columns with the country means
exclude_cols = {"country", "iso_code", "year"}  # These are the grouping columns

# Dynamically inferring the numeric columns for computation of means
numeric_cols = [f.name for f in df_final.schema.fields
                if f.dataType.simpleString() in ['double', 'float', 'int', 'bigint']
                and f.name not in exclude_cols]

# Calculating mean by numeric column for each country by grouping on the basis of the columns
avg_by_country = df_final.groupBy("country").agg(
    *[mean(c).alias(f"{c}_mean") for c in numeric_cols]
)
df_with_means = df_final.join(avg_by_country, on="country", how="left") # Joining the mean values with original DataFrame
# Replacing null values in numeric columns with country means
for c in numeric_cols:
    df_with_means = df_with_means.withColumn(
        c,
        when(col(c).isNull(), col(f"{c}_mean")).otherwise(col(c))
    )
mean_cols = [f"{c}_mean" for c in numeric_cols]
df_filled = df_with_means.drop(*mean_cols) # Dropping the added *_mean columns

In [None]:
# Now, we will check for missing values after imputation

# Calculating percentage of missing values per column in the new dataframe
missing_percentage = df_filled.select([
    (100 * count(when(col(c).isNull(), c)) / total_rows).alias(c)
    for c in df_filled.columns
])

missing_percentage.show(truncate=False)

+-------+--------+----+-----------------+---------------------+------------------+------------------+-----------------+-----------------------------+-----------------+------------------+------------------+-------------------+------------------+----------------------+-----------------------------+------------------+-------------------+------------------------------+------------------+------------------+-----------------+------------------------+------------------+------------------+------------------+-----------------+-----------------------+------------------+------------------------------+---------------------+------------------------+--------------------+--------------------+---------------------------+---------------------------+---------------------------+---------------------------+------------------+------------------------+
|country|iso_code|year|cement_co2       |cement_co2_per_capita|co2               |co2_growth_prct   |co2_including_luc|co2_including_luc_growth_prct|co2_per_

#### From the above output, it can be seen that most of the missing values have been filled. We will fill the remaining missing values with the yearly means.

In [None]:
# Computing year-wise means needed for filling missing values
yearly_means = (
    df_filled.groupBy("year")
    .agg(*[mean(c).alias(c) for c in numeric_cols])
    .withColumnRenamed("year", "year_temp")
)
# Joining back to original data
df_with_means = (
    df_filled.alias("main")
    .join(yearly_means.alias("mean"), col("main.year") == col("mean.year_temp"), "left")
    .drop("year_temp")
)
# Filling null values with yearly means
df_filled_final = df_with_means.select(
    *[
        coalesce(col(f"main.{c}"), col(f"mean.{c}")).alias(c)
        for c in numeric_cols
    ] + [col("main." + c) for c in df_filled.columns if c not in numeric_cols]
)

# Checking if any nulls values still remain
#null_exprs = [when(col(c).isNull(), 1).otherwise(0) for c in numeric_cols]
# Combining all expressions with `+` using reduce
#total_missing_expr = reduce(lambda a, b: a + b, null_exprs)

#df_filled_final.select(total_missing_expr.alias("total_missing_values")).show()

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 8)

## We have now cleaned the dataset. We will now apply some transformations to the data.

In [None]:
# Adding a emissions category column according to global standards
# If emission >= 10, value=high; if >= 2.1, then medium; otherwise, "Low" emissions
df_transformed = df_filled_final.withColumn(
    "emission_level",
    when(col("co2_per_capita") >= 10, "High")
    .when(col("co2_per_capita") >= 2.1, "Medium")
    .otherwise("Low")
)
df_transformed = df_transformed.repartition("emission_level")

#### The emission categories have been successfully created. Similarly, we will create for greenhouse gases per capita.

In [None]:
df_transformed = df_transformed.withColumn(
    "ghg_level",
    when(col("ghg_per_capita") >= 10, "High")
    .when(col("ghg_per_capita") >= 2.1, "Medium")
    .otherwise("Low")
)
df_transformed = df_transformed.repartition("ghg_level")

In [None]:
# Adding a column for CO2 3yr rolling average to help find trend

window_spec = Window.partitionBy("country").orderBy("year").rowsBetween(-2, 0)

df_transformed = df_transformed.withColumn(
    "co2_3yr_avg",
    avg("co2").over(window_spec)
)

In [None]:
df_transformed.printSchema()

root
 |-- cement_co2: double (nullable = true)
 |-- cement_co2_per_capita: double (nullable = true)
 |-- co2: double (nullable = true)
 |-- co2_growth_prct: double (nullable = true)
 |-- co2_including_luc: double (nullable = true)
 |-- co2_including_luc_growth_prct: double (nullable = true)
 |-- co2_per_capita: double (nullable = true)
 |-- co2_per_gdp: double (nullable = true)
 |-- coal_co2: double (nullable = true)
 |-- coal_co2_per_capita: double (nullable = true)
 |-- flaring_co2: double (nullable = true)
 |-- flaring_co2_per_capita: double (nullable = true)
 |-- ghg_excluding_lucf_per_capita: double (nullable = true)
 |-- ghg_per_capita: double (nullable = true)
 |-- land_use_change_co2: double (nullable = true)
 |-- land_use_change_co2_per_capita: double (nullable = true)
 |-- methane: double (nullable = true)
 |-- methane_per_capita: double (nullable = true)
 |-- nitrous_oxide: double (nullable = true)
 |-- nitrous_oxide_per_capita: double (nullable = true)
 |-- oil_co2: double 

#### Now, the data has been processed using PySpark. We will now store the processed data in a collection inside a MongoDB database.

In [None]:
# Getting the MongoDB Pyspark connector
!wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar

--2025-08-08 02:55:57--  https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 156368 (153K) [application/java-archive]
Saving to: ‘mongo-spark-connector_2.12-10.1.1.jar’


2025-08-08 02:55:57 (4.88 MB/s) - ‘mongo-spark-connector_2.12-10.1.1.jar’ saved [156368/156368]



In [None]:
import pymongo
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

# Converting the spark dataframe to a pandas dataframe
pandas_df = df_transformed.toPandas()
# Mongo DB server link and connection suing database and collection names
uri = "mongodb+srv://Aniket:Anirane15isro@cluster0.3ko5x.mongodb.net/?appName=Cluster0"
client = pymongo.MongoClient(uri, server_api=ServerApi('1'))
db = client["emission_data"]
collection_1 = db["carbon_emission"]
collection_1.delete_many({})
collection_1.insert_many(pandas_df.to_dict("records"))

InsertManyResult([ObjectId('68956ae2d1650b1d5275721b'), ObjectId('68956ae2d1650b1d5275721c'), ObjectId('68956ae2d1650b1d5275721d'), ObjectId('68956ae2d1650b1d5275721e'), ObjectId('68956ae2d1650b1d5275721f'), ObjectId('68956ae2d1650b1d52757220'), ObjectId('68956ae2d1650b1d52757221'), ObjectId('68956ae2d1650b1d52757222'), ObjectId('68956ae2d1650b1d52757223'), ObjectId('68956ae2d1650b1d52757224'), ObjectId('68956ae2d1650b1d52757225'), ObjectId('68956ae2d1650b1d52757226'), ObjectId('68956ae2d1650b1d52757227'), ObjectId('68956ae2d1650b1d52757228'), ObjectId('68956ae2d1650b1d52757229'), ObjectId('68956ae2d1650b1d5275722a'), ObjectId('68956ae2d1650b1d5275722b'), ObjectId('68956ae2d1650b1d5275722c'), ObjectId('68956ae2d1650b1d5275722d'), ObjectId('68956ae2d1650b1d5275722e'), ObjectId('68956ae2d1650b1d5275722f'), ObjectId('68956ae2d1650b1d52757230'), ObjectId('68956ae2d1650b1d52757231'), ObjectId('68956ae2d1650b1d52757232'), ObjectId('68956ae2d1650b1d52757233'), ObjectId('68956ae2d1650b1d527572

## Retreiving the 2nd data for aviation emissions.

In [None]:
!curl -H "Accept: application/vnd.sdmx.data+csv" -o aviation_emission_data.csv https://sdmx.oecd.org/public/rest/data/OECD.SDD.NAD.SEEA,DSD_AIR_TRANSPORT@DF_AIR_TRANSPORT,/W+ZWE+ZMB+YEM+VNM+VEN+VUT+UZB+URY+ARE+UKR+UGA+TUV+TKM+TUN+TTO+TON+TGO+TLS+THA+TZA+TJK+SYR+TWN+SUR+SDN+LKA+SSD+ZAF+SOM+SLB+SXM+SLE+SGP+SYC+SRB+SEN+SAU+STP+SMR+WSM+VCT+LCA+KNA+RWA+RUS+ROU+QAT+PHL+PER+PRY+PNG+PAN+PLW+PAK+OMN+MKD+NGA+NIU+NIC+NER+NRU+NPL+NAM+MMR+MOZ+MAR+MNE+MNG+MCO+MDA+FSM+MUS+MRT+MHL+MLT+MLI+MDV+MYS+MWI+MDG+MAC+LBY+LBR+LSO+LBN+LAO+KWT+KGZ+XKV+KIR+KEN+KAZ+JOR+JEY+JAM+IRQ+IDN+IRN+IND+HKG+HND+HTI+GUY+GNB+GIN+GTM+GRD+GHA+GEO+GMB+GAB+FJI+ETH+SWZ+ERI+GNQ+SLV+EGY+ECU+DOM+DMA+COD+DJI+PRK+CYP+CUB+HRV+CIV+COK+COG+CCK+COM+CXR+CHN+TCD+CAF+CMR+KHM+CPV+BDI+BFA+BGR+BRN+BRA+BWA+BIH+BOL+BTN+BEN+BLZ+BLR+BRB+BGD+BHR+BHS+AZE+ARM+ARG+ATG+AGO+DZA+ALB+AFG+WXOECD+OECD+USA+GBR+TUR+CHE+SWE+ESP+SVN+SVK+PRT+POL+NOR+NZL+NLD+MEX+LUX+LTU+LVA+KOR+JPN+ITA+ISR+IRL+ISL+GRC+HUN+DEU+FIN+EST+DNK+CZE+CRI+COL+CHL+BEL+CAN+AUT+AUS+FRA.A....._T+C+P.RES_TOTAL+TER_INT+TER_DOM+RES_ABROAD+NRES_TERR+NRES_INT_FROM+RES_INT_OUT+RES_INT_TO+RES_INT_FROM+NRES_DOM_IN+RES_DOM_OUT+RES_DOM_IN.?startPeriod=2013&endPeriod=2024&dimensionAtObservation=AllDimensions&format=csvfilewithlabels

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 5978k    0 5978k    0     0  1615k      0 --:--:--  0:00:03 --:--:-- 1615k


In [None]:
import os
from google.cloud import storage

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gcs_key.json"
client = storage.Client() # Initializing the GCS client
bucket_name = "aquastat-data-2025"  # Connecting to the storage bucket
bucket = client.bucket(bucket_name)

In [None]:
blob = bucket.blob("aviation_emission_data.csv")  # Destination path in bucket
blob.upload_from_filename("aviation_emission_data.csv")

print("CSV file uploaded to GCS")

CSV file uploaded to GCS


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Optimized GCS CSV Read") \
    .master("local[*]") \
    .config("spark.jars", "/tmp/gcs-connector-hadoop3-2.2.5-shaded.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "gcs_key.json") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

In [None]:
# Reading the data from GCS
df_avia = spark.read \
    .option("multiline", "true") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("samplingRatio", 0.1) \
    .csv("gs://aquastat-data-2025/aviation_emission_data.csv")

df_avia.cache()  # Cache in memory to reduce reading times
df_avia.show(5)  # Display top 5 rows

+--------------------+--------+----+---------+------------+----------+---------+-----------+----------------+-----------+-----------+----------------+----------+---------+--------------+--------+
|            DATAFLOW|REF_AREA|FREQ|  MEASURE|UNIT_MEASURE|ADJUSTMENT|POLLUTANT|FLIGHT_TYPE|EMISSIONS_SOURCE|METHODOLOGY|TIME_PERIOD|       OBS_VALUE|OBS_STATUS|UNIT_MULT|CLASSIFICATION|DECIMALS|
+--------------------+--------+----+---------+------------+----------+---------+-----------+----------------+-----------+-----------+----------------+----------+---------+--------------+--------+
|OECD.SDD.NAD.SEEA...|     AUS|   A|EMISSIONS|           T|         N|      CO2|         _T|      RES_INT_TO|         _Z|       2020|1171995.07681918|      NULL|        0|             E|       0|
|OECD.SDD.NAD.SEEA...|     AUT|   A|EMISSIONS|           T|         N|      CO2|         _T|      RES_INT_TO|         _Z|       2020|413154.778900947|      NULL|        0|             E|       0|
|OECD.SDD.NAD.SEEA..

In [None]:
df_avia.printSchema()

root
 |-- DATAFLOW: string (nullable = true)
 |-- REF_AREA: string (nullable = true)
 |-- FREQ: string (nullable = true)
 |-- MEASURE: string (nullable = true)
 |-- UNIT_MEASURE: string (nullable = true)
 |-- ADJUSTMENT: string (nullable = true)
 |-- POLLUTANT: string (nullable = true)
 |-- FLIGHT_TYPE: string (nullable = true)
 |-- EMISSIONS_SOURCE: string (nullable = true)
 |-- METHODOLOGY: string (nullable = true)
 |-- TIME_PERIOD: integer (nullable = true)
 |-- OBS_VALUE: double (nullable = true)
 |-- OBS_STATUS: string (nullable = true)
 |-- UNIT_MULT: integer (nullable = true)
 |-- CLASSIFICATION: string (nullable = true)
 |-- DECIMALS: integer (nullable = true)



In [None]:
# Count nulls or empty strings in each column
missing_counts = df_avia.select([
    sum(when(col(c).isNull() | (col(c) == ""), 1).otherwise(0)).alias(c)
    for c in df_avia.columns
])

missing_counts.show()

+--------+--------+----+-------+------------+----------+---------+-----------+----------------+-----------+-----------+---------+----------+---------+--------------+--------+
|DATAFLOW|REF_AREA|FREQ|MEASURE|UNIT_MEASURE|ADJUSTMENT|POLLUTANT|FLIGHT_TYPE|EMISSIONS_SOURCE|METHODOLOGY|TIME_PERIOD|OBS_VALUE|OBS_STATUS|UNIT_MULT|CLASSIFICATION|DECIMALS|
+--------+--------+----+-------+------------+----------+---------+-----------+----------------+-----------+-----------+---------+----------+---------+--------------+--------+
|       0|       0|   0|      0|           0|         0|        0|          0|               0|          0|          0|        0|     45599|        0|             0|       0|
+--------+--------+----+-------+------------+----------+---------+-----------+----------------+-----------+-----------+---------+----------+---------+--------------+--------+



#### There are no missing values in columns except the column observation status which is entirely blank. We will remove that column.

In [None]:
# Dropping  with only missing values
df_avia_cleaned = df_avia.drop("OBS_STATUS")

In [None]:
# Finding the number of duplicate rows
duplicates = df_avia_cleaned.groupBy(df_avia_cleaned.columns).count().filter(col("count") > 1)
duplicates.show()

+--------+--------+----+-------+------------+----------+---------+-----------+----------------+-----------+-----------+---------+---------+--------------+--------+-----+
|DATAFLOW|REF_AREA|FREQ|MEASURE|UNIT_MEASURE|ADJUSTMENT|POLLUTANT|FLIGHT_TYPE|EMISSIONS_SOURCE|METHODOLOGY|TIME_PERIOD|OBS_VALUE|UNIT_MULT|CLASSIFICATION|DECIMALS|count|
+--------+--------+----+-------+------------+----------+---------+-----------+----------------+-----------+-----------+---------+---------+--------------+--------+-----+
+--------+--------+----+-------+------------+----------+---------+-----------+----------------+-----------+-----------+---------+---------+--------------+--------+-----+



In [None]:
num_duplicates = duplicates.count()
print(f"Number of duplicate rows: {num_duplicates}")

Number of duplicate rows: 0


#### No duplicate rows are present

In [None]:
selected_cols = ["REF_AREA","FLIGHT_TYPE","UNIT_MEASURE","POLLUTANT","EMISSIONS_SOURCE","TIME_PERIOD","OBS_VALUE"]
df_avia_final = df_avia_cleaned.select(*selected_cols)

In [None]:
# Retrieving the iso_code - country name mapping
!wget -O iso_codes.csv https://raw.githubusercontent.com/plotly/datasets/master/2014_world_gdp_with_codes.csv

--2025-08-08 03:11:52--  https://raw.githubusercontent.com/plotly/datasets/master/2014_world_gdp_with_codes.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4515 (4.4K) [text/plain]
Saving to: ‘iso_codes.csv’


2025-08-08 03:11:52 (31.4 MB/s) - ‘iso_codes.csv’ saved [4515/4515]



In [None]:
# Now, we will add country names against country codes in REF_AREA as the data doesn't have country name mapping
iso_map = spark.read.option("header", True).csv("iso_codes.csv")

iso_map = iso_map.withColumnRenamed("CODE", "REF_AREA") \
                             .withColumnRenamed("COUNTRY", "Country_Name")

df_merged = df_avia_final.join(iso_map, on="REF_AREA", how="left")
df_merged.show(5)

+--------+-----------+------------+---------+----------------+-----------+----------------+------------+--------------+
|REF_AREA|FLIGHT_TYPE|UNIT_MEASURE|POLLUTANT|EMISSIONS_SOURCE|TIME_PERIOD|       OBS_VALUE|Country_Name|GDP (BILLIONS)|
+--------+-----------+------------+---------+----------------+-----------+----------------+------------+--------------+
|     AUS|         _T|           T|      CO2|      RES_INT_TO|       2020|1171995.07681918|   Australia|       1483.00|
|     AUT|         _T|           T|      CO2|      RES_INT_TO|       2020|413154.778900947|     Austria|        436.10|
|     BEL|         _T|           T|      CO2|      RES_INT_TO|       2020|548359.558416194|     Belgium|        527.80|
|     CHL|         _T|           T|      CO2|      RES_INT_TO|       2020|427349.949847689|       Chile|        264.10|
|     COL|         _T|           T|      CO2|      RES_INT_TO|       2020|462985.658389138|    Colombia|        400.10|
+--------+-----------+------------+-----

In [None]:
# We will check for missing values in "Country_name" column
# Find rows where country name is missing (null)
missing_country_df = df_merged.filter(df_merged['Country_Name'].isNull())
missing_iso_codes = missing_country_df.select('REF_AREA').distinct() # Selecting only unique ISO codes under REF_AREA without a country name
missing_iso_codes.show()

+--------+
|REF_AREA|
+--------+
|     XKV|
|    OECD|
|     CCK|
|       W|
|     NRU|
|     CXR|
|     BHS|
+--------+



#### From the above output it can be seen that, the codes without a country name are not valid country codes and not required for this particular analysis. We will remove those codes except for 'W' which corresponds to World.

In [None]:
# Replacing country where iso_code is 'W' with 'World'
df_merged = df_merged.withColumn(
    "Country_Name",
    when(col("REF_AREA") == "W", "World").otherwise(col("Country_Name"))
)
df_merged.show(5)

+--------+-----------+------------+---------+----------------+-----------+----------------+------------+--------------+
|REF_AREA|FLIGHT_TYPE|UNIT_MEASURE|POLLUTANT|EMISSIONS_SOURCE|TIME_PERIOD|       OBS_VALUE|Country_Name|GDP (BILLIONS)|
+--------+-----------+------------+---------+----------------+-----------+----------------+------------+--------------+
|     AUS|         _T|           T|      CO2|      RES_INT_TO|       2020|1171995.07681918|   Australia|       1483.00|
|     AUT|         _T|           T|      CO2|      RES_INT_TO|       2020|413154.778900947|     Austria|        436.10|
|     BEL|         _T|           T|      CO2|      RES_INT_TO|       2020|548359.558416194|     Belgium|        527.80|
|     CHL|         _T|           T|      CO2|      RES_INT_TO|       2020|427349.949847689|       Chile|        264.10|
|     COL|         _T|           T|      CO2|      RES_INT_TO|       2020|462985.658389138|    Colombia|        400.10|
+--------+-----------+------------+-----

In [None]:
# As the replacement has been done, we will delete the remaining entries

df_merged_cleaned = df_merged.filter(col("Country_Name").isNotNull()) # Removed rows where Country_Name is null
df_merged_cleaned.show(5) # Displaying top 5 rows to confirm

+--------+-----------+------------+---------+----------------+-----------+----------------+------------+--------------+
|REF_AREA|FLIGHT_TYPE|UNIT_MEASURE|POLLUTANT|EMISSIONS_SOURCE|TIME_PERIOD|       OBS_VALUE|Country_Name|GDP (BILLIONS)|
+--------+-----------+------------+---------+----------------+-----------+----------------+------------+--------------+
|     AUS|         _T|           T|      CO2|      RES_INT_TO|       2020|1171995.07681918|   Australia|       1483.00|
|     AUT|         _T|           T|      CO2|      RES_INT_TO|       2020|413154.778900947|     Austria|        436.10|
|     BEL|         _T|           T|      CO2|      RES_INT_TO|       2020|548359.558416194|     Belgium|        527.80|
|     CHL|         _T|           T|      CO2|      RES_INT_TO|       2020|427349.949847689|       Chile|        264.10|
|     COL|         _T|           T|      CO2|      RES_INT_TO|       2020|462985.658389138|    Colombia|        400.10|
+--------+-----------+------------+-----

In [None]:
# We will check for missing values in "Country_name" column
# Find rows where country name is missing (null)
missing_country = df_merged_cleaned.filter(df_merged_cleaned['Country_Name'].isNull())
missing_iso = missing_country.select('REF_AREA').distinct() # Selecting only unique ISO codes under REF_AREA without a country name
missing_iso.show()

+--------+
|REF_AREA|
+--------+
+--------+



## Now, we will convert the spark dataframe to a pandas dataframe and upload it to mongo DB.

In [None]:
# Assuming df_transformed is small enough to convert
pandas_avia_df = df_merged_cleaned.toPandas()

In [None]:
# Dropping the last column as it is not required in this data table
pandas_avia_df = pandas_avia_df.iloc[:, :-1]

In [None]:
pandas_avia_df.head()

Unnamed: 0,REF_AREA,FLIGHT_TYPE,UNIT_MEASURE,POLLUTANT,EMISSIONS_SOURCE,TIME_PERIOD,OBS_VALUE,Country_Name
0,AUS,_T,T,CO2,RES_INT_TO,2020,1171995.0,Australia
1,AUT,_T,T,CO2,RES_INT_TO,2020,413154.8,Austria
2,BEL,_T,T,CO2,RES_INT_TO,2020,548359.6,Belgium
3,CHL,_T,T,CO2,RES_INT_TO,2020,427349.9,Chile
4,COL,_T,T,CO2,RES_INT_TO,2020,462985.7,Colombia


In [None]:
# Now, this data will be uploaded and stored in the MongoDB database created earlier under a new collection

In [None]:
import pymongo
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

uri = "mongodb+srv://Aniket:Anirane15isro@cluster0.3ko5x.mongodb.net/?appName=Cluster0"
client = pymongo.MongoClient(uri, server_api=ServerApi('1'))
db = client["emission_data"]
collection = db["aviation_emission"]
collection.delete_many({})
collection.insert_many(pandas_avia_df.to_dict("records"))

InsertManyResult([ObjectId('68956b07d1650b1d5275a26a'), ObjectId('68956b07d1650b1d5275a26b'), ObjectId('68956b07d1650b1d5275a26c'), ObjectId('68956b07d1650b1d5275a26d'), ObjectId('68956b07d1650b1d5275a26e'), ObjectId('68956b07d1650b1d5275a26f'), ObjectId('68956b07d1650b1d5275a270'), ObjectId('68956b07d1650b1d5275a271'), ObjectId('68956b07d1650b1d5275a272'), ObjectId('68956b07d1650b1d5275a273'), ObjectId('68956b07d1650b1d5275a274'), ObjectId('68956b07d1650b1d5275a275'), ObjectId('68956b07d1650b1d5275a276'), ObjectId('68956b07d1650b1d5275a277'), ObjectId('68956b07d1650b1d5275a278'), ObjectId('68956b07d1650b1d5275a279'), ObjectId('68956b07d1650b1d5275a27a'), ObjectId('68956b07d1650b1d5275a27b'), ObjectId('68956b07d1650b1d5275a27c'), ObjectId('68956b07d1650b1d5275a27d'), ObjectId('68956b07d1650b1d5275a27e'), ObjectId('68956b07d1650b1d5275a27f'), ObjectId('68956b07d1650b1d5275a280'), ObjectId('68956b07d1650b1d5275a281'), ObjectId('68956b07d1650b1d5275a282'), ObjectId('68956b07d1650b1d5275a2