In [1]:
import itertools
import json
import logging
import os

import boto3
import botocore
import pandas as pd
from pandas import DataFrame

In [2]:
LOG_FORMAT = "%(levelname)s %(asctime)s %(message)s"
LOG_FILE = "logs/download_sampler.log"

# logging.basicConfig opens LOG_FILE but does not create missing parent
# directories; without this, a fresh checkout fails with FileNotFoundError.
log_dir = os.path.dirname(LOG_FILE)
if log_dir:
    os.makedirs(log_dir, exist_ok=True)

# NOTE(review): basicConfig is a no-op when the root logger already has
# handlers (e.g. when this cell is re-run in the same kernel); pass
# force=True (Python 3.8+) if the file handler must always be (re)installed.
logging.basicConfig(filename=LOG_FILE,
                    format=LOG_FORMAT,
                    level=logging.INFO)
# Root logger (no name passed), used for all progress/error logging below.
logger = logging.getLogger()

In [3]:
raw_data_urls_file = "config/raw_dataurls.json"

# Read the job config inside a context manager so the file handle is closed
# deterministically; json.load(open(...)) leaves closing to the garbage collector.
# raw_data_urls['rawdatafiles'] is the list of objects the processing loop iterates over.
with open(raw_data_urls_file) as config_fh:
    raw_data_urls = json.load(config_fh)

In [4]:
# No credentials or region are passed explicitly, so boto3's default session
# configuration is used. The same resource serves both the raw-data downloads
# and the cleansed-sample uploads.
s3_resource = boto3.resource(service_name='s3')

# Iterate over the array of objects to be processed
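1. Download each object from the S3 raw-data bucket.
2. Sample the file by keeping only its first 10,000 lines (equivalent to `head -10000`; the CSV header row counts as one of those lines).
3. Read the sampled CSV with pandas and cleanse it to remove Control-M (`^M`, carriage-return) characters.
4. Write the cleansed CSV back out from the pandas DataFrame.
5. Upload the cleansed object to your personal S3 bucket.
6. Remove the following local files: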
   a. Cleansed sample file
   b. Sample file
   c. Raw data file downloaded

In [5]:
# Number of leading lines (CSV header included) kept in each sample file;
# the same count the original `head -10000` sampling used.
SAMPLE_LINE_COUNT = 10000


def _write_head(src_path, dst_path, n_lines):
    """Copy the first ``n_lines`` lines of ``src_path`` into ``dst_path``.

    Pure-Python equivalent of ``head -<n_lines> src > dst``. The file is read and
    written as bytes, so line endings (including Control-M characters) pass through
    untouched to the pandas cleansing step. Unlike the former ``os.system`` shell
    command, paths are never interpreted by a shell (no injection or quoting
    problems with spaces), and failures raise instead of being silently ignored.
    """
    with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
        dst.writelines(itertools.islice(src, n_lines))


def _remove_local(path, message, *args):
    """Best-effort delete of a local working file; logs ``message % args`` on success.

    A missing file is skipped quietly (the step that creates it may never have
    run). Any other OSError is logged instead of raised, so calling this from a
    ``finally`` block never masks the exception that triggered the cleanup.
    """
    try:
        os.remove(path)
    except FileNotFoundError:
        return
    except OSError:
        logger.exception('Could not remove local file %s', path)
        return
    logger.info(message, *args)


def _process_raw_file(entry):
    """Download, sample, cleanse and upload the object described by one config entry.

    ``entry`` is one element of ``raw_data_urls['rawdatafiles']``. The raw download
    and both local sample files are removed in ``finally``, so a failure at any
    step does not leave them behind on disk. The exception still propagates to
    the caller, which logs it.
    """
    source = '{}/{}'.format(entry['raw_data_s3_bkt'], entry['raw_data_s3_object'])
    raw_local = entry['raw_data_download_loc']
    sample_file = entry['sample_processed_file']
    sample_tocleanse = sample_file + '.tocleanse'
    target_bkt = entry['cleansed_sampled_data_s3_bkt']
    # 'objet' (sic): must match the key name used in the config JSON.
    target_key = entry['cleansed_sampled_data_s3_objet']
    target = '{}/{}'.format(target_bkt, target_key)

    logger.info('Start Processing - %s', source)
    try:
        logger.info('Started Downloading %s --> %s', source, raw_local)
        s3_resource.Bucket(entry['raw_data_s3_bkt']).download_file(
            entry['raw_data_s3_object'], raw_local)
        logger.info('Finished Downloading %s --> %s', source, raw_local)

        _write_head(raw_local, sample_tocleanse, SAMPLE_LINE_COUNT)
        logger.info('Generated sample file to be cleansed %s', sample_tocleanse)

        # Cleansing step: re-parse the sample with pandas and write it back as CSV.
        sample_df = pd.read_csv(sample_tocleanse)
        sample_df.to_csv(sample_file, sep=',', index=False)
        logger.info('Generated cleansed sample file %s', sample_file)

        logger.info('Started uploading %s --> %s', sample_file, target)
        s3_resource.Bucket(target_bkt).upload_file(sample_file, target_key)
        logger.info('Finished uploading %s --> %s', sample_file, target)
    finally:
        _remove_local(sample_file, 'Removed sample cleansed file %s', sample_file)
        _remove_local(sample_tocleanse, 'Removed sample file to be cleansed - %s', sample_tocleanse)
        _remove_local(raw_local, 'Removed the file downloaded from S3 - %s', source)

    logger.info('Finished Processing - %s', source)


for entry in raw_data_urls['rawdatafiles']:
    # Built with .get() so the handlers below can still name the entry even if
    # it is missing keys (a KeyError then lands in the generic handler).
    source_label = '{}/{}'.format(entry.get('raw_data_s3_bkt'), entry.get('raw_data_s3_object'))
    try:
        _process_raw_file(entry)
    except botocore.exceptions.ClientError as client_error:
        if client_error.response.get('Error', {}).get('Code') == "404":
            logger.exception('The object %s does not exist.', source_label)
        else:
            logger.exception('Unhandled exception while processing %s', source_label)
    except Exception:
        # Best effort by design: log the traceback and continue with the next object.
        logger.exception('Unhandled generic exception while processing %s', source_label)