### Glue Interactive Notebook 
- magics 
- read data from s3 
- read data from catalog 
- write and create catalog 
- [Glue ETL Programming](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-parquet-home.html) 
- [Glue Create Table Catalog](https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html)
- [Create table by Athena query](https://github.com/cdk-entest/lakeformation-demo/blob/master/query/create_tsv_table.sql)
- Create table using boto3 SDK

In [None]:
%help

### Parameters and Environment Variables 

In [None]:
# Session configuration magics for the AWS Glue interactive-sessions kernel.
# NOTE(review): presumably these must run before the first code cell starts the session - confirm.
# NOTE(review): the %iam_role ARN below embeds a specific AWS account ID; consider
# supplying the role from local configuration instead of committing it in the notebook.
# %region ap-southeast-1
%additional_python_modules matplotlib, numpy, pandas 
%idle_timeout 15 
%glue_version 3.0 
%number_of_workers 5
%iam_role arn:aws:iam::212776191100:role/LakeFormationWorkFlowRole

### Import, Create GlueContext, SparkSession 

In [None]:
from awsglue.context import GlueContext
from pyspark.context import SparkContext

In [None]:
# Reuse the SparkContext if one already exists, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and provides the DynamicFrame readers/writers
# (create_dynamic_frame / write_dynamic_frame) used in this notebook.
glueContext = GlueContext(sc)
# SparkSession bound to the GlueContext, for plain Spark APIs such as spark.read.
spark = glueContext.spark_session

In [None]:
# Notebook-wide settings. The empty strings are placeholders to fill in;
# best practice is to pass these values via environment or job arguments.
database_name, table_name = "default", "amazon_reviews_tsv_2"
catalog_id, lake_bucket_name = "", ""

### Create a Table using Boto3 SDK

In [None]:
import boto3

# Column schema of the table as (name, Hive type) pairs, in table order.
review_columns = [
    ("marketplace", "string"),
    ("customer_id", "string"),
    ("review_id", "string"),
    ("product_id", "string"),
    ("product_parent", "string"),
    ("product_title", "string"),
    ("star_rating", "int"),
    ("helpful_votes", "int"),
    ("total_votes", "int"),
    ("vine", "string"),
    ("verified_purchase", "string"),
    ("review_headline", "string"),
    ("review_body", "string"),
    ("review_date", "string"),
    ("myyear", "int"),
]

# An empty bucket name would register the table at "s3:///<table_name>/",
# so fail early with a clear message instead.
if not lake_bucket_name:
    raise ValueError("lake_bucket_name must be set before creating the table")

# create glue client
client = boto3.client(
    region_name="ap-southeast-1", service_name="glue"
)

# Parquet table definition stored under s3://<lake_bucket_name>/<table_name>/.
# NOTE(review): Glue itself creates tables with TableType "EXTERNAL_TABLE";
# confirm that "EXTERNAL" is intended here.
create_table_args = {
    "DatabaseName": database_name,
    "TableInput": {
        "Name": table_name,
        "Description": "test",
        "TableType": "EXTERNAL",
        "Parameters": {"classification": "parquet"},
        "StorageDescriptor": {
            "Columns": [
                {"Name": column_name, "Type": column_type}
                for column_name, column_type in review_columns
            ],
            "Location": "s3://{0}/{1}/".format(
                lake_bucket_name, table_name
            ),
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "Name": "ParquetHiveSerDe",
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
            "Compressed": False,
        },
    },
}

# An empty CatalogId fails client-side parameter validation (minimum length 1).
# When the argument is omitted, the caller's AWS account ID is used.
if catalog_id:
    create_table_args["CatalogId"] = catalog_id

# create table in catalog; tolerate re-running the cell after the table exists
try:
    resp = client.create_table(**create_table_args)
except client.exceptions.AlreadyExistsException:
    resp = None
    print(
        "Table {0}.{1} already exists; skipping creation.".format(
            database_name, table_name
        )
    )
else:
    print(resp)

### Read Data From S3 into a DataFrame  

In [None]:
# Source object: a single .tsv.gz file in the public amazon-reviews-pds bucket.
reviews_tsv_path = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Sports_v1_00.tsv.gz"

# Parse as tab-separated text with a header row and double-quote quoting.
tsv_format_options = {
    "separator": "\t",
    "withHeader": True,
    "quoteChar": '"',
}
s3_source_options = {
    "paths": [reviews_tsv_path],
    "recurse": True,
}

# Load the file into a Glue DynamicFrame.
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=s3_source_options,
    format="csv",
    format_options=tsv_format_options,
    transformation_ctx="S3bucket_node1",
)

In [None]:
# Preview the frame read from S3 (show is called with 10 as the row count).
S3bucket_node1.show(10)

In [None]:
# Write the frame through the catalog table. database_name and table_name are the
# same variables used by the create_table call, so both cells target the same table.
# NOTE(review): no mapping/cast step runs between the S3 read and this write -
# confirm the frame's columns match the table's declared schema (e.g. `myyear`).
glueContext.write_dynamic_frame.from_catalog(
    frame=S3bucket_node1,
    database=database_name,
    table_name=table_name,
    transformation_ctx="S3bucket_node3",
)

In [None]:
# # fmt: off
# S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
#     frame=S3bucket_node1,
#     connection_type="s3",
#     format="glueparquet",
#     connection_options={
#         "path": "s3://{0}/parquet/".format(lake_bucket_name),
#         "partitionKeys": ["product_category"],
#         "enableUpdateCatalog": True,
#          "database":"default",
#          "table":"amazon_reviews_parquet_table",
#     },
#     format_options={"compression": "uncompressed"},
#     transformation_ctx="S3bucket_node3",
# )

In [None]:
# dynamicFrame = glueContext.create_dynamic_frame.from_options(
#     connection_type = "s3", 
#     connection_options = {"paths": ["s3://{0}/amazon-review-tsv-parquet/".format(lake_bucket_name)]}, 
#     format = "parquet"
# )
# # dataFrame = spark.read.parquet("s3://s3path/")
# dynamicFrame.show(10)

In [None]:
# # this takes about 3 minutes
# S3bucket_node5 = glueContext.getSink(
#     path="s3://{0}/amazon-review-tsv-parquet/".format(lake_bucket_name),
#     connection_type="s3",
#     updateBehavior="UPDATE_IN_DATABASE",
#     partitionKeys=[],
#     # compression="snappy",
#     enableUpdateCatalog=True,
#     transformation_ctx="write_sink",
# )
# S3bucket_node5.setCatalogInfo(
#     catalogDatabase="default", 
#     catalogTableName="amazon_review_tsv_parquet"
# )
# S3bucket_node5.setFormat("glueparquet")
# S3bucket_node5.writeFrame(S3bucket_node1)

### Read From Glue Catalog 

In [None]:
# Read the table back through the Glue Data Catalog, using the same
# database_name/table_name variables as the create_table call.
df2 = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name=table_name,
)

In [None]:
# Preview the frame read from the catalog table (show is called with 10 as the row count).
df2.show(10)

In [None]:
%stop_session