# Funciones para interaccionar con AWS Redshift desde python

## Table to Dataframe

In [39]:
import boto3
import pandas as pd
import time

def table_to_dataframe(table,schema,database='landing_zone', NUM_ENTRIES = 0, cluster_identifier = 'redshift-data', db_user = 'admintreinta'):
    """
    Ejecuta una consulta SQL en Amazon Redshift y devuelve los resultados como un DataFrame de pandas.
    
    Parámetros:
    - table : Tabla a copiar en un dataframe
    - schema: schema de la tabla a copiar en un datafrmae
    - database: database de la tabla a copiar en un dataframe
    - LIMIT: limitar entries
    - cluster_identifier: Identificador del cluster de Amazon Redshift.
    - database: Nombre de la base de datos.
    - db_user: Usuario de la base de datos.
    
    
    Retorna:
    - Un DataFrame de pandas con los resultados de la consulta.
    """
    client = boto3.client('redshift-data')
    sql_query = f"SELECT * FROM {database}.{schema}.{table}"
    if NUM_ENTRIES != 0:
        sql_query = sql_query + f"LIMIT {NUM_ENTRIES}"
        
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql_query
    )

    statement_id = response['Id']
    
    # Espera hasta que la consulta se haya completado
    status = ''
    while status not in ['FINISHED', 'FAILED', 'ABORTED']:
        time.sleep(5)  # Espera 5 segundos antes de verificar el estado nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Current status: {status}")
    
    if status == 'FINISHED':
        response1 = client.get_statement_result(Id=statement_id)
        
        # Extrayendo los nombres de las columnas de la metadata de columnas
        column_metadata = response1['ColumnMetadata']
        column_names = [column['name'] for column in column_metadata]
        
        # Construyendo el DataFrame
        df = pd.DataFrame([[field.get('stringValue', '') for field in record] for record in response1['Records']], columns=column_names)
        
        return df
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(f"Error: {error_message}")
    else:
        print("La operación fue abortada o no se completó exitosamente.")
    return pd.DataFrame()  # Retorna un DataFrame vacío si la consulta falla

OK

In [40]:
DATA = table_to_dataframe("frequency","public","landing_zone")
DATA

Current status: FINISHED


Unnamed: 0,id,months_frequency,discount
0,,1,0.0
1,,12,0.4
2,,6,0.25


ERROR

In [41]:
DATA = table_to_dataframe("frequency","pu123blic","landing_zone")
DATA

Current status: FAILED
Error: ERROR: Could not find parent table for alias "landing_zone.pu123blic.frequency".


## Volcar el contenido de una query en un Dataframe

In [42]:
import boto3
import pandas as pd
import time

def query_to_dataframe(sql_query, cluster_identifier = 'redshift-data', database = "landing_zone", db_user = 'admintreinta'):
    """
    Ejecuta una consulta SQL en Amazon Redshift y devuelve los resultados como un DataFrame de pandas.
    
    Parámetros:
    - sql_query: Consulta SQL para ejecutar.
    - cluster_identifier: Identificador del cluster de Amazon Redshift.
    - database: Nombre de la base de datos.
    - db_user: Usuario de la base de datos.
    
    
    Retorna:
    - Un DataFrame de pandas con los resultados de la consulta.
    """
    client = boto3.client('redshift-data')
    
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql_query
    )

    statement_id = response['Id']
    
    # Espera hasta que la consulta se haya completado
    status = ''
    while status not in ['FINISHED', 'FAILED', 'ABORTED']:
        time.sleep(5)  # Espera 5 segundos antes de verificar el estado nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Current status: {status}")
    
    if status == 'FINISHED':
        response1 = client.get_statement_result(Id=statement_id)
        
        # Extrayendo los nombres de las columnas de la metadata de columnas
        column_metadata = response1['ColumnMetadata']
        column_names = [column['name'] for column in column_metadata]
        
        # Construyendo el DataFrame
        df = pd.DataFrame([[field.get('stringValue', '') for field in record] for record in response1['Records']], columns=column_names)
        
        return df
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(f"Error: {error_message}")
    else:
        print("La operación fue abortada o no se completó exitosamente.")
    return pd.DataFrame()  # Retorna un DataFrame vacío si la consulta falla

OK

In [43]:
DATA_1 = query_to_dataframe("SELECT * FROM landing_zone.public.frequency;", cluster_identifier = 'redshift-data', database = "landing_zone", db_user = 'admintreinta')
DATA_1

Current status: FINISHED


Unnamed: 0,id,months_frequency,discount
0,,1,0.0
1,,12,0.4
2,,6,0.25


ERROR

In [44]:
DATA_1 = query_to_dataframe("SELECT * FROFDM landing_zone.public.frequency;", cluster_identifier = 'redshift-data', database = "landing_zone", db_user = 'admintreinta')

