In [1]:
import redshift_connector
from faker import Faker
import random
from collections import namedtuple
import dotenv
import os
dotenv.load_dotenv()


# Connection settings are read from the environment, which dotenv.load_dotenv()
# above may have populated from a local .env file.
# NOTE(review): USER is usually already exported by the shell, and load_dotenv()
# does not override existing variables by default, so a USER entry in .env may
# be ignored here — confirm the intended login is the one being used.
conn = redshift_connector.connect(
    host=os.environ.get('HOST'),
    # Fall back to 5439 (Redshift's default port) when PORT is unset or empty;
    # int(None) would otherwise raise TypeError before connect() is reached.
    port=int(os.environ.get('PORT') or 5439),
    database=os.environ.get('DATABASE'),
    user=os.environ.get('USER'),
    password=os.environ.get('PASSWORD')
)
# With autocommit on, every statement is committed as it runs.
conn.autocommit = True
# Shared cursor used by every helper and query cell in this notebook.
c = conn.cursor()

fake = Faker()

In [15]:
# Manual recovery step (its execution count, In [15], shows it was run out of
# order): rolls back the transaction currently open on conn.
conn.rollback()

In [2]:
def df(sql):
    """Execute ``sql`` on the shared cursor ``c`` and return ``fetch_dataframe()``'s result.

    Args:
        sql: SQL text to execute; it must produce a result set.

    Returns:
        Whatever ``fetch_dataframe()`` returns for the executed statement.
    """
    executed = c.execute(sql)
    return executed.fetch_dataframe()

In [3]:
# Schemas that the setup cells drop and re-create.
schemas = [
    'schema_parts',
    'schema_iron',
    'schema_sales',
    'schema_research',
    'schema_consumer',
    'schema_production',
]
# Groups that the setup cells drop and re-create.
groups = [
    'group_marketing',
    'group_sales',
    'group_hr',
    'group_development',
    'group_engineering',
]
# Table names created inside every schema (note: there is no 'table_g').
tables = [
    'table_a',
    'table_b',
    'table_c',
    'table_d',
    'table_e',
    'table_f',
    'table_h',
    'table_i',
    'table_j',
]

# Maps each schema name to the list of group names that are granted access to it.
schema_group_permissions = {
    'schema_parts': ['group_development', 'group_engineering'],
    'schema_iron': ['group_engineering'],
    'schema_sales': ['group_sales', 'group_marketing'],
    'schema_research': ['group_development', 'group_engineering'],
    'schema_consumer': ['group_marketing', 'group_sales', 'group_hr', 'group_development'],
    'schema_production': ['group_engineering'],
}


In [4]:
# Schema names used as input for the existence checks further down.
test_schemas = [
    'marketing',
    'sales',
    'engineering',
    'manufacturing',
    'electrical',
    'hr',
    'batteries',
]

# Bare table names. Kept under their own name so that `test_tables` is only
# ever bound to the fully qualified list built below, never to two different
# kinds of value.
test_table_names = [
    'table_a',
    'table_b',
    'table_c',
    'table_d',
    'table_e',
    'table_gg',
    'table_h',
    'table_i',
    'table_j',
]

# Every combination as 'schema.table', table-major: all schemas for table_a
# first, then all schemas for table_b, and so on.
test_tables = [f'{s}.{t}' for t in test_table_names for s in test_schemas]


In [33]:
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE SCHEMA raises once
# the schema is already there.
c.execute('create schema if not exists admin;')
conn.commit()

In [None]:
"""
/**********************************************************************************************
Purpose: 		Find all objects owned by the user to be dropped
Columns -
objtype:		Type of object owned by the user. Object types are Function,Schema,Table or View
objowner:		Object owner
userid:			Object owner user id
schemaname:		Schema for the user object
objname:		Name of the object
Notes:			Create prepared statement. Run it i.e execute find_drop_userobjs('<username>'). This
				will return all the objects owned by the user. You can then alter the object owner
				or drop the object
History:
2017-03-24 adedotua created
2017-03-27 adedotua updated prepared statement name
2017-04-06 adedotua improvements
**********************************************************************************************/
"""

