In [18]:
# imports
import pandas as pd
import openpyxl
import psycopg2
import os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
from pathlib import Path

# pfade
BASE_DIR = Path().resolve()
SQL = BASE_DIR / 'sql'
ENV = SQL / 'env'
path_to_sql_env = SQL / 'env' /'.env'
path_to_xlsx = BASE_DIR / '03_nicht_normalisierte_Daten.xlsx'

In [21]:
# verbindung zum postgres server aufbauen
# example.env kann benutzt werden, um .env zu erstellen
load_dotenv(path_to_sql_env, override = True)
user         = os.getenv('DB_USER')
password     = os.getenv('DB_PASSWORD')
host         = os.getenv('DB_HOST')
port         = os.getenv('DB_PORT')
dbname       = os.getenv('DB_NAME')

# mit sqlalchemy
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}')

# mit psycopg2:
connection = psycopg2.connect(host     = host,
                              port     = port,
                              database = dbname,
                              user     = user,
                              password = password 
                             )

In [22]:
connection.close()

In [23]:
# datenbank "normal_shop" erstellen
create_database = text('''
                       CREATE DATABASE normal_shop;
                       ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_database)

In [24]:
# verbindung zu normal_shop aufbauen:
load_dotenv(SQL / '.env', override = True)
user         = os.getenv('DB_USER')
password     = os.getenv('DB_PASSWORD')
host         = os.getenv('DB_HOST')
port         = os.getenv('DB_PORT')
dbname = 'normal_shop'

engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}')

In [25]:
# schemas erstellen
create_schema = text('''  
                     CREATE SCHEMA bruno;
                     CREATE SCHEMA theo;
                     CREATE SCHEMA michael;
                     CREATE SCHEMA sunny;
                     CREATE SCHEMA patrick;
                     CREATE SCHEMA raw_tables;
                     CREATE SCHEMA norm_tables;
                     CREATE SCHEMA core;
                     ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_schema)

In [27]:
# nutzer erstellen
user_names = ['bruno', 'sunny', 'michael', 'theo']

for usr in user_names: 
    load_dotenv(ENV / f'{usr}.env', override = True)
    user      = os.getenv('DB_USER')
    password  = os.getenv('DB_PASSWORD')
    user_dict = {'password': password}

    create_user =  text(f'''
                        CREATE USER {user} WITH PASSWORD :password;
                        ''')
    
    with engine.connect() as connection:
        connection = connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(create_user, user_dict)

In [None]:
# excel datei lesen und in postgres übertragen, ausgangstabelle ~ agt

# excel datei einlesen
agt_df = pd.read_excel(path_to_xlsx,
                       sheet_name = 0, 
                       engine     = 'openpyxl',
                       dtype = {'KundeKreditkarte' : str,
                                'ProduktPreis' : float}
)

# Probleme bei der dd.mm.yyyy erkennung
agt_df['Bestelldatum'] = pd.to_datetime(agt_df['Bestelldatum'], dayfirst=True)

# addresses aufteilen
# danke regex101: r"^(.+?)\s*,\s*(\d{5})\s+(.+)$"gm
pattern = r'^(.*?),\s*(\d{5})\s+(.*)$'
adress_columns = agt_df['KundeAdresse'].str.extract(pattern)
adress_columns.columns = ['Straße', 'Postleitzahl', 'Stadt']

address_index = agt_df.columns.get_loc('KundeAdresse')

# namen aufteilen
# danke regex101: r"^(.*?)\s+(.*)$"
pattern = r"^(.*?)\s+(.*)$"
name_columns = agt_df['KundeName'].str.extract(pattern)
name_columns.columns = ['Vorname', 'Nachname']

name_index = agt_df.columns.get_loc('KundeName')

agt_extras_df = pd.concat([agt_df.iloc[:, :name_index + 1],
                           name_columns,
                           agt_df.iloc[:, address_index],
                           adress_columns,
                           agt_df.iloc[:, address_index + 1:]], 
                           axis = 1)

# daten in die datenbank schreiben, schema 'raw_tables'
agt_extras_df.to_sql('ausgangstabelle', 
                     engine,
                     schema    = 'raw_tables',
                     if_exists = 'replace',
                     index     = False)

100

