# Aggregating IPinYou Data

In this notebook we'll be creating a Spark Structured Streaming aggregation. We'll read Avro objects from our <b>ipinyou</b> Kafka topic, and write the aggregated data to the <b>ipinyou-agg</b> Kafka topic as Avro once again.

Before we set up our streaming aggregation, let's backfill our ipinyou-agg topic with some pre-aggregated data in order to simulate historical events.

<hr>
#### Import packages

In [None]:
%ShowTypes on

In [None]:
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import io.confluent.kafka.serializers.{KafkaAvroDecoder, KafkaAvroSerializer}
import java.util.Properties
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord, GenericRecordBuilder, GenericData}
import org.apache.avro.file.DataFileReader

#### Create Kafka Producer

In [None]:
// Kafka producer for the backfill: connects to the broker and serializes both keys and values
// as Avro through the Confluent serializer, which uses the schema registry configured below.
val props = new Properties()
Seq[(String, AnyRef)](
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
    "schema.registry.url" -> "http://schema_registry:8081"
).foreach { case (key, value) => props.put(key, value) }
val producer = new KafkaProducer[String, GenericRecord](props)

#### Read in pre-aggregated Avro data from file

In [None]:
// Local Avro data file with the pre-aggregated historical records replayed into ipinyou-agg.
// NOTE: the name `agg` is re-bound further down to the streaming aggregation Dataset; if cells are
// re-run out of order, re-run this cell before the backfill cell that reads `agg` as a File.
val agg = new File("ipinyou_agg.avro")

In [None]:
// Avro schema of the pre-aggregated records: (ad_exchange: int, ten_second_floor: nullable string,
// count: long). A raw triple-quoted literal avoids escaping every quote in the JSON.
val aggSchema = new Schema.Parser().parse(
    """{"type":"record","name":"topLevelRecord","fields":[{"name":"ad_exchange","type":"int"},{"name":"ten_second_floor","type":["string","null"]},{"name":"count","type":"long"}]}""")

In [None]:
import java.text.SimpleDateFormat
import java.util.Date
import java.util.TimeZone

/** Formats an epoch-millisecond instant as text.
  *
  * @param time   milliseconds since 1970-01-01T00:00:00Z
  * @param format pattern and time zone used for rendering (mutable, not thread-safe)
  * @return `time` rendered according to `format`
  */
def convertEpochTimestampToDate(time: Long, format: SimpleDateFormat): String = {
    val instant = new Date(time)
    format.format(instant)
}

/** Parses a date/time string into epoch milliseconds.
  *
  * @param date   text matching `format`'s pattern
  * @param format pattern and time zone used for parsing (mutable, not thread-safe)
  * @return milliseconds since 1970-01-01T00:00:00Z
  * @throws java.text.ParseException if the start of `date` cannot be parsed
  */
def convertDateToEpochTimestamp(date: String, format: SimpleDateFormat): Long = {
    val parsed: Date = format.parse(date)
    parsed.getTime
}

#### Produce historical aggregate data to <b>ipinyou-agg</b> topic

In [None]:
// Replay the historical aggregates so that they end "now": every record's ten_second_floor is
// shifted forward by (currentTime - maxTimestamp), preserving the spacing between records while
// making the newest one current. Timestamps are parsed/rendered with `sdf`, which uses the JVM
// default time zone (none is set here).
val maxTimestamp = 1382745560000L // latest timestamp found in avro data file
val datumReader = new GenericDatumReader[GenericRecord](aggSchema)
val dataFileReader = new DataFileReader[GenericRecord](agg, datumReader)
val currentTime = System.currentTimeMillis
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
try {
    while (dataFileReader.hasNext) {
        val record = dataFileReader.next
        // ten_second_floor is nullable in aggSchema (["string","null"]); only shift present values
        // instead of failing on null.toString.
        Option(record.get("ten_second_floor")).foreach { floor =>
            val recordTimestamp = convertDateToEpochTimestamp(floor.toString, sdf)
            val newRecordTimestamp = convertEpochTimestampToDate(currentTime - (maxTimestamp - recordTimestamp), sdf)
            record.put("ten_second_floor", newRecordTimestamp)
        }
        producer.send(new ProducerRecord[String, GenericRecord]("ipinyou-agg", record))
    }
} finally {
    // Release the underlying file handle even if reading or producing fails.
    dataFileReader.close()
}
// send() only buffers records asynchronously; block until everything has been delivered.
producer.flush()

