# Query Data with AWS Data Wrangler

**AWS Data Wrangler** is an open-source Python library that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, Amazon QuickSight, etc.).

* https://github.com/awslabs/aws-data-wrangler
* https://aws-data-wrangler.readthedocs.io

Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions to execute common ETL tasks such as loading and unloading data from data lakes, data warehouses, and databases.

_Note that AWS Data Wrangler is simply a Python library that uses existing AWS Services.  AWS Data Wrangler is not a separate AWS Service.  You install AWS Data Wrangler through `pip install` as we will see next._

# _Prerequisite: Make Sure You Created an Athena Table for Both TSV and Parquet in Previous Notebooks_

In [1]:
%store -r ingest_create_athena_table_tsv_passed

In [2]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print('++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.')
    print('++++++++++++++++++++++++++++++++++++++++++++++')

In [3]:
print(ingest_create_athena_table_tsv_passed)

True


In [4]:
if not ingest_create_athena_table_tsv_passed:
    print('++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.')
    print('++++++++++++++++++++++++++++++++++++++++++++++')
else:
    print('[OK]')

[OK]


In [5]:
%store -r ingest_create_athena_table_parquet_passed

In [6]:
try:
    ingest_create_athena_table_parquet_passed
except NameError:
    print('++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.')
    print('++++++++++++++++++++++++++++++++++++++++++++++')

In [7]:
print(ingest_create_athena_table_parquet_passed)

True


In [8]:
if not ingest_create_athena_table_parquet_passed:
    print('++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.') 
    print('++++++++++++++++++++++++++++++++++++++++++++++')
else:
    print('[OK]')

[OK]
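The cells above repeat the same check once per flag. A minimal sketch of a hypothetical helper, `missing_prerequisites`, that checks any number of flags in one pass; it assumes each flag has already been restored with `%store -r` and that the namespace is a plain mapping such as `globals()`. The example namespace below is simulated.

```python
def missing_prerequisites(namespace, flag_names):
    """Return the flags that are undefined or falsy in `namespace`.

    Hypothetical helper: `namespace` is any mapping, e.g. globals()
    after running `%store -r <flag>` for each flag.
    """
    return [name for name in flag_names if not namespace.get(name, False)]


REQUIRED_FLAGS = [
    'ingest_create_athena_table_tsv_passed',
    'ingest_create_athena_table_parquet_passed',
]

# Simulated namespace in which the Parquet flag was never stored.
example_namespace = {'ingest_create_athena_table_tsv_passed': True}

missing = missing_prerequisites(example_namespace, REQUIRED_FLAGS)
if missing:
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  Missing: {}'.format(', '.join(missing)))
else:
    print('[OK]')
```

In the notebook itself you would pass `globals()` instead of `example_namespace`.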


# Setup

In [9]:
import sagemaker
import boto3

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

In [10]:
import awswrangler as wr

# Query Parquet from S3 with Push-Down Filters

Read Apache Parquet file(s) from an S3 prefix or a list of S3 object paths.

The concept of a dataset goes beyond the simple idea of files and enables more complex features like partitioning and catalog integration (AWS Glue Catalog):

_dataset (bool)_: If True, read a Parquet dataset instead of simple file(s), loading all the related partitions as columns.
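With `dataset=True`, partition columns come from the S3 key layout rather than from the file contents. A minimal sketch of how Hive-style `key=value` prefixes map to partition values, assuming the Parquet files were written under prefixes such as `product_category=Digital_Software/`; the bucket name and object key are made up, and this is an illustration rather than Wrangler's internal code.

```python
def partition_values(key, base_prefix):
    """Extract Hive-style partition values from an S3 object key.

    For 'base/product_category=Books/part-0.parquet' this returns
    {'product_category': 'Books'}. Values stay strings because they
    come from the path, not from the file contents.
    """
    relative = key[len(base_prefix):] if key.startswith(base_prefix) else key
    values = {}
    # Every path component except the final file name may be a partition.
    for part in relative.split('/')[:-1]:
        if '=' in part:
            name, value = part.split('=', 1)
            values[name] = value
    return values


base = 's3://example-bucket/amazon-reviews-pds/parquet/'  # hypothetical bucket
key = base + 'product_category=Digital_Software/part-00000.snappy.parquet'  # hypothetical key
print(partition_values(key, base))
```

This is why `product_category` appears as the last column in the results: it is appended from the path after the requested file columns are read.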

In [11]:
# Push-down partition filter: receives a dict of partition values, returns True to read that partition
p_filter = lambda x: x["product_category"] == "Digital_Software"
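`partition_filter` is a push-down filter: according to the awswrangler documentation, the callable receives a dict mapping partition names to partition values (always strings taken from the S3 path), and only partitions for which it returns `True` are read. A minimal sketch of that pruning step over a hypothetical, already-parsed partition listing; the categories other than `Digital_Software` are placeholders.

```python
# Same logic as the notebook's p_filter; the == comparison already returns a bool.
p_filter = lambda x: x["product_category"] == "Digital_Software"

# Hypothetical partition listing: (S3 prefix, parsed partition values).
partitions = [
    ('product_category=Digital_Software/', {'product_category': 'Digital_Software'}),
    ('product_category=Books/', {'product_category': 'Books'}),
    ('product_category=Gift_Card/', {'product_category': 'Gift_Card'}),
]

# Only prefixes that pass the filter would have their files downloaded;
# the remaining partitions are skipped without being read from S3.
selected = [prefix for prefix, values in partitions if p_filter(values)]
print(selected)
```

Filtering on partition metadata before any download is what makes this cheaper than reading everything and filtering the DataFrame afterwards.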

In [12]:
path = 's3://{}/amazon-reviews-pds/parquet/'.format(bucket)
df_parquet_results = wr.s3.read_parquet(path,
                                        columns=['star_rating', 'product_category', 'review_body'],
                                        partition_filter=p_filter,
                                        dataset=True)
df_parquet_results.shape

(102084, 3)

In [13]:
df_parquet_results.head(5)

| | star_rating | review_body | product_category |
|---|---|---|---|
| 0 | 4 | So far so good | Digital_Software |
| 1 | 3 | Needs a little more work..... | Digital_Software |
| 2 | 1 | Please cancel. | Digital_Software |
| 3 | 5 | Works as Expected! | Digital_Software |
| 4 | 4 | I've had Webroot for a few years. It expired a... | Digital_Software |


# Query Parquet from S3 in Chunks

Batching with the `chunked` argument (memory-friendly):

Setting `chunked` makes the function return an iterable of DataFrames instead of a single DataFrame.

There are two batching strategies in Wrangler:
* If `chunked=True`, a new DataFrame will be returned for each file in your path/dataset.
* If `chunked=INTEGER`, Wrangler will iterate on the data by number of rows equal to the received INTEGER.

P.S. `chunked=True` is faster and uses less memory, while `chunked=INTEGER` is more precise in the number of rows in each DataFrame.
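The difference between the two strategies can be sketched with plain Python lists standing in for DataFrames, where each inner list holds the rows of one hypothetical Parquet file. This illustrates the batching semantics described above; it is not Wrangler's implementation.

```python
from itertools import chain, islice


def chunk_per_file(files):
    """chunked=True: one batch per file, so batch sizes follow file sizes."""
    for rows in files:
        yield rows


def chunk_by_rows(files, size):
    """chunked=INTEGER: re-batch rows across files into batches of `size` rows.

    Only the last batch may be smaller than `size`.
    """
    rows = chain.from_iterable(files)
    while True:
        batch = list(islice(rows, size))
        if not batch:
            return
        yield batch


# Three hypothetical files with 5, 2 and 4 rows.
files = [list(range(0, 5)), list(range(5, 7)), list(range(7, 11))]

print([len(b) for b in chunk_per_file(files)])    # [5, 2, 4]
print([len(b) for b in chunk_by_rows(files, 3)])  # [3, 3, 3, 2]
```

The per-file strategy hands each file over as-is, while the row-based strategy must stitch rows across file boundaries, which is the extra work that makes it slower but exact.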

In [14]:
path = 's3://{}/amazon-reviews-pds/parquet/'.format(bucket)
chunk_iter = wr.s3.read_parquet(path,
                                columns=['star_rating', 'product_category', 'review_body'],
                                # filters=[("product_category", "=", "Digital_Software")],
                                partition_filter=p_filter,
                                dataset=True,
                                chunked=True)

In [15]:
print(next(chunk_iter))

       star_rating                                        review_body  \
0                4                                     So far so good   
1                3                      Needs a little more work.....   
2                1                                     Please cancel.   
3                5                                 Works as Expected!   
4                4  I've had Webroot for a few years. It expired a...   
...            ...                                                ...   
61440            1  I have used TT since the 1980s when it was on ...   
61441            5  I'm new to tax software and used TaxCut for a ...   
61442            1  Despite what their website says, the importing...   
61443            1  This has caused me more trouble than I care to...   
61444            1  The Amazon downloader does not work with ANY 6...   

       product_category  
0      Digital_Software  
1      Digital_Software  
2      Digital_Software  
3      Digital_Soft

# Query the Glue Catalog (i.e., Hive Metastore)
Get an iterator of tables.

In [16]:
database_name = 'dsoaws'
table_name_tsv = 'amazon_reviews_tsv'
table_name_parquet = 'amazon_reviews_parquet'

In [17]:
for table in wr.catalog.get_tables(database=database_name):
    print(table['Name'])

amazon_reviews_parquet
amazon_reviews_tsv


# Query from Athena
Execute any SQL query on Amazon Athena and return the results as a Pandas DataFrame.


In [18]:
%%time
df = wr.athena.read_sql_query(
    sql='SELECT * FROM {} LIMIT 5000'.format(table_name_parquet),
    database=database_name
)

CPU times: user 376 ms, sys: 21.7 ms, total: 398 ms
Wall time: 8.24 s


In [19]:
df.head(5)

| | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | year | review_date | product_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 44527232 | R30F6LJKSIUEF3 | B005D9LU9I | 784934191 | FileCenter Professional 7 for 1 PC [Download] | 5 | 12 | 12 | N | N | Lucion FileCenter Pro - Paperless Office at it... | I've been looking for years for a solution lik... | 2012 | 2012-05-13 | Digital_Software |
| 1 | US | 12284345 | R3VAXHCO2LYWUB | B0064PFB9U | 232554866 | Office Mac Home and Student 2011 - 1PC/1User [... | 4 | 0 | 0 | N | Y | Office for MacBook Air | Downloaded from amazon for my MacBook Air. Qui... | 2012 | 2012-05-13 | Digital_Software |
| 2 | US | 16312016 | R2ZUAFRNU9M1JA | B005V583KI | 449921105 | AVG 2012 PC Tuneup - 3 user | 1 | 1 | 4 | N | Y | OK | AVG PC TUNE UP IS A KILLER FOR WINDOWS 7.ALSO ... | 2012 | 2012-05-13 | Digital_Software |
| 3 | US | 15217489 | R2IY9NAGYYWBZY | B004X0DDOI | 958139200 | Riddle Me Scavenger and Treasure Hunt Clue Sof... | 1 | 0 | 0 | N | Y | Disappointed in purchase | Lacked creativity. Very, very basic. Descripti... | 2012 | 2012-05-12 | Digital_Software |
| 4 | US | 27305412 | R3PHXPN1U8PQ3 | B007C89B44 | 78829106 | Norton 360 6.0 - 1 User/3PC | 1 | 2 | 3 | N | Y | never try | after set up Norton 360, my computer never did... | 2012 | 2012-05-12 | Digital_Software |


# Query from Athena in Chunks
Retrieving in chunks can help reduce memory requirements.  
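With `chunksize` set, `read_sql_query` returns an iterator, so results can be reduced one chunk at a time instead of holding the full result set in memory. In awswrangler an integer `chunksize` is a number of rows, so the `LIMIT 5000` query in the next cell fits in a single 64,000-row chunk. A minimal sketch of the incremental pattern, with lists of dicts standing in for the DataFrame chunks (the rows are made up); with real chunks you could use `chunk['star_rating'].value_counts()` per chunk instead.

```python
import math
from collections import Counter


def count_ratings(chunks):
    """Accumulate star_rating counts chunk by chunk; only one chunk is held at a time."""
    totals = Counter()
    for chunk in chunks:
        totals.update(row['star_rating'] for row in chunk)
    return totals


def hypothetical_chunks():
    """Stand-in for the iterator returned when chunksize is set."""
    yield [{'star_rating': 5}, {'star_rating': 4}, {'star_rating': 5}]
    yield [{'star_rating': 1}, {'star_rating': 5}]


print(count_ratings(hypothetical_chunks()))

# Number of chunks for a result of `rows` rows with an integer chunksize.
rows, chunksize = 5000, 64_000
print(math.ceil(rows / chunksize))  # 1
```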

_This will take a few seconds._

In [20]:
%%time

chunk_iter = wr.athena.read_sql_query(
    sql='SELECT * FROM {} LIMIT 5000'.format(table_name_parquet),
    database=database_name,
    chunksize=64_000  # integer chunksize = rows per chunk (up to 64,000 rows), not bytes
)

CPU times: user 207 ms, sys: 14.2 ms, total: 221 ms
Wall time: 6.81 s


In [21]:
print(next(chunk_iter))

     marketplace customer_id       review_id  product_id product_parent  \
0             US    41754720  R19OFJV91M7D8X  B000YMR61A      141393130   
1             US    51669529  R1I6G894K5AGG5  B000YMR61A      141393130   
2             US    24731012  R17OE43FFEP81I  B000YMR5X4      234295632   
3             US    16049580  R15MGDDK63B52Z  B000YMR61A      141393130   
4             US    46098046  R1GGJJA2R68033  B000YMNI2Q      847631772   
...          ...         ...             ...         ...            ...   
4995          US    46442566  R3F12LC5JMD2DS  B007ZUPBCU      747702221   
4996          US    18387234  R1XGZ7CODVV1M9  B0091T6LFE      496203253   
4997          US    10244666  R2ORK5O6KA99P2  B004IZL5FM      318994260   
4998          US    43040052  R2F0GPLQTTIZ4G  B007ZUPBCU      747702221   
4999          US    14732793   RKK9IGW8PW65I  B0047T74CW      968207612   

                                          product_title  star_rating  \
0                  TurboTax

In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();