# Creando un Clúster de Redshift usando AWS SDK con Python

## Instalar AWS SDK para Python

In [None]:
#!pip install boto3

## PASO 1: Asegurar que se cuenta con AWS secret key y access key

- Crear un usuario IAM en su cuenta de AWS
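- Asignar la política `AdministratorAccess` al usuario creado (aceptable para un entorno de pruebas; en producción conviene otorgar permisos más restringidos)
- Guardar el access key y el secret key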
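- Editar el archivo `dwh.cfg` y configurar:
<font color='red'>
<BR>
[AWS]<BR>
KEY= YOUR_AWS_KEY<BR>
SECRET= YOUR_AWS_SECRET<BR>
ID= YOUR USER NAME<BR>
</font>

- El notebook también lee de `dwh.cfg` las secciones `[REGION]` (`AWS_REGION`), `[DWH]` (`DWH_CLUSTER_TYPE`, `DWH_NUM_NODES`, `DWH_NODE_TYPE`, `DWH_CLUSTER_IDENTIFIER`, `DWH_DB`, `DWH_DB_USER`, `DWH_DB_PASSWORD`, `DWH_PORT`, `DWH_IAM_ROLE_NAME`) y `[S3]` (`LOG_DATA`, `LOG_JSONPATH`, `SONG_DATA`); todas deben existir.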


## PASO 2: Importar librerías y cargar parámetros desde archivo

In [None]:
import pandas as pd
import boto3
import json
import configparser

In [None]:
# Load every parameter the notebook needs from dwh.cfg.
# open() raises FileNotFoundError when dwh.cfg is missing (ConfigParser.read would
# silently skip it); the `with` block makes sure the file handle is closed.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')
AWS_REGION             = config.get('REGION','AWS_REGION')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

S3_LOG_JDATA           = config.get("S3", "LOG_DATA")
S3_LOG_JPATH           = config.get("S3", "LOG_JSONPATH")
S3_SONG_DATA           = config.get("S3", "SONG_DATA")

# Show the cluster parameters for a quick sanity check. The password is masked so it
# does not end up in the saved notebook output.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, "********", DWH_PORT, DWH_IAM_ROLE_NAME]
             })

## PASO 3: Crear clientes para IAM, EC2, S3 y Redshift

In [None]:
# All clients/resources use the same region and the same IAM user credentials read
# from dwh.cfg, so the connection keyword arguments are defined once and shared.
_aws_kwargs = dict(
    region_name=AWS_REGION,
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

ec2 = boto3.resource('ec2', **_aws_kwargs)          # security groups
s3 = boto3.resource('s3', **_aws_kwargs)            # bucket listing
iam = boto3.client('iam', **_aws_kwargs)            # role that lets Redshift read S3
redshift = boto3.client('redshift', **_aws_kwargs)  # cluster create/describe/delete

#### Listar el contenido de un Bucket de S3

In [None]:
# List the objects under the "ssbgz" prefix of an AWS sample-data bucket.
# To explore your own data, replace the bucket name (e.g. "axity-mybucket01") and prefix.
sampleDbBucket = s3.Bucket("awssampledbuswest2")
for s3_object in sampleDbBucket.objects.filter(Prefix="ssbgz"):
    print(s3_object)

## PASO 4: IAM ROLE
- Crear un IAM role que asigne permisos a Redshift para acceder a un Bucket de S3 (ReadOnly)

In [None]:
from botocore.exceptions import ClientError

# AWS-managed policy granting read-only access to S3.
S3_READ_ONLY_POLICY_ARN = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"

#1.1 Create the role; its trust policy lets the Redshift service assume it.
try:
    print("1.1 Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except ClientError as e:
    # Re-running this cell is expected: an existing role is reused. Any other error
    # (e.g. access denied) is a real failure and is surfaced instead of swallowed.
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        print("Role {} already exists; reusing it.".format(DWH_IAM_ROLE_NAME))
    else:
        raise

print("1.2 Attaching Policy")
attach_status = iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                                       PolicyArn=S3_READ_ONLY_POLICY_ARN
                                      )['ResponseMetadata']['HTTPStatusCode']
print("HTTP status:", attach_status)

print("1.3 Get the IAM role ARN")
# The ARN is passed to create_cluster so the cluster can use this role for S3 access.
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

## PASO 5: Clúster de Redshift

- Crear Clúster de RedShift
- Para más detalle sobre `create_cluster`, ver [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [None]:
# Build the create_cluster arguments from the values loaded from dwh.cfg.
cluster_kwargs = dict(
    # Hardware
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType=DWH_NODE_TYPE,
    # Use the configured port so it matches the security-group rule and the
    # psycopg2 connection, which both use DWH_PORT.
    Port=int(DWH_PORT),

    # Identifiers & credentials
    DBName=DWH_DB,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    MasterUsername=DWH_DB_USER,
    MasterUserPassword=DWH_DB_PASSWORD,

    # Roles (for S3 access)
    IamRoles=[roleArn],
)
# NumberOfNodes is required for multi-node clusters; for single-node it is left out,
# as the original call did.
if DWH_CLUSTER_TYPE.strip().lower() == "multi-node":
    cluster_kwargs["NumberOfNodes"] = int(DWH_NUM_NODES)

try:
    response = redshift.create_cluster(**cluster_kwargs)
except Exception as e:
    # Best effort: re-running the cell reports (e.g.) "cluster already exists" and moves on.
    print(e)

### 5.1 Monitorear el estatus del Clúster
- Ejecutar este bloque de código varias veces hasta que el estado del clúster (`ClusterStatus`) sea `available`

In [None]:
def prettyRedshiftProps(props):
    """Summarize a Redshift cluster description as a two-column DataFrame.

    Args:
        props: one entry of ``describe_clusters()['Clusters']``.

    Returns:
        A DataFrame with columns ``Key`` and ``Value`` containing only the selected
        properties. Rows keep the order in which the keys appear in ``props``; selected
        keys that are absent are skipped.
    """
    wanted = {"ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
              "DBName", "Endpoint", "NumberOfNodes", "VpcId"}
    rows = []
    for key, value in props.items():
        if key in wanted:
            rows.append((key, value))
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Fetch the current description of our cluster and show its key properties.
clusters = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)["Clusters"]
myClusterProps = clusters[0]
prettyRedshiftProps(myClusterProps)

<h2> 5.2 Tomar nota del <font color='red'> endpoint y role ARN </font> del clúster  </h2>

<font color='red'>NO EJECUTAR ESTE BLOQUE a menos que el estado del clúster (`ClusterStatus`) sea "available"</font>

In [None]:
# Re-read the cluster description so the endpoint reflects the cluster's current state
# (myClusterProps may still be the snapshot taken while the cluster was being created).
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
if 'Endpoint' not in myClusterProps:
    raise RuntimeError(
        "Cluster {} has no endpoint yet (status: {}); wait until it is 'available' and re-run."
        .format(DWH_CLUSTER_IDENTIFIER, myClusterProps.get('ClusterStatus'))
    )

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
# ARN of the role attached to the cluster; the COPY statements use it to read from S3.
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("roleArn      :: ", roleArn)
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)


## PASO 6: Abrir un puerto TCP de entrada para permitir el acceso al endpoint del clúster

In [None]:
# WARNING: the default CIDR 0.0.0.0/0 opens the Redshift port to the whole internet.
# To restrict it, set DWH_INGRESS_CIDR in the [DWH] section of dwh.cfg
# (e.g. your own IP as "203.0.113.7/32").
DWH_INGRESS_CIDR = config.get("DWH", "DWH_INGRESS_CIDR", fallback="0.0.0.0/0")

try:
    # First security group attached to the cluster's VPC configuration.
    defaultSg = ec2.SecurityGroup(id=myClusterProps['VpcSecurityGroups'][0]['VpcSecurityGroupId'])
    print(defaultSg)

    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp=DWH_INGRESS_CIDR,
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    # Best effort: e.g. re-running the cell reports that the rule already exists.
    print(e)

## PASO 7: Validar que se puede establecer una conexión y cargar las tablas de staging

In [None]:
#!pip install psycopg2-binary

In [None]:
import psycopg2

In [None]:
# Pass the connection parameters as keyword arguments rather than a DSN built with
# str.format: a formatted DSN breaks when a value (typically the password) contains
# spaces or quotes, while psycopg2 quotes keyword arguments correctly.
conn = psycopg2.connect(
    host=DWH_ENDPOINT,
    dbname=DWH_DB,
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
    port=DWH_PORT,
)
cur = conn.cursor()

In [None]:
# DDL for the staging table that receives the event log JSON files.
# It is loaded by staging_events_copy using the JSONPaths file S3_LOG_JPATH. A JSONPaths
# file maps fields to columns by position, so this column order must match that file.
# TODO confirm against LOG_JSONPATH.
# NOTE(review): ts and registration are stored as VARCHAR; presumably numeric in the source
# data — confirm and cast when populating downstream tables.
staging_events_table_create= ("""CREATE TABLE IF NOT EXISTS staging_events(
                        artist          VARCHAR,
                        auth            VARCHAR,
                        firstName       VARCHAR,
                        gender          VARCHAR,
                        itemInSession   INT,
                        lastName        VARCHAR,
                        length          FLOAT,
                        level           VARCHAR,
                        location        VARCHAR,
                        method          VARCHAR,
                        page            VARCHAR,
                        registration    VARCHAR,
                        sessionId       INT,
                        song            VARCHAR,
                        status          INT,
                        ts              VARCHAR,
                        userAgent       VARCHAR,
                        userId          INT
                        )""")

In [None]:
# `with conn:` commits on success and rolls back on error. A failed statement therefore
# does not leave the connection stuck in an aborted transaction for the following cells.
with conn:
    cur.execute(staging_events_table_create)

In [None]:
# DDL for the staging table that receives the song metadata JSON files.
# It is loaded by staging_songs_copy with json 'auto', which matches JSON keys to these
# column names, so the names must match the keys in the song data.
staging_songs_table_create = ("""CREATE TABLE IF NOT EXISTS staging_songs(
                        num_songs       INT,
                        artist_id       VARCHAR,
                        artist_latitude VARCHAR,
                        artist_longitude VARCHAR,
                        artist_location VARCHAR,
                        artist_name     VARCHAR,
                        song_id         VARCHAR,
                        title           VARCHAR,
                        duration        FLOAT,
                        year            INT
                        )""")

In [None]:
# `with conn:` commits on success and rolls back on error. A failed statement therefore
# does not leave the connection stuck in an aborted transaction for the following cells.
with conn:
    cur.execute(staging_songs_table_create)

In [None]:
# COPY command loading the event logs from S3 into staging_events, authenticating with
# the cluster's IAM role; the JSONPaths file S3_LOG_JPATH maps JSON fields to columns.
# NOTE(review): S3_LOG_JDATA and S3_LOG_JPATH are inserted verbatim (no quotes added here),
# so dwh.cfg presumably stores them already single-quoted — confirm.
staging_events_copy = f"""
    copy staging_events
    from {S3_LOG_JDATA}
    credentials 'aws_iam_role={DWH_ROLE_ARN}'
    region '{AWS_REGION}'
    json {S3_LOG_JPATH}
"""

In [None]:
# Show the generated COPY statement so it can be checked before it is executed.
print(staging_events_copy)

In [None]:
# `with conn:` commits on success and rolls back on error, so a failed COPY does not leave
# the connection in an aborted transaction. If the COPY fails, Redshift records the
# details in the stl_load_errors system table.
with conn:
    cur.execute(staging_events_copy)

In [None]:
# COPY command loading the song metadata from S3 into staging_songs, authenticating with
# the cluster's IAM role; json 'auto' matches JSON keys to the table's column names.
# NOTE(review): S3_SONG_DATA is inserted verbatim (no quotes added here), so dwh.cfg
# presumably stores it already single-quoted — confirm.
staging_songs_copy = f"""
    copy staging_songs
    from {S3_SONG_DATA}
    credentials 'aws_iam_role={DWH_ROLE_ARN}'
    region '{AWS_REGION}'
    json 'auto'
"""

In [None]:
# Show the generated COPY statement so it can be checked before it is executed.
print(staging_songs_copy)

In [None]:
# `with conn:` commits on success and rolls back on error, so a failed COPY does not leave
# the connection in an aborted transaction. If the COPY fails, Redshift records the
# details in the stl_load_errors system table.
with conn:
    cur.execute(staging_songs_copy)

In [None]:
# Close the database connection; running more queries requires a new psycopg2.connect.
conn.close()

<font color='blue'>Probar las consultas con el Query Editor de AWS o con un cliente de base de datos (Postgres)</font>

## PASO 8: Limpiar los recursos creados

In [None]:
# Eliminar clúster
redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

<font color='red'>Ejecutar este bloque de código varias veces hasta que el clúster se haya eliminado</font>

In [None]:
# Re-run until the cluster is gone. Once deletion finishes, describe_clusters raises
# ClusterNotFoundFault, which here means "done" rather than an error.
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
except redshift.exceptions.ClusterNotFoundFault:
    print("Cluster {} has been deleted.".format(DWH_CLUSTER_IDENTIFIER))
else:
    # A DataFrame inside try/else is not auto-displayed, so render it explicitly
    # (`display` is provided by the IPython/Jupyter kernel).
    display(prettyRedshiftProps(myClusterProps))

In [None]:
# Eliminar IAM role
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)