In [29]:
# rechte verteilen für nutzer
# dabei hat jeder nutzer sein eigenes schema, in dem nur er arbeiten (editieren) kann
# andere nutzer können aber das schema lesen
create_rights = text('''
                     -- Owner:
                     ALTER SCHEMA bruno OWNER TO bruno;
                     ALTER SCHEMA michael OWNER TO michael;
                     ALTER SCHEMA sunny OWNER TO sunny;
                     ALTER SCHEMA theo OWNER TO theo;
                     -- Schema USAGE:
                     GRANT USAGE ON SCHEMA bruno TO michael, sunny, theo;
                     GRANT USAGE ON SCHEMA michael TO bruno, michael, sunny, theo;
                     GRANT USAGE ON SCHEMA sunny TO bruno, michael, sunny, theo;
                     GRANT USAGE ON SCHEMA theo TO bruno, michael, sunny, theo;
                     GRANT USAGE ON SCHEMA raw_tables TO bruno, michael, sunny, theo;
                     GRANT USAGE ON SCHEMA norm_tables TO bruno, michael, sunny, theo;
                     GRANT USAGE ON SCHEMA core TO bruno, michael, sunny, theo;
                     -- SELECT für jetzige und zukünftige:
                     GRANT SELECT ON ALL TABLES IN SCHEMA raw_tables TO bruno, michael, sunny, theo;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA bruno GRANT SELECT ON TABLES TO michael, sunny, theo;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA michael GRANT SELECT ON TABLES TO bruno, sunny, theo;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA sunny GRANT SELECT ON TABLES TO bruno, michael, theo;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA theo GRANT SELECT ON TABLES TO bruno, michael, sunny;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA raw_tables GRANT SELECT ON TABLES TO bruno, michael, sunny, theo;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA norm_tables GRANT SELECT ON TABLES TO bruno, michael, sunny, theo;
                     ALTER DEFAULT PRIVILEGES IN SCHEMA core GRANT SELECT ON TABLES TO bruno, michael, sunny, theo;
                     ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_rights)

In [None]:
# # beispiel für rechte für präsentation
# create_rights = text('''
#                      ALTER SCHEMA bruno OWNER TO bruno;
#                      GRANT SELECT ON ALL TABLES IN SCHEMA raw_tables TO bruno, michael, sunny, theo;
#                      ALTER DEFAULT PRIVILEGES IN SCHEMA bruno GRANT SELECT ON TABLES TO michael, sunny, theo;
#                      ''')

In [30]:
# normalisierung durchführen als sql script
sql_script_path = SQL / "normal.sql"

with open(sql_script_path, 'r', encoding = 'utf-8') as file:
    sql_script = file.read()

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(text(sql_script))

In [32]:
# testabfrage
query = '''SELECT * FROM core.orders 
           JOIN core.customers 
           USING (customer_id) 
           JOIN core.products 
           USING (product_id)
           JOIN core.categories
           USING (category_id)
           JOIN core.addresses
           USING (address_id)
        '''
with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    orders_df = pd.read_sql(query, connection)
    
print(orders_df)

    address_id  category_id  product_id  customer_id  order_id  quantity  \
0            5           19          32            8         1         8   
1           11           18          33            1         2         3   
2           24            4          27            2         3         5   
3           16            7          28            6         4         4   
4            2            3           9           17         5         1   
..         ...          ...         ...          ...       ...       ...   
95          21           19          37           13        97         5   
96          15            4          20            3        96         4   
97          24            4          20            2        98         9   
98          20            5           3           11        99         6   
99          25           16          23           18       100         1   

   order_date last_name first_name               ccn  \
0  2020-10-21   Firmage     Lin

In [33]:
# protokoll-trigger erstellen
# logs table erstellen:
create_log_table = text('''
                        CREATE TABLE core.logs (
                        id SERIAL PRIMARY KEY,
                        user_name TEXT NOT NULL,
                        table_name TEXT NOT NULL,
                        type_of_usage TEXT NOT NULL,
                        old_data JSONB,
                        new_data JSONB,
                        used_at TIMESTAMPTZ NOT NULL DEFAULT now()
                        );
                        '''
                       )

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_log_table)

