In [None]:
-- Show which role the current session is using.
SELECT CURRENT_ROLE()

In [None]:
-- Create (or replace) a Python UDF, get_EN_secret(), that takes no arguments
-- and returns a STRING.
--   * SECRETS binds the Snowflake secret EN_API_DB_20241007 to the local
--     name 'cred'.
--   * The handler reads that binding with
--     _snowflake.get_generic_secret_string('cred') and returns the value.
--   * EXTERNAL_ACCESS_INTEGRATIONS attaches EN_INTEGRATION to the function.
-- NOTE(review): the function returns the raw secret string to its caller, so
-- presumably USAGE on it should be restricted - confirm the grants.
CREATE OR REPLACE FUNCTION get_EN_secret()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'get_EN_secret'
EXTERNAL_ACCESS_INTEGRATIONS = (EN_INTEGRATION)
SECRETS = ('cred' = EN_API_DB_20241007)
AS
$$
import _snowflake

def get_EN_secret():   
  my_api_key = _snowflake.get_generic_secret_string('cred') 
  return my_api_key
$$;

In [None]:
# Smoke test: call the get_EN_secret() UDF through the active Snowpark session
# and keep the returned value in EN_secret.
from snowflake.snowpark.context import get_active_session
session = get_active_session()
results = session.sql('SELECT get_EN_secret()').collect()
# The query returns one row with one column; take that single value.
EN_secret = results[0][0]
# EN_secret holds the raw secret string returned by the UDF; avoid displaying
# it in cell output.

In [None]:
# Import required packages
from snowflake.snowpark.context import get_active_session
import pandas as pd
import requests
import os
import re
from datetime import datetime, timezone
import logging
import tempfile

# Set up logging: configure the root logger at INFO level and create the
# module-level `logger` used by the functions defined in this cell.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def validate_email(email):
    """Check whether ``email`` looks like a ``local@domain.tld`` address.

    Args:
        email: Value to check. Anything that is not a ``str`` (for example
            ``None`` or NaN coming from an empty CSV cell) is treated as
            invalid instead of raising ``TypeError``.

    Returns:
        bool: True only when the whole string matches the pattern.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch (rather than match with a trailing "$") also rejects an
    # address followed by a newline, which "$" would silently accept.
    return re.fullmatch(pattern, email) is not None

def get_api_token(session):
    """Fetch the API token by running the ``get_EN_secret()`` UDF.

    Args:
        session: Object exposing ``sql(query).collect()`` (a Snowpark session).

    Returns:
        The first column of the first row returned by the query.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        rows = session.sql('SELECT get_EN_secret()').collect()
        first_row = rows[0]
        token = first_row[0]
    except Exception as e:
        logger.error(f"Error retrieving API token: {str(e)}")
        raise
    return token

