# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::778725022589:role/AWSGlueServiceRole-charlie
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 5ade9428-a300-472e-a53e-30cb13ce3176
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 5ade

In [7]:
# Define parameters
from datetime import datetime
now = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
bucket = 'imba-glue-output'
folder = 'sprint3-' + now

db = 'rdsdb'
aisles_table = 'postgres_public_aisles'
departments_table = 'postgres_public_departments'
orders_table = 'postgres_public_orders'
products_table = 'postgres_public_products'
order_product_table = 'postgres_public_order_product'




### Create DynamicFrame and convert to SparkDF


In [3]:
aisles_df = glueContext.create_dynamic_frame.from_catalog(database=db, table_name=aisles_table).toDF()
departments_df = glueContext.create_dynamic_frame.from_catalog(database=db, table_name=departments_table).toDF()
orders_df = glueContext.create_dynamic_frame.from_catalog(database=db, table_name=orders_table).toDF()
products_df = glueContext.create_dynamic_frame.from_catalog(database=db, table_name=products_table).toDF()
order_product_df = glueContext.create_dynamic_frame.from_catalog(database=db, table_name=order_product_table).toDF()




In [4]:
orders_df.show()

+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
|  393748| 185277|   prior|          78|        1|                9|                   5.0|
|  746725|  38109|    test|          13|        3|               16|                  10.0|
| 2005551|  63647|   prior|           8|        6|               13|                  12.0|
| 1910347|  76116|   prior|           6|        1|               16|                  15.0|
| 1790456|  13736|   prior|          34|        0|               11|                   3.0|
| 1409951| 184881|   prior|           8|        4|               10|                   2.0|
| 2529361|   3777|   prior|          13|        1|               12|                   6.0|
| 2420076|  22301|   prior|          36|        1|                9|            

### create view of order_products_prior

In [5]:
orders_prior_df = orders_df.filter(orders_df['eval_set']=='prior')
joined_order_product = orders_prior_df.join(order_product_df, 'order_id', 'left')

joined_order_product.createOrReplaceTempView('order_products_prior')
orders_df.createOrReplaceTempView('orders_view')




### user_features_1
Based on table orders, for each user, calculate the max order_number, the sum of days_since_prior_order and the average of days_since_prior_order

In [6]:
user_features_1 = spark.sql(
    'select user_id, \n'
    'max(order_number) max_order_number, \n'
    'sum(days_since_prior_order) sum_days_since_prior_order, \n'
    'avg(days_since_prior_order) avg_days_since_prior_order \n'
    'from orders_view \n'
    'group by user_id;'
)
user_features_1.show()

+-------+----------------+--------------------------+--------------------------+
|user_id|max_order_number|sum_days_since_prior_order|avg_days_since_prior_order|
+-------+----------------+--------------------------+--------------------------+
|  89655|              11|                     170.0|                      17.0|
|  58568|              27|                     364.0|                      14.0|
|  74467|              13|                     120.0|                      10.0|
| 104419|               7|                      99.0|                      16.5|
|  14127|              71|                     231.0|                       3.3|
| 194575|              74|                     363.0|         4.972602739726027|
|  26896|              28|                     340.0|        12.592592592592593|
|   9355|              15|                     224.0|                      16.0|
| 133847|              37|                     342.0|                       9.5|
|  18499|               9|  

In [8]:
print("Count: ", user_features_1.count())

Count:  206209


In [12]:
# write user_features_1 in parquet in S3
user_features_1.coalesce(1).write.parquet(f's3://{bucket}/sprint3-user-features-1/')




### user_features_2
Based on table order_products_prior, for each user calculate the total number of products, total number of distinct products, and user reorder ratio(number of reordered = 1 divided by number of order_number > 1)

In [13]:
user_features_2 = spark.sql(
    'select user_id, \n'
    'count(product_id) total_products_by_userid, \n'
    'count(distinct(product_id)) total_distinct_products_by_userid, \n'
    'sum(reordered) / count(case when order_number > 1 then 1 else 0 end) user_reorder_ratio \n'
    'from order_products_prior \n'
    'group by user_id;'
)

user_features_2.show()

+-------+------------------------+---------------------------------+-------------------+
|user_id|total_products_by_userid|total_distinct_products_by_userid| user_reorder_ratio|
+-------+------------------------+---------------------------------+-------------------+
|  97664|                     312|                               75| 0.7596153846153846|
| 192552|                     455|                              194| 0.5736263736263736|
| 179774|                     154|                               69|  0.551948051948052|
| 159694|                     660|                              157| 0.7621212121212121|
|  15182|                     219|                               11| 0.9497716894977168|
|  76863|                      48|                               25| 0.4791666666666667|
| 160975|                     542|                              226| 0.5830258302583026|
| 142249|                     449|                              124| 0.7238307349665924|
|  82480|            

