# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Run this cell to set up and start your interactive session.


In [14]:
%idle_timeout 60
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2
%connections "adriano_redshift_cluster"

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session b048a821-d526-4688-91e4-7c118e856076.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 60 minutes.
idle_timeout has been set to 60 minutes.


Setting Glue version to: 3.0


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 2
Setting new number of workers to: 2


Connections to be included:
adriano_redshift_cluster



#### Create a DynamicFrame from the target Redshift table in the AWS Glue Data Catalog and display its schema


In [20]:
tmp_dir = "s3://adriano-datalake-us-east-1/raw/temp_dir/"
dyf_target = glueContext.create_dynamic_frame.from_catalog(database='adriano-redshift', table_name='dev_public_category_1', redshift_tmp_dir=tmp_dir)
dyf_target.printSchema()

root
|-- catid: int
|-- catgroup: string
|-- catname: string
|-- catdesc: string
|-- date_modified: date


#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [21]:
df_target = dyf_target.toDF()
df_target.show()

+-----+--------+-------------+--------------------+-------------+
|catid|catgroup|      catname|             catdesc|date_modified|
+-----+--------+-------------+--------------------+-------------+
|   10|Concerts|         Jazz|All jazz singers ...|   2021-03-11|
|   13|Concerts|        House|electronic dance ...|   2023-03-05|
|    4|  Sports|          NBA|National Basketba...|   2021-03-11|
|   11|Concerts|    Classical|All symphony, con...|   2021-03-11|
|    6|   Shows|     Musicals|     Musical theatre|   2021-03-11|
|    7|   Shows|        Plays|All non-musical t...|   2021-03-11|
|    8|   Shows|        Opera|All opera and lig...|   2021-03-11|
|   12|Concerts|Electro Swing|Mix of Jazz and E...|   2023-03-05|
|    5|  Sports|          MLS| Major League Soccer|   2021-03-11|
|    1|  Sports|          MLB|Major League Base...|   2021-03-11|
|    2|  Sports|          NHL|National Hockey L...|   2021-03-11|
|    9|Concerts|  Country pop| a fusion genre o...|   2023-03-05|
|    3|  S

#### Read the data into a DynamicFrame from a CSV file in Amazon S3


In [17]:
s3_path = 's3://adriano-datalake-us-east-1/raw/categories/categories.csv'
dyf_categories = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="csv",
    format_options={"withHeader": True, "optimizePerformance": True},
)





#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [18]:
df_categories = dyf_categories.toDF()
df_categories.show()

+------+--------+-------------+--------------------+--------------+
|﻿catid|catgroup|      catname|             catdesc|date_modified
+------+--------+-------------+--------------------+--------------+
|     9|Concerts|  Country pop| a fusion genre o...|   2023-03-05
|    13|Concerts|        House|electronic dance ...|   2023-03-05
|    12|Concerts|Electro Swing|Mix of Jazz and E...|   2023-03-05
+------+--------+-------------+--------------------+--------------+
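The printed table hints at two quirks in the source file. The first column is one character wider than `catid`, which is consistent with a UTF-8 byte-order mark (U+FEFF) at the start of the file, and the last column is one character wider than `date_modified`, which may indicate trailing carriage returns from Windows line endings. Both are assumptions drawn from the output, not confirmed facts about the file. If they hold, a minimal sketch for normalizing the column names before the upsert might look like this; `clean_column_name` and `clean_column_names` are hypothetical helpers, not part of Glue or Spark.

```python
# Hypothetical helpers; not part of the Glue or Spark APIs.
BOM = "\ufeff"  # byte-order mark that some editors prepend to UTF-8 files


def clean_column_name(name: str) -> str:
    """Remove a byte-order mark and surrounding whitespace (including '\\r')."""
    return name.replace(BOM, "").strip()


def clean_column_names(names):
    """Apply clean_column_name to every name, preserving order."""
    return [clean_column_name(n) for n in names]


# Assumed example: column names as they might arrive from the CSV header.
raw_names = ["\ufeffcatid", "catgroup", "catname", "catdesc", "date_modified\r"]
cleaned = clean_column_names(raw_names)
print(cleaned)
```

In the notebook, the cleaned names could be applied with `df_categories.toDF(*clean_column_names(df_categories.columns))`, and the result converted back with `DynamicFrame.fromDF` before writing. If the values in the last column carry the same trailing character, they would need a separate cleanup step.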


#### Upsert records into the existing target Redshift table using a staging table

In [19]:
pre_query = """drop table if exists public.stage_table_category_1;
create table public.stage_table_category_1 as select * from public.category_1 where 1=2;"""
post_query = """begin;delete from public.category_1 using public.stage_table_category_1 where public.stage_table_category_1.catid = public.category_1.catid;
insert into public.category_1 select * from public.stage_table_category_1;
drop table public.stage_table_category_1; end;"""
RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dyf_categories,
    catalog_connection="adriano_redshift_cluster",
    connection_options={
        "database": "dev",
        "dbtable": "public.stage_table_category_1",
        "preactions": pre_query,
        "postactions": post_query,
    },
    redshift_tmp_dir=tmp_dir,
    transformation_ctx="upsert_to_redshift",
)
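The cell above follows a staging-table upsert: `preactions` creates an empty copy of the target, Glue loads the DynamicFrame into that staging table, and `postactions` deletes the matching keys from the target, inserts the staged rows, and drops the staging table inside one transaction. As a sketch of how those two SQL strings could be generated for other tables, the hypothetical helper `build_upsert_sql` below takes the target table, staging table, and key column as parameters. Its name and signature are illustrative assumptions, not part of the Glue API.

```python
def build_upsert_sql(target: str, stage: str, key: str):
    """Return (preactions, postactions) SQL for a staging-table upsert.

    Hypothetical helper: identifiers are interpolated as-is, so pass only
    trusted table and column names.
    """
    # Before the load: recreate the staging table as an empty copy of the target.
    pre = (
        f"drop table if exists {stage}; "
        f"create table {stage} as select * from {target} where 1=2;"
    )
    # After the load: replace matching rows in the target within one transaction.
    post = (
        "begin; "
        f"delete from {target} using {stage} where {stage}.{key} = {target}.{key}; "
        f"insert into {target} select * from {stage}; "
        f"drop table {stage}; "
        "end;"
    )
    return pre, post


pre_query, post_query = build_upsert_sql(
    target="public.category_1",
    stage="public.stage_table_category_1",
    key="catid",
)
print(pre_query)
print(post_query)
```

The returned strings can be passed as the `preactions` and `postactions` connection options, exactly as the hand-written queries are in the cell above.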


