In [33]:
import csv
import datetime
import io
import os

import boto3
from snowflake.snowpark import Session

# Snowflake connection settings.
#
# The password is read from the SNOWFLAKE_PASSWORD environment variable and has
# no default, so a secret never has to be saved in the notebook; a missing
# variable fails fast with a KeyError. The remaining, non-secret values can be
# overridden through their own SNOWFLAKE_* variables and otherwise fall back to
# the values this notebook was originally written against.
# NOTE(review): ACCOUNTADMIN is a very broad default role for a load job -
# consider a narrower role.
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "dr31778.ca-central-1.aws"),
    "user": os.environ.get("SNOWFLAKE_USER", "PRAVEEN11001"),
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": os.environ.get("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),  # optional
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),  # optional
    "database": os.environ.get("SNOWFLAKE_DATABASE", "DEMO"),  # optional
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", "s3"),  # optional
}

# Open the Snowpark session used by the load cells further down.
session = Session.builder.configs(connection_parameters).create()

In [34]:
# AWS credentials are taken from the standard environment variables instead of
# being written into the notebook. When a variable is unset the value is None,
# and boto3 then resolves credentials through its default provider chain
# (shared credentials file, instance/role credentials, ...).
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

# Bucket that holds both the source CSVs and the chunked copies.
aws_s3_bucket_name = 'praveeng-s3'
# Key prefix ("folder") under which the chunked CSV files are written.
aws_s3_object = 'chunked_csv/'

s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)

In [35]:
# Key prefix scanned for source files, and the suffix a key must end with to be
# picked up.
SOURCE_PREFIX = 'xmls'
SOURCE_SUFFIX = 'set.csv'
# Files larger than this are split; it is also the target size of each chunk.
CHUNK_SIZE_BYTES = 10 * 1024


def _csv_line(fields):
    """Serialize one row as a single CSV line without a trailing newline.

    Uses csv.writer so that fields containing commas, quotes or newlines are
    quoted/escaped again; a plain ','.join(fields) would silently change the
    column structure of such rows.
    """
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator='\n').writerow(fields)
    return buffer.getvalue()[:-1]  # drop the '\n' terminator added by the writer


def _split_csv_into_chunks(file_content, chunk_size):
    """Split CSV text into chunks of roughly ``chunk_size`` bytes.

    Args:
        file_content: Full CSV text; the first record is treated as the header.
        chunk_size: Size budget per chunk in UTF-8 bytes. Newline separators are
            not counted, so a chunk can exceed the budget by a few bytes.

    Returns:
        A list of chunks, each a list of CSV lines starting with the header
        line. Empty input yields an empty list. A single row larger than the
        budget is emitted in a chunk of its own rather than being dropped.
    """
    reader = csv.reader(io.StringIO(file_content, newline=''))
    header = next(reader, None)
    if header is None:
        return []

    header_line = _csv_line(header)
    header_size = len(header_line.encode('utf-8'))

    chunks = []
    current_chunk = [header_line]
    current_chunk_size = header_size
    for row in reader:
        row_line = _csv_line(row)
        row_size = len(row_line.encode('utf-8'))
        # Start a new chunk when this row would overflow the budget, but never
        # flush a chunk that holds only the header (that would upload a file
        # with no data rows).
        if current_chunk_size + row_size > chunk_size and len(current_chunk) > 1:
            chunks.append(current_chunk)
            current_chunk = [header_line]
            current_chunk_size = header_size
        current_chunk.append(row_line)
        current_chunk_size += row_size

    chunks.append(current_chunk)
    return chunks


# Walk every page of the listing: a single list_objects_v2 call returns only
# one page of keys, and a page has no 'Contents' entry when nothing matches.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=aws_s3_bucket_name, Prefix=SOURCE_PREFIX):
    for obj in page.get('Contents', []):
        key = obj['Key']
        if not key.endswith(SOURCE_SUFFIX):
            continue

        # Last path component of the key, regardless of how deep the key is nested.
        file_name = key.rsplit('/', 1)[-1]
        file_content = s3.get_object(Bucket=aws_s3_bucket_name, Key=key)['Body'].read().decode('utf-8')

        if obj['Size'] > CHUNK_SIZE_BYTES:
            # Strip only the '.csv' extension so dots inside the name survive;
            # dashes become underscores in the target file name.
            stem = file_name[:-len('.csv')].replace('-', '_')
            for i, chunk in enumerate(_split_csv_into_chunks(file_content, CHUNK_SIZE_BYTES)):
                partition_key = f"{aws_s3_object}{stem}_{i}.csv"
                partition_data = '\n'.join(chunk)
                s3.put_object(Bucket=aws_s3_bucket_name, Key=partition_key, Body=partition_data.encode('utf-8'))
                print(f"Uploaded partition {i + 1} of {key} to S3 with key: {partition_key}")
        else:
            # Small file: copy it unchanged under the chunk prefix.
            target_key = f"{aws_s3_object}{file_name.replace('-', '_')}"
            s3.put_object(Bucket=aws_s3_bucket_name, Key=target_key, Body=file_content.encode('utf-8'))
            print(f"Uploaded {target_key} to S3.")

Uploaded partition 1 of xmls/student-dataset.csv to S3 with key: chunked_csv/student_dataset_0.csv
Uploaded partition 2 of xmls/student-dataset.csv to S3 with key: chunked_csv/student_dataset_1.csv
Uploaded partition 3 of xmls/student-dataset.csv to S3 with key: chunked_csv/student_dataset_2.csv


