In [13]:
import boto3
import configparser
import json
import pandas as pd

In [14]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY = config.get('AWS','ACCESS_KEY')
SECRET = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH","DWH_DB")
DWH_CLUSTER_DB = config.get("DWH","DWH_DB")
DWH_DB_USER = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH",'DWH_IAM_ROLE_NAME')

FileNotFoundError: [Errno 2] No such file or directory: 'dwh.cfg'

## Create clients

In [3]:
ec2 = boto3.resource('ec2',
                    region_name = 'us-west-2',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

s3 = boto3.resource('s3',
                   region_name = 'us-west-2',
                   aws_access_key_id = KEY,
                   aws_secret_access_key = SECRET)
iam = boto3.client('iam',
                  region_name = 'us-west-2',
                  aws_access_key_id = KEY,
                  aws_secret_access_key = SECRET)
redshift = boto3.client('redshift',
                       region_name = 'us-west-2',
                       aws_access_key_id = KEY,
                       aws_secret_access_key = SECRET)


### Check out the sample data sourcse on S3

In [4]:
LOG_DATA = config.get("S3","BUCKET")
DataBucket = s3.Bucket(LOG_DATA)
for obj in DataBucket.objects.filter(Prefix = 'log_data'):
    print(obj)


NoOptionError: No option 'bucket' in section: 'S3'

In [None]:
fileObj = s3.get_object(Bucket='bucket_name', Key='key')
for row in fileObj["body"]:
    line = row.decode('utf-8')
    print(json.loads(line))


In [None]:

obj = s3.Object(LOG_DATA,'log_data/2018/11/2018-11-02-events.json')
#data = json.load(obj.get()['Body'])
#data
json.load(obj.get()['Body'])

In [None]:
SONG_DATA = config.get("S3","BUCKET")
DataBucket = s3.Bucket(SONG_DATA)
for obj in DataBucket.objects.filter(Prefix='song_data'):
    print(obj)
   

In [None]:
obj = s3.Object(SONG_DATA, 'song_data/A/A/A/TRAAAAK128F9318786.json')
data = json.load(obj.get()['Body'])
data

## STEP1: IAM ROLE

In [5]:
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path = '/',
        RoleName = DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS servers on your behalf",
        AssumeRolePolicyDocument = json.dumps(
            {'Statement': [{'Action':'sts:AssumeRole',
                           'Effect':'Allow',
                           'Principal':{'Service':'redshift.amazonaws.com'}}],
            'Version': '2012-10-17'})
    )
except Exception as e:
    print(e)
    
print('1.2 Attching Policy')
## Attaching Read-only Access
iam.attach_role_policy(RoleName = DWH_IAM_ROLE_NAME,
                      PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
                      )['ResponseMetadata']['HTTPStatusCode']

print('1.3 Get  the IAM Role ARN')
roleArn = iam.get_role(RoleName = DWH_IAM_ROLE_NAME)['Role']['Arn']

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attching Policy
1.3 Get  the IAM Role ARN


## Redshift Cluster

In [6]:
try:
    response = redshift.create_cluster(
        ClusterType = DWH_CLUSTER_TYPE,
        NodeType = DWH_NODE_TYPE,
        NumberOfNodes = int(DWH_NUM_NODES),
        ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
        DBName = DWH_DB,
        MasterUsername = DWH_DB_USER,
        MasterUserPassword = DWH_DB_PASSWORD,
        IamRoles = [roleArn]
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


### Describe the cluster to see its status

In [7]:
def prettyRedShiftProps(props):
        
    pd.set_option('display.max_colwidth',-1)
    keysToShow = ["ClusterIdentifier","NodeType","ClusterStatus","MasterUsername","DBName","Endpoint","NumberOfNodes","VpcId"]
    x = [(k,v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns = ["Key","Value"])
    
myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedShiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.c77uvrb0sszx.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-c023bab8
7,NumberOfNodes,4


In [8]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.c77uvrb0sszx.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::426439643124:role/dwhRole


### Open an incoming TCP port to access the cluster endpoint

In [9]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName = defaultSg.group_name,
        CidrIp = '0.0.0.0/0',
        IpProtocol = 'TCP',
        FromPort = int(DWH_PORT),
        ToPort = int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-95be3bce')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## Connect to the cluster

In [10]:
%load_ext sql

In [11]:
conn_string = "postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
%sql $conn_string

'Connected: dwhuser@dwh'

In [12]:
%%sql
SELECT *
FROM staging_events
LIMIT 5;

 * postgresql://dwhuser:***@dwhcluster.c77uvrb0sszx.us-west-2.redshift.amazonaws.com:5439/dwh
8056 rows affected.
(psycopg2.ProgrammingError) syntax error at or near "LIMIT"
LINE 1: LIMIT 5;
        ^

[SQL: LIMIT 5;]
(Background on this error at: http://sqlalche.me/e/f405)


In [62]:
!create_tables.py

/bin/sh: create_tables.py: command not found


In [64]:
%%sql SELECT *
FROM songs;

 * postgresql://dwhuser:***@dwhcluster.c77uvrb0sszx.us-west-2.redshift.amazonaws.com:5439/dwh
(psycopg2.ProgrammingError) relation "songs" does not exist

[SQL: SELECT *
FROM songs;]
(Background on this error at: http://sqlalche.me/e/f405)


In [66]:
!ls

1101.json        Untitled.ipynb   dwh.cfg          sql_queries.py
README (1).md    create_tables.py etl.py


In [67]:
!python create_tables.py

Traceback (most recent call last):
  File "create_tables.py", line 32, in <module>
    main()
  File "create_tables.py", line 22, in main
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
  File "/anaconda3/lib/python3.6/site-packages/psycopg2/__init__.py", line 130, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: could not translate host name "dwhCluster" to address: nodename nor servname provided, or not known

