
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0.                                                       |
| %security_configuration     |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 0.31 and you have 0.30 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::498938378154:role/service-role/AWSGlueServiceRole-gg
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 5091284e-6319-4e8f-bc75-d94489874a07
Applying the following default arguments:
--glue_kernel_version 0.30
--enable-glue-datacatalog true
Waiting for session 5091284e-6319-4e8f-bc75-d944898

### Read DynamicFrames from the Data Catalog and S3 Parquet files, then register **orders** and **order_products_prior** as temporary views so they can be queried with **SQL**

In [2]:
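# Read the orders table registered in the Glue Data Catalog (database "prd")
orders = glueContext.create_dynamic_frame.from_catalog(database="prd", table_name="orders")
# Read the order_products_prior Parquet files directly from S3
order_products_prior = glueContext.create_dynamic_frame.from_options(
    connection_type="parquet",
    connection_options={"paths": ["s3://imba-alan/features/order_products_prior/"]},
)

# Convert each DynamicFrame to a Spark DataFrame and register it as a temporary view for Spark SQL
orders.toDF().createOrReplaceTempView("orders")
order_products_prior.toDF().createOrReplaceTempView("order_products_prior")

spark.sql("select distinct user_id from orders").show()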

+-------+
|user_id|
+-------+
|     26|
|     29|
|    171|
|    364|
|    452|
|    474|
|    480|
|    501|
|    537|
|    631|
|    686|
|    701|
|    747|
|    792|
|    852|
|    870|
|    906|
|    964|
|   1016|
|   1177|
+-------+
only showing top 20 rows


### Create temporary views for user_features_1, user_features_2, up_features and prd_features

In [9]:
spark.sql('''
select user_id, max(order_number) AS max_ord_num, sum(days_since_prior_order) AS sum_of_days,
round(avg(days_since_prior_order), 2) AS average_days_since_prior_order
from orders
group by user_id
''').createOrReplaceTempView("user_features_1")
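To make the aggregation semantics concrete, here is a minimal plain-Python sketch of what `user_features_1` computes, run on hypothetical toy rows rather than the real `orders` table (the `toy_orders` data and `user_features_1_sketch` helper are illustrative only). It shows one detail the query relies on: a user's first order has no prior order, so `days_since_prior_order` is NULL there, and SQL `SUM`/`AVG` simply skip NULL inputs.

```python
from collections import defaultdict

# Hypothetical toy rows: (user_id, order_number, days_since_prior_order).
# A first order has no prior order, so its gap is None (NULL in SQL).
toy_orders = [
    (1, 1, None), (1, 2, 10.0), (1, 3, 20.0),
    (2, 1, None), (2, 2, 7.0),
]


def user_features_1_sketch(rows):
    """Mimic max(order_number), sum(days) and round(avg(days), 2) grouped by user_id."""
    max_ord_num = defaultdict(int)
    gaps = defaultdict(list)
    for user_id, order_number, days in rows:
        max_ord_num[user_id] = max(max_ord_num[user_id], order_number)
        if days is not None:  # SQL SUM and AVG ignore NULL inputs
            gaps[user_id].append(days)
    result = {}
    for user_id, max_num in max_ord_num.items():
        g = gaps[user_id]
        total = sum(g) if g else None  # SUM over only NULLs is NULL
        # Note: Python's round() is half-to-even; Spark SQL's round() is HALF_UP.
        avg = round(total / len(g), 2) if g else None
        result[user_id] = (max_num, total, avg)
    return result


print(user_features_1_sketch(toy_orders))
# {1: (3, 30.0, 15.0), 2: (2, 7.0, 7.0)}
```

Because of the differing rounding modes noted in the comment, values that land exactly on a tie at the second decimal place could differ between this sketch and Spark.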




In [12]:
spark.sql('''
select user_id, count(product_id) AS total_products, count(distinct product_id) AS total_distinct_products,
round(CAST(sum(CASE WHEN reordered = 1 THEN 1 ELSE 0 END) AS double)
      / count(CASE WHEN order_number > 1 THEN 1 END), 4) AS reordered_ratio
from order_products_prior
group by user_id
''').createOrReplaceTempView("user_features_2")
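The `reordered_ratio` denominator counts only items in orders after the first, since an item in a user's first order can never be a reorder. A minimal plain-Python sketch of the same logic on hypothetical toy rows (the data and the `user_features_2_sketch` helper are illustrative, not part of the job):

```python
from collections import defaultdict

# Hypothetical toy rows: (user_id, order_number, product_id, reordered).
toy_prior = [
    (1, 1, 100, 0), (1, 1, 200, 0),  # first order: nothing can be a reorder yet
    (1, 2, 100, 1), (1, 2, 300, 0),
    (1, 3, 100, 1), (1, 3, 200, 1),
    (2, 1, 400, 0),                  # user 2 has only a first order
]


def user_features_2_sketch(rows):
    total = defaultdict(int)        # count(product_id)
    distinct = defaultdict(set)     # count(distinct product_id)
    reorders = defaultdict(int)     # sum(CASE WHEN reordered = 1 THEN 1 ELSE 0 END)
    later_items = defaultdict(int)  # count(CASE WHEN order_number > 1 THEN 1 END)
    for user_id, order_number, product_id, reordered in rows:
        total[user_id] += 1
        distinct[user_id].add(product_id)
        reorders[user_id] += 1 if reordered == 1 else 0
        if order_number > 1:
            later_items[user_id] += 1
    result = {}
    for user_id in total:
        # Spark (non-ANSI mode) returns NULL for division by zero; None stands in for that.
        denom = later_items[user_id]
        ratio = round(reorders[user_id] / denom, 4) if denom else None
        result[user_id] = (total[user_id], len(distinct[user_id]), ratio)
    return result


print(user_features_2_sketch(toy_prior))
# {1: (6, 3, 0.75), 2: (1, 1, None)}
```

User 1 has 3 reorders out of 4 items placed after the first order, giving 0.75; user 2 never placed a second order, so the ratio is undefined.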




In [13]:
spark.sql('''
select user_id, product_id, count(distinct order_id) AS total_number_of_orders, min(order_number) AS min_order_number, 
max(order_number) AS max_order_number, round(avg(add_to_cart_order),2) AS avg_add_to_cart_order
from order_products_prior
group by user_id, product_id
''').createOrReplaceTempView("up_features")
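`up_features` is the same kind of aggregation, but keyed on the `(user_id, product_id)` pair. A minimal plain-Python sketch on hypothetical toy rows (the `up_features_sketch` helper is illustrative only):

```python
from collections import defaultdict

# Hypothetical toy rows: (user_id, order_id, order_number, product_id, add_to_cart_order).
toy_prior = [
    (1, 11, 1, 100, 1), (1, 12, 2, 100, 2), (1, 14, 4, 100, 1),
    (1, 12, 2, 200, 1),
]


def up_features_sketch(rows):
    groups = defaultdict(list)
    for user_id, order_id, order_number, product_id, cart_pos in rows:
        groups[(user_id, product_id)].append((order_id, order_number, cart_pos))
    result = {}
    for key, items in groups.items():
        order_ids = {order_id for order_id, _, _ in items}
        numbers = [number for _, number, _ in items]
        cart = [pos for _, _, pos in items]
        result[key] = (
            len(order_ids),                   # count(distinct order_id)
            min(numbers),                     # min(order_number)
            max(numbers),                     # max(order_number)
            round(sum(cart) / len(cart), 2),  # round(avg(add_to_cart_order), 2)
        )
    return result


print(up_features_sketch(toy_prior))
# {(1, 100): (3, 1, 4, 1.33), (1, 200): (1, 2, 2, 1.0)}
```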




In [14]:
spark.sql('''
select product_id, count(*) AS cnt_prod_orders, sum(reordered) AS cnt_prod_reorders, 
count(case when product_seq_time = 1 then 1 end) AS cnt_product_first_orders,
count(case when product_seq_time = 2 then 1 end) AS cnt_product_second_orders
from 
(select user_id, order_number, product_id, reordered,
sum(1) over (partition by user_id, product_id order by order_number) AS product_seq_time
from order_products_prior) a
group by product_id
''').createOrReplaceTempView("prd_features")
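The inner query's `sum(1) over (partition by user_id, product_id order by order_number)` is a running count, so `product_seq_time` is 1 the first time a user buys a product, 2 the second time, and so on. Counting rows with value 1 therefore gives the number of users who bought the product at least once, and value 2 the number who bought it at least twice. A minimal plain-Python sketch on hypothetical toy rows, assuming (as in the data) at most one row per user, product and order, so `order_number` has no ties within a partition:

```python
from collections import defaultdict
from itertools import groupby

# Hypothetical toy rows: (user_id, order_number, product_id, reordered).
toy_prior = [
    (1, 1, 100, 0), (1, 2, 100, 1), (1, 3, 100, 1),
    (2, 1, 100, 0),
    (2, 2, 200, 0), (2, 3, 200, 1),
]


def prd_features_sketch(rows):
    # Emulate sum(1) OVER (PARTITION BY user_id, product_id ORDER BY order_number).
    # Assuming one row per (user, product, order), this is a running count 1, 2, 3, ...
    ordered = sorted(rows, key=lambda r: (r[0], r[2], r[1]))
    stats = defaultdict(lambda: [0, 0, 0, 0])
    for (_, product_id), grp in groupby(ordered, key=lambda r: (r[0], r[2])):
        for seq, (_, _, _, reordered) in enumerate(grp, start=1):
            s = stats[product_id]
            s[0] += 1              # count(*)
            s[1] += reordered      # sum(reordered)
            s[2] += int(seq == 1)  # users who bought the product at least once
            s[3] += int(seq == 2)  # users who bought it at least twice
    return {product_id: tuple(v) for product_id, v in stats.items()}


print(prd_features_sketch(toy_prior))
# {100: (4, 2, 2, 1), 200: (2, 1, 1, 1)}
```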




### Create the final dataset by joining all features together

In [16]:
final_features = spark.sql('''
select *
from user_features_1 t1 inner join user_features_2 t2 using(user_id)
inner join up_features t3 using(user_id)
inner join prd_features t4 using(product_id)
''')
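Each view has exactly one row per key (the `group by` clauses guarantee it), so these inner joins never multiply rows: `final_features` should contain one row per `up_features` pair whose user appears in both user views and whose product appears in `prd_features`. A minimal plain-Python sketch with placeholder column tuples (all names and values here are hypothetical):

```python
# Hypothetical keyed stand-ins for the temp views (one row per key, as the GROUP BYs guarantee).
uf1 = {1: ("uf1_cols",), 2: ("uf1_cols",)}  # user_id -> user_features_1 columns
uf2 = {1: ("uf2_cols",)}                    # user 2 absent here
up = {(1, 100): ("up_cols",), (1, 200): ("up_cols",), (2, 100): ("up_cols",)}
prd = {100: ("prd_cols",)}                  # product 200 absent here


def join_features_sketch(uf1, uf2, up, prd):
    rows = []
    for (user_id, product_id), up_cols in up.items():
        # An inner join keeps a row only if every key has a match on the other side.
        if user_id in uf1 and user_id in uf2 and product_id in prd:
            # USING columns appear once, product_id first, as in the printed schema of final_features.
            rows.append((product_id, user_id) + uf1[user_id] + uf2[user_id] + up_cols + prd[product_id])
    return rows


print(join_features_sketch(uf1, uf2, up, prd))
# [(100, 1, 'uf1_cols', 'uf2_cols', 'up_cols', 'prd_cols')]
```

This is why `final_features.count()` can be compared against the row count of `up_features`: if the two differ, some user or product keys failed to match.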




### Check the final features DataFrame to make sure it contains the expected results

In [17]:
final_features.count()

13307953


In [18]:
final_features.printSchema()

root
 |-- product_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- max_ord_num: long (nullable = true)
 |-- sum_of_days: double (nullable = true)
 |-- average_days_since_prior_order: double (nullable = true)
 |-- total_products: long (nullable = false)
 |-- total_distinct_products: long (nullable = false)
 |-- reordered_ratio: double (nullable = true)
 |-- total_number_of_orders: long (nullable = false)
 |-- min_order_number: long (nullable = true)
 |-- max_order_number: long (nullable = true)
 |-- avg_add_to_cart_order: double (nullable = true)
 |-- cnt_prod_orders: long (nullable = false)
 |-- cnt_prod_reorders: long (nullable = true)
 |-- cnt_product_first_orders: long (nullable = false)
 |-- cnt_product_second_orders: long (nullable = false)


### Export it to the S3 output location as a single CSV file

In [19]:
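# repartition(1) moves all rows into one partition, so Spark writes a single part-*.csv
# object under the glue_output/ prefix (a directory, not a file named glue_output)
final_features.repartition(1).write.mode("overwrite").format("csv").save(
    "s3://imba-alan/glue_output", header="true"
)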


