In [None]:
import csv

import pandas as pd
import psycopg
from psycopg import Cursor

from secret import POSTGRES_CONNECTION_STRING

In [None]:
# Reset the schema: drop every table this notebook creates so the cells below rebuild it
# from scratch. DROP ... CASCADE also removes foreign-key constraints that other tables hold
# on the dropped table, so any table that references a dropped one must be dropped as well;
# otherwise it survives with its constraints silently missing (CREATE TABLE IF NOT EXISTS
# below would not recreate them).
# The names are literal constants, so interpolating them into the SQL is safe.
TABLES_TO_DROP = (
    'sectoral_approach_value',
    'notation_key',
    'sectoral_approach_activities',
    'sectoral_approach',
    'crf_variable',
    'species',
    'product',
    'fuel',
    'fuel_category',
    'pollutant',
    'pollutant_category',
    'value_type',
    'activity',
    'submission',
    'region',
)

with psycopg.connect(POSTGRES_CONNECTION_STRING) as db:
    for table_name in TABLES_TO_DROP:
        db.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE")

In [None]:
# Show which tables currently exist in the public schema (one result-row tuple per table).
with psycopg.connect(POSTGRES_CONNECTION_STRING) as db:
    public_tables = db.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'").fetchall()
    for row in public_tables:
        print(row)

### Define some helper functions

In [None]:
CSV_ENCODING = 'UTF8'
CSV_DELIMITER = ','
CSV_QUOTE_CHAR = '"'

def import_sample_record(file_name: str, cursor: Cursor, table: str, primary_key: str, columns: str,
                         convert_to_bool: set=None, convert_to_int: set=None, convert_to_float: set=None,
                         print_log: bool=False):
    """Bulk-load the rows of a sample CSV file into `table` with COPY, skipping keys already present.

    The CSV is read with ``csv.QUOTE_NONNUMERIC``: unquoted fields arrive as floats, quoted
    fields as strings. Each row is written in CSV column order, so the CSV header order must
    match the order of `columns`.

    Args:
        file_name: Path of the CSV file (with a header row) to import.
        cursor: Open psycopg cursor used for the COPY; the caller commits.
        table: Target table name. It is interpolated into SQL, so it must be a trusted constant.
        primary_key: Column used to detect rows that already exist in `table`.
        columns: Parenthesised column list for the COPY statement, e.g. ``'(id, label_en)'``.
        convert_to_bool: Columns whose string value becomes ``False`` for 'false'
            (case-insensitive) and ``True`` for anything else.
        convert_to_int: Columns converted with ``int()``.
        convert_to_float: Columns converted with ``float()``.
        print_log: Print one line for every added or skipped record.

    Empty CSV fields are stored as NULL before any conversion is attempted.
    """
    # None defaults avoid shared mutable defaults (the previous ``{}`` was also a dict, not a set).
    convert_to_bool = convert_to_bool or set()
    convert_to_int = convert_to_int or set()
    convert_to_float = convert_to_float or set()

    # Keys already in the table (read through a separate pandas connection); such rows are skipped.
    current_data = pd.read_sql_table(table, POSTGRES_CONNECTION_STRING, index_col=primary_key)

    with cursor.copy(f'COPY {table} {columns} FROM STDIN') as copy:
        with open(file_name, 'r', encoding=CSV_ENCODING, newline='') as file:
            reader = csv.DictReader(file, delimiter=CSV_DELIMITER, quotechar=CSV_QUOTE_CHAR,
                                    quoting=csv.QUOTE_NONNUMERIC)
            for record in reader:
                if record[primary_key] in current_data.index:
                    if print_log:
                        print(f'Skipping existing record: {tuple(record.values())}')
                    continue

                # Convert types and handle empty values (reassigning existing keys is safe while iterating)
                for key, value in record.items():
                    if isinstance(value, str) and len(value) == 0:
                        record[key] = None  # Replace empty values with NULL
                    elif key in convert_to_bool:
                        record[key] = value.lower() != 'false'
                    elif key in convert_to_int:
                        record[key] = int(value)
                    elif key in convert_to_float:
                        record[key] = float(value)

                copy.write_row(record.values())
                if print_log:
                    print(f'Adding new record: {tuple(record.values())}')

def _activity_rows(time_series_id: int, path: str) -> list:
    """Build (time_series_id, activity_code, level, leaf) rows from a comma-separated activity path.

    Levels count from 0 at the first code; only the last code of the path is flagged as leaf
    (also when the path has a single element).
    """
    codes = [code.strip() for code in path.split(',')]
    last_level = len(codes) - 1
    return [(time_series_id, code, level, level == last_level) for level, code in enumerate(codes)]


