# Python: Snowflake & S3 -Crear una Tabla-

### Edgar M

## Introduccion


### Snowflake puede directamente crear tablas e importar la data desde S3, pero para crear la tabla se debe conocer la esctrutura del archivo, es decir se debe crear la tabla a mano. No se puede crear la tabla directamente desde la lectura del archivo desde S3 es decir no se puede hacer un select * into table_name from @S3_stage. 
### Lo que se pretende con este script es poder crear la tabla de forma dinamica, solo leyendo la cabecera del archivo.
### En el siguiente diagrama se prensenta la arquitectura que se utilizara para crear una tabla en Snowflake desde un archivo alojado en S3.
#### (Las bases y los esquemas en snowflake ya deben estar creados)

![Mi Imagen](snowflake.png)

# Dependencias

In [None]:
!pip install boto3
!pip install snowflake-connector-python

In [None]:
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import snowflake.connector
import boto3
from io import BytesIO
from botocore.exceptions import ClientError

# Conexion a S3 y Armado de la trama

### Las varibles de interes son el nombre del bucket y el nombre del archivo (recuerda que para los procesos de python con AWS, debes de tener configurado el AWS CLI con tus credenciales el en tu equipo)

In [None]:
bucket_name = 'bucket'
file_key = 'Credit_Card_Customer_Data.csv'
table_name="OUR_FIRST_DB.PUBLIC.Credit_Card_Customer_Data"

### Definimos una funcion que realizara la descargar solo de los primeros bytes del archivo y el armado de la trama para crear la tabla


In [None]:
def s3_exe(bucket,file,table_n):
    # Configura el cliente S3 con la región 'sa-east-1'
    s3 = boto3.client('s3', region_name='sa-east-1')
    # Descargar solo los primeros bytes del archivo
    response = s3.get_object(Bucket=bucket, Key=file, Range='bytes=0-1024')
    # Leer solo el encabezado (primera fila) del archivo CSV
    df = pd.read_csv(BytesIO(response['Body'].read()), nrows=0)

    # Mostrar las columnas del archivo
    tabla=""
    #print(df.dtypes)
    for i in df.columns:
        tabla=tabla+i.replace(" ","_")+' VARCHAR(30),'
    #definir nombre de la tabla
    
    create="""
    CREATE OR REPLACE TABLE $table_name(
    """
    create=create+tabla[:len(tabla)-1]+");"
    #agregar el nombre de la tabla
    create=create.replace("$table_name",table_n)
    print(create)
    return create

## Conexion con snowflake

### Una vez que ya tenemo el query de la creacion de la tabla, necesitamos enviar ese query a Snowflake. Para lo cual definimos una funcion que realiza la conexion y envio de querys

### Las credenciales las debes obtener de la cuenta que tiene en Snowflake que tiene un trial de 30 dias

In [None]:
def snoflak(query):
    # Conexión a Snowflake
    conn = snowflake.connector.connect(
        user='user',
        password='***.',
        account='sjbpirq-****',
        warehouse='COMPUTE_WH',
        database='OUR_FIRST_DB',
        schema='PUBLIC'
    )
    # Crear un cursor
    cursor = conn.cursor()

    try:
        # Ejecutar una consulta SQL
        cursor.execute(query)

        # Imprimir los resultados
        for row in cursor:
            print(row)
# cerrar el cursor
    finally:
        cursor.close()
        conn.close()


## Creamos la tabla

In [None]:
# enviar el query para que la tabla sea creada
query=s3_exe(bucket_name,file_key,table_name)
snoflak(query)


## Cargar los datos en la nueva tabla

### Primero debemos crear un Stage externo en Snowflake que estara enlazado a nuestro bucket

### Las creadenciales del bucket para crear el stage en snowflake deben ser ingresados dentros de la consulta. Tambien se puede configurar para snowflake se registre a nuestro bucket por defecto, pero para este ejercicio lo vamos a configurar en el query 

In [None]:
# creamos una funcio que arme la trama para la creacion de un stage 

def crear_stage_ext(stage_l,bucket_n):
        # Crear una sesión de Boto3
    session = boto3.Session()
    # Obtener las credenciales actuales
    credentials = session.get_credentials()
    try:
    # Acceder a las credenciales
        current_credentials = credentials.get_frozen_credentials()
    except NoCredentialsError:
        print("No se encontraron credenciales configuradas.")
    query="""
    CREATE OR REPLACE STAGE $stage_name
    url='s3://$bucket'
    credentials=(aws_key_id='$key' aws_secret_key='$secret');
"""
    query=query.replace('$key',current_credentials.access_key).replace('$secret',current_credentials.secret_key)\
        .replace("$stage_name",stage_l).replace("$bucket",bucket_n)
    return query

### El stage es un objeto como una tabla por ende debe estar localizado en una base y un esquema

In [None]:
stage_locate="MANAGE_DB.external_stages.aws_stage"
query=crear_stage_ext(stage_locate,bucket_name) #se genera el query para la creacion
snoflak(query) # se crea el stage

# Una vez creado el Stage se procedera a cargar los datos en la tabla

In [None]:
# Se crea una funcio para armar el query 
def copy_to(stage,table,file_name):
    query="""
COPY INTO $table_name
    FROM @$stage
    file_format= (type = csv field_delimiter=',' skip_header=1)
    files = ('$file');
"""
    query=query.replace("$table_name",table).replace("$file",file_name).replace('$stage',stage)
    
    print(query)
    return query

In [None]:
query=copy_to(stage_locate,table_name,file_key)
snoflak(query) # se carga la data
