# Query Data with AWS Data Wrangler

**AWS Data Wrangler** is an open-source Python library that extends the power of the Pandas library to AWS, connecting DataFrames and AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, Amazon QuickSight, etc.).

* https://github.com/awslabs/aws-data-wrangler
* https://aws-data-wrangler.readthedocs.io

Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2, and PyMySQL, it offers abstracted functions for common ETL tasks such as loading and unloading data from data lakes, data warehouses, and databases.

_Note that AWS Data Wrangler is simply a Python library that uses existing AWS services; it is not a separate AWS service. You install it with `pip install awswrangler`._

# _Prerequisite: Make Sure You Created an Athena Table for Both TSV and Parquet in Previous Notebooks_

In [1]:
%store -r ingest_create_athena_table_tsv_passed

In [2]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [3]:
print(ingest_create_athena_table_tsv_passed)

True


In [4]:
if not ingest_create_athena_table_tsv_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


In [5]:
%store -r ingest_create_athena_table_parquet_passed

In [6]:
try:
    ingest_create_athena_table_parquet_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [7]:
print(ingest_create_athena_table_parquet_passed)

True


In [8]:
if not ingest_create_athena_table_parquet_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]
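The four cells above repeat the same restore-and-check pattern for each flag. As a minimal sketch (the helper name `check_prerequisite` is hypothetical, not part of SageMaker or IPython), the check can be folded into one function that looks the flag up in a namespace dict, so a missing flag and a `False` flag are handled the same way:

```python
def check_prerequisite(namespace: dict, name: str, message: str) -> bool:
    """Hypothetical helper: report whether a flag restored by `%store -r` is set.

    `namespace` is the dict to search (in a notebook, pass `globals()`).
    A missing name and a falsy value are both treated as "not done".
    """
    if namespace.get(name):
        print("[OK]")
        return True
    banner = "+" * 46
    print(banner)
    print(f"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  {message}")
    print(banner)
    return False


# Usage in the notebook, after `%store -r ingest_create_athena_table_tsv_passed`:
# check_prerequisite(globals(), "ingest_create_athena_table_tsv_passed",
#                    "You did not register the TSV Data.")
```

Using `dict.get` avoids the `try`/`except NameError` step entirely, because an unrestored variable simply is not a key in `globals()`.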


# Setup

In [9]:
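# Version pins used when this notebook was written. The pip resolver output
# reports conflicts with other preinstalled packages (e.g. autogluon requires
# numpy>=1.25; awswrangler 3.11.0 requires numpy>=1.26 on Python >= 3.10).
!pip install numpy==1.23.5
!pip install matplotlib==3.4.3
import sagemaker
import boto3

# SageMaker session, its default S3 bucket, the notebook's IAM role, and the AWS Region
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# Low-level SageMaker API client
sm = boto3.Session().client(service_name="sagemaker", region_name=region)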

Collecting numpy==1.23.5
  Using cached numpy-1.23.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.3 kB)
Using cached numpy-1.23.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.1 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 2.2.3
    Uninstalling numpy-2.2.3:
      Successfully uninstalled numpy-2.2.3
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
autogluon-multimodal 1.2 requires nvidia-ml-py3==7.352.0, which is not installed.
autogluon-common 1.2 requires numpy<2.1.4,>=1.25.0, but you have numpy 1.23.5 which is incompatible.
autogluon-core 1.2 requires matplotlib<3.11,>=3.7.0, but you have matplotlib 3.4.3 which is incompatible.
autogluon-core 1.2 requires numpy<2.1.4,>=1.25.0, but you have numpy 1.23.5 which is incompatible.
autogluon-features 1.2

In [10]:
!pip uninstall -y pyarrow
!pip install "pyarrow>=8.0.0,<19.0.0"

Found existing installation: pyarrow 18.1.0
Uninstalling pyarrow-18.1.0:
  Successfully uninstalled pyarrow-18.1.0
Collecting pyarrow<19.0.0,>=8.0.0
  Using cached pyarrow-18.1.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Using cached pyarrow-18.1.0-cp311-cp311-manylinux_2_28_x86_64.whl (40.1 MB)
Installing collected packages: pyarrow
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
mlflow 2.20.3 requires sqlalchemy<3,>=1.4.0, but you have sqlalchemy 1.3.22 which is incompatible.
awswrangler 3.11.0 requires numpy<3.0,>=1.26; python_version >= "3.10", but you have numpy 1.23.5 which is incompatible.
Successfully installed pyarrow-18.1.0


In [11]:
import pyarrow
print(pyarrow.__version__)

18.1.0


In [12]:
import awswrangler as wr
print(wr.__version__)

3.11.0
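The pip resolver output above reports that awswrangler 3.11.0 requires `numpy>=1.26` on Python 3.10 and later, while the setup cell pins `numpy==1.23.5`. A minimal sketch for surfacing such a mismatch at runtime, using only the standard library, follows. The helper names are hypothetical, and the simplified parser compares only the leading numeric release segment (`packaging.version` is the more robust choice if it is installed):

```python
import re
from importlib import metadata
from itertools import zip_longest
from typing import Optional, Tuple


def release_tuple(version: str) -> Tuple[int, ...]:
    """Hypothetical helper: '1.26.0rc1' -> (1, 26, 0); ignores pre/post-release tags."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def version_at_least(installed: str, minimum: str) -> bool:
    """Compare release segments, padding the shorter one with zeros (1.26 == 1.26.0)."""
    for have, need in zip_longest(release_tuple(installed), release_tuple(minimum), fillvalue=0):
        if have != need:
            return have > need
    return True


def check_installed(package: str, minimum: str) -> Optional[bool]:
    """Return None if `package` is not installed, else whether it meets `minimum`."""
    try:
        installed = metadata.version(package)
    except metadata.PackageNotFoundError:
        return None
    return version_at_least(installed, minimum)


# Minimum taken from the pip resolver message for awswrangler 3.11.0
print("numpy >= 1.26:", check_installed("numpy", "1.26"))
```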


# Query Parquet from S3 with Push-Down Filters

Read Apache Parquet file(s) from an S3 prefix or from a list of S3 object paths.

The concept of a dataset goes beyond the simple idea of files and enables more complex features such as partitioning and catalog integration (AWS Glue Catalog):

_dataset (bool)_: If True, read a Parquet dataset instead of simple file(s), loading all the related partitions as columns.
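With `dataset=True`, Hive-style partition directories such as `product_category=Digital_Software/` become columns, and `partition_filter` decides which partitions are read at all. Per the awswrangler documentation, the filter receives a dict mapping partition names to their values as strings and returns a bool. The sketch below only mimics that pruning step on a hypothetical list of object paths (the bucket and file names are made up); it is not awswrangler's implementation, and it assumes the data is partitioned by `product_category`:

```python
from typing import Callable, Dict, List

PartitionFilter = Callable[[Dict[str, str]], bool]


def partition_values(path: str, root: str) -> Dict[str, str]:
    """Extract Hive-style key=value directory segments below `root` (values stay strings)."""
    relative = path[len(root):] if path.startswith(root) else path
    values: Dict[str, str] = {}
    for segment in relative.split("/")[:-1]:  # the last segment is the file name
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


def prune_paths(paths: List[str], root: str, partition_filter: PartitionFilter) -> List[str]:
    """Keep only the objects whose partition values pass the filter."""
    return [p for p in paths if partition_filter(partition_values(p, root))]


# Hypothetical object listing under the dataset root
root = "s3://example-bucket/amazon-reviews-pds/parquet/"
paths = [
    root + "product_category=Digital_Software/part-0.parquet",
    root + "product_category=Gift_Card/part-0.parquet",
    root + "product_category=Digital_Software/part-1.parquet",
]

p_filter = lambda x: x["product_category"] == "Digital_Software"
for selected in prune_paths(paths, root, p_filter):
    print(selected)
```

Because the decision is made on the path listing, objects in non-matching partitions are never downloaded, which is what makes a partition filter cheaper than loading everything and filtering the DataFrame afterwards.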

In [14]:
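# partition_filter receives {partition_name: value_as_string} and returns True to read that partition
p_filter = lambda x: x["product_category"] == "Digital_Software"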

In [15]:
path = "s3://{}/amazon-reviews-pds/parquet/".format(bucket)
df_parquet_results = wr.s3.read_parquet(
    path, columns=["star_rating", "product_category", "review_body"], partition_filter=p_filter, dataset=True
)
df_parquet_results.shape

2025-03-15 19:31:39,594	INFO worker.py:1786 -- Started a local Ray instance.


(102084, 3)

In [16]:
df_parquet_results.head(5)

|   | star_rating | review_body | product_category |
|---|---|---|---|
| 0 | 4 | very easy to use, got my refund quickly, what ... | Digital_Software |
| 1 | 5 | I have been using this software for about 4 mo... | Digital_Software |
| 2 | 5 | A perfect antivirus software for people from a... | Digital_Software |
| 3 | 5 | I love this product because clean very well my... | Digital_Software |
| 4 | 5 | Best deal I could find when my 2013 Norton Ant... | Digital_Software |


# Query Parquet from S3 in Chunks

Batching with the `chunked` argument (memory-friendly):

Setting `chunked` makes the function return an iterable of DataFrames instead of a single DataFrame.

There are two batching strategies in Wrangler:
* If `chunked=True`, a new DataFrame is returned for each file in your path/dataset.
* If `chunked=INTEGER`, Wrangler iterates over the data in DataFrames whose number of rows equals the received INTEGER.

P.S. `chunked=True` is faster and uses less memory, while `chunked=INTEGER` is more precise about the number of rows in each DataFrame.
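To make the difference between the two strategies concrete, here is a minimal pandas sketch (the `rechunk` helper is hypothetical, not Wrangler's internal code) that turns per-file DataFrames, as `chunked=True` yields them, into fixed-size row batches like `chunked=INTEGER`. It also hints at why the integer mode costs more: rows have to be buffered and concatenated across file boundaries.

```python
from typing import Iterable, Iterator, List

import pandas as pd


def rechunk(frames: Iterable[pd.DataFrame], rows: int) -> Iterator[pd.DataFrame]:
    """Hypothetical helper: re-batch DataFrames into chunks of exactly `rows` rows.

    The final chunk may be shorter. Row order is preserved.
    """
    if rows < 1:
        raise ValueError("rows must be a positive integer")
    buffer: List[pd.DataFrame] = []
    buffered = 0
    for frame in frames:
        if len(frame) == 0:
            continue
        buffer.append(frame)
        buffered += len(frame)
        # Emit full batches as soon as enough rows are buffered
        while buffered >= rows:
            combined = pd.concat(buffer, ignore_index=True)
            yield combined.iloc[:rows].reset_index(drop=True)
            rest = combined.iloc[rows:]
            buffer = [rest] if len(rest) else []
            buffered = len(rest)
    if buffer:  # leftover rows form the last, shorter batch
        yield pd.concat(buffer, ignore_index=True)


# Three "files" of 3, 5, and 2 rows, standing in for chunked=True output
per_file = [
    pd.DataFrame({"star_rating": [5, 4, 3]}),
    pd.DataFrame({"star_rating": [1, 2, 5, 5, 4]}),
    pd.DataFrame({"star_rating": [3, 2]}),
]
batches = list(rechunk(per_file, rows=4))
for batch in batches:
    print(len(batch))  # prints 4, 4, 2 on separate lines
```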

In [17]:
path = "s3://{}/amazon-reviews-pds/parquet/".format(bucket)
chunk_iter = wr.s3.read_parquet(
    path,
    # columns=["star_rating", "product_category", "review_body"],
    # filters=[("product_category", "=", "Digital_Software")],
    partition_filter=p_filter,
    dataset=True,
    chunked=True,
)

In [18]:
print(next(chunk_iter))

      marketplace customer_id       review_id  product_id product_parent  \
0              US    32572750   RVWIB4R40ZCAE  B00FFINRG6      805918609   
1              US    41068344  R19M409QGULM0O  B00H9A60O4      608720080   
2              US    39012669  R3O7SSVGRUP1CM  B00H9A60O4      608720080   
3              US    25668354  R24ENTF1YFYILT  B00DH3S702      944756462   
4              US    52047892  R2LMLS4DI1SRNF  B00FGDE64C      245923760   
...           ...         ...             ...         ...            ...   
65531          US     1458781   RP0DSCBZ8MOTD  B00FMAPXAU      403322852   
65532          US    22706705  R128DAZ8CYHRV6  B00RDN32RA      692962854   
65533          US    26886928  R3JSFNQOLK9LK1  B00S76XVBO      228807938   
65534          US    51195376   RCXYDRH6U5BAN  B00ICPVFO0      358290633   
65535          US    11104860  R2GAS4SKKYW4HB  B00G0DXA9Y      595906078   

                                   product_title  star_rating  helpful_votes  \
0      

# Query the Glue Catalog (i.e., Hive Metastore)
Get an iterator of tables.

In [19]:
database_name = "dsoaws"
table_name_tsv = "amazon_reviews_tsv"
table_name_parquet = "amazon_reviews_parquet"

In [20]:
for table in wr.catalog.get_tables(database=database_name):
    print(table["Name"])

amazon_reviews_parquet
amazon_reviews_tsv


# Query from Athena
Execute any SQL query on Amazon Athena and return the results as a Pandas DataFrame.


In [21]:
%%time
df = wr.athena.read_sql_query(sql="SELECT * FROM {} LIMIT 5000".format(table_name_parquet), database=database_name)

CPU times: user 1.51 s, sys: 197 ms, total: 1.71 s
Wall time: 5.52 s


In [22]:
df.head(5)

|   | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | year | review_date | product_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 24371595 | R27ZP1F1CD0C3Y | B004LLIL5A | 346014806 | Amazon eGift Card - Celebrate | 5 | 0 | 0 | N | Y | Five Stars | Great birthday gift for a young adult. | 2015 | 2015-08-31 | Gift Card |
| 1 | US | 25283295 | R2HAXF0IIYQBIR | B00IX1I3G6 | 926539283 | Amazon.com Gift Card Balance Reload | 1 | 0 | 0 | N | Y | One Star | Fair | 2015 | 2015-08-31 | Gift Card |
| 2 | US | 397970 | RNYLPX611NB7Q | B005ESMGV4 | 379368939 | Amazon.com Gift Cards, Pack of 3 (Various Desi... | 5 | 0 | 0 | N | Y | Five Stars | I can't believe how quickly Amazon can get the... | 2015 | 2015-08-31 | Gift Card |
| 3 | US | 18513645 | R3ALA9XXMBEDZR | B004KNWWU4 | 326384774 | Amazon Gift Card - Print - Happy Birthday (Birds) | 5 | 0 | 0 | N | Y | Perfect for most every occasion! | Perfect! Nice & easy plus everyone loves them... | 2015 | 2015-08-31 | Gift Card |
| 4 | US | 22484620 | R3R8PHAVJFTPDF | B004LLIKVU | 473048287 | Amazon.com eGift Cards | 5 | 0 | 0 | N | Y | Five Stars | excelent | 2015 | 2015-08-31 | Gift Card |


# Query from Athena in Chunks
Retrieving in chunks can help reduce memory requirements.  

_This will take a few seconds._

In [23]:
%%time

chunk_iter = wr.athena.read_sql_query(
    sql="SELECT * FROM {} LIMIT 5000".format(table_name_parquet),
    database=database_name,
    # Maximum number of ROWS per DataFrame (not bytes); with LIMIT 5000
    # the iterator yields a single 5,000-row DataFrame
    chunksize=64_000,
)

CPU times: user 1.45 s, sys: 173 ms, total: 1.62 s
Wall time: 4.26 s


In [24]:
print(next(chunk_iter))

     marketplace customer_id       review_id  product_id product_parent  \
0             US    24371595  R27ZP1F1CD0C3Y  B004LLIL5A      346014806   
1             US    42489718   RJ7RSBCHUDNNE  B004LLIKVU      473048287   
2             US      861463  R1HVYBSKLQJI5S  B00IX1I3G6      926539283   
3             US    25283295  R2HAXF0IIYQBIR  B00IX1I3G6      926539283   
4             US      397970   RNYLPX611NB7Q  B005ESMGV4      379368939   
...          ...         ...             ...         ...            ...   
4995          US    13423949   R1SV5GFHJV0SA  B00CT780C2      473048287   
4996          US    30261441  R27U6BVER2H8BD  B00IX1I3G6      926539283   
4997          US    10806552  R2JBZ6KDMCJINS  B007V6ETDK      924812503   
4998          US    22839152   RAYQZ2BKPQD9S  B00IX1I3G6      926539283   
4999          US    13041749  R2MKLSNFA5M1KM  B00H5BMF00      373287760   

                                          product_title  star_rating  \
0                         A
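Printing one chunk is only a peek. To actually benefit from chunking, process each DataFrame as it arrives and keep only small running aggregates. A minimal sketch, assuming each chunk has the `product_category` and `star_rating` columns shown above (the helper name is hypothetical), computes the mean star rating per category without holding all rows in memory. In the notebook you would pass a fresh `chunk_iter`, since `next()` above already consumed the first (and here only) chunk:

```python
from typing import Iterable, Optional

import pandas as pd


def mean_rating_by_category(chunks: Iterable[pd.DataFrame]) -> pd.Series:
    """Hypothetical helper: streaming mean of star_rating per product_category."""
    sums: Optional[pd.Series] = None
    counts: Optional[pd.Series] = None
    for chunk in chunks:
        grouped = chunk.groupby("product_category")["star_rating"]
        # Combine this chunk's partial sums and counts with the running totals
        chunk_sums, chunk_counts = grouped.sum(), grouped.count()
        sums = chunk_sums if sums is None else sums.add(chunk_sums, fill_value=0)
        counts = chunk_counts if counts is None else counts.add(chunk_counts, fill_value=0)
    if sums is None:  # the iterator produced no chunks
        return pd.Series(dtype="float64")
    return (sums / counts).sort_index()


# Two small in-memory chunks standing in for the Athena iterator
chunks = [
    pd.DataFrame({"product_category": ["Gift Card", "Gift Card", "Digital_Software"],
                  "star_rating": [5, 1, 4]}),
    pd.DataFrame({"product_category": ["Gift Card", "Digital_Software", "Digital_Software"],
                  "star_rating": [3, 5, 5]}),
]
print(mean_rating_by_category(chunks))
```

Keeping sums and counts (rather than per-chunk means) is what makes the result exact: averaging chunk means would weight small chunks too heavily.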

# Release Resources

In [2]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [1]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}

<IPython.core.display.Javascript object>