In [17]:
import boto3
import logging
import psycopg2
import json
import sys
from psycopg2 import sql

In [2]:
catalog.list()

['processed_p18_19_5',
 'p18_19_0',
 'p18_19_1',
 'p18_19_3',
 'p18_19_5',
 'qld_p18_19',
 'qld_all',
 'parameters']

In [3]:
df = catalog.load('processed_p18_19_5')

2023-05-18 21:24:18,504 - kedro.io.data_catalog - INFO - Loading data from 'processed_p18_19_5' (CSVDataSet)...


In [18]:
table_name = 'processed_p18_19_5'

In [12]:
column_dict = {col: "TEXT" if dtype == "object" else "INTEGER" for col, dtype in df.dtypes.items()}
column_dict

{'Other_entities': 'TEXT',
 'Headcount_Female': 'INTEGER',
 'Headcount_Male': 'INTEGER',
 'Headcount_Total': 'INTEGER',
 'Percentage_Female': 'INTEGER',
 'Percentage_Male': 'INTEGER'}

In [13]:
# Create the columns string for the SQL query
columns = ', '.join(f"{k} {v}" for k, v in column_dict.items())
columns

'Other_entities TEXT, Headcount_Female INTEGER, Headcount_Male INTEGER, Headcount_Total INTEGER, Percentage_Female INTEGER, Percentage_Male INTEGER'

In [14]:
# Convert the DataFrame to a list of tuples
values = [tuple(row) for row in df.to_numpy()]
values

[('Legal Aid Queensland',
  480,
  143,
  623,
  0.7704654895666132,
  0.2295345104333868),
 ('Office of the Health Ombudsman',
  100,
  42,
  142,
  0.704225352112676,
  0.2957746478873239),
 ('Queensland Art Gallery',
  234,
  131,
  365,
  0.6410958904109589,
  0.3589041095890411),
 ('Queensland Family and Child Commission',
  56,
  17,
  73,
  0.7671232876712328,
  0.2328767123287671),
 ('Queensland Human Rights Commission',
  35,
  9,
  44,
  0.7954545454545454,
  0.2045454545454545),
 ('Queensland Museum', 219, 110, 329, 0.6656534954407295, 0.3343465045592705),
 ('Resources Safety and Health Queensland',
  120,
  176,
  296,
  0.4054054054054054,
  0.5945945945945946),
 ('State Library of Queensland',
  255,
  98,
  353,
  0.7223796033994334,
  0.2776203966005666),
 ('Trade and Investment Queensland',
  83,
  52,
  135,
  0.6148148148148148,
  0.3851851851851852),
 ('Sector sub-total: Other entities',
  1582,
  778,
  2360,
  0.6703389830508475,
  0.3296610169491525),
 ('Sector t

In [23]:
column_names = list(column_dict.keys())
column_names

In [16]:
# Use string interpolation to dynamically generate the SQL statement to insert the data
values_sql = ','.join(['%s'] * len(values)[0])
values_sql

'%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s'

In [19]:
# Generate a placeholder string. The placeholder represent one tuple of values.
placeholders = sql.SQL(', ').join(sql.Placeholder() * len(values)[0])

# SQL quert to execute
query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
    sql.Identifier(table_name),
    sql.SQL(', ').join(map(sql.Identifier, column_names)),
    placeholders
)

In [20]:
query

Composed([SQL('INSERT INTO '), Identifier('processed_p18_19_5'), SQL(' ('), Composed([Identifier('Other_entities, Headcount_Female, Headcount_Male, Headcount_Total, Percentage_Female, Percentage_Male')]), SQL(') VALUES ('), Composed([Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder(), SQL(', '), Placeholder()]), SQL(')')])

In [None]:
def load_into_rds(df):
    """
    This function creates a new rds database table and writes records to it
    """

    # Retrieve the secret
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId='credentials')
    secret = response['SecretString']

    # RDS settings
    rds_host = "terraform-20230412032239110300000001.cz9iamvdbbik.ap-southeast-2.rds.amazonaws.com"
    user_name = "gundalai"
    password = secret
    db_name = "data_store"

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Create the database connection
    try:
        conn = psycopg2.connect(host=rds_host, user=user_name, password=password, database=db_name, connect_timeout=5)
    except psycopg2.DatabaseError as e:
        logger.error("ERROR: Unexpected error: Could not connect to PostgreSQL instance.")
        logger.error(e)
        sys.exit()

    logger.info("SUCCESS: Connection to rds PostgreSQL instance succeeded")


    # Define the table name and column names/types based on the DataFrame's name and columns
    table_name = 'processed_p18_19_5'
    columns = {col: "varchar(255)" if dtype == "object" else "integer" for col, dtype in df.dtypes.items()}

    # Convert the DataFrame to a list of tuples
    values = [tuple(row) for row in df.to_numpy()]

    # Add the dataframe as a SQL table
    item_count = 0
    # Use string interpolation to dynamically generate the SQL statement to insert the data
    values_sql = ','.join(['%s'] * len(values))
    sql_string = f"INSERT INTO {table_name} ({', '.join(columns.keys())}) VALUES ({values_sql})"

    with conn.cursor() as cur:
        cur.execute(
            # TODO add attributes
            f"CREATE TABLE if not exists {table_name} ({', '.join(columns.keys())})")
        cur.executemany(sql_string, values)
        conn.commit()
        cur.execute(f"SELECT * FROM {table_name}")
        logger.info("The following items have been added to the database:")
        for row in cur:
            item_count += 1
            logger.info(row)
    conn.commit()

    # Close the connection
    cur.close()
    conn.close()

    print("Added %d items to rds PostgreSQL table" % (item_count))

