# Loading Dataset to PostgreSQL

In [1]:
# pip install psycopg2

In [2]:
import os

import psycopg2
import psycopg2.extras as extras
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from pathlib import Path 

In [3]:
# Locate the workbook relative to the current working directory and pick the
# .env file that sits alongside the matching project root. When the workbook is
# not found under ./data, fall back to the copy two directories up.
file_path = "data/IEA_EDGAR_CO2_1970_2023.xlsx"
if os.path.isfile(file_path):
    print(f"The file {file_path} exists")
    env_path = Path('.') / '.env'
else:
    env_path = Path('../../') / '.env'
    file_path = "../../data/IEA_EDGAR_CO2_1970_2023.xlsx"

# The first 9 rows of the "IPCC 2006" sheet are skipped before the header row.
df = pd.read_excel(file_path, sheet_name="IPCC 2006", skiprows=9)

The file data/IEA_EDGAR_CO2_1970_2023.xlsx exists


In [4]:
# Give the verbose source headers short names, then replace every missing
# value in the frame with 0.
df = df.rename(
    columns={
        "C_group_IM24_sh": "zone",
        "Country_code_A3": "code",
        "ipcc_code_2006_for_standard_report": "ipcc_code",
        "ipcc_code_2006_for_standard_report_name": "ipcc_name",
    }
).fillna(0)

In [5]:
# CREATE TABLE statements keyed by table name.
# - "country" and "ipcc" are lookup tables with SERIAL surrogate keys.
# - "emission" references both lookup tables and allows one row per
#   (country_id, ipcc_id) pair. Its {columns} placeholder must be filled with
#   str.format before execution; the caller passes a string of
#   ",<year column> NUMERIC(20,10)" fragments, one per entry in year_list.
# The "country" and "ipcc" statements contain no placeholders and are executed
# as-is.
create_queries = {"country":
    """
    CREATE TABLE country(
        country_id SERIAL PRIMARY KEY,
        name VARCHAR(255),
        code VARCHAR(3),
        zone VARCHAR(25),
        ipcc_annex VARCHAR(25)
    )
    """
    ,"ipcc":
    """
    CREATE TABLE ipcc(
        ipcc_id SERIAL PRIMARY KEY,
        ipcc_code VARCHAR(15),
        ipcc_name VARCHAR(255)
    )
    """
    ,"emission":
    """
    CREATE TABLE emission(
        emission_id SERIAL PRIMARY KEY,
        country_id INTEGER REFERENCES country(country_id),
        ipcc_id INTEGER REFERENCES ipcc(ipcc_id) {columns}, UNIQUE(country_id,ipcc_id)
    )
    """}

In [6]:
# Query template that reports whether a table named {table} exists in the
# "public" schema; the statement returns a single EXISTS value.
# NOTE: {table} is filled in with str.format rather than bound as a query
# parameter, so it must only ever receive trusted, hard-coded table names.
check_query = """
SELECT EXISTS (
   SELECT 1 FROM information_schema.tables 
   WHERE table_schema ='public' 
   AND table_name ='{table}'
   )"""

In [7]:
# Tables in creation/loading order. "emission" must come last because its
# CREATE TABLE statement references both country(country_id) and ipcc(ipcc_id).
table_list=["country","ipcc","emission"]

In [8]:
def connectToDB(db = "postgres"):
    """Open a psycopg2 connection to ``db`` using settings from the .env file.

    The connection settings are read from the environment variables
    ``pg_host``, ``pg_username``, ``pg_password`` and ``pg_port`` after loading
    the .env file located at the notebook-level ``env_path``.

    Args:
        db: Name of the database to connect to.

    Returns:
        An open psycopg2 connection, or ``None`` when the .env file could not
        be loaded or the connection attempt failed. In both failure cases a
        message is printed instead of raising.
    """
    try:
        # First call uses python-dotenv's default .env discovery (kept from the
        # original flow); the second call targets env_path and gates the connect.
        load_dotenv(verbose=True)
        if not load_dotenv(dotenv_path=env_path):
            print(f"Could not load database settings from {env_path}")
            return None

        # int(None) would raise an opaque TypeError when pg_port is unset or
        # empty, so fall back to PostgreSQL's default port.
        port = int(os.environ.get("pg_port") or 5432)
        return psycopg2.connect(
            database=db,
            host=os.environ.get("pg_host"),
            user=os.environ.get("pg_username"),
            password=os.environ.get("pg_password"),
            port=port,
        )
    except Exception as error:
        # psycopg2.DatabaseError is a subclass of Exception, so one clause covers both.
        print(error)
        return None