<hr>

Now that we've backfilled our ipinyou-agg topic with some pre-aggregated data, we can set up our streaming aggregation.

#### Import packages

In [None]:
import jep.spark.avro.{SparkAvroConverter, SchemaRegistryService}

In [None]:
// Bind the notebook's SparkSession to a val so it can be used as a stable identifier in the
// import below (presumably `spark` is not importable directly in this kernel — TODO confirm).
// The implicits provide the encoders used by `.as[Array[Byte]]` and the `$"col"` syntax later on.
val ss = spark
import ss.implicits._

In [None]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.streaming.Trigger
import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.{Schema, SchemaBuilder}
import com.databricks.spark.avro._
import org.apache.spark.sql.functions.{max, rank, window}

#### Initialize Schema Registry Client

In [None]:
// Schema Registry client (same URL as the producer's "schema.registry.url"), passed to the
// SparkAvroConverter calls below. NOTE(review): the meaning of the second argument (5) is defined
// by SchemaRegistryService — presumably a cache capacity; confirm.
val schemaRegistry = new SchemaRegistryService("http://schema_registry:8081", 5)

#### Generate a StructType of the desired fields from the IPinYou Avro schema

In [None]:
// Full Avro schema of the IPinYou records on the "ipinyou" topic; every field is a nullable union.
// @transient presumably keeps the Avro Schema out of serialized REPL closures — TODO confirm.
@transient val schema = new Schema.Parser().parse(
    """{"type":"record","name":"Ipinyou","namespace":"com.conversantmedia.cake.avro","doc":"Action","fields":[{"name":"bid_id","type":["null","string"]},{"name":"timestamp","type":["null","long"]},{"name":"log_type","type":["null","int"]},{"name":"ipinyou_id","type":["null","string"]},{"name":"user_agent","type":["null","string"]},{"name":"ip_address","type":["null","string"]},{"name":"region","type":["null","int"]},{"name":"city","type":["null","int"]},{"name":"ad_exchange","type":["null","int"]},{"name":"domain","type":["null","string"]},{"name":"url","type":["null","string"]},{"name":"anonymous_url_id","type":["null","string"]},{"name":"ad_slot_id","type":["null","string"]},{"name":"ad_slot_width","type":["null","int"]},{"name":"ad_slot_height","type":["null","int"]},{"name":"ad_slot_visibility","type":["null","string"]},{"name":"ad_slot_format","type":["null","string"]},{"name":"ad_slot_floor_price","type":["null","int"]},{"name":"creative_id","type":["null","string"]},{"name":"bidding_price","type":["null","int"]},{"name":"paying_price","type":["null","int"]},{"name":"landing_page_url","type":["null","string"]},{"name":"advertiser_id","type":["null","int"]},{"name":"user_tags","type":["null","string"]}],"schemaId":"2"}""")

In [None]:
import scala.collection.JavaConverters._
// Print the field names of the IPinYou Avro schema. Avro returns a java.util.List, so convert it
// explicitly with asScala instead of relying on the deprecated implicit JavaConversions.
schema.getFields.asScala.map(_.name).foreach(println)

In [None]:
// Spark SQL equivalent of the full Avro schema. The top-level Avro type is a record, which is
// expected to convert to a StructType; the cast fails at runtime otherwise.
val fullStruct = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

In [None]:
/** Appends, in order, the `fullStruct` fields named in `fields` to `accStruct`.
  *
  * @param fields    names of fields to copy from `fullStruct`
  * @param accStruct struct to extend (typically an empty `StructType`)
  * @return `accStruct` followed by the selected fields; looking up a name absent from
  *         `fullStruct` throws
  */
def buildSubStruct(fields: Array[String], accStruct: StructType): StructType =
    fields.foldLeft(accStruct) { (struct, fieldName) =>
        struct.add(fullStruct(fieldName))
    }

In [None]:
// Fields the aggregation needs: ad_exchange (grouping key), log_type (impression filter),
// and timestamp (event time for the windowing).
val desiredFields = Array("ad_exchange", "log_type", "timestamp")

In [None]:
// Projection of fullStruct down to desiredFields, in that order; used to decode each message.
val subStruct = buildSubStruct(desiredFields, new StructType)

In [None]:
// Show the projected schema as a tree for a quick sanity check.
subStruct.printTreeString

#### Initialize Streaming Dataset

