
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: f6c87ded-26d4-49c7-a3d0-28e5b6c8a691
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session f6c87ded-26d4-49c7-a3d0-28e5b6c8a691 to get into ready status...
Session f6c87ded-26d4-49c7-a3d0-28e5b6c8a691 has been created.



In [2]:
# Note: the session-initialization cell above fails if the IAM role lacks permission for the iam:PassRole action
# 
# custom policy that grants it:
# {
#     "Version": "2012-10-17",
#     "Statement": [
#         {
#             "Effect": "Allow",
#             "Action": [
#                 "iam:GetRole",
#                 "iam:PassRole"
#             ],
#             "Resource": [
#                 "*"
#             ]
#         }
#     ]
# }
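The same policy can also be generated and attached from Python instead of the console. This is a minimal sketch assuming you manage the role with boto3; the helper names `build_pass_role_policy` and `attach_inline_policy`, and the policy name `GlueNotebookPassRole`, are hypothetical. Scoping `Resource` to the notebook role's own ARN instead of `"*"` is the tighter, least-privilege option.

```python
import json


def build_pass_role_policy(resources=("*",)):
    """Return the iam:GetRole / iam:PassRole policy from the note above as a JSON string.

    `resources` defaults to "*" as in the note; passing the notebook role's ARN
    restricts which roles may be passed.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["iam:GetRole", "iam:PassRole"],
                "Resource": list(resources),
            }
        ],
    }
    return json.dumps(policy, indent=4)


def attach_inline_policy(iam_client, role_name, policy_name="GlueNotebookPassRole", resources=("*",)):
    """Attach the policy inline to `role_name` via an IAM client such as boto3.client("iam").

    role_name and policy_name are placeholders; substitute your own values.
    """
    return iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=build_pass_role_policy(resources),
    )
```

For example, `attach_inline_policy(boto3.client("iam"), "MyGlueNotebookRole")`, where the role name is hypothetical, run with credentials that are allowed to call `iam:PutRolePolicy`.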

In [2]:
df = spark.read.load("s3://mybucket**Redacted/website_visit_data.csv", 
                          format="csv", 
                          sep=",", 
                          inferSchema="true",
                          header="true")

type(df)

<class 'pyspark.sql.dataframe.DataFrame'>


In [3]:
# Print the schema in a tree format

df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- CustomerType: integer (nullable = true)
 |-- DateStarted: timestamp (nullable = true)
 |-- Duration: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- ReviewDuration: integer (nullable = true)
 |-- RelatedDuration: integer (nullable = true)
 |-- Purchased: integer (nullable = true)
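With `inferSchema="true"`, Spark makes an extra pass over the CSV to work out column types. Later jobs can skip that pass by supplying the schema explicitly, and `DataFrameReader.schema()` accepts a DDL-formatted string. Within the same session `df.schema` can be reused directly; the sketch below is for turning the printed tree into a string you can paste into a later job. It assumes the flat, single-level layout shown above (no nested structs, arrays, or `decimal(p,s)` types), and `schema_tree_to_ddl` is a hypothetical helper name.

```python
import re

# Matches one flat field line of printSchema() output, e.g.
# " |-- Duration: double (nullable = true)"
FIELD_LINE = re.compile(r"^\s*\|--\s*(\w+):\s*(\w+)\s*\(nullable = (?:true|false)\)\s*$")


def schema_tree_to_ddl(tree_text):
    """Convert flat printSchema() output into a DDL string such as 'ID INTEGER, Age INTEGER'."""
    columns = []
    for line in tree_text.splitlines():
        match = FIELD_LINE.match(line)
        if match is None:
            if "|--" in line:
                # A field line we cannot translate safely (nested or parameterized type)
                raise ValueError(f"unsupported schema line: {line!r}")
            continue  # skips "root" and blank lines
        name, spark_type = match.groups()
        columns.append(f"{name} {spark_type.upper()}")
    return ", ".join(columns)
```

The result can then be used as `spark.read.schema(ddl).csv(path, header=True)` for the same S3 path. Unsupported field types raise an error rather than being guessed.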


In [4]:
# get total row count

df.count()

10000


In [7]:
df.show()

+-----+------------------+------------+-------------------+--------+------+---+------+--------------+---------------+---------+
|   ID|      CustomerName|CustomerType|        DateStarted|Duration|gender|Age|Salary|ReviewDuration|RelatedDuration|Purchased|
+-----+------------------+------------+-------------------+--------+------+---+------+--------------+---------------+---------+
|10000|    Halloran,Ford |           3|2018-03-09 11:33:00|  565.81|  Male| 26|   198|            60|           null|        1|
|10001|      Heidie Durno|           1|2018-03-09 00:10:00|   573.0|Female| 30|    95|            93|           null|        0|
|10002|       Chad Rodden|           4|2018-03-09 13:48:00|  134.39|Female| 67|    31|            68|             92|        0|
|10003|     Mathon,Bjorn |           2|2018-03-09 13:27:00|  605.32|  Male| 22|    50|          null|           null|        0|
|10004|   Rentoll,Truman |           2|2018-03-09 14:01:00|   533.4|  Male| 67|   105|          null|   

In [18]:
# get unique (i.e. distinct) values

df.dropDuplicates(["CustomerType"]).select("CustomerType").show()

+------------+
|CustomerType|
+------------+
|           1|
|           3|
|           2|
|           4|
+------------+


In [21]:
# count unique (i.e. distinct) Salary values

df.dropDuplicates(["Salary"]).count()

191


In [16]:
# using SQL with Spark (column names are case-insensitive by default, so customername matches CustomerName)
df.createOrReplaceTempView("tempTabledf")
spark.sql("select customername from tempTabledf").show()

+------------------+
|      customername|
+------------------+
|    Halloran,Ford |
|      Heidie Durno|
|       Chad Rodden|
|     Mathon,Bjorn |
|   Rentoll,Truman |
|    Darelle Murkus|
|   Eddy Jellicorse|
|  Van Dijk,Waylin |
|      Luise Tenant|
|    Regnard,Keven |
|    Dorri Cuardall|
|    Haydon,Gerick |
|      Heddi Brauns|
|     Bonnin,Bryon |
|Melisenda Colquitt|
|    Helaina Ragate|
|    Barukh,Sutton |
|   Babette Laslett|
|     Coriss Skiggs|
|   Gretta Milstead|
+------------------+
only showing top 20 rows


In [25]:
# filter by multiple column values 
# filter() and where() are aliases; parenthesize each condition because & binds more tightly than >

df.filter((df["customertype"] > 2) & (df["salary"] > 100)).show(truncate=False)

+-----+---------------------+------------+-------------------+--------+------+---+------+--------------+---------------+---------+
|ID   |CustomerName         |CustomerType|DateStarted        |Duration|gender|Age|Salary|ReviewDuration|RelatedDuration|Purchased|
+-----+---------------------+------------+-------------------+--------+------+---+------+--------------+---------------+---------+
|10000|Halloran,Ford        |3           |2018-03-09 11:33:00|565.81  |Male  |26 |198   |60            |null           |1        |
|10005|Darelle Murkus       |3           |2018-03-09 09:03:00|871.81  |Female|29 |182   |null          |89             |1        |
|10006|Eddy Jellicorse      |4           |2018-03-09 02:56:00|642.57  |Female|74 |115   |null          |71             |0        |
|10013|Bonnin,Bryon         |4           |2018-03-09 16:02:00|370.41  |Male  |74 |126   |82            |44             |0        |
|10016|Barukh,Sutton        |3           |2018-03-09 22:09:00|720.61  |Male  |20 |1

In [30]:
# length() must be imported from pyspark.sql.functions
from pyspark.sql.functions import length

df.filter((length(df["CustomerName"]) > 20) & (df["salary"] > 150)).show(truncate=False)

+-----+--------------------------+------------+-------------------+--------+------+---+------+--------------+---------------+---------+
|ID   |CustomerName              |CustomerType|DateStarted        |Duration|gender|Age|Salary|ReviewDuration|RelatedDuration|Purchased|
+-----+--------------------------+------------+-------------------+--------+------+---+------+--------------+---------------+---------+
|10031|Stradling,Constantin      |3           |2018-03-09 14:04:00|677.48  |Male  |29 |156   |null          |36             |1        |
|10388|Andromache Rutherford     |2           |2018-03-09 19:34:00|584.41  |Female|17 |176   |93            |69             |1        |
|10508|Marylynne Bartkiewicz     |3           |2018-03-09 00:19:00|644.61  |Female|19 |167   |78            |18             |0        |
|10662|Bellchamber,Llywellyn     |1           |2018-03-09 23:34:00|113.63  |Male  |58 |160   |null          |20             |0        |
|11383|Leathwood,Shellysheldon   |3           |2

In [34]:
# Aggregate functions

df.groupBy("CustomerType").count().show()

+------------+-----+
|CustomerType|count|
+------------+-----+
|           1| 2437|
|           3| 2550|
|           2| 2501|
|           4| 2512|
+------------+-----+


In [39]:
# renaming aggregate column from sum(salary) to salaryTotal
df.groupBy("CustomerType").sum("salary").withColumnRenamed("sum(salary)","salaryTotal").show()

+------------+-----------+
|CustomerType|salaryTotal|
+------------+-----------+
|           1|     255773|
|           3|     267285|
|           2|     262075|
|           4|     268636|
+------------+-----------+


In [50]:
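# Now to compare a Python UDF with the built-in Spark functions

from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, StringType
import datetime

def name_splitter(name):
    # nulls reach the UDF as None
    if name is None:
        return None
    split_name_temp = name.split()

    # Title case for name
    split_name = [x.title() for x in split_name_temp]
    return split_name

# name_splitter returns a list, so the UDF must declare an array-of-strings return type
split_name_udf = udf(lambda x: name_splitter(x), ArrayType(StringType()))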

In [68]:
# Test udf
start = datetime.datetime.now()
df.select("CustomerName", split_name_udf("CustomerName")).show()

udf_execution_time = datetime.datetime.now()-start

+------------------+----------------------+
|      CustomerName|<lambda>(CustomerName)|
+------------------+----------------------+
|    Halloran,Ford |       [Halloran,Ford]|
|      Heidie Durno|       [Heidie, Durno]|
|       Chad Rodden|        [Chad, Rodden]|
|     Mathon,Bjorn |        [Mathon,Bjorn]|
|   Rentoll,Truman |      [Rentoll,Truman]|
|    Darelle Murkus|     [Darelle, Murkus]|
|   Eddy Jellicorse|    [Eddy, Jellicorse]|
|  Van Dijk,Waylin |    [Van, Dijk,Waylin]|
|      Luise Tenant|       [Luise, Tenant]|
|    Regnard,Keven |       [Regnard,Keven]|
|    Dorri Cuardall|     [Dorri, Cuardall]|
|    Haydon,Gerick |       [Haydon,Gerick]|
|      Heddi Brauns|       [Heddi, Brauns]|
|     Bonnin,Bryon |        [Bonnin,Bryon]|
|Melisenda Colquitt|  [Melisenda, Colqu...|
|    Helaina Ragate|     [Helaina, Ragate]|
|    Barukh,Sutton |       [Barukh,Sutton]|
|   Babette Laslett|    [Babette, Laslett]|
|     Coriss Skiggs|      [Coriss, Skiggs]|
|   Gretta Milstead|    [Gretta,
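The output shows that names stored as `Surname,GivenName ` (for example `Halloran,Ford `) stay in one piece, because the function splits only on whitespace. A minimal pure-Python sketch of a normalizer for both formats, assuming every comma-separated name is surname-first as the samples suggest; `normalize_name` is a hypothetical name. Like `name_splitter` it returns a list, so it would be registered with an array return type, e.g. `udf(normalize_name, ArrayType(StringType()))`.

```python
def normalize_name(name):
    """Return [given_name, surname] for 'Given Surname' or 'Surname,Given ' inputs.

    Assumes comma-separated names are surname-first. Returns None for missing
    or blank values so that Spark stores a null.
    """
    if name is None:
        return None
    name = name.strip()  # removes the trailing space seen in 'Halloran,Ford '
    if "," in name:
        # 'Van Dijk,Waylin' -> surname 'Van Dijk', given name 'Waylin'
        surname, given = (part.strip() for part in name.split(",", 1))
    else:
        parts = name.split()
        if not parts:
            return None
        # 'Heidie Durno' -> given name 'Heidie', surname 'Durno'
        given, surname = parts[0], " ".join(parts[1:])
    return [given.title(), surname.title()]
```

As in the original function, `str.title()` also lowercases inner capitals, so a name like `McDonald` becomes `Mcdonald`.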

In [69]:
# using the built-in Spark split() on a single space (a trailing space leaves an empty last element)
start = datetime.datetime.now()
df.select("CustomerName", split(df["CustomerName"], ' ')).show()

sparkSplit_execution_time = datetime.datetime.now()-start

+------------------+----------------------+
|      CustomerName|split(CustomerName,  )|
+------------------+----------------------+
|    Halloran,Ford |     [Halloran,Ford, ]|
|      Heidie Durno|       [Heidie, Durno]|
|       Chad Rodden|        [Chad, Rodden]|
|     Mathon,Bjorn |      [Mathon,Bjorn, ]|
|   Rentoll,Truman |    [Rentoll,Truman, ]|
|    Darelle Murkus|     [Darelle, Murkus]|
|   Eddy Jellicorse|    [Eddy, Jellicorse]|
|  Van Dijk,Waylin |  [Van, Dijk,Waylin, ]|
|      Luise Tenant|       [Luise, Tenant]|
|    Regnard,Keven |     [Regnard,Keven, ]|
|    Dorri Cuardall|     [Dorri, Cuardall]|
|    Haydon,Gerick |     [Haydon,Gerick, ]|
|      Heddi Brauns|       [Heddi, Brauns]|
|     Bonnin,Bryon |      [Bonnin,Bryon, ]|
|Melisenda Colquitt|  [Melisenda, Colqu...|
|    Helaina Ragate|     [Helaina, Ragate]|
|    Barukh,Sutton |     [Barukh,Sutton, ]|
|   Babette Laslett|    [Babette, Laslett]|
|     Coriss Skiggs|      [Coriss, Skiggs]|
|   Gretta Milstead|    [Gretta,

In [70]:
# comparing the two (single run of show() over 20 rows; the UDF also title-cases, so the work is not identical)
print(udf_execution_time)
print(sparkSplit_execution_time)
print('Time difference : ', udf_execution_time - sparkSplit_execution_time) 

0:00:00.282939
0:00:00.153676
Time difference :  0:00:00.129263
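For a fairer comparison, each query can be run several times after a warm-up and compared on its best or median time. A minimal standard-library sketch; `time_action` is a hypothetical helper, and the Spark usage below assumes the `df`, `split_name_udf`, and `split` objects defined earlier in this notebook.

```python
import statistics
import time


def time_action(action, repeats=5, warmup=1):
    """Call `action` warmup + repeats times; return (best, median) seconds over the timed runs."""
    for _ in range(warmup):
        action()  # untimed run to absorb one-off costs such as Python worker start-up
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return min(timings), statistics.median(timings)
```

For example, compare `time_action(lambda: df.select("CustomerName", split_name_udf("CustomerName")).collect())` with `time_action(lambda: df.select("CustomerName", split(df["CustomerName"], " ")).collect())`. `collect()` pushes all 10,000 rows through each expression; with a count-only action Spark may prune the unused column and never call the UDF.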