In [9]:
def create_database(db):
    """Drop the database named ``db`` if it exists, then create it afresh.

    Connects through ``connectToDB()`` (default database), switches the
    connection to autocommit, and issues ``DROP DATABASE IF EXISTS ... WITH
    (FORCE)`` followed by ``CREATE DATABASE``. Errors are printed, not raised,
    and the connection is always closed before returning.

    Args:
        db: Name of the database to (re)create. It is quoted as an SQL
            identifier, so the database is created with exactly this spelling
            (including case), matching what a later ``connect(database=db)``
            will look for.
    """
    # Function-scope import keeps this block self-contained; psycopg2.sql
    # composes statements with safely quoted identifiers.
    from psycopg2 import sql

    # Bound before the try so the finally clause can always inspect it.
    connection = None
    try:
        connection = connectToDB()
        if connection is None:
            print("Error while creating database: could not connect to the server")
            return

        # CREATE/DROP DATABASE cannot run inside a transaction block.
        connection.autocommit = True

        db_identifier = sql.Identifier(db)
        # The context manager closes the cursor even when a statement fails.
        with connection.cursor() as cursor:
            cursor.execute(
                sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(db_identifier)
            )
            cursor.execute(sql.SQL("CREATE DATABASE {}").format(db_identifier))

    except (Exception, psycopg2.Error) as error:
        print("Error while creating database", error)

    finally:
        if connection is not None:
            connection.close()

In [10]:
# Rows to load, grouped by destination table:
#   "country"  - set of (name, zone, code, ipcc_annex) tuples (deduplicated)
#   "ipcc"     - set of (ipcc_code, ipcc_name) tuples (deduplicated)
#   "emission" - one dict per source row with the country/ipcc keys and the
#                per-year values
data_collection = {
    "country": set(),
    "ipcc": set(),
    "emission": []
}

# Year columns that are loaded: Y_2000 through Y_2023.
year_list = [f"Y_{year}" for year in range(2000, 2024)]

for _, row in df.iterrows():
    # country data
    country_name = row["Name"]
    country_zone = row["zone"]
    country_code = row["code"]
    IPCC_annex = row["IPCC_annex"]
    data_collection["country"].add((country_name, country_zone, country_code, IPCC_annex))

    # ipcc data
    ipcc_code = row["ipcc_code"]
    ipcc_name = row["ipcc_name"]
    data_collection["ipcc"].add((ipcc_code, ipcc_name))

    # emission data
    emission = {
        'country_code': country_code,
        'ipcc_code': ipcc_code,
        'ipcc_name': ipcc_name
    }
    # Iterate the year names directly; slicing df[year_list] here would copy
    # the whole sub-frame once per row just to read its column labels.
    for year_column in year_list:
        emission_rate = row[year_column]
        # Missing (NaN) values are skipped, so a dict may lack some year keys.
        if pd.notna(emission_rate):
            emission[year_column] = emission_rate
    data_collection["emission"].append(emission)
            

In [11]:
def insertCountryData():
    """Bulk-insert every collected country tuple into the ``country`` table.

    Works on the notebook-level ``cursor`` and ``conn``. The transaction is
    committed on success; on any error the error is printed and the
    transaction is rolled back.
    """
    country_insert_sql = """
    INSERT INTO country(name,zone,code,ipcc_annex)
    VALUES (%s,%s,%s,%s)
    """
    try:
        country_rows = data_collection["country"]
        cursor.executemany(country_insert_sql, country_rows)
        conn.commit()
    except Exception as failure:
        print(failure)
        conn.rollback()

def insertIpccData():
    """Bulk-insert every collected (code, name) pair into the ``ipcc`` table.

    Works on the notebook-level ``cursor`` and ``conn``. The transaction is
    committed on success; on any error the error is printed and the
    transaction is rolled back.
    """
    ipcc_insert_sql = """
    INSERT INTO ipcc(ipcc_code,ipcc_name)
    VALUES (%s,%s)
    """
    try:
        ipcc_rows = data_collection["ipcc"]
        cursor.executemany(ipcc_insert_sql, ipcc_rows)
        conn.commit()
    except Exception as failure:
        print(failure)
        conn.rollback()
        
