In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3b404c29f72a2288787f9d8cab0e0cbfe9b53e006ba086709533f517285ea554
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
# Notebook cells always execute with __name__ == "__main__", so a main-guard here is a
# no-op at best and, if this code were ever run as a module, would leave `spark`
# undefined for every later cell. Create (or reuse) the session unconditionally.
spark = SparkSession.builder.appName('Jobathon').master('local').getOrCreate()

In [4]:
from google.colab import drive

# Make the user's Google Drive available under /content/drive so the input files can be read.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [5]:
# Input file locations on the mounted Google Drive.
DATA_DIR = '/content/drive/My Drive/jobathon_sample_data'
CLICKSTREAM_PATH = f'{DATA_DIR}/jobathon_click_data.json'
LOGIN_PATH = f'{DATA_DIR}/jobathon_login_data.csv'

# Backward-compatible aliases. Despite their names these hold *paths*, not DataFrames;
# prefer CLICKSTREAM_PATH / LOGIN_PATH in new code.
df_clickstream = CLICKSTREAM_PATH
user_mapping = LOGIN_PATH

In [11]:
def sample_function(spark, s3_clickstream_path, s3_login_path):
    """Summarise clickstream activity per day, browser and user.

    Each clickstream event is attributed to a user when the login data has a
    row with the same ``session_id`` on the same calendar day as the event;
    otherwise the event is treated as anonymous. Events are then aggregated
    per (``current_date``, ``browser_id``, ``user_id``).

    Args:
        spark: Active ``SparkSession``; used to read both inputs and to run
            the final SQL query.
        s3_clickstream_path: Path to the clickstream data (read as JSON). The
            code reads ``session_id``, ``browser_id``, ``event_type``,
            ``event_date_time`` and ``client_side_data.current_page_url``.
        s3_login_path: Path to the login data (read as CSV with a header row).
            The code reads ``session_id``, ``user_id`` and ``login_date_time``.

    Returns:
        A DataFrame with one row per (``current_date``, ``browser_id``,
        ``user_id``) and the columns, in order:

        * ``current_date`` - first 10 characters of ``event_date_time``
          (presumably ``yyyy-MM-dd``; TODO confirm against the data).
        * ``browser_id``
        * ``user_id`` - null for events with no matching same-day login.
        * ``logged_in`` - string ``'1'`` if ``user_id`` is set, else ``'0'``.
        * ``first_url`` - ``current_page_url`` of the group's earliest event
          (events at the same earliest timestamp are ordered by URL).
        * ``number_of_clicks`` / ``number_of_pageloads`` - number of events
          with ``event_type`` ``'click'`` / ``'pageload'``, cast to double to
          keep the existing output schema.

    Side effects:
        Registers (or replaces) the temporary view ``df_union_tbl``.
    """
    from pyspark.sql import functions as F

    clicks = (
        spark.read.format("json").load(s3_clickstream_path)
        .withColumn("current_page_url", F.col("client_side_data.current_page_url"))
        .drop("client_side_data")
    )

    # Rename the login key so the join below is unambiguous, and deduplicate:
    # repeated login rows for the same session/day/user would otherwise
    # multiply the matching events and inflate the counts.
    logins = (
        spark.read.format("csv").option("header", True).load(s3_login_path)
        .select(
            F.col("session_id").alias("login_session_id"),
            F.to_date("login_date_time").alias("login_date"),
            "user_id",
        )
        .dropDuplicates()
    )

    # Keep every event; user_id stays null when there is no same-day login.
    events = (
        clicks.join(
            logins,
            (clicks["session_id"] == logins["login_session_id"])
            & (F.to_date(clicks["event_date_time"]) == logins["login_date"]),
            "left",
        )
        .drop("login_session_id", "login_date")
    )

    event_ts = F.to_timestamp(F.col("event_date_time"))
    per_event = events.select(
        F.substring("event_date_time", 1, 10).alias("current_date"),
        "browser_id",
        "user_id",
        F.when(F.col("event_type") == "click", 1).otherwise(0).alias("click"),
        F.when(F.col("event_type") == "pageload", 1).otherwise(0).alias("pageload"),
        # (timestamp, url) pair used to find each group's earliest URL. Events
        # whose timestamp does not parse get a null struct, which min() skips.
        F.when(
            event_ts.isNotNull(),
            F.struct(event_ts.alias("ts"), F.col("current_page_url").alias("url")),
        ).alias("first_event"),
    )

    # Aggregating directly (instead of joining the per-group minimum timestamp
    # back onto the events) keeps first_url scoped to its own user/browser,
    # yields exactly one row per group, and keeps rows whose grouping keys are
    # null (an equality join would fail to match them).
    summary = (
        per_event.groupBy("current_date", "browser_id", "user_id")
        .agg(
            F.min("first_event").getField("url").alias("first_url"),
            F.sum("click").cast("double").alias("number_of_clicks"),
            F.sum("pageload").cast("double").alias("number_of_pageloads"),
        )
        .withColumn(
            "logged_in",
            F.when(F.col("user_id").isNull(), F.lit("0")).otherwise(F.lit("1")),
        )
        .select(
            "current_date", "browser_id", "user_id", "logged_in",
            "first_url", "number_of_clicks", "number_of_pageloads",
        )
    )

    # NOTE(review): the temp-view round-trip is kept only in case callers query
    # ``df_union_tbl`` by name; otherwise ``summary`` could be returned directly.
    summary.createOrReplaceTempView("df_union_tbl")
    return spark.sql("select * from df_union_tbl")

In [12]:
# Run the summary on the Drive inputs and preview the first five rows.
sample_function(
    spark,
    s3_clickstream_path=df_clickstream,
    s3_login_path=user_mapping,
).show(n=5)

+------------+--------------------+-------+---------+--------------------+----------------+-------------------+
|current_date|          browser_id|user_id|logged_in|           first_url|number_of_clicks|number_of_pageloads|
+------------+--------------------+-------+---------+--------------------+----------------+-------------------+
|  2022-07-31|1DSHqOeZmWRLijQx9...|   null|        0|https://www.gosho...|             4.0|                0.0|
|  2022-07-31|1j5R1PQuTDcHXcd52...|   null|        0|https://www.gosho...|            24.0|                2.0|
|  2022-07-31|1r3shTM5PCT83mpWD...|   null|        0|https://www.gosho...|             2.0|                0.0|
|  2022-07-31|24vwqAsGWbb2CkiWB...|   null|        0|https://www.gosho...|             0.0|                1.0|
|  2022-07-31|27ODeTm2X9Nhrgs1a...|   null|        0|https://www.gosho...|             1.0|                0.0|
+------------+--------------------+-------+---------+--------------------+----------------+-------------