In [1]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf,count,countDistinct
from datetime import datetime, timedelta
from urllib.parse import unquote
import sqlalchemy as sa
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from hashids import Hashids
import pyspark.sql.types as T
from pyspark.sql.functions import sum as s
from pyspark.sql import SparkSession
import sys
from pyspark.sql.functions import substring
from pyspark.sql import Window

In [2]:
spark = SparkSession.builder.appName("test").getOrCreate()

# Read Data

In [3]:
#Read SCV file in here

2023-01-02 22:03:33.703169
2023-01-01 22:03:45.298553


In [4]:
final_df.show()

+----------+-------------------+--------------------+--------------------+-----------------+--------------------+------+-----------+--------------------+--------------------+---------+
|    bucket|              time1|            vendorid|                udid|           source|              medium|action|actionparam|              userid|          vendorcode|supertype|
+----------+-------------------+--------------------+--------------------+-----------------+--------------------+------+-----------+--------------------+--------------------+---------+
|2023-01-02|2023-01-02 01:16:00|cf83e1357eefb8bdf...|d2bd98ed2cda72c42...|        home_page|          categories| click|          7|4d02d3c5ef2cd6ba7...|cf83e1357eefb8bdf...|        1|
|2023-01-02|2023-01-02 01:16:30|cf83e1357eefb8bdf...|e91098d52a8176993...|restaurant_detail|              render|  load|           |9f231ce7826cad2a4...|c54b50898369af54c...|         |
|2023-01-02|2023-01-02 01:17:00|cf83e1357eefb8bdf...|1b1a7b3475af4e41b...|r

In [24]:
final_df.head(2)

[Row(bucket='2022-11-20', time1='2022-11-20 00:19:14', vendorid='cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', udid='104d83082ccb6fa71c30f6ad8e4d42fca003122e891655cf65884d3aab36b16a103b79f5b12fcd706f687e7efbfa1079333f26b03c7a576be5d56b7bd80a27cc', source='home_page', medium='render', action='load', actionparam='', userid='a12e5525cd89ca46df4b188e72f3bca4a31d542ec011e54ae0bb40a68b8f31c38b032af1a614340dadbbf629a0a3cff9ade0a3da7956de02358f7916c8229e82', vendorcode='cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', supertype=''),
 Row(bucket='2022-11-20', time1='2022-11-20 00:19:29', vendorid='cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', udid='cf953be16313b96e355efab0b7b69a4d4fc8fec477a21ada3475a18fd1aa7d8f45b5f92732cc2d7c0ada590f18cbaecbfbd9

# take a look in Structuredf

In [5]:
final_df.printSchema()

root
 |-- bucket: string (nullable = true)
 |-- time1: string (nullable = true)
 |-- vendorid: string (nullable = true)
 |-- udid: string (nullable = true)
 |-- source: string (nullable = true)
 |-- medium: string (nullable = true)
 |-- action: string (nullable = true)
 |-- actionparam: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- vendorcode: string (nullable = true)
 |-- supertype: string (nullable = true)



In [6]:
final_df.select("source").distinct().show(truncate = False)

+-----------------+
|source           |
+-----------------+
|restaurant_detail|
|home_page        |
+-----------------+



In [7]:
final_df.select("medium").distinct().show(truncate = False)

+---------------------+
|medium               |
+---------------------+
|render               |
|categories           |
|services_see_more    |
|services             |
|services_see_more_btn|
+---------------------+



In [8]:
final_df.select("action").distinct().show(truncate = False)

+------+
|action|
+------+
|load  |
|click |
+------+



In [9]:
final_df.select("supertype").distinct().show(truncate = False)

+---------+
|supertype|
+---------+
|11       |
|3        |
|6        |
|1        |
|4        |
|         |
|2        |
|8        |
|5        |
|21       |
|22       |
|9        |
|7        |
+---------+



# Measures

In [25]:
enteredToHomePage = final_df.filter("action = 'load' and medium = 'render' and source = 'home_page'").groupBy("bucket")\
        .agg(countDistinct('userid').alias("distinctHomePageLoad"),count('userid').alias("HomePageLoad"))

In [26]:
enteredToHomePage.show()

+----------+--------------------+------------+
|    bucket|distinctHomePageLoad|HomePageLoad|
+----------+--------------------+------------+
|2022-11-19|              295699|      843730|
|2022-11-20|              204689|      553274|
+----------+--------------------+------------+



In [46]:
entered_to_list = final_df.filter("""action = 'click' and source = 'home_page' 
                                   and medium in ('services', 'services_see_more', 'services_see_more_btn', 'services_scroll') and actionParam !='' """).groupBy("bucket","actionParam")\
        .agg(countDistinct('userid').alias("distinctGoToService"),count('userid').alias("GoToService"))

In [47]:
entered_to_list.show()

+----------+-----------+-------------------+-----------+
|    bucket|actionParam|distinctGoToService|GoToService|
+----------+-----------+-------------------+-----------+
|2022-11-20|          4|              22588|      53355|
|2022-11-19|          8|               5703|       8239|
|2022-11-19|         11|              10366|      16029|
|2022-11-20|          8|               3445|       4774|
|2022-11-19|          4|              32288|      70847|
|2022-11-19|         21|                702|        979|
|2022-11-19|         23|                501|        616|
|2022-11-20|         21|                436|        644|
|2022-11-19|         18|               4010|       7112|
|2022-11-20|         24|                352|        425|
|2022-11-20|          6|               8278|      12340|
|2022-11-20|         11|               7128|      10694|
|2022-11-20|          2|               8366|      12098|
|2022-11-20|          9|               1006|       1173|
|2022-11-20|         22|       

In [48]:
final_df.filter("""source = 'restaurant_detail' 
                and medium = 'render' and vendorCode != '' and superType !=''""").groupBy("bucket","superType","vendorCode")\
        .agg(countDistinct('userid').alias("distinctVendorView"),count('userid').alias("VendorView")).show()

+----------+---------+--------------------+------------------+----------+
|    bucket|superType|          vendorCode|distinctVendorView|VendorView|
+----------+---------+--------------------+------------------+----------+
|2022-11-20|        1|320afa1765b50edba...|                 2|         9|
|2022-11-20|        1|e7e84e3e5953046f6...|                 2|         2|
|2022-11-20|        1|b22a679ead21bef83...|                 1|         5|
|2022-11-20|        1|843a14078c8df658c...|                 2|         3|
|2022-11-19|        6|d20358fcd1b1611e1...|                 4|         6|
|2022-11-20|        1|a560916f75cecaa61...|                 2|         4|
|2022-11-20|        1|d75ab4a5ab9d16a67...|                 2|         3|
|2022-11-20|        1|8e4dfb9d7ebe24d6c...|                 4|         6|
|2022-11-20|        1|25d4f03b3ff9a68d9...|                 2|         3|
|2022-11-20|        1|5052341368be4addf...|                 1|         3|
|2022-11-19|        1|7da4247262f7dd80