# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
----

## Selecting Job Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
----

## Glue Config Magic 
*(common across all job types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
----

                                      
## Magic for Spark Jobs (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
                                      ETL and Streaming support G.1X, G.2X, G.4X and G.8X. 
                                      Default: G.1X.
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Job

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray job. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::755670014224:role/aws-glue-sessions-ii
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 123f6aa8-8299-43e3-8173-c30477d891b0
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 123f6aa8-8299-43e3-8173-c30477d891b0 to get into ready status...
Session 123f6aa8-8299-43e3-8173-c30477d891b0 has been created.



In [2]:
import pyspark
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType




# Define the context

In [3]:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




# Reading data from S3

In [11]:
dir_data_customers = r's3://app-planejamento-estrategico/data/tutorial_johnny/customers/customers.csv'




## Define the schema

In [12]:
custom_schema_customers = StructType([
    StructField("customerid", IntegerType(), True),
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("fullname", StringType(), True)
])




## Reading the file

In [13]:
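def describe_dataframe(data, n_rows=5):
    """Print a preview, the row count, the column list and summary statistics
    for a PySpark or pandas DataFrame."""
    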
    # VALIDATE IF DATA IS PYSPARK DATAFRAME
    if isinstance(data, pyspark.sql.DataFrame):
        
        print("PYSPARK DATAFRAME")
        print("-"*50)
        
        # SHOW DATA
        print("SHOWING DATA")
        data.show(n_rows)
        
        # COUNT ROWS
        print("COUNTING ROWS")
        print(data.count())
        
        # LIST COLUMNS
        print("LISTING COLUMNS")
        print(data.columns)
        
        # DESCRIBING DATA
        print("DESCRIBING DATA")
        data.describe().show()
        
    elif isinstance(data, pd.DataFrame):
        
        print("PANDAS DATAFRAME")
        print("-"*50)
        
        # SHOW DATA
        print("SHOWING DATA")
        print(data.head(n_rows))
        
        # COUNT ROWS
        print("COUNTING ROWS")
        print(len(data))
        
        # LIST COLUMNS
        print("LISTING COLUMNS")
        print(data.columns)
        
        # DESCRIBING DATA
        print("DESCRIBING DATA")
        print(data.describe())
        
    else:
        print("INVALID DATA TYPE")




## SPARK

In [14]:
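# NOTE: header=True makes Spark treat the first line of the file as column names.
# The pandas read further down returns 635 rows (first row: 293, Catherine Abel)
# versus 634 rows here, which suggests the file may have no header line.
# If that is the case, header=False would keep the first record.
df_customers = spark.read.csv(dir_data_customers,
                              header=True,
                              schema=custom_schema_customers)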




In [15]:
describe_dataframe(data=df_customers)

PYSPARK DATAFRAME
--------------------------------------------------
SHOWING DATA
+----------+---------+-----------+----------------+
|customerid|firstname|   lastname|        fullname|
+----------+---------+-----------+----------------+
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|
+----------+---------+-----------+----------------+
only showing top 5 rows

COUNTING ROWS
634
LISTING COLUMNS
['customerid', 'firstname', 'lastname', 'fullname']
DESCRIBING DATA
+-------+------------------+---------+-----------+------------+
|summary|        customerid|firstname|   lastname|    fullname|
+-------+------------------+---------+-----------+------------+
|  count|               634|      634|        634|         634|
|   mean|1039.7917981072555|     null|       null|        null|
| s

## PANDAS

In [16]:
column_names_customers = [field.name for field in custom_schema_customers.fields]




In [17]:
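# With explicit names, header='infer' behaves like header=None:
# every line of the file is read as data, so header=None is stated directly.
df_pandas_customers = pd.read_csv(dir_data_customers,
                                  header=None,
                                  names=column_names_customers)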




In [18]:
describe_dataframe(data=df_pandas_customers)

PANDAS DATAFRAME
--------------------------------------------------
SHOWING DATA
   customerid  firstname     lastname          fullname
0         293  Catherine         Abel    Catherine Abel
1         295        Kim  Abercrombie   Kim Abercrombie
2         297   Humberto      Acevedo  Humberto Acevedo
3         291    Gustavo       Achong    Gustavo Achong
4         299      Pilar     Ackerman    Pilar Ackerman
COUNTING ROWS
635
LISTING COLUMNS
Index(['customerid', 'firstname', 'lastname', 'fullname'], dtype='object')
DESCRIBING DATA
        customerid
count   635.000000
mean   1038.615748
std     474.257783
min     291.000000
25%     654.000000
50%     993.000000
75%    1340.000000
max    1993.000000
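
The two reads disagree: Spark reports 634 rows starting at customer 295, while pandas reports 635 rows starting at customer 293. One plausible explanation (an assumption, since the raw file is not shown here) is that `customers.csv` has no header line. In that case `header=True` makes Spark consume the first record as column names, while pandas keeps every line because it is given explicit `names` and does not look for a header. The sketch below reproduces the effect with Python's standard `csv` module, as an analogy rather than the Spark or pandas code path. It uses three records copied from the outputs above; `SAMPLE_CSV`, `read_first_line_as_header` and `read_with_explicit_names` are illustrative names that are not part of the notebook.

```python
import csv
import io

# Three records copied from the outputs above.
# ASSUMPTION: the file has no header line.
SAMPLE_CSV = (
    "293,Catherine,Abel,Catherine Abel\n"
    "295,Kim,Abercrombie,Kim Abercrombie\n"
    "297,Humberto,Acevedo,Humberto Acevedo\n"
)
COLUMN_NAMES = ["customerid", "firstname", "lastname", "fullname"]


def read_first_line_as_header(text):
    """Treat the first line as a header, so it is not returned as data."""
    return list(csv.DictReader(io.StringIO(text)))


def read_with_explicit_names(text, names):
    """Supply the column names explicitly, so every line is returned as data."""
    return list(csv.DictReader(io.StringIO(text), fieldnames=names))


rows_header = read_first_line_as_header(SAMPLE_CSV)
rows_named = read_with_explicit_names(SAMPLE_CSV, COLUMN_NAMES)

print(len(rows_header))            # 2: the first record was consumed as the header
print(len(rows_named))             # 3: all records are kept
print(rows_named[0]["firstname"])  # Catherine
```

Looking at the first raw line of the file settles the question. If it is a data record, reading with `header=False` in Spark would bring the two row counts into agreement.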


## Spark - Operations with columns

## Defining column names using a list

In [19]:
df_customers.toDF(*['customerID', 'firstname', 'lastname', 'fullname']).show()

+----------+----------+-----------+--------------------+
|customerID| firstname|   lastname|            fullname|
+----------+----------+-----------+--------------------+
|       295|       Kim|Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|
|       313|     James|    Aguilar|       James Aguilar|
|       315|    Robert|   Ahlering|     Robert Ahlering|
|       319|       Kim|      Akers|           Kim Akers|
|       441|   Stanley|       Alan|        Stanley Alan|
|       323|       Amy|    Alberts|         Amy Alberts|
|       325|      Anna|   Albri
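
`toDF` takes the complete list of new names in positional order, so every column has to be listed even when only one of them changes. A small sketch, assuming a hypothetical helper `build_renamed_columns` and a hypothetical `rename_map`, builds that list from the existing columns so that only the mapped names change:

```python
def build_renamed_columns(columns, rename_map):
    """Return a new name for every column, in the original order.

    Columns missing from rename_map keep their current name. Keys in
    rename_map that match no column raise an error, so typos do not
    pass silently.
    """
    unknown = [old for old in rename_map if old not in columns]
    if unknown:
        raise KeyError("columns not found: {}".format(unknown))
    return [rename_map.get(column, column) for column in columns]


current_columns = ["customerid", "firstname", "lastname", "fullname"]
new_columns = build_renamed_columns(current_columns, {"customerid": "customerID"})
print(new_columns)
```

In the notebook, the resulting list could be passed on as `df_customers.toDF(*new_columns)`.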

## Selecting columns

In [20]:
list_columns_to_select = ['customerid', 'firstname', 'lastname']




In [21]:
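def filter_columns(data, list_columns):
    
    # GET INTERSECTION COLUMNS (REQUESTED COLUMNS THAT EXIST IN THE DATA)
    # USE THE FUNCTION ARGUMENTS, NOT THE GLOBAL VARIABLES
    available_columns = getattr(data, "columns", [])
    list_columns_select = [column for column in list_columns if column in available_columns]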
    
    if isinstance(data, pyspark.sql.DataFrame):
        print("PYSPARK - FILTERING COLUMNS - {}".format(list_columns_select))
        return data.select(*list_columns_select)
    
    elif isinstance(data, pd.DataFrame):
        print("PANDAS - FILTERING COLUMNS - {}".format(list_columns_select))
        return data[list_columns_select]
    
    else:
        print("INVALID DATA TYPE")
        return data
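
The column-intersection step at the top of `filter_columns` is plain list logic, so it can be checked without a Spark session. The sketch below uses a hypothetical helper, `split_requested_columns`, that keeps the requested order and also reports names that were requested but are absent, which the function above drops without any message. The column name `email` is made up for the illustration.

```python
def split_requested_columns(requested, available):
    """Split requested column names into (present, missing), keeping request order."""
    available_set = set(available)
    present = [column for column in requested if column in available_set]
    missing = [column for column in requested if column not in available_set]
    return present, missing


available_columns = ["customerid", "firstname", "lastname", "fullname"]
present, missing = split_requested_columns(
    ["customerid", "firstname", "lastname", "email"],  # "email" is a made-up name
    available_columns,
)
print(present)  # columns that can be selected
print(missing)  # columns that were requested but do not exist
```

Logging or raising on a non-empty `missing` list is a design choice: silently ignoring absent columns is convenient in exploration but can hide typos in a pipeline.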




In [22]:
df_customers_selected = filter_columns(data=df_customers, list_columns=list_columns_to_select)

PYSPARK - FILTERING COLUMNS - ['customerid', 'firstname', 'lastname']


In [23]:
df_customers_selected.show()

+----------+----------+-----------+
|customerid| firstname|   lastname|
+----------+----------+-----------+
|       295|       Kim|Abercrombie|
|       297|  Humberto|    Acevedo|
|       291|   Gustavo|     Achong|
|       299|     Pilar|   Ackerman|
|       305|     Carla|      Adams|
|       301|   Frances|      Adams|
|       307|       Jay|      Adams|
|       309|    Ronald|      Adina|
|       311|    Samuel|   Agcaoili|
|       313|     James|    Aguilar|
|       315|    Robert|   Ahlering|
|       319|       Kim|      Akers|
|       441|   Stanley|       Alan|
|       323|       Amy|    Alberts|
|       325|      Anna|   Albright|
|       327|    Milton|     Albury|
|       329|      Paul|     Alcorn|
|       331|   Gregory|   Alderson|
|       333|J. Phillip|  Alexander|
|      1149|      Mary|  Alexander|
+----------+----------+-----------+
only showing top 20 rows


# Current date

In [24]:
from pyspark.sql.functions import current_date, date_format




In [25]:
current_date()

Column<'current_date()'>


In [26]:
# Get the current date using the current_date() function
current_date_df = spark.range(1).select(current_date().alias("current_date"))




In [27]:
# Show the result
current_date_df.show()

+------------+
|current_date|
+------------+
|  2023-08-11|
+------------+


In [28]:
# Convert the date to string format "year_month_day"
date_string_df = current_date_df.select(date_format("current_date", "yyyy_MM_dd").alias("formatted_date"))




In [29]:
# Show the result
date_string_df.show()

+--------------+
|formatted_date|
+--------------+
|    2023_08_11|
+--------------+
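
When the formatted date is only needed on the driver, for example to build a file name, the same string can be produced in plain Python without running a Spark job. A minimal sketch, in which the Spark pattern `yyyy_MM_dd` is written as the `strftime` pattern `%Y_%m_%d`; `format_date_key` is a hypothetical helper name, not part of the notebook:

```python
from datetime import date


def format_date_key(day):
    """Format a date as year_month_day, for example 2023_08_11."""
    return day.strftime("%Y_%m_%d")


print(format_date_key(date(2023, 8, 11)))  # 2023_08_11
print(format_date_key(date.today()))       # today's date on the machine running the code
```

Note that `date.today()` uses the local clock of the machine running the code, whereas `current_date()` is evaluated by Spark using the session time zone, so the two can differ around midnight.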


# Converting types

In [52]:
def convert_pyspark_to_pandas(data):
    
    if isinstance(data, pyspark.sql.DataFrame):
        # toPandas() COLLECTS EVERY ROW TO THE DRIVER: USE ONLY IF THE DATA FITS IN DRIVER MEMORY
        df_converted_pandas = data.toPandas()
        return df_converted_pandas
    else:
        print("INVALID DATA TYPE")
        return data




In [54]:
from awsglue.dynamicframe import DynamicFrame

def convert_pyspark_to_dynamicframe(data):
    
    if isinstance(data, pyspark.sql.DataFrame):
        df_converted_dynamic_frame = DynamicFrame.fromDF(data, 
                                                         glueContext, 
                                                         "pyspark_to_dynamicframe")
        return df_converted_dynamic_frame
    else:
        print("INVALID DATA TYPE")
        return data
    
def convert_dynamicframe_to_pyspark(data):
    
    if isinstance(data, DynamicFrame):
        df_converted_pyspark = data.toDF()
        return df_converted_pyspark
    else:
        print("INVALID DATA TYPE")
        return data




## Pyspark to Pandas

In [43]:
df_customers_converted_pandas = convert_pyspark_to_pandas(data=df_customers)




In [44]:
type(df_customers_converted_pandas)

<class 'pandas.core.frame.DataFrame'>


## Pyspark to DynamicFrame

In [46]:
df_customers_converted_dynamic_frame = convert_pyspark_to_dynamicframe(data=df_customers)




In [47]:
type(df_customers_converted_dynamic_frame)

<class 'awsglue.dynamicframe.DynamicFrame'>


## DynamicFrame to Pyspark

In [56]:
df_customers_converted_pyspark = convert_dynamicframe_to_pyspark(df_customers_converted_dynamic_frame)




In [57]:
type(df_customers_converted_pyspark)

<class 'pyspark.sql.dataframe.DataFrame'>