In [None]:
c.execute("""

prepare find_drop_userobjs(varchar) as 
select owner.objtype,owner.objowner,owner.userid,owner.schemaname,owner.objname from(
-- Functions owned by the user
select 'Function',pgu.usename,pgu.usesysid,nc.nspname,textin(regprocedureout(pproc.oid::regprocedure))
from 
pg_proc pproc,pg_user pgu,pg_namespace nc 
where pproc.pronamespace=nc.oid and pproc.proowner=pgu.usesysid
UNION ALL
-- Databases owned by the user
select 'Database',pgu.usename,pgu.usesysid,null,pgd.datname from pg_database pgd,pg_user pgu where pgd.datdba=pgu.usesysid
UNION ALL
-- Schemas owned by the user
select 'Schema',pgu.usename,pgu.usesysid,null,pgn.nspname from pg_namespace pgn,pg_user pgu where pgn.nspowner=pgu.usesysid
UNION ALL
-- Tables or Views owned by the user
select decode(pgc.relkind,'r','Table','v','View'),pgu.usename,pgu.usesysid,nc.nspname,pgc.relname
from
pg_class pgc,pg_user pgu,pg_namespace nc 
where pgc.relnamespace=nc.oid and pgc.relkind in ('r','v') and pgu.usesysid=pgc.relowner) owner("objtype","objowner","userid","schemaname","objname")
where owner.objowner = $1;

""")

In [24]:
conn.commit()

In [30]:
c.execute("execute find_drop_userobjs('salesperson_1');").fetch_dataframe()

In [34]:
c.execute("""
CREATE OR REPLACE VIEW admin.v_get_obj_priv_by_user
AS
SELECT
    * 
FROM 
    (
    SELECT 
        schemaname
        ,objectname
        ,usename
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'select') AS sel
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'insert') AS ins
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'update') AS upd
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'delete') AS del
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'references') AS ref
    FROM
        (
        SELECT schemaname, 't' AS obj_type, tablename AS objectname, QUOTE_IDENT(schemaname) || '.' || QUOTE_IDENT(tablename) AS fullobj FROM pg_tables
        WHERE schemaname not in ('pg_internal')
        UNION
        SELECT schemaname, 'v' AS obj_type, viewname AS objectname, QUOTE_IDENT(schemaname) || '.' || QUOTE_IDENT(viewname) AS fullobj FROM pg_views
        WHERE schemaname not in ('pg_internal')
        ) AS objs
        ,(SELECT * FROM pg_user) AS usrs
    ORDER BY fullobj
    )
WHERE (sel = true or ins = true or upd = true or del = true or ref = true)
;
""")
conn.commit()

In [5]:
def create_schema(name):
    """Create the schema ``name`` unless it already exists.

    Args:
        name: Schema name, interpolated into the statement as-is (unquoted).
    """
    statement = f"CREATE SCHEMA if not exists {name};"
    c.execute(statement)


def create_group(name):
    """Create the user group ``name``.

    Args:
        name: Group name, interpolated into the statement as-is (unquoted).
    """
    statement = f"CREATE GROUP {name};"
    c.execute(statement)


def grant_usage_and_select(schema, group):
    """Grant USAGE on ``schema`` and SELECT on all its tables to one or more groups.

    Args:
        schema: Schema name, interpolated into the statements unquoted.
        group: Either a single group name (str) or an iterable of group names.
            An iterable is expanded into one pair of GRANT statements per
            group; formatting it directly into the SQL would embed the Python
            list repr and produce an invalid statement.
    """
    target_groups = [group] if isinstance(group, str) else list(group)
    for group_name in target_groups:
        c.execute(f"GRANT USAGE on SCHEMA {schema} to GROUP {group_name};")
        c.execute(f"GRANT SELECT ON ALL TABLES IN SCHEMA {schema} TO GROUP {group_name};")