def _value_rows(time_series_id: int, time_series: dict, first_year: int, last_year: int) -> list:
    """Build (time_series_id, year, value, notation_key) rows from the year columns of one CSV row.

    Numeric cells are stored as the value with no notation key. String cells are stored as
    notation keys with value 0.0. Missing year columns and falsy cells are skipped.
    """
    rows = []
    for year in range(first_year, last_year + 1):
        cell = time_series.get(str(year))
        if not cell:
            # NOTE(review): a numeric 0.0 is falsy too and is skipped like an empty cell — confirm intended.
            continue
        if isinstance(cell, str):
            rows.append((time_series_id, year, 0.0, cell))
        else:
            rows.append((time_series_id, year, cell, None))
    return rows


def import_sample_time_series(file_name: str, cursor: Cursor, print_log: bool=False,
                              first_year: int=1990, last_year: int=2050):
    """Import sectoral-approach time series from a sample CSV file.

    For every CSV row this inserts:
    - one `sectoral_approach` record;
    - one `sectoral_approach_activities` record per element of the row's comma-separated `path`;
    - one `sectoral_approach_value` record per non-empty year column in ``first_year..last_year``
      (inclusive).

    Nothing is committed here; the caller commits. Rows are always inserted, so importing the
    same file twice duplicates its time series.

    Args:
        file_name: CSV file with a header row. Columns read are region, submission, type,
            scenario, confidential, unit and path; optionally pollutant, fuel, product, species
            and crf_variable; plus one column per year. With print_log, activity is also read.
        cursor: Open psycopg cursor used for all inserts.
        print_log: Print one line per imported row.
        first_year: First year column to read (inclusive).
        last_year: Last year column to read (inclusive).

    Raises:
        LookupError: if a row names a crf_variable that does not exist for its submission.
    """
    with open(file_name, 'r', encoding=CSV_ENCODING, newline='') as file:
        reader = csv.DictReader(file, delimiter=CSV_DELIMITER, quotechar=CSV_QUOTE_CHAR,
                                quoting=csv.QUOTE_NONNUMERIC)
        # start=2: line 1 of the file is the header, so the first data row is line 2.
        for line_number, time_series in enumerate(reader, start=2):
            if print_log:
                print(f"Adding time series from row {line_number}: {time_series['region']}*{time_series['submission']}*{time_series['activity']}*{time_series['type']}*{time_series['pollutant']}")

            crf_variable_id = None
            if time_series.get('crf_variable'):
                cursor.execute('SELECT id FROM crf_variable WHERE submission=%s and variable=%s',
                               (time_series['submission'], time_series['crf_variable']))
                match = cursor.fetchone()
                if match is None:
                    raise LookupError(f"Row {line_number}: unknown CRF variable {time_series['crf_variable']!r} "
                                      f"for submission {time_series['submission']!r}")
                crf_variable_id = match[0]

            # Empty optional dimension cells ('') are stored as NULL.
            cursor.execute("""
                INSERT INTO sectoral_approach (region_code, submission_id, value_type_short, pollutant_chemical,
                                               fuel_id, product_id, species_id, scenario, crf_variable_id, confidential, unit) 
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    RETURNING id
                """, (time_series['region'],
                      time_series['submission'],
                      time_series['type'],
                      time_series.get('pollutant') or None,
                      time_series.get('fuel') or None,
                      time_series.get('product') or None,
                      time_series.get('species') or None,
                      time_series['scenario'],
                      crf_variable_id,
                      time_series['confidential'],
                      time_series['unit']))
            time_series_database_id = cursor.fetchone()[0]

            cursor.executemany("""
                INSERT INTO sectoral_approach_activities (time_series_id, activity_code, level, leaf) 
                    VALUES (%s, %s, %s, %s)
                """, _activity_rows(time_series_database_id, time_series['path']))

            cursor.executemany("""
                INSERT INTO sectoral_approach_value (time_series_id, year, value, notation_key) 
                    VALUES (%s, %s, %s, %s)
                """, _value_rows(time_series_database_id, time_series, first_year, last_year))
                    
def show_full_table(table: str, primary_key: str=None):
    """Display every row and column of `table` in the notebook, indexed by `primary_key` if given."""
    table_frame = pd.read_sql_table(table, POSTGRES_CONNECTION_STRING, index_col=primary_key)
    # Lift pandas' row/column truncation only for this one display call.
    with pd.option_context('display.max_rows', None, 'display.max_columns', None):
        display(table_frame)

