# Convert JSON to Parquet and Reduce the Number of Files

DISCLAIMER: This is sample code and is provided 'as is'. It should be used as a reference only.


### 1. Import the required libraries:

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame


In [None]:
# Reuse the active SparkContext (or start one) and wrap it in a GlueContext,
# the entry point for the DynamicFrame read/write APIs used in later cells.
spark_context = SparkContext.getOrCreate()
glueContext = GlueContext(spark_context)

# Parquet writer tuning values in bytes; passed as "blockSize"/"pageSize"
# to the glueparquet writer in step 4.
block_size = 128 << 20  # 128 MiB
page_size = 1 << 20     # 1 MiB

### 2. Create a DynamicFrame from the Glue Data Catalog table:

Use a push-down predicate to read only the required source partitions. Job bookmarks could also be used here, so that each run processes only data that previous runs have not already processed.

In [None]:
# Read the raw JSON table from the Glue Data Catalog.
# - push_down_predicate restricts the read to the month/day source partitions
#   listed in the predicate, so other partitions are not read.
# - groupFiles='inPartition' asks Glue to group input files within each
#   partition into larger read groups; groupSize is the target group size in
#   bytes. The AWS Glue docs specify groupSize as a string, hence str().
group_size = 128 * 1024 * 1024  # 128 MiB target per input group
rawdatasource = glueContext.create_dynamic_frame.from_catalog(
    database="energy",
    table_name="raw_energy_json",
    transformation_ctx="rawdatasource",
    push_down_predicate="(month == '7' and day in ('4','5','6'))",
    additional_options={
        "groupFiles": "inPartition",
        "groupSize": str(group_size),
    },
)

In [None]:
# Exploratory check: number of records selected by the predicate above.
# count() is a Spark action, so expect it to read the selected input once;
# NOTE(review): consider dropping this cell from a production job.
rawdatasource.count()

In [None]:
# Exploratory check: print the schema of the raw JSON records as read by Glue.
rawdatasource.printSchema()

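### 3. Repartition to control the number of output files
This uses `repartition`, which performs a full shuffle and produces evenly sized partitions. `coalesce` (available in PySpark as well as Scala) avoids the shuffle when you are only reducing the partition count, but it can produce unevenly sized partitions. The right number of partitions depends on your data profile and the desired output file size.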

In [None]:
# Round-trip through a Spark DataFrame: repartition(n) shuffles the rows into
# exactly n partitions. Each Spark partition writes its own file(s) in step 4,
# so n bounds the number of files per output partition directory.
num_output_partitions = 2
df = rawdatasource.toDF()
df = df.repartition(num_output_partitions)
repartitioned = DynamicFrame.fromDF(df, glueContext, "repartitioned")

### 4. Write to a curated bucket in Parquet format
This uses the `glueparquet` format rather than `parquet`, because it uses an optimized writer for DynamicFrames. Use either of these two options:

- OPTION 1: Write the data to S3 and handle partition updates externally (e.g., with an Athena `ALTER TABLE ... ADD PARTITION` / `MSCK REPAIR TABLE` command, or a Glue crawler):

In [None]:
# OPTION 1: plain S3 write, partitioned by month and day.
# The Data Catalog is not touched here; register new partitions separately.
s3_target = {
    "path": "s3://pablo.data.samples/energy/parquet_from_json/",
    "partitionKeys": ["month", "day"],
}
parquet_options = {
    "compression": "snappy",
    "blockSize": block_size,
    "pageSize": page_size,
}
datasink = glueContext.write_dynamic_frame_from_options(
    frame=repartitioned,
    connection_type="s3",
    connection_options=s3_target,
    format="glueparquet",
    format_options=parquet_options,
    transformation_ctx="datasink",
)

- OPTION 2: Write the data and create/update the catalog table partitions from the job itself. For details and limitations, see: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html

In [None]:
# OPTION 2: write to S3 and create/update the catalog table and its
# partitions from the job (enableUpdateCatalog + UPDATE_IN_DATABASE).
datasink = glueContext.getSink(
    connection_type="s3",
    path="s3://pablo.data.samples/energy/curated_glueparquet/",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["month", "day"],
    transformation_ctx="datasink",
)
# DataSink.setFormat(format, **options) takes writer options as keyword
# arguments. Wrapping them in format_options={...} would nest them under a
# single "format_options" key instead of passing compression/blockSize/
# pageSize to the writer, so they are passed individually here.
datasink.setFormat(
    format="glueparquet",
    compression="snappy",
    blockSize=block_size,
    pageSize=page_size,
)
datasink.setCatalogInfo(
    catalogDatabase="energy",
    catalogTableName="energy_table_curated_parquet",
)
datasink.writeFrame(repartitioned)