### Confugration

In [0]:
# Source Location
sourceLocation = "dbfs:/clickStream/source"
try:
    dbutils.fs.ls(sourceLocation)
    print("Directory Exists")
except:
    dbutils.fs.mkdirs(sourceLocation)



Directory Exists


In [0]:
# load files to source location
dbutils.fs.cp('dbfs:/FileStore/clickStream/psDataset.txt',sourceLocation+'/psDataset.txt')

Out[3]: True

### Bronze Layer

In [0]:
# Read Data into Data frame from source Location
df = spark.read.csv(f'{sourceLocation}/*.txt')
df.show(truncate=False)

+------------------------+
|_c0                     |
+------------------------+
|Given Dataset:          |
|Timestamp\t     User_id |
|2021-05-01T11:00:00Z\tu1|
|2021-05-01T13:13:00Z\tu1|
|2021-05-01T15:00:00Z\tu2|
|2021-05-01T11:25:00Z\tu1|
|2021-05-01T11:50:00Z\tu1|
|2021-05-01T15:15:00Z\tu2|
|2021-05-01T02:13:00Z\tu3|
|2021-05-03T02:15:00Z\tu4|
|2021-05-02T11:45:00Z\tu1|
|2021-05-02T11:00:00Z\tu3|
|2021-05-03T12:15:00Z\tu3|
|2021-05-03T11:00:00Z\tu4|
|2021-05-03T21:00:00Z\tu4|
|2021-05-04T19:00:00Z\tu2|
|2021-05-04T09:00:00Z\tu3|
|2021-05-04T08:15:00Z\tu1|
+------------------------+



In [0]:
from pyspark.sql.functions import split,upper, monotonically_increasing_id,col
dfWithHeader = df.withColumn('id',monotonically_increasing_id()).filter('id>1').select(col('_c0').alias('data'))\
                .withColumn('timeStamp',split('data','\t')[0])\
                .withColumn('User_id',split('data','\t')[1])\
                .drop('data')

dfWithHeader.show(truncate=False)

+--------------------+-------+
|timeStamp           |User_id|
+--------------------+-------+
|2021-05-01T11:00:00Z|u1     |
|2021-05-01T13:13:00Z|u1     |
|2021-05-01T15:00:00Z|u2     |
|2021-05-01T11:25:00Z|u1     |
|2021-05-01T11:50:00Z|u1     |
|2021-05-01T15:15:00Z|u2     |
|2021-05-01T02:13:00Z|u3     |
|2021-05-03T02:15:00Z|u4     |
|2021-05-02T11:45:00Z|u1     |
|2021-05-02T11:00:00Z|u3     |
|2021-05-03T12:15:00Z|u3     |
|2021-05-03T11:00:00Z|u4     |
|2021-05-03T21:00:00Z|u4     |
|2021-05-04T19:00:00Z|u2     |
|2021-05-04T09:00:00Z|u3     |
|2021-05-04T08:15:00Z|u1     |
+--------------------+-------+



#### Loading data into Bronze Tables

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS clickstream;
USE clickstream

In [0]:
# Create Bronze Table

dfWithHeader.write.format('delta').mode('append').saveAsTable('clickstream.bronze_Tabe')


### Silver Layer

In [0]:
# Read from Bronze Table
dfBronze = spark.table('clickstream.bronze_tabe')
dfBronze.show()

+--------------------+-------+
|           timeStamp|User_id|
+--------------------+-------+
|2021-05-01T11:00:00Z|     u1|
|2021-05-01T13:13:00Z|     u1|
|2021-05-01T15:00:00Z|     u2|
|2021-05-01T11:25:00Z|     u1|
|2021-05-01T11:50:00Z|     u1|
|2021-05-01T15:15:00Z|     u2|
|2021-05-01T02:13:00Z|     u3|
|2021-05-03T02:15:00Z|     u4|
|2021-05-02T11:45:00Z|     u1|
|2021-05-02T11:00:00Z|     u3|
|2021-05-03T12:15:00Z|     u3|
|2021-05-03T11:00:00Z|     u4|
|2021-05-03T21:00:00Z|     u4|
|2021-05-04T19:00:00Z|     u2|
|2021-05-04T09:00:00Z|     u3|
|2021-05-04T08:15:00Z|     u1|
+--------------------+-------+