In [None]:
# Lambda function that tests if it can connect and retrieve a credential from a secrets manager using boto3
import json
import boto3
from botocore.exceptions import ClientError
import logging
import psycopg2
import json
import sys

def get_secret():

    secret_name = "db_credentials"
    region_name = "ap-southeast-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    # Decrypts secret using the associated KMS key.
    secret = get_secret_value_response['SecretString']

    # Your code goes here.
    return secret




def lambda_handler(event, context):
    try:
        secret = get_secret()
        print("Secret retrieved")
        sm_response_body = "Successfully retrieved secret from Secrets Manager."
        status_code = 200
    except Exception as e:
        print("Error occurred:", e)
        sm_response_body = f"Failed to retrieve secret from Secrets Manager: {str(e)}"
        status_code = 500

    # RDS settings
    rds_host = "terraform-20230509135541811400000001.cz9iamvdbbik.ap-southeast-2.rds.amazonaws.com"
    user_name = "gundalai"
    password = secret
    db_name = "data_store"

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Create the database connection
    try:
        conn = psycopg2.connect(host=rds_host, user=user_name, password=password, database=db_name, connect_timeout=5)
        rds_response_body = "Successfully connected to PostgreSQL instanse"
    except psycopg2.DatabaseError as e:
        logger.error("ERROR: Unexpected error: Could not connect to PostgreSQL instance.")
        logger.error(e)
        rds_response_body = "ERROR: Unexpected error: Could not connect to PostgreSQL instance."
        sys.exit()


    message = event['Records'][0]['body']
    data = json.loads(message)
    CustID = data['CustID']
    Name = data['Name']

    item_count = 0
    sql_string = f"insert into Customer (CustID, Name) values({CustID}, '{Name}')"

    with conn.cursor() as cur:
        cur.execute("create table if not exists Customer ( CustID  int NOT NULL, Name varchar(255) NOT NULL, PRIMARY KEY (CustID))")
        cur.execute(sql_string)
        conn.commit()
        cur.execute("select * from Customer")
        logger.info("The following items have been added to the database:")
        for row in cur:
            item_count += 1
            logger.info(row)
    conn.commit()

    db_response_body = "Added %d items to RDS PostgreSQL table" % (item_count)

    return {
        'statusCode': status_code,
        'sm_body': json.dumps(sm_response_body),
        'rds_body': json.dumps(rds_response_body),
        'db_body' : json.dumps(db_response_body)
    }

In [None]:
# Lambda function that tests if it can connect and retrieve a credential from a secrets manager using boto3
import json
import boto3
from botocore.exceptions import ClientError
import logging
import psycopg2
import json
import sys
import pandas as pd