Current status: FAILED
Error: ERROR: syntax error at or near "FROFDM"
  Position: 10


## Subimos un dataframe a s3

In [45]:
from io import BytesIO
import datetime
import uuid
import pandas as pd
import boto3

def dataframe_to_s3(df, bucket, endpoint='data_lake', object_name=''):
    # Generar un sello de tiempo con el formato deseado
    timestamp = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
    year = datetime.datetime.now().strftime('%Y')
    month = datetime.datetime.now().strftime('%m')
    day = datetime.datetime.now().strftime('%d')
    # Generar un identificador único (puedes reemplazarlo por cualquier otra cadena aleatoria si prefieres)
    if object_name == '':
        object_name = uuid.uuid4().hex
        
    object_path = f"{endpoint}/{year}/{month}/{day}/{object_name}_{timestamp}.csv.gz"

    # Usar BytesIO para datos binarios
    csv_buffer = BytesIO()
    df.to_csv(csv_buffer, index=False, compression='gzip')
    
    # Es necesario mover el puntero del buffer al inicio después de escribir en él
    csv_buffer.seek(0)

    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, object_path).put(Body=csv_buffer.getvalue())
    
    return f's3://{bucket}/{object_path}'


OK

In [46]:
df_test = pd.DataFrame({
    'columna1': [42, 27]  # Dos valores tipo int
})

print(df_test)

   columna1
0        42
1        27


In [47]:
dataframe_to_s3(df_test,"ms-redshift-core")

's3://ms-redshift-core/data_lake/2024/01/24/576f049edaf248749ad35c61d8143ad9_2024_01_24_15_30_47_080742.csv.gz'

KO

In [48]:
dataframe_to_s3(df_test1,"ms-redsh3ift-core")

NoSuchBucket: An error occurred (NoSuchBucket) when calling the PutObject operation: The specified bucket does not exist

In [None]:
import boto3
import time

def load_s3_to_redshift(table,schema, s3_object_path, database='landing_zone', cluster_identifier='redshift-data', db_user='admintreinta'):
    client = boto3.client('redshift-data')
    sql = f"""
        COPY {database}.{schema}.{table}
        FROM '{s3_object_path}'
        IAM_ROLE default
        delimiter ','
        IGNOREHEADER 1
        GZIP
        CSV;
    """

    print(sql)
    # Ejecuta el comando COPY
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql
    )
    
    statement_id = response['Id']
    
    # Espera a que la ejecución termine
    status = 'STARTED'
    while status in ['SUBMITTED', 'STARTED', 'PICKED']:
        time.sleep(5)  # Espera 5 segundos antes de consultar nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Estado actual: {status}")

    # Verifica el resultado de la ejecución
    if status == 'FINISHED':
        print("La carga ha sido exitosa.")
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(f"Error al truncar la tabla: {error_message}")
    else:
        print("La operación fue abortada o no se completó exitosamente.")
    
        
    return response



OK

In [None]:
s3_path_test = 's3://ms-redshift-core/data_lake/2024/01/24/f21e65852e944a2f8516075de529ecd6_2024_01_24_15_19_13_372618.csv.gz'
load_s3_to_redshift("test1245", "public", s3_path_test, cluster_identifier='redshift-data', database='landing_zone', db_user='admintreinta')


        COPY landing_zone.public.test1245
        FROM 's3://ms-redshift-core/data_lake/2024/01/24/f21e65852e944a2f8516075de529ecd6_2024_01_24_15_19_13_372618.csv.gz'
        IAM_ROLE default
        delimiter ','
        IGNOREHEADER 1
        GZIP
        CSV;
    
Estado actual: FINISHED
La carga ha sido exitosa.


{'ClusterIdentifier': 'redshift-data',
 'CreatedAt': datetime.datetime(2024, 1, 24, 15, 24, 31, 212000, tzinfo=tzlocal()),
 'Database': 'landing_zone',
 'DbUser': 'admintreinta',
 'Id': '4364cebe-03c6-4940-9e55-6d5db9bea0e8',
 'ResponseMetadata': {'RequestId': '4364cebe-03c6-4940-9e55-6d5db9bea0e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4364cebe-03c6-4940-9e55-6d5db9bea0e8',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '160',
   'date': 'Wed, 24 Jan 2024 20:24:31 GMT'},
  'RetryAttempts': 0}}

KO

In [None]:
s3_path_test = 's12313://ms-redshift-core/data_lake/2024/01/24/f21e65852e944a2f8516075de529ecd6_2024_01_24_15_19_13_372618.csv.gz'
load_s3_to_redshift("test1245", "public",s3_path_test, cluster_identifier='redshift-data', database='landing_zone', db_user='admintreinta')


        COPY test1245
        FROM 's12313://ms-redshift-core/data_lake/2024/01/24/f21e65852e944a2f8516075de529ecd6_2024_01_24_15_19_13_372618.csv.gz'
        IAM_ROLE default
        delimiter ','
        IGNOREHEADER 1
        GZIP
        CSV;
    