In [0]:
# casting timestamp col
import  pyspark.sql.functions as F
dfCorrectDataType = dfBronze.withColumn('timeStampCast',F.to_timestamp(F.col('timeStamp'),"yyyy-MM-dd'T'HH:mm:ss'Z'"))
dfCorrectDataType.show()

+--------------------+-------+-------------------+
|           timeStamp|User_id|      timeStampCast|
+--------------------+-------+-------------------+
|2021-05-01T11:00:00Z|     u1|2021-05-01 11:00:00|
|2021-05-01T13:13:00Z|     u1|2021-05-01 13:13:00|
|2021-05-01T15:00:00Z|     u2|2021-05-01 15:00:00|
|2021-05-01T11:25:00Z|     u1|2021-05-01 11:25:00|
|2021-05-01T11:50:00Z|     u1|2021-05-01 11:50:00|
|2021-05-01T15:15:00Z|     u2|2021-05-01 15:15:00|
|2021-05-01T02:13:00Z|     u3|2021-05-01 02:13:00|
|2021-05-03T02:15:00Z|     u4|2021-05-03 02:15:00|
|2021-05-02T11:45:00Z|     u1|2021-05-02 11:45:00|
|2021-05-02T11:00:00Z|     u3|2021-05-02 11:00:00|
|2021-05-03T12:15:00Z|     u3|2021-05-03 12:15:00|
|2021-05-03T11:00:00Z|     u4|2021-05-03 11:00:00|
|2021-05-03T21:00:00Z|     u4|2021-05-03 21:00:00|
|2021-05-04T19:00:00Z|     u2|2021-05-04 19:00:00|
|2021-05-04T09:00:00Z|     u3|2021-05-04 09:00:00|
|2021-05-04T08:15:00Z|     u1|2021-05-04 08:15:00|
+--------------------+-------+-

In [0]:
#reorder
dfReOrdered = dfCorrectDataType.orderBy('User_id','timeStamp')
dfReOrdered.show()

+--------------------+-------+-------------------+
|           timeStamp|User_id|      timeStampCast|
+--------------------+-------+-------------------+
|2021-05-01T11:00:00Z|     u1|2021-05-01 11:00:00|
|2021-05-01T11:25:00Z|     u1|2021-05-01 11:25:00|
|2021-05-01T11:50:00Z|     u1|2021-05-01 11:50:00|
|2021-05-01T13:13:00Z|     u1|2021-05-01 13:13:00|
|2021-05-02T11:45:00Z|     u1|2021-05-02 11:45:00|
|2021-05-04T08:15:00Z|     u1|2021-05-04 08:15:00|
|2021-05-01T15:00:00Z|     u2|2021-05-01 15:00:00|
|2021-05-01T15:15:00Z|     u2|2021-05-01 15:15:00|
|2021-05-04T19:00:00Z|     u2|2021-05-04 19:00:00|
|2021-05-01T02:13:00Z|     u3|2021-05-01 02:13:00|
|2021-05-02T11:00:00Z|     u3|2021-05-02 11:00:00|
|2021-05-03T12:15:00Z|     u3|2021-05-03 12:15:00|
|2021-05-04T09:00:00Z|     u3|2021-05-04 09:00:00|
|2021-05-03T02:15:00Z|     u4|2021-05-03 02:15:00|
|2021-05-03T11:00:00Z|     u4|2021-05-03 11:00:00|
|2021-05-03T21:00:00Z|     u4|2021-05-03 21:00:00|
+--------------------+-------+-

In [0]:
# add lag time stamp per user
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType
w = Window.partitionBy('User_id').orderBy('timeStampCast')
dfTimeDiff = dfReOrdered.withColumn('lagTimeStamp', F.lag('timeStampCast').over(w))\
            .withColumn('timeDiff',(F.unix_timestamp(col('timeStampCast'))-F.unix_timestamp(col('lagTimeStamp')))/60)\
            .fillna(0,subset=['timeDiff'])