def get_secret():

    secret_name = "db_credentials"
    region_name = "ap-southeast-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    # Decrypts secret using the associated KMS key.
    secret = get_secret_value_response['SecretString']

    # Your code goes here.
    return secret


def lambda_handler(event, context):
    # Extract the bucket and object key from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    print(bucket)
    print(key)

    # Create an S3 client
    s3_client = boto3.client('s3')

    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        csv_bytes = response['Body'].read()

        # Read the CSV file into a DataFrame
        df = pd.read_csv(io.BytesIO(csv_bytes))
        print(df.head())

    except Exception as e:
        print(f"Error retrieving file from S3: {str(e)}")

    try:
        secret = get_secret()
        print("Secret retrieved")
        sm_response_body = "Successfully retrieved secret from Secrets Manager."
        status_code = 200
    except Exception as e:
        print("Error occurred:", e)
        sm_response_body = f"Failed to retrieve secret from Secrets Manager: {str(e)}"
        status_code = 500

    # RDS settings
    rds_host = "terraform-20230509135541811400000001.cz9iamvdbbik.ap-southeast-2.rds.amazonaws.com"
    user_name = "gundalai"
    password = secret
    db_name = "data_store"

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Create the database connection
    try:
        conn = psycopg2.connect(host=rds_host, user=user_name, password=password, database=db_name, connect_timeout=5)
        rds_response_body = "Successfully connected to PostgreSQL instanse"
    except psycopg2.DatabaseError as e:
        logger.error("ERROR: Unexpected error: Could not connect to PostgreSQL instance.")
        logger.error(e)
        rds_response_body = "ERROR: Unexpected error: Could not connect to PostgreSQL instance."
        sys.exit()

    # Define the table name and column names/types based on the DataFrame's name and columns
    table_name = 'processed_p18_19_5'
    columns = {col: "varchar(255)" if dtype == "object" else "integer" for col, dtype in df.dtypes.items()}

    # Convert the DataFrame to a list of tuples
    values = [tuple(row) for row in df.to_numpy()]

    # Add the dataframe as a SQL table
    item_count = 0
    # Use string interpolation to dynamically generate the SQL statement to insert the data
    values_sql = ','.join(['%s'] * len(values))
    sql_string = f"INSERT INTO {table_name} ({', '.join(columns.keys())}) VALUES ({values_sql})"

    with conn.cursor() as cur:
        cur.execute(
            # TODO add attributes
            f"CREATE TABLE if not exists {table_name} ({', '.join(columns.keys())})")
        cur.executemany(sql_string, values)
        conn.commit()
        cur.execute(f"SELECT * FROM {table_name}")
        logger.info("The following items have been added to the database:")
        for row in cur:
            item_count += 1
            logger.info(row)
    conn.commit()

    # Close the connection
    cur.close()
    conn.close()

    print("Added %d items to rds PostgreSQL table" % (item_count))

In [None]:
import boto3
import pandas as pd
from io import BytesIO

s3_client = boto3.client('s3')

def lambda_handler(event, context):
   try:
       bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
       s3_file_name = event["Records"][0]["s3"]["object"]["key"]
       resp = s3_client.get_object(Bucket=bucket_name, Key=s3_file_name)

       ###########################################
       # one of these methods should work for you.
       # Method 1
       df = pd.read_csv(resp['Body'], sep=',')
       #
       # Method 2
       # df_s3_data = pd.read_csv(BytesIO(resp['Body'].read().decode('utf-8')))
       ###########################################
       print(df.head())

   except Exception as err:
      print(err)

In [None]:
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import psycopg2
import logging
from psycopg2 import sql