def get_most_recent_file(session):
    """Return the name of the most recently modified ``.csv`` file in the stage.

    Runs ``LIST @CLASSYDATA.EN_API.NEW_FILES`` and picks the entry with the
    latest ``last_modified`` value among names ending in ``.csv``.

    Args:
        session: Object exposing ``sql(query).collect()`` (a Snowpark session).

    Returns:
        The ``name`` field of the newest CSV entry.

    Raises:
        ValueError: If the stage contains no ``.csv`` files.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    # Local import: only this function needs the RFC 2822/1123 date parser.
    from email.utils import parsedate_to_datetime

    def _modified_at(entry):
        # LIST reports last_modified as text such as
        # "Mon, 14 Oct 2024 08:00:00 GMT". Comparing those strings directly
        # orders files by weekday name, so parse them into datetimes first.
        stamp = entry['last_modified']
        if isinstance(stamp, str):
            return parsedate_to_datetime(stamp)
        return stamp

    try:
        file_query = "LIST @CLASSYDATA.EN_API.NEW_FILES"
        list_files = session.sql(file_query).collect()
        csv_files = [f for f in list_files if f['name'].endswith('.csv')]
        if not csv_files:
            # Same exception type max() raised on an empty list, but with a
            # message that says what is actually wrong.
            raise ValueError("No .csv files found in stage @CLASSYDATA.EN_API.NEW_FILES")
        most_recent = max(csv_files, key=_modified_at)
        return most_recent['name']
    except Exception as e:
        logger.error(f"Error getting most recent file: {str(e)}")
        raise

def read_and_validate_data(session, file_path):
    """Read a staged CSV file into pandas and validate its email column.

    The file at ``@CLASSYDATA.EN_API.<file_path>`` is read with a comma
    delimiter and one skipped header row, and its two columns are renamed to
    ``EMAIL_ADDRESS`` and ``CLASSY_WALK_2024``.

    Args:
        session: Active Snowpark session used to read the file.
        file_path: Path of the file, appended to ``@CLASSYDATA.EN_API.``.

    Returns:
        pandas.DataFrame: The file contents with a fresh 0..n-1 index.

    Raises:
        ValueError: If any value in ``EMAIL_ADDRESS`` is missing or fails
            ``validate_email``.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        # Read file into Snowpark DataFrame
        snowpark_df = session.read.options({"field_delimiter": ",", "skip_header": 1}).csv(f"@CLASSYDATA.EN_API.{file_path}")
        snowpark_df = snowpark_df.toDF("EMAIL_ADDRESS", "CLASSY_WALK_2024")

        # Convert to pandas and ensure no index
        df = snowpark_df.to_pandas().reset_index(drop=True)

        # Debug output
        print("\n=== Initial DataFrame ===")
        print(df.head())
        print("Columns:", df.columns.tolist())

        # Validate emails. A blank cell arrives as a non-string (None/NaN);
        # report it as an invalid address rather than letting the regex
        # helper fail with a TypeError that hides the real problem.
        invalid_emails = [
            email for email in df['EMAIL_ADDRESS']
            if not isinstance(email, str) or not validate_email(email)
        ]
        if invalid_emails:
            raise ValueError(f"Invalid email formats found: {invalid_emails}")

        return df
    except Exception as e:
        logger.error(f"Error reading/validating data: {str(e)}")
        raise

def process_through_api(api_token, df, file_name, timeout=(10, 300)):
    """Upload ``df`` as a CSV file to the Engaging Networks import service.

    The DataFrame is written (without its index) to a temporary ``.csv`` file,
    which is then POSTed as multipart form data together with the token, an
    import name containing the current UTC timestamp, and the format name
    ``classy_to_EN_API``. The temporary file is always removed afterwards.

    Args:
        api_token: Token sent in the ``token`` form field.
        df: pandas DataFrame to upload.
        file_name: Source file name; only its base name is sent as the
            upload's file name.
        timeout: Passed to ``requests.post``; either a number of seconds or a
            ``(connect, read)`` tuple. Prevents the call from hanging forever.

    Returns:
        dict: ``{"success": True, "response": response}`` on success, or
        ``{"success": False, "error": message, "response": response_or_None}``
        on any failure. This function does not raise.
    """
    response = None
    tmp_path = None
    try:
        # Get just the base filename without directory path
        base_file_name = os.path.basename(file_name)
        print(f"Using filename: {base_file_name}")

        # Debug: Show DataFrame before CSV creation
        print("\n=== DataFrame Before CSV Creation ===")
        print("Shape:", df.shape)
        print("Columns:", df.columns.tolist())
        print("First few rows:\n", df.head())

        # delete=False so the file can be reopened for reading below; it is
        # removed explicitly in the finally clause.
        with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', newline='',
                                         delete=False, suffix='.csv') as tmp_file:
            tmp_path = tmp_file.name
            # Write through the already-open handle instead of reopening the
            # same path while it is still held open.
            df.to_csv(tmp_file, index=False)

        # Report only the size: the file holds every email address, so its
        # full content is deliberately not echoed to the notebook output.
        print("CSV file size:", os.path.getsize(tmp_path), "bytes")

        # Prepare API request
        url = "https://us.engagingnetworks.app/ea-dataservice/import.service"
        current_time = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

        data = {
            'token': api_token,
            'name': f'Import from Snowflake {current_time}',
            'formatName': 'classy_to_EN_API'
        }

        # Send request using base filename
        with open(tmp_path, 'rb') as f:
            files = {'file': (base_file_name, f)}
            logger.info("Sending API request...")
            response = requests.post(url, data=data, files=files, timeout=timeout)
            logger.info(f"Response status code: {response.status_code}")
            logger.info(f"Response content: {response.text}")

        # Treat both a non-2xx/3xx status and an "error" marker in the body
        # as failure; the body check alone would accept e.g. an empty 500.
        if not response.ok or "error" in response.text.lower():
            raise requests.exceptions.RequestException(f"API returned error: {response.text}")

        return {
            "success": True,
            "response": response
        }

    except Exception as e:
        error_message = str(e)
        if response is not None:
            error_message += f"\nAPI Response: {response.text}"
        return {
            "success": False,
            "error": error_message,
            "response": response
        }
    finally:
        # Clean up on every path, including failures before or during the POST.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup: a leftover temp file must not mask the
                # real result of the upload.
                pass