dfTimeDiff.show(truncate=False)

+--------------------+-------+-------------------+-------------------+--------+
|timeStamp           |User_id|timeStampCast      |lagTimeStamp       |timeDiff|
+--------------------+-------+-------------------+-------------------+--------+
|2021-05-01T11:00:00Z|u1     |2021-05-01 11:00:00|null               |0.0     |
|2021-05-01T11:25:00Z|u1     |2021-05-01 11:25:00|2021-05-01 11:00:00|25.0    |
|2021-05-01T11:50:00Z|u1     |2021-05-01 11:50:00|2021-05-01 11:25:00|25.0    |
|2021-05-01T13:13:00Z|u1     |2021-05-01 13:13:00|2021-05-01 11:50:00|83.0    |
|2021-05-02T11:45:00Z|u1     |2021-05-02 11:45:00|2021-05-01 13:13:00|1352.0  |
|2021-05-04T08:15:00Z|u1     |2021-05-04 08:15:00|2021-05-02 11:45:00|2670.0  |
|2021-05-01T15:00:00Z|u2     |2021-05-01 15:00:00|null               |0.0     |
|2021-05-01T15:15:00Z|u2     |2021-05-01 15:15:00|2021-05-01 15:00:00|15.0    |
|2021-05-04T19:00:00Z|u2     |2021-05-04 19:00:00|2021-05-01 15:15:00|4545.0  |
|2021-05-01T02:13:00Z|u3     |2021-05-01

In [0]:
from pyspark.sql.functions import col, when, lag, sum
lagFun = lag('timeDiff').over(w)<=30
sumFun = sum(col('timeDiff')).over(w)
dfAbsTimeDiff =dfTimeDiff.withColumn('absTimeDiff',when(lagFun,sumFun).otherwise(col('timeDiff')))
dfAbsTimeDiff.show()

+--------------------+-------+-------------------+-------------------+--------+-----------+
|           timeStamp|User_id|      timeStampCast|       lagTimeStamp|timeDiff|absTimeDiff|
+--------------------+-------+-------------------+-------------------+--------+-----------+
|2021-05-01T11:00:00Z|     u1|2021-05-01 11:00:00|               null|     0.0|        0.0|
|2021-05-01T11:25:00Z|     u1|2021-05-01 11:25:00|2021-05-01 11:00:00|    25.0|       25.0|
|2021-05-01T11:50:00Z|     u1|2021-05-01 11:50:00|2021-05-01 11:25:00|    25.0|       50.0|
|2021-05-01T13:13:00Z|     u1|2021-05-01 13:13:00|2021-05-01 11:50:00|    83.0|      133.0|
|2021-05-02T11:45:00Z|     u1|2021-05-02 11:45:00|2021-05-01 13:13:00|  1352.0|     1352.0|
|2021-05-04T08:15:00Z|     u1|2021-05-04 08:15:00|2021-05-02 11:45:00|  2670.0|     2670.0|
|2021-05-01T15:00:00Z|     u2|2021-05-01 15:00:00|               null|     0.0|        0.0|
|2021-05-01T15:15:00Z|     u2|2021-05-01 15:15:00|2021-05-01 15:00:00|    15.0| 

In [0]:
# identify session number

dfSessions = dfAbsTimeDiff.withColumn('incrementNum',when((col('timeDiff')>30) | (col('absTimeDiff')>120),1).otherwise(0))\
                          .withColumn('cumNum',F.sum('incrementNum').over(w))\
                              .withColumn('day',F.to_date('timeStampCast'))\
                          .withColumn('Session_id',F.concat(col('User_id'),F.lit('_s'),col('cumNum')+1))\
                              .drop('incrementNum','cumNum','absTimeDiff','lagTimeStamp')
dfSessions.show()