def lambda_handler(event, context):

    # Set the CloudWatch logger
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Secrets Manager settings
    secret_name = "db_credentials"
    region_name = "ap-southeast-2"

    # RDS database settings
    host = "terraform-20230509135541811400000001.cz9iamvdbbik.ap-southeast-2.rds.amazonaws.com"
    user = "gundalai"
    password = get_secret(region_name, secret_name)
    database = "data_store"
    connect_timeout = 5

    s3_client = boto3.client('s3')

    df = get_dataframe(event, s3_client)

    table_name, primary_key, values, attr_names, attr_names_dtypes = process_dataframe_for_sql(event, df)

    conn = connect_database(host, user, password, database, connect_timeout)

    create_table(conn, table_name, attr_names_dtypes, primary_key)

    add_values_to_table(conn, table_name, values, attr_names)

    conn.close()


def get_dataframe(event, s3_client):
    """
    Create a dataframe from a csv file stored in a S3 bucket using the trigger event
    """

    try:
       # Get the object from the bucket
       bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
       file_name = event["Records"][0]["s3"]["object"]["key"]
       resp = s3_client.get_object(Bucket=bucket_name, Key=file_name)

       # Create a dataframe from the csv file
       df = pd.read_csv(resp['Body'], sep=',')

       print("Created a dataframe from the stored file with the following head lines: ")
       print(df.head())

    except Exception as e:
       print(f"Error retrieving file from S3: {str(e)}")

    return df


def get_secret(region_name, secret_name, primary_key):
    """
    Get a secret stored in Secrets Manager
    """

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
        # Decrypts the secret string using the associated KMS key.
        secret = get_secret_value_response['SecretString']
        print("Secret retrieved")

    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        print("Error occurred:", e)
        raise e

    return secret


def connect_database(host, user, password, database, connect_timeout):
    """
    Establish the database connection
    """

    try:
        conn = psycopg2.connect(host=rds_host, user=user_name, password=password, database=db_name, connect_timeout=5)
        print ("Successfully connected to PostgreSQL instanse")
    except psycopg2.DatabaseError as e:
        logger.error("ERROR: Unexpected error: Could not connect to PostgreSQL instance.")
        logger.error(e)

    return conn


def process_dataframe_for_sql(event, df):
    """
    This function takes a DataFrame and prepares it for a SQL query.
    """

    # Define the table name based on the csv file's name
    table_name = event["Records"][0]["s3"]["object"]["key"]

    # Define the first column of the dataframe as the primary key of the table
    primary_key = df.columns[0]

    # Define attribute's names/types based on the DataFrame's columns
    attr_dict = {col: "TEXT" if dtype == "object" else "INTEGER" for col, dtype in df.dtypes.items()}

    # Update the primary key
    for k, v in attr_dict.items():
        if k == primary_key:
            v += ' PRIMARY KEY'
            return

    attr_names_dtypes = ', '.join(f"{k} {v}" for k, v in attr_dict.items()

    attr_names = list(attr_dict.keys())

    # Convert the DataFrame to a list of tuples
    values = [tuple(row) for row in df.to_numpy()]

    return table_name, primary_key, values, attr_names, attr_names_dtypes


def create_table(conn, table_name, attr_names_dtypes)
    """
    Create a table in the database if not already exists
    """

    with conn.cursor() as cursor:
        try:
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({attr_names_dtypes})")
            conn.commit()
            print(f"Succesfully created the table: {table_name}")
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1

        cursor.close()


def add_values_to_table(conn, table_name, values, attr_names)
    """
    Add values to the table in the database
    """
    item_count = 0

    # Generate placeholders string. The placeholder represent one tuple of values.
    placeholders = sql.SQL(', ').join(sql.Placeholder() * len(values[0]))

    # SQL query to execute
    query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier(table_name),
        sql.SQL(', ').join(map(sql.Identifier, attr_names)),
        placeholders
    )

    with conn.cursor() as cursor:
        try:
            cursor.executemany(query, values)
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1
        print("execute_many() done")

        cursor.execute(f"SELECT * FROM {table_name}")
        logger.info("The following items have been added to the database:")
        for row in cursor:
            item_count += 1
            logger.info(row)
        conn.commit()

        print("Added %d items to rds PostgreSQL table" % (item_count))

        cursor.execute(f"DROP TABLE {table_name}")

        cursor.close()

In [3]:
s = "03_processed/processed_p18_19_5.csv"
table_name = s.split('/')[-1].split('.')[0]
print(table_name)

processed_p18_19_5