In [36]:
def delete_s3_objects(bucket_name, prefix, client=None):
    """Delete every object in ``bucket_name`` whose key starts with ``prefix``.

    The listing is paginated, so all matching objects are removed rather than
    only those on the first page of results.

    Args:
        bucket_name: Name of the S3 bucket.
        prefix: Key prefix selecting the objects to delete.
        client: Optional S3 client to use; defaults to the notebook-level
            ``s3`` client.

    Returns:
        None. Progress and errors are reported with ``print``; errors are
        caught so that a failed cleanup does not abort the notebook run.
    """
    try:
        s3_client = s3 if client is None else client
        paginator = s3_client.get_paginator('list_objects_v2')
        # Collect the keys first so deletions cannot interfere with the listing.
        keys = [
            obj['Key']
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
            for obj in page.get('Contents', [])  # absent when a page is empty
        ]
        if not keys:
            print(f"No objects found with prefix: {prefix}")
            return
        for key in keys:
            s3_client.delete_object(Bucket=bucket_name, Key=key)
            print(f"Deleted object: {key}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

In [37]:
def copy_into_sf(session, folder_name, filename):
    """Load one staged CSV file into AWS_STUDENT_DATASET_S3 via COPY INTO.

    The file ``@s3_stage/<folder_name><filename>.csv`` is copied with its 16
    positional columns, plus two extra columns: the given ``filename`` and the
    current UTC time as ``copy_date``. The COPY result is printed.

    Args:
        session: Object exposing ``sql(text).collect()`` (a Snowpark session).
        folder_name: Stage sub-path including its trailing slash.
        filename: File name without the ``.csv`` extension.

    Raises:
        ValueError: If ``folder_name`` or ``filename`` contains characters other
            than letters, digits and ``_ - . / =``. Both values are spliced
            into the SQL text, so anything else (quotes, spaces, semicolons,
            ...) is rejected rather than risking a broken or injected statement.
    """
    stage_path = f"{folder_name}{filename}"
    if not all(ch.isalnum() or ch in "_-./=" for ch in stage_path):
        raise ValueError(f"Unsafe characters in stage path: {stage_path!r}")

    # Naive UTC timestamp: renders exactly like the deprecated utcnow() value
    # (no '+00:00' suffix) while using the supported API.
    copy_date = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    copy_result = session.sql(f"""
    COPY INTO AWS_STUDENT_DATASET_S3(filename, ID, NAME, NATIONALITY, CITY, 
                              LATITUDE, LONGITUDE, GENDER, ethinic_group, 
                              AGE, ENGLISH_GRADE, MATH_GRADE, SCIENCE_GRADE, 
                              LANGUAGE_GRADE, PORTFOLIO_RATING, COVERLETTER_RATING, 
                              REFLETTER_RATING, copy_date)
    FROM 
        (SELECT '{filename}' AS filename, 
            $1 AS ID, 
            $2 AS NAME, 
            $3 AS NATIONALITY, 
            $4 AS CITY, 
            $5 AS LATITUDE, 
            $6 AS LONGITUDE,
            $7 AS GENDER, 
            $8 AS ethinic_group, 
            $9 AS AGE, 
            $10 AS ENGLISH_GRADE, 
            $11 AS MATH_GRADE, 
            $12 AS SCIENCE_GRADE, 
            $13 AS LANGUAGE_GRADE, 
            $14 AS PORTFOLIO_RATING, 
            $15 AS COVERLETTER_RATING, 
            $16 AS REFLETTER_RATING,
            '{copy_date}' AS copy_date 
        FROM @s3_stage/{stage_path}.csv)
        
    ON_ERROR='SKIP_FILE'
    FILE_FORMAT= (FORMAT_NAME=CSV_FORMAT);
    """).collect()

    print(copy_result)

In [38]:
# Load every CSV stored under the chunk prefix into Snowflake.
# The listing is paginated so that all keys are visited, and a page without a
# 'Contents' entry (nothing under the prefix) is simply skipped.
csv_suffix = '.csv'
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=aws_s3_bucket_name, Prefix=aws_s3_object):
    for obj in page.get('Contents', []):
        key = obj['Key']
        if key.endswith(csv_suffix):
            # Path relative to the prefix with only the '.csv' extension removed,
            # so that prefix + filename + '.csv' rebuilds the exact key even for
            # dotted or nested names.
            filename = key[len(aws_s3_object):-len(csv_suffix)]
            copy_into_sf(session, aws_s3_object, filename)

[Row(file='s3://praveeng-s3/chunked_csv/student_dataset_0.csv', status='LOADED', rows_parsed=118, rows_loaded=118, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
[Row(file='s3://praveeng-s3/chunked_csv/student_dataset_1.csv', status='LOADED', rows_parsed=116, rows_loaded=116, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
[Row(file='s3://praveeng-s3/chunked_csv/student_dataset_2.csv', status='LOADED', rows_parsed=73, rows_loaded=73, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]


In [39]:
# Remove the chunked CSV copies from the bucket.
delete_s3_objects(bucket_name=aws_s3_bucket_name, prefix=aws_s3_object)

Deleted object: chunked_csv/student_dataset_0.csv
Deleted object: chunked_csv/student_dataset_1.csv
Deleted object: chunked_csv/student_dataset_2.csv
