In [3]:
import pandas as pd
import boto3
import json
import psycopg2




# Load DWH Params from a file

In [4]:
import configparser
import os

config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY = os.getenv("AWS_ACCESS_KEY_ID")
SECRET = os.getenv("AWS_SECRET_ACCESS_KEY")

DWH_CLUSTER_TYPE       = config.get("DWH_SETUP","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH_SETUP","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH_SETUP","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH_SETUP","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DB_PORT")

DWH_IAM_ROLE_NAME      = config.get("IAM_ROLE", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,sparkify
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


# Create IAM, Redshift, EC2, & S# clients

In [87]:
iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

# Actually, I don't think I need this one                       
s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )                       

# Create IAM Role

In [6]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::229998455760:role/dwhRole


# Create the Redshift Cluster

In [9]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


# Validate the Redshift cluster is available

In [88]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClientError: An error occurred (InvalidClientTokenId) when calling the DescribeClusters operation: The security token included in the request is invalid.

# Cluster Endpoint & Role ARN

In [8]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cvjkim9fsfbc.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::229998455760:role/dwhRole


# Open an incoming  TCP port to access the cluster ednpoint 

In [9]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0de7c4f3ea7e375a6')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


# Validate Connection

In [10]:
# %load_ext sql

In [11]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
conn.autocommit = True
cur = conn.cursor()


In [15]:
cur.execute("SELECT 10")
# conn.commit()
cur.fetchall()

[(10,)]

In [16]:
cur.execute("SELECT DISTINCT tablename FROM pg_table_def where schemaname='sparkify'")
# conn.commit()
cur.fetchall()

[]

In [13]:
# conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
# print(conn_string)
# %sql $conn_string

# Clean Up Resources

In [81]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'dwhuser',
  'DBName': 'sparkify',
  'Endpoint': {'Address': 'dwhcluster.cvjkim9fsfbc.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2024, 6, 17, 23, 17, 26, 975000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0de7c4f3ea7e375a6',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-056d8a9859858477e',
  'AvailabilityZone': 'us-west-2d',
  'PreferredMaintenanceWindow': 'wed:09:30-wed:10:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4

In [86]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClientError: An error occurred (InvalidClientTokenId) when calling the DescribeClusters operation: The security token included in the request is invalid.

In [28]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': 'e72aa4da-4ebc-4271-81bf-bd5baab71dd5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Mon, 17 Jun 2024 22:02:48 GMT',
   'x-amzn-requestid': 'e72aa4da-4ebc-4271-81bf-bd5baab71dd5',
   'content-type': 'text/xml',
   'content-length': '200'},
  'RetryAttempts': 0}}

# Fiddle

In [36]:
cur.execute("""
CREATE TABLE songs (
    num_songs INTEGER,
    artist_id VARCHAR(50),
    artist_latitude DECIMAL(9,6),
    artist_longitude DECIMAL(9,6),
    artist_location VARCHAR(255),
    artist_name VARCHAR(255),
    song_id VARCHAR(50),
    title VARCHAR(255),
    duration DECIMAL(15,5),
    year INTEGER
);
""")
cur.fetchall()

DuplicateTable: Relation "songs" already exists


In [37]:
cur.execute("""
select * from songs limit 100
""")
cur.fetchall()