+--------------------+-------+-------------------+--------+----------+----------+
|           timeStamp|User_id|      timeStampCast|timeDiff|       day|Session_id|
+--------------------+-------+-------------------+--------+----------+----------+
|2021-05-01T11:00:00Z|     u1|2021-05-01 11:00:00|     0.0|2021-05-01|     u1_s1|
|2021-05-01T11:25:00Z|     u1|2021-05-01 11:25:00|    25.0|2021-05-01|     u1_s1|
|2021-05-01T11:50:00Z|     u1|2021-05-01 11:50:00|    25.0|2021-05-01|     u1_s1|
|2021-05-01T13:13:00Z|     u1|2021-05-01 13:13:00|    83.0|2021-05-01|     u1_s2|
|2021-05-02T11:45:00Z|     u1|2021-05-02 11:45:00|  1352.0|2021-05-02|     u1_s3|
|2021-05-04T08:15:00Z|     u1|2021-05-04 08:15:00|  2670.0|2021-05-04|     u1_s4|
|2021-05-01T15:00:00Z|     u2|2021-05-01 15:00:00|     0.0|2021-05-01|     u2_s1|
|2021-05-01T15:15:00Z|     u2|2021-05-01 15:15:00|    15.0|2021-05-01|     u2_s1|
|2021-05-04T19:00:00Z|     u2|2021-05-04 19:00:00|  4545.0|2021-05-04|     u2_s2|
|2021-05-01T02:1

In [0]:
# Loading/merging data into Silver Table
silverLocation = "dbfs:/user/hive/warehouse/clickstream.db/silver_table"

from delta.tables import DeltaTable
if DeltaTable.isDeltaTable(spark,silverLocation):
    #merge data into existing table
    DeltaTable.forName(spark,'silver_table').alias('target').merge(
        source=dfSessions.alias('source'),
        condition="target.User_id=source.User_id and target.timeStamp=source.timeStamp"
    )\
        .whenNotMatchedInsertAll()\
        .execute()
else:
    #if not a delta table, create one & overwrite all data
    dfSessions.write.format('delta').mode('overwrite').saveAsTable('silver_table')

In [0]:
%sql
DESCRIBE HISTORY silver_table

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2024-03-01T14:42:25.000+0000,3575659572783655,jadhavprem22@gmail.com,MERGE,"Map(predicate -> [""((User_id#5655 = User_id#878) AND (timeStamp#5654 = timeStamp#877))""], matchedPredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(3150524556320814),0301-132924-1r6htkhh,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 0, numTargetBytesAdded -> 0, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 3422, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 0, numTargetRowsUpdated -> 0, numOutputRows -> 0, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 16, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 3349)",,Databricks-Runtime/12.2.x-scala2.12
1,2024-03-01T14:16:04.000+0000,3575659572783655,jadhavprem22@gmail.com,MERGE,"Map(predicate -> [""((User_id#3195 = User_id#878) AND (timeStamp#3194 = timeStamp#877))""], matchedPredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(3150524556320814),0301-132924-1r6htkhh,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 0, numTargetBytesAdded -> 0, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 5211, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 0, numTargetRowsUpdated -> 0, numOutputRows -> 0, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 16, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 5109)",,Databricks-Runtime/12.2.x-scala2.12
0,2024-03-01T14:08:35.000+0000,3575659572783655,jadhavprem22@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(3150524556320814),0301-132924-1r6htkhh,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 16, numOutputBytes -> 2429)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%fs
ls dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log

path,name,size,modificationTime
dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log/00000000000000000000.crc,00000000000000000000.crc,3118,1709302119000
dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log/00000000000000000000.json,00000000000000000000.json,2169,1709302115000
dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log/00000000000000000001.crc,00000000000000000001.crc,3118,1709302568000
dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log/00000000000000000001.json,00000000000000000001.json,1215,1709302564000
dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log/00000000000000000002.crc,00000000000000000002.crc,3118,1709304147000
dbfs:/user/hive/warehouse/clickstream.db/silver_table/_delta_log/00000000000000000002.json,00000000000000000002.json,1215,1709304145000


In [0]:
%sql
select * from clickstream.silver_table limit 10

