In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, year, month, dayofmonth
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,application_1727940831410_0006,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
spark = SparkSession.builder \
        .appName("S3-Event-Processing") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") \
        .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
#define the s3 buckets
RAW_BUCKET_NAME = 's3a://pismo-development-raw-events/'
PROCESSED_BUCKET_NAME = 's3a://pismo-development-processed-events/proceesed_events/'
TRANSFORMED_BUCKET_NAME = 's3a://pismo-development-transformed-events/transformed_events/event_log/'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Define the schema for the raw JSON data
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("domain", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("data", StructType([
        StructField("id", IntegerType(), True),
        StructField("old_status", StringType(), True),
        StructField("new_status", StringType(), True),
        StructField("reason", StringType(), True)
    ]))
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
#read JSON data from S3 using PySpark
df = spark.read.schema(schema).json(RAW_BUCKET_NAME)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
#print sample data
df.head(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(event_id='6b52377f-dbe6-4887-bd86-1f07d2578fd2', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 40, 916888), domain='account', event_type='status-change', data=Row(id=471582, old_status='SUSPENDED', new_status='INACTIVE', reason='Best resource attention economic understand treatment once son.')), Row(event_id='c2199d3d-5bc7-47a2-9b00-3fd97bc587b5', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 44, 438381), domain='account', event_type='creation', data=Row(id=667218, old_status='SUSPENDED', new_status='INACTIVE', reason='Woman anything range behavior development history though.')), Row(event_id='a52ebccb-3b91-4b63-9a9a-bcbaf25930c8', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 40, 618439), domain='transaction', event_type='creation', data=Row(id=333186, old_status='SUSPENDED', new_status='INACTIVE', reason='Add look article analysis time easy actually station.')), Row(event_id='c2199d3d-5bc7-47a2-9b00-3fd97bc587b5', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 41, 262530)

In [10]:
#transform data by selecting the latest event per domain, event_type, and data.id
window_spec = Window.partitionBy("domain", "event_type", "data.id").orderBy(col("timestamp").desc())
    
        # Add row number within each partition (we'll keep only the first row which is the latest event)
df_with_row_num = df.withColumn("row_num", row_number().over(window_spec))
        
        # Filter to keep only the latest event per group (row_num == 1)
df_latest = df_with_row_num.filter(col("row_num") == 1).drop("row_num")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
#show the unique dataset by timestamp
df_latest.head(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(event_id='fad0474f-93ed-46d1-a657-974543515e29', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 41, 103663), domain='account', event_type='creation', data=Row(id=267212, old_status='SUSPENDED', new_status='INACTIVE', reason='Specific each air pay company happy.')), Row(event_id='120029bc-df17-4905-858e-3e0bd60c6d37', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 40, 537824), domain='account', event_type='creation', data=Row(id=346074, old_status='ACTIVE', new_status='ACTIVE', reason='Data down turn.')), Row(event_id='c7904bd3-74eb-401e-8252-04d5a4cdc403', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 40, 106511), domain='account', event_type='creation', data=Row(id=350276, old_status='SUSPENDED', new_status='ACTIVE', reason='Himself indicate physical.')), Row(event_id='75c2e00c-6363-4b84-b561-4613f0a931bf', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 41, 130811), domain='account', event_type='creation', data=Row(id=473811, old_status='ACTIVE', new_status='INACTIVE', reaso

In [13]:
#write the transformed data to S3 in Parquet format, partitioned by year, month, day, and event_type

df_with_date = df.withColumn("year", year(col("timestamp"))) \
                         .withColumn("month", month(col("timestamp"))) \
                         .withColumn("day", dayofmonth(col("timestamp")))

        # Write the DataFrame to S3 in Parquet format, partitioned by year, month, day, and event_type
df_with_date.write \
          .partitionBy("year", "month", "day", "event_type") \
          .mode("overwrite") \
          .parquet(PROCESSED_BUCKET_NAME)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
#Read processed file from s3 bucket in parquet
df_processed = spark.read.parquet(PROCESSED_BUCKET_NAME)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
df_processed.head(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(event_id='c2199d3d-5bc7-47a2-9b00-3fd97bc587b5', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 44, 438381), domain='account', data=Row(id=667218, old_status='SUSPENDED', new_status='INACTIVE', reason='Woman anything range behavior development history though.'), year=2024, month=10, day=3, event_type='creation'), Row(event_id='a52ebccb-3b91-4b63-9a9a-bcbaf25930c8', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 40, 618439), domain='transaction', data=Row(id=333186, old_status='SUSPENDED', new_status='INACTIVE', reason='Add look article analysis time easy actually station.'), year=2024, month=10, day=3, event_type='creation'), Row(event_id='c2199d3d-5bc7-47a2-9b00-3fd97bc587b5', timestamp=datetime.datetime(2024, 10, 3, 9, 31, 41, 262530), domain='account', data=Row(id=667218, old_status='SUSPENDED', new_status='INACTIVE', reason='Woman anything range behavior development history though.'), year=2024, month=10, day=3, event_type='creation'), Row(event_id='ef34feb2-0a25-473e-92fc-b

In [16]:
#show schema in parquet
df_processed.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- event_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- domain: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- old_status: string (nullable = true)
 |    |-- new_status: string (nullable = true)
 |    |-- reason: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- event_type: string (nullable = true)