### CORE DIMENSION: Region

In [None]:
# Create the region dimension (regions may be nested via part_of) and load the sample regions.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        region_ddl = """
            CREATE TABLE IF NOT EXISTS region (
                code     text     PRIMARY KEY CHECK (char_length(code) >= 2),
                label_en text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de text     NULL        CHECK (char_length(label_de) > 0),
                order_by smallint NOT NULL    CHECK (order_by >= 0),
                part_of  text     NULL        REFERENCES region DEFAULT NULL)
            """
        cursor.execute(region_ddl)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='region', primary_key='code',
                             columns='(code, part_of, label_en, label_de, order_by)',
                             file_name='sample_data/regions.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        # show_full_table('region', 'code')

### CORE DIMENSION: Submission

In [None]:
# Create the submission dimension (each submission belongs to a region) and load the sample data.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        submission_ddl = """
            CREATE TABLE IF NOT EXISTS submission (
                id          text     PRIMARY KEY,
                region_code text     NOT NULL REFERENCES region,
                date        date     NOT NULL,
                label_en    text     NOT NULL CHECK (char_length(label_en) > 0),
                label_de    text     NULL     CHECK (char_length(label_de) > 0),
                order_by    smallint NOT NULL CHECK (order_by >= 0))
            """
        cursor.execute(submission_ddl)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='submission', primary_key='id',
                             columns='(id, region_code, label_en, label_de, date, order_by)',
                             file_name='sample_data/submissions.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        # show_full_table('submission', 'id')

### CORE DIMENSION: Activity

In [None]:
# Create the activity dimension (activities may be nested via part_of), load it and show it.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        activity_ddl = """
            CREATE TABLE IF NOT EXISTS activity (
                code          text     PRIMARY KEY CHECK (char_length(code) > 0),
                label_en      text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de      text     NULL        CHECK (char_length(label_de) > 0),
                tree_label_en text     NULL        CHECK (char_length(tree_label_en) > 0),
                tree_label_de text     NULL        CHECK (char_length(tree_label_de) > 0),
                image_url     text     NULL,
                order_by      smallint NOT NULL    CHECK (order_by >= 0),
                part_of       text     NULL        REFERENCES activity)
            """
        cursor.execute(activity_ddl)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='activity', primary_key='code',
                             columns='(code, part_of, label_en, label_de, tree_label_en, tree_label_de, image_url, order_by)',
                             file_name='sample_data/activities.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        show_full_table('activity', 'code')

### CORE DIMENSION: Value type

In [None]:
# Create the value-type dimension, load the sample types and show the result.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        value_type_ddl = """
            CREATE TABLE IF NOT EXISTS value_type (
                short    text     PRIMARY KEY CHECK (char_length(short) >= 2),
                label_en text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de text     NULL        CHECK (char_length(label_de) > 0),
                order_by smallint NOT NULL    CHECK (order_by >= 0))
            """
        cursor.execute(value_type_ddl)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='value_type', primary_key='short',
                             columns='(short, label_en, label_de, order_by)',
                             file_name='sample_data/types.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        show_full_table('value_type', 'short')

### CORE DIMENSION: Pollutant

In [None]:
# Create the pollutant dimension plus its fixed category lookup table, load the sample pollutants
# and show the result. The category seed is safe to re-run (ON CONFLICT DO NOTHING).
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        pollutant_category_ddl = """
            CREATE TABLE IF NOT EXISTS pollutant_category (
                name    text PRIMARY KEY,
                comment text NOT NULL)
            """
        pollutant_category_seed = """
            INSERT INTO pollutant_category VALUES
                ('GHG', 'Greenhouse gases'),
                ('AP', 'Air pollutants'),
                ('TSP', 'Dust and particles'),
                ('HM', 'Heavy metals'),
                ('POP', 'Persistent organic pollutants'),
                ('Other', 'Other pollutants')
            ON CONFLICT (name) DO NOTHING
            """
        pollutant_ddl = """
            CREATE TABLE IF NOT EXISTS pollutant (
                chemical text     PRIMARY KEY CHECK (char_length(chemical) >= 2),
                category text     NOT NULL    REFERENCES pollutant_category,
                label_en text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de text     NULL        CHECK (char_length(label_de) > 0),
                order_by smallint NOT NULL    CHECK (order_by >= 0))
            """
        for statement in (pollutant_category_ddl, pollutant_category_seed, pollutant_ddl):
            cursor.execute(statement)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='pollutant', primary_key='chemical',
                             columns='(chemical, label_en, label_de, category, order_by)',
                             file_name='sample_data/pollutants.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        show_full_table('pollutant', 'chemical')

### EXTRA DIMENSION: Fuel

In [None]:
# Create the fuel dimension plus its fixed category lookup table, load the sample fuels
# and show the result. The category seed is safe to re-run (ON CONFLICT DO NOTHING).
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        fuel_category_ddl = """
            CREATE TABLE IF NOT EXISTS fuel_category (
                name text PRIMARY KEY)
            """
        fuel_category_seed = """
            INSERT INTO fuel_category VALUES
                ('GASEOUS'),
                ('LIQUID'),
                ('SOLID')
            ON CONFLICT (name) DO NOTHING
            """
        fuel_ddl = """
            CREATE TABLE IF NOT EXISTS fuel (
                id       text     PRIMARY KEY CHECK (char_length(id) >= 2),
                fossil   boolean  NOT NULL,
                category text     NOT NULL    REFERENCES fuel_category,
                label_en text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de text     NULL        CHECK (char_length(label_de) > 0),
                order_by smallint NOT NULL    CHECK (order_by >= 0))
            """
        for statement in (fuel_category_ddl, fuel_category_seed, fuel_ddl):
            cursor.execute(statement)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='fuel', primary_key='id',
                             columns='(id, label_en, label_de, fossil, category, order_by)',
                             file_name='sample_data/fuels.csv', cursor=cursor,
                             convert_to_int={'order_by'}, convert_to_bool={'fossil'})
        connection.commit()

        show_full_table('fuel', 'id')

### EXTRA DIMENSION: Product

In [None]:
# Create the product dimension, load the sample products and show the result.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        product_ddl = """
            CREATE TABLE IF NOT EXISTS product (
                id       text     PRIMARY KEY CHECK (char_length(id) >= 2),
                label_en text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de text     NULL        CHECK (char_length(label_de) > 0),
                order_by smallint NOT NULL    CHECK (order_by >= 0))
            """
        cursor.execute(product_ddl)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='product', primary_key='id',
                             columns='(id, label_en, label_de, order_by)',
                             file_name='sample_data/products.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        show_full_table('product', 'id')

### EXTRA DIMENSION: Species

In [None]:
# Create the species dimension, load the sample species and show the result.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        species_ddl = """
            CREATE TABLE IF NOT EXISTS species (
                id       text     PRIMARY KEY CHECK (char_length(id) >= 2),
                label_en text     NOT NULL    CHECK (char_length(label_en) > 0),
                label_de text     NULL        CHECK (char_length(label_de) > 0),
                order_by smallint NOT NULL    CHECK (order_by >= 0))
            """
        cursor.execute(species_ddl)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(table='species', primary_key='id',
                             columns='(id, label_en, label_de, order_by)',
                             file_name='sample_data/species.csv', cursor=cursor,
                             convert_to_int={'order_by'})
        connection.commit()

        show_full_table('species', 'id')

### AUXILIARY: CRF variable

In [None]:
# Create the CRF variable lookup table (unique per variable and submission), load the sample
# variables, and show the result.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS crf_variable (
                id         serial  PRIMARY KEY,
                variable   text    NOT NULL,
                submission text    NOT NULL REFERENCES submission,
                category   text    NOT NULL REFERENCES activity,
                export     boolean NOT NULL,
                unit       text    NULL,
                text       text    NULL,
                name       text    NOT NULL,
                commodity  text    NOT NULL,
                source     text    NOT NULL,
                target     text    NOT NULL,
                option     text    NOT NULL,
                method     text    NOT NULL,
                activity   text    NOT NULL,
                attribute  text    NOT NULL,
                gas        text    NOT NULL,
                UNIQUE (variable, submission))
            """)
        # Commit the DDL first: the importer reads the table through a separate pandas connection.
        connection.commit()

        import_sample_record(file_name='sample_data/crf_variables.csv', cursor=cursor, table='crf_variable',
                             primary_key='id', columns='(id, submission, category, variable, unit, export, text, name, commodity, source, target, option, method, activity, attribute, gas)',
                             convert_to_bool={'export'}, convert_to_int={'id'})

        # The sample file supplies explicit ids for this serial column, which bypasses its
        # sequence. Move the sequence past the largest id so later inserts without an explicit
        # id cannot collide with imported rows. For an empty table, the next id is 1.
        cursor.execute("""
            SELECT setval(pg_get_serial_sequence('crf_variable', 'id'),
                          COALESCE(MAX(id), 1), MAX(id) IS NOT NULL)
            FROM crf_variable
            """)
        connection.commit()

        show_full_table('crf_variable', 'id')

