## We are going to use PostgreSQL for the ETL pipeline
### 1. Create a database in PostgreSQL
### 2. Use psycopg2 to connect to the database
### 3. Use boto3 to connect to S3
### 4. Create tables in the database
### 5. Load data from S3 to the database
### 6. Run quality checks on the data



In [None]:
#psycopg2
#boto3

In [None]:
import boto3
import psycopg2
from psycopg2 import sql
import pandas as pd
import logging

# Configure the root logger so that INFO-level (and more severe) messages
# emitted by the pipeline functions below are shown.
logging.basicConfig(level=logging.INFO)

# --- Step 2: Store Data in S3 ---
def upload_to_s3(file_path, bucket_name, object_name=None):
    """Upload a local file to an S3 bucket.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Name of the destination S3 bucket.
        object_name: Key to store the file under. When omitted, the final
            component of ``file_path`` is used.

    Returns:
        True if the upload succeeded, False if it failed. A failure is logged
        together with its traceback instead of being raised, so callers that
        need to stop on failure should check the return value.
    """
    import os

    s3 = boto3.client('s3')

    if object_name is None:
        # basename follows the platform's path rules, so the key is still just
        # the file name when the path does not use '/' as its separator.
        object_name = os.path.basename(file_path)

    try:
        s3.upload_file(file_path, bucket_name, object_name)
    except Exception as e:
        # Best-effort upload: report the failure (with traceback) and signal
        # it to the caller through the return value.
        logging.exception(f"Error uploading {file_path} to {bucket_name}/{object_name}. Error: {e}")
        return False

    logging.info(f"Uploaded {file_path} to {bucket_name}/{object_name}")
    return True

# --- Step 3: Prepare AWS PostgreSQL Database ---
def connect_to_postgres():
    """Open a new psycopg2 connection to the PostgreSQL database.

    Each setting is read from the corresponding libpq environment variable
    (``PGDATABASE``, ``PGUSER``, ``PGPASSWORD``, ``PGHOST``, ``PGPORT``) when
    it is set, so credentials do not have to be written into this file. When
    a variable is not set, the placeholder value below is used instead.

    Returns:
        A new psycopg2 connection. The caller is responsible for closing it.

    Raises:
        Whatever ``psycopg2.connect`` raises when the connection cannot be
        established.
    """
    import os

    return psycopg2.connect(
        dbname=os.environ.get("PGDATABASE", "your_db_name"),
        user=os.environ.get("PGUSER", "your_user"),
        password=os.environ.get("PGPASSWORD", "your_password"),
        host=os.environ.get("PGHOST", "your_host"),
        port=os.environ.get("PGPORT", "your_port"),
    )

def create_table_in_postgres():
    """Create the target table in PostgreSQL if it does not already exist.

    The statement below is a template: the table name, column names and
    column types are placeholders that must be replaced with a real schema
    before this function can run successfully.

    Raises:
        Whatever psycopg2 raises if connecting or executing the DDL fails; the
        transaction is rolled back in that case.
    """
    # Adjust this SQL according to your schema
    create_table_sql = """
            CREATE TABLE IF NOT EXISTS your_table_name (
                column1 data_type1,
                column2 data_type2,
                ...
            );
            """

    conn = connect_to_postgres()
    try:
        # `with conn` commits on success and rolls back on error, but it does
        # NOT close the connection -- that is done explicitly in `finally`.
        with conn:
            with conn.cursor() as cur:
                cur.execute(create_table_sql)
        logging.info("Table created successfully in PostgreSQL.")
    finally:
        conn.close()

