In [1]:
# Echo the `spark` object so the cell output shows it.
# NOTE(review): `spark` is not defined in any visible cell — presumably the
# session object injected by the notebook runtime; confirm.
spark

StatementMeta(sparkpool, 73, 1, Finished, Available)

In [14]:
# Register `clickstream_test` as a Parquet-backed table over the files matched
# by the abfss glob in OPTIONS(path=...). The statement is a no-op when the
# table already exists (IF NOT EXISTS).
# Every column is declared as string except `autosuggest_position` (integer)
# and `sale_info`, an array of {price, product_id, qty} structs whose fields
# are themselves strings.
spark.sql("""create table if not exists clickstream_test(
 time_stamp string,
 session_id string,
 session_user_id string,
 user_id string,
 search_term string,
 search_result_count string,
 autosuggest_prefix string,
 autosuggest_position integer,
 click_position string,
 device string,
 location string,
 sort string,
 product_id string,
 sale_info array<struct<price:string,product_id:string,qty:string>>,
 filter string,
 event_name string
)
USING PARQUET
OPTIONS(path='abfss://devsearchfs@devsearchcroma.dfs.core.windows.net/test/tul/*.parquet');""")

StatementMeta(sparkpool, 73, 14, Finished, Available)

DataFrame[]

In [2]:
# Create the Delta table `autosuggest` that the MERGE cell further down writes
# into. One row holds the counters (views, wishlists, cartadds, orders,
# order_units, gmv, null_searches) for a (date, search_term,
# autosuggest_prefix, autosuggest_position) combination, plus created_at /
# updated_at timestamps. The table is partitioned by (year, month, day).
# The statement is a no-op when the table already exists (IF NOT EXISTS).
# NOTE(review): the description is passed through OPTIONS rather than a
# COMMENT clause — confirm that it ends up where table metadata is expected.
spark.sql("""CREATE TABLE IF NOT EXISTS autosuggest
(
  date DATE,
  search_term STRING,
  autosuggest_prefix STRING,
  autosuggest_position INT,
  views INT,
  wishlists INT,
  cartadds INT,
  orders INT,
  order_units INT,
  gmv FLOAT,
  null_searches INT,
  created_at TIMESTAMP,
  updated_at TIMESTAMP,
  year INT,
  month INT,
  day INT
)
USING DELTA
PARTITIONED BY (year, month, day)
OPTIONS (description = 'This table is PARTITIONED  by (year, month, day)');""")

StatementMeta(sparkpool, 73, 2, Finished, Available)

DataFrame[]

In [13]:
# Drop the `clickstream_test` table definition so it can be re-created.
# IF EXISTS keeps the cell re-runnable: a plain DROP TABLE raises when the
# table is already gone.
# NOTE(review): in file order this cell sits after the CREATE of
# clickstream_test and before the queries that read it, while the recorded
# execution counts show it was actually run before the CREATE. Running the
# notebook top to bottom would leave the later cells without their source table.
spark.sql("""drop table if exists clickstream_test""")

StatementMeta(sparkpool, 73, 13, Finished, Available)

DataFrame[]

