# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [1]:
%help

UsageError: Line magic function `%help` not found.


####  Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.2 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 3e9c8a7e-66d3-4590-ae32-25a0b344e8bb
Applying the following default arguments:
--glue_kernel_version 1.0.2
--enable-glue-datacatalog true
Waiting for session 3e9c8a7e-66d3-4590-ae32-25a0b344e8bb to get into ready status...
Session 3e9c8a7e-66d3-4590-ae32-25a0b344e8bb has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [None]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='grocery-db', table_name='input')
dyf.printSchema()

root
|-- idx: long
|-- transaction_id: string
|-- timestamp: string
|-- product_id: string
|-- category: string
|-- customer_type: string
|-- unit_price: double
|-- quantity: long
|-- total: double
|-- payment_type: string


In [None]:
# Count The Number of Rows in a Dynamic Dataframe
dyf.count()

In [None]:
# Selecting certain fields from a Dynamic DataFrame
dyfSelect = dyf.select_fields(["timestamp", "category"])

# Show top 10
dyfSelect.show(10)

In [None]:
#Drop Fields of Dynamic Frame
dyfDrop = dyf.drop_fields(["transaction_id","payment_type"])

# Show Top 10 rows of dyfCustomerDropFields Dynamic Frame
dyfDrop.show(10)


In [None]:
# Mapping array for column rename fullname -> name
mapping=[("idx", "long", "index", "long")]

# Apply the mapping to rename fullname -> name
dfyMapping = ApplyMapping.apply(
                                frame = dyf,
                                mappings = mapping,
                                transformation_ctx = "applymapping1"
                                )

# show the new dynamic frame with name column
dfyMapping.show(10)

In [None]:
# Filter dynamicFrameCustomers for customers who have the last name Adams
dyfFilter=  Filter.apply(frame = dyf,
                                        f = lambda x: x["category"] in "fruit"
                                    )

# Show the top 10 customers  from the filtered Dynamic frame
dyfFilter.show(10)

#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [None]:
sparkDf = dyf.toDF()
sparkDf.show()

+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
|idx|      transaction_id|          timestamp|          product_id|category|customer_type|unit_price|quantity|total|payment_type|
+---+--------------------+-------------------+--------------------+--------+-------------+----------+--------+-----+------------+
|  0|a1c82654-c52c-45b...|2022-03-02 09:51:38|3bc6c1ea-0198-46d...|   fruit|         gold|      3.99|       2| 7.98|    e-wallet|
|  1|931ad550-09e8-4da...|2022-03-06 10:33:59|ad81b46c-bf38-41c...|   fruit|     standard|      3.99|       1| 3.99|    e-wallet|
|  2|ae133534-6f61-4cd...|2022-03-04 17:20:21|7c55cbd4-f306-4c0...|   fruit|      premium|      0.19|       2| 0.38|    e-wallet|
|  3|157cebd9-aaf0-475...|2022-03-02 17:23:58|80da8348-1707-403...|   fruit|         gold|      0.19|       4| 0.76|    e-wallet|
|  4|a81a6cd3-5e0c-44a...|2022-03-05 14:32:43|7f5e86e6-f06f-45f...|   fruit|        basic|

In [None]:
# Select columns from spark dataframe
dfSelect = sparkDf.select("transaction_id","timestamp")

# show selected
dfSelect.show()


In [None]:
from pyspark.sql.functions import month, year

sparkDf = sparkDf.withColumn("month", month("timestamp")).withColumn("year", year("timestamp"))

# Show DataFrame with new columns
sparkDf.show()


In [None]:
# Drop column from spark dataframe
dfDropCol = sparkDf.drop("product_id","idx")

#show dropped column df
dfDropCol.show()


In [None]:
# Rename column in Spark dataframe
dfRenameCol = sparkDf.withColumnRenamed("idx","index")

#show renamed column dataframe
dfRenameCol.show()


In [None]:
# Group by lastname then print counts of lastname and show
sparkDf.groupBy("month","year").count().show()


In [None]:
# Filter spark DataFrame for customers who have the last name Adams
sparkDf.filter(sparkDf["category"] == "fruit").show()

In [None]:
# Where clause spark DataFrame for customers who have the last name Adams
sparkDf.where("category =='fruit'").show()

In [None]:
from pyspark.sql.functions import year, month, sum

# Selecting the required columns and applying aggregations
resultDf = sparkDf.select(
    year("timestamp").alias("year"),
    month("timestamp").alias("month"),
    "category",
    sum("total").alias("total_amount"),
    sum("quantity").alias("total_quantity")
)

# Grouping by the specified columns
resultDf = resultDf.groupBy("year", "month", "category").agg(
    sum("total_amount").alias("total_amount"),
    sum("total_quantity").alias("total_quantity")
)

resultDf = resultDf.orderBy("year", "month", "category")

resultDf.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Convert PySpark DataFrame to Pandas DataFrame
resultPandas = resultDf.toPandas()

# Plotting total amount and total quantity for each category
plt.figure(figsize=(12, 8))
sns.barplot(x='month', y='total_amount', hue='category', data=resultPandas, ci=None)
plt.title('Total Amount by Category and Month')
plt.xlabel('Month')
plt.ylabel('Total Amount')
plt.legend(title='Category', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.show()

plt.figure(figsize=(12, 8))
sns.barplot(x='month', y='total_quantity', hue='category', data=resultPandas, ci=None)
plt.title('Total Quantity by Category and Month')
plt.xlabel('Month')
plt.ylabel('Total Quantity')
plt.legend(title='Category', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.show()

In [None]:
#convert from spark Dataframe to Glue Dynamic DataFrame
# Import Dynamic DataFrame class
from awsglue.dynamicframe import DynamicFrame

#Convert from Spark Data Frame to Glue Dynamic Frame
dyfConvert = DynamicFrame.fromDF(resultDf, glueContext, "convert")

#Show converted Glue Dynamic Frame
dyfConvert.show()

#### Example: Visualize data with matplotlib


#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
# write down the data in converted Dynamic Frame to S3 location.
glueContext.write_dynamic_frame.from_options(
                            frame = dyfConvert,
                            connection_type="s3",
                            connection_options = {"path": "s3://<YOUR_S3_BUCKET_NAME>/write_down_dyf_to_s3"},
                            format = "csv",
                            format_options={
                                "separator": ","
                                },
                            transformation_ctx = "datasink2")