def create_fake_users(num_users):
    """Generate ``num_users`` distinct user names of the form ``first_last``.

    Names come from the module-level Faker instance ``fake``. Because the
    notebook interpolates these names unquoted into CREATE USER / ALTER GROUP
    statements, every character other than letters, digits and underscores is
    dropped (for example the apostrophe in "O'Connor"), and a numeric suffix is
    appended whenever a generated name repeats.

    Args:
        num_users: How many names to generate.

    Returns:
        list[str]: ``num_users`` unique, lowercase names.
    """
    def _clean(part):
        # Keep only characters that are safe in an unquoted SQL identifier.
        return ''.join(ch for ch in part.lower() if ch.isalnum() or ch == '_')

    names = []
    taken = set()
    for _ in range(num_users):
        base = f'{_clean(fake.first_name())}_{_clean(fake.last_name())}'
        candidate = base
        suffix = 2
        # Duplicates get _2, _3, ... so the result always has num_users
        # distinct entries, however often the generator repeats itself.
        while candidate in taken:
            candidate = f'{base}_{suffix}'
            suffix += 1
        taken.add(candidate)
        names.append(candidate)
    return names


def add_users_to_group(users, group):
    """Create each user in ``users`` and add it to ``group``.

    Every user is created with the same fixed demo password before being
    added to the group.

    Args:
        users: Iterable of user names, interpolated unquoted into the SQL.
        group: Name of the group the new users are added to.
    """
    for username in users:
        create_stmt = f"CREATE USER {username} password 'xyzzy-1-XYZZY';"
        c.execute(create_stmt)
        membership_stmt = f"ALTER GROUP {group} ADD USER {username};"
        c.execute(membership_stmt)


def delete_schemas():
    """Drop every schema in the module-level ``schemas`` list, with CASCADE.

    Best-effort: a failure on one schema is printed and the remaining schemas
    are still attempted, so one bad schema never blocks the rest of the reset.
    """
    for schema in schemas:
        try:
            c.execute(f"drop schema if exists {schema} cascade;")
        except Exception as err:
            # IF EXISTS already covers the "not there" case, so anything that
            # lands here is a real failure worth surfacing rather than hiding.
            print(f"Could not drop schema {schema}: {err}")


def delete_groups():
    """Drop every group in the module-level ``groups`` list.

    Best-effort: DROP GROUP is issued without an existence check, so a group
    that is not there ends up in the except branch. The failure is printed and
    the remaining groups are still attempted.
    """
    for group in groups:
        try:
            c.execute(f"drop group {group};")
        except Exception as err:
            # Surface the reason instead of discarding it, so an unexpected
            # failure can be told apart from "group did not exist".
            print(f"Could not drop group {group}: {err}")


def delete_users():
    """Drop every database user except the protected ones.

    For each user to be dropped this generates and runs statements that
    (1) hand the user's tables over to ``new_owner``, (2) revoke the user's
    schema privileges and (3) revoke the user's table privileges, and then
    drops the user. Steps 2 and 3 read ``admin.v_get_obj_priv_by_user``, so
    that view must exist before this function is called.

    Identifiers are double-quoted (names such as ``chris-birch-admin`` are not
    valid unquoted) and user names are matched with ``=`` rather than ``LIKE``,
    where an underscore in a name would act as a single-character wildcard.

    Side effects: executes DDL through the shared cursor ``c``, prints each
    generated statement before running it, and commits on ``conn`` at the end.
    """
    def _ident(name):
        # Render a name as a double-quoted SQL identifier.
        return '"' + name.replace('"', '""') + '"'

    def _literal(value):
        # Render a value as a single-quoted SQL string literal.
        return "'" + value.replace("'", "''") + "'"

    def _run_generated(generator_sql):
        # Run a query whose rows are themselves statements, then run each one.
        c.execute(generator_sql)
        for row in c.fetchall():
            print("Executing: %s" % row[0])
            c.execute(row[0])

    try:
        all_users = c.execute('select usename from pg_user;').fetchall()
    except UnicodeDecodeError:
        # NOTE(review): single retry kept from the original; presumably it
        # works around an intermittent decoding failure — confirm it is needed.
        all_users = c.execute('select usename from pg_user;').fetchall()

    new_owner = 'chris-birch-admin'
    # rdsdb is Redshift's built-in superuser and cannot be dropped.
    protected_users = {new_owner, 'rdsdb'}
    users_to_drop = [row[0] for row in all_users if row[0] not in protected_users]

    for user in users_to_drop:
        # Change ownership of the user's tables.
        _run_generated(
            "select 'alter table ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || "
            + _literal(f" owner to {_ident(new_owner)};")
            + " from pg_tables where tableowner = " + _literal(user)
        )
        # Revoke schema permissions.
        _run_generated(
            "select distinct 'revoke all on schema ' || quote_ident(schemaname) || "
            + _literal(f" from {_ident(user)};")
            + " from admin.v_get_obj_priv_by_user where usename = " + _literal(user)
        )
        # Revoke table permissions.
        _run_generated(
            "select distinct 'revoke all on all tables in schema ' || quote_ident(schemaname) || "
            + _literal(f" from {_ident(user)};")
            + " from admin.v_get_obj_priv_by_user where usename = " + _literal(user)
        )
        # Drop user.
        c.execute(f"drop user {_ident(user)};")
    conn.commit()


