# Real-time Impression Tracking

In this sample notebook, we process ad impressions from a website in near real time. Impressions are sent as pixel events into an Amazon Kinesis data stream. The events are then processed and enriched within Amazon EMR using a Spark Structured Streaming application. Finally, the transformed events are written directly into a realtime_clicks table in a data lake in Amazon S3. Spark Structured Streaming lets you work with streaming data as if you were working with regular DataFrames in batch operations. Spark takes care of continuously reading and processing the data, which makes streaming development easier. For more details, see the [Spark Structured Streaming documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

We use the [Apache Iceberg](https://iceberg.apache.org/) open table format to store the streamed impressions. Apache Iceberg is a high-performance format for huge analytic tables that brings the reliability and simplicity of SQL tables to big data. To catalog the tables, we use the AWS Glue Data Catalog, which enables queries from multiple AWS-native query engines such as Amazon Athena or Amazon Redshift.


---

### PRE-REQUISITES

- We assume you have an Amazon EMR cluster attached to this notebook. Testing was performed using version emr-6.15.0.
- To simulate pixel events from a public website into Amazon Kinesis, we use the [Kinesis Data Generator tool](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) to produce hundreds of pixel events per second. Launch the Kinesis Data Generator CloudFormation template in the same region, open the Outputs tab of the 'kinesis-data-generator' CloudFormation stack, and click the KinesisDataGeneratorUrl link. Log in to the generator, select the region, and select 'emr-kinesis-stream-KinesisDataStream-xxxxxxxxxxxx' as the Stream/delivery stream. To run the code in this sample notebook, paste the following payload into the Record template tab in the Kinesis Data Generator and click 'Send Data' to simulate website ad impressions:


```
{        
    "event_time": "{{date.now}}", 
    "action_source": "WEBSITE", 
    "action_source_url": "www.example.com",        
    "user_data": {        
        "ipsrc": "{{random.arrayElement(
        ["a5b50a8b-3a77-4f83-aff4-68aa167f7c67", "b5b50a8b-3a77-4f83-aff4-68aa167f7c63","c5b50a8b-3a77-4f83-aff4-68aa167f7c62","d5b50a8b-3a77-4f83-aff4-68aa167f7c65"]
    )}}",        
        "vmcid": "{{random.arrayElement(
        ["a5b50a8b-3a77-4f83-aff4-68aa167f7c67", "b5b50a8b-3a77-4f83-aff4-68aa167f7c63","c5b50a8b-3a77-4f83-aff4-68aa167f7c62","d5b50a8b-3a77-4f83-aff4-68aa167f7c65"]
    )}}", 
        "email": "{{random.arrayElement(
        ["a5b50a8b-3a77-4f83-aff4-68aa167f7c67", "b5b50a8b-3a77-4f83-aff4-68aa167f7c63","c5b50a8b-3a77-4f83-aff4-68aa167f7c62","d5b50a8b-3a77-4f83-aff4-68aa167f7c65"]
    )}}",
        "region": "{{random.arrayElement(["AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY"])}}"
    },
    "custom_data": {        
        "gv": "12.99",        
        "ec": "test_category",        
        "el": "test_label",        
        "ea": "test_action",
        "cid": 123,      
        "product_id": "{{random.arrayElement(["product_id1", "product_id2"])}}",        
        "user_defined": {         
            "addToCart" : "true",             
            "signUp" : "true",             
            "purchaseAmount" : "1500"        
        }
    }
}
```
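To sanity-check the payload shape before sending traffic, the record template above can be approximated locally in plain Python. This is only a sketch: the helper name `make_sample_event` is hypothetical, Python's `random` module stands in for the generator's `{{random.arrayElement(...)}}` placeholders, and the ISO 8601 timestamp used here may not match the exact format that `{{date.now}}` emits.

```python
import json
import random
from datetime import datetime, timezone

# Identifier values copied from the record template above.
IDS = [
    "a5b50a8b-3a77-4f83-aff4-68aa167f7c67",
    "b5b50a8b-3a77-4f83-aff4-68aa167f7c63",
    "c5b50a8b-3a77-4f83-aff4-68aa167f7c62",
    "d5b50a8b-3a77-4f83-aff4-68aa167f7c65",
]
# Only a subset of the template's region list, to keep the sketch short.
REGIONS = ["AL", "AK", "AZ", "CA", "NY", "TX", "WA"]


def make_sample_event(rng=random):
    """Build one event with the same fields as the record template (hypothetical helper)."""
    return {
        # Assumption: ISO 8601 in UTC; the generator's own format may differ.
        "event_time": datetime.now(timezone.utc).isoformat(),
        "action_source": "WEBSITE",
        "action_source_url": "www.example.com",
        "user_data": {
            "ipsrc": rng.choice(IDS),
            "vmcid": rng.choice(IDS),
            "email": rng.choice(IDS),
            "region": rng.choice(REGIONS),
        },
        "custom_data": {
            "gv": "12.99",
            "ec": "test_category",
            "el": "test_label",
            "ea": "test_action",
            "cid": 123,
            "product_id": rng.choice(["product_id1", "product_id2"]),
            "user_defined": {
                "addToCart": "true",
                "signUp": "true",
                "purchaseAmount": "1500",
            },
        },
    }


# Print one simulated event as formatted JSON.
print(json.dumps(make_sample_event(), indent=2))
```

The printed record has the same top-level keys that the schema later in this notebook expects (event_time, action_source, action_source_url, user_data, custom_data).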

---

## 1. Configuration

Configure your Spark session using the %%configure -f magic command. We will be using the Glue Catalog for Iceberg tables. We also need to specify the jar for the [Kinesis Connector for Spark Structured Streaming](https://github.com/awslabs/spark-sql-kinesis-connector) libraries. Before you run the following step, make sure you update the S3 bucket name in the "spark.sql.catalog.glue_catalog.warehouse" parameter. If you choose a different bucket than the one deployed by the template, make sure your EMR cluster has read and write permissions on it. To find the cluster's role, search IAM roles for 'analytics-with-emr-iam-EMREC2Role'.

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars": "s3://awslabs-code-us-east-1/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar",
        "spark.sql.catalog.glue_catalog":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.warehouse":"s3://analytics-with-emr-<your AWS Account ID>/data/curated/glue_catalog/tables/",
        "spark.sql.catalog.glue_catalog.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}

In [None]:
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import *

Before you run the next cell, open the Spark UI from the application link above to monitor your job. Also replace the placeholder in the checkpoint_path variable with your S3 bucket name, and update the kinesis_stream_name variable to match the name of your Kinesis data stream. If your stream is not in us-west-2, change the region in endpoint_url as well.

In [None]:
endpoint_url = "https://kinesis.us-west-2.amazonaws.com"
kinesis_stream_name = "<your-kinesis-stream-name>"
checkpoint_path = "s3://analytics-with-emr-<your AWS Account ID>/data/checkpoints"
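The endpoint URL above and the "kinesis.region" option used when reading the stream must point at the same region. One way to keep them in sync is to derive the endpoint from a single region variable. This is a minimal sketch: `kinesis_endpoint` and `aws_region` are hypothetical names, and the URL pattern assumes a standard commercial AWS region (other partitions, such as the China regions, use a different domain suffix).

```python
def kinesis_endpoint(region: str) -> str:
    """Return the regional Kinesis Data Streams endpoint URL (standard AWS partition only)."""
    return f"https://kinesis.{region}.amazonaws.com"


aws_region = "us-west-2"  # hypothetical variable; change to the region of your stream
endpoint_url = kinesis_endpoint(aws_region)
print(endpoint_url)
```

The same `aws_region` value could then be passed to the "kinesis.region" option instead of repeating the literal string.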

## 2. Read from the stream

We use the [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) engine to read data from the Kinesis stream using the [Kinesis Spark SQL Connector](https://github.com/awslabs/spark-sql-kinesis-connector). The connector will create a DynamoDB table for checkpointing the stream reads.

In [None]:
kinesis_stream = spark.readStream \
    .format("aws-kinesis") \
    .option("kinesis.region", "us-west-2") \
    .option("kinesis.streamName", kinesis_stream_name) \
    .option("kinesis.consumerType", "GetRecords") \
    .option("kinesis.endpointUrl", endpoint_url) \
    .option("kinesis.startingposition", "LATEST") \
    .option("kinesis.metadataCommitterType", "DYNAMODB") \
    .option("kinesis.dynamodb.tableName", "kinesis_with_emr_checkpoints") \
    .load()
kinesis_stream.printSchema()

## 3. Debug code

For developing and troubleshooting queries using interactive notebooks we can use the memory output format and validate output using Spark SQL. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the Spark application driver’s memory.

In [None]:
query = kinesis_stream.selectExpr("cast(data as String) as data") \
    .writeStream \
    .format("memory") \
    .outputMode("append") \
    .trigger(processingTime='1 minute')\
    .queryName("kinesis_output") \
    .start()

Expect a delay before data arrives from your Amazon Kinesis data stream, depending on the configured trigger interval. If the next cell returns an empty DataFrame, wait and re-run it.

In [None]:
df = spark.sql("SELECT * FROM kinesis_output limit 3")
df.show()
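Instead of re-running the cell by hand until data shows up, a small polling helper can wait for the first rows. The sketch below is plain Python and not part of the connector or of Spark: `wait_for_rows` is a hypothetical helper that takes any zero-argument function returning a row count, and the timeout and interval defaults are arbitrary.

```python
import time


def wait_for_rows(count_rows, timeout_s=180, interval_s=15, sleep=time.sleep):
    """Poll count_rows() until it returns a positive number or the timeout elapses.

    Returns the last count observed (0 if nothing arrived in time).
    """
    waited = 0
    while True:
        n = count_rows()
        if n > 0 or waited >= timeout_s:
            return n
        sleep(interval_s)
        waited += interval_s


# In this notebook the counter could be (assumes the memory table from the cell above):
# wait_for_rows(lambda: spark.sql("SELECT count(*) FROM kinesis_output").first()[0])
```

Passing the counter as a function keeps the helper independent of Spark, so it can be tried with a stub before pointing it at the in-memory table.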

You can also run SQL directly using the "%%sql" magic command.

In [None]:
%%sql
select count(*) from kinesis_output;

Let's define the expected schema of the pixel payload based on these results. Spark Structured Streaming expects the schema to be predefined in order to write the output data.

In [None]:
schema = StructType([
    StructField("event_time", TimestampType()),
    StructField("action_source", StringType()),
    StructField("action_source_url", StringType()),
    StructField(
        "user_data",
        StructType([
            StructField('ipsrc', StringType()),
            StructField('vmcid', StringType()),
            StructField('email', StringType()),
            StructField('region', StringType())
        ])),
    StructField("custom_data", StringType())
])

In [None]:
df.select(from_json("data", schema).alias("data")) \
.select(to_timestamp("data.event_time").alias("event_time"),
        "data.action_source",
        "data.action_source_url",
        "data.user_data.ipsrc",
        "data.user_data.vmcid",
        "data.user_data.email",
        "data.user_data.region",
        "data.custom_data") \
.show()
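To make the mapping from nested JSON to flat columns explicit, the same projection can be mimicked in plain Python on a single record. This is an illustrative analogue, not how Spark executes the query: `flatten_event` is a hypothetical helper, event_time is left as a string rather than parsed into a timestamp, and the compact JSON string produced for custom_data approximates what `from_json` returns when a nested object is declared as StringType.

```python
import json


def flatten_event(raw: str) -> dict:
    """Flatten one JSON event into the columns selected in the cell above (hypothetical helper)."""
    record = json.loads(raw)
    user = record.get("user_data") or {}
    custom = record.get("custom_data")
    return {
        "event_time": record.get("event_time"),  # Spark would convert this with to_timestamp
        "action_source": record.get("action_source"),
        "action_source_url": record.get("action_source_url"),
        "ipsrc": user.get("ipsrc"),
        "vmcid": user.get("vmcid"),
        "email": user.get("email"),
        "region": user.get("region"),
        # Kept as an unparsed JSON string, mirroring StructField("custom_data", StringType())
        "custom_data": None if custom is None else json.dumps(custom, separators=(",", ":")),
    }


sample = json.dumps({
    "event_time": "2024-02-13T10:15:00Z",
    "action_source": "WEBSITE",
    "action_source_url": "www.example.com",
    "user_data": {"ipsrc": "a", "vmcid": "b", "email": "c", "region": "CA"},
    "custom_data": {"cid": 123},
})
print(flatten_event(sample))
```

Fields missing from a record come back as None, which is comparable to the nulls Spark produces for fields absent from the parsed JSON.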

Once we have validated our query, we can stop this application and develop the code to populate our data lake.

In [None]:
query.stop()

## 4. Populate Data Lake

After reading data from the Amazon Kinesis stream, we transform the input using Spark SQL functions and continuously write the output into an Apache Iceberg table. For more details on Iceberg as the output format of Spark Structured Streaming, see [this documentation](https://iceberg.apache.org/docs/1.4.0/spark-structured-streaming/).

i. Create the Iceberg table in the Glue Catalog.

In [None]:
%%sql
use glue_catalog

In [None]:
%%sql
CREATE SCHEMA IF NOT EXISTS measurementdb;

In [None]:
%%sql
CREATE TABLE IF NOT EXISTS glue_catalog.measurementdb.realtime_clicks
    (
        event_time        timestamp,
        action_source     string,
        action_source_url string,
        user_ipsrc        string,
        user_vmcid        string,
        user_email        string,
        region            string,
        custom_data       string
    )
USING iceberg
PARTITIONED BY (date_hour(event_time))
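The table is partitioned by the hour of event_time, so all events from the same clock hour land in the same partition. Per the Iceberg specification, the hour transform maps a timestamp to the number of hours since 1970-01-01 00:00:00 UTC. The sketch below reproduces that arithmetic in plain Python for illustration; `iceberg_hour_bucket` is a hypothetical name and it assumes timezone-aware datetimes.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iceberg_hour_bucket(ts: datetime) -> int:
    """Whole hours elapsed between the Unix epoch and ts (ts must be timezone-aware)."""
    return (ts - EPOCH) // timedelta(hours=1)


a = datetime(2024, 2, 13, 10, 5, tzinfo=timezone.utc)
b = datetime(2024, 2, 13, 10, 55, tzinfo=timezone.utc)
c = datetime(2024, 2, 13, 11, 0, tzinfo=timezone.utc)

# a and b fall in the same hourly partition; c falls in the next one.
print(iceberg_hour_bucket(a) == iceberg_hour_bucket(b))
print(iceberg_hour_bucket(c) - iceberg_hour_bucket(a))
```

With a one-minute trigger, each micro-batch therefore usually writes into one or two hourly partitions.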

ii. Use Spark Structured Streaming to continuously write data into the Iceberg table.

In [None]:
schema = StructType([
    StructField("event_time", TimestampType()),
    StructField("action_source", StringType()),
    StructField("action_source_url", StringType()),
    StructField("user_data", StructType([
            StructField('ipsrc', StringType()),
            StructField('vmcid', StringType()),
            StructField('email', StringType()),
            StructField('region', StringType())
        ])),
    StructField("custom_data", StringType())
])

In [None]:
kinesis_stream.select(from_json(col("data").cast("string"), schema).alias("data")) \
    .select(to_timestamp("data.event_time").alias("event_time")
            ,col("data.action_source").alias("action_source")
            ,col("data.action_source_url").alias("action_source_url")
            ,col("data.user_data.ipsrc").alias("user_ipsrc")
            ,col("data.user_data.vmcid").alias("user_vmcid")
            ,col("data.user_data.email").alias("user_email")
            ,col("data.user_data.region").alias("region")
            ,col("data.custom_data").alias("custom_data")) \
    .writeStream \
    .queryName("realtime_clicks") \
    .format("iceberg") \
    .outputMode("append") \
    .option("fanout-enabled", "true") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(processingTime='1 minute') \
    .toTable("measurementdb.realtime_clicks") \
    .awaitTermination()

The execution will remain in progress until the application is terminated. Log in to the [Amazon Athena Console](https://us-west-2.console.aws.amazon.com/athena) to query the "measurementdb.realtime_clicks" table. Query example: SELECT * FROM "AwsDataCatalog"."measurementdb"."realtime_clicks" limit 10;

As an alternative, you can use a different notebook to run these queries against your table:

In [None]:
%%sql
SELECT * FROM measurementdb.realtime_clicks limit 10;

Run the following query a few times, about 1 minute apart, to see how the record count increases.

In [None]:
%%sql
SELECT count(*) FROM measurementdb.realtime_clicks;

## 5. Maintenance

Expect data lakes to accumulate a high number of small files (a few KB each) when they are populated from streaming data sources. In addition, Iceberg creates a new snapshot, or version, of a table for each data write. Iceberg snapshots allow tables to be rolled back to any prior valid snapshot in the event of an error. To optimize query performance and storage cost, Apache Iceberg and the AWS Glue Catalog provide features to delete old snapshots and merge files into larger objects. Consider implementing workloads for periodic maintenance of your tables. For more information on this topic, see the [Apache Iceberg Maintenance Documentation](https://iceberg.apache.org/docs/1.2.0/maintenance/), [AWS Glue Automatic Compaction of Apache Iceberg Tables](https://aws.amazon.com/blogs/aws/aws-glue-data-catalog-now-supports-automatic-compaction-of-apache-iceberg-tables/) and the [Optimizing Iceberg Tables in Athena Documentation](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html).

i. See the latest snapshots.

In [None]:
%%sql
SELECT * FROM measurementdb.realtime_clicks.snapshots limit 10;

ii. Expire older snapshots. Set the older_than date to a value later than the timestamp of the latest snapshot. After running the command, re-run the previous statement to confirm that only the last 2 snapshots remain.

In [None]:
spark.sql(
    "CALL glue_catalog.system.expire_snapshots(table => 'glue_catalog.measurementdb.realtime_clicks', older_than => DATE '2024-02-13', retain_last => 2)"
)
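Rather than editing the date by hand, the cutoff can be computed at run time. The sketch below only builds the SQL text: `expire_snapshots_sql` and its `min_age` parameter are hypothetical, and it passes a TIMESTAMP literal, which is the form used in the Iceberg procedure documentation's examples. Note that Spark interprets such a literal in the session time zone, which is assumed here to be UTC.

```python
from datetime import datetime, timedelta, timezone


def expire_snapshots_sql(table, min_age=timedelta(days=1), retain_last=2, now=None):
    """Build a CALL statement that expires snapshots older than now - min_age (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    cutoff = (now - min_age).strftime("%Y-%m-%d %H:%M:%S")
    return (
        "CALL glue_catalog.system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{cutoff}', "
        f"retain_last => {retain_last})"
    )


# For a quick demo, min_age=timedelta(0) expires everything except the retained snapshots.
print(expire_snapshots_sql("glue_catalog.measurementdb.realtime_clicks", min_age=timedelta(0)))

# In this notebook the statement could then be run with:
# spark.sql(expire_snapshots_sql("glue_catalog.measurementdb.realtime_clicks"))
```

A scheduled maintenance job would typically use a longer `min_age` so that recent snapshots stay available for rollback.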

iii. Automatic data compaction can be enabled through the AWS console, programmatically with the AWS Glue APIs, or via Athena. For more details on the process, see the post on [AWS Glue Automatic Compaction of Apache Iceberg Tables](https://aws.amazon.com/blogs/aws/aws-glue-data-catalog-now-supports-automatic-compaction-of-apache-iceberg-tables/) or the [Optimizing Iceberg Tables in Athena Documentation](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html).
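As a manual alternative, the Iceberg maintenance documentation also describes a `rewrite_data_files` Spark procedure that merges small files. The sketch below only assembles the statement: `rewrite_data_files_sql` is a hypothetical helper, and the 128 MB target is an arbitrary example value rather than a recommendation from this notebook.

```python
def rewrite_data_files_sql(table, target_file_size_mb=128):
    """Build a CALL statement for Iceberg's rewrite_data_files procedure (hypothetical helper)."""
    target_bytes = target_file_size_mb * 1024 * 1024
    return (
        "CALL glue_catalog.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes', '{target_bytes}'))"
    )


print(rewrite_data_files_sql("measurementdb.realtime_clicks"))

# In this notebook the statement could then be run with:
# spark.sql(rewrite_data_files_sql("measurementdb.realtime_clicks"))
```

Running this periodically, followed by snapshot expiration, is one way to keep the file count of a streaming table under control.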

## 6. Clean up

In [None]:
%%sql
DROP TABLE IF EXISTS measurementdb.realtime_clicks;