In [34]:
# trigger funktion
create_function_for_trigger = text('''
CREATE OR REPLACE
FUNCTION core.log_usage_with_data() RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO core.logs
    (
        user_name,
	    table_name,
	    type_of_usage,
	    old_data,
	    new_data
    )
    VALUES 
    (
        current_user,
        TG_TABLE_NAME,
        TG_OP,
    CASE
	WHEN TG_OP IN ('UPDATE', 'DELETE') THEN to_jsonb(OLD)
	ELSE NULL
    END,
    CASE
	WHEN TG_OP IN ('UPDATE', 'INSERT') THEN to_jsonb(NEW)
	ELSE NULL
    END
    );

RETURN NEW;
END;

$$ LANGUAGE plpgsql;
''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_function_for_trigger)

In [35]:
# trigger erstellen für alle  5 tabellen:
create_Trigger_products = text('''
CREATE TRIGGER logs_on_products
AFTER INSERT OR UPDATE OR DELETE ON core.products
FOR EACH ROW
EXECUTE FUNCTION core.log_usage_with_data();
                               
CREATE TRIGGER logs_on_categories
AFTER INSERT OR UPDATE OR DELETE ON core.categories
FOR EACH ROW
EXECUTE FUNCTION core.log_usage_with_data();

CREATE TRIGGER logs_on_orders
AFTER INSERT OR UPDATE OR DELETE ON core.orders
FOR EACH ROW
EXECUTE FUNCTION core.log_usage_with_data(); 

CREATE TRIGGER logs_on_customers
AFTER INSERT OR UPDATE OR DELETE ON core.customers
FOR EACH ROW
EXECUTE FUNCTION core.log_usage_with_data();

CREATE TRIGGER logs_on_addresses
AFTER INSERT OR UPDATE OR DELETE ON core.addresses
FOR EACH ROW
EXECUTE FUNCTION core.log_usage_with_data();                                                            
''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_Trigger_products)

In [None]:
# # API / Request / Token:
# import requests

# ip_address = os.getenv('IP_ADDRESS')
# xc_token   = os.getenv('XC_TOKEN')

# url = 'http://xxx.xxx.xxx.xxx:8080/api/v2/tables/m0nnh0qjbap71ez/records?offset=0&limit=25&where=&viewId=vwxxug86nmrpesad'
# headers = {
#     "accept": "application/json",
#     "xc-token": "secret"
# }

# response = requests.get(url, headers=headers)

# data = response.json()
# rows = data.get('list', [])
# df = pd.DataFrame(rows)
# df

Unnamed: 0,order_id,customer_id,quantity,order_date,customers,products
0,1,8,8,2020-10-21 00:00:00+00:00,"{'customer_id': 8, 'last_name': 'Firmage'}","{'product_id': 32, 'product_name': 'Rolex Subm..."
1,2,1,3,2021-01-22 00:00:00+00:00,"{'customer_id': 1, 'last_name': 'Aaronson'}","{'product_id': 33, 'product_name': 'Royal Cani..."
2,3,2,5,2021-02-10 00:00:00+00:00,"{'customer_id': 2, 'last_name': 'Delea'}","{'product_id': 27, 'product_name': 'Moleskine ..."
3,4,6,4,2021-02-11 00:00:00+00:00,"{'customer_id': 6, 'last_name': 'Eles'}","{'product_id': 28, 'product_name': 'Netflix Pr..."
4,5,17,1,2021-03-11 00:00:00+00:00,"{'customer_id': 17, 'last_name': 'Mattedi'}","{'product_id': 9, 'product_name': 'Der Herr de..."
5,6,10,9,2021-03-17 00:00:00+00:00,"{'customer_id': 10, 'last_name': 'Holby'}","{'product_id': 7, 'product_name': 'Coleman Cam..."
6,7,18,8,2021-03-21 00:00:00+00:00,"{'customer_id': 18, 'last_name': 'Mawne'}","{'product_id': 11, 'product_name': 'Eames Loun..."
7,8,11,7,2021-03-28 00:00:00+00:00,"{'customer_id': 11, 'last_name': 'Jackalin'}","{'product_id': 28, 'product_name': 'Netflix Pr..."
8,9,1,3,2021-04-21 00:00:00+00:00,"{'customer_id': 1, 'last_name': 'Aaronson'}","{'product_id': 31, 'product_name': 'Philips So..."
9,10,20,4,2021-04-24 00:00:00+00:00,"{'customer_id': 20, 'last_name': 'McClaren'}","{'product_id': 29, 'product_name': 'Pampers Sw..."


In [None]:
# # verbindung zu normal_shop aufbauen um datenbanken und nutzer für metabase und superset einzurichten:
# load_dotenv(SQL / '.env', override = True)
# user         = os.getenv('DB_USER')
# password     = os.getenv('DB_PASSWORD')
# host         = os.getenv('DB_HOST')
# port         = os.getenv('DB_PORT')
# dbname = 'normal_shop'

# engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}')

In [None]:
# # metabase einrichten:
# create_metabase_db = text('''
# CREATE DATABASE metabase;''')

# create_metabase_user = text('''
# CREATE USER metabase_user WITH ENCRYPTED PASSWORD 'metabase_123';
# GRANT ALL PRIVILEGES ON DATABASE metabase TO metabase_user;
#                           ''')

# create_rights_to_metabse_user = text('''
# GRANT ALL ON SCHEMA public TO metabase_user;
# ALTER SCHEMA public OWNER TO metabase_user;
#                                      ''')

