# Convert TSV Data To Parquet with Athena

In this notebook, we use Amazon Athena to convert the TSV data registered in the previous notebook into the Apache Parquet file format.

<img src="img/athena_convert_parquet.png" width="60%" align="left">

In [None]:
# Parquet is a columnar storage format that is well suited to analytical queries. Athena can query Parquet files stored
# in Amazon S3. The AWS Glue Data Catalog holds the schema information that is applied on top of the S3 files.

In [1]:
import boto3
import sagemaker

session = boto3.session.Session()
region_name = session.region_name
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

In [2]:
ingest_create_athena_table_parquet_passed = False

In [3]:
%store -r ingest_create_athena_table_tsv_passed

In [4]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print('++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.')
    print('++++++++++++++++++++++++++++++++++++++++++++++')

In [5]:
print(ingest_create_athena_table_tsv_passed)

True


In [6]:
if not ingest_create_athena_table_tsv_passed:
    print('++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.')
    print('++++++++++++++++++++++++++++++++++++++++++++++')
else:
    print('[OK]')

[OK]


# Import PyAthena

In [7]:
from pyathena import connect
from pyathena.pandas_cursor import PandasCursor
from pyathena.util import as_pandas

# Create Parquet Files from TSV Table

As you can see from the query below, we also add a new `year` column to the dataset: we convert the `review_date` string to a date, extract the year from it, and store the result as an integer. We also partition the Parquet data by `product_category`.
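To make the `year` derivation concrete, here is a minimal local sketch of what `CAST(YEAR(DATE(review_date)) AS INTEGER)` computes for a single value. It assumes `review_date` is an ISO-formatted `YYYY-MM-DD` string, as in the sample rows shown later; the helper name `review_year` is made up for illustration and is not part of the notebook.

```python
from datetime import date


def review_year(review_date: str) -> int:
    """Return the calendar year of an ISO-formatted (YYYY-MM-DD) date string.

    Hypothetical helper that mirrors the SQL expression
    CAST(YEAR(DATE(review_date)) AS INTEGER) for one value.
    """
    return date.fromisoformat(review_date).year


print(review_year("2014-02-16"))
```

In the notebook itself the conversion happens inside Athena, so no Python-side processing of the rows is needed.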

In [8]:
# Set S3 path to Parquet data
s3_path_parquet = 's3://{}/amazon-reviews-pds/parquet'.format(bucket)

# Set Athena parameters
database_name = 'dsoaws'
table_name_tsv = 'amazon_reviews_tsv'
table_name_parquet = 'amazon_reviews_parquet'

In [9]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = 's3://{0}/athena/staging'.format(bucket)

# Execute Statement
_This can take a few minutes.  Please be patient._

In [10]:
# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['product_category']) AS
SELECT marketplace,
         customer_id,
         review_id,
         product_id,
         product_parent,
         product_title,
         star_rating,
         helpful_votes,
         total_votes,
         vine,
         verified_purchase,
         review_headline,
         review_body,
         CAST(YEAR(DATE(review_date)) AS INTEGER) AS year,
         DATE(review_date) AS review_date,
         product_category
FROM {}.{}""".format(database_name, table_name_parquet, s3_path_parquet, database_name, table_name_tsv)

print(statement)

CREATE TABLE IF NOT EXISTS dsoaws.amazon_reviews_parquet
WITH (format = 'PARQUET', external_location = 's3://sagemaker-us-east-1-251166678609/amazon-reviews-pds/parquet', partitioned_by = ARRAY['product_category']) AS
SELECT marketplace,
         customer_id,
         review_id,
         product_id,
         product_parent,
         product_title,
         star_rating,
         helpful_votes,
         total_votes,
         vine,
         verified_purchase,
         review_headline,
         review_body,
         CAST(YEAR(DATE(review_date)) AS INTEGER) AS year,
         DATE(review_date) AS review_date,
         product_category
FROM dsoaws.amazon_reviews_tsv
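Note that `product_category` is the last column in the `SELECT` list: in an Athena CTAS statement, the columns named in `partitioned_by` have to come last. The sketch below shows one way to enforce that ordering when building such a statement. The `build_ctas` helper and the `s3://example-bucket/...` location are hypothetical, and the helper only handles plain column names, not expressions such as the `year` cast above.

```python
def build_ctas(target, source, columns, partition_cols, location):
    """Build a Parquet CTAS statement with the partition columns placed last.

    Hypothetical helper for illustration; it handles plain column names only.
    """
    missing = [c for c in partition_cols if c not in columns]
    if missing:
        raise ValueError(f"partition columns not in SELECT list: {missing}")

    # Keep the non-partition columns in their given order, then append
    # the partition columns at the end of the SELECT list.
    regular = [c for c in columns if c not in partition_cols]
    select_list = ",\n       ".join(regular + list(partition_cols))
    partitioned_by = ", ".join(f"'{c}'" for c in partition_cols)

    return (
        f"CREATE TABLE IF NOT EXISTS {target}\n"
        f"WITH (format = 'PARQUET', external_location = '{location}', "
        f"partitioned_by = ARRAY[{partitioned_by}]) AS\n"
        f"SELECT {select_list}\n"
        f"FROM {source}"
    )


example_statement = build_ctas(
    target="dsoaws.amazon_reviews_parquet",
    source="dsoaws.amazon_reviews_tsv",
    columns=["product_category", "review_id", "star_rating"],
    partition_cols=["product_category"],
    location="s3://example-bucket/amazon-reviews-pds/parquet",  # placeholder path
)
print(example_statement)
```

Even though `product_category` is passed first in `columns`, the generated statement lists it last.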


In [11]:
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

<pyathena.cursor.Cursor at 0x7f1ced273240>

# Load partitions by running `MSCK REPAIR TABLE`

As a last step, we need to load the Parquet partitions. To do so, just issue the following SQL command: 

In [12]:
statement = 'MSCK REPAIR TABLE {}.{}'.format(database_name, table_name_parquet)

print(statement)

MSCK REPAIR TABLE dsoaws.amazon_reviews_parquet


In [13]:
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

<pyathena.cursor.Cursor at 0x7f1cecf6b160>

# Show the Partitions

In [14]:
statement = 'SHOW PARTITIONS {}.{}'.format(database_name, table_name_parquet)

print(statement)

SHOW PARTITIONS dsoaws.amazon_reviews_parquet


In [15]:
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

df_partitions = as_pandas(cursor)
df_partitions.head(5)

Unnamed: 0,partition
0,product_category=Digital_Software
1,product_category=Digital_Video_Games
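Each partition is reported as a Hive-style `key=value` string. If you want just the category names, a small parser like the sketch below can split them apart. The `parse_partition` helper is hypothetical, and the list of strings stands in for the values of the `partition` column in `df_partitions`.

```python
def parse_partition(spec: str) -> dict:
    """Split a Hive-style partition spec such as 'k1=v1/k2=v2' into a dict."""
    parts = {}
    for segment in spec.strip("/").split("/"):
        # str.partition splits on the first '=' only, so values may contain '='.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


# Stand-in for df_partitions['partition'].tolist()
partitions = [
    "product_category=Digital_Software",
    "product_category=Digital_Video_Games",
]
categories = [parse_partition(p)["product_category"] for p in partitions]
print(categories)
```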


# Show the Tables

In [16]:
statement = 'SHOW TABLES IN {}'.format(database_name)
cursor.execute(statement)

df_tables = as_pandas(cursor)
df_tables.head(5)

Unnamed: 0,tab_name
0,amazon_reviews_parquet
1,amazon_reviews_tsv


In [17]:
if table_name_parquet in df_tables.values:
    ingest_create_athena_table_parquet_passed = True

# Run Sample Query

In [18]:
product_category = 'Digital_Software'

statement = """SELECT * FROM {}.{}
    WHERE product_category = '{}' LIMIT 100""".format(database_name, table_name_parquet, product_category)

print(statement)

SELECT * FROM dsoaws.amazon_reviews_parquet
    WHERE product_category = 'Digital_Software' LIMIT 100


In [19]:
# Execute statement using connection cursor
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

df = as_pandas(cursor)
df.head(5)

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,year,review_date,product_category
0,US,15257192,R204T21JTCZ8XG,B00E7XA7KY,189774198,Quickbooks Pro,1,1,1,N,N,Chase bank Web Connect issues,Major problems with Chase bank intergration. C...,2014,2014-02-16,Digital_Software
1,US,50690611,R2PCSOYZZ5IAL1,B00FFINUJK,866839083,"TurboTax Home and Business Fed, Efile and Stat...",5,2,2,N,Y,Long time user: It does what it's supposed to ...,Initial note: This is a review of the download...,2014,2014-02-16,Digital_Software
2,US,2481589,R3N8V9RJWPDXOA,B00B1TGHXS,954368001,Microsoft Word 2013 (1PC/1User),1,0,0,N,Y,was not able to download this program. i have ...,please refund for this program. I can not down...,2014,2014-02-16,Digital_Software
3,US,42625099,R2LV5O1P2A5GFX,B00F8LJU9S,627104528,Adobe Photoshop Elements 12,5,0,0,N,Y,Easy to use,Super easy to download and great software prog...,2014,2014-02-16,Digital_Software
4,US,51561267,R2D8O9VZOL67J6,B00FGDEPDY,991059534,Norton Internet Security 1 User 3 Licenses,5,0,1,N,Y,Long Time Norton Fan,I've been using Norton Internet Security for y...,2014,2014-02-16,Digital_Software


In [20]:
if not df.empty:
    print('[OK]')
else:
    print('++++++++++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOUR DATA HAS NOT BEEN CONVERTED TO PARQUET. LOOK IN PREVIOUS CELLS TO FIND THE ISSUE.')
    print('++++++++++++++++++++++++++++++++++++++++++++++++++++++')

[OK]


In just a few steps, we have set up Amazon Athena to connect to our Amazon Customer Reviews TSV files and transformed them into the Apache Parquet file format.

You might have noticed that our second sample query finished in a fraction of the time taken by the earlier query on the TSV table. The speed-up comes from storing the data as Parquet and partitioning it by `product_category`, so a query that filters on the category only needs to read the matching partition.
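One way to check this yourself is to compare how much data each query scanned. The sketch below assumes the PyAthena cursor exposes `data_scanned_in_bytes` and `engine_execution_time_in_millis` after `execute()`; to the best of our knowledge it does, but verify this against your installed version. The `query_stats` helper is hypothetical, and a `SimpleNamespace` stands in for a real cursor so the snippet runs without an AWS connection.

```python
from types import SimpleNamespace


def query_stats(cursor):
    """Summarize scan size and engine time from a cursor-like object.

    Assumes PyAthena-style attributes; missing attributes yield None.
    """
    scanned = getattr(cursor, "data_scanned_in_bytes", None)
    millis = getattr(cursor, "engine_execution_time_in_millis", None)
    return {
        "data_scanned_mb": scanned / (1024 * 1024) if scanned is not None else None,
        "engine_time_s": millis / 1000 if millis is not None else None,
    }


# Stand-in for a cursor on which cursor.execute(statement) has completed;
# the numbers are made up for illustration only.
fake_cursor = SimpleNamespace(
    data_scanned_in_bytes=2 * 1024 * 1024,
    engine_execution_time_in_millis=1500,
)
print(query_stats(fake_cursor))
```

Running `query_stats(cursor)` after the same `SELECT` against the TSV table and against the Parquet table would let you compare the two directly.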


# Store Variables for the Next Notebooks

In [21]:
%store ingest_create_athena_table_parquet_passed

Stored 'ingest_create_athena_table_parquet_passed' (bool)


In [22]:
%store

Stored variables and their in-db values:
ingest_create_athena_db_passed                        -> True
ingest_create_athena_table_parquet_passed             -> True
ingest_create_athena_table_tsv_passed                 -> True
s3_private_path_tsv                                   -> 's3://sagemaker-us-east-1-251166678609/amazon-revi
s3_public_path_tsv                                    -> 's3://amazon-reviews-pds/tsv'
setup_dependencies_passed                             -> True
setup_iam_roles_passed                                -> True
setup_instance_check_passed                           -> True
setup_s3_bucket_passed                                -> True


In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();

<IPython.core.display.Javascript object>