Estado actual: FAILED
Error al truncar la tabla: ERROR: LOAD source is not supported. (Hint: only S3 or DynamoDB or EMR based load is allowed)


{'ClusterIdentifier': 'redshift-data',
 'CreatedAt': datetime.datetime(2024, 1, 24, 15, 22, 3, 60000, tzinfo=tzlocal()),
 'Database': 'landing_zone',
 'DbUser': 'admintreinta',
 'Id': '507e6572-d672-4863-9d06-8a86d4dcf152',
 'ResponseMetadata': {'RequestId': '507e6572-d672-4863-9d06-8a86d4dcf152',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '507e6572-d672-4863-9d06-8a86d4dcf152',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '159',
   'date': 'Wed, 24 Jan 2024 20:22:03 GMT'},
  'RetryAttempts': 0}}

In [None]:
def dataframe_to_redshift(df,table,schema,bucket,database='landing_zone',endpoint = 'data_lake',object_name = False,db_user ='admintreinta', cluster_identifier = 'redshift-data'):
    s3_object_path = dataframe_to_s3(df, bucket, endpoint, object_name)
    output = load_s3_to_redshift(table,schema, s3_object_path, cluster_identifier, database, db_user)
    return output

In [None]:
df_test = pd.DataFrame({
    'columna1': [42, 27]  # Dos valores tipo int
})

print(df_test)

In [None]:
dataframe_to_redshift(df_test,"test1245","public","ms-redshift-core")


        COPY landing_zone.public.test1245
        FROM 's3://ms-redshift-core/data_lake/2024/01/24/False_2024_01_24_15_26_16_764772.csv.gz'
        IAM_ROLE default
        delimiter ','
        IGNOREHEADER 1
        GZIP
        CSV;
    
Estado actual: FINISHED
La carga ha sido exitosa.


{'ClusterIdentifier': 'redshift-data',
 'CreatedAt': datetime.datetime(2024, 1, 24, 15, 26, 19, 222000, tzinfo=tzlocal()),
 'Database': 'landing_zone',
 'DbUser': 'admintreinta',
 'Id': '48972e79-5fa9-4800-9f0d-39953bedfd5f',
 'ResponseMetadata': {'RequestId': '48972e79-5fa9-4800-9f0d-39953bedfd5f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '48972e79-5fa9-4800-9f0d-39953bedfd5f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '160',
   'date': 'Wed, 24 Jan 2024 20:26:19 GMT'},
  'RetryAttempts': 0}}

In [None]:
dataframe_to_redshift(df_test,"test113245","public","ms-redshift-core")


        COPY landing_zone.public.test113245
        FROM 's3://ms-redshift-core/data_lake/2024/01/24/False_2024_01_24_15_26_36_942945.csv.gz'
        IAM_ROLE default
        delimiter ','
        IGNOREHEADER 1
        GZIP
        CSV;
    
Estado actual: FAILED
Error al truncar la tabla: ERROR: Cannot COPY into nonexistent table test113245


{'ClusterIdentifier': 'redshift-data',
 'CreatedAt': datetime.datetime(2024, 1, 24, 15, 26, 39, 765000, tzinfo=tzlocal()),
 'Database': 'landing_zone',
 'DbUser': 'admintreinta',
 'Id': '4fa9f183-34bc-47f2-865d-de3b6883bc3f',
 'ResponseMetadata': {'RequestId': '4fa9f183-34bc-47f2-865d-de3b6883bc3f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4fa9f183-34bc-47f2-865d-de3b6883bc3f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '160',
   'date': 'Wed, 24 Jan 2024 20:26:39 GMT'},
  'RetryAttempts': 0}}

In [None]:
def execute_SP(store_procedure,schema,database = "landing_zone", cluster_identifier = 'redshift-data', db_user = 'admintreinta'):
    """
    Ejecuta una un store en Amazon Redshift
    
    Parámetros:
    - store_procedure: nombre del store procedure a ejecutar
    - schema: Esquema en el que se encuentra el store procedure.
    - database: Nombre de la base de datos.
    - cluster_identifier: Identificador del cluster de Amazon Redshift.
    - db_user: Usuario de la base de datos.
    
    
    Retorna:
    - Un DataFrame de pandas con los resultados de la consulta.
    """
    client = boto3.client('redshift-data')
    
    sql_query = f"CALL {database}.{schema}.{store_procedure}()"
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql_query
    )

    statement_id = response['Id']
    
    # Espera hasta que la consulta se haya completado
    status = ''
    while status not in ['FINISHED', 'FAILED', 'ABORTED']:
        time.sleep(5)  # Espera 5 segundos antes de verificar el estado nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Current status: {status}")
    if status == 'FINISHED':
        print ('Store Procedure ejecutado')
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(error_message)
    else:
        print("La operación fue abortada o no se completó exitosamente.")
        return 0

