# Sample - AWS Glue Studio Notebook

In [None]:
# Ask the kernel for its built-in help. This cell was not executed in the saved notebook.
%help

# Prepare

## Set Magic

In [19]:
# Session settings for the Glue kernel; the recorded output echoes each value back.
# Idle timeout, in minutes.
%idle_timeout 2880
# Glue version for the session.
%glue_version 3.0
# Worker type and number of workers.
%worker_type G.1X
%number_of_workers 2
# Extra session arguments: Kryo serializer, convertMetastoreParquet=false, and the Hudi data lake format.
%%configure
{
    "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false",
    "--datalake-formats": "hudi"
}

Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 2
Setting new number of workers to: 2
Current iam_role is arn:aws:iam::807388292768:role/all-services-admin-role
iam_role has been set to arn:aws:iam::807388292768:role/all-services-admin-role.
The following configurations have been updated: {'--conf': 'spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false', '--datalake-formats': 'hudi'}


## Create Spark Session, Spark Context and Glue Context

In [1]:
import sys

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

# Spark settings that mirror the `--conf` values passed through `%%configure`.
conf = SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.sql.hive.convertMetastoreParquet", "false")

# Build (or reuse) a Hive-enabled Spark session, then wrap its context for Glue.
spark_ses = (
    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
spark_ctx = spark_ses.sparkContext
glue_ctx = GlueContext(spark_ctx)

Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: e2cb8d69-1f59-436a-aabf-3ebdd47af49e
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
--datalake-formats hudi
Waiting for session e2cb8d69-1f59-436a-aabf-3ebdd47af49e to get into ready status...
Session e2cb8d69-1f59-436a-aabf-3ebdd47af49e has been created.



In [2]:
import boto3

# Resolve the caller's AWS account ID and the session's region; the Hudi target
# path later in the notebook is built from these two values.
boto_ses = boto3.Session()
sts_client = boto_ses.client("sts")
caller_identity = sts_client.get_caller_identity()
aws_account_id = caller_identity["Account"]
aws_region = boto_ses.region_name

print(f"aws_account_id = {aws_account_id}")
print(f"aws_region = {aws_region}")

aws_account_id = 807388292768
aws_region = us-east-1


# ETL Logic

## Read Data

In [10]:
# Source: the processed DynamoDB export of the `transaction` table. The bucket
# name is derived from `aws_account_id` / `aws_region` (resolved in the boto3
# cell) instead of being hard-coded, matching how the Hudi target path is built.
source_path = (
    f"s3://{aws_account_id}-{aws_region}-data/projects/dynamodb_to_datalake/"
    "tables/transaction/dynamodb_export_processed/AWSDynamoDB/"
    "01690496214046-abc1da61/data/"
)

# Read the JSON files under the prefix (recursively) as a DynamicFrame, then
# convert to a Spark DataFrame, which is what the rest of the notebook uses.
# NOTE(review): Glue documents this JSON option as `multiLine`; confirm that the
# lowercase spelling below is honored.
pdf_initial = glue_ctx.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [source_path],
        "recurse": True,
    },
    format="json",
    format_options={"multiline": True},
).toDF()




In [11]:
# Inspect the column names and types of the loaded frame (no schema was supplied on read).
pdf_initial.printSchema()

root
 |-- id: string (nullable = true)
 |-- account: string (nullable = true)
 |-- create_at: string (nullable = true)
 |-- create_year: string (nullable = true)
 |-- create_month: string (nullable = true)
 |-- create_day: string (nullable = true)
 |-- create_hour: string (nullable = true)
 |-- create_minute: string (nullable = true)
 |-- update_at: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- is_credit: integer (nullable = true)
 |-- note: string (nullable = true)


In [12]:
def show_df(pdf, n: int = 3):
    """Print the first ``n`` rows of ``pdf`` vertically, without truncating values.

    Args:
        pdf: Object exposing a Spark-DataFrame-style ``show`` method.
        n: Number of rows to print; defaults to 3.
    """
    pdf.show(n=n, truncate=False, vertical=True)
    
# Preview the first three records (the helper's default `n`) to sanity-check the load.
show_df(pdf_initial)

-RECORD 0-----------------------------------------------------------------------
 id            | account:028-584-5611,create_at:2023-07-27T22:12:03.122158+0000 
 account       | 028-584-5611                                                   
 create_at     | 2023-07-27T22:12:03.122158+0000                                
 create_year   | 2023                                                           
 create_month  | 07                                                             
 create_day    | 27                                                             
 create_hour   | 22                                                             
 create_minute | 12                                                             
 update_at     | 2023-07-27T22:12:03.122158+0000                                
 entity        | Wong Inc                                                       
 amount        | 495                                                            
 is_credit     | 0          

In [14]:
# Row count of the loaded frame; as the cell's last expression, the notebook displays it.
pdf_initial.count()

7269


## Transform Data

No transformations are applied in this sample; the data is written exactly as it was read.

## Write Data

In [13]:
# Target Glue/Hive database and table for the Hudi dataset.
database = "mydatabase"
table = "transaction"

# Partition columns, declared once so the write path and the Hive sync cannot drift apart.
partition_fields = "create_year,create_month,create_day,create_hour,create_minute"

additional_options = {
    "hoodie.table.name": table,
    # `table.type` is the current option name; `storage.type` is its deprecated spelling.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "update_at",
    "hoodie.datasource.write.partitionpath.field": partition_fields,
    # A partition path made of several columns needs ComplexKeyGenerator; the
    # single-field SimpleKeyGenerator cannot resolve a comma-separated list.
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": database,
    "hoodie.datasource.hive_sync.table": table,
    "hoodie.datasource.hive_sync.partition_fields": partition_fields,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "path": f"s3://{aws_account_id}-{aws_region}-data/projects/hudi-poc/databases/{database}/{table}",
}

# NOTE(review): `overwrite` mode replaces the existing table on every run, so
# `upsert` only merges within this batch — confirm that this is intended.
(
    pdf_initial.write.format("hudi")
    .options(**additional_options)
    .mode("overwrite")
    .save()
)