timeStamp,User_id,timeStampCast,timeDiff,day,Session_id
2021-05-01T11:00:00Z,u1,2021-05-01T11:00:00.000+0000,0.0,2021-05-01,u1_s1
2021-05-01T11:25:00Z,u1,2021-05-01T11:25:00.000+0000,25.0,2021-05-01,u1_s1
2021-05-01T11:50:00Z,u1,2021-05-01T11:50:00.000+0000,25.0,2021-05-01,u1_s1
2021-05-01T13:13:00Z,u1,2021-05-01T13:13:00.000+0000,83.0,2021-05-01,u1_s2
2021-05-02T11:45:00Z,u1,2021-05-02T11:45:00.000+0000,1352.0,2021-05-02,u1_s3
2021-05-04T08:15:00Z,u1,2021-05-04T08:15:00.000+0000,2670.0,2021-05-04,u1_s4
2021-05-01T15:00:00Z,u2,2021-05-01T15:00:00.000+0000,0.0,2021-05-01,u2_s1
2021-05-01T15:15:00Z,u2,2021-05-01T15:15:00.000+0000,15.0,2021-05-01,u2_s1
2021-05-04T19:00:00Z,u2,2021-05-04T19:00:00.000+0000,4545.0,2021-05-04,u2_s2
2021-05-01T02:13:00Z,u3,2021-05-01T02:13:00.000+0000,0.0,2021-05-01,u3_s1


### Gold Layer

In [0]:
# read from silver table
dfSilverTable = spark.table('clickstream.silver_table')
dfSilverTable.show()

+--------------------+-------+-------------------+--------+----------+----------+
|           timeStamp|User_id|      timeStampCast|timeDiff|       day|Session_id|
+--------------------+-------+-------------------+--------+----------+----------+
|2021-05-01T11:00:00Z|     u1|2021-05-01 11:00:00|     0.0|2021-05-01|     u1_s1|
|2021-05-01T11:25:00Z|     u1|2021-05-01 11:25:00|    25.0|2021-05-01|     u1_s1|
|2021-05-01T11:50:00Z|     u1|2021-05-01 11:50:00|    25.0|2021-05-01|     u1_s1|
|2021-05-01T13:13:00Z|     u1|2021-05-01 13:13:00|    83.0|2021-05-01|     u1_s2|
|2021-05-02T11:45:00Z|     u1|2021-05-02 11:45:00|  1352.0|2021-05-02|     u1_s3|
|2021-05-04T08:15:00Z|     u1|2021-05-04 08:15:00|  2670.0|2021-05-04|     u1_s4|
|2021-05-01T15:00:00Z|     u2|2021-05-01 15:00:00|     0.0|2021-05-01|     u2_s1|
|2021-05-01T15:15:00Z|     u2|2021-05-01 15:15:00|    15.0|2021-05-01|     u2_s1|
|2021-05-04T19:00:00Z|     u2|2021-05-04 19:00:00|  4545.0|2021-05-04|     u2_s2|
|2021-05-01T02:1

#### Load/merge 3 Gold Tables

In [0]:
dfUserSession = dfSilverTable.select('timeStamp','User_id','Session_id')
dfUserSession.show()

+--------------------+-------+----------+
|           timeStamp|User_id|Session_id|
+--------------------+-------+----------+
|2021-05-01T11:00:00Z|     u1|     u1_s1|
|2021-05-01T11:25:00Z|     u1|     u1_s1|
|2021-05-01T11:50:00Z|     u1|     u1_s1|
|2021-05-01T13:13:00Z|     u1|     u1_s2|
|2021-05-02T11:45:00Z|     u1|     u1_s3|
|2021-05-04T08:15:00Z|     u1|     u1_s4|
|2021-05-01T15:00:00Z|     u2|     u2_s1|
|2021-05-01T15:15:00Z|     u2|     u2_s1|
|2021-05-04T19:00:00Z|     u2|     u2_s2|
|2021-05-01T02:13:00Z|     u3|     u3_s1|
|2021-05-02T11:00:00Z|     u3|     u3_s2|
|2021-05-03T12:15:00Z|     u3|     u3_s3|
|2021-05-04T09:00:00Z|     u3|     u3_s4|
|2021-05-03T02:15:00Z|     u4|     u4_s1|
|2021-05-03T11:00:00Z|     u4|     u4_s2|
|2021-05-03T21:00:00Z|     u4|     u4_s3|
+--------------------+-------+----------+



