# Data Preparation for Video Recommender

A few preparation steps were originally performed directly in a Jupyter notebook: removing empty rows, removing customers who rated fewer than 5 movies, and removing videos that had fewer than 10 reviews.
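To make the three filters concrete, here is a minimal in-memory sketch in plain Python. The function name `filter_ratings` and the list-of-dicts layout are assumptions made for illustration; they are not the code from the original notebook.

```python
from collections import Counter


def filter_ratings(rows, min_customer_ratings=5, min_product_reviews=10):
    """Keep rows whose customer and product both meet the minimum counts.

    `rows` is a list of dicts with 'customer_id' and 'product_id' keys
    (a hypothetical layout chosen for this sketch).
    """
    # Drop empty rows first: here, rows without a customer_id.
    rows = [r for r in rows if r.get('customer_id') is not None]
    # Count once, before either threshold is applied, so removing a
    # customer does not change the review count of a product.
    per_customer = Counter(r['customer_id'] for r in rows)
    per_product = Counter(r['product_id'] for r in rows)
    return [
        r for r in rows
        if per_customer[r['customer_id']] >= min_customer_ratings
        and per_product[r['product_id']] >= min_product_reviews
    ]
```

This works only while the whole dataset fits in memory, which is the limitation the rest of this section addresses.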

In this section the same pre-processing is done using Amazon Athena instead. Athena is built on Presto and lets you run SQL queries directly against data stored in S3 and other sources. You can join tables and reshape the data into the form the algorithm expects, which makes it easy to process large amounts of data.

In a real business setting, data will be sourced from a wide variety of tables and databases. Each of these sources is also likely to contain far more data than fits in memory. This is where Athena can be very useful.

There are alternative approaches, such as processing files in chunks, using Hadoop or Spark, or accessing data from a data warehouse such as Redshift. Athena has the advantage of being very simple and cheap while covering a large number of use cases.


### Import requirements

In [1]:
import boto3
bucket = 'eduthie-sagemaker-1'
prefix = 'gluon_recommender'
client = boto3.client('athena')

### Start the query execution

In [2]:
query_string = '''
select a.customer_id, a.product_id, a.star_rating, a.product_title 
from gluon_recommender a
inner join
(
    select customer_id
    from gluon_recommender
    group by customer_id 
    having count(customer_id) >= 5
) b
on a.customer_id = b.customer_id
inner join
(
    select product_id
    from gluon_recommender
    group by product_id 
    having count(product_id) >= 10
) c
on a.product_id = c.product_id
where 
    a.customer_id is not null
'''
create_query_response = client.start_query_execution(
    QueryString=query_string,
    QueryExecutionContext={
        'Database': 'product_ratings'
    },
    ResultConfiguration={
        'OutputLocation': 's3://{}/{}/processed/'.format(bucket,prefix)
    }
)

After the query has been kicked off, Athena returns a QueryExecutionId, which is then used to check the query's status and view the results.

Behind the scenes, Athena executes the query and writes the results to the S3 location specified in the OutputLocation of the ResultConfiguration.

In [3]:
create_query_response

{'QueryExecutionId': '34db21b4-15ad-4669-9720-171927e2527a',
 'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
   'content-length': '59',
   'content-type': 'application/x-amz-json-1.1',
   'date': 'Mon, 04 Jun 2018 08:44:29 GMT',
   'x-amzn-requestid': 'bfcd4ba6-1897-478f-8eb2-090110cfa6ea'},
  'HTTPStatusCode': 200,
  'RequestId': 'bfcd4ba6-1897-478f-8eb2-090110cfa6ea',
  'RetryAttempts': 0}}
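The QueryExecutionId and the OutputLocation together determine where the result file lands. The recorded response later in this notebook shows the file at the output location followed by the id and a `.csv` suffix. The helper below is a small sketch built on that observation; the name `expected_result_uri` is hypothetical, and the location reported by `get_query_execution` remains the authoritative value.

```python
def expected_result_uri(output_location, query_execution_id):
    """Build the S3 URI where a SELECT query's CSV result is expected."""
    # Normalise the trailing slash so the two parts join cleanly.
    return '{}/{}.csv'.format(output_location.rstrip('/'), query_execution_id)
```

For example, `expected_result_uri('s3://{}/{}/processed/'.format(bucket, prefix), create_query_response['QueryExecutionId'])` gives a path that can be logged or handed to another job before the query has finished.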

### Wait for the query to finish

Athena executes the query asynchronously. This is very useful if, for example, you want to trigger the job from a Lambda function. With Lambda you pay only while the function is running, so it is very cheap to fire off the query and return.
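A fire-and-forget Lambda handler could look roughly like the sketch below. The event keys (`query_string`, `database`, `output_location`) and the optional `client` argument are assumptions made for illustration; only `start_query_execution` and its parameters come from the Athena API used in this notebook.

```python
def lambda_handler(event, context, client=None):
    """Start an Athena query and return immediately with its id."""
    if client is None:
        # Imported lazily so the sketch can be exercised without AWS access.
        import boto3
        client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=event['query_string'],
        QueryExecutionContext={'Database': event['database']},
        ResultConfiguration={'OutputLocation': event['output_location']},
    )
    # No polling here: the function exits while Athena keeps working.
    return {'QueryExecutionId': response['QueryExecutionId']}
```

Whatever consumes the result can look up the query later by its QueryExecutionId, so the function itself runs for only as long as the API call takes.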

In this case we poll Athena every second until the query has finished. It takes about 30 seconds to perform the job.

In [4]:
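import time

query_execution_response = None
while True:
    query_execution_response = client.get_query_execution(
        QueryExecutionId=create_query_response['QueryExecutionId']
    )
    status = query_execution_response['QueryExecution']['Status']
    if status['State'] == 'SUCCEEDED':
        break
    # FAILED and CANCELLED are terminal states; stop instead of polling forever.
    if status['State'] in ('FAILED', 'CANCELLED'):
        raise RuntimeError('Athena query {}: {}'.format(
            status['State'], status.get('StateChangeReason', 'no reason given')))
    time.sleep(1)  # still QUEUED or RUNNING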

Once the query has finished, the response contains a lot of information, including the S3 location where the results are stored. You can also retrieve the results directly in the notebook using the API if you wish. Because we will read the data from S3 when training and running our model, we leave it there.

In [5]:
query_execution_response

{'QueryExecution': {'Query': 'select a.customer_id, a.product_id, a.star_rating, a.product_title \nfrom gluon_recommender a\ninner join\n(\n    select customer_id\n    from gluon_recommender\n    group by customer_id \n    having count(customer_id) >= 5\n) b\non a.customer_id = b.customer_id\ninner join\n(\n    select product_id\n    from gluon_recommender\n    group by product_id \n    having count(product_id) >= 10\n) b\non a.product_id = b.product_id\nwhere \n    a.customer_id is not null',
  'QueryExecutionContext': {'Database': 'product_ratings'},
  'QueryExecutionId': '34db21b4-15ad-4669-9720-171927e2527a',
  'ResultConfiguration': {'OutputLocation': 's3://eduthie-sagemaker-1/gluon_recommender/processed/34db21b4-15ad-4669-9720-171927e2527a.csv'},
  'Statistics': {'DataScannedInBytes': 5539313633,
   'EngineExecutionTimeInMillis': 32682},
  'Status': {'CompletionDateTime': datetime.datetime(2018, 6, 4, 8, 45, 3, 413000, tzinfo=tzlocal()),
   'State': 'SUCCEEDED',
   'SubmissionDat
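For a quick look at the rows without going through S3, the results can be read with the Athena `get_query_results` call. The sketch below assumes the usual response shape for a SELECT query, where the first row carries the column names; the function names `rows_from_pages` and `fetch_results` are hypothetical.

```python
def rows_from_pages(pages):
    """Convert get_query_results pages into a list of dicts keyed by column."""
    header = None
    records = []
    for page in pages:
        for row in page['ResultSet']['Rows']:
            # A NULL cell has no 'VarCharValue' key, so .get() yields None.
            values = [cell.get('VarCharValue') for cell in row['Data']]
            if header is None:
                # Assumption: the first row of a SELECT result holds column names.
                header = values
            else:
                records.append(dict(zip(header, values)))
    return records


def fetch_results(client, query_execution_id):
    """Page through the results of a finished query using the Athena client."""
    paginator = client.get_paginator('get_query_results')
    return rows_from_pages(paginator.paginate(QueryExecutionId=query_execution_id))
```

All values come back as strings, and a result of this size would take many pages, so this route is better suited to small previews than to loading the full training set.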

### Copy the output

Finally, we copy the output to the training directory under a predictable name, which we will use for training.

In [6]:
file_location = query_execution_response['QueryExecution']['ResultConfiguration']['OutputLocation']
key = file_location[len('s3://{}/'.format(bucket)):]
boto3.client('s3').copy({'Bucket': bucket, 
                         'Key': key
                        },
                        bucket,
                        prefix + '/train_ready/amazon_reviews_us_Digital_Video_Download.csv')
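The key above is obtained by slicing off a prefix of known length, which relies on the output being in the same bucket. A slightly more defensive alternative is to parse the URI, as in this sketch; `split_s3_uri` is a hypothetical helper, not part of boto3.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError('not an S3 URI: {!r}'.format(uri))
    # Caveat: keys containing '?' or '#' would be cut short by urlparse.
    return parsed.netloc, parsed.path.lstrip('/')
```

With it, `source_bucket, key = split_s3_uri(file_location)` yields both parts of the copy source without assuming which bucket Athena wrote to.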