In [131]:
import psycopg2
import psycopg2.extras as extras
import config_warehouse as creds
import config_lake as creds_lake
import pandas as pd
import numpy as np
from psycopg2.extensions import register_adapter, AsIs

psycopg2.extensions.register_adapter(np.int64, psycopg2._psycopg.AsIs) #allow int64 for db import

In [22]:
# Show Tables

def Show_tables():
    try:
        # DB connection
        conn_string = "host="+ creds_lake.PGHOST +" port="+ "5432" +" dbname="+ creds_lake.PGDATABASE +" user=" + creds_lake.PGUSER +" password="+ creds_lake.PGPASSWORD
        conn = psycopg2.connect(conn_string)
        cur = conn.cursor()
        # Select data
        cur.execute("""SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'""")
        records = cur.fetchall()
        #print(records)
        #print(type(records[0][0]))
        for item in records:
            print(item)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

Show_tables()

[('measuringpointscoordinates',), ('ZIPCode',), ('RealTimeStatus',), ('electriccars21',), ('electricityproductionplants',), ('measuringpoints',), ('chargingstations_static',), ('chargingstations_locations',)]
<class 'str'>
('measuringpointscoordinates',)
('ZIPCode',)
('RealTimeStatus',)
('electriccars21',)
('electricityproductionplants',)
('measuringpoints',)
('chargingstations_static',)
('chargingstations_locations',)


In [168]:
def create_tables():
    """ create tables in the PostgreSQL database"""
    commands = (
        """
        CREATE TABLE IF NOT EXISTS Electric_cars_21 (
            ID SERIAL PRIMARY KEY,
            Vehicle_group VARCHAR(255) NOT NULL,
            Year_of_first_registration VARCHAR(255) NOT NULL,
            Fuel VARCHAR(255) NOT NULL,
            Count_2021 Int NOT NULL,
            Canton_abbreviation VARCHAR(255) NOT NULL,
            FOREIGN KEY (Canton_abbreviation) REFERENCES regions_and_cantons(canton_abbreviation))
        """,
        """
        CREATE TABLE IF NOT EXISTS Measuring_points (
            Nr_measuring_point Int PRIMARY KEY,
            Point_name VARCHAR(255) NOT NULL,
            Status VARCHAR(255) NOT NULL,
            Type_point VARCHAR(255) NOT NULL,
            Road VARCHAR(255) NOT NULL,
            Canton_abbreviation CHAR(2) NOT NULL,
            FOREIGN KEY (Canton_abbreviation) REFERENCES regions_and_cantons(canton_abbreviation))
        """,
        """
        CREATE TABLE IF NOT EXISTS Traffic_measurement_21 (
            Nr Int PRIMARY KEY,
            Nr_measuring_point Int NOT NULL,
            Annual_average Int,
            FOREIGN KEY (Nr_measuring_point) REFERENCES Measuring_points(Nr_measuring_point))
        """,
        """
        CREATE TABLE IF NOT EXISTS Measuring_points_coordinates (
            Nr Serial PRIMARY KEY,
            Nr_measuring_point Int NOT NULL,
            Coordinate_east Int NOT NULL,
            Coordinate_nord Int NOT NULL,
            FOREIGN KEY (Nr_measuring_point) REFERENCES Measuring_points(Nr_measuring_point))
        """,
        """
        CREATE TABLE IF NOT EXISTS Main_categories_electricity_production (
            ID_main_category Int PRIMARY KEY,
            Name_main_category VARCHAR(255) NOT NULL)
        """,
        """
        CREATE TABLE IF NOT EXISTS Sub_categories_electricity_production (
            ID_sub_category Int PRIMARY KEY,
            Name_sub_category VARCHAR(255) NOT NULL,
            ID_main_category Int NOT NULL,
            FOREIGN KEY (ID_main_category)
            REFERENCES Main_categories_electricity_production(ID_main_category))
        """,
        """
        CREATE TABLE IF NOT EXISTS Electricity_production_plants (
            xtf_id Int PRIMARY KEY,
            ID_postal_code Int NOT NULL,
            Municipality VARCHAR(255) NOT NULL,
            Canton_abbreviation CHAR(2) NOT NULL,
            ID_sub_category INT NOT NULL,
            Total_power Int NOT NULL,
            FOREIGN KEY (ID_postal_code) REFERENCES Postal_codes(ID_postal_code),
            FOREIGN KEY (Canton_abbreviation) REFERENCES regions_and_cantons(canton_abbreviation),
            FOREIGN KEY (ID_sub_category)
            REFERENCES Sub_categories_electricity_production(ID_sub_category))
        """,
        """
        CREATE TABLE IF NOT EXISTS Operators (
            Operator_ID VARCHAR(255) PRIMARY KEY,
            Name_operator VARCHAR(255) NOT NULL)
        """,
        """
        CREATE TABLE IF NOT EXISTS Charging_stations_occupancy (
            Evse_ID VARCHAR(255) PRIMARY KEY,
            Daily_occupancy INT NOT NULL,
            Daily_kwH INT NOT NULL,
            Daily_cars INT NOT NULL,
            Day_id INT NOT NULL)
        """,
        """
        CREATE TABLE IF NOT EXISTS Charging_stations_static (
            Charging_station_ID VARCHAR(255) PRIMARY KEY,
            Operator_ID VARCHAR(255) NOT NULL,
            Evse_ID VARCHAR(255) NOT NULL,
            City VARCHAR(255) NOT NULL,
            Street VARCHAR(255) NOT NULL,
            Power INT NOT NULL,
            ID_postal_code Int NOT NULL,
            Canton_abbreviation CHAR(2) NOT NULL,
            FOREIGN KEY (Operator_ID) REFERENCES Operators(Operator_ID),
            FOREIGN KEY (Evse_ID) REFERENCES Charging_stations_occupancy(Evse_ID),
            FOREIGN KEY (ID_postal_code) REFERENCES Postal_codes(ID_postal_code),
            FOREIGN KEY (Canton_abbreviation) REFERENCES regions_and_cantons(canton_abbreviation))
        """,
        """
        CREATE TABLE IF NOT EXISTS Chargin_stations_location (
            Location_ID Int PRIMARY KEY,
            Charging_station_ID VARCHAR(255) NOT NULL,
            Coordinate_east Int NOT NULL,
            Coordinate_nord Int NOT NULL,
            FOREIGN KEY (Charging_station_ID) REFERENCES Charging_stations_static(Charging_station_ID))
        """
    )

    #  PRIMARY KEY (Postal_code)

    try:
        # Set up a connection to the postgres server.
        conn_string = "host="+ creds.PGHOST +" port="+ "5432" +" dbname="+ creds.PGDATABASE +" user=" + creds.PGUSER \
                  +" password="+ creds.PGPASSWORD

        conn = psycopg2.connect(conn_string)
        print("Connected!")

        # Create a cursor object
        cursor = conn.cursor()

        # Create tables
        for command in commands:
            cursor.execute(command)
        print('Tables created')

        # Import data from dataframe
        #execute_values(conn, fetch_data_plz(), 'Postal_codes')

        # close communication with the PostgreSQL database server
        cursor.close()
        # commit the changes
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