In [50]:
def truncate_table(table, schema, database = "landing_zone", cluster_identifier = 'redshift-data', db_user = 'admintreinta'):
    """
    Ejecuta una un store en Amazon Redshift
    
    Parámetros:
    - table: tabla a truncar
    - schema: Esquema en el que se encuentra el store procedure.
    - store_procedure: nombre del store procedure a ejecutar
    - cluster_identifier: Identificador del cluster de Amazon Redshift.
    - database: Nombre de la base de datos.
    - db_user: Usuario de la base de datos.
    
    
    Retorna:
    - Un DataFrame de pandas con los resultados de la consulta.
    """
    client = boto3.client('redshift-data')
    
    sql_query = f"TRUNCATE {database}.{schema}.{table}"
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql_query
    )

    statement_id = response['Id']
    
    # Espera hasta que la consulta se haya completado
    status = ''
    while status not in ['FINISHED', 'FAILED', 'ABORTED']:
        time.sleep(5)  # Espera 5 segundos antes de verificar el estado nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Current status: {status}")
    
    if status == 'FINISHED':
        print ('Store Procedure ejecutado')
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(f"Error al truncar la tabla: {error_message}")
    else:
        print("La operación fue abortada o no se completó exitosamente.")
    return 0

OK

In [51]:
truncate_table("test1245","public")

Current status: FINISHED
Store Procedure ejecutado


0

KO

In [52]:
truncate_table("test1245123","public")

Current status: FAILED
Error al truncar la tabla: ERROR: relation "public.test1245123" does not exist


0

## DROP de una tabla

In [57]:
def drop_table(table, schema, database = "landing_zone", cluster_identifier = 'redshift-data', db_user = 'admintreinta'):
    """
    Ejecuta una un store en Amazon Redshift
    
    Parámetros:
    - table: tabla a truncar
    - schema: Esquema en el que se encuentra el store procedure.
    - database: Nombre de la base de datos.
    - store_procedure: nombre del store procedure a ejecutar
    - cluster_identifier: Identificador del cluster de Amazon Redshift.
    
    - db_user: Usuario de la base de datos.
    
    
    Retorna:
    - Un DataFrame de pandas con los resultados de la consulta.
    """
    client = boto3.client('redshift-data')
    
    sql_query = f"DROP TABLE {database}.{schema}.{table}"
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql_query
    )

    statement_id = response['Id']
    
    # Espera hasta que la consulta se haya completado
    status = ''
    while status not in ['FINISHED', 'FAILED', 'ABORTED']:
        time.sleep(5)  # Espera 5 segundos antes de verificar el estado nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Current status: {status}")
    
    if status == 'FINISHED':
        print ('Taabla eliminada!')
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(f"Error al eliminar la tabla: {error_message}")
    else:
        print("La operación fue abortada o no se completó exitosamente.")
        return 0

OK

In [58]:
drop_table("test1245","public")

Current status: FINISHED
Taabla eliminada!


KO

In [59]:
drop_table("test1241245","public")

Current status: FAILED
Error al eliminar la tabla: ERROR: Table "test1241245" does not exist


In [60]:
def sql_query(sql_query, database = "landing_zone",cluster_identifier = 'redshift-data', db_user = 'admintreinta'):
    """
    Ejecuta una un store en Amazon Redshift
    
    Parámetros:
    - sql_query: query a ejecutar en SQL
    - database: Nombre de la base de datos.
    - cluster_identifier: Identificador del cluster de Amazon Redshift.
    - db_user: Usuario de la base de datos.
    
    
    Retorna:
    - Un DataFrame de pandas con los resultados de la consulta.
    """
    client = boto3.client('redshift-data')
    
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql_query
    )

    statement_id = response['Id']
    
    # Espera hasta que la consulta se haya completado
    status = ''
    while status not in ['FINISHED', 'FAILED', 'ABORTED']:
        time.sleep(5)  # Espera 5 segundos antes de verificar el estado nuevamente
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']
        print(f"Current status: {status}")
    
    if status == 'FINISHED':
        print ('Query ejecutada!')
    elif status == 'FAILED':
        # Obtiene y muestra el mensaje de error
        error_message = status_response.get('Error', 'No se proporcionó información de error.')
        print(f"Error ejecutando la query SQL: {error_message}")
    else:
        print("La operación fue abortada o no se completó exitosamente.")

        return 0

OK

In [62]:
sql_query("CREATE TABLE public.test1245 (columna1 integer ENCODE az64) DISTSTYLE AUTO;")

Current status: FINISHED
Taabla eliminada!


KO