# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
import pyspark.sql.functions as F
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import json
import pandas as pd
from pandas import json_normalize

from pyspark.sql import SparkSession

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::252017777664:role/service-role/AWSGlueServiceRole-mailchimp
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 326028c5-00ed-447d-a44b-6b8d90af0482
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
Waiting f

In [10]:
from pyspark.sql.functions import from_json, col, to_json
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, StructType, DoubleType




In [3]:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [5]:
# dataframe = glueContext.create_dynamic_frame.from_catalog(database = "mailchimp", table_name = "campaign", transformation_ctx = "dataframe")

dataframe = spark.read.format("csv").option("header","true").option('escape', '"').option("multiLine","true").load("s3://datalakemb-ops/airbyte/Mailchimp/campaigns/2023_07_25_1690267628759_0.csv")
# Print schema to see column names
dataframe.printSchema()

root
 |-- _airbyte_ab_id: string (nullable = true)
 |-- _airbyte_emitted_at: string (nullable = true)
 |-- ab_split_opts: string (nullable = true)
 |-- archive_url: string (nullable = true)
 |-- content_type: string (nullable = true)
 |-- create_time: string (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- emails_sent: string (nullable = true)
 |-- id: string (nullable = true)
 |-- long_archive_url: string (nullable = true)
 |-- needs_block_refresh: string (nullable = true)
 |-- parent_campaign_id: string (nullable = true)
 |-- recipients: string (nullable = true)
 |-- report_summary: string (nullable = true)
 |-- resendable: string (nullable = true)
 |-- rss_opts: string (nullable = true)
 |-- send_time: string (nullable = true)
 |-- settings: string (nullable = true)
 |-- social_card: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tracking: string (nullable = true)
 |-- type: string (nullable = true)
 |-- variate_settings: string (nullable = 

In [10]:
# dataframe.toDF()

IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (11): emails_sent, report_summary, send_time, settings, social_card, status, tracking, type, variate_settings, web_id, enabled
New column names (0): 


In [6]:
columns_to_remove = ["_airbyte_ab_id", "_airbyte_emitted_at", "archive_url", "ab_split_opts", "content_type", "create_time", "id", "long_archive_url", "needs_block_refresh", "parent_campaign_id", "resendable", "rss_opts", "recipients"]
dataframe = dataframe.drop(*columns_to_remove)




In [7]:
dataframe.show()

+--------------------+-----------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|     delivery_status|emails_sent|      report_summary|           send_time|        settings|         social_card|              status|      tracking|                type|    variate_settings|              web_id|
+--------------------+-----------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|"{""enabled"":fal...|       1315|""list_is_active"...|""recipient_count...| "{""opens"":856|""unique_opens"":237|""open_rate"":0.1...| ""clicks"":23|""subscriber_clic...|""click_rate"":0....|""ecommerce"":{""...|
|"{""enabled"":fal...|       1288|""list_is_active"...|""recipient_count...|"{""opens"":1462|""unique_opens"":399|""open_rate"":0.3...| ""clicks

In [7]:
json_schema_delivery_status = StructType([
    StructField("enabled", BooleanType())
])
dataframe = dataframe.withColumn('delivery_status', F.from_json('delivery_status', json_schema_delivery_status)).select('*', 'delivery_status.*').drop('delivery_status')




In [8]:
dataframe.show()

+-----------+--------------------+--------------------+--------------------+-----------+------+--------------------+-------+----------------+-------+-------+
|emails_sent|      report_summary|           send_time|            settings|social_card|status|            tracking|   type|variate_settings| web_id|enabled|
+-----------+--------------------+--------------------+--------------------+-----------+------+--------------------+-------+----------------+-------+-------+
|       1315|{"opens":856,"uni...|2016-08-29T14:20:...|{"subject_line":"...|       null|  sent|{"opens":true,"ht...|regular|            null| 464221|  false|
|       1288|{"opens":1462,"un...|2016-12-07T20:15:...|{"subject_line":"...|       null|  sent|{"opens":true,"ht...|regular|            null| 675177|  false|
|       1242|{"opens":1949,"un...|2017-01-05T20:59:...|{"subject_line":"...|       null|  sent|{"opens":true,"ht...|regular|            null| 718509|  false|
|        156|{"opens":292,"uni...|2017-02-28T13:35:.

In [16]:
json_schema_report_summary = StructType([
    StructField("opens", IntegerType()),
    StructField("unique_opens", IntegerType()),
    StructField("open_rate", DoubleType()),
    StructField("clicks", IntegerType()),
    StructField("subscriber_clicks", IntegerType()),
    StructField("click_rate", DoubleType()),
    StructField("ecommerce", StructType([
        StructField("total_orders", IntegerType()),
        StructField("total_spent", DoubleType()),
        StructField("total_revenue", DoubleType())
    ]))
])

# Convert the 'report_summary' column from string to json
dataframe_df = dataframe.withColumn('report_summary', F.from_json('report_summary', json_schema_report_summary)).select('*', 'report_summary.*', 'report_summary.ecommerce.*').drop('report_summary', 'ecommerce')
dataframe_df.show()

+-----------+--------------------+--------------------+-----------+------+--------------------+-------+----------------+-------+-------+-----+------------+-------------------+------+-----------------+--------------------+------------+-----------+-------------+
|emails_sent|           send_time|            settings|social_card|status|            tracking|   type|variate_settings| web_id|enabled|opens|unique_opens|          open_rate|clicks|subscriber_clicks|          click_rate|total_orders|total_spent|total_revenue|
+-----------+--------------------+--------------------+-----------+------+--------------------+-------+----------------+-------+-------+-----+------------+-------------------+------+-----------------+--------------------+------------+-----------+-------------+
|       1315|2016-08-29T14:20:...|{"subject_line":"...|       null|  sent|{"opens":true,"ht...|regular|            null| 464221|  false|  856|         237|0.18573667711598746|    23|               10|0.007836990595611

In [38]:
dataframe.select(col("recipients")).show(truncate=False)
# Let's assume the schema of your JSON string. You have to replace it with the actual schema.
json_schema = StructType([
    StructField('list_id', StringType()),
    StructField("list_is_active", BooleanType()),
    StructField("list_name", StringType()),
    StructField("segment_text", StringType()),
    StructField("recipient_count", IntegerType())
])

# Parse 'recipients' column from JSON string to more structured data
dataframe2 = dataframe.withColumn('recipients', from_json(col('recipients'), json_schema))
# dataframe2.show()

AnalysisException: cannot resolve 'from_json(recipients)' due to data type mismatch: argument 1 requires string type, however, 'recipients' is of struct<list_id:string> type.;
'Project [_airbyte_ab_id#259, _airbyte_emitted_at#260, ab_split_opts#261, archive_url#262, content_type#263, create_time#264, delivery_status#265, emails_sent#266, id#267, long_archive_url#268, needs_block_refresh#269, parent_campaign_id#270, from_json(StructField(list_id,StringType,true), StructField(list_is_active,BooleanType,true), StructField(list_name,StringType,true), StructField(segment_text,StringType,true), StructField(recipient_count,IntegerType,true), recipients#785, Some(UTC)) AS recipients#999, report_summary#272, resendable#273, rss_opts#274, send_time#275, settings#276, social_card#277, status#278, tracking#279, type#280, variate_settings#281, web_id#282, list_id#810]
+- Project [_airbyte_ab_id#259, _airbyte_emitted_at#260, ab_split_opts#261, archive_url#262, content_type#263, create_time#264, deli

In [12]:
dataframe_df.show()

+--------------------+-------------------+-------------+--------------------+------------+--------------------+--------------------+-----------+----------+--------------------+-------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|      _airbyte_ab_id|_airbyte_emitted_at|ab_split_opts|         archive_url|content_type|         create_time|     delivery_status|emails_sent|        id|    long_archive_url|needs_block_refresh|parent_campaign_id|          recipients|      report_summary|          resendable|            rss_opts|           send_time|        settings|         social_card|              status|      tracking|                type|    variate_settings|              web_id|
+--------------------+-------------------+-------------+--------------------+-------

#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [16]:
# dataframe_df = dataframe.toDF()
# dataframe_df = dataframe_df.na.drop(subset=["report_summary", "send_time"])
# dataframe = DynamicFrame.fromDF(dataframe_df, glueContext, "dataframe")
# dataframe_df = dataframe.na.drop(subset=["report_summary", "send_time"])
# dataframe_df.show()

+--------------------+-------------------+-------------+--------------------+------------+--------------------+--------------------+-----------+----------+--------------------+-------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|      _airbyte_ab_id|_airbyte_emitted_at|ab_split_opts|         archive_url|content_type|         create_time|     delivery_status|emails_sent|        id|    long_archive_url|needs_block_refresh|parent_campaign_id|          recipients|      report_summary|          resendable|            rss_opts|           send_time|        settings|         social_card|              status|      tracking|                type|    variate_settings|              web_id|
+--------------------+-------------------+-------------+--------------------+-------

In [18]:
ecommerce_schema = StructType([
    StructField("total_orders", IntegerType()),
    StructField("total_spent", IntegerType()),
    StructField("total_revenue", IntegerType())
])

report_summary_schema = StructType([
    StructField("opens", IntegerType()),
    StructField("unique_opens", IntegerType()),
    StructField("clicks", IntegerType()),
    StructField("subscriber_clicks", IntegerType()),
    StructField("click_rate", IntegerType()),
    StructField("ecommerce", ecommerce_schema)
])

# Convert the 'report_summary' column from string to json
dataframe_df = dataframe_df.withColumn('report_summary', from_json(col('report_summary'), report_summary_schema))




In [21]:
dataframe = dataframe_df.select(
    col("id"), 
    col("report_summary.opens"),
    col("report_summary.unique_opens"),
    col("report_summary.clicks"),
    col("report_summary.subscriber_clicks"),
    col("report_summary.click_rate"),
    col("report_summary.ecommerce.total_orders"),
    col("report_summary.ecommerce.total_spent"),
    col("report_summary.ecommerce.total_revenue")
)




In [23]:
dataframe_df.show()

+--------------------+-------------------+-------------+--------------------+------------+--------------------+--------------------+-----------+----------+--------------------+-------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|      _airbyte_ab_id|_airbyte_emitted_at|ab_split_opts|         archive_url|content_type|         create_time|     delivery_status|emails_sent|        id|    long_archive_url|needs_block_refresh|parent_campaign_id|          recipients|      report_summary|          resendable|            rss_opts|           send_time|        settings|         social_card|              status|      tracking|                type|    variate_settings|              web_id|
+--------------------+-------------------+-------------+--------------------+-------

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)