In [None]:
import uuid


In [82]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import types as st
from pyspark.sql import functions as sf

# Fixed schema applied when reading the raw JSON events: every field is
# declared explicitly (all nullable, the StructField default).
EVENT_SCHEMA = st.StructType([
    st.StructField(field_name, field_type)
    for field_name, field_type in (
        ("event_id", st.StringType()),
        ("timestamp", st.TimestampType()),
        ("domain", st.StringType()),
        ("event_type", st.StringType()),
        ("data", st.StringType()),
    )
])

# Reuse the active Spark session if one exists, otherwise create it.
spark_session = (
    SparkSession.builder
    .appName("Pismo Challenge")
    .getOrCreate()
)

def read_events_data(input_path):
    """Load raw JSON events from ``input_path`` with the fixed ``EVENT_SCHEMA``.

    Args:
        input_path: File or directory of JSON event records readable by Spark.

    Returns:
        A (lazy) Spark DataFrame with the columns declared in ``EVENT_SCHEMA``.
    """
    # Attach the schema to the reader first, then point it at the JSON input.
    json_reader = spark_session.read.schema(EVENT_SCHEMA)
    return json_reader.json(input_path)

def dedup_events(events_df):
    """Keep a single row per ``event_id``: the one with the latest ``timestamp``.

    Rows sharing an ``event_id`` are ranked by ``timestamp`` (newest first) and
    only the top-ranked row survives. When several rows share the newest
    timestamp, the remaining payload columns are used as tie-breakers so the
    same input always produces the same output.

    Args:
        events_df: DataFrame with at least ``event_id`` and ``timestamp``
            columns.

    Returns:
        DataFrame with the same columns and at most one row per ``event_id``.
    """
    # Deterministic tie-breakers. Without them, the row kept among equal
    # timestamps depends on partition ordering and can change between runs.
    tie_breakers = [
        sf.col(column_name).desc()
        for column_name in ('domain', 'event_type', 'data')
        if column_name in events_df.columns
    ]
    latest_first = (
        Window.partitionBy('event_id')
        .orderBy(sf.col('timestamp').desc(), *tie_breakers)
    )

    # No distinct() beforehand: keeping only rank 1 per event_id already
    # collapses exact duplicates, so distinct() only cost an extra shuffle.
    return (
        events_df
        .withColumn('_dedup_rank', sf.row_number().over(latest_first))
        .filter(sf.col('_dedup_rank') == 1)
        .drop('_dedup_rank')
    )

def trigger_events_processing(events_df, output_path, checkpoint_location, mode='errorifexists'):
    """Write events as Parquet, partitioned by domain, event type and date.

    Derives ``year``, ``month`` and ``day`` columns from ``timestamp``. Then
    performs a blocking batch write of ``events_df`` to ``output_path``,
    partitioned by ``domain``, ``event_type``, ``year``, ``month`` and ``day``.

    Args:
        events_df: DataFrame with at least ``timestamp``, ``domain`` and
            ``event_type`` columns.
        output_path: Destination directory for the Parquet dataset.
        checkpoint_location: Not read by this batch write; kept so existing
            callers keep working. NOTE(review): looks like a leftover from a
            Structured Streaming version of this job — confirm before removing.
        mode: Spark save mode (``'errorifexists'``/``'error'``, ``'overwrite'``,
            ``'append'``, ``'ignore'``). The default matches Spark's own default
            and fails when ``output_path`` already exists, so pass another mode
            to re-run the job over existing output.

    Returns:
        None. ``DataFrameWriter.parquet`` returns nothing; there is no
        streaming query handle to wait on.
    """
    event_ts = sf.col('timestamp')
    partitioned_df = (
        events_df
        .withColumn('year', sf.year(event_ts))
        .withColumn('month', sf.month(event_ts))
        .withColumn('day', sf.dayofmonth(event_ts))
    )

    # Note: with mode='overwrite' and Spark's default (static) partition
    # overwrite mode, every existing partition under output_path is replaced.
    (
        partitioned_df.write
        .mode(mode)
        .partitionBy('domain', 'event_type', 'year', 'month', 'day')
        .parquet(path=output_path)
    )


def process_events(input_path, output_path, checkpoint_location):
    """Read, deduplicate and persist events from ``input_path`` to ``output_path``.

    Args:
        input_path: Location of the raw JSON events.
        output_path: Destination directory for the partitioned Parquet output.
        checkpoint_location: Forwarded to ``trigger_events_processing``.
    """
    events_raw = read_events_data(input_path)
    events_dedup = dedup_events(events_raw)
    query = trigger_events_processing(events_dedup, output_path, checkpoint_location)
    # The batch Parquet write returns None once it has completed, so calling
    # awaitTermination() unconditionally raised AttributeError after every
    # successful run. Only wait when a query handle was actually returned.
    if query is not None:
        query.awaitTermination()

    
# Job locations on the local HDFS. NOTE(review): the write uses Spark's default
# save mode, which fails if OUTPUT_PATH already exists, so this cell is not
# re-runnable until that directory is removed.
INPUT_PATH = 'hdfs://localhost/input-events/'
OUTPUT_PATH = 'hdfs://localhost/output-events/'
CHECKPOINT_LOCATION = 'hdfs://localhost/checkpoint-events/'

process_events(
    input_path=INPUT_PATH,
    output_path=OUTPUT_PATH,
    checkpoint_location=CHECKPOINT_LOCATION,
)

AnalysisException: path hdfs://localhost/output-events already exists.

In [83]:
# Show the DataFrame repr (column names and types, not rows) to confirm the schema.
read_events_data(input_path='hdfs://localhost/input-events/')

DataFrame[event_id: string, timestamp: timestamp, domain: string, event_type: string, data: string]

In [81]:
# Spot-check the persisted output: after dedup there should be exactly one row
# for this event_id.
event_id = 'c498e23b-4eb5-4402-ab93-dc8480bc4f03'
# Compare via a Column expression so the id is bound as a literal value rather
# than spliced into a SQL string. Splicing breaks on quotes, and a "..." literal
# is read as an identifier when spark.sql.ansi.doubleQuotedIdentifiers is on.
(
    spark_session.read.parquet('hdfs://localhost/output-events')
    .filter(sf.col('event_id') == event_id)
    .show(n=100, truncate=False)
)

+------------------------------------+-------------------+-----------+-------+-------------+----+-----+---+
|event_id                            |timestamp          |data       |domain |event_type   |year|month|day|
+------------------------------------+-------------------+-----------+-------+-------------+----+-----+---+
|c498e23b-4eb5-4402-ab93-dc8480bc4f03|2011-04-08 05:45:55|{"id":8754}|account|status-detail|2011|4    |8  |
+------------------------------------+-------------------+-----------+-------+-------------+----+-----+---+

