In [None]:
spark.sparkContext.install_pypi_package("boto3")

from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import *
import boto3

VBox()

Starting Spark application


In [None]:
# JSON schemas consumed by src_to_segment_log via from_json. Every field is
# declared as a nullable string (nested JSON objects are kept as strings and
# parsed in a later step), except `hospital` in schema5, which is an array of
# strings.

# Applied to the raw `value` column.
schema1 = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        'anonymousId',
        'context',
        'messageId',
        'properties',
        'event',
        'timestamp',
        'type',
        'userId',
    )
])

# Applied to the `context` column.
schema2 = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('userAgent', 'ip', 'locale', 'page')
])

# Applied to the `page` field extracted from `context`.
schema3 = StructType([StructField('url', StringType(), True)])

# Applied to the top-level `properties` payload (copied to `properties_`).
schema4 = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('path', 'page', 'button', 'action', 'properties')
])

# Applied to both the `button` and the `action` columns.
schema5 = StructType([
    StructField('name', StringType(), True),
    StructField('hospital', ArrayType(StringType()), True),
])

# Applied to the `properties` field nested inside the `properties` payload.
schema6 = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('hospitalId', 'pageName', 'routeName', 'to')
])

In [None]:
def src_to_segment_log(df):
    """Flatten raw JSON event rows into a wide segment-log DataFrame.

    The JSON text in the ``value`` column of ``df`` is parsed level by level
    with ``schema1`` .. ``schema6``; the nested fields are promoted to
    prefixed top-level columns (``context_*``, ``properties_*``), the event
    timestamp is parsed, and week/month/day bucket columns are derived in the
    'Asia/Seoul' time zone.

    Side effect: registers (or replaces) the temp view ``df`` on the
    module-level ``spark`` session.

    Args:
        df: DataFrame with a string column ``value`` holding one JSON
            document per row.

    Returns:
        DataFrame produced by the final SQL projection over the flattened
        rows.
    """
    # NOTE: the result of this chain must be captured. DataFrames are
    # immutable, so calling withColumn/select on `df` without keeping the
    # return value leaves `df` as the raw input, and the SQL below would then
    # fail to resolve any of the flattened columns.
    flattened = (
        df.withColumn("schema1", from_json("value", schema1))
        .select(col('schema1.*'))
        # --- context: userAgent / ip / locale / page ---------------------
        .withColumn("context", from_json("context", schema2))
        .select(col('anonymousId'), col('context.*'), col('context'), col('messageId'), col('properties'),
                col('event'), col('timestamp'), col('type'), col('userId'))
        .withColumn("page", from_json("page", schema3))
        .withColumn('context_useragent', col('userAgent'))
        .withColumn('context_ip', col('ip'))
        .withColumn('context_locale', col('locale'))
        # Keep the top-level properties payload under `properties_`; the name
        # `properties` is reused below for the field nested inside it.
        .withColumn("properties_", col('properties'))
        .select(col('anonymousId'), col('context_useragent'), col('context_ip'), col('context_locale'),
                col('page.*'), col('context'), col('messageId'), col('properties_'), col('event'),
                col('timestamp'), col('type'), col('userId'))
        # --- properties: path / page / button / action / properties ------
        .withColumn("properties_", from_json("properties_", schema4))
        .withColumn('context_page_url', col('url'))
        .select(col('anonymousId'), col('context_useragent'), col('context_ip'), col('context_locale'),
                col('context_page_url'), col('context'), col('messageId'), col('properties_.*'),
                col('properties_'), col('event'), col('timestamp'), col('type'), col('userId'))
        # --- properties.button: name / hospital[0] / hospital[1] ---------
        .withColumn("button", from_json("button", schema5))
        .withColumn('properties_path', col('path'))
        .withColumn('properties_page', col('page'))
        .select(col('anonymousId'), col('context_useragent'), col('context_ip'), col('context_locale'),
                col('context_page_url'), col('context'), col('messageId'), col('properties_path'),
                col('properties_page'), col('button.*'), col('action'), col('properties'),
                col('properties_'), col('event'), col('timestamp'), col('type'), col('userId'))
        .withColumn("properties_button_hospital_num", col('hospital').getItem(0))
        .withColumn('properties_button_hospital_name', col('hospital').getItem(1))
        .withColumn('properties_button_name', col('name'))
        # Drop by name: passing several Column objects to drop() raises
        # TypeError on PySpark versions before 3.4.
        .drop('button', 'hospital', 'name')
        # --- properties.action: name / hospital[0] / hospital[1] ---------
        .withColumn("action", from_json("action", schema5))
        .select(col('anonymousId'), col('context_useragent'), col('context_ip'), col('context_locale'),
                col('context_page_url'), col('context'), col('messageId'), col('properties_path'),
                col('properties_page'), col('properties_button_name'),
                col('properties_button_hospital_num'), col('properties_button_hospital_name'),
                col('action.*'), col('properties'), col('properties_'), col('event'), col('timestamp'),
                col('type'), col('userId'))
        .withColumn("properties_action_hospital_id", col('hospital').getItem(0))
        .withColumn('properties_action_hospital_name', col('hospital').getItem(1))
        .withColumn('properties_action_name', col('name'))
        .drop('action', 'hospital', 'name')
        # --- properties.properties: hospitalId / pageName / routeName / to
        .withColumn("properties", from_json("properties", schema6))
        .select(col('anonymousId'), col('context_useragent'), col('context_ip'), col('context_locale'),
                col('context_page_url'), col('context'), col('messageId'), col('properties_path'),
                col('properties_page'), col('properties_button_name'), col('properties_action_name'),
                col('properties_action_hospital_id'), col('properties_action_hospital_name'),
                col('properties_button_hospital_num'), col('properties_button_hospital_name'),
                col('properties.*'), col('properties_'), col('event'), col('timestamp'), col('type'),
                col('userId'))
        .withColumn("properties_properties_hospitalid", col('hospitalId'))
        .withColumn("properties_properties_pagename", col('pageName'))
        .withColumn("properties_properties_routename", col('routeName'))
        .withColumn("properties_properties_to", col('to'))
        # Expose the parsed top-level properties struct as `properties`.
        .withColumn("properties", col('properties_'))
        .drop('hospitalId', 'pageName', 'routeName', 'to', 'properties_')
        # --- timestamp parsing and year-week label -----------------------
        .withColumn('timestamp', to_timestamp(col('timestamp'), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
        # The date is shifted forward one day before taking year/week-of-year
        # so that Sunday is counted with the following Monday's week.
        .withColumn('yw', concat(
            year(date_add(from_utc_timestamp(col('timestamp'), 'Asia/Seoul'), 1)),
            lit('-'),
            lpad(weekofyear(date_add(from_utc_timestamp(col('timestamp'), 'Asia/Seoul'), 1)), 2, '0'),
        ))
    )

    flattened.createOrReplaceTempView("df")
    # Final projection: shift the timestamp to Asia/Seoul and derive
    # ywd (the day before the Monday-truncated shifted date, i.e. the start
    # of a Sunday-based week), ym and ymd.
    df_segment_log = spark.sql("""
        SELECT anonymousId, context_useragent, context_ip, context_locale, context_page_url, context,
        messageId, properties_path, properties_page, properties_button_name, properties_action_name, properties_action_hospital_id,
        properties_action_hospital_name, properties_button_hospital_num, properties_button_hospital_name, properties_properties_hospitalid,
        properties_properties_pagename, properties_properties_routename, properties_properties_to, properties,
        event, from_utc_timestamp(timestamp, 'Asia/Seoul') AS timestamp,
        yw,
        date_add(date_trunc('week', date_add(from_utc_timestamp(timestamp, 'Asia/Seoul'), 1)), -1) AS ywd,
        date_format(from_utc_timestamp(timestamp, 'Asia/Seoul'), 'yyyy-MM') AS ym,
        date_format(from_utc_timestamp(timestamp, 'Asia/Seoul'), 'yyyy-MM-dd') AS ymd,
        type, userId
        FROM df
    """)

    return df_segment_log

In [None]:
def segment_log_to_segment_log_session(df_segment_log):
    """Derive session-start rows from the flattened segment log.

    Registers ``df_segment_log`` as the temp view ``df_segment_log`` and runs
    a query that keeps, per ``anonymousid``, the events that are either the
    first one seen or follow more than 30 minutes of inactivity. Each kept
    event becomes one session row with a ``session_id`` built from
    ``anonymousid`` plus a per-user sequence number, its ``session_start_at``
    and the start of the same user's next session.

    Args:
        df_segment_log: DataFrame exposing ``anonymousid`` and ``timestamp``.

    Returns:
        DataFrame with columns session_id, anonymousid, session_start_at and
        next_session_start_at.
    """
    # NOTE(review): the three-argument DATEDIFF(unit, start, end) form looks
    # like it needs Spark 3.3+ -- confirm against the cluster's Spark version.
    session_query = """
        SELECT
            event.anonymousid || '-' || row_number() over(partition by event.anonymousid order by event.timestamp) as session_id,
            event.anonymousid,
            event.timestamp as session_start_at,
            lead(timestamp) over(partition by event.anonymousid order by event.timestamp) as next_session_start_at
        FROM (
            SELECT
                e.anonymousid,
                e.timestamp,
                DATEDIFF(minute, LAG(e.timestamp) OVER(PARTITION BY e.anonymousid ORDER BY e.timestamp), e.timestamp) AS inactivity_time
            FROM df_segment_log AS e) as event
        WHERE (event.inactivity_time > 30 OR event.inactivity_time is null)
    """

    df_segment_log.createOrReplaceTempView("df_segment_log")
    return spark.sql(session_query)

In [None]:
def get_s3_directories(prefix):
    """List the immediate "sub-directories" under an S3 key prefix.

    Uses the module-level ``s3`` client and ``bucket_name``. The listing is
    paginated so that prefixes beyond the first response page are not
    silently dropped.

    Args:
        prefix: Object-key prefix to list under (a key prefix, not a URI).
            It should end with '/' to list the children of a "directory".

    Returns:
        List of common prefixes found under ``prefix``, each with its
        trailing '/' delimiter removed. Empty if there are none.
    """
    pages = s3.get_paginator('list_objects').paginate(
        Bucket=bucket_name, Prefix=prefix, Delimiter='/'
    )
    return [
        # Each common prefix ends with the delimiter; strip that one char.
        common.get('Prefix')[:-1]
        for page in pages
        for common in page.get('CommonPrefixes', [])
    ]


In [None]:
# Driver: walk <prefix>/<directory>/<folder>/ in the bucket and, for every
# folder, read its gzipped log files, flatten them and derive sessions.
s3 = boto3.client('s3', region_name='ap-northeast-2')
bucket_name = 'emr-data-sync'
prefix = "segment-logs"

# The S3 listing API takes an object-key prefix (not an "s3a://bucket..."
# URI). A trailing '/' is needed so the children of the prefix are listed
# rather than the prefix itself.
directories = get_s3_directories(prefix + "/")

for directory in directories:
    # get_s3_directories strips the trailing '/', so add it back to descend.
    folders = get_s3_directories(directory + "/")
    for folder in folders:
        print(folder)
        # The last path component of the folder key is recorded as partition_0.
        partition_0 = folder.split("/")[-1]
        df = spark.read.text(f"s3a://{bucket_name}/{folder}/*.gz").withColumn("partition_0", lit(partition_0))
        df_segment_log = src_to_segment_log(df)
        df_segment_log_session = segment_log_to_segment_log_session(df_segment_log)

        df_segment_log.show()
        df_segment_log_session.show()