### CORE: Sectoral approach time series and time series values

In [None]:
# Create the time-series header table and the table holding each series' activity path.
# No explicit commit: the psycopg connection block commits when it exits without an error.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        # TODO Add check to prevent region_code <-> submission mismatch!
        # TODO Add column for method: Tier 1, Tier 2, Model etc
        # TODO Add column for EF type: Default, CS, Guidebook etc.
        sectoral_approach_ddl = """
            CREATE TABLE IF NOT EXISTS sectoral_approach (
                id                 serial  PRIMARY KEY,
                region_code        text    NOT NULL REFERENCES region,  
                submission_id      text    NOT NULL REFERENCES submission,
                value_type_short   text    NOT NULL REFERENCES value_type,
                pollutant_chemical text    NULL     REFERENCES pollutant,
                fuel_id            text    NULL     REFERENCES fuel,
                product_id         text    NULL     REFERENCES product,
                species_id         text    NULL     REFERENCES species,
                scenario           text    NOT NULL DEFAULT 'REF',
                crf_variable_id    integer NULL     REFERENCES crf_variable,
                confidential       boolean NOT NULL,
                unit               text    NOT NULL)
            """
        activities_ddl = """
            CREATE TABLE IF NOT EXISTS sectoral_approach_activities (
                id             serial   PRIMARY KEY,
                time_series_id integer  NOT NULL REFERENCES sectoral_approach ON DELETE CASCADE,
                activity_code  text     NOT NULL REFERENCES activity,
                level          smallint NOT NULL CHECK (level >= 0),
                leaf           boolean  NOT NULL,
                UNIQUE (time_series_id, activity_code))
            """
        for statement in (sectoral_approach_ddl, activities_ddl):
            cursor.execute(statement)

