In [1]:
pip install cryptography

Note: you may need to restart the kernel to use updated packages.


In [5]:
import os
import psycopg2
from cryptography.fernet import Fernet


In [7]:
# Generate a key for encryption and decryption
key = Fernet.generate_key()

# Save the key to a file
with open("secret.key", "wb") as key_file:
    key_file.write(key)

print("Encryption key generated and saved to 'secret.key'")

Encryption key generated and saved to 'secret.key'


In [8]:
# Load the key from a file
def load_key():
    return open("secret.key", "rb").read()

key = load_key()
cipher_suite = Fernet(key)

# Encrypt the username and password
def encrypt_message(message):
    return cipher_suite.encrypt(message.encode())

# Decrypt the username and password
def decrypt_message(encrypted_message):
    return cipher_suite.decrypt(encrypted_message).decode()

In [9]:
import snowflake.connector
from snowflake.connector import ProgrammingError

In [12]:
def connect_to_postgres(user, password, host, port, database):
    try:
        conn = psycopg2.connect(
            user=user,
            password=password,
            host=host,
            port=port,
            database=database
        )
        print("Connection to PostgreSQL established successfully.")
        return conn
    except Exception as e:
        print(f"Failed to connect to PostgreSQL: {e}")
        return None


In [13]:
def connect_to_snowflake(user, password, account, warehouse, database, schema):
    try:
        conn = snowflake.connector.connect(
            user=user,
            password=password,
            account=account,
            warehouse=warehouse,
            database=database,
            schema=schema
        )
        print("Connection to Snowflake established successfully.")
        return conn
    except Exception as e:
        print(f"Failed to connect to Snowflake: {e}")
        return None

In [14]:
def fetch_data_from_postgres(conn):
    try:
        cursor = conn.cursor()
        fetch_query = """
        SELECT
            meals.id AS product_id,
            (ordertable.order_quantity * meals.price) AS amount,
            ordertable.created_at AS date,
            ordertable.customer_name AS user_id
        FROM
            public.ordertable
        JOIN
            public.meals ON ordertable.meal_id = meals.id;
        """
        cursor.execute(fetch_query)
        data = cursor.fetchall()
        cursor.close()
        return data
    except Exception as e:
        print(f"Error fetching data from PostgreSQL: {e}")
        return None

In [16]:
def insert_data_to_snowflake(conn, data):
    try:
        cursor = conn.cursor()
        insert_query = """
        INSERT INTO fact_schema.transaction (product_id, amount, date, user_id)
        VALUES (%s, %s, %s, %s);
        """
        cursor.executemany(insert_query, data)
        conn.commit()
        cursor.close()
        print("Data inserted into 'transaction' table in schema 'fact_schema' successfully.")
    except Exception as e:
        print(f"Error inserting data into Snowflake: {e}")

def sync_postgres_to_snowflake(pg_details, sf_details):
    # PostgreSQL connection details
    pg_user, pg_password, pg_host, pg_port, pg_database = pg_details

    # Snowflake connection details
    sf_user, sf_password, sf_account, sf_warehouse, sf_database, sf_schema = sf_details

    # Connect to PostgreSQL
    pg_conn = connect_to_postgres(pg_user, pg_password, pg_host, pg_port, pg_database)
    if not pg_conn:
        return

    # Fetch data from PostgreSQL
    data = fetch_data_from_postgres(pg_conn)
    if not data:
        pg_conn.close()
        return

    # Close PostgreSQL connection
    pg_conn.close()

    # Connect to Snowflake
    sf_conn = connect_to_snowflake(sf_user, sf_password, sf_account, sf_warehouse, sf_database, sf_schema)
    if not sf_conn:
        return

    # Insert data into Snowflake
    insert_data_to_snowflake(sf_conn, data)

    # Close Snowflake connection
    sf_conn.close()

if __name__ == "__main__":
    # PostgreSQL connection details
    pg_details = ('postgres_user', 'postgres_password', 'postgres_host', 'postgres_port', 'postgres_database')

    # Snowflake connection details
    sf_details = ('snowflake_user', 'snowflake_password', 'snowflake_account', 'snowflake_warehouse', 'snowflake_database', 'snowflake_schema')

    # Sync data from PostgreSQL to Snowflake
    sync_postgres_to_snowflake(pg_details, sf_details)

Failed to connect to PostgreSQL: invalid integer value "postgres_port" for connection option "port"