def create_table(table):
    """Create ``table`` with a single ``colors varchar(20)`` column.

    Args:
        table: Table name (optionally schema-qualified), interpolated unquoted.
    """
    ddl = f'create table {table} (colors varchar(20))'
    c.execute(ddl)


def _make_tuple_colors(num):
    return tuple(zip([fake.color_name() for _ in range(num)]))


def insert_colors_in_table(table):
    """Insert 20 fake colour names into the ``colors`` column of ``table``.

    Args:
        table: Target table name, interpolated unquoted into the INSERT.
    """
    statement = f'insert into {table} (colors) values (%s)'
    color_rows = _make_tuple_colors(20)
    c.executemany(statement, color_rows)


In [6]:
delete_schemas()
conn.commit()

In [7]:
delete_groups()
conn.commit()

In [8]:
delete_users()
conn.commit()

ProgrammingError: {'S': 'ERROR', 'C': '3F000', 'M': 'schema "admin" does not exist', 'F': '/home/ec2-user/padb/src/pg/src/backend/parser/parse_relation.c', 'L': '4836', 'R': 'CheckUnnestValidNavigation'}

In [None]:
for schema in schemas:
    create_schema(schema)
conn.commit()
for schema in schemas:
    for table in tables:
        full_table = f'{schema}.{table}'
        create_table(full_table)
        insert_colors_in_table(full_table)
conn.commit()

In [None]:
for group in groups:
    create_group(group)
    users = create_fake_users(20)
    add_users_to_group(users, group)

In [None]:
# schema_group_permissions maps each schema to a LIST of groups. Issue the
# grants one group at a time so that every call receives a single group name;
# passing the whole list would put its Python repr into the GRANT statement.
for schema, permitted_groups in schema_group_permissions.items():
    for group in permitted_groups:
        grant_usage_and_select(schema, group)

In [None]:
c.execute('select * from schema_parts.table_a')
c.fetch_dataframe()

In [None]:
def check_schema_exists(schema):
    """Return whether a schema named ``schema`` exists.

    The name is looked up in ``pg_namespace``, so a schema that exists but
    holds no tables is still reported as present. ``pg_tables`` only has rows
    for schemas that contain at least one table.

    Args:
        schema: Schema name; interpolated into the query as a string literal,
            so it must come from trusted input.

    Returns:
        The first column of the single result row (the EXISTS flag).
    """
    sql = f"""
    select exists (
      select 1 from pg_namespace
      where nspname = '{schema}'
    );
    """
    return c.execute(sql).fetchone()[0]

def check_table_exists(schema, table):
    """Return whether ``pg_tables`` lists ``table`` inside ``schema``.

    Args:
        schema: Schema name, interpolated into the query as a string literal.
        table: Table name, interpolated into the query as a string literal.

    Returns:
        The first column of the single result row (the EXISTS flag).
    """
    query = f"""
    select exists (
      select * from pg_tables
      where schemaname = '{schema}'
      and tablename = '{table}'
    );
    """
    first_row = c.execute(query).fetchone()
    return first_row[0]