In [None]:
// Streaming source over the "ipinyou" Kafka topic. Starting at the latest offsets means only
// messages produced after the query starts are read. Trailing dots keep the REPL reading the chain.
val ipinyou_stream = spark.readStream.format("kafka").
    options(Map(
        "kafka.bootstrap.servers" -> "broker:9092",
        "subscribe" -> "ipinyou",
        "startingOffsets" -> "latest")).
    load()

In [None]:
// Inspect the raw Kafka source schema; the Avro payload is the binary `value` column used below.
ipinyou_stream.printSchema

#### Deserialize GenericRecords and convert to Spark SQL Rows

In [None]:
// Decode each Kafka message's binary `value` into a Row containing only the subStruct fields.
// avroToRow receives the registry client and the subject/topic "ipinyou"; presumably it resolves
// the writer schema from the registry — TODO confirm. Rows have no implicit encoder, so
// RowEncoder(subStruct) is supplied explicitly.
val converted = ipinyou_stream.select("value").
    as[Array[Byte]].
    map(record => SparkAvroConverter.avroToRow(record, schemaRegistry, "ipinyou", subStruct))(RowEncoder(subStruct))

In [None]:
// The decoded stream should expose exactly the subStruct columns.
converted.printSchema

#### Setup Aggregation

Equivalent pseudo-SQL (`timestamp` is in epoch milliseconds; the streaming version additionally applies a 5-second watermark on the event time):

```SQL
SELECT
    CASE
        WHEN ad_exchange IS NULL THEN -1
        ELSE ad_exchange
    END AS ad_exchange,
    (to_timestamp(floor(timestamp / 10000) * 10))::STRING AS ten_second_floor,
    count(1) AS count
FROM
    ipinyou
WHERE
    log_type = 1
GROUP BY
    CASE
        WHEN ad_exchange IS NULL THEN -1
        ELSE ad_exchange
    END,
    (to_timestamp(floor(timestamp / 10000) * 10))::STRING
```

In [None]:
// Streaming impression counts per (ad_exchange, 10-second event-time window), shaped to match
// aggSchema (ad_exchange, ten_second_floor, count) used for the historical backfill.
val agg = converted.where("log_type = 1").// impressions have log_type of 1
    // `timestamp` is epoch milliseconds; from_unixtime takes seconds and returns a string
    // (whole seconds only), which is cast back to a timestamp column for windowing.
    selectExpr("ad_exchange", "CAST(from_unixtime(timestamp/1000) as timestamp) as timestamp").
    na.
    fill(-1, Seq("ad_exchange")). // fill null ad_exchange values with -1
    // Tolerate events up to 5 seconds late. With the "append" output mode set on the sink,
    // a window is emitted once the watermark passes its end.
    withWatermark("timestamp", "5 seconds").
    groupBy($"ad_exchange", window($"timestamp", "10 seconds")).
    count.
    // The window start, rendered as a string, becomes the ten_second_floor key.
    selectExpr("ad_exchange", "cast(window.start as string) as ten_second_floor", "count")

In [None]:
// Expect ad_exchange, ten_second_floor and count, matching aggSchema's fields.
agg.printSchema

#### Convert streaming aggregate Rows back to serialized GenericRecords and write to new Kafka topic

In [None]:
// Serialize each aggregate Row back into Avro bytes for the "ipinyou-agg" topic. rowToAvro gets
// the topic, a record name and a namespace; presumably it derives and registers the Avro schema from
// them — TODO confirm (note the record name itself contains a dot). The resulting Dataset[Array[Byte]]
// has a single binary `value` column, the column the Kafka sink writes.
val bytes = agg.map(row => SparkAvroConverter.rowToAvro(row, schemaRegistry, "ipinyou-agg", "Ipinyou.agg", "com.conversantmedia.cake.avro"))

In [None]:
// Start the streaming query: every 10 seconds, append newly finalized aggregate windows to the
// "ipinyou-agg" topic, checkpointing progress under the relative directory "ipinyou_checkpoint".
val query = bytes.writeStream.
    format("kafka").
    outputMode("append").
    trigger(Trigger.ProcessingTime("10 seconds")).
    options(Map(
        "kafka.bootstrap.servers" -> "broker:9092",
        "topic" -> "ipinyou-agg",
        "checkpointLocation" -> "ipinyou_checkpoint")).
    start()

In [None]:
// Snapshot of the running streaming query's current status.
query.status

<hr>
#### [Real-time visualization with Kibana](http://localhost:5601)