### Glue Interactive Notebook 
- Magics
- Read data from S3
- Read data from the Glue Data Catalog
- Write data and create a catalog table
- [Glue ETL Programming](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-parquet-home.html)
- [Glue Create Table Catalog](https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html)
- [Create a table with an Athena query](https://github.com/cdk-entest/lakeformation-demo/blob/master/query/create_tsv_table.sql)
- Create a table using the boto3 SDK

In [4]:
%help


# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0 and 3.0. 
                                      Default: 2.0.
----

## Selecting Job Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
----

## Glue Config Magic 
*(common across all job types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
----

                                      
## Magic for Spark Jobs (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
                                      ETL and Streaming support G.1X and G.2X. 
                                      Default: G.1X.
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Job

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray job. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



### Parameters and Additional Libraries

In [10]:
# %region ap-southeast-1
%additional_python_modules matplotlib, numpy, pandas 
%idle_timeout 60
%glue_version 3.0 
%number_of_workers 5
%iam_role arn:aws:iam::212776191100:role/RoleForGlueNotebook

Additional python modules to be included:
matplotlib
numpy
pandas
Current idle_timeout is 2880 minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 3.0
Previous number of workers: 5
Setting new number of workers to: 5
Current iam_role is arn:aws:iam::212776191100:role/OpsRole
iam_role has been set to arn:aws:iam::212776191100:role/RoleForGlueNotebook.


In [1]:
from awsglue.context import GlueContext
from pyspark.context import SparkContext

Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: e2814ec9-c7f5-48ce-91a5-ba96fe01112b
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.2
--enable-glue-datacatalog true
--additional-python-modules matplotlib,numpy,pandas
Waiting for session e2814ec9-c7f5-48ce-91a5-ba96fe01112b to get into ready status...
Session e2814ec9-c7f5-48ce-91a5-ba96fe01112b has been created.



In [2]:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)




In [3]:
# Best practice: pass these values in through environment variables or job arguments instead of hardcoding them
catalog_id = "212776191100"
lake_bucket_name = "athena-query-result-athena-glue-access-demo"
table_name = "amazon_reviews_tsv_88"
database_name = "default"
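
The comment above suggests supplying these values from the environment or from job arguments. A minimal sketch of the environment-variable variant follows. The variable names (`CATALOG_ID`, `LAKE_BUCKET_NAME`, `TABLE_NAME`, `DATABASE_NAME`) and the helper `load_params` are hypothetical and not part of the original notebook; the defaults mirror the values used here.

```python
import os

# Hypothetical environment variable names; the defaults mirror the
# hardcoded values used in this notebook.
DEFAULTS = {
    "CATALOG_ID": "212776191100",
    "LAKE_BUCKET_NAME": "athena-query-result-athena-glue-access-demo",
    "TABLE_NAME": "amazon_reviews_tsv_88",
    "DATABASE_NAME": "default",
}


def load_params(env=None):
    """Return the notebook parameters, preferring environment values over defaults."""
    env = os.environ if env is None else env
    return {key.lower(): env.get(key, default) for key, default in DEFAULTS.items()}


params = load_params()
catalog_id = params["catalog_id"]
lake_bucket_name = params["lake_bucket_name"]
table_name = params["table_name"]
database_name = params["database_name"]
```

In a Glue job, as opposed to an interactive session, job arguments are usually read with `awsglue.utils.getResolvedOptions` instead.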




### Create a Table using Boto3 SDK

In [4]:
import boto3
# create glue client
client = boto3.client(
    region_name="ap-southeast-1", service_name="glue"
)

# create table in catalog
resp = client.create_table(
    CatalogId=catalog_id,
    DatabaseName=database_name,
    TableInput={
        "Name": table_name,
        "Description": "test",
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {"classification": "parquet"},
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "marketplace",
                    "Type": "string",
                },
                {
                    "Name": "customer_id",
                    "Type": "string",
                },
                {
                    "Name": "review_id",
                    "Type": "string",
                },
                {
                    "Name": "product_id",
                    "Type": "string",
                },
                {
                    "Name": "product_parent",
                    "Type": "string",
                },
                {
                    "Name": "product_title",
                    "Type": "string",
                },
                {
                    "Name": "product_category",
                    "Type": "string",
                },
                {
                    "Name": "star_rating",
                    "Type": "string",
                },
                {
                    "Name": "helpful_vote",
                    "Type": "string",
                },
                {
                    "Name": "total_vote",
                    "Type": "string",
                },
                {
                    "Name": "vine",
                    "Type": "string",
                },
                {
                    "Name": "verified_purchase",
                    "Type": "string",
                },
                {
                    "Name": "review_headline",
                    "Type": "string",
                },
                {
                    "Name": "review_body",
                    "Type": "string",
                },
                {
                    "Name": "myyear",
                    "Type": "string",
                },
            ],
            "Location": "s3://{0}/{1}/".format(
                lake_bucket_name, table_name
            ),
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "Name": "ParquetHiveSerDe",
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
            "Compressed": False,
        },
    },
)

print(resp)

{'ResponseMetadata': {'RequestId': 'c73307f8-039d-478c-935a-d301a9af46e4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Mon, 15 May 2023 07:11:11 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': 'c73307f8-039d-478c-935a-d301a9af46e4'}, 'RetryAttempts': 0}}
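
Since every column in the table above has the type `string`, the `Columns` list does not have to be written out by hand. The following sketch shows one way to generate it, assuming hypothetical helpers `build_columns` and `build_location` that are not part of the original notebook. It only builds the dictionaries and does not call AWS.

```python
# Column names as declared in the create_table call above.
COLUMN_NAMES = [
    "marketplace", "customer_id", "review_id", "product_id",
    "product_parent", "product_title", "product_category",
    "star_rating", "helpful_vote", "total_vote", "vine",
    "verified_purchase", "review_headline", "review_body", "myyear",
]


def build_columns(names, column_type="string"):
    """Build the StorageDescriptor "Columns" list, one entry per column name."""
    return [{"Name": name, "Type": column_type} for name in names]


def build_location(bucket, table):
    """Build the S3 location in the same layout as the call above."""
    return "s3://{0}/{1}/".format(bucket, table)


columns = build_columns(COLUMN_NAMES)
location = build_location(
    "athena-query-result-athena-glue-access-demo", "amazon_reviews_tsv_88"
)
```

`columns` and `location` could then be used as the `"Columns"` and `"Location"` values inside `StorageDescriptor`.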


### Read Data From S3 into a DataFrame  

In [5]:
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": "\t",
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://amazon-reviews-pds/tsv/amazon_reviews_us_Sports_v1_00.tsv.gz"],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)




In [6]:
S3bucket_node1.show(10)

{"marketplace": "US", "customer_id": "48945260", "review_id": "R1WBPB8MDCCN8F", "product_id": "B012P7UPSM", "product_parent": "409940130", "product_title": "Chicago Blackhawks Adult Cuff Knit Beanie w/ Pom One Size Fits All NHL Authentic Hat Cap - OSFA", "product_category": "Sports", "star_rating": "5", "helpful_votes": "0", "total_votes": "0", "vine": "N", "verified_purchase": "N", "review_headline": "LOVE IT. 6 stars!", "review_body": "Bought this last winter I love it! I'm female and the hat is so comfy and fits over my ears perfectly to keep me warm! Love love love this hat! Can't wait for this winter  :)", "review_date": "2015-08-31"}
{"marketplace": "US", "customer_id": "5782091", "review_id": "R32M0YEWV77XG8", "product_id": "B001GQ3VHG", "product_parent": "657746679", "product_title": "Copag Poker Size Regular Index 1546 Playing Cards 2 decks (Black Gold Setup)", "product_category": "Sports", "star_rating": "5", "helpful_votes": "1", "total_votes": "1", "vine": "N", "verified_pu

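### Enforce Schema and Data Types using a PySpark DataFrame
A Glue DynamicFrame is comparable to a Spark DataFrame and can be converted to and from one. The Glue CSV format options map to Spark reader options as follows: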
- separator => delimiter 
- withHeader => header 
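
The mapping above can be sketched as a small translation table. The helper `to_spark_csv_options` is hypothetical and not part of the original notebook. The `quoteChar` to `quote` pair is taken from the options this notebook passes to its Glue and Spark read cells.

```python
# Glue CSV format_options key -> Spark CSV reader option name.
GLUE_TO_SPARK_CSV_OPTIONS = {
    "separator": "delimiter",
    "withHeader": "header",
    "quoteChar": "quote",
}


def to_spark_csv_options(format_options):
    """Translate Glue CSV format_options to Spark reader options.

    Keys without a known mapping are passed through unchanged.
    """
    return {
        GLUE_TO_SPARK_CSV_OPTIONS.get(key, key): value
        for key, value in format_options.items()
    }


glue_options = {"quoteChar": '"', "withHeader": True, "separator": "\t"}
spark_options = to_spark_csv_options(glue_options)
# spark_options can then be applied with spark_session.read.options(**spark_options)
```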

In [7]:
spark_session = glueContext.spark_session




In [8]:
from pyspark.sql.types import StructType, StringType, IntegerType 

schema = StructType() \
      .add("marketplace",StringType(),True) \
      .add("customer_id",StringType(),True) \
      .add("review_id",StringType(),True) \
      .add("product_id",StringType(),True) \
      .add("product_parent",StringType(),True) \
      .add("product_title",StringType(),True) \
      .add("product_category",StringType(),True) \
      .add("star_rating",StringType(),True) \
      .add("helpful_vote",StringType(),True) \
      .add("total_vote",StringType(),True) \
      .add("vine",StringType(),True) \
      .add("verified_purchase",StringType(),True) \
      .add("review_headline",StringType(),True) \
      .add("review_body",StringType(),True) \
      .add("myyear",StringType(),True)




In [9]:
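# The explicit schema is enforced, so Spark does not infer column types.
# With header=False the file's header line is read as a data row (it shows up
# as the first row of df.show below); set it to True to skip that line.
df = (
    spark_session.read.format("csv")
    .option("header", False)
    .option("delimiter", "\t")
    .option("quote", '"')
    .schema(schema)
    .load("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Sports_v1_00.tsv.gz")
)

# Optional casts of the numeric columns (each returns a new DataFrame):
# df.selectExpr("cast(star_rating as int) star_rating")
# df.selectExpr("cast(helpful_vote as int) helpful_vote")
# df.selectExpr("cast(total_vote as int) total_vote")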

print(df.schema)

StructType(List(StructField(marketplace,StringType,true),StructField(customer_id,StringType,true),StructField(review_id,StringType,true),StructField(product_id,StringType,true),StructField(product_parent,StringType,true),StructField(product_title,StringType,true),StructField(product_category,StringType,true),StructField(star_rating,StringType,true),StructField(helpful_vote,StringType,true),StructField(total_vote,StringType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline,StringType,true),StructField(review_body,StringType,true),StructField(myyear,StringType,true)))
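
Each commented-out `selectExpr` line in the cell above selects a single column, and its result is not assigned to anything. The sketch below shows one way to cast the numeric columns while keeping all the others, assuming a hypothetical helper `cast_exprs` that is not part of the original notebook. The helper only builds the expression strings, so it runs without a Spark session.

```python
def cast_exprs(all_columns, int_columns):
    """Return selectExpr strings: int_columns are cast to int, the rest kept as-is."""
    int_columns = set(int_columns)
    return [
        "cast({0} as int) as {0}".format(name) if name in int_columns else name
        for name in all_columns
    ]


exprs = cast_exprs(
    ["marketplace", "star_rating", "helpful_vote", "total_vote"],
    ["star_rating", "helpful_vote", "total_vote"],
)

# With the DataFrame from the cell above, the casts could be applied as:
# typed_df = df.selectExpr(
#     *cast_exprs(df.columns, ["star_rating", "helpful_vote", "total_vote"])
# )
```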


In [10]:
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating| helpful_vote| total_vote|vine|verified_purchase|     review_headline|         review_body|     myyear|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
|         US|   48945260|R1WBPB8MDCCN8F|B012P7UPSM|     409940130|Chicago Blackhawk...|          Sports|          5|    

### Write to Table's underlying Data 

In [11]:
from awsglue.dynamicframe import DynamicFrame
glue_df = DynamicFrame.fromDF(df, glueContext, "GlueDF")




In [12]:
glueContext.write_dynamic_frame.from_catalog(
    frame=glue_df, 
    database= "default",
    table_name=table_name,
    transformation_ctx="S3bucket_node3",
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f4351de7b10>


In [None]:
# # fmt: off
# S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
#     frame=S3bucket_node1,
#     connection_type="s3",
#     format="glueparquet",
#     connection_options={
#         "path": "s3://{0}/parquet/".format(lake_bucket_name),
#         "partitionKeys": ["product_category"],
#         "enableUpdateCatalog": True,
#         "database": "default",
#         "table": "amazon_reviews_parquet_table",
#     },
#     format_options={"compression": "uncompressed"},
#     transformation_ctx="S3bucket_node3",
# )

In [None]:
# dynamicFrame = glueContext.create_dynamic_frame.from_options(
#     connection_type = "s3", 
#     connection_options = {"paths": ["s3://{0}/amazon-review-tsv-parquet/".format(lake_bucket_name)]},
#     format = "parquet"
# )
# # dataFrame = spark.read.parquet("s3://s3path/")
# dynamicFrame.show(10)

In [None]:
# # this takes about 3 minutes
# S3bucket_node5 = glueContext.getSink(
#     path="s3://{0}/amazon-review-tsv-parquet/".format(lake_bucket_name),
#     connection_type="s3",
#     updateBehavior="UPDATE_IN_DATABASE",
#     partitionKeys=[],
#     # compression="snappy",
#     enableUpdateCatalog=True,
#     transformation_ctx="write_sink",
# )
# S3bucket_node5.setCatalogInfo(
#     catalogDatabase="default", 
#     catalogTableName="amazon_review_tsv_parquet"
# )
# S3bucket_node5.setFormat("glueparquet")
# S3bucket_node5.writeFrame(S3bucket_node1)

### Read From Glue Catalog 

In [13]:
df2 = glueContext.create_dynamic_frame.from_catalog(database="default", table_name=table_name)




In [11]:
df2.show(10)

### Plot in Glue Notebook 

In [None]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1)
ax.plot([1, 2, 3, 4, 5], "b")
%matplot plt

In [None]:
%stop_session