def write_to_table(session, df):
    """Append the processed rows, stamped with the current UTC time, to Snowflake.

    Args:
        session: Snowpark session providing ``create_dataframe``.
        df: pandas DataFrame with ``EMAIL_ADDRESS`` and ``CLASSY_WALK_2024``
            columns.

    Returns:
        bool: True once the append has completed.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    target_table = "CLASSYDATA.EN_API.EN_PUSHED_FUNDRAISERS_WALK2024"
    column_order = ["INSERTED_AT", "EMAIL_ADDRESS", "CLASSY_WALK_2024"]
    try:
        # One timestamp string, formatted once and repeated for every row.
        inserted_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        stamped_columns = {
            'INSERTED_AT': [inserted_at] * len(df),
            'EMAIL_ADDRESS': df['EMAIL_ADDRESS'],
            'CLASSY_WALK_2024': df['CLASSY_WALK_2024'],
        }
        stamped = pd.DataFrame(stamped_columns)

        # Hand the frame to Snowpark with explicit column names, then append.
        staged = session.create_dataframe(stamped, schema=column_order)
        staged.write.mode("append").save_as_table(target_table)
    except Exception as e:
        logger.error(f"Error writing to table: {str(e)}")
        raise
    return True

def main():
    """Run the full pipeline: token -> newest file -> validate -> upload -> record.

    Steps, in order: fetch the API token, pick the most recent staged CSV,
    read and validate it, upload it through the API, and - only when the
    upload reports success - append the rows to the Snowflake table.

    Returns:
        dict: On success, ``status`` is ``"success"`` with ``file_processed``,
        ``records_processed`` and ``api_response``. On failure, ``status`` is
        ``"error"`` with ``error_type``, ``error_message``, ``timestamp`` and,
        when a file had already been selected, ``file_attempted``.
    """
    session = get_active_session()
    file_name = None  # remembered so error reports can name the file

    try:
        # Get API token
        api_token = get_api_token(session)
        logger.info("Retrieved API token")

        # Get most recent file
        file_name = get_most_recent_file(session)
        logger.info(f"Selected file: {file_name}")

        # Read and validate data
        df = read_and_validate_data(session, file_name)
        logger.info(f"Email addresses validated successfully. {len(df)} records to process")

        # Process through API
        api_result = process_through_api(api_token, df, file_name)

        if not api_result["success"]:
            error_info = {
                "status": "error",
                "error_type": "API_Error",
                "error_message": api_result["error"],
                "file_attempted": file_name,
                "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
            }
            logger.error(f"API Processing failed: {error_info}")
            return error_info

        # Only write to table if API call was successful
        write_to_table(session, df)
        logger.info("Data written to table successfully")

        # Compare against None explicitly: the truth value of a requests
        # Response reflects its HTTP status, not whether a response exists.
        response = api_result["response"]
        return {
            "status": "success",
            "file_processed": file_name,
            "records_processed": len(df),
            "api_response": response.text if response is not None else "No response text"
        }

    except Exception as e:
        # Handle non-API errors
        error_info = {
            "status": "error",
            "error_type": type(e).__name__,
            "error_message": str(e),
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        }
        if file_name is not None:
            # Mirror the API-error report so the failing file is identifiable.
            error_info["file_attempted"] = file_name
        logger.error(f"Processing failed: {error_info}")
        return error_info

# Execute main function when this code runs as the top-level module, keep its
# status dictionary in `result`, and print it.
if __name__ == "__main__":
    result = main()
    print(result)