In [None]:
# Create the notation-key lookup table (seeded with fixed keys) and the per-year value table.
# A value row carries either a number, or a notation key with value 0.0.
# No explicit commit: the psycopg connection block commits when it exits without an error.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection:
    with connection.cursor() as cursor:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS notation_key (
                key   text PRIMARY KEY,
                label text NOT NULL)
            """)
        # Upsert rather than DO NOTHING so label corrections also reach databases seeded earlier.
        cursor.execute("""
            INSERT INTO notation_key VALUES
                ('NE', 'Not Estimated'),
                ('NA', 'Not Applicable'),
                ('NO', 'Not Occurring'),
                ('IE', 'Included Elsewhere'),
                ('C', 'Confidential')
            ON CONFLICT (key) DO UPDATE SET label = EXCLUDED.label
            """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sectoral_approach_value (
                time_series_id integer  NOT NULL REFERENCES sectoral_approach ON DELETE CASCADE,
                year           smallint NOT NULL CHECK (year >= 0),
                value          numeric  NOT NULL DEFAULT 0.0,
                notation_key   text     NULL     REFERENCES notation_key,
                UNIQUE (time_series_id, year))
            """)

In [None]:
# Remove all imported time series before re-importing. The ON DELETE CASCADE foreign keys on
# sectoral_approach_activities and sectoral_approach_value delete their rows as well.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    cursor.execute('DELETE FROM sectoral_approach')

In [None]:
# Import sample_data/time_series/germany.csv. Not idempotent: every run inserts the series again,
# so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/germany.csv', print_log=True)

In [None]:
# Import sample_data/time_series/france.csv. Not idempotent: every run inserts the series again,
# so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/france.csv', print_log=True)

In [None]:
# Import sample_data/time_series/belgium.csv. Not idempotent: every run inserts the series again,
# so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/belgium.csv', print_log=True)

In [None]:
# Import sample_data/time_series/misc.csv. Not idempotent: every run inserts the series again,
# so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/misc.csv', print_log=True)

In [None]:
# Import sample_data/time_series/1A3bii_iii_diesel.csv. Not idempotent: every run inserts the
# series again, so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/1A3bii_iii_diesel.csv', print_log=True)

In [None]:
# Import sample_data/time_series/1A3c_rail.csv. Not idempotent: every run inserts the series
# again, so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/1A3c_rail.csv', print_log=True)

In [None]:
# Import sample_data/time_series/1A3e_natural_gas_compressors.csv. Not idempotent: every run
# inserts the series again, so run the DELETE cell above first when re-importing.
with psycopg.connect(POSTGRES_CONNECTION_STRING) as connection, connection.cursor() as cursor:
    import_sample_time_series(cursor=cursor, file_name='sample_data/time_series/1A3e_natural_gas_compressors.csv', print_log=True)