[(1,
  'AR9E1QW1187B999A34',
  None,
  None,
  '',
  'Mikel Erentxun',
  'SOSBVKG12A8C1409B8',
  'En Que Mujer',
  Decimal('210.33751'),
  2000),
 (1,
  'AR3793X1187FB50CB3',
  None,
  None,
  'Middlesbrough, England',
  'Chris Rea',
  'SODHVES12A6701CE6B',
  'Driving Home For Christmas',
  Decimal('241.16200'),
  1986),
 (1,
  'ARNULK21187FB46537',
  None,
  None,
  '',
  'Grandmaster Flash',
  'SOOTBYV12AB0186EC8',
  'Gold',
  Decimal('260.91057'),
  1988),
 (1,
  'AROLE161187B98F7E4',
  None,
  None,
  '',
  'Olle Adolphson',
  'SOOBRRO12A8C139153',
  "Tatuerarevalsen (remaster '03)",
  Decimal('181.28934'),
  0),
 (1,
  'AR52SSB1187B9B3E5F',
  None,
  None,
  '',
  'Aquagen',
  'SOQNYWI12AB01840AA',
  'Phatt Bass',
  Decimal('242.70321'),
  0),
 (1,
  'AR567PN1187FB4B4F2',
  None,
  None,
  '',
  'Moss',
  'SONNOCP12A8C145220',
  'Silent Hill',
  Decimal('329.35138'),
  2009),
 (1,
  'AR98JLC1187B9ADE23',
  None,
  None,
  'Los Angeles, CA',
  'Megadeth',
  'SOYSCPE12AB0181BBF',
  

In [38]:
cur.execute("""
drop table songs
""")
cur.fetchall()

ProgrammingError: no results to fetch

In [53]:
cur.execute("""
select * from song_events
""")
cur.fetchall()

[(None,
  'Logged Out',
  None,
  None,
  0,
  None,
  None,
  'free',
  None,
  'PUT',
  'Login',
  None,
  52,
  None,
  307,
  1541207073796,
  None,
  None),
 (None,
  'Logged In',
  'Celeste',
  'F',
  1,
  'Williams',
  None,
  'free',
  'Klamath Falls, OR',
  'GET',
  'Home',
  1541077528796,
  52,
  None,
  200,
  1541207123796,
  '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"',
  53),
 ('Mynt',
  'Logged In',
  'Celeste',
  'F',
  2,
  'Williams',
  166.94812,
  'free',
  'Klamath Falls, OR',
  'PUT',
  'NextSong',
  1541077528796,
  52,
  'Playa Haters',
  200,
  1541207150796,
  '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"',
  53),
 ('Taylor Swift',
  'Logged In',
  'Celeste',
  'F',
  3,
  'Williams',
  230.47791,
  'free',
  'Klamath Falls, OR',
  'PUT',
  'NextSong',
  1541077528796,
  52,
  'You Belong With Me',
  200,
  1541207316796,
  '

In [58]:
cur.execute("""
    copy songs from 's3://udacity-dend/song_data'
    credentials 'aws_iam_role={}'
    json 'auto'
    region 'us-west-2';
""".format(DWH_ROLE_ARN)
)
cur.fetchall()

ProgrammingError: no results to fetch

In [52]:
cur.execute("""
COPY song_events FROM 's3://udacity-dend/log_data' 
    CREDENTIALS 'aws_iam_role={}'
    JSON 's3://udacity-dend/log_json_path.json'
    REGION 'us-west-2';
""".format(DWH_ROLE_ARN)
)
cur.fetchall()

ProgrammingError: no results to fetch

In [41]:
import boto3

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

sampleDbBucket =  s3.Bucket("udacity-dend")

for obj in sampleDbBucket.objects.filter(Prefix="song_data"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json')
s3.ObjectSummary(

KeyboardInterrupt: 

# Data Exploration

In [1]:
import pandasql as ps
import pandas as pd



In [17]:
cur.execute("""
select * from song_events
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
song_events = pd.DataFrame(rows, columns=column_names)

In [18]:
song_events.head()

Unnamed: 0,artist,auth,firstname,gender,iteminsession,lastname,length,level,location,method,page,registration,sessionid,song,status,ts,useragent,userid
0,,Logged Out,,,0,,,free,,PUT,Login,,52,,307,1541207073796,,
1,,Logged In,Celeste,F,1,Williams,,free,"Klamath Falls, OR",GET,Home,1541078000000.0,52,,200,1541207123796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36""",53.0
2,Mynt,Logged In,Celeste,F,2,Williams,166.94812,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Playa Haters,200,1541207150796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36""",53.0
3,Taylor Swift,Logged In,Celeste,F,3,Williams,230.47791,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,You Belong With Me,200,1541207316796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36""",53.0
4,Amy Winehouse,Logged In,Celeste,F,4,Williams,229.85098,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Valerie,200,1541207546796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36""",53.0


In [2]:
cur.execute("""
select page, count(1) from song_events group by 1
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

NameError: name 'cur' is not defined

In [22]:
cur.execute("""
select auth, count(1) from song_events where page = 'NextSong' group by 1
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,auth,count
0,Logged In,6820


In [30]:
cur.execute(
    """
select distinct
    artist_id, 
    artist_latitude,
    artist_longitude,
    artist_location,
    artist_name
from songs
limit 100
"""
)
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,artist_id,artist_latitude,artist_longitude,artist_location,artist_name
0,AREW1KO1187B98E42E,,,,The Irish Tenors
1,AR7AE0W1187B98E40E,,,,Intocable
2,ART09111187FB5BF48,32.365029,-88.703069,"Meridian, MS",Ty Herndon;Stephanie Bentley
3,ARHGQUW1187FB3905C,,,,King of Prussia
4,ARPOUCK11E8DC5262E,,,,The Melismatics
...,...,...,...,...,...
95,ARX5KUX1187B989C8D,,,,Expensive Soul
96,ARMYMUD1187B9B018E,51.506320,-0.127140,"London, England",The Wolfgang Press
97,ARKA3ZA11C8A422F8F,,,,Futurecop!
98,AR37ILI1187B98E27B,,,"London, England",Shriekback


In [19]:
songs.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARWCIEL1187FB42EF5,39.29055,-76.6096,B-More,Beach House,SOGMORP12A8C13EF63,Auburn and Ivory,270.00117,2006
1,1,ARHAJH611C8A4219F1,,,,La Bottega Dell'Arte,SODJYTL12A8C14550E,Noi Nel Bene_ Noi Nel Male,223.24199,0
2,1,AR6Y22S1187B99906F,,,,Smartbomb,SONSPZE12AB01819CE,Complicate It,233.27302,2001
3,1,AROVXZM1187FB3C3DB,,,,Richard Durand,SOIQFYZ12AB017D108,Weep (Sundown Mix),510.01423,0
4,1,ARVYYRY1187B9953F0,-26.20494,28.04003,"Johannesburg, South Africa",Trevor Rabin,SOJDUQC12AB0181FE6,Siphon Searches,242.96444,2009


In [1]:
songs.groupby('num_songs')['artist_id'].count()

NameError: name 'songs' is not defined

In [52]:
cur.execute("""
SELECT count(1)
FROM stg_song_events 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,8056


In [53]:
cur.execute("""
SELECT count(1)
FROM stg_songs 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,14896


In [54]:
cur.execute("""
SELECT count(1)
FROM song_plays 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,6820


In [55]:
cur.execute("""
SELECT count(1)
FROM users 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,107


In [56]:
cur.execute("""
SELECT count(1)
FROM songs 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,14896


In [57]:
cur.execute("""
SELECT count(1)
FROM artists 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,10025


In [51]:
cur.execute("""
SELECT count(1)
FROM time_dimensions 
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,count
0,8023


In [36]:
cur.execute("""
SELECT TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second', *
FROM stg_song_events
LIMIT 20
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,?column?,artist,auth,firstname,gender,iteminsession,lastname,length,level,location,method,page,registration,sessionid,song,status,ts,useragent,userid
0,2018-11-05 00:33:12,A Fine Frenzy,Logged In,Anabelle,F,0,Simpson,267.91138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044398796,256,Almost Lover (Album Version),200,1541377992796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36""",69
1,2018-11-05 01:27:22,Nirvana,Logged In,Aleena,F,0,Kirby,214.77832,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,Serve The Servants,200,1541381242796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
2,2018-11-05 01:30:56,Television,Logged In,Aleena,F,1,Kirby,238.49751,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,See No Evil (Remastered LP Version),200,1541381456796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
3,2018-11-05 01:34:54,JOHN COLTRANE,Logged In,Aleena,F,2,Kirby,346.43546,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,Blues To Bechet (LP Version),200,1541381694796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
4,2018-11-05 01:40:40,NOFX,Logged In,Aleena,F,3,Kirby,80.79628,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,It's My Job To Keep Punk Rock Elite,200,1541382040796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
5,2018-11-05 01:42:00,The Backyardigans,Logged In,Aleena,F,4,Kirby,158.85016,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,Into The Thick Of It!,200,1541382120796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
6,2018-11-05 01:44:38,Bruce Springsteen,Logged In,Aleena,F,5,Kirby,202.84036,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,Radio Nowhere,200,1541382278796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
7,2018-11-05 01:48:00,Maroon 5,Logged In,Aleena,F,6,Kirby,173.66159,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,Harder To Breathe,200,1541382480796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
8,2018-11-05 01:50:53,Two Door Cinema Club,Logged In,Aleena,F,7,Kirby,189.67465,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,What You Know,200,1541382653796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44
9,2018-11-05 01:54:02,Five Finger Death Punch,Logged In,Aleena,F,8,Kirby,262.81751,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541022995796,237,Meet the Monster,200,1541382842796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0,44


In [43]:
cur.execute("""
SELECT auth,
    iteminsession AS item_in_session,
    LEVEL,
    location,
    sessionid AS session_id,
    song,
    userid AS user_id,
    TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second' AS time_key
FROM stg_song_events
WHERE PAGE = 'NextSong'
""")
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,auth,item_in_session,level,location,session_id,song,user_id,time_key
0,Logged In,0,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",256,Almost Lover (Album Version),69,2018-11-05 00:33:12
1,Logged In,0,paid,"Waterloo-Cedar Falls, IA",237,Serve The Servants,44,2018-11-05 01:27:22
2,Logged In,1,paid,"Waterloo-Cedar Falls, IA",237,See No Evil (Remastered LP Version),44,2018-11-05 01:30:56
3,Logged In,2,paid,"Waterloo-Cedar Falls, IA",237,Blues To Bechet (LP Version),44,2018-11-05 01:34:54
4,Logged In,3,paid,"Waterloo-Cedar Falls, IA",237,It's My Job To Keep Punk Rock Elite,44,2018-11-05 01:40:40
...,...,...,...,...,...,...,...,...
6815,Logged In,57,paid,"Birmingham-Hoover, AL",1076,The Pretender,16,2018-11-30 18:40:05
6816,Logged In,58,paid,"Birmingham-Hoover, AL",1076,Besos De Ceniza,16,2018-11-30 18:44:36
6817,Logged In,59,paid,"Birmingham-Hoover, AL",1076,Rose,16,2018-11-30 18:47:58
6818,Logged In,60,paid,"Birmingham-Hoover, AL",1076,The Haunting,16,2018-11-30 18:51:24


In [59]:
cur.execute(
    """
SELECT song,
    artist,
    COUNT(1)
FROM stg_song_events
WHERE PAGE = 'NextSong'
GROUP BY grouping sets ((song), (song, artist))
"""
)
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,song,artist,count
0,Supelicula,Maldita Nerea,1
1,Din Din Wo,Habib KoitÃÂ©,1
2,Greece 2000,Three Drives,5
3,Sorry,Jonas Brothers,1
4,Poor Leno Jakatta Radio Mix,RÃÂ¶yksopp,1
...,...,...,...
10479,Arboretum,,1
10480,Lay Down My Guns,,1
10481,The Woman Downstairs,,1
10482,The Coolest (Explicit Album Version),,1


In [66]:
cur.execute(
    """
    SELECT DISTINCT 
    CAST(TO_CHAR(date_trunc('hour', TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second'), 'YYYYMMDDHH24') AS bigint) AS time_key,
    TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second' AS time_key
    FROM (select * from stg_song_events limit 100)
"""
)
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,time_key,time_key.1
0,2018110301,2018-11-03 01:04:33
1,2018110301,2018-11-03 01:05:23
2,2018110301,2018-11-03 01:05:50
3,2018110301,2018-11-03 01:08:36
4,2018110301,2018-11-03 01:12:26
...,...,...
95,2018110319,2018-11-03 19:13:33
96,2018110319,2018-11-03 19:17:17
97,2018110319,2018-11-03 19:21:49
98,2018110319,2018-11-03 19:26:20


In [80]:
cur.execute(
    """
WITH unique_times AS (
    SELECT DISTINCT 
        date_trunc('hour', TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second') AS trunc_time
    FROM (select * from stg_song_events limit 100)
)
SELECT 
    CAST(TO_CHAR(trunc_time, 'YYYYMMDDHH24') AS bigint) AS time_key,
    CAST(SUBSTRING(CAST(trunc_time AS text) FROM 9 FOR 2) AS integer) AS hour,
    DATE(trunc_time) AS date,
    EXTRACT(
        DAY
        FROM trunc_time
    ) AS DAY,
    EXTRACT(
        WEEK
        FROM trunc_time
    ) AS week,
    EXTRACT(
        MONTH
        FROM trunc_time
    ) AS MONTH,
    EXTRACT(
        QUARTER
        FROM trunc_time
    ) AS quarter,
    EXTRACT(
        YEAR
        FROM trunc_time
    ) AS year,
    EXTRACT(
        DOW
        FROM trunc_time
    ) + 1 AS day_of_week,
    -- Redshift DOW: 0=Sunday, ..., 6=Saturday. Adding 1 to make it 1=Sunday, ..., 7=Saturday.
    CASE
        WHEN EXTRACT(
            DOW
            FROM trunc_time
        ) IN (0, 6) THEN TRUE
        ELSE FALSE
    END AS is_weekend,
    CASE
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) BETWEEN 1 AND 3 THEN EXTRACT(
            YEAR
            FROM trunc_time
        ) - 1
        ELSE EXTRACT(
            YEAR
            FROM trunc_time
        )
    END AS fiscal_year,
    CASE
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) BETWEEN 1 AND 3 THEN 4
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) BETWEEN 4 AND 6 THEN 1
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) BETWEEN 7 AND 9 THEN 2
        ELSE 3
    END AS fiscal_quarter,
    CASE
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) IN (12, 1, 2) THEN 'Winter'
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) IN (3, 4, 5) THEN 'Spring'
        WHEN EXTRACT(
            MONTH
            FROM trunc_time
        ) IN (6, 7, 8) THEN 'Summer'
        ELSE 'Fall'
    END AS season,
    NULL AS special_event -- Placeholder for any special events logic
FROM unique_times
"""
)
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(rows, columns=column_names)

Unnamed: 0,time_key,hour,date,day,week,month,quarter,year,day_of_week,is_weekend,fiscal_year,fiscal_quarter,season,special_event
