####  Run this cell to set up and start your interactive session.


In [7]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 20

%additional_python_modules delta-spark
%%configure
{
"conf":"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"datalake-formats":"delta"
}


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 20
Additional python modules to be included:
delta-spark
The following configurations have been updated: {'conf': 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog', 'datalake-formats': 'delta'}


In [1]:
import sys
import json 
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# load spark functions, types
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from delta.tables import *
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 20
Idle Timeout: 2880
Session ID: a6d3a1a5-7533-42b5-995d-09ef3bddd873
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
--datalake-formats delta
--additional-python-modules delta-spark
Waiting for session a6d3a1a5-7533-42b5-995d-09ef3bddd873 to get into ready status...
Session a6d3a1a5-7533-42b5-995d-09ef3bddd873 has been created.



In [2]:
bucket_name = "delta-poc-primary-061588638285"
bucket_prefix = ""
database_name = "delta_poc"
database_prefix = f"{bucket_prefix}/{database_name}"
database_location = f"s3://{bucket_name}/{database_prefix}/"
table_name = "customers"
table_prefix = f"{database_prefix}/{table_name}"
table_location = f"s3://{bucket_name}/{table_prefix}/"




In [3]:
spark.sql(f"CREATE DATABASE {database_name} LOCATION '{table_location}'")

DataFrame[]


In [4]:
# Define the schema of the raw customer table
customer_schema = StructType(
            [ StructField("c_customer_sk",IntegerType(),True),
              StructField("c_customer_id",StringType(),True),
              StructField("c_current_cdemo_sk",IntegerType(),True),
              StructField("c_current_hdemo_sk",IntegerType(),True),
              StructField("c_current_addr_sk",IntegerType(),True),
              StructField("c_first_shipto_date_sk",IntegerType(),True),
              StructField("c_first_sales_date_sk",IntegerType(),True),
              StructField("c_salutation",StringType(),True),
              StructField("c_first_name",StringType(),True),
              StructField("c_last_name",StringType(),True),
              StructField("c_preferred_cust_flag",StringType(),True),
              StructField("c_birth_day",IntegerType(),True),
              StructField("c_birth_month",IntegerType(),True),
              StructField("c_birth_year",IntegerType(),True),
              StructField("c_birth_country", StringType(), True),
              StructField("c_login", StringType(), True),
              StructField("c_email_address", StringType(), True),
              StructField("c_last_review_date_sk", IntegerType(), True)
              ]
              )




In [5]:
df_customer = spark.read.schema(customer_schema).\
                format("csv").options(header=True,delimiter="|").\
                load("s3://redshift-downloads/TPC-DS/2.13/100GB/customer/")
print(f'df_customer - rows: {df_customer.count()}, columns: {len(df_customer.columns)}')

df_customer - rows: 1999999, columns: 18


In [6]:
df_customer.write.format("delta"). \
  mode("overwrite"). \
  save(f"s3://{bucket_name}/{bucket_prefix}{table_name}")




In [8]:
f"s3://{bucket_name}/{bucket_prefix}{table_name}/"

's3://delta-poc-primary-061588638285/customers/'


In [9]:
# create table in metastore
query = f"""
CREATE TABLE {database_name}.{table_name}
USING delta
LOCATION 's3://delta-poc-primary-061588638285/customers/'
"""

spark.sql(query)

DataFrame[]


In [10]:
%%sql
USE delta_poc

++
||
++
++


In [11]:
%%sql
SHOW TABLES;

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|delta_poc|customers|      false|
+---------+---------+-----------+


In [15]:
deltaTable = DeltaTable.forPath(spark, 's3://delta-poc-primary-061588638285/customers/')




In [16]:
fullHistoryDF = deltaTable.history()   # display the table history
fullHistoryDF.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2024-09-04 18:04:38|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|  Serializable|        false|{numFiles -> 1, n...|        null|Apache-Spark/3.3....|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+


In [17]:
df_customer_100B = spark.read.schema(customer_schema).\
                format("csv").options(header=True,delimiter="|").\
                load("s3://redshift-downloads/TPC-DS/1TB/customer/")





In [18]:
print(f'df_customer - rows: {df_customer_100B.count()}, columns: {len(df_customer_100B.columns)}')

df_customer - rows: 11999960, columns: 18


In [19]:
df_customer_100B.write.format("delta"). \
  mode("append"). \
  save(f"s3://{bucket_name}/{bucket_prefix}/{table_name}")




In [34]:
11999960+1999999 =  13999959

13999959


In [20]:
fullHistoryDF = deltaTable.history()   # display the table history
fullHistoryDF.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      1|2024-09-04 18:07:32|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          0|  Serializable|         true|{numFiles -> 40, ...|        null|Apache-Spark/3.3....|
|      0|2024-09-04 18:04:38|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|  Serializable|        false|{numFiles -> 1, n...|        null|Apache-Spark/3.3....|
+-------+------

In [21]:
update_Df = df_customer.filter(df_customer.c_birth_country=='UNITED STATES')




In [22]:
update_Df = update_Df.withColumn("c_birth_country",lit("US"))




In [23]:
update_Df.select(['c_customer_id']).distinct().count()

9206


In [24]:
deltaTable.alias("events").merge(
    update_Df.alias("updates"),
    "events.c_customer_id = updates.c_customer_id") \
  .whenMatchedUpdate(set = { "c_birth_country" : "updates.c_birth_country" } ) \
  .execute()




In [25]:
fullHistoryDF = deltaTable.history()   # display the table history
fullHistoryDF.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      2|2024-09-04 18:09:13|  null|    null|    MERGE|{predicate -> (ev...|null|    null|     null|          1|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|      1|2024-09-04 18:07:32|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          0|  Serializable|         true|{numFiles -> 40, ...|        null|Apache-Spark/3.3....|
|      0|2024-0

In [27]:
deltaTable.delete("c_birth_country is null")  #498624




In [28]:
df = deltaTable.optimize().executeCompaction()




In [31]:
df.show(truncate=False)

+---------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|path                                         |metrics                                                                                                                                                                              |
+---------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3://delta-poc-primary-061588638285/customers|{1, 41, {729470213, 729470213, 7.29470213E8, 1, 729470213}, {4916639, 105952518, 1.7894429756097563E7, 41, 733671620}, 1, null, 1, 41, 0, false, 0, 0, 1725473499419, 0, 76, 0, null}|
+---------------------------------------------+---------------------------------

In [32]:
fullHistoryDF = deltaTable.history()   # display the table history
fullHistoryDF.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      4|2024-09-04 18:11:39|  null|    null| OPTIMIZE|{predicate -> [],...|null|    null|     null|          3|SnapshotIsolation|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|
|      3|2024-09-04 18:09:57|  null|    null|   DELETE|{predicate -> ["(...|null|    null|     null|          2|     Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|


In [33]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
deltaTable.vacuum(0)

DataFrame[]