In [None]:
def check_schema_table_exists(schema_tables):
    """Report which of the given ``schema.table`` names do not exist.

    Args:
        schema_tables: Iterable of strings of the form ``'schema.table'``.

    Returns:
        list[str]: Sorted, de-duplicated messages. A missing schema is reported
        once and its tables are not checked; an entry without a ``.`` is
        reported as malformed instead of raising.
    """
    errors = set()
    schema_found = {}  # schema name -> result of check_schema_exists
    for entry in schema_tables:
        schema, dot, table = entry.partition('.')
        if not dot:
            errors.add(f"'{entry}' is not of the form 'schema.table'.")
            continue
        # Ask about each schema only once, however many tables reference it.
        if schema not in schema_found:
            schema_found[schema] = check_schema_exists(schema)
        if not schema_found[schema]:
            errors.add(f"SCHEMA '{schema}' does not exist.")
            continue
        # The table lookup is only meaningful once the schema is known to exist.
        if not check_table_exists(schema, table):
            errors.add(f"TABLE '{table}' does not exist inside SCHEMA '{schema}'")
    return sorted(errors)

In [None]:
# The checker expects 'schema.table' strings; test_schemas holds bare schema
# names, so the qualified test_tables list is the matching input.
check_schema_table_exists(test_tables)

In [None]:
def get_users_in_security_groups(security_groups):
    """Placeholder: iterates ``security_groups`` without acting and returns None.

    TODO: get the username column for each security group.
    """
    for _group in security_groups:
        continue

def get_users_with_no_security_groups(security_groups):
    """Placeholder: iterates ``security_groups`` without acting and returns None.

    TODO: get the username column for each security group.
    """
    for _group in security_groups:
        continue

In [None]:
# NOTE(review): this does not look like valid Redshift REVOKE syntax (REVOKE
# needs a concrete object and grantee), so running this cell is expected to
# raise — presumably a scratch placeholder; confirm before keeping it.
c.execute('revoke all on all from all for all;')


# Demo
- https://blog.satoricyber.com/hardening-aws-redshift-security-access-controls-explained

# Users and Groups

In [None]:
# view assigned roles to users
c.execute("""
SELECT usename AS user_name, groname AS group_name 
FROM pg_user, pg_group
WHERE pg_user.usesysid = ANY(pg_group.grolist)
AND pg_group.groname in (SELECT DISTINCT pg_group.groname from pg_group)
"""
).fetch_dataframe()

In [None]:
c.execute('select * from pg_group;').fetch_dataframe()

In [None]:
c.execute('select nspname from pg_catalog.pg_namespace;').fetch_dataframe()

In [None]:
c.execute("select nspname from pg_catalog.pg_namespace where nspname like '%schema' ").fetch_dataframe()

In [None]:
# The table lives in the `sales` schema, which no visible cell of this notebook
# creates (the setup cells only create schema_* names), so make sure it exists
# first. IF NOT EXISTS on both statements keeps the cell re-runnable.
c.execute("CREATE SCHEMA IF NOT EXISTS sales;")

shipping_table_sql = """
CREATE TABLE IF NOT EXISTS sales.orders (
order_id varchar(255), order_checksum int, shipping_firstname varchar(50),
shipping_middlename varchar(25), shipping_lastname varchar(50),
shipping_street1 varchar(255), shipping_street2 varchar(255),
shipping_street3 varchar(255), shipping_zipcode varchar(15),
shipping_pob varchar(15), shipping_city varchar(50),
shipping_phone1 varchar(50), shipping_phone2 varchar(50),
shipping_cellular varchar(50), shipping_hours varchar(50),
shipping_comments varchar(255), payer_creditcard varchar(19),
payer_expmonth varchar(2), payer_expyear varchar(4),
payer_firstname varchar(50), payer_middlename varchar(25),
payer_lastname varchar(50), payer_street1 varchar(255),
payer_street2 varchar(255), payer_street3 varchar(255),
payer_zipcode varchar(15), payer_pob varchar(15), payer_city varchar(50),
payer_phone1 varchar(50), payer_phone2 varchar(50), payer_cellular varchar(50),
payer_hours varchar(50), payer_comments varchar(255));
"""
c.execute(shipping_table_sql)

In [None]:
# create user with read-only access to orders table
c.execute("CREATE USER shipping PASSWORD 'xyzzy-1-XYZZY';")
c.execute("GRANT SELECT ON sales.orders TO shipping;")

