# In [None]:
import os
import paramiko
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from io import StringIO
import logging

# In [None]:
%pip install paramiko "snowflake-connector-python[pandas]"

# In [1]:
import os
import paramiko
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from io import StringIO
import logging

# === Setup Logging ===
# Messages logged below contain non-ASCII characters (✅, ❌, →). The log file
# is therefore opened explicitly as UTF-8 rather than in the platform's locale
# encoding, which may be unable to represent them (e.g. cp1252 on Windows).
# delay=True defers opening the file until the first record is written.
# NOTE(review): basicConfig() is a no-op if the root logger already has
# handlers (which may be the case inside some notebook kernels) — confirm that
# records actually reach logs/transfer.log in the target environment.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[logging.FileHandler("logs/transfer.log", encoding="utf-8", delay=True)],
)

# === Load Environment Variables from Deployment Server ===
# Required settings use os.environ[...] so that a missing variable fails fast
# with KeyError when this cell/module runs, rather than midway through a transfer.
SFTP_HOST = os.environ["SFTP_HOST"]
SFTP_PORT = int(os.getenv("SFTP_PORT", "22"))
SFTP_USER = os.environ["SFTP_USER"]
SFTP_PASSWORD = os.environ["SFTP_PASSWORD"]

SF_USER = os.environ["SNOWFLAKE_USER"]
SF_PASSWORD = os.environ["SNOWFLAKE_PASSWORD"]
SF_ACCOUNT = os.environ["SNOWFLAKE_ACCOUNT"]
SF_WAREHOUSE = os.environ["SNOWFLAKE_WAREHOUSE"]
SF_DATABASE = os.environ["SNOWFLAKE_DATABASE"]
SF_SCHEMA = os.environ["SNOWFLAKE_SCHEMA"]

# Optional overrides; the defaults are the previously hard-coded values.
REMOTE_DIR = os.getenv("SFTP_REMOTE_DIR", "/upload/")  # Remote SFTP directory holding the CSV files
LOCAL_TMP = os.getenv("LOCAL_TMP_DIR", "/tmp")         # Temporary folder on deployment server
# NOTE(review): downloads land at predictable names in a shared temp folder;
# consider a private directory (tempfile.mkdtemp()) if other users share the host.

# === Snowflake Connection Function ===
def connect_snowflake():
    """Open and return a new Snowflake connection.

    Credentials and the default warehouse/database/schema are taken from the
    module-level ``SF_*`` settings loaded from the environment. The returned
    connection is owned by the caller, which should close it when done.
    """
    connection_params = {
        "user": SF_USER,
        "password": SF_PASSWORD,
        "account": SF_ACCOUNT,
        "warehouse": SF_WAREHOUSE,
        "database": SF_DATABASE,
        "schema": SF_SCHEMA,
    }
    return snowflake.connector.connect(**connection_params)

# === Download CSV Files from SFTP ===
def download_csvs_from_sftp():
    """Download every ``*.csv`` file in REMOTE_DIR on the SFTP server to LOCAL_TMP.

    Only entries whose name ends in ``.csv`` (case-sensitive) are fetched, and
    directories are skipped. Both the SFTP session and the underlying SSH
    transport are closed whether the transfer succeeds or fails.

    Returns:
        List of local paths of the downloaded files, in server listing order.

    Raises:
        paramiko.SSHException (e.g. on negotiation/authentication failure) or
        socket/OS errors if connecting, listing or downloading fails.
    """
    import posixpath  # SFTP paths are POSIX-style regardless of the local OS
    import stat

    logging.info("Connecting to SFTP server...")
    with paramiko.Transport((SFTP_HOST, SFTP_PORT)) as transport:
        # NOTE(review): connect() without hostkey= performs no host-key
        # verification, so the server's identity is not checked. Consider
        # passing the expected key or using SSHClient with known_hosts.
        transport.connect(username=SFTP_USER, password=SFTP_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)
        if sftp is None:
            raise paramiko.SSHException("Could not open an SFTP session")

        with sftp:
            downloaded_files = []
            for file_attr in sftp.listdir_attr(REMOTE_DIR):
                file_name = file_attr.filename
                if not file_name.endswith(".csv"):
                    continue
                # A directory named "*.csv" cannot be fetched with get().
                if file_attr.st_mode is not None and stat.S_ISDIR(file_attr.st_mode):
                    continue

                # basename() keeps a server-supplied name from writing outside LOCAL_TMP.
                local_path = os.path.join(LOCAL_TMP, os.path.basename(file_name))
                sftp.get(posixpath.join(REMOTE_DIR, file_name), local_path)
                downloaded_files.append(local_path)
                logging.info(f"Downloaded: {file_name}")

    return downloaded_files

# === Load CSV Data into Existing Snowflake Tables ===
def load_csv_to_snowflake(file_path, conn):
    """Append the rows of one CSV file to the Snowflake table named after it.

    The table name is the file's base name without extension, lower-cased
    (``/tmp/Sales.csv`` -> ``sales``). Rows are appended (``overwrite=False``)
    to a table that is expected to already exist.

    Failures (unreadable/empty CSV, or an upload that raises or reports
    failure) are logged and swallowed so one bad file does not stop the batch.

    Args:
        file_path: Path to a local CSV file.
        conn: An open Snowflake connection.

    Returns:
        True if write_pandas reported success, False otherwise.
    """
    # NOTE(review): write_pandas quotes identifiers by default, so this
    # lower-cased name targets a case-sensitive lowercase table. Tables created
    # with unquoted names are stored upper-case in Snowflake — confirm the
    # target tables' actual names (or pass quote_identifiers=False).
    table_name = os.path.splitext(os.path.basename(file_path))[0].lower()

    logging.info(f"Uploading {file_path} → Snowflake table: {table_name}")
    try:
        # Reading is inside the try so a malformed CSV is handled like a failed upload.
        df = pd.read_csv(file_path)
        success, nchunks, nrows, _ = write_pandas(
            conn,
            df,
            table_name=table_name,
            overwrite=False
        )
    except Exception as e:
        logging.exception(f"❌ Failed to upload to {table_name}: {e}")
        return False

    if not success:
        logging.error(f"❌ Failed to upload to {table_name}: write_pandas reported failure")
        return False

    logging.info(f"✅ Uploaded {nrows} rows to {table_name} in {nchunks} chunks")
    return True

# === Main Pipeline ===
def run_pipeline():
    """Download every CSV from SFTP and append each one to its Snowflake table.

    Each temp file is deleted after its load attempt, even if the load raised,
    and the Snowflake connection is always closed. Any exception that escapes
    is logged with its traceback and re-raised.
    """
    conn = None
    try:
        conn = connect_snowflake()
        csv_files = download_csvs_from_sftp()

        for file_path in csv_files:
            try:
                load_csv_to_snowflake(file_path, conn)
            finally:
                # Always clean up the local copy so temp files don't accumulate.
                os.remove(file_path)
                logging.info(f"Removed temp file: {file_path}")

        logging.info("✅ Pipeline completed successfully.")
    except Exception as e:
        logging.exception(f"❌ Pipeline failed: {e}")
        raise
    finally:
        # Close on both success and failure; previously an error leaked the connection.
        if conn is not None:
            conn.close()

# === Run Script ===
# Entry point: run the pipeline when this file is executed directly. In a
# Jupyter/IPython cell __name__ is also "__main__", so running the cell runs it too.
if __name__ == "__main__":
    run_pipeline()

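# Output of the cell above: ModuleNotFoundError: No module named 'snowflake'
# Fix: install the connector with `pip install "snowflake-connector-python[pandas]"`
# (the [pandas] extra is required for snowflake.connector.pandas_tools.write_pandas).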