In [0]:
# for user session detail table
userSessionGoldLocation = "dbfs:/user/hive/warehouse/clickstream.db/gold_all_clicks"

if DeltaTable.isDeltaTable(spark,userSessionGoldLocation):
    DeltaTable.forName(spark,'gold_all_clicks').alias('t').merge(
        source=dfUserSession.alias('s'),
        condition="t.User_id=s.User_id and t.timestamp=s.timestamp"
    )\
    .whenNotMatchedInsertAll()\
    .execute()
else:
    dfUserSession.write.saveAsTable('gold_all_clicks')

In [0]:
# Get Number of sessions generated for each day
dfSessionEachDay = dfSilverTable.select('User_id','Session_id','timeStampCast','day')\
                            .dropDuplicates(['day','Session_id'])\
                            .groupBy('day').agg(F.count('Session_id').alias("sessionCounts")).orderBy('day')
dfSessionEachDay.show()

+----------+-------------+
|       day|sessionCounts|
+----------+-------------+
|2021-05-01|            4|
|2021-05-02|            2|
|2021-05-03|            4|
|2021-05-04|            3|
+----------+-------------+



In [0]:
# for Daily Session counts
dailySessionCounts = "dbfs:/user/hive/warehouse/clickstream.db/gold_daily_session_counts"
if DeltaTable.isDeltaTable(spark,dailySessionCounts):
    DeltaTable.forName(spark,'gold_daily_session_counts').alias('t').merge(
        source= dfSessionEachDay.alias('s'),
        condition= "t.day=s.day"
    )\
    .whenNotMatchedInsertAll()\
    .execute()
else:
    print('no')
    dfSessionEachDay.write.mode('overwrite').saveAsTable('gold_daily_session_counts')

In [0]:
#Total time spent by a user in a day.
dfUserTimeEachDay = dfSilverTable.select('User_id','Session_id','timeStampCast','timeDiff','day')\
                                .groupBy('User_id','day').agg(F.sum('timeDiff').alias('Duration_minute'))\
                                .orderBy('User_id')
                            
dfUserTimeEachDay.show()

+-------+----------+---------------+
|User_id|       day|Duration_minute|
+-------+----------+---------------+
|     u1|2021-05-01|          133.0|
|     u1|2021-05-02|         1352.0|
|     u1|2021-05-04|         2670.0|
|     u2|2021-05-04|         4545.0|
|     u2|2021-05-01|           15.0|
|     u3|2021-05-01|            0.0|
|     u3|2021-05-04|         1245.0|
|     u3|2021-05-03|         1515.0|
|     u3|2021-05-02|         1967.0|
|     u4|2021-05-03|         1125.0|
+-------+----------+---------------+



In [0]:
# for user time spent daily
userDailyTimeSpent = "dbfs:/user/hive/warehouse/clickstream.db/gold_user_daily_time_spent"

if DeltaTable.isDeltaTable(spark,userDailyTimeSpent):
    DeltaTable.forName(spark,userDailyTimeSpent).alias('t').merge(
        source= dfUserTimeEachDay.alias('s'),
        condition= "t.User_id=s.User_id and t.day=s.day"
    )\
    .whenNotMatchedInsertAll()\
    .exceute()
else:
    dfUserTimeEachDay.write.mode('overwrite').saveAsTable('gold_user_daily_time_spent')

In [0]:
%sql
DESCRIBE HISTORY gold_user_daily_time_spent

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2024-03-01T15:08:30.000+0000,3575659572783655,jadhavprem22@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(3150524556320814),0301-132924-1r6htkhh,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 10, numOutputBytes -> 1244)",,Databricks-Runtime/12.2.x-scala2.12