create_tables()

Connected!
Tables created


In [163]:
# Inserting dataframe into database
def execute_values(conn, df, table):
    # Set up a connection to the warehouse
    conn_string = "host="+ creds.PGHOST +" port="+ "5432" +" dbname="+ creds.PGDATABASE +" user=" + creds.PGUSER \
              +" password="+ creds.PGPASSWORD

    conn = psycopg2.connect(conn_string)
    print("Connected!")

    # Create a cursor object
    cursor = conn.cursor()

    # Set up data
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(list(df.columns))
    # SQL query to execute
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cur = conn.cursor()
    try:
        extras.execute_values(cur, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cur.close()
        return 1
    print("the dataframe is inserted")
    cur.close()

In [169]:
# Fetch all table's data

tables_lake = ['Measuring_points_coordinates', 'postal_codes', 'charging_stations_occupancy',
          'electric_cars_21', 'electricity_production_plants', 'measuring_points', 'charging_stations_static', 'charging_stations_locations'] #tables lake with warehouse name's!

def fetch_all_data():
    try:
        # DB connection
        conn_string = "host="+ creds_lake.PGHOST +" port="+ "5432" +" dbname="+ creds_lake.PGDATABASE +" user=" + creds_lake.PGUSER +" password="+ creds_lake.PGPASSWORD
        conn = psycopg2.connect(conn_string)
        cur = conn.cursor()

        # Select table names
        cur.execute("""SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'""")
        records = cur.fetchall()
        records_list = []
        for item in records:
            item = str(item)
            records_list.append(item[2:-3])
        print(records_list)

        # Select data
        for table_name in range(0, len(records_list)):
            if records_list[table_name] == 'ZIPCode' or records_list[table_name] == 'RealTimeStatus' or \
                records_list[table_name] == 'measuringpoints':
                continue
            else:
                cur.execute((""" SELECT * FROM {}""").format(records_list[table_name])) # passing string without quotation marks
                df_records = cur.fetchall()
                df = pd.DataFrame(df_records)
                #print(df)

                if records_list[table_name] == 'measuringpointscoordinates':
                    df_selected = df.copy()
                    #Name columns
                    df_selected.columns = ['Nr_measuring_point','Status_BGDI','Point_name',
                                           'Canton_abbreviation','Road', 'Coordinate_East', 'Coordinate_Nord', 'Status', 'Type_Point', 'Number_of_Lanes']
                    #Filter needed rows
                    df_selected = df_selected[df_selected['Status'] == 'in Betrieb']
                    df_selected = df_selected[df_selected['Type_Point'] != 'nur Online']

                    #Dataframe Measuring_points_coordinates set up
                    df_coordinates = df_selected[['Nr_measuring_point', 'Coordinate_East',
                                                  'Coordinate_Nord',]]

                    #Dataframe Measuring_points; Drop not needed columns
                    df_selected.drop(['Status_BGDI','Coordinate_East','Coordinate_Nord','Number_of_Lanes'], axis=1, inplace=True)

                    # import to warehouse
                    execute_values(conn, df_selected, 'Measuring_points')
                    execute_values(conn, df_coordinates, 'Measuring_points_coordinates')
                    #print(df_selected)

                elif records_list[table_name] == 'realtimestatus':
                    continue

                elif records_list[table_name] == 'electriccars21':
                    #Name columns
                    df.columns = ['id','Canton_abbreviation','Vehicle_group',
                                           'Year_of_first_registration','Fuel', 'Count_2021']
                    #Filter needed rows
                    df_selected = df[(df['Canton_abbreviation'] != 'Switzerland') &
                                     (df['Canton_abbreviation'] != 'Confederation')]

                    #Map Canton names with Canton Abbreviations
                    dict_cantons = {'Zürich' : 'ZH', 'Bern' : 'BE', 'Luzern' : 'LU', 'Uri' : 'UR',
                                    'Schwyz' : 'SZ', 'Obwalden' : 'OW', 'Nidwalden' : 'NW',
                                    'Glarus' : 'GL', 'Zug' : 'ZG', 'Fribourg' : 'FR',
                                    'Solothurn' : 'SO', 'Basel-Stadt' : 'BS', 'Basel-Landschaft' : 'BL',
                                    'Schaffhausen' : 'SH', 'Appenzell-Ausserrhoden' : 'AR',
                                    'Appenzell-Innerrhoden' : 'AI', 'Sankt Gallen' : 'SG',
                                    'Graubünden' : 'GR', 'Aargau' : 'AG', 'Thurgau' : 'TG',
                                    'Ticino' : 'TI', 'Vaud' : 'VD', 'Valais' : 'VS', 'Neuchâtel' : 'NE',
                                    'Genève' : 'GE', 'Jura' : 'JU'}

                    df_selected['Canton_abbreviation'] = df_selected['Canton_abbreviation'].map(dict_cantons)
                    df_selected['Canton_abbreviation'] = df_selected['Canton_abbreviation'].astype(str)
                    print(df_selected['Canton_abbreviation'])
                    print(df_selected.info())

                    # import to warehouse
                    execute_values(conn, df_selected, 'Electric_cars_21')

                elif records_list[table_name] == 'electricityproductionplants':
                    continue

                elif records_list[table_name] == 'measuringpoints':
                    #Keep needed columns by index
                    df_selected = df.iloc[:,[0,1,18]]

                    #Name columns
                    df_selected.columns = ['Nr', 'Nr_measuring_point','Annual_average']

                    # import to warehouse
                    execute_values(conn, df_selected, 'Traffic_measurement_21')

                elif records_list[table_name] == 'chargingstations_static':
                    continue

                elif records_list[table_name] == 'chargingstations_locations':
                    continue

                else:
                    continue

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()

fetch_all_data()

['measuringpointscoordinates', 'ZIPCode', 'RealTimeStatus', 'electriccars21', 'electricityproductionplants', 'measuringpoints', 'chargingstations_static', 'chargingstations_locations']
Connected!
the dataframe is inserted
Connected!
the dataframe is inserted


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_selected['Canton_abbreviation'] = df_selected['Canton_abbreviation'].map(dict_cantons)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_selected['Canton_abbreviation'] = df_selected['Canton_abbreviation'].astype(str)


30     ZH
31     ZH
32     ZH
33     ZH
34     ZH
       ..
805    JU
806    JU
807    JU
808    JU
809    JU
Name: Canton_abbreviation, Length: 780, dtype: object
<class 'pandas.core.frame.DataFrame'>
Int64Index: 780 entries, 30 to 809
Data columns (total 6 columns):
 #   Column                      Non-Null Count  Dtype 
---  ------                      --------------  ----- 
 0   id                          780 non-null    int64 
 1   Canton_abbreviation         780 non-null    object
 2   Vehicle_group               780 non-null    object
 3   Year_of_first_registration  780 non-null    object
 4   Fuel                        780 non-null    object
 5   Count_2021                  780 non-null    int64 
dtypes: int64(2), object(4)
memory usage: 42.7+ KB
None
Connected!
the dataframe is inserted