# --- Step 4: Load Data from S3 to PostgreSQL ---
def load_data_from_s3_to_postgres(bucket_name, object_name, table_name='your_table_name'):
    """Load a CSV object from S3 into an existing PostgreSQL table.

    The CSV is parsed with pandas and then bulk-loaded with ``COPY``.
    ``DataFrame.to_sql`` is deliberately not used: it only accepts SQLAlchemy
    connectables or sqlite3 connections, not a raw psycopg2 connection.

    The table's current rows are removed and the new rows inserted in one
    transaction, so a failed load leaves the previous contents untouched.
    The table itself must already exist, with columns named like the CSV
    header.

    Args:
        bucket_name: Name of the S3 bucket holding the CSV.
        object_name: Key of the CSV object inside the bucket.
        table_name: Name of the destination table.

    Raises:
        Whatever boto3, pandas or psycopg2 raise if the download, parsing or
        load fails.
    """
    import io

    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=bucket_name, Key=object_name)
    data = pd.read_csv(obj['Body'])

    # Re-serialise the parsed frame without index or header. Missing values
    # become empty fields, which COPY in CSV format reads back as NULL.
    buffer = io.StringIO()
    data.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    # Compose identifiers with psycopg2.sql so names are quoted safely.
    table = sql.Identifier(table_name)
    columns = sql.SQL(', ').join(sql.Identifier(str(col)) for col in data.columns)
    truncate_stmt = sql.SQL("TRUNCATE TABLE {}").format(table)
    copy_stmt = sql.SQL("COPY {} ({}) FROM STDIN WITH (FORMAT CSV)").format(table, columns)

    conn = connect_to_postgres()
    try:
        # `with conn` commits on success / rolls back on error; it does not
        # close the connection, hence the explicit close in `finally`.
        with conn:
            with conn.cursor() as cur:
                cur.execute(truncate_stmt)
                cur.copy_expert(copy_stmt, buffer)
        logging.info("Data loaded successfully from S3 to PostgreSQL.")
    finally:
        conn.close()

# --- Step 5: Data Validation ---
def run_validation_checks():
    """Run a sample data-quality check against the loaded table.

    Counts the rows of ``your_table_name`` whose ``your_column`` is NULL.
    Both names are placeholders to be adjusted to the real schema.

    Raises:
        ValueError: If at least one row has a NULL in the checked column.
    """
    conn = connect_to_postgres()
    try:
        # `with conn` only ends the transaction; the connection itself is
        # closed in `finally` so it is not leaked.
        with conn:
            with conn.cursor() as cur:
                # Sample validation check: Adjust accordingly
                cur.execute("SELECT COUNT(*) FROM your_table_name WHERE your_column IS NULL;")
                null_count = cur.fetchone()[0]
    finally:
        conn.close()

    if null_count > 0:
        raise ValueError(f"There are {null_count} null values in your_column.")
    logging.info("Data validation successful.")

# --- Step 6: Monitoring & Maintenance ---
def backup_postgres():
    """Placeholder for the database backup step; currently does nothing.

    The backup logic still has to be implemented and depends on the
    infrastructure in use, e.g. a tool such as `pg_dump` or an AWS-native
    solution for RDS backups.
    """
    return None

def cleanup_old_s3_files(bucket_name, age_days):
    """Delete objects in an S3 bucket that were last modified too long ago.

    Args:
        bucket_name: Name of the S3 bucket to clean up.
        age_days: Objects last modified more than this many days ago are
            deleted.
    """
    from datetime import datetime, timedelta, timezone

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)

    # S3 reports last_modified as a timezone-aware datetime, so the cutoff
    # must be timezone-aware as well; mixing naive and aware values raises
    # TypeError. The cutoff is computed once, outside the loop.
    cutoff = datetime.now(timezone.utc) - timedelta(days=age_days)

    for obj in bucket.objects.all():
        # An object is "old" when its modification time lies before the cutoff.
        if obj.last_modified < cutoff:
            obj.delete()
            logging.info(f"Deleted old file {obj.key} from S3.")

if __name__ == "__main__":
    # Pipeline inputs -- replace these placeholders with real values.
    bucket_name = "your_s3_bucket_name"
    file_path = "path_to_your_data_file.csv"
    # The S3 key is assumed to be the file-name part of the local path.
    object_name = file_path.rsplit('/', 1)[-1]

    # 1. Push the local data file to S3.
    upload_to_s3(file_path, bucket_name)

    # 2. Make sure the destination table exists.
    create_table_in_postgres()

    # 3. Copy the uploaded object from S3 into PostgreSQL.
    load_data_from_s3_to_postgres(bucket_name, object_name)

    # 4. Check the loaded data.
    run_validation_checks()

    # 5. Back up the database.
    backup_postgres()

    # 6. Remove S3 objects older than 30 days.
    cleanup_old_s3_files(bucket_name, age_days=30)