In [14]:
# write user_features_2 in parquet in S3
user_features_2.coalesce(1).write.parquet(f's3://{bucket}/sprint3-user-features-2/')




### up_features
Based on table order_products_prior, for each user and product, calculate the total number of orders, minimum order_number, maximum order_number and average add_to_cart_order

In [15]:
up_features = spark.sql(
    'select user_id, product_id, \n'
    'count(order_id) total_orders_by_userprod, \n'
    'min(order_number) min_ordernumber_byuserprod, \n'
    'max(order_number) max_ordernumber_byuserprod, \n'
    'avg(add_to_cart_order) avg_addtocart_byuserprod \n'
    'from order_products_prior \n'
    'group by user_id, product_id;'
)

up_features.show()

+-------+----------+------------------------+--------------------------+--------------------------+------------------------+
|user_id|product_id|total_orders_by_userprod|min_ordernumber_byuserprod|max_ordernumber_byuserprod|avg_addtocart_byuserprod|
+-------+----------+------------------------+--------------------------+--------------------------+------------------------+
| 202279|     40141|                       5|                         2|                         6|                     5.8|
| 153404|     40545|                       6|                         2|                        27|       6.166666666666667|
|   5738|      5438|                       4|                         3|                        10|                   13.25|
| 102020|     46654|                       6|                         2|                        28|       8.666666666666666|
| 111475|     12276|                       3|                         7|                         9|      2.3333333333333335|


In [16]:
# write up_features in parquet in S3
up_features.coalesce(1).write.parquet(f's3://{bucket}/sprint3-up-features/')




### prd_features
Based on table order_products_prior,
- first write a sql query to calculate the sequence of product purchase for each user, and name it product_seq_time
- Then on top of this query, for each product, calculate the count, sum of reordered, count of product_seq_time = 1 and count of product_seq_time = 2.

In [17]:
prd_features = spark.sql(
    'with temp as ( \n'
    'select user_id, \n'
    'order_number, \n'
    'product_id, \n'
    'reordered, \n'
    'rank() over(partition by user_id, product_id order by order_number) product_seq_time \n'
     'from order_products_prior \n'
     ') \n'
     'select product_id, \n'
     'count(product_id) total_products_by_productid, \n'
     'sum(reordered) total_reordered, \n'
     'sum(case when product_seq_time = 1 then 1 else 0 end) product_seq_time_1, \n'
     'sum(case when product_seq_time = 2 then 1 else 0 end) product_seq_time_2 \n'
     'from temp \n'
     'group by product_id;')

prd_features.show()

+----------+---------------------------+---------------+------------------+------------------+
|product_id|total_products_by_productid|total_reordered|product_seq_time_1|product_seq_time_2|
+----------+---------------------------+---------------+------------------+------------------+
|     35951|                      57895|          43875|             14020|              8244|
|     13780|                       5136|           1712|              3424|               861|
|     10921|                         39|              8|                31|                 6|
|     47626|                     152657|         106255|             46402|             27195|
|     39275|                     100060|          62922|             37138|             19091|
|     43504|                      11110|           7378|              3732|              1852|
|     32912|                        520|            316|               204|                87|
|     33731|                      45238|          

In [18]:
# write prd_features in parquet in S3
prd_features.coalesce(1).write.parquet(f's3://{bucket}/sprint3-prd-features/')




### make some joins of the above features
- user_features_1 & user_features_2 join for user_features
- user_features join up_features for user_up_features
- user_up_features join prd_features for final_features

In [19]:
user_features = user_features_1.join(user_features_2, 'user_id', 'left')
user_up_features = user_features.join(up_features, 'user_id', 'left')
final_features = user_up_features.join(prd_features, 'product_id', 'left')

final_features.show()

+----------+-------+----------------+--------------------------+--------------------------+------------------------+---------------------------------+-------------------+------------------------+--------------------------+--------------------------+------------------------+---------------------------+---------------+------------------+------------------+
|product_id|user_id|max_order_number|sum_days_since_prior_order|avg_days_since_prior_order|total_products_by_userid|total_distinct_products_by_userid| user_reorder_ratio|total_orders_by_userprod|min_ordernumber_byuserprod|max_ordernumber_byuserprod|avg_addtocart_byuserprod|total_products_by_productid|total_reordered|product_seq_time_1|product_seq_time_2|
+----------+-------+----------------+--------------------------+--------------------------+------------------------+---------------------------------+-------------------+------------------------+--------------------------+--------------------------+------------------------+------------

### final_features write to S3 destination

In [14]:
final_features.coalesce(1).write.option('header', 'true').mode('overwrite').csv(f's3://{bucket}/{folder}/')




In [20]:
# write final_features in parquet in S3
final_features.coalesce(1).write.parquet(f's3://{bucket}/sprint3-final-features/')




#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)