# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0 and 3.0. 
                                      Default: 2.0.
----

## Selecting Job Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
----

## Glue Config Magic 
*(common across all job types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
----

                                      
## Magic for Spark Jobs (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
                                      ETL and Streaming support G.1X, G.2X, G.4X and G.8X. 
                                      Default: G.1X.
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Job

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray job. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::014903016512:role/gavxro-pyspark-tutorialRole
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: f098004f-76f6-49fa-8926-002e1927a61a
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
Waiting for session f098004f-76f6-49fa-8926-002e1927a61a to get into ready status...
Session f098004f-76f6-49fa-8926-002e1927a61a has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [4]:
# 4. Read data from the customers table into a DynamicFrame
dynamicFrameCustomers = glueContext.create_dynamic_frame.from_catalog(
    database="pyspark_tutorial_db",
    table_name="customers",
)

# Show the first 10 rows of the DynamicFrame
dynamicFrameCustomers.show(10)

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}


In [None]:
# s3output = glueContext.getSink(
#   path="s3://bucket_name/folder_name",
#   connection_type="s3",
#   updateBehavior="UPDATE_IN_DATABASE",
#   partitionKeys=[],
#   compression="snappy",
#   enableUpdateCatalog=True,
#   transformation_ctx="s3output",
# )
# s3output.setCatalogInfo(
#   catalogDatabase="demo", catalogTableName="populations"
# )
# s3output.setFormat("glueparquet")
# s3output.writeFrame(DyF)

In [5]:
# 5. Check the data types in a DynamicFrame
dynamicFrameCustomers.printSchema()

root
|-- customerid: long
|-- firstname: string
|-- lastname: string
|-- fullname: string


In [6]:
# 6. Count the number of rows in a DynamicFrame
dynamicFrameCustomers.count()

635


In [7]:
# 7. Select fields from a DynamicFrame
# Keep only the customerid and fullname fields
dyfCustomerSelectFields = dynamicFrameCustomers.select_fields(["customerid", "fullname"])

# Show the first 10 rows
dyfCustomerSelectFields.show(10)

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}


In [8]:
# 8. Drop fields from a DynamicFrame
# Remove firstname and lastname, leaving customerid and fullname
dyfCustomerDropFields = dynamicFrameCustomers.drop_fields(["firstname", "lastname"])

# Show the first 10 rows of the dyfCustomerDropFields DynamicFrame
dyfCustomerDropFields.show(10)

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}
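`select_fields` and `drop_fields` reach the same projection from opposite directions: one names the fields to keep, the other names the fields to discard. The sketch below mimics that idea with a plain Python dict. The helper functions and the single sample record are illustrative assumptions and are not part of the Glue API.

```python
# Plain-Python sketch of field projection on one record.
# These helpers are hypothetical; they only mimic the idea behind the
# DynamicFrame methods of the same name.

def select_fields(record, keep):
    """Return a copy of the record holding only the listed fields."""
    return {k: v for k, v in record.items() if k in keep}


def drop_fields(record, discard):
    """Return a copy of the record without the listed fields."""
    return {k: v for k, v in record.items() if k not in discard}


customer = {
    "customerid": 293,
    "firstname": "Catherine",
    "lastname": "Abel",
    "fullname": "Catherine Abel",
}

selected = select_fields(customer, ["customerid", "fullname"])
dropped = drop_fields(customer, ["firstname", "lastname"])

print(selected)
print(selected == dropped)  # both projections hold the same two fields
```

Which form to prefer is mostly a matter of which list is shorter and which is less likely to change when the source table gains new columns.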


In [9]:
# 9. Rename fields in a DynamicFrame
# Each mapping tuple is (source field, source type, target field, target type).
# This mapping keeps customerid as is and renames fullname -> name.
mapping = [
    ("customerid", "long", "customerid", "long"),
    ("fullname", "string", "name", "string"),
]

# Apply the mapping to rename fullname -> name
dyfMapping = ApplyMapping.apply(
    frame=dyfCustomerDropFields,
    mappings=mapping,
    transformation_ctx="applymapping1",
)

# Show the new DynamicFrame with the name field
dyfMapping.show(10)

{"customerid": 293, "name": "Catherine Abel"}
{"customerid": 295, "name": "Kim Abercrombie"}
{"customerid": 297, "name": "Humberto Acevedo"}
{"customerid": 291, "name": "Gustavo Achong"}
{"customerid": 299, "name": "Pilar Ackerman"}
{"customerid": 305, "name": "Carla Adams"}
{"customerid": 301, "name": "Frances Adams"}
{"customerid": 307, "name": "Jay Adams"}
{"customerid": 309, "name": "Ronald Adina"}
{"customerid": 311, "name": "Samuel Agcaoili"}
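Each mapping entry is a 4-tuple of source field, source type, target field, and target type. The following plain-Python sketch shows how such a list can be read; it is an illustration under simplifying assumptions (only the `long` and `string` types, one flat record), and the `apply_mapping` helper is hypothetical, not the Glue `ApplyMapping` transform.

```python
# Hypothetical, simplified stand-in for a field mapping on one record.
# Only the two type names used in this notebook are handled.
CASTS = {"long": int, "string": str}


def apply_mapping(record, mappings):
    """Rename and cast fields; fields that are not mapped are left out."""
    result = {}
    for source, _source_type, target, target_type in mappings:
        if source in record:
            result[target] = CASTS[target_type](record[source])
    return result


mapping = [
    ("customerid", "long", "customerid", "long"),
    ("fullname", "string", "name", "string"),
]

record = {"customerid": 293, "fullname": "Catherine Abel"}
renamed = apply_mapping(record, mapping)
print(renamed)
```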


In [10]:
# 10. Filter data in a DynamicFrame
# Keep only the customers whose last name is exactly "Adams".
# Use == for the comparison: `in` between two strings is a substring test.
dyfFilter = Filter.apply(
    frame=dynamicFrameCustomers,
    f=lambda x: x["lastname"] == "Adams",
)

# Show the first 10 customers from the filtered DynamicFrame
dyfFilter.show(10)

{"lastname": "Adams", "firstname": "Carla", "customerid": 305, "fullname": "Carla Adams"}
{"lastname": "Adams", "firstname": "Frances", "customerid": 301, "fullname": "Frances Adams"}
{"lastname": "Adams", "firstname": "Jay", "customerid": 307, "fullname": "Jay Adams"}
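When a filter predicate is meant to match one exact value, the choice of operator matters. In Python, `in` between two strings tests whether the left string is a substring of the right one, so `x["lastname"] in "Adams"` would also accept a last name such as "Adam". The sketch below uses plain dicts to show the difference; the records (including the "Adam" row) are made up for illustration and do not come from the customers table.

```python
# Hypothetical records used only to compare the two predicates.
customers = [
    {"customerid": 305, "lastname": "Adams"},
    {"customerid": 900, "lastname": "Adam"},  # invented row
    {"customerid": 293, "lastname": "Abel"},
]

# Substring test: true whenever lastname occurs anywhere inside "Adams".
substring_match = [c["customerid"] for c in customers if c["lastname"] in "Adams"]

# Equality test: true only for the exact value "Adams".
exact_match = [c["customerid"] for c in customers if c["lastname"] == "Adams"]

print(substring_match)
print(exact_match)
```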


In [11]:
# 11. Join two DynamicFrames with an equality join

# Read the orders table from the Glue Data Catalog into a DynamicFrame
dynamicFrameOrders = glueContext.create_dynamic_frame.from_catalog(
    database="pyspark_tutorial_db",
    table_name="orders",
)

# Show the first 10 rows of the orders table
dynamicFrameOrders.show(10)

{"salesorderid": 43659, "salesorderdetailid": 1, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 776, "orderqty": 1, "unitprice": 2024.9940, "unitpricediscount": 0.0000, "linetotal": 2024.9940}
{"salesorderid": 43659, "salesorderdetailid": 2, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 777, "orderqty": 3, "unitprice": 2024.9940, "unitpricediscount": 0.0000, "linetotal": 6074.9820}
{"salesorderid": 43659, "salesorderdetailid": 3, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 778, "order

In [12]:
# Join the customers and orders DynamicFrames on customerid.
# The arguments are the key fields of this frame, the key fields of the
# other frame, and the other frame.
dyfjoin = dynamicFrameCustomers.join(["customerid"], ["customerid"], dynamicFrameOrders)

# Show the first 10 rows of the joined DynamicFrame
dyfjoin.show(10)

{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1628, "productid": 754, "linetotal": 874.7940, "employeeid": 277, ".customerid": 671, "taxamt": 579.2061, "salesorderid": 44097, "duedate": "8/13/2011", "orderqty": 1, "shipdate": "8/8/2011", "lastname": "Chapla", "firstname": "Lee", "totaldue": 6796.0326, "unitprice": 874.7940, "orderdate": "8/1/2011", "unitpricediscount": 0.0000, "customerid": 671, "fullname": "Lee Chapla"}
{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1629, "productid": 760, "linetotal": 419.4589, "employeeid": 277, ".customerid": 671, "taxamt": 579.2061, "salesorderid": 44097, "duedate": "8/13/2011", "orderqty": 1, "shipdate": "8/8/2011", "lastname": "Chapla", "firstname": "Lee", "totaldue": 6796.0326, "unitprice": 419.4589, "orderdate": "8/1/2011", "unitpricediscount": 0.0000, "customerid": 671, "fullname": "Lee Chapla"}
{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1630, "productid": 762, "linetotal": 838.

In [13]:
# 12. Write data from a DynamicFrame to S3

# Create a folder named write_down_dyf_to_s3 in the S3 bucket created by
# CloudFormation to use as the write location.
# Write the DynamicFrame to that S3 path as CSV.
glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrameCustomers,
    connection_type="s3",
    connection_options={"path": "s3://gavxro-pyspark-tutorial/write_down_dyf_to_s3"},
    format="csv",
    format_options={"separator": ","},
    transformation_ctx="datasink2",
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f2a85707c10>


In [14]:
# 13. Write data from a DynamicFrame using the Glue Data Catalog

# Write dynamicFrameCustomers to the customers_write_dyf table, using the
# metadata stored in the Glue Data Catalog
glueContext.write_dynamic_frame.from_catalog(
    frame=dynamicFrameCustomers,
    database="pyspark_tutorial_db",
    table_name="customers_write_dyf",
)


<awsglue.dynamicframe.DynamicFrame object at 0x7f2a85707250>


In [15]:
# 14. Convert a DynamicFrame to a Spark DataFrame
sparkDf = dynamicFrameCustomers.toDF()

# Show the Spark DataFrame (the first 20 rows by default)
sparkDf.show()


+----------+----------+-----------+--------------------+
|customerid| firstname|   lastname|            fullname|
+----------+----------+-----------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|
|       313|     James|    Aguilar|       James Aguilar|
|       315|    Robert|   Ahlering|     Robert Ahlering|
|       319|       Kim|      Akers|           Kim Akers|
|       441|   Stanley|       Alan|        Stanley Alan|
|       323|       Amy|    Albe

In [16]:
# 15. Select columns in a Spark DataFrame
dfSelect = sparkDf.select("customerid", "fullname")

# Show the selected columns
dfSelect.show()

+----------+--------------------+
|customerid|            fullname|
+----------+--------------------+
|       293|      Catherine Abel|
|       295|     Kim Abercrombie|
|       297|    Humberto Acevedo|
|       291|      Gustavo Achong|
|       299|      Pilar Ackerman|
|       305|         Carla Adams|
|       301|       Frances Adams|
|       307|           Jay Adams|
|       309|        Ronald Adina|
|       311|     Samuel Agcaoili|
|       313|       James Aguilar|
|       315|     Robert Ahlering|
|       319|           Kim Akers|
|       441|        Stanley Alan|
|       323|         Amy Alberts|
|       325|       Anna Albright|
|       327|       Milton Albury|
|       329|         Paul Alcorn|
|       331|    Gregory Alderson|
|       333|J. Phillip Alexander|
+----------+--------------------+
only showing top 20 rows


In [17]:
# 16. Add columns to a Spark DataFrame

# lit wraps a Python literal in a Column so it can be used as a column value
from pyspark.sql.functions import lit

# Add a new column that holds the same literal string in every row
dfNewColumn = sparkDf.withColumn("date", lit("2023-07-12"))

# Show the DataFrame with the new column
dfNewColumn.show()

+----------+----------+-----------+--------------------+----------+
|customerid| firstname|   lastname|            fullname|      date|
+----------+----------+-----------+--------------------+----------+
|       293| Catherine|       Abel|      Catherine Abel|2023-07-12|
|       295|       Kim|Abercrombie|     Kim Abercrombie|2023-07-12|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|2023-07-12|
|       291|   Gustavo|     Achong|      Gustavo Achong|2023-07-12|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|2023-07-12|
|       305|     Carla|      Adams|         Carla Adams|2023-07-12|
|       301|   Frances|      Adams|       Frances Adams|2023-07-12|
|       307|       Jay|      Adams|           Jay Adams|2023-07-12|
|       309|    Ronald|      Adina|        Ronald Adina|2023-07-12|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|2023-07-12|
|       313|     James|    Aguilar|       James Aguilar|2023-07-12|
|       315|    Robert|   Ahlering|     Robert A

In [18]:
# Use concat to join two columns into one

# Import concat and lit so this cell does not depend on the previous one
from pyspark.sql.functions import concat, lit

# Build another full-name column: firstname, a space, then lastname
dfNewFullName = sparkDf.withColumn("new_full_name", concat("firstname", lit(" "), "lastname"))

# Show the DataFrame with the new_full_name column
dfNewFullName.show()

+----------+----------+-----------+--------------------+--------------------+
|customerid| firstname|   lastname|            fullname|       new_full_name|
+----------+----------+-----------+--------------------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|     Sam
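One detail to keep in mind when building a name from several columns: Spark's `concat` returns null as soon as any input is null, whereas `concat_ws` skips null inputs. No null names appear in the output above, so the difference does not show here. The plain-Python functions below only model those two behaviours with `None` standing in for null; they are stand-ins written for illustration, not the PySpark functions themselves.

```python
# Plain-Python models of the null handling, with None standing in for null.

def concat(*values):
    """Model of concat: the result is None if any input is None."""
    if any(v is None for v in values):
        return None
    return "".join(values)


def concat_ws(sep, *values):
    """Model of concat_ws: None inputs are skipped before joining."""
    return sep.join(v for v in values if v is not None)


full = concat("Kim", " ", "Akers")
missing = concat("Kim", " ", None)       # one null input nulls the result
tolerant = concat_ws(" ", "Kim", None)   # the null input is ignored

print(full, missing, tolerant)
```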

In [19]:
# 19. Dropping columns

# Drop the firstname and lastname columns from the Spark DataFrame
dfDropCol = sparkDf.drop("firstname", "lastname")

# Show the DataFrame without the dropped columns
dfDropCol.show()

+----------+--------------------+
|customerid|            fullname|
+----------+--------------------+
|       293|      Catherine Abel|
|       295|     Kim Abercrombie|
|       297|    Humberto Acevedo|
|       291|      Gustavo Achong|
|       299|      Pilar Ackerman|
|       305|         Carla Adams|
|       301|       Frances Adams|
|       307|           Jay Adams|
|       309|        Ronald Adina|
|       311|     Samuel Agcaoili|
|       313|       James Aguilar|
|       315|     Robert Ahlering|
|       319|           Kim Akers|
|       441|        Stanley Alan|
|       323|         Amy Alberts|
|       325|       Anna Albright|
|       327|       Milton Albury|
|       329|         Paul Alcorn|
|       331|    Gregory Alderson|
|       333|J. Phillip Alexander|
+----------+--------------------+
only showing top 20 rows


In [20]:
# 20. Renaming columns

# Rename the fullname column to full_name
dfRenameCol = sparkDf.withColumnRenamed("fullname", "full_name")

# Show the DataFrame with the renamed column
dfRenameCol.show()

+----------+----------+-----------+--------------------+
|customerid| firstname|   lastname|           full_name|
+----------+----------+-----------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|
|       313|     James|    Aguilar|       James Aguilar|
|       315|    Robert|   Ahlering|     Robert Ahlering|
|       319|       Kim|      Akers|           Kim Akers|
|       441|   Stanley|       Alan|        Stanley Alan|
|       323|       Amy|    Albe

In [21]:
# 21. GroupBy and Aggregate Operations

# Group by lastname then print counts of lastname and show
sparkDf.groupBy("lastname").count().show()

+--------+-----+
|lastname|count|
+--------+-----+
|  Achong|    1|
|  Bailey|    1|
|   Caron|    1|
|   Casts|    1|
|   Curry|    1|
| Desalvo|    1|
| Dockter|    1|
|    Dyck|    1|
|  Farino|    1|
| Fluegel|    1|
|   Ganio|    1|
|   Gimmi|    1|
|Gonzales|    2|
|   Graff|    1|
|   Groth|    1|
|    Hass|    1|
| Hassall|    1|
|Hillmann|    1|
| Hodgson|    1|
|   Ihrig|    1|
+--------+-----+
only showing top 20 rows
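Grouping by a column and counting produces one row per distinct value together with the number of times it occurs. The same idea can be sketched in plain Python with `collections.Counter`; the short list of last names below is an invented sample, not the full customers table.

```python
from collections import Counter

# Invented sample of last names, used only to illustrate group-and-count.
last_names = ["Adams", "Abel", "Adams", "Gonzales", "Adams", "Gonzales"]

# One entry per distinct name, mapped to its number of occurrences.
counts = Counter(last_names)

# Sorting by count, highest first, surfaces the most frequent names.
top = counts.most_common(2)

print(counts["Adams"], counts["Abel"])
print(top)
```

As the output above shows, grouped rows come back in no particular order, so sort on the count column when the most frequent values are what you are after.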


In [22]:
# 22. Filtering rows with filter and where

# Filter the Spark DataFrame for customers whose last name is Adams
sparkDf.filter(sparkDf["lastname"] == "Adams").show()

+----------+---------+--------+-------------+
|customerid|firstname|lastname|     fullname|
+----------+---------+--------+-------------+
|       305|    Carla|   Adams|  Carla Adams|
|       301|  Frances|   Adams|Frances Adams|
|       307|      Jay|   Adams|    Jay Adams|
+----------+---------+--------+-------------+


In [23]:
# Where clause
# where is an alias for filter; here the condition is a SQL expression string
sparkDf.where("lastname == 'Adams'").show()

+----------+---------+--------+-------------+
|customerid|firstname|lastname|     fullname|
+----------+---------+--------+-------------+
|       305|    Carla|   Adams|  Carla Adams|
|       301|  Frances|   Adams|Frances Adams|
|       307|      Jay|   Adams|    Jay Adams|
+----------+---------+--------+-------------+


In [25]:
# 23. Joins
# Read the orders table from the Glue Data Catalog into a DynamicFrame
# and convert it to a Spark DataFrame
dfOrders = glueContext.create_dynamic_frame.from_catalog(
    database="pyspark_tutorial_db",
    table_name="orders",
).toDF()




In [26]:
# Inner join on all data
# Inner join the customers Spark DataFrame to the orders Spark DataFrame
sparkDf.join(dfOrders, sparkDf.customerid == dfOrders.customerid, "inner").show(truncate=False)

+----------+---------+--------+--------------+------------+------------------+---------+---------+--------+----------+----------+----------+---------+--------+----------+---------+--------+---------+-----------------+---------+
|customerid|firstname|lastname|fullname      |salesorderid|salesorderdetailid|orderdate|duedate  |shipdate|employeeid|customerid|subtotal  |taxamt   |freight |totaldue  |productid|orderqty|unitprice|unitpricediscount|linetotal|
+----------+---------+--------+--------------+------------+------------------+---------+---------+--------+----------+----------+----------+---------+--------+----------+---------+--------+---------+-----------------+---------+
|517       |Richard  |Bready  |Richard Bready|43665       |61                |5/31/2011|6/12/2011|6/7/2011|283       |517       |14352.7713|1375.9427|429.9821|16158.6961|711      |2       |20.1865  |0.0000           |40.3730  |
|517       |Richard  |Bready  |Richard Bready|43665       |62                |5/31/2011|

In [27]:
# Inner join for Adams only

# Keep only the customers whose last name is Adams
dfAdams = sparkDf.where("lastname == 'Adams'")

# Inner join the Adams DataFrame to the orders DataFrame
dfAdams.join(dfOrders, dfAdams.customerid == dfOrders.customerid, "inner").show()

+----------+---------+--------+---------+------------+------------------+----------+----------+---------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+---------+
|customerid|firstname|lastname| fullname|salesorderid|salesorderdetailid| orderdate|   duedate| shipdate|employeeid|customerid|  subtotal|   taxamt|  freight|  totaldue|productid|orderqty|unitprice|unitpricediscount|linetotal|
+----------+---------+--------+---------+------------+------------------+----------+----------+---------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+---------+
|       307|      Jay|   Adams|Jay Adams|       48382|             23857|10/30/2012|11/11/2012|11/6/2012|       277|       307|   20.5200|   2.0246|   0.6327|   23.1773|      805|       1|  20.5200|           0.0000|  20.5200|
|       307|      Jay|   Adams|Jay Adams|       50734|             34874| 4/30/2013| 5/12/20

In [28]:
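# Left join

# Left join orders to the Adams DataFrame: every order is kept, and the
# customer columns are null for orders with no matching Adams customer
dfOrders.join(dfAdams, dfAdams.customerid == dfOrders.customerid, "left").show(100)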


+------------+------------------+----------+----------+---------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+----------+----------+---------+--------+--------+
|salesorderid|salesorderdetailid| orderdate|   duedate| shipdate|employeeid|customerid|  subtotal|   taxamt|  freight|  totaldue|productid|orderqty|unitprice|unitpricediscount| linetotal|customerid|firstname|lastname|fullname|
+------------+------------------+----------+----------+---------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+----------+----------+---------+--------+--------+
|       44110|              1732|  8/1/2011| 8/13/2011| 8/8/2011|       277|       295|16667.3077|1600.6864| 500.2145|18768.2086|      765|       2| 419.4589|           0.0000|  838.9178|      null|     null|    null|    null|
|       44110|              1733|  8/1/2011| 8/13/2011| 8/8/2011|       277|       295|16667
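The difference between the inner join and the left join above comes down to what happens to left-hand rows without a match: an inner join drops them, while a left join keeps them and fills the right-hand columns with null. The sketch below reproduces that behaviour on small lists of dicts. The helper functions and the trimmed-down rows are illustrative assumptions, and unlike the Spark output the merged dict holds the key column only once.

```python
# Plain-Python sketch of inner and left joins on lists of dicts.
# The rows are trimmed to two or three fields for readability.
orders = [
    {"salesorderid": 48382, "customerid": 307},
    {"salesorderid": 44110, "customerid": 295},
]
adams = [{"customerid": 307, "fullname": "Jay Adams"}]


def inner_join(left, right, key):
    """Keep only the pairs of rows whose key values are equal."""
    return [{**l, **r} for l in left for r in right if l[key] == r[key]]


def left_join(left, right, key):
    """Keep every left row; fill right-hand fields with None when unmatched."""
    right_fields = [f for f in right[0] if f != key] if right else []
    joined = []
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        if matches:
            joined.extend({**l, **r} for r in matches)
        else:
            joined.append({**l, **{f: None for f in right_fields}})
    return joined


inner_rows = inner_join(orders, adams, "customerid")
left_rows = left_join(orders, adams, "customerid")

print(len(inner_rows), len(left_rows))
print(left_rows[1])
```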

In [29]:
# 24. Writing data using the Glue Data Catalog
# First delete the existing data in S3 under customers_write_dyf and write_down_dyf_to_s3
# Then convert the Spark DataFrame back to a Glue DynamicFrame

# Import the DynamicFrame class
from awsglue.dynamicframe import DynamicFrame

# Convert from a Spark DataFrame to a Glue DynamicFrame
dyfCustomersConvert = DynamicFrame.fromDF(sparkDf, glueContext, "convert")

# Show the converted Glue DynamicFrame
dyfCustomersConvert.show()

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}
{"customerid": 313, "firstname": "James", "lastname": 

In [30]:
# Write the DynamicFrame to an S3 location

# Write the converted DynamicFrame to the S3 path as CSV
glueContext.write_dynamic_frame.from_options(
    frame=dyfCustomersConvert,
    connection_type="s3",
    connection_options={"path": "s3://gavxro-pyspark-tutorial/write_down_dyf_to_s3"},
    format="csv",
    format_options={"separator": ","},
    transformation_ctx="datasink2",
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f2a85675b50>


In [31]:
# Write the DynamicFrame using the Glue Data Catalog

# Write the converted DynamicFrame to the customers_write_dyf table, using the
# metadata stored in the Glue Data Catalog
glueContext.write_dynamic_frame.from_catalog(
    frame=dyfCustomersConvert,
    database="pyspark_tutorial_db",
    table_name="customers_write_dyf",
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f2a8567a250>
