In this project, you will develop a Python script that uses boto3 to create buckets on S3 and to store the Sparkify event logs as JSON objects. Just as you previously inserted documents (i.e., log data from Sparkify) into MongoDB, you will now put those records into an S3 bucket as JSON objects.

In [1]:
import logging
import boto3
from botocore.exceptions import ClientError
import pickle
import json
import os
from datetime import datetime

In [2]:
# Check buckets: print the creation time and name of each bucket returned by S3
s3 = boto3.client('s3')
buckets = s3.list_buckets()
for bucket in buckets['Buckets']:
    print(f"{bucket['CreationDate'].ctime()} {bucket['Name']}")

In [3]:
# ADAPTED FROM THE BOTO3 DOCUMENTATION
# (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-creating-buckets.html)

def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False (the ClientError is logged)
    """

    # Only pin the client to a region when the caller asked for one
    client_kwargs = {} if region is None else {'region_name': region}

    request = {'Bucket': bucket_name}
    # S3 rejects 'us-east-1' as a LocationConstraint: for the default region
    # the bucket configuration has to be left out entirely.
    if region is not None and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}

    # Create bucket
    try:
        s3_client = boto3.client('s3', **client_kwargs)
        s3_client.create_bucket(**request)
    except ClientError as e:
        logging.error(e)
        return False
    return True

In [4]:
# Specify file path for log data (root directory that is later passed to extract_log)
# NOTE(review): machine-specific absolute Windows path -- adjust before running elsewhere
log_path = r'C:\Users\aambr\OneDrive\Documents\UNH Fall 2019\Distributed_Scalable\Module 7\7_1-MessageBrokers\log_data'

In [52]:
# Function to recursively loop through directories, read each JSON log file, and upload it to an S3 bucket
def extract_log(filepath, bucket, s3_client=None):
    """Recursively upload the JSON log files under a directory to an S3 bucket

    Each regular file is read as newline-delimited JSON (one record per line).
    Every record gets a 'week_num' field computed from the date at the start
    of the file name (YYYY-MM-DD...), and all records of a file are uploaded
    as one JSON array whose key is the original file name.

    :param filepath: Directory to scan (string or path-like object)
    :param bucket: Name of the destination S3 bucket
    :param s3_client: Optional boto3 S3 client to reuse; created when omitted
    :return: None
    """

    # Connect to s3 once and hand the client down the recursion instead of
    # building a new one for every sub-directory
    if s3_client is None:
        s3_client = boto3.client('s3')

    with os.scandir(filepath) as entries:
        for entry in entries:

            # If the item in the folder is a directory, recursively apply the function
            if entry.is_dir():
                print('Dir', entry.name)
                extract_log(entry.path, bucket, s3_client)

            # If the item in the folder is a file, extract json and upload to s3
            elif entry.is_file():
                print('Filename:\n', entry.name)
                print('Filepath:\n', entry.path)

                # JSON text is UTF-8; do not depend on the platform default encoding
                with open(entry.path, encoding='utf-8') as f:
                    # Blank lines (e.g. a trailing newline) are not records
                    data = [json.loads(line) for line in f if line.strip()]

                # Add week number to each record using the filename. The week
                # is the same for the whole file, so it is computed only once.
                if data:
                    file_date = datetime.strptime(entry.name[:10], "%Y-%m-%d")
                    week_num = int(file_date.strftime("%W"))
                    for record in data:
                        record['week_num'] = week_num

                # Key is the name of the original json file; body is the
                # records converted back to a json string
                s3_client.put_object(Bucket=bucket,
                                     Key=entry.name,
                                     Body=json.dumps(data))
            else:
                # Neither a directory nor a regular file
                print('Error')

In [53]:
# Name of the S3 bucket that the log files are uploaded to
new_bucket = 'sparkify3bucket'

In [54]:
# create_bucket(new_bucket)

In [2]:
# Check buckets: show when each bucket was created and what it is called
s3 = boto3.client('s3')
buckets = s3.list_buckets()
for bucket in buckets['Buckets']:
    print(f"{bucket['CreationDate'].ctime()} {bucket['Name']}")

Thu Apr  9 16:23:31 2020 sparkify3bucket


In [56]:
# Upload every log file found under log_path to the bucket named by new_bucket
extract_log(log_path,new_bucket)

Dir 2018
Dir 11
Filename:
 2018-11-01-events.json
Filepath:
 C:\Users\aambr\OneDrive\Documents\UNH Fall 2019\Distributed_Scalable\Module 7\7_1-MessageBrokers\log_data\2018\11\2018-11-01-events.json
Filename:
 2018-11-02-events.json
Filepath:
 C:\Users\aambr\OneDrive\Documents\UNH Fall 2019\Distributed_Scalable\Module 7\7_1-MessageBrokers\log_data\2018\11\2018-11-02-events.json
Filename:
 2018-11-03-events.json
Filepath:
 C:\Users\aambr\OneDrive\Documents\UNH Fall 2019\Distributed_Scalable\Module 7\7_1-MessageBrokers\log_data\2018\11\2018-11-03-events.json
Filename:
 2018-11-04-events.json
Filepath:
 C:\Users\aambr\OneDrive\Documents\UNH Fall 2019\Distributed_Scalable\Module 7\7_1-MessageBrokers\log_data\2018\11\2018-11-04-events.json
Filename:
 2018-11-05-events.json
Filepath:
 C:\Users\aambr\OneDrive\Documents\UNH Fall 2019\Distributed_Scalable\Module 7\7_1-MessageBrokers\log_data\2018\11\2018-11-05-events.json
Filename:
 2018-11-06-events.json
Filepath:
 C:\Users\aambr\OneDrive\Docum

In [3]:
# Retrieve keys from bucket
def list_bucket_contents(bucket):
    """Return the keys of all objects stored in an S3 bucket

    Uses the module-level ``s3`` client.

    :param bucket: Name of the bucket to list
    :return: List of object keys; empty if the bucket holds no objects
    """
    keys = []

    # A single list_objects_v2 call returns at most 1,000 keys, so walk
    # through every page of results instead of reading only the first one
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        # 'Contents' is missing from the response when there are no objects
        keys.extend(obj['Key'] for obj in page.get('Contents', []))

    return keys

In [5]:
# Show the keys currently stored in the bucket
list_bucket_contents('sparkify3bucket')

['2018-11-01-events.json',
 '2018-11-02-events.json',
 '2018-11-03-events.json',
 '2018-11-04-events.json',
 '2018-11-05-events.json',
 '2018-11-06-events.json',
 '2018-11-07-events.json',
 '2018-11-08-events.json',
 '2018-11-09-events.json',
 '2018-11-10-events.json',
 '2018-11-11-events.json',
 '2018-11-12-events.json',
 '2018-11-13-events.json',
 '2018-11-14-events.json',
 '2018-11-15-events.json',
 '2018-11-16-events.json',
 '2018-11-17-events.json',
 '2018-11-18-events.json',
 '2018-11-19-events.json',
 '2018-11-20-events.json',
 '2018-11-21-events.json',
 '2018-11-22-events.json',
 '2018-11-23-events.json',
 '2018-11-24-events.json',
 '2018-11-25-events.json',
 '2018-11-26-events.json',
 '2018-11-27-events.json',
 '2018-11-28-events.json',
 '2018-11-29-events.json',
 '2018-11-30-events.json']