## Column Level Security

In [None]:
# Revoking the existing SELECT privilege on the entire table
c.execute("REVOKE SELECT ON sales.orders FROM shipping;")
# Granting SELECT privilege specifically to all columns except for the forbidden ones
c.execute("""
GRANT SELECT(order_id, order_checksum, shipping_firstname, 
shipping_middlename, shipping_lastname, shipping_street1,
shipping_street2, shipping_street3, shipping_zipcode, shipping_pob, 
shipping_city, shipping_phone1, shipping_phone2, shipping_cellular, 
shipping_hours, shipping_comments, payer_firstname, payer_middlename,
payer_lastname, payer_street1, payer_street2, payer_street3, payer_zipcode, 
payer_pob, payer_city, payer_phone1, payer_phone2, payer_cellular, 
payer_hours, payer_comments)
ON sales.orders TO shipping"""
)


## Row Level Security

In [None]:
c.execute("""
CREATE TABLE department_employees (
id int,
name varchar(50),
phone varchar(50),
salary smallint,
department varchar(50));
""")

c.execute("""
INSERT INTO department_employees VALUES
(1, 'Seller McSeller', '+1-212-5555555', 180, 'sales'),
(2, 'Sir Sell-A-Lot', '+1-212-5556666', 240, 'sales'),
(3, 'Marky McMarket', '+1-716-5555555', 210, 'marketing'),
(4, 'Sir Market-A-Lot', '+1-716-5556666', 270, 'marketing');
""")

In [None]:
c.execute("""
CREATE TABLE users_to_groups
(user_name varchar(100), group_name varchar(100));
""")

c.execute("""
INSERT INTO users_to_groups VALUES
('marketing_accountant', 'marketing');
""")

# Let's also create an accountant user
c.execute("""
CREATE USER marketing_accountant WITH PASSWORD 'xyzzy-1-XYZZY';
""")

In [None]:
c.execute("SELECT * FROM department_employees WHERE department IN (SELECT group_name FROM users_to_groups WHERE user_name='marketing_accountant')").fetch_dataframe()

In [None]:
c.execute("""
CREATE VIEW v_department_employees AS
SELECT * FROM department_employees
WHERE department IN (SELECT group_name FROM users_to_groups WHERE user_name=CURRENT_USER);
""")

In [None]:

# Granting access to the user in views
c.execute("GRANT SELECT ON users_to_groups TO marketing_accountant;")
c.execute("GRANT SELECT ON v_department_employees TO marketing_accountant;")


In [None]:

# Switching to use the context of the user 'marketing_accountant'
c.execute("SET SESSION AUTHORIZATION marketing_accountant;")

# The failure below IS the demonstration, so catch it and show the message
# rather than leaving a cell that aborts a top-to-bottom run.
try:
    c.execute("SELECT * FROM department_employees;").fetchall()
except redshift_connector.ProgrammingError as err:
    print(err)

#  We get a permission denied error, as we don't have access to the table itself:
#  Invalid operation: permission denied for relation department_employees


In [None]:

# We now get the filtered rows: the view only returns rows whose department is
# mapped to the current user in users_to_groups.
c.execute("SELECT * FROM v_department_employees;").fetch_dataframe()

In [None]:
c.execute("SET SESSION AUTHORIZATION 'chris-birch-admin';")
c.execute("SELECT * FROM department_employees;").fetch_dataframe()

In [None]:
# look for failed logins
c.execute("""
SELECT *
FROM stl_connection_log
WHERE event='authentication failure'
ORDER BY recordtime;
""").fetch_dataframe()

In [None]:
# show successful auth users by hour
c.execute("""
SELECT DATE_PART(YEAR, recordtime) || '-' ||
	LPAD(DATE_PART(MONTH, recordtime),2,'0') || '-' ||
	LPAD(DATE_PART(DAY, recordtime),2,'0') || ' ' ||
	LPAD(DATE_PART(HOUR, recordtime),2,'0') AS hour_bucket, username, COUNT(*)
FROM stl_connection_log
WHERE event = 'authenticated'
GROUP BY 1, 2
ORDER BY 1, 2 DESC;
""").fetch_dataframe()