def insertEmissionData(batch_size=10):
    """Resolve foreign keys for the collected emissions and insert them in batches.

    For every dict in ``data_collection["emission"]`` the matching
    ``country_id`` (by country code) and ``ipcc_id`` (by IPCC code and name)
    are looked up, then one row per emission is inserted into ``emission``
    with a column for each entry of ``year_list``. Works on the notebook-level
    ``cursor`` and ``conn``.

    Behaviour notes:
      * Each distinct country code / IPCC pair is queried only once; results
        are cached for the duration of the call.
      * Emissions whose ids cannot be resolved are reported and skipped.
      * A year missing from an emission dict is inserted as NULL instead of
        raising ``KeyError`` and aborting every remaining batch.
      * The source dicts in ``data_collection`` are not modified.
      * If a lookup query fails, the error is printed, the transaction is
        rolled back, lookups stop, and the rows resolved so far are inserted.
      * Each batch is committed separately; if a batch fails, the error is
        printed, that batch is rolled back and no further batches are sent.

    Args:
        batch_size: Number of rows sent per ``executemany`` call/commit.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    country_ids = {}  # country code -> country_id, or None when not found
    ipcc_ids = {}     # (ipcc_code, ipcc_name) -> ipcc_id, or None when not found
    emission_rows = []

    for emission in data_collection["emission"]:
        try:
            country_code = emission['country_code']
            ipcc_key = (emission['ipcc_code'], emission['ipcc_name'])

            if country_code not in country_ids:
                cursor.execute(
                    "SELECT country_id FROM country WHERE code = %s",
                    (country_code,)
                )
                match = cursor.fetchone()
                country_ids[country_code] = match[0] if match else None

            if ipcc_key not in ipcc_ids:
                cursor.execute(
                    "SELECT ipcc_id FROM ipcc WHERE ipcc_code = %s AND ipcc_name = %s",
                    ipcc_key
                )
                match = cursor.fetchone()
                ipcc_ids[ipcc_key] = match[0] if match else None
        except Exception as error:
            print(error)
            conn.rollback()
            break

        country_id = country_ids[country_code]
        ipcc_id = ipcc_ids[ipcc_key]
        if country_id is None or ipcc_id is None:
            print(f"Id not found! country_code={country_code!r}, ipcc_code={ipcc_key[0]!r}")
            continue

        # .get() yields None (SQL NULL) for any year absent from the dict.
        emission_rows.append(
            (country_id, ipcc_id, *(emission.get(year) for year in year_list))
        )

    if not emission_rows:
        return

    # Column names come from the notebook's own year_list, not external input.
    insert_query = """
    INSERT INTO emission(country_id,ipcc_id{year})
    VALUES (%s,%s{values})
    """.format(
        year="".join("," + year for year in year_list),
        values=",%s" * len(year_list),
    )

    try:
        for start in range(0, len(emission_rows), batch_size):
            cursor.executemany(insert_query, emission_rows[start:start + batch_size])
            conn.commit()
    except Exception as error:
        print(error)
        conn.rollback()
                

In [12]:
# Rebuild the co2_emission database from scratch: recreate the database,
# create each table that does not exist yet (in table_list order), load its
# data right after creation, and finally index the emission foreign keys.
# `conn` and `cursor` stay at notebook level because the insert* helpers use
# them; the connection is left open for any follow-up cells.
conn = None  # reset so the except handler never sees a stale or unbound name
try:
    create_database("co2_emission")
    conn = connectToDB("co2_emission")
    if conn is None:
        # connectToDB reports failure by returning None; without this check the
        # failure would surface as an AttributeError inside the error handler.
        raise RuntimeError("Could not connect to the co2_emission database")
    cursor = conn.cursor()

    loaders = {
        "country": insertCountryData,
        "ipcc": insertIpccData,
        "emission": insertEmissionData,
    }
    for table in table_list:
        cursor.execute(check_query.format(table=table))
        table_exist = cursor.fetchall()[0][0]
        if table_exist:
            continue

        create_query = create_queries[table]
        if table == "emission":
            # Only the emission statement has a {columns} placeholder: one
            # NUMERIC(20,10) column per year.
            columns = "".join(f",{year} NUMERIC(20,10)" for year in year_list)
            create_query = create_query.format(columns=columns)
        cursor.execute(create_query)
        conn.commit()

        # Data is loaded only for tables created in this run.
        if table in loaders:
            loaders[table]()

    # add indexes
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_country_year_emission ON emission(country_id)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_ipcc_year_emission ON emission(ipcc_id)")
    conn.commit()
except Exception as error:
    print(error)
    if conn is not None:
        conn.rollback()