### Importing Libraries

In [1]:
import pandas as pd
import pymongo
from psycopg2 import sql
import psycopg2

### MongoDB Connection to fetch data

In [2]:
renewable_client = pymongo.MongoClient("mongodb://localhost:27017/")

In [3]:
renewable_db = renewable_client["co2_renew_life_db"]

In [4]:
print(renewable_db)

Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'co2_renew_life_db')


In [5]:
renewable_collection = renewable_db["renewable_energy"]

In [6]:
renewable_all_records = renewable_collection.find()

In [7]:
print(renewable_all_records)

<pymongo.synchronous.cursor.Cursor object at 0x0000025CE37E3380>


In [8]:
for record in renewable_all_records:
    print(record)

{'_id': ObjectId('680625e5f5024b2c90d00ee1'), 'Entity': 'Africa', 'Code': nan, 'Year': 1965, 'Renewables (% equivalent primary energy)': 5.7402806}
{'_id': ObjectId('680625e5f5024b2c90d00ee2'), 'Entity': 'Africa', 'Code': nan, 'Year': 1966, 'Renewables (% equivalent primary energy)': 6.1139693}
{'_id': ObjectId('680625e5f5024b2c90d00ee3'), 'Entity': 'Africa', 'Code': nan, 'Year': 1967, 'Renewables (% equivalent primary energy)': 6.31658}
{'_id': ObjectId('680625e5f5024b2c90d00ee4'), 'Entity': 'Africa', 'Code': nan, 'Year': 1968, 'Renewables (% equivalent primary energy)': 6.994845}
{'_id': ObjectId('680625e5f5024b2c90d00ee5'), 'Entity': 'Africa', 'Code': nan, 'Year': 1969, 'Renewables (% equivalent primary energy)': 7.9439163}
{'_id': ObjectId('680625e5f5024b2c90d00ee6'), 'Entity': 'Africa', 'Code': nan, 'Year': 1970, 'Renewables (% equivalent primary energy)': 9.148895}
{'_id': ObjectId('680625e5f5024b2c90d00ee7'), 'Entity': 'Africa', 'Code': nan, 'Year': 1971, 'Renewables (% equivale

In [9]:
renewable_all_records = renewable_collection.find()

In [10]:
print(renewable_all_records)

<pymongo.synchronous.cursor.Cursor object at 0x0000025CE381E350>


In [11]:
renewable_list_cursor=list(renewable_all_records)

In [12]:
renewable_list_cursor

[{'_id': ObjectId('680625e5f5024b2c90d00ee1'),
  'Entity': 'Africa',
  'Code': nan,
  'Year': 1965,
  'Renewables (% equivalent primary energy)': 5.7402806},
 {'_id': ObjectId('680625e5f5024b2c90d00ee2'),
  'Entity': 'Africa',
  'Code': nan,
  'Year': 1966,
  'Renewables (% equivalent primary energy)': 6.1139693},
 {'_id': ObjectId('680625e5f5024b2c90d00ee3'),
  'Entity': 'Africa',
  'Code': nan,
  'Year': 1967,
  'Renewables (% equivalent primary energy)': 6.31658},
 {'_id': ObjectId('680625e5f5024b2c90d00ee4'),
  'Entity': 'Africa',
  'Code': nan,
  'Year': 1968,
  'Renewables (% equivalent primary energy)': 6.994845},
 {'_id': ObjectId('680625e5f5024b2c90d00ee5'),
  'Entity': 'Africa',
  'Code': nan,
  'Year': 1969,
  'Renewables (% equivalent primary energy)': 7.9439163},
 {'_id': ObjectId('680625e5f5024b2c90d00ee6'),
  'Entity': 'Africa',
  'Code': nan,
  'Year': 1970,
  'Renewables (% equivalent primary energy)': 9.148895},
 {'_id': ObjectId('680625e5f5024b2c90d00ee7'),
  'Entity

In [13]:
renewable_energy_df = pd.DataFrame(renewable_list_cursor)

In [14]:
renewable_energy_df

Unnamed: 0,_id,Entity,Code,Year,Renewables (% equivalent primary energy)
0,680625e5f5024b2c90d00ee1,Africa,,1965,5.740281
1,680625e5f5024b2c90d00ee2,Africa,,1966,6.113969
2,680625e5f5024b2c90d00ee3,Africa,,1967,6.316580
3,680625e5f5024b2c90d00ee4,Africa,,1968,6.994845
4,680625e5f5024b2c90d00ee5,Africa,,1969,7.943916
...,...,...,...,...,...
4898,680625e5f5024b2c90d02203,World,OWID_WRL,2019,12.228147
4899,680625e5f5024b2c90d02204,World,OWID_WRL,2020,13.404395
4900,680625e5f5024b2c90d02205,World,OWID_WRL,2021,13.469198
4901,680625e5f5024b2c90d02206,World,OWID_WRL,2022,14.119935


### Data Transformation

In [15]:
renewable_energy_df.drop(columns=["Code"], inplace=True, axis=1)

In [16]:
renewable_energy_df.drop(columns=["_id"], inplace=True, axis=1)

In [17]:
renewable_energy_df.rename(columns={"Entity": "Area"}, inplace=True)

In [18]:
renewable_energy_df.rename(columns={'Renewables (% equivalent primary energy)': 'Renewable_Energy_Share'}, inplace=True)

In [19]:
renewable_energy_df.head()

Unnamed: 0,Area,Year,Renewable_Energy_Share
0,Africa,1965,5.740281
1,Africa,1966,6.113969
2,Africa,1967,6.31658
3,Africa,1968,6.994845
4,Africa,1969,7.943916


### Loading Transformed Data to PostgreSQL

In [20]:
pgconn=psycopg2.connect(host="localhost", database="co2_renew_life", user="postgres", password="admin")

In [21]:
pgcursor=pgconn.cursor()

In [22]:
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [23]:
pgcursor.execute("CREATE TABLE IF NOT EXISTS renewable_energy (Area TEXT, Year INT, Renewable_Energy_Share FLOAT);")

In [24]:
from psycopg2.extras import execute_values

execute_values(pgcursor, "INSERT INTO renewable_energy (Area, Year, Renewable_Energy_Share) VALUES %s", renewable_energy_df.values.tolist())

In [25]:
pgconn.commit()

### MongoDB Connection to fetch data and transformation

In [26]:
def check_mongodb_collection(client, db_name, collection_name):
    try:
        db = client[db_name]
        print(f"Connected to database: {db_name}")
        collections = db.list_collection_names()
        print(f"Collections in {db_name}: {collections}")
        
        if collection_name not in collections:
            print(f"Collection '{collection_name}' does not exist in {db_name}")
            return False
        
        collection = db[collection_name]
        doc_count = collection.count_documents({})
        print(f"Number of documents in {collection_name}: {doc_count}")
        
        if doc_count == 0:
            print(f"Collection '{collection_name}' is empty")
            return False
        
        sample_doc = collection.find_one()
        print(f"Sample document from {collection_name}: {sample_doc}")
        return True
    except Exception as e:
        print(f"Error accessing {db_name}/{collection_name}: {e}")
        return False


print("--- Processing Carbon Emissions Data ---")

try:
    client = pymongo.MongoClient("mongodb://localhost:27017/")
    print("MongoDB connection successful")
    print("Available databases:", client.list_database_names())
except Exception as e:
    print(f"Failed to connect to MongoDB: {e}")
    client = None

if client:
    if check_mongodb_collection(client, "co2_renew_life_db", "carbon_emissions"):
        db = client["co2_renew_life_db"]
        emissions_collection = db["carbon_emissions"]

        emissions_all_records = emissions_collection.find()
        emissions_list_cursor = list(emissions_all_records)
        print(f"Retrieved {len(emissions_list_cursor)} records from carbon_emissions")

        emissions_df = pd.DataFrame(emissions_list_cursor)

        if emissions_df.empty:
            print("Emissions DataFrame is empty")
        else:
            print("Emissions DataFrame shape:", emissions_df.shape)
            print("Emissions DataFrame columns:", emissions_df.columns)
            print("Emissions DataFrame missing values:\n", emissions_df.isnull().sum())
            print("Emissions DataFrame head:\n", emissions_df.head())

            if '_id' in emissions_df.columns:
                emissions_df.drop(columns=["_id"], inplace=True)
            if 'Code' in emissions_df.columns:
                emissions_df.drop(columns=["Code"], inplace=True)

            emissions_df.rename(columns={
                "Entity": "Area",
                "Year": "Year",
                "Annual CO₂ emissions (per capita)": "CO2_Emissions_Per_Capita"
            }, inplace=True)

            print("Cleaned Emissions DataFrame head:\n", emissions_df.head())
    else:
        print("Skipping emissions data processing due to empty or missing collection")
else:
    print("Skipping emissions data processing due to connection failure")

--- Processing Carbon Emissions Data ---
MongoDB connection successful
Available databases: ['admin', 'co2_renew_life_db', 'config', 'local']
Connected to database: co2_renew_life_db
Collections in co2_renew_life_db: ['renewable_energy', 'carbon_emissions', 'life_expectancy']
Number of documents in carbon_emissions: 26182
Sample document from carbon_emissions: {'_id': ObjectId('680625e5f5024b2c90cfa89b'), 'Entity': 'Afghanistan', 'Code': 'AFG', 'Year': 1949, 'Annual CO₂ emissions (per capita)': 0.0019921463}
Retrieved 26182 records from carbon_emissions
Emissions DataFrame shape: (26182, 5)
Emissions DataFrame columns: Index(['_id', 'Entity', 'Code', 'Year', 'Annual CO₂ emissions (per capita)'], dtype='object')
Emissions DataFrame missing values:
 _id                                     0
Entity                                  0
Code                                 3287
Year                                    0
Annual CO₂ emissions (per capita)       0
dtype: int64
Emissions DataFrame

In [27]:
emissions_df.head()

Unnamed: 0,Area,Year,CO2_Emissions_Per_Capita
0,Afghanistan,1949,0.001992
1,Afghanistan,1950,0.010837
2,Afghanistan,1951,0.011625
3,Afghanistan,1952,0.011468
4,Afghanistan,1953,0.013123


### Loading Transformed Data to PostgreSQL

In [28]:
pgconn=psycopg2.connect(host="localhost", database="co2_renew_life", user="postgres", password="admin")

In [29]:
pgcursor=pgconn.cursor()

In [30]:
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [31]:
pgcursor.execute("CREATE TABLE IF NOT EXISTS carbon_emissions (Area TEXT, Year INT, CO2_Emissions_Per_Capita FLOAT);")

In [32]:
from psycopg2.extras import execute_values

execute_values(pgcursor, "INSERT INTO carbon_emissions (Area, Year, CO2_Emissions_Per_Capita) VALUES %s", emissions_df.values.tolist())

In [33]:
pgconn.commit()

### MongoDB Connection to fetch data and transformation

In [34]:
if client:
    if check_mongodb_collection(client, "co2_renew_life_db", "life_expectancy"):
        db = client["co2_renew_life_db"]
        life_expectancy_collection = db["life_expectancy"]

        life_expectancy_all_records = life_expectancy_collection.find()
        life_expectancy_list_cursor = list(life_expectancy_all_records)
        print(f"Retrieved {len(life_expectancy_list_cursor)} records from life_expectancy")

        life_expectancy_df = pd.DataFrame(life_expectancy_list_cursor)

        if life_expectancy_df.empty:
            print("Life Expectancy DataFrame is empty")
        else:
            print("Life Expectancy DataFrame shape:", life_expectancy_df.shape)
            print("Life Expectancy DataFrame columns:", life_expectancy_df.columns)
            print("Life Expectancy DataFrame missing values:\n", life_expectancy_df.isnull().sum())
            print("Life Expectancy DataFrame head:\n", life_expectancy_df.head())

            if '_id' in life_expectancy_df.columns:
                life_expectancy_df.drop(columns=["_id"], inplace=True)
            if 'Code' in life_expectancy_df.columns:
                life_expectancy_df.drop(columns=["Code"], inplace=True)

            life_expectancy_df.rename(columns={
                "Entity": "Area",
                "Year": "Year",
                "Period life expectancy at birth - Sex: total - Age: 0": "Life_Expectancy"
            }, inplace=True)

            print("Cleaned Life Expectancy DataFrame head:\n", life_expectancy_df.head())
    else:
        print("Skipping life expectancy data processing due to empty or missing collection")
else:
    print("Skipping life expectancy data processing due to connection failure")

Connected to database: co2_renew_life_db
Collections in co2_renew_life_db: ['renewable_energy', 'carbon_emissions', 'life_expectancy']
Number of documents in life_expectancy: 21565
Sample document from life_expectancy: {'_id': ObjectId('680625e5f5024b2c90d02208'), 'Entity': 'Afghanistan', 'Code': 'AFG', 'Year': 1950, 'Period life expectancy at birth - Sex: total - Age: 0': 28.1563}
Retrieved 21565 records from life_expectancy
Life Expectancy DataFrame shape: (21565, 5)
Life Expectancy DataFrame columns: Index(['_id', 'Entity', 'Code', 'Year',
       'Period life expectancy at birth - Sex: total - Age: 0'],
      dtype='object')
Life Expectancy DataFrame missing values:
 _id                                                         0
Entity                                                      0
Code                                                     1956
Year                                                        0
Period life expectancy at birth - Sex: total - Age: 0       0
dtype: int6

In [35]:
life_expectancy_df.head()

Unnamed: 0,Area,Year,Life_Expectancy
0,Afghanistan,1950,28.1563
1,Afghanistan,1951,28.5836
2,Afghanistan,1952,29.0138
3,Afghanistan,1953,29.4521
4,Afghanistan,1954,29.6975


### Loading Transformed Data to PostgreSQL

In [36]:
pgconn=psycopg2.connect(host="localhost", database="co2_renew_life", user="postgres", password="admin")

In [37]:
pgcursor=pgconn.cursor()

In [38]:
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [39]:
pgcursor.execute("CREATE TABLE IF NOT EXISTS life_expectancy (Area TEXT, Year INT, Life_Expectancy FLOAT);")

In [40]:
from psycopg2.extras import execute_values

execute_values(pgcursor, "INSERT INTO life_expectancy (Area, Year, Life_Expectancy) VALUES %s", life_expectancy_df.values.tolist())

In [41]:
pgconn.commit()