In [None]:
# successful auth by hour, exclude rdsdb
c.execute("""
SELECT DATE_PART(YEAR, recordtime) || '-' ||
	LPAD(DATE_PART(MONTH, recordtime),2,'0') || '-' ||
	LPAD(DATE_PART(DAY, recordtime),2,'0') || ' ' ||
	LPAD(DATE_PART(HOUR, recordtime),2,'0') AS hour_bucket, username, COUNT(*)
FROM stl_connection_log
WHERE event = 'authenticated'
AND username != 'rdsdb'
GROUP BY 1, 2
ORDER BY 1, 2 DESC;
""").fetch_dataframe()

In [None]:
# show successful auth by number of auth
c.execute("""
SELECT username, event, COUNT(*)
FROM stl_connection_log
WHERE event = 'authenticated'
GROUP BY 1, 2
ORDER BY 3 DESC;
""").fetch_dataframe()

In [None]:
# connection drivers used
c.execute("""
SELECT username, application_name, COUNT(*) 
FROM stl_connection_log
WHERE application_name != ''
GROUP BY 1,2
ORDER BY 1,2;
""").fetch_dataframe()

In [None]:
c.execute("""
SELECT * FROM STL_QUERY
LIMIT 100;
""").fetch_dataframe()

In [None]:
# view permissions for 'user' on 'schema'
c.execute("""
SELECT
    u.usename,
    s.schemaname,
    has_schema_privilege(u.usename,s.schemaname,'create') AS create_permission,
    has_schema_privilege(u.usename,s.schemaname,'usage') AS usage_permission
FROM
    pg_user u
CROSS JOIN
    (SELECT DISTINCT schemaname FROM pg_tables) s
WHERE
    u.usename = 'chris-birch-admin'
    AND s.schemaname = 'second_schema';
""").fetch_dataframe()

In [None]:
# view all permissions for all users
c.execute("""
SELECT
    u.usename,
    s.schemaname,
    has_schema_privilege(u.usename,s.schemaname,'create') AS create_permission,
    has_schema_privilege(u.usename,s.schemaname,'usage') AS usage_permission
FROM
    pg_user u
CROSS JOIN
    (SELECT DISTINCT schemaname FROM pg_tables) s;
""").fetch_dataframe()

In [None]:
# user has permissions on specific table
# Object names go through QUOTE_IDENT, as in admin.v_get_obj_priv_by_user, so
# that schema or table names needing quotes still resolve when they are passed
# to HAS_TABLE_PRIVILEGE. Each flag additionally requires USAGE on the schema.
c.execute("""
SELECT *
FROM
    (
    SELECT
        schemaname
        ,objectname
        ,usename
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'select') AND has_schema_privilege(usrs.usename, schemaname, 'usage')  AS sel
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'insert') AND has_schema_privilege(usrs.usename, schemaname, 'usage')  AS ins
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'update') AND has_schema_privilege(usrs.usename, schemaname, 'usage')  AS upd
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'delete') AND has_schema_privilege(usrs.usename, schemaname, 'usage')  AS del
        ,HAS_TABLE_PRIVILEGE(usrs.usename, fullobj, 'references') AND has_schema_privilege(usrs.usename, schemaname, 'usage')  AS ref
    FROM
        (
        SELECT schemaname, 't' AS obj_type, tablename AS objectname, QUOTE_IDENT(schemaname) || '.' || QUOTE_IDENT(tablename) AS fullobj FROM pg_tables
        WHERE schemaname not in ('pg_internal')
        UNION
        SELECT schemaname, 'v' AS obj_type, viewname AS objectname, QUOTE_IDENT(schemaname) || '.' || QUOTE_IDENT(viewname) AS fullobj FROM pg_views
        WHERE schemaname not in ('pg_internal')
        ) AS objs
        ,(SELECT * FROM pg_user) AS usrs
    ORDER BY fullobj
    )
WHERE (sel = true or ins = true or upd = true or del = true or ref = true)
and schemaname='third_schema'
and objectname='b'
and usename = 'chris-birch-admin';
""").fetch_dataframe()

