# Notebook for Loading SQS with Messages for Lambda Processing

The Million Song Dataset has an associated fileset of links for each song, stored as JSON. The JSON is nested, which makes it awkward to process directly, so we need it in CSV format instead.

To accomplish that, we've built a serverless function that takes JSON files from one S3 bucket, combines them, and converts them to CSV format. We trigger that function through an SQS queue. The benefit is that the load processes can handle the data concurrently and asynchronously.
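
The Lambda code itself is not part of this notebook. As a rough sketch of the consuming side, the snippet below shows how a handler might unpack the messages, assuming the standard SQS-to-Lambda event shape (a `Records` list whose entries carry the message text in `body`) and the `bucket`, `files`, and `fileName` fields this notebook sends. The function name `parse_sqs_event` and the sample event are hypothetical.

```python
import json


def parse_sqs_event(event):
    """Return one job dict per SQS record in a Lambda event (hypothetical helper)."""
    jobs = []
    for record in event.get("Records", []):
        # 'body' holds the JSON string that was sent as MessageBody
        body = json.loads(record["body"])
        jobs.append(
            {
                "bucket": body["bucket"],
                "files": body["files"],
                "output_name": body["fileName"],
            }
        )
    return jobs


# Hypothetical event, shaped like what Lambda passes for an SQS trigger.
sample_event = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "bucket": "echonest-intermediate-json",
                    "files": ["a.json", "b.json"],
                    "fileName": "data0.csv",
                }
            )
        }
    ]
}

jobs = parse_sqs_event(sample_event)
print(jobs[0]["output_name"], len(jobs[0]["files"]))
```

Each job then carries everything the function needs: where to read from, which keys to combine, and what to call the resulting CSV.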

This notebook lists the files in the first S3 bucket, collects their names (key values), and batches them into SQS messages, 2,000 records at a time.

## Setup

We load the required libraries and use boto3's S3 client to build a list of all the files in our input bucket for later use.

In [2]:
import boto3
import json

bucket = 'echonest-intermediate-json'

s3 = boto3.client('s3')

paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket)

# .get() guards against pages with no 'Contents' key (an empty listing)
files = [obj['Key'] for page in pages for obj in page.get('Contents', [])]
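
One edge case is worth knowing about when flattening paginator output: `list_objects_v2` leaves the `Contents` key out of a page that has no objects, so indexing `page['Contents']` directly raises a `KeyError` on an empty bucket. The sketch below isolates the flattening step in a hypothetical helper, `keys_from_pages`, and runs it against made-up pages so it can be tried without AWS credentials.

```python
def keys_from_pages(pages):
    """Flatten list_objects_v2 pages into a flat list of object keys."""
    # page.get(...) returns an empty list for pages without 'Contents'
    return [obj["Key"] for page in pages for obj in page.get("Contents", [])]


# Hypothetical pages; the second one has no 'Contents', as in an empty listing.
fake_pages = [
    {"KeyCount": 2, "Contents": [{"Key": "a.json"}, {"Key": "b.json"}]},
    {"KeyCount": 0},
]

listed = keys_from_pages(fake_pages)
print(listed)
```

With the real client, the same helper would take `paginator.paginate(Bucket=bucket)` as its argument.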

## Clean up the files for processing

We only want to process files with the .json extension; anything else is outside the scope of this function. The filter also skips any key that starts with "zip".

In [5]:
json_files = [f for f in files if f.endswith('.json') and not f.startswith("zip")]
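
To see what the filter keeps and drops, here is the same condition wrapped in a hypothetical helper, `select_json_keys`, and applied to a few made-up keys. Note that `str.endswith` is case-sensitive, so a key ending in `.JSON` would be dropped; whether that matters depends on how the bucket was populated.

```python
def select_json_keys(keys):
    """Keep keys ending in '.json' that do not start with 'zip'."""
    return [k for k in keys if k.endswith(".json") and not k.startswith("zip")]


# Made-up keys for illustration only.
sample_keys = [
    "A/track1.json",     # kept
    "zip/archive.json",  # dropped: starts with 'zip'
    "README.txt",        # dropped: wrong extension
    "B/notes.JSON",      # dropped: the extension check is case-sensitive
]

kept = select_json_keys(sample_keys)
print(kept)
```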

## Slice the List

We have a million files to process. We can turn that list into a list of slices, each 2,000 files long, and then iterate through the slices later.

In [16]:
group_size = 2000

slices_files = [json_files[i:i+group_size] for i in range(0, len(json_files), group_size)]
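
The slicing pattern is easier to check on a small list. The sketch below wraps it in a hypothetical `chunk` helper, shows that the last slice simply holds the remainder, and works out how many messages a million files would produce at 2,000 per slice.

```python
import math


def chunk(items, size):
    """Split items into consecutive slices of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


demo = chunk(list(range(7)), 3)
print(demo)  # the last slice holds the single leftover element

# Number of slices (and therefore SQS messages) for 1,000,000 files
expected_messages = math.ceil(1_000_000 / 2000)
print(expected_messages)
```

The number of slices is always `ceil(len(items) / size)`, which comes to 500 messages for exactly one million files.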

## Queue the Messages

Now we'll iterate through each slice and create an SQS message containing its 2,000 file keys, plus some other information: the bucket and the name we want for the final CSV file.

In [19]:
# Get the service resource
sqs = boto3.resource('sqs')

# Get the queue
queue = sqs.get_queue_by_name(QueueName = 'msd-jsontocsv')

for i, file_slice in enumerate(slices_files):
    # Named file_slice so the loop variable does not shadow the built-in slice
    message_body = {'bucket': bucket, 'files': file_slice, 'fileName': f'data{i}.csv'}

    # Create a new message
    response = queue.send_message(MessageBody=json.dumps(message_body))
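
Because each message carries 2,000 keys, it can be worth checking the serialized size before sending, since SQS rejects messages above its maximum size. The sketch below is one way to do that. It assumes a 256 KiB ceiling, which was the documented SQS limit for a long time; treat that number as an assumption and check the current quota. The helper `build_message_body` and the demo keys are hypothetical.

```python
import json

# Assumption: 256 KiB ceiling; verify against the current SQS quota.
MAX_BYTES = 256 * 1024


def build_message_body(bucket, files, index, max_bytes=MAX_BYTES):
    """Serialize one message body and fail early if it is too large."""
    body = json.dumps({"bucket": bucket, "files": files, "fileName": f"data{index}.csv"})
    size = len(body.encode("utf-8"))
    if size > max_bytes:
        raise ValueError(f"message {index} is {size} bytes, over the {max_bytes} byte limit")
    return body


# 2,000 made-up keys of a plausible length, to estimate the size of one message.
demo_files = [f"A/B/track{n:07d}.json" for n in range(2000)]
body = build_message_body("echonest-intermediate-json", demo_files, 0)
size = len(body.encode("utf-8"))
print(size)
```

If real keys turn out to be much longer than these, lowering `group_size` is the simplest way to bring each message back under the limit.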