# In [None]:
import os
import snowflake.connector as sf_c
import boto3
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Module-wide boto3 handles for S3, created once at import time.
s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')

# Installation root; a KeyError is raised here if CEDL_HOME is not set.
CEDL_HOME = os.environ['CEDL_HOME']

# Connection-profile files located under $CEDL_HOME/etc.
nexus_connectionProfile = f'{CEDL_HOME}/etc/.sf.nexus.profile'
s3_connectionProfile = f'{CEDL_HOME}/etc/.s3_connection_profile'



class SnowflakeLoader:
    """Load the CSV files of a folder into Snowflake, one table per file.

    Attributes:
        profile_path: Path of a ``key=value`` text file holding the Snowflake
            connection settings (see ``_PROFILE_KEYS``).
        snowflake_config: Mapping with ``'database'`` and ``'schema'`` keys,
            used as the default target when an upload is not given one.
        conn: The connection opened by ``snowFlake_Connection``, or ``None``.
    """

    # Settings that must be present and non-empty in the profile file.
    _PROFILE_KEYS = (
        'snowflakeAccount',
        'snowflakeUsername',
        'snowflakePassword',
        'snowflakeRole',
        'snowflakeDBName',
        'snowflakeWarehouse',
    )

    def __init__(self, profile_path, snowflake_config):
        self.profile_path = profile_path
        self.snowflake_config = snowflake_config
        self.conn = None

    def _read_profile(self):
        """Return the required settings parsed from ``self.profile_path``.

        Prints a message and raises ``SystemExit(1)`` when the file does not
        exist or when any key in ``_PROFILE_KEYS`` is absent or empty.
        """
        if not os.path.exists(self.profile_path):
            print("The profile {} doesn't exist".format(self.profile_path))
            raise SystemExit(1)

        settings = {}
        with open(self.profile_path) as profile_file:
            for line in profile_file:
                # Split on the first '=' only so that values which themselves
                # contain '=' (e.g. passwords) are kept intact.
                key, _, value = line.partition('=')
                if key in self._PROFILE_KEYS:
                    settings[key] = value.rstrip('\n')

        # .get() makes a key that never appeared count as missing instead of
        # failing with an unrelated error.
        if any(not settings.get(key) for key in self._PROFILE_KEYS):
            print('some parameters are missing from {}'.format(self.profile_path))
            raise SystemExit(1)
        return settings

    def snowFlake_Connection(self):
        """Open a Snowflake connection from the profile and store it on ``self.conn``.

        Returns:
            The newly opened connection (also available as ``self.conn``).

        Raises:
            SystemExit: With code 1 if the profile is missing or incomplete,
                or if the connector reports an error while connecting.
        """
        settings = self._read_profile()
        # NOTE(review): snowflakeRole is required in the profile but is not
        # passed to connect(), so the session uses the user's default role.
        # Confirm whether role=settings['snowflakeRole'] should be supplied.
        try:
            self.conn = sf_c.connect(
                user=settings['snowflakeUsername'],
                password=settings['snowflakePassword'],
                account=settings['snowflakeAccount'],
                warehouse=settings['snowflakeWarehouse'],
                database=settings['snowflakeDBName'],
            )
        except sf_c.Error as e:
            print('Error connecting to SNOWFLAKE Database - {}'.format(e))
            raise SystemExit(1) from e
        print("connected to SNOWFLAKE Database.")
        return self.conn

    def load_files_into_snowflake(self, folder_path, snowflake_config):
        """Upload every ``.csv`` file in ``folder_path`` to its own table.

        The table name is the part of the file name before the first ``.``.
        A connection already held in ``self.conn`` is reused; otherwise a new
        one is opened. The connection is always closed before returning, even
        when an upload fails, and ``self.conn`` is reset to ``None``.

        Args:
            folder_path: Directory to scan (not recursive).
            snowflake_config: Mapping whose ``'database'`` and ``'schema'``
                entries select the target of the uploads.

        Returns:
            Dict mapping each table name to the DataFrame read from its file.
        """
        files = os.listdir(folder_path)

        # Reuse a connection the caller already opened instead of leaking it.
        conn = self.conn if self.conn is not None else self.snowFlake_Connection()
        print("Connection - > inside load_files_into_snowflake")

        tables = {}
        try:
            for file in files:
                if not file.endswith(".csv"):
                    continue
                data = pd.read_csv(os.path.join(folder_path, file))
                table_name = file.split(".")[0]
                self.upload_to_snowflake(
                    conn,
                    data,
                    table_name,
                    snowflake_config.get('database'),
                    snowflake_config.get('schema'),
                )
                tables[table_name] = data
        finally:
            conn.close()
            self.conn = None

        return tables

    def upload_to_snowflake(self, conn, data, table_name, database, schema):
        """Replace ``database.schema.table_name`` with the rows of ``data``.

        The table is (re)created with one STRING column per DataFrame column,
        the rows are inserted, and a ``LOAD_TS`` TIMESTAMP column is then
        added and set to the current timestamp.

        Args:
            conn: Connection to use; falls back to ``self.conn`` when ``None``.
            data: DataFrame to upload. Missing values are sent as ``None``.
            table_name: Unqualified name of the target table.
            database: Target database; falls back to
                ``self.snowflake_config['database']`` when empty/``None``.
            schema: Target schema; falls back to
                ``self.snowflake_config['schema']`` when empty/``None``.
        """
        conn = conn if conn is not None else self.conn
        database = database or self.snowflake_config['database']
        schema = schema or self.snowflake_config['schema']
        # NOTE(review): identifiers are interpolated unquoted, so table and
        # column names must already be valid Snowflake identifiers.
        qualified_name = f"{database}.{schema}.{table_name}"

        # Cast to object first: on numeric columns where() may otherwise
        # coerce the None replacement back to NaN.
        rows = data.astype(object).where(pd.notna(data), None).values.tolist()
        column_defs = ', '.join(f'{col} STRING' for col in data.columns)
        placeholders = ', '.join(['%s'] * len(data.columns))

        cursor = conn.cursor()
        try:
            cursor.execute(f"CREATE OR REPLACE TABLE {qualified_name} ({column_defs})")
            if rows:
                cursor.executemany(
                    f"INSERT INTO {qualified_name} VALUES ({placeholders})", rows
                )
            # LOAD_TS is added after the insert so the positional VALUES list
            # above only has to match the CSV columns.
            cursor.execute(f"ALTER TABLE {qualified_name} ADD COLUMN LOAD_TS TIMESTAMP")
            cursor.execute(f"UPDATE {qualified_name} SET LOAD_TS = CURRENT_TIMESTAMP()")
            conn.commit()
        finally:
            cursor.close()


# Example usage: load every CSV found in folder_path into NEXUS.TEMP.
# nexus_connectionProfile is defined with the other paths near the top of the
# file, so it is not re-assigned here.
folder_path = "/AppDev/CEDL/etl/SrcFiles/lg/foo"

snowflake_config = {
    'database': 'NEXUS',
    'schema': 'TEMP',
}
loader = SnowflakeLoader(nexus_connectionProfile, snowflake_config)
# load_files_into_snowflake() obtains and closes its own connection, so a
# separate snowFlake_Connection() call beforehand is not needed.
loaded_tables = loader.load_files_into_snowflake(folder_path, snowflake_config)

# Show a preview of each loaded table, keyed by table name.
for table_name, table_data in loaded_tables.items():
    print(f"\nTable: {table_name}")
    print(table_data.head())