# with engine.connect() as connection:
#     connection = connection.execution_options(isolation_level="AUTOCOMMIT")
#     connection.execute(create_metabase_db)

# with engine.connect() as connection:
#     connection = connection.execution_options(isolation_level="AUTOCOMMIT")
#     connection.execute(create_metabase_user)

# with engine.connect() as connection:
#     connection = connection.execution_options(isolation_level="AUTOCOMMIT")
#     connection.execute(create_rights_to_metabse_user)  

In [None]:
# # superset einrichten:
# dbname = 'superset'
# engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}')

# create_superset_user = text('''
# CREATE USER superset_user WITH ENCRYPTED PASSWORD 'superset_123';
# GRANT ALL PRIVILEGES ON DATABASE superset TO superset_user;
#                           ''')

# create_rights_to_superset = text('''
# GRANT ALL ON SCHEMA public TO superset_user;
# ALTER SCHEMA public OWNER TO superset_user;
#                                      ''')

# with engine.connect() as connection:
#     connection = connection.execution_options(isolation_level="AUTOCOMMIT")
#     connection.execute(create_superset_user)

# with engine.connect() as connection:
#     connection = connection.execution_options(isolation_level="AUTOCOMMIT")
#     connection.execute(create_rights_to_superset)  

In [36]:
# views erstellen
create_views = text('''
                    create view core.agent_customers_view as
                    select
                        customer_id,
                        last_name,
                        first_name,
                        address_id,
                        CONCAT (repeat ('X', CHAR_LENGTH (ccn) -3),RIGHT(ccn, 3)) as masked_ccn
                    from
                        core.customers;

                    create view core.intern_customers_view as
                    select
                        customer_id,
                        concat (left (last_name, 1),repeat ('x', CHAR_LENGTH (last_name) -1)) as masked_last_name,
                        concat (left (first_name, 1),repeat ('x', CHAR_LENGTH (first_name) -1)) as masked_first_name,
                        address_id,
                        repeat ('X', CHAR_LENGTH (ccn)) as masked_ccn
                    from
                        core.customers;
                    ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_views)

In [38]:
# praktikant tobi und mitarbeiter günther erstellen
user_names = ['tobi', 'günther']

for usr in user_names: 
    load_dotenv(ENV / f'{usr}.env', override = True)
    user      = os.getenv('DB_USER')
    password  = os.getenv('DB_PASSWORD')
    user_dict = {'password': password}

    create_user =  text(f'''
                        CREATE USER {user} WITH PASSWORD :password;
                        ''')
    
    with engine.connect() as connection:
        connection = connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(create_user, user_dict)

In [39]:
# rollen intern, call_center_agent, data_analyst erstellen
create_roles = text('''
                    CREATE ROLE intern;
                    CREATE ROLE call_center_agent;
                    CREATE ROLE data_analyst;
                    ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_roles)

In [40]:
# rechte verteilung für rollen,
# intern hat nur begrenzten zugriff
# call_center_agent kann etwas mehr sehen
# data_analyst hat vollen zugriff
create_rights = text('''
                     -- Schema USAGE:
                     GRANT USAGE ON SCHEMA core TO intern, call_center_agent, data_analyst;
                     -- intern:
                     GRANT SELECT ON
                        core.categories, 
                        core.products, 
                        core.orders, 
                        core.intern_customers_view 
                     TO 
                        intern;
                     -- call_center_agent:
                     GRANT SELECT ON
                        core.addresses,
                        core.categories, 
                        core.products, 
                        core.orders, 
                        core.agent_customers_view
                     TO 
                        call_center_agent;
                     -- data_analyst:
                     GRANT SELECT ON
                        core.addresses,
                        core.categories, 
                        core.products, 
                        core.orders, 
                        core.customers
                     TO 
                        data_analyst;
                     ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(create_rights)

In [41]:
# rollen zuweisen:
assign_roles_to_user = text('''
                            GRANT intern TO tobi;
                            GRANT call_center_agent TO günther;
                            GRANT data_analyst TO theo;
                            ''')

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(assign_roles_to_user)

Im Anschluss wurden die Verbindung zur Datenbank und die verschiedenen Rollen von Projektmitgliedern getestet.<br>
Mit Python oder DBeaver konnte man die Nutzer, Trigger, Schemas und Tabellen und deren Rechte einsehen.<br>
Ebenso wurden neue Daten in die Datenbank geschrieben, andere editiert und ein paar gelöscht, um die Trigger zu testen.<br>
Metabase und Superset wurden nur kurz eingebunden und als Ausblick in der Projekt-Präsentation genutzt.