In [None]:
df('select default_iam_role();')

In [None]:
df('select user, current_user_id;')

In [None]:
df('select user, current_aws_account, current_database(), current_schema();')

# Audit Logs

In [None]:
df('select * from stl_connection_log;')

In [None]:
c.execute("""
select * from stl_userlog;
""").fetch_dataframe()

In [None]:
c.execute("""
select * from stl_query;
""").fetch_dataframe()

In [None]:
# last 10 queries run on the cluster
df("""
SELECT query, 
LISTAGG(CASE WHEN LEN(RTRIM(text)) = 0 THEN text ELSE RTRIM(text) END) WITHIN GROUP (ORDER BY sequence) AS query_statement, COUNT(*) as row_count 
FROM stl_querytext
GROUP BY query
ORDER BY query desc
LIMIT 10;
""")

In [None]:
# top 10 longest queries
df("""
WITH queries AS (
SELECT query, 
LISTAGG(CASE WHEN LEN(RTRIM(text)) = 0 THEN text ELSE RTRIM(text) END) WITHIN GROUP (ORDER BY sequence) AS query_statement, COUNT(*) as row_count 
FROM stl_querytext
GROUP BY query)
SELECT * FROM queries WHERE query_statement ILIKE 'select%'
ORDER BY LEN(query_statement) DESC 
LIMIT 10;
""")

In [None]:
# all grant and revoke permissions
df("""
WITH util_cmds AS (
SELECT userid, 
LISTAGG(CASE WHEN LEN(RTRIM(text)) = 0
THEN text
ELSE RTRIM(text)
END) 
WITHIN GROUP (ORDER BY sequence) AS query_statement 
FROM stl_utilitytext GROUP BY userid, xid order by xid)
SELECT util_cmds.userid, stl_userlog.username, query_statement
FROM util_cmds
LEFT JOIN stl_userlog ON (util_cmds.userid = stl_userlog.userid)
WHERE query_statement
ILIKE '%GRANT%' OR query_statement ILIKE '%REVOKE%';
""")

In [None]:
# Last 10 failed logins
df("""
SELECT *
FROM stl_connection_log
WHERE event='authentication failure'
ORDER BY recordtime DESC 
LIMIT 10;
""")

# Redshift Data Inventory

In [None]:
# Manual process
df("""
SELECT column_name, table_name, table_schema, table_catalog
FROM information_schema.columns
WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER BY table_catalog, table_schema, table_name, ordinal_position ;
""")

# Data Masking

In [None]:
# Static masking using Views
# CREATE VIEW is DDL and returns no result set, so it is run with c.execute()
# directly; df() would go on to call fetch_dataframe() with nothing to fetch.
# NOTE(review): no visible cell creates a `customers` table — confirm it exists
# before running this.
c.execute("""
CREATE OR REPLACE VIEW redacted_customers AS
SELECT sha2(first_name, 256) AS first_name, 
sha2(last_name, 256) AS last_name, 
country_code,
REGEXP_REPLACE(email, '[^@]+@', '*@') AS email
FROM customers;
""")

In [None]:
# dynamic masking
# CREATE VIEW is DDL and returns no result set, so it is run with c.execute()
# directly; df() would go on to call fetch_dataframe() with nothing to fetch.
# OR REPLACE keeps the cell re-runnable, like the static-masking view.
# NOTE(review): no visible cell creates `public.customers` — confirm it exists
# before running this.
c.execute("""
CREATE OR REPLACE VIEW v_customers AS
SELECT CASE WHEN CURRENT_USER='admin' THEN first_name ELSE sha2(first_name, 256) END AS first_name,
CASE WHEN CURRENT_USER='admin' THEN last_name ELSE sha2(last_name, 256) END AS last_name,
country_code,
CASE WHEN CURRENT_USER='admin' THEN email ELSE REGEXP_REPLACE(email, '[^@]+@', '*@') END AS email
FROM public.customers;
""")

In [None]:
df("""

""")

In [None]:
df("""

""")

In [None]:
df("""

""")

In [None]:
df("""

""")

In [None]:
df("""

""")

In [None]:
df("""

""")

In [None]:
df("""

""")