# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

#### Run this cell to set up and start your interactive session.


In [28]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
%connections redshift-demo-connection

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session a88fd810-5fc2-4bfa-bdb7-8a999a99f8a5.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


Setting Glue version to: 3.0


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 5
Setting new number of workers to: 5


Connections to be included:
redshift-demo-connection



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [25]:
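# Read the catalog table into a DynamicFrame.
dyf = glueContext.create_dynamic_frame.from_catalog(database='salesdb', table_name='sales_records_csv')
# Drop any fields whose type is null, then print the resulting schema.
dyf = DropNullFields.apply(frame=dyf)
dyf.printSchema()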

null_fields []
root
|-- id: long
|-- region: string
|-- country: string
|-- item_type: string
|-- sales_channel: string
|-- order_priority: string
|-- order_date: string
|-- order_id: long
|-- ship_date: string
|-- units_sold: long
|-- unit_price: double
|-- unit_cost: double
|-- total_revenue: double
|-- total_cost: double
|-- total_profit: double


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
df = dyf.toDF()
df.show()

+---+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
| id|              region|             country|      item_type|sales_channel|order_priority|order_date| order_id| ship_date|units_sold|unit_price|unit_cost|total_revenue|total_cost|total_profit|
+---+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  1|Central America a...|Antigua and Barbuda |      Baby Food|       Online|             M|12/20/2013|957081544| 1/11/2014|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|  2|Central America a...|              Panama|         Snacks|      Offline|             C|  7/5/2010|301644504| 7/26/2010|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|  3|              Europe

#### Example: Perform data transformations


In [11]:
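# Parse the MM/dd/yyyy strings into date columns. Spark matches column names
# case-insensitively by default, so these calls replace order_date and ship_date.
sales_df = df.withColumn("Order_Date", to_date(unix_timestamp(col('order_date'), 'MM/dd/yyyy').cast('timestamp'))) \
             .withColumn("Ship_Date", to_date(unix_timestamp(col('ship_date'), 'MM/dd/yyyy').cast('timestamp')))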

sales_df.show(10, True)


+---+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
| id|              region|             country|    item_type|sales_channel|order_priority|Order_Date| order_id| Ship_Date|units_sold|unit_price|unit_cost|total_revenue|total_cost|total_profit|
+---+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  1|Central America a...|Antigua and Barbuda |    Baby Food|       Online|             M|2013-12-20|957081544|2014-01-11|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|  2|Central America a...|              Panama|       Snacks|      Offline|             C|2010-07-05|301644504|2010-07-26|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|  3|              Europe|      Cze
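
As a quick sanity check on the conversion above, the same parsing can be reproduced in plain Python outside the Glue session. This is a minimal sketch rather than part of the original notebook: `parse_us_date` is a hypothetical helper, and the sample strings are the `order_date` and `ship_date` values from the first two rows shown earlier. Python's `%m/%d/%Y` format accepts the non-zero-padded months and days that appear in this dataset.

```python
from datetime import date, datetime


def parse_us_date(text: str) -> date:
    """Parse a month/day/year string such as '7/5/2010' into a date."""
    return datetime.strptime(text, "%m/%d/%Y").date()


# Values copied from the first two rows of the sample output.
samples = ["12/20/2013", "1/11/2014", "7/5/2010", "7/26/2010"]
converted = [parse_us_date(s).isoformat() for s in samples]
print(converted)
```

The ISO-formatted results should line up with the `Order_Date` and `Ship_Date` values in the transformed table.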

#### Group by region, country, year, and quarter and calculate aggregate metrics


In [22]:
aggregate_df = sales_df.groupBy("Region", "Country", year("order_date").alias('year'), quarter("order_date").alias('quarter')).agg(
    sum("Total_Revenue").alias("Total_Revenue_By_Region_Country"),
    sum("Total_Cost").alias("Total_Cost_By_Region_Country"),
    sum("Total_Profit").alias("Total_Profit_By_Region_Country")
)
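
To make the grouping logic concrete, here is a plain-Python sketch of the same idea: key each row by region, country, and the year and quarter of its order date, then sum the three money columns per key. The rows are hypothetical values made up for illustration, and `quarter_of` is an assumed helper name; the notebook itself does this with Spark's `groupBy` and `agg`.

```python
from collections import defaultdict
from datetime import date


def quarter_of(d: date) -> int:
    """Return the calendar quarter (1-4) for a date."""
    return (d.month - 1) // 3 + 1


# Hypothetical rows: (region, country, order_date, revenue, cost, profit).
rows = [
    ("Europe", "Spain", date(2011, 10, 3), 100.0, 60.0, 40.0),
    ("Europe", "Spain", date(2011, 12, 20), 50.0, 30.0, 20.0),
    ("Europe", "Spain", date(2012, 1, 5), 10.0, 4.0, 6.0),
]

# Each key maps to running totals of [revenue, cost, profit].
totals = defaultdict(lambda: [0.0, 0.0, 0.0])
for region, country, order_date, revenue, cost, profit in rows:
    key = (region, country, order_date.year, quarter_of(order_date))
    totals[key][0] += revenue
    totals[key][1] += cost
    totals[key][2] += profit

# Print the groups ordered by year, then quarter.
for key in sorted(totals, key=lambda k: (k[2], k[3])):
    print(key, totals[key])
```

The first two rows fall in the same year and quarter, so they collapse into one group, while the third row forms its own.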






#### Show the aggregated data (for demonstration purposes)


In [24]:
aggregate_df.orderBy("year","quarter").show()
aggregate_df.count()

+--------------------+-----------------+----+-------+-------------------------------+----------------------------+------------------------------+
|              Region|          Country|year|quarter|Total_Revenue_By_Region_Country|Total_Cost_By_Region_Country|Total_Profit_By_Region_Country|
+--------------------+-----------------+----+-------+-------------------------------+----------------------------+------------------------------+
|Middle East and N...|           Kuwait|2010|      1|                     1923024.24|                  1200910.86|                     722113.38|
|              Europe|         Slovenia|2010|      1|             402336.24000000005|          270036.38999999996|                     132299.85|
|  Sub-Saharan Africa|     Sierra Leone|2010|      1|                     6481276.65|          3926471.4699999997|            2554805.1799999997|
|              Europe|           Cyprus|2010|      1|                      165676.72|                   103463.58|          
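
The long fractional tails in this output (for example `402336.24000000005`) are the usual artifact of summing binary floating-point values, not a sign of bad data. The sketch below illustrates the effect in plain Python with made-up values. It is meant for a standalone interpreter: inside the notebook session, the wildcard import from `pyspark.sql.functions` replaces the built-in `sum` and `round` with their column-based Spark counterparts.

```python
from decimal import Decimal

# Made-up values chosen because they expose the rounding artifact.
values = [0.1, 0.2]

raw_total = sum(values)              # binary floats accumulate a tiny error
rounded_total = round(raw_total, 2)  # round to two decimals for display
exact_total = sum(Decimal(str(v)) for v in values)  # decimal arithmetic is exact here

print(raw_total, rounded_total, exact_total)
```

In the notebook, one option (an assumption, not something the original does) would be to wrap each aggregate in Spark's `round(..., 2)` before writing the results out.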

In [20]:
df.count()

5000


In [29]:
dyf = DynamicFrame.fromDF(aggregate_df, glueContext, "dynamic_frame")




#### Example: Preview the data in the DynamicFrame and write it to a table in Amazon Redshift


In [30]:
dyf.show()

{"Region": "Sub-Saharan Africa", "Country": "Zambia", "year": 2013, "quarter": 2, "Total_Revenue_By_Region_Country": 1765523.1, "Total_Cost_By_Region_Country": 1005155.13, "Total_Profit_By_Region_Country": 760367.97}
{"Region": "Europe", "Country": "Kosovo", "year": 2012, "quarter": 4, "Total_Revenue_By_Region_Country": 4934580.33, "Total_Cost_By_Region_Country": 3958495.06, "Total_Profit_By_Region_Country": 976085.27}
{"Region": "Europe", "Country": "Spain", "year": 2011, "quarter": 4, "Total_Revenue_By_Region_Country": 4544537.69, "Total_Cost_By_Region_Country": 3633328.41, "Total_Profit_By_Region_Country": 911209.28}
{"Region": "Sub-Saharan Africa", "Country": "Senegal", "year": 2011, "quarter": 3, "Total_Revenue_By_Region_Country": 80527.23, "Total_Cost_By_Region_Country": 59726.52, "Total_Profit_By_Region_Country": 20800.71}
{"Region": "Middle East and North Africa", "Country": "Saudi Arabia", "year": 2013, "quarter": 4, "Total_Revenue_By_Region_Country": 9850970.7, "Total_Cost_By

In [33]:


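# Write the aggregated DynamicFrame to Redshift through the Glue connection,
# using the S3 path in redshiftTmpDir as the temporary staging directory.
Redshift_output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dyf,
    catalog_connection="redshift-demo-connection",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-262136919150-us-east-1/temporary/",
        "dbtable": "public.Regionalsales",
        "database": "dev",
    },
)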

Py4JJavaError: An error occurred while calling o287.getJDBCSink.
: java.sql.SQLException: [Amazon](500150) Error setting/closing connection: Connection timed out.
	at com.amazon.redshift.client.PGClient.connect(Unknown Source)
	at com.amazon.redshift.client.PGClient.<init>(Unknown Source)
	at com.amazon.redshift.core.PGJDBCConnection.connect(Unknown Source)
	at com.amazon.jdbc.common.BaseConnectionFactory.doConnect(Unknown Source)
	at com.amazon.jdbc.common.AbstractDriver.connect(Unknown Source)
	at com.amazon.redshift.jdbc.Driver.connect(Unknown Source)
	at com.amazonaws.services.glue.util.JDBCWrapper$$anonfun$12.apply(JDBCUtils.scala:968)
	at com.amazonaws.services.glue.util.JDBCWrapper$$anonfun$12.apply(JDBCUtils.scala:964)
	at com.amazonaws.services.glue.util.JDBCWrapper$$anonfun$connectWithSSLAttempt$1$$anonfun$apply$8.apply(JDBCUtils.scala:920)
	at scala.Option.getOrElse(Option.scala:121)
	at com.amazonaws.services.glue.util.JDBCWrapper$$anonfun$connectWithSSLAttempt$1.apply(JDBC
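
A "Connection timed out" at this stage usually means the session could not open a network connection to the cluster at all, so a basic TCP reachability check can help narrow things down before revisiting the write call. The following is a minimal sketch under stated assumptions: `can_reach` is a hypothetical helper, and the host would come from the JDBC URL of `redshift-demo-connection`, which is not shown in this notebook. Port 5439 is the Redshift default; the cluster may use a different one.

```python
import socket


def can_reach(host: str, port: int, timeout_seconds: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:
        # Covers timeouts, refused connections, and DNS resolution failures.
        return False


# Example call (hypothetical host; substitute the one from the connection):
# can_reach("<cluster-endpoint-from-jdbc-url>", 5439)
```

Run from a cell in the same session, a `False` result would suggest looking at the networking side (for example security group rules or subnet routing for the connection) rather than at the `from_jdbc_conf` arguments.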