# JSONからParquetに変換

割り当てられた12桁のAccountIDを設定してください

In [None]:
# 12-digit AWS account ID assigned to you; replace the placeholder before running.
accountid = "xxxxxxxxxxxx"

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format

### Initialize

In [None]:
## Initialize the Spark and Glue contexts
# Reuse the active SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Create the Glue job object and initialize it under the name 'sh10sales_parquet'.
job = Job(glueContext)
job.init('sh10sales_parquet')

### DataSource

In [None]:
# Read the "json_sales" table of the "workshop" Data Catalog database into a
# Glue DynamicFrame.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="workshop",
    table_name="json_sales",
    transformation_ctx="datasource0",
)

In [None]:
# Print the number of records in the source DynamicFrame.
print("Count: ", datasource0.count() )

In [None]:
# Print the schema of the source DynamicFrame.
datasource0.printSchema()

### Convert 
Convert the DynamicFrame to a standard Spark DataFrame so that the subsequent transformations can be applied.

In [None]:
# Convert the Glue DynamicFrame into a Spark DataFrame.
df = datasource0.toDF()

For large data sets, caching the data can speed up later steps.

In [None]:
# Mark the DataFrame for caching so the following steps can reuse it.
df.cache()

### Repartition

In [None]:
# Derive the partition columns from time_id: calendar year, calendar month,
# and a combined 'yyyyMM' key.
yearAddedDf = df.withColumn("year", year(df["time_id"]))
monthAddedDf = yearAddedDf.withColumn(
    "month", month(yearAddedDf["time_id"]))
yyyymmAddedDf = monthAddedDf.withColumn(
    "yyyymm", date_format(monthAddedDf["time_id"], 'yyyyMM'))

# Repartition on the combined key so rows sharing a yyyymm value are grouped
# together, then drop the key: it is only needed for the repartitioning.
repartitionedDf = yyyymmAddedDf.repartition("yyyymm")
droppedDf = repartitionedDf.drop("yyyymm")

In [None]:
# Cast every column to its target type before writing. Each withColumn call
# replaces the existing column of the same name with its cast counterpart.
castedDf = (
    droppedDf
    .withColumn("prod_id", droppedDf["prod_id"].cast("decimal(38,0)"))
    .withColumn("cust_id", droppedDf["cust_id"].cast("decimal(38,0)"))
    .withColumn("time_id", droppedDf["time_id"].cast("timestamp"))
    .withColumn("channel_id", droppedDf["channel_id"].cast("decimal(38,0)"))
    .withColumn("promo_id", droppedDf["promo_id"].cast("decimal(38,0)"))
    .withColumn("quantity_sold", droppedDf["quantity_sold"].cast("decimal(38,2)"))
    .withColumn("seller", droppedDf["seller"].cast("int"))
    .withColumn("fulfillment_center", droppedDf["fulfillment_center"].cast("int"))
    .withColumn("courier_org", droppedDf["courier_org"].cast("int"))
    .withColumn("tax_country", droppedDf["tax_country"].cast("varchar(3)"))
    .withColumn("tax_region", droppedDf["tax_region"].cast("varchar(3)"))
    .withColumn("amount_sold", droppedDf["amount_sold"].cast("decimal(38,2)"))
    .withColumn("year", droppedDf["year"].cast("int"))
    .withColumn("month", droppedDf["month"].cast("int"))
)

### Write in S3
S3にYear/monthのパーティショニングしたデータを出力

In [None]:
# Write the result to S3 as Snappy-compressed Parquet, partitioned by
# year/month, replacing any existing output at the target path.
#
# The bucket name embeds the account ID. The placeholder has to be substituted
# explicitly: a plain string literal would leave the text "{accountid}" in the
# path and the write would target a bucket that does not exist.
output_path = "s3://bigdata-handson-{accountid}/data/parquet/sh10/sales".format(
    accountid=accountid)
castedDf.write.partitionBy("year", "month").mode("overwrite").parquet(
    output_path, compression='snappy')

In [None]:
# Commit the Glue job that was initialized with job.init().
job.commit()