In [None]:
# Incrementally fold click-stream events into the `autosuggest` Delta table.
#
# Source window: rows of clickstream_test with a non-null search_term and
# autosuggest_prefix whose time_stamp is later than max(updated_at) in
# autosuggest; when that maximum is NULL, every row up to CURRENT_TIMESTAMP().
#
# rest_metrics : per (date, search_term, autosuggest_prefix, autosuggest_position)
#                counts of PDPEvent / CartEvent / WishlistEvent rows and of rows
#                whose search_result_count casts to 0, over de-duplicated events.
# order_metrics: same grouping over the exploded sale_info of 'order_event' rows;
#                order_units = SUM(qty), gmv = SUM(qty * price).
#
# Matched target rows have the new counts added to the stored ones; unmatched
# rows are inserted. created_at / updated_at are set to max(time_stamp) of
# clickstream_test.
#
# Null handling (the reason for the `<=>` comparisons below):
#   * autosuggest_position is nullable (the preview cell's recorded output shows
#     null values). CONCAT(...) over a NULL input is NULL, so a concatenated
#     join key silently dropped order metrics for those groups, and
#     `sink.autosuggest_position = src.autosuggest_position` never matched,
#     making every run insert duplicates instead of updating.
#   * A delimiter-less concatenated key can also collide for different
#     (search_term, autosuggest_prefix) pairs; joining on the individual
#     columns avoids that.
#
# NOTE(review): orders are filtered on event_name = 'order_event' here, while
# the other events use PascalCase names and the draft query at the end of the
# notebook uses 'OrderEvent' — confirm the actual value in the data.
spark.sql("""WITH target_max as (
        select max(updated_at) as target_max_time from autosuggest
    ),
    source_max as(
        select max(time_stamp) as source_max_time from clickstream_test
    )
MERGE INTO autosuggest as sink
USING (
    WITH source_data as
      (
        SELECT time_stamp, user_id, session_id, search_term, search_result_count, event_name, sale_info,
        cast(autosuggest_position as int) autosuggest_position,autosuggest_prefix
        FROM clickstream_test
        WHERE search_term IS NOT NULL and autosuggest_prefix is NOT NULL and
        CASE WHEN (select target_max_time from target_max) IS NULL THEN time_stamp <= CURRENT_TIMESTAMP()
        ELSE time_stamp > (select target_max_time from target_max) END
      ),
    rest_metrics as
    (
        SELECT date(time_stamp) as date, search_term, autosuggest_prefix, autosuggest_position,
        COUNT(search_term) as searches,
        SUM(case when cast(search_result_count as int)=0 then 1 else 0 end) as null_searches,
        SUM(CASE WHEN event_name = 'PDPEvent' THEN 1 ELSE 0 END) as views,
        SUM(CASE WHEN event_name = 'CartEvent' THEN 1 ELSE 0 END) as cartadds,
        SUM(CASE WHEN event_name = 'WishlistEvent' THEN 1 ELSE 0 END) as wishlists
        FROM (SELECT DISTINCT time_stamp,user_id,session_id,search_term,
        search_result_count,autosuggest_prefix, autosuggest_position,event_name FROM source_data) x
        GROUP BY 1,2,3,4
    ),
    order_metrics as
    (
        SELECT date(time_stamp) date, search_term, autosuggest_prefix, autosuggest_position,
        COUNT(*) as orders,
        SUM(qty) as order_units,
        SUM(gmv) as gmv
        FROM
        (
          SELECT DISTINCT time_stamp as time_stamp, user_id, session_id, search_term,
          autosuggest_prefix, autosuggest_position,
          si.product_id as product_id,
          si.qty as qty,
          si.price as price,
          (si.qty * si.price) as gmv,
          event_name
          FROM source_data LATERAL VIEW EXPLODE (sale_info) as si
          WHERE event_name = 'order_event'
        ) as x
        GROUP BY 1,2,3,4
    )
    SELECT DISTINCT rm.date, year(rm.date) as year, month(rm.date) as month, day(rm.date) as day,
    rm.search_term, rm.autosuggest_prefix,
    CAST(rm.autosuggest_position as INT) as autosuggest_position ,
    rm.views, rm.cartadds, rm.wishlists,
    COALESCE(om.orders,0) as orders, COALESCE(om.order_units,0) as order_units,
    COALESCE(om.gmv,0) as gmv, rm.null_searches
    FROM rest_metrics as rm
    LEFT JOIN order_metrics as om
    -- null-safe, column-wise join: groups with a NULL position still pair up
    ON rm.date <=> om.date
       and rm.search_term <=> om.search_term
       and rm.autosuggest_prefix <=> om.autosuggest_prefix
       and rm.autosuggest_position <=> om.autosuggest_position
    ) src
-- search_term / autosuggest_prefix are filtered NOT NULL in source_data;
-- date and autosuggest_position may be NULL, hence <=>
ON sink.date <=> src.date and sink.search_term = src.search_term and sink.autosuggest_prefix = src.autosuggest_prefix
   and sink.autosuggest_position <=> src.autosuggest_position
WHEN MATCHED THEN UPDATE SET sink.views = sink.views+src.views, sink.cartadds = sink.cartadds+src.cartadds,
                             sink.wishlists = sink.wishlists+src.wishlists, sink.gmv = sink.gmv+src.gmv,
                             sink.updated_at = (select source_max_time from source_max),
                             sink.orders = sink.orders+src.orders,
                             sink.order_units = sink.order_units+src.order_units,
                             sink.null_searches = sink.null_searches+src.null_searches
WHEN NOT MATCHED THEN INSERT(date, search_term, autosuggest_prefix, autosuggest_position, views, cartadds, wishlists, orders, order_units, gmv, null_searches, created_at, updated_at, year, month, day)
                      VALUES(src.date, src.search_term, src.autosuggest_prefix, src.autosuggest_position, src.views, src.cartadds, src.wishlists, src.orders, src.order_units,
                             src.gmv, src.null_searches,
                             (select source_max_time from source_max),
                             (select source_max_time from source_max), src.year, src.month, src.day);
    """)

