## Convert TSV Data To Parquet

With Amazon Athena, converting TSV data into the [Apache Parquet](https://parquet.apache.org/) file format takes only a few steps.

```python
import boto3
import sagemaker

# Get region
session = boto3.session.Session()
region_name = session.region_name

# Get SageMaker session & default S3 bucket
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

# Set S3 prefixes
parquet_prefix = 'amazon-reviews-pds/parquet'

# Set S3 path to Parquet data
s3_path_parquet = 's3://{}/{}'.format(bucket, parquet_prefix)

# Set Athena parameters
database_name = 'coeaws'
table_name_tsv = 'amazon_reviews_tsv'
table_name_parquet = 'amazon_reviews_parquet'
```

```python
# install pyathena
!pip install -q PyAthena==1.8.0
```

```python
from pyathena import connect
from pyathena.pandas_cursor import PandasCursor
from pyathena.util import as_pandas
```

### Create Parquet Files from TSV Table

The query below uses Athena's `CREATE TABLE AS SELECT` (CTAS) statement to write the rows of the TSV table out as Parquet. As you can see, we're also adding a new `year` column to our dataset by converting the `review_date` string to a date and then extracting the year from that date. We store the year value as an integer, and we partition the Parquet data by `product_category`.
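To make the `year` derivation concrete, here is a small Python sketch of what `CAST(YEAR(DATE(review_date)) AS INTEGER)` computes for a single value, assuming `review_date` strings use the `YYYY-MM-DD` format seen in the sample query results at the end of this section. The helper name `review_year` is hypothetical and for illustration only; Athena evaluates the SQL expression itself.

```python
from datetime import date


def review_year(review_date: str) -> int:
    """Mirror CAST(YEAR(DATE(review_date)) AS INTEGER) for one 'YYYY-MM-DD' string."""
    # date.fromisoformat plays the role of Athena's DATE(...) conversion
    parsed = date.fromisoformat(review_date)
    # .year is already an int, matching the INTEGER cast in the query
    return parsed.year


print(review_year("2014-05-14"))  # 2014
```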

```python
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = 's3://{0}/athena/staging'.format(bucket)
```

```python
# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['product_category']) AS
SELECT marketplace,
         customer_id,
         review_id,
         product_id,
         product_parent,
         product_title,
         star_rating,
         helpful_votes,
         total_votes,
         vine,
         verified_purchase,
         review_headline,
         review_body,
         CAST(YEAR(DATE(review_date)) AS INTEGER) AS year,
         DATE(review_date) AS review_date,
         product_category
FROM {}.{}""".format(database_name, table_name_parquet, s3_path_parquet, database_name, table_name_tsv)

print(statement)
```

```
CREATE TABLE IF NOT EXISTS coeaws.amazon_reviews_parquet
WITH (format = 'PARQUET', external_location = 's3://sagemaker-us-east-2-533787958253/amazon-reviews-pds/parquet', partitioned_by = ARRAY['product_category']) AS
SELECT marketplace,
         customer_id,
         review_id,
         product_id,
         product_parent,
         product_title,
         star_rating,
         helpful_votes,
         total_votes,
         vine,
         verified_purchase,
         review_headline,
         review_body,
         CAST(YEAR(DATE(review_date)) AS INTEGER) AS year,
         DATE(review_date) AS review_date,
         product_category
FROM coeaws.amazon_reviews_tsv
```
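The `partitioned_by = ARRAY['product_category']` clause makes Athena write the Parquet files under `external_location` in a Hive-style `column=value` folder layout, the same `product_category=...` form that `SHOW PARTITIONS` reports later in this section. The pure-Python sketch below only models that grouping step; the bucket name `my-bucket`, the function `partition_prefixes`, and the sample rows are all hypothetical, and the real object names are chosen by Athena.

```python
from collections import defaultdict


def partition_prefixes(base_location, rows, partition_column="product_category"):
    """Group rows by partition value and return the Hive-style prefix for each group."""
    groups = defaultdict(list)
    for row in rows:
        value = row[partition_column]
        # Hive-style layout: one sub-folder per value, named column=value
        prefix = "{}/{}={}/".format(base_location.rstrip("/"), partition_column, value)
        # The partition value is encoded in the path, so this sketch drops it from the row
        groups[prefix].append({k: v for k, v in row.items() if k != partition_column})
    return dict(groups)


# Hypothetical sample rows for illustration
rows = [
    {"review_id": "R1", "product_category": "Digital_Software"},
    {"review_id": "R2", "product_category": "Digital_Video_Games"},
    {"review_id": "R3", "product_category": "Digital_Software"},
]
layout = partition_prefixes("s3://my-bucket/amazon-reviews-pds/parquet", rows)
for prefix, members in sorted(layout.items()):
    print(prefix, [m["review_id"] for m in members])
```

This is also why the partition column comes last in the `SELECT` list: Athena CTAS expects partition columns at the end of the column list.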


### Execute statement using connection cursor
This `CREATE TABLE AS SELECT` query can take a few minutes to complete. Please be patient.

```python
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)
```
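With PyAthena's default cursor, `cursor.execute` returns only after the query has finished, which is why this cell can take a while. Conceptually, the client keeps checking the query's state (`QUEUED`, `RUNNING`, then one of `SUCCEEDED`, `FAILED`, or `CANCELLED`) until it reaches a terminal state. The sketch below models that loop with a hypothetical `get_state` callable so it runs without AWS; it is not PyAthena's actual implementation.

```python
import time

# Terminal states of an Athena query execution
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(get_state, poll_interval=1.0, timeout=600.0, sleep=time.sleep):
    """Poll get_state() until the query reaches a terminal state or the timeout expires."""
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            raise TimeoutError("query still {} after {:.0f}s".format(state, waited))
        sleep(poll_interval)
        waited += poll_interval


# Simulated state sequence standing in for real Athena status lookups (hypothetical)
states = iter(["QUEUED", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_query(lambda: next(states), sleep=lambda s: None))  # SUCCEEDED
```

The `sleep` parameter is injectable only so the example can run instantly; in real use the default `time.sleep` would apply.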

### Load partitions by running `MSCK REPAIR TABLE`
Run `MSCK REPAIR TABLE` so that Athena loads the Parquet partitions into the table's metadata. To do so, issue the following SQL command:

```python
statement = 'MSCK REPAIR TABLE {}.{}'.format(database_name, table_name_parquet)

print(statement)
```

```
MSCK REPAIR TABLE coeaws.amazon_reviews_parquet
```


```python
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)
```
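`MSCK REPAIR TABLE` scans the table's S3 location for Hive-style `column=value` prefixes and adds any partitions that are missing from the catalog. The sketch below models that discovery step in plain Python over a hypothetical list of object keys (the file names are made up, and `discover_partitions` is an illustrative helper); it does not call S3 or Athena.

```python
import re

# Matches a Hive-style "column=value/" path segment
PARTITION_SEGMENT = re.compile(r"([^/=]+)=([^/]+)/")


def discover_partitions(object_keys, registered):
    """Return Hive-style partition specs found in object keys but not yet registered."""
    found = set()
    for key in object_keys:
        segments = PARTITION_SEGMENT.findall(key)
        if segments:
            # Rebuild the spec, e.g. "product_category=Digital_Software"
            found.add("/".join("{}={}".format(col, val) for col, val in segments))
    return sorted(found - set(registered))


# Hypothetical object keys under the Parquet prefix
keys = [
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-0000.parquet",
    "amazon-reviews-pds/parquet/product_category=Digital_Video_Games/part-0000.parquet",
]
print(discover_partitions(keys, registered=["product_category=Digital_Software"]))
# ['product_category=Digital_Video_Games']
```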

```python
# Show the partitions
statement = 'SHOW PARTITIONS {}.{}'.format(database_name, table_name_parquet)

print(statement)
```

```
SHOW PARTITIONS coeaws.amazon_reviews_parquet
```


```python
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

df_partitions = as_pandas(cursor)
df_partitions.head(5)
```

|   | partition |
|---|-----------|
| 0 | product_category=Digital_Software |
| 1 | product_category=Digital_Video_Games |
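Each row of the `partition` column is a string such as `product_category=Digital_Software`. To pull out the plain category values, for example to pick the `product_category` used in the next query, you could split each string on `=`. The helper `parse_partition` below is a hypothetical sketch; it assumes multi-column partitions are joined with `/`, as in the Hive-style layout.

```python
def parse_partition(spec):
    """Turn 'col1=a/col2=b' into {'col1': 'a', 'col2': 'b'}."""
    # Split on '/' for multi-column partitions, then on the first '=' only
    return dict(part.split("=", 1) for part in spec.split("/"))


# Values copied from the SHOW PARTITIONS output above
partitions = ["product_category=Digital_Software", "product_category=Digital_Video_Games"]
categories = [parse_partition(p)["product_category"] for p in partitions]
print(categories)  # ['Digital_Software', 'Digital_Video_Games']
```

In the notebook itself, the same list would come from `df_partitions['partition']` instead of a hard-coded list.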


```python
# Run a sample query
product_category = 'Digital_Software'

statement = """SELECT * FROM {}.{}
    WHERE product_category = '{}' LIMIT 100""".format(database_name, table_name_parquet, product_category)

print(statement)
```

```
SELECT * FROM coeaws.amazon_reviews_parquet
    WHERE product_category = 'Digital_Software' LIMIT 100
```
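The sample query splices `product_category` directly into the SQL with `str.format`, which breaks if a value ever contains a single quote. A minimal hardening, assuming the value is a plain string, is to render it as a standard SQL string literal, where an embedded `'` is escaped by doubling it. `sql_string_literal` is a hypothetical helper, the database and table names are copied from this notebook, and `Kid's_Toys` is a made-up value. PyAthena also accepts query parameters passed to `cursor.execute`, which may be preferable; check the PyAthena documentation for the version you install.

```python
def sql_string_literal(value):
    """Render a Python str as a single-quoted SQL string literal."""
    # Standard SQL escapes an embedded single quote by doubling it
    return "'" + value.replace("'", "''") + "'"


category = "Digital_Software"
safe_statement = """SELECT * FROM {}.{}
    WHERE product_category = {} LIMIT 100""".format(
    "coeaws", "amazon_reviews_parquet", sql_string_literal(category)
)
print(safe_statement)
print(sql_string_literal("Kid's_Toys"))  # 'Kid''s_Toys'
```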


```python
# Execute statement using connection cursor
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

df = as_pandas(cursor)
df.head(5)
```

|   | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | year | review_date | product_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 18640556 | RWKGA4N6R4KMY | B00C1QCBY4 | 640306152 | Sketchbook Pro 6 | 5 | 0 | 0 | N | Y | Oustanding | It's a much simpler application than Painter o... | 2014 | 2014-05-14 | Digital_Software |
| 1 | US | 11866626 | RF8V3GF4MGT6M | B00H9A60O4 | 608720080 | Avast Free Antivirus 2015 [Download] | 5 | 0 | 0 | N | N | Great Antivirus!!! | This program has been keeping my computer runn... | 2014 | 2014-05-14 | Digital_Software |
| 2 | US | 43410578 | R2KX41Y1J5PPA6 | B0064PFB9U | 232554866 | Office Mac Home and Student 2011 - 1PC/1User [... | 3 | 0 | 0 | N | Y | Does the job | It does the job but it's missing or rather, th... | 2014 | 2014-05-14 | Digital_Software |
| 3 | US | 10783062 | R3UPVHGBYFE6UQ | B004KPKSRQ | 306022575 | Pc Matic Performance Utility Suite (5 User Edi... | 5 | 4 | 4 | N | N | Love this new software that is keeping my syst... | I have the PC Matic on my comp, and couldn't b... | 2014 | 2014-05-14 | Digital_Software |
| 4 | US | 51046135 | R22P9WNTXY0VY6 | B00FFIO0NA | 386795082 | TurboTax Premier Fed, Efile and State 2013 | 4 | 0 | 0 | N | N | Same as Last Year | Performed as expected from years of experience... | 2014 | 2014-05-14 | Digital_Software |


So, in just a few steps, we have set up Amazon Athena to connect to our Amazon Customer Reviews TSV files and transformed them into the Apache Parquet file format.

You might have noticed that this sample query finished in a fraction of the time taken by the earlier query on the TSV table. We sped up the query by storing the data as Parquet and partitioning it by `product_category`.
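Partitioning helps here because the `WHERE product_category = 'Digital_Software'` filter is on the partition column, so Athena only needs to read files under the matching `product_category=Digital_Software/` prefix (partition pruning). The sketch below models that selection over a hypothetical list of object keys with an illustrative `prune` helper; it is a simplification, since Athena prunes using partition metadata in the catalog rather than by string-matching object keys.

```python
def prune(object_keys, column, value):
    """Keep only objects under the column=value partition prefix."""
    # The trailing '/' prevents 'Digital' from matching 'Digital_Software'
    marker = "/{}={}/".format(column, value)
    return [key for key in object_keys if marker in key]


# Hypothetical object keys under the Parquet prefix
keys = [
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-0000.parquet",
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-0001.parquet",
    "amazon-reviews-pds/parquet/product_category=Digital_Video_Games/part-0000.parquet",
]
selected = prune(keys, "product_category", "Digital_Software")
print("{} of {} files scanned".format(len(selected), len(keys)))  # 2 of 3 files scanned
```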