In [1]:
%load_ext autoreload
%autoreload 2

# Test connection

In [2]:
# setup logger
from loguru import logger

In [3]:
import psycopg2
conn = psycopg2.connect(
    host='default.561130499334.eu-central-1.redshift-serverless.amazonaws.com',
    port="5439",
    user="awsuser",
    password="R3dsh1ft",
    database='dev'
)
cursor = conn.cursor()
conn.autocommit = True  # do not need commit then

In [4]:
from pprint import pformat

def run(query: str):
    q = query.strip()
    assert q.endswith(";")
    logger.info(f"Running\n {q}")
    cursor.execute(q)
    try:
        result = cursor.fetchall()
        column_names = [desc[0] for desc in cursor.description]
        rows = [dict(zip(column_names, row)) for row in result]
        logger.success(f"{pformat(rows)}")
    except psycopg2.ProgrammingError as e:
        if "no results to fetch" in str(e):
            logger.success("No results to fetch")
        else:
            logger.exception(e)
            conn.rollback()
            raise e
    # conn.commit() # not necessary with autocommit


run('SELECT * FROM DummyTable;')

[32m2023-08-04 00:27:49.686[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 SELECT * FROM DummyTable;[0m
[32m2023-08-04 00:27:49.713[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m12[0m - [32m[1m[{'age': 30, 'id': 1, 'name': 'John'},
 {'age': 25, 'id': 2, 'name': 'Jane'},
 {'age': 40, 'id': 3, 'name': 'Mike'}][0m


In [None]:
# if necessary: close connection
cursor.close()
conn.close()

# Try to diagnose errors and events

In [18]:
# s="""SELECT * FROM SVL_QLOG LIMIT 3;"""
# -- grant select on svv_redshift_databases to awsuser;
# -- grant select on svv_all_schemas to awsuser;
# -- grant select on svv_all_tables to awsuser;
# grant select on stv_recents to awsuser

# s="GRANT ALL PRIVILEGES on all tables in schema public to awsuser;"
s="SELECT * FROM SVL_QLOG LIMIT 3;"
run(s)

[32m2023-08-04 00:33:30.647[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 SELECT * FROM SVL_QLOG LIMIT 3;[0m


InsufficientPrivilege: permission denied for relation svl_qlog


# Using my original SQL statements

## Create tables

In [8]:
from old import sql_queries

In [25]:
for q in sql_queries.drop_table_queries:
    run(q)

[32m2023-07-24 20:04:44.216[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS event_stage_table;[0m
[32m2023-07-24 20:04:44.272[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m16[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 20:04:45.409[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS song_stage_table;[0m
[32m2023-07-24 20:04:45.477[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m16[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 20:04:46.745[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS songplays;[0m
[32m2023-07-24 20:04:46.796[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m16[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 20:04:46.832[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS s

In [15]:
for q in sql_queries.create_table_queries:
    run(q)

[32m2023-07-24 19:34:57.312[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 CREATE TABLE event_stage_table (
    artist VARCHAR(255),
    auth VARCHAR(255),
    firstName VARCHAR(255),
    gender CHAR(1),
    itemInSession SMALLINT,
    lastName VARCHAR(255),
    length FLOAT,
    level VARCHAR(50),
    location VARCHAR(255),
    method VARCHAR(10),
    page VARCHAR(50),
    registration BIGINT,
    sessionId INTEGER,
    song VARCHAR(255),
    status SMALLINT,
    ts BIGINT,
    userAgent VARCHAR(255),
    userId INTEGER
);[0m
[32m2023-07-24 19:34:57.375[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m12[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:34:58.497[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 CREATE TABLE song_stage_table (
    artist_id VARCHAR(50),
    artist_latitude FLOAT,
    artist_location VARCHAR(500),
    artist_longitude FLOAT,
    artist_name VARCHAR(500),
    

## Load tables into staging

In [16]:
for q in sql_queries.copy_table_queries:
    run(q)

[32m2023-07-24 19:35:36.050[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 COPY event_stage_table
FROM 's3://udacity-dataengineer-pipeline-project-s3/log_data'
IAM_ROLE 'arn:aws:iam::561130499334:role/my-redshift-service-role'
--JSON 'auto' -- doesn't work, because keys don't match perfectly
JSON 's3://udacity-dataengineer-pipeline-project-s3/log_json_path.json'
STATUPDATE ON
MAXERROR 1
COMPUPDATE OFF;[0m
[32m2023-07-24 19:35:39.232[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m12[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:35:40.173[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 COPY song_stage_table
FROM 's3://udacity-dataengineer-pipeline-project-s3/song_data/A/A/A/' -- use subset
-- FROM 's3://udacity-dataengineer-pipeline-project-s3/song_data/'
IAM_ROLE 'arn:aws:iam::561130499334:role/my-redshift-service-role'
JSON 'auto'
STATUPDATE ON
MAXERROR 1
COMPUPDATE OFF;[0m
[32m2023

In [19]:
import utils
for t in ["event_stage_table", "song_stage_table"]:
    run(utils.get_list_query(t))

[32m2023-07-24 19:42:04.985[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 SELECT * FROM event_stage_table LIMIT 3;[0m
[32m2023-07-24 19:42:05.044[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m11[0m - [32m[1m[{'artist': 'Dee Dee Bridgewater',
  'auth': 'Logged In',
  'firstname': 'Lily',
  'gender': 'F',
  'iteminsession': 38,
  'lastname': 'Koch',
  'length': 318.64118,
  'level': 'paid',
  'location': 'Chicago-Naperville-Elgin, IL-IN-WI',
  'method': 'PUT',
  'page': 'NextSong',
  'registration': 1541048010796,
  'sessionid': 818,
  'song': 'La Vie En Rose',
  'status': 200,
  'ts': 1542845032796,
  'useragent': '"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, '
               'like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 '
               'Safari/537.36"',
  'userid': 15},
 {'artist': "Tim O'brien",
  'auth': 'Logged In',
  'firstname': 'Lily',
  'gender': 'F',
  'iteminsession': 39,
  'lastnam

## Insert data

In [20]:
for q in sql_queries.insert_table_queries:
    run(q)

[32m2023-07-24 19:44:54.587[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 INSERT INTO users (user_id, first_name, last_name, gender, level)
SELECT DISTINCT userId, firstName, lastName, gender, level
FROM event_stage_table
WHERE userId IS NOT NULL;[0m
[32m2023-07-24 19:45:03.167[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m14[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:45:04.444[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 INSERT INTO artists (artist_id, name, location, latitude, longitude)
SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude
FROM song_stage_table;[0m
[32m2023-07-24 19:45:13.002[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m14[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:45:14.131[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m5[0m - [1mRunning
 INSERT INTO time (start_time

In [24]:
for t in ["songplays","users","songs","artists","time"]:
    # run(f"DELETE FROM {t};")
    run(utils.get_drop_query(t))

[32m2023-07-24 19:58:29.165[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS songplays;[0m
[32m2023-07-24 19:58:29.227[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m16[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:58:30.315[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS users;[0m
[32m2023-07-24 19:58:30.386[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m16[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:58:31.323[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS songs;[0m
[32m2023-07-24 19:58:31.380[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m16[0m - [32m[1mNo results to fetch[0m
[32m2023-07-24 19:58:33.451[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m7[0m - [1mRunning
 DROP TABLE IF EXISTS artists;[0m
[32m2023-0

# Using new statements

## Create tables

In [20]:
from old import utils

for t in ["artists","songplays", "songs","staging_events","staging_songs","time","users"]:
    run(utils.get_drop_query(t))

[32m2023-08-04 00:44:30.661[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 DROP TABLE IF EXISTS artists;[0m
[32m2023-08-04 00:44:30.695[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m15[0m - [32m[1mNo results to fetch[0m
[32m2023-08-04 00:44:30.696[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 DROP TABLE IF EXISTS songplays;[0m
[32m2023-08-04 00:44:30.728[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m15[0m - [32m[1mNo results to fetch[0m
[32m2023-08-04 00:44:30.730[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 DROP TABLE IF EXISTS songs;[0m
[32m2023-08-04 00:44:30.765[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m15[0m - [32m[1mNo results to fetch[0m
[32m2023-08-04 00:44:30.766[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 DROP TABLE IF EXISTS staging_events;[0m
[

In [21]:
from pathlib import Path

sql_filepath = Path().resolve().parents[1] / "template/create_tables.sql"
sql_filepath.exists()

True

In [22]:
txt = sql_filepath.read_text()
create_queries = [s.replace("\n","").replace("\t","") for s in txt.split(";")]
create_queries

['CREATE TABLE public.artists (artistid varchar(256) NOT NULL,name varchar(256),location varchar(256),lattitude numeric(18,0),longitude numeric(18,0))',
 'CREATE TABLE public.songplays (playid varchar(32) NOT NULL,start_time timestamp NOT NULL,userid int4 NOT NULL,"level" varchar(256),songid varchar(256),artistid varchar(256),sessionid int4,location varchar(256),user_agent varchar(256),CONSTRAINT songplays_pkey PRIMARY KEY (playid))',
 'CREATE TABLE public.songs (songid varchar(256) NOT NULL,title varchar(256),artistid varchar(256),"year" int4,duration numeric(18,0),CONSTRAINT songs_pkey PRIMARY KEY (songid))',
 'CREATE TABLE public.staging_events (artist varchar(256),auth varchar(256),firstname varchar(256),gender varchar(256),iteminsession int4,lastname varchar(256),length numeric(18,0),"level" varchar(256),location varchar(256),"method" varchar(256),page varchar(256),registration numeric(18,0),sessionid int4,song varchar(256),status int4,ts int8,useragent varchar(256),userid int4)',

In [23]:
for q in create_queries:
    if q:
        run(q + ";")

[32m2023-08-04 00:44:45.037[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 CREATE TABLE public.artists (artistid varchar(256) NOT NULL,name varchar(256),location varchar(256),lattitude numeric(18,0),longitude numeric(18,0));[0m
[32m2023-08-04 00:44:46.329[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m15[0m - [32m[1mNo results to fetch[0m
[32m2023-08-04 00:44:46.332[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 CREATE TABLE public.songplays (playid varchar(32) NOT NULL,start_time timestamp NOT NULL,userid int4 NOT NULL,"level" varchar(256),songid varchar(256),artistid varchar(256),sessionid int4,location varchar(256),user_agent varchar(256),CONSTRAINT songplays_pkey PRIMARY KEY (playid));[0m
[32m2023-08-04 00:44:47.519[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m15[0m - [32m[1mNo results to fetch[0m
[32m2023-08-04 00:44:47.522[0m | [1mINFO    [0m | [36m__ma

## Copy tables

In [24]:
q2="""
COPY staging_songs
FROM 's3://udacity-dataengineer-pipeline-project-s3/song_data/A/A/A/' -- use subset
-- FROM 's3://udacity-dataengineer-pipeline-project-s3/song_data/'
IAM_ROLE 'arn:aws:iam::561130499334:role/my-redshift-service-role'
JSON 'auto'
STATUPDATE OFF
MAXERROR 1
COMPUPDATE OFF;
"""
run(q2)

[32m2023-08-04 00:44:58.357[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 COPY staging_songs
FROM 's3://udacity-dataengineer-pipeline-project-s3/song_data/A/A/A/' -- use subset
-- FROM 's3://udacity-dataengineer-pipeline-project-s3/song_data/'
IAM_ROLE 'arn:aws:iam::561130499334:role/my-redshift-service-role'
JSON 'auto'
STATUPDATE OFF
MAXERROR 1
COMPUPDATE OFF;[0m


InternalError_: abort query
DETAIL:  
  -----------------------------------------------
  error:  abort query
  code:      1020
  context:   system requested abort
  query:     2418347[child_sequence:1]
  location:  queryabort.hpp:410
  process:   padbmaster [pid=1073905947]
  -----------------------------------------------



In [None]:
run(utils.get_list_query("staging_songs"))

In [14]:
q1 ="""
COPY staging_events
FROM 's3://udacity-dataengineer-pipeline-project-s3/log_data'
IAM_ROLE 'arn:aws:iam::561130499334:role/my-redshift-service-role'
JSON 's3://udacity-dataengineer-pipeline-project-s3/log_json_path.json'
STATUPDATE ON
MAXERROR 1
COMPUPDATE OFF;
"""
run(q1)

[32m2023-08-02 19:11:25.444[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 COPY staging_events
FROM 's3://udacity-dataengineer-pipeline-project-s3/log_data'
IAM_ROLE 'arn:aws:iam::561130499334:role/my-redshift-service-role'
JSON 's3://udacity-dataengineer-pipeline-project-s3/log_json_path.json'
STATUPDATE ON
MAXERROR 1
COMPUPDATE OFF;[0m
[32m2023-08-02 19:11:28.459[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m15[0m - [32m[1mNo results to fetch[0m


In [15]:
run(utils.get_list_query("staging_events"))

[32m2023-08-02 19:11:28.529[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 SELECT * FROM staging_events LIMIT 3;[0m
[32m2023-08-02 19:11:31.467[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m12[0m - [32m[1m[{'artist': None,
  'auth': 'Logged In',
  'firstname': 'Walter',
  'gender': 'M',
  'iteminsession': 0,
  'lastname': 'Frye',
  'length': None,
  'level': 'free',
  'location': 'San Francisco-Oakland-Hayward, CA',
  'method': 'GET',
  'page': 'Home',
  'registration': Decimal('1540919166796'),
  'sessionid': 38,
  'song': None,
  'status': 200,
  'ts': 1541105830796,
  'useragent': '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) '
               'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 '
               'Safari/537.36"',
  'userid': 39},
 {'artist': None,
  'auth': 'Logged In',
  'firstname': 'Kaylee',
  'gender': 'F',
  'iteminsession': 0,
  'lastname': 'Summers',
  'length': None,
  'level': 'free',
  

## Insert

In [1]:
from ...template.plugins.helpers.sql_queries import SqlQueries

dct = {k: v for k,v in vars(SqlQueries).items() if k.endswith("_insert")}
dct

{'songplay_table_insert': '\n        INSERT INTO public.songplays (playid, start_time, userid, "level", songid, artistid, sessionid, location, user_agent)\n        SELECT\n                md5(events.sessionid || events.start_time) songplay_id,\n                events.start_time, \n                events.userid, \n                events.level, \n                songs.song_id, \n                songs.artist_id, \n                events.sessionid, \n                events.location, \n                events.useragent\n                FROM (SELECT TIMESTAMP \'epoch\' + ts/1000 * interval \'1 second\' AS start_time, *\n            FROM staging_events\n            WHERE page=\'NextSong\') events\n            LEFT JOIN staging_songs songs\n            ON events.song = songs.title\n                AND events.artist = songs.artist_name\n                AND events.length = songs.duration\n    ',
 'user_table_insert': '\n        INSERT INTO public.users (userid, first_name, last_name, gender, "lev

In [17]:
for k,v in dct.items():
    run(v + ";")

[32m2023-08-02 19:11:40.290[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 INSERT INTO public.songplays (playid, start_time, userid, "level", songid, artistid, sessionid, location, user_agent)
        SELECT
                md5(events.sessionid || events.start_time) songplay_id,
                events.start_time, 
                events.userid, 
                events.level, 
                songs.song_id, 
                songs.artist_id, 
                events.sessionid, 
                events.location, 
                events.useragent
                FROM (SELECT TIMESTAMP 'epoch' + ts/1000 * interval '1 second' AS start_time, *
            FROM staging_events
            WHERE page='NextSong') events
            LEFT JOIN staging_songs songs
            ON events.song = songs.title
                AND events.artist = songs.artist_name
                AND events.length = songs.duration
    ;[0m
[32m2023-08-02 19:11:48.427[0m | [32m[1mSUCCE

In [18]:
for t in ["public.songs", "public.users"]:
    run(utils.get_list_query(t))

[32m2023-08-02 19:12:03.245[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 SELECT * FROM public.songs LIMIT 3;[0m
[32m2023-08-02 19:12:07.475[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mrun[0m:[36m12[0m - [32m[1m[{'artistid': 'AR1KTV21187B9ACD72',
  'duration': Decimal('343'),
  'songid': 'SOSMJFC12A8C13DE0C',
  'title': 'Is That All There Is?',
  'year': 0},
 {'artistid': 'ARA23XO1187B9AF18F',
  'duration': Decimal('192'),
  'songid': 'SOKTJDS12AF72A25E5',
  'title': 'Drown In My Own Tears (24-Bit Digitally Remastered 04)',
  'year': 0},
 {'artistid': 'ARSVTNL1187B992A91',
  'duration': Decimal('129'),
  'songid': 'SOEKAZG12AB018837E',
  'title': "I'll Slap Your Face (Entertainment USA Theme)",
  'year': 2001}][0m
[32m2023-08-02 19:12:07.478[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m6[0m - [1mRunning
 SELECT * FROM public.users LIMIT 3;[0m
[32m2023-08-02 19:12:07.566[0m | [32m[1mSUCCESS [0m | [36m__main_