In [27]:
# Preview (first 2 rows) of the row set that the MERGE cell uses as its source:
# per (date, search_term, autosuggest_prefix, autosuggest_position) counters
# from clickstream_test, restricted to rows newer than max(updated_at) in
# autosuggest (or everything up to CURRENT_TIMESTAMP() when that maximum is NULL).
#
# rest_metrics and order_metrics are joined column by column with the null-safe
# `<=>` operator. A CONCAT(date, search_term, autosuggest_prefix,
# autosuggest_position) key is NULL whenever autosuggest_position is NULL, so
# such groups could never pick up their order metrics, and a delimiter-less key
# can collide for different (search_term, autosuggest_prefix) pairs.
#
# NOTE(review): orders are filtered on event_name = 'order_event' while the
# other events use PascalCase names ('PDPEvent', ...) — confirm the actual value.
spark.sql("""WITH target_max as (
        select max(updated_at) as target_max_time from autosuggest
    ),
    source_max as(
        select max(time_stamp) as source_max_time from clickstream_test
    ),
    source_data as
      (
        SELECT time_stamp, user_id, session_id, search_term, search_result_count, event_name, sale_info,
        cast(autosuggest_position as int) autosuggest_position,autosuggest_prefix
        FROM clickstream_test
        WHERE search_term IS NOT NULL and autosuggest_prefix is NOT NULL and
        CASE WHEN (select target_max_time from target_max) IS NULL THEN time_stamp <= CURRENT_TIMESTAMP()
        ELSE time_stamp > (select target_max_time from target_max) END
      ),
    rest_metrics as
    (
        SELECT date(time_stamp) as date, search_term, autosuggest_prefix, autosuggest_position,
        COUNT(search_term) as searches,
        SUM(case when cast(search_result_count as int)=0 then 1 else 0 end) as null_searches,
        SUM(CASE WHEN event_name = 'PDPEvent' THEN 1 ELSE 0 END) as views,
        SUM(CASE WHEN event_name = 'CartEvent' THEN 1 ELSE 0 END) as cartadds,
        SUM(CASE WHEN event_name = 'WishlistEvent' THEN 1 ELSE 0 END) as wishlists
        FROM (SELECT DISTINCT time_stamp,user_id,session_id,search_term,
        search_result_count,autosuggest_prefix, autosuggest_position,event_name FROM source_data) x
        GROUP BY 1,2,3,4
    ),
    order_metrics as
    (
        SELECT date(time_stamp) date, search_term, autosuggest_prefix, autosuggest_position,
        COUNT(*) as orders,
        SUM(qty) as order_units,
        SUM(gmv) as gmv
        FROM
        (
          SELECT DISTINCT time_stamp as time_stamp, user_id, session_id, search_term,
          autosuggest_prefix, autosuggest_position,
          si.product_id as product_id,
          si.qty as qty,
          si.price as price,
          (si.qty * si.price) as gmv,
          event_name
          FROM source_data LATERAL VIEW EXPLODE (sale_info) as si
          WHERE event_name = 'order_event'
        ) as x
        GROUP BY 1,2,3,4
    )
    SELECT DISTINCT rm.date, year(rm.date) as year, month(rm.date) as month, day(rm.date) as day,
    rm.search_term, rm.autosuggest_prefix,
    CAST(rm.autosuggest_position as INT) as autosuggest_position ,
    rm.views, rm.cartadds, rm.wishlists,
    COALESCE(om.orders,0) as orders, COALESCE(om.order_units,0) as order_units,
    COALESCE(om.gmv,0) as gmv, rm.null_searches
    FROM rest_metrics as rm
    LEFT JOIN order_metrics as om
    -- null-safe, column-wise join: groups with a NULL position still pair up
    ON rm.date <=> om.date
       and rm.search_term <=> om.search_term
       and rm.autosuggest_prefix <=> om.autosuggest_prefix
       and rm.autosuggest_position <=> om.autosuggest_position""").show(2, truncate=False)

StatementMeta(sparkpool, 73, 27, Finished, Available)

+----------+----+-----+---+----------------------+----------------------+--------------------+-----+--------+---------+------+-----------+---+-------------+
|date      |year|month|day|search_term           |autosuggest_prefix    |autosuggest_position|views|cartadds|wishlists|orders|order_units|gmv|null_searches|
+----------+----+-----+---+----------------------+----------------------+--------------------+-----+--------+---------+------+-----------+---+-------------+
|2022-09-01|2022|9    |1  |gaming tshirt for boys|gaming tshirt for boys|null                |0    |0       |0        |0     |0.0        |0.0|1            |
|2022-09-01|2022|9    |1  |mens sweatshirts      |mens sweatshirts      |null                |0    |0       |0        |0     |0.0        |0.0|1            |
+----------+----+-----+---+----------------------+----------------------+--------------------+-----+--------+---------+------+-----------+---+-------------+
only showing top 2 rows



In [21]:
# Number of rows currently in `autosuggest` (the recorded output is 0).
spark.sql("""select * from autosuggest""").count()

StatementMeta(sparkpool, 73, 21, Finished, Available)

0

Autosuggest

In [None]:
-- Draft of the autosuggest source query written against the `clickstream`
-- table, which uses event_time / event_type column names.
--
-- Produces, per (date, search_term, autosuggest_prefix, autosuggest_position):
-- views / cartadds / wishlists / null_searches from de-duplicated events, plus
-- orders / order_units / gmv from the exploded sale_info of 'OrderEvent' rows.
-- Only rows newer than max(updated_at) in autosuggest are read (or everything
-- up to CURRENT_TIMESTAMP() when that maximum is NULL).
--
-- source_data reads `clickstream`, the same table as source_max: the
-- clickstream_test table created earlier in this notebook has no
-- event_time / event_type columns, so selecting them from it cannot resolve.
--
-- rest_metrics and order_metrics are joined column by column with the
-- null-safe `<=>` operator. A CONCAT(...) key is NULL whenever
-- autosuggest_position is NULL and can collide without a delimiter.
WITH target_max as (
        select max(updated_at) as target_max_time from autosuggest
    ),
source_max as(
        select max(event_time) as source_max_time from clickstream
    ),
source_data as
      (
        SELECT event_time, user_id, session_id, search_term, search_result_count, event_type, sale_info,
        cast(autosuggest_position as int) autosuggest_position,autosuggest_prefix
        FROM clickstream
        WHERE search_term IS NOT NULL and autosuggest_prefix is NOT NULL and
        CASE WHEN (select target_max_time from target_max) IS NULL THEN event_time <= CURRENT_TIMESTAMP()
        ELSE event_time > (select target_max_time from target_max) END
      ),
rest_metrics as
    (
        SELECT date(event_time) as date, search_term, autosuggest_prefix, autosuggest_position,
        COUNT(search_term) as searches,
        SUM(case when cast(search_result_count as int)=0 then 1 else 0 end) as null_searches,
        SUM(CASE WHEN event_type = 'PDPEvent' THEN 1 ELSE 0 END) as views,
        SUM(CASE WHEN event_type = 'CartEvent' THEN 1 ELSE 0 END) as cartadds,
        SUM(CASE WHEN event_type = 'WishlistEvent' THEN 1 ELSE 0 END) as wishlists
        FROM (SELECT DISTINCT event_time,user_id,session_id,search_term,search_result_count,autosuggest_prefix, autosuggest_position,event_type FROM source_data) x
        GROUP BY 1,2,3,4
    ),
order_metrics as
    (
        SELECT date(event_time) date, search_term, autosuggest_prefix, autosuggest_position,
        COUNT(*) as orders,
        SUM(qty) as order_units,
        SUM(gmv) as gmv
        FROM
        (
          SELECT DISTINCT event_time as event_time, user_id, session_id, search_term, autosuggest_prefix, autosuggest_position,
          si.product_id as product_id,
          si.qty as qty,
          si.price as price,
          (si.qty * si.price) as gmv,
          event_type
          FROM source_data LATERAL VIEW EXPLODE (sale_info) as si
          WHERE event_type = 'OrderEvent'
        ) as x
        GROUP BY 1,2,3,4
    )
SELECT DISTINCT rm.date, rm.search_term, rm.autosuggest_prefix,  CAST(rm.autosuggest_position as INT) as autosuggest_position ,
rm.views, rm.cartadds, rm.wishlists,
COALESCE(om.orders,0) as orders, COALESCE(om.order_units,0) as order_units,
COALESCE(om.gmv,0) as gmv, rm.null_searches
FROM rest_metrics as rm
LEFT JOIN order_metrics as om
-- null-safe, column-wise join: groups with a NULL position still pair up
ON rm.date <=> om.date
   and rm.search_term <=> om.search_term
   and rm.autosuggest_prefix <=> om.autosuggest_prefix
   and rm.autosuggest_position <=> om.autosuggest_position