In [4]:
import json
import pyspark
from pyspark.sql import Row
from pyspark.sql.functions import udf



In [5]:
@udf('string')
def munge_event(event_as_json):
    """Spark UDF: decode a JSON event, overwrite two headers, and re-encode it.

    Args:
        event_as_json: The event as a JSON-encoded string.

    Returns:
        The event re-serialised as a JSON string, with 'Host' set to "moe"
        and 'Cache-Control' set to "no-cache".
    """
    parsed = json.loads(event_as_json)
    # Silly overrides whose only purpose is to prove the UDF actually ran.
    parsed.update({'Host': "moe", 'Cache-Control': "no-cache"})
    return json.dumps(parsed)

In [15]:
# Batch read of the "events" topic, from the earliest to the latest offset.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# Keep the payload (as text) and the Kafka timestamp, then add a 'munged'
# column produced by the munge_event UDF.
munged_events = (
    raw_events
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string'))
    .withColumn('munged', munge_event('raw'))
)

# Turn every munged JSON document into a Row whose fields are the JSON keys
# plus the timestamp, and go back to a DataFrame.
extracted_events = (
    munged_events.rdd
    .map(lambda row: Row(timestamp=row.timestamp, **json.loads(row.munged)))
    .toDF()
)

# Sword purchases: display, then persist as parquet.
sword_purchases = extracted_events.filter(
    extracted_events.event_type == 'purchase_sword'
)
sword_purchases.show()
sword_purchases.write.mode("overwrite").parquet("/tmp/sword_purchases")

# Default hits: display, then persist as parquet.
default_hits = extracted_events.filter(
    extracted_events.event_type == 'default'
)
default_hits.show()
default_hits.write.mode("overwrite").parquet("/tmp/default_hits")
        


+------+-------------+----+---------------+--------------+--------------------+
|Accept|Cache-Control|Host|     User-Agent|    event_type|           timestamp|
+------+-------------+----+---------------+--------------+--------------------+
|   */*|     no-cache| moe|    curl/7.47.0|purchase_sword|2019-12-07 03:00:...|
|   */*|     no-cache| moe|    curl/7.47.0|purchase_sword|2019-12-07 03:00:...|
|   */*|     no-cache| moe|    curl/7.47.0|purchase_sword|2019-12-07 03:00:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|     no-cache| moe|ApacheBench/2

In [14]:
#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf


@udf('boolean')
def is_purchase(event_as_json):
    """Spark UDF: tell whether a raw JSON event is a sword purchase.

    Args:
        event_as_json: The event as a JSON-encoded string, or None when the
            source column is null.

    Returns:
        True only when the payload decodes to a JSON object whose
        'event_type' is 'purchase_sword'. A null payload, malformed JSON,
        a non-object document, or an object without 'event_type' all yield
        False instead of raising, so a single bad record cannot fail the
        whole job.
    """
    if event_as_json is None:
        return False
    try:
        event = json.loads(event_as_json)
    except ValueError:
        # json decoding errors are ValueError subclasses; a record that
        # cannot be decoded is, by definition, not a purchase.
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'


# Obtain (or create) the Spark session for this job.
spark = (
    SparkSession.builder
    .appName("ExtractEventsJob")
    .getOrCreate()
)

# Batch read of the "events" topic, from the earliest to the latest offset.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# Keep the payload (as text) and the Kafka timestamp, and retain only the
# rows accepted by the is_purchase UDF.
purchase_events = (
    raw_events
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string'))
    .filter(is_purchase('raw'))
)

# Turn every JSON payload into a Row whose fields are the JSON keys plus the
# timestamp, and go back to a DataFrame.
extracted_purchase_events = (
    purchase_events.rdd
    .map(lambda row: Row(timestamp=row.timestamp, **json.loads(row.raw)))
    .toDF()
)
extracted_purchase_events.printSchema()
extracted_purchase_events.show()

# Persist the purchases as parquet, replacing any previous output.
extracted_purchase_events.write.mode('overwrite').parquet('/tmp/purchases')

    


root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-----------------+---------------+--------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|           timestamp|
+------+-----------------+---------------+--------------+--------------------+
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|2019-12-07 03:00:...|
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|2019-12-07 03:00:...|
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|2019-12-07 03:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2019-12-07 03:14:...|
|   *

In [19]:


import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType


def purchase_sword_event_schema():
    """Build the schema used to parse a purchase_sword event's JSON payload.

    Every field is a nullable string. The returned schema declares exactly
    these four fields (no timestamp field is part of it):

    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)

    Returns:
        A StructType with one StructField per field listed above.
    """
    field_names = ("Accept", "Host", "User-Agent", "event_type")
    return StructType(
        [StructField(field_name, StringType(), True) for field_name in field_names]
    )


@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events

    Args:
        event_as_json: The event as a JSON-encoded string, or None when the
            source column is null.

    Returns:
        True only when the payload decodes to a JSON object whose
        'event_type' is 'purchase_sword'. A null payload, malformed JSON,
        a non-object document, or an object without 'event_type' all yield
        False instead of raising, so a single bad record cannot fail the
        query that uses this filter.
    """
    if event_as_json is None:
        return False
    try:
        event = json.loads(event_as_json)
    except ValueError:
        # json decoding errors are ValueError subclasses; a record that
        # cannot be decoded is, by definition, not a sword purchase.
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'

# Streaming read of the "events" topic.
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .load()
)

# Keep only sword purchases, then expose the raw payload, the Kafka
# timestamp, and the payload parsed with the purchase_sword schema; the
# final select flattens the parsed struct into top-level columns.
sword_purchases = (
    raw_events
    .filter(is_sword_purchase(raw_events.value.cast('string')))
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_sword_event_schema()).alias('json'))
    .select('raw_event', 'timestamp', 'json.*')
)

# Start writing the stream as parquet, triggering every 10 seconds.
sink = (
    sword_purchases.writeStream
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases")
    .option("path", "/tmp/sword_purchases")
    .trigger(processingTime="10 seconds")
    .start()
)




In [17]:
# Drop any previous definition first so that re-running this cell always
# registers the table definition written below.
sql_string = "drop table if exists default.hdfs_sword_purchase_events"
spark.sql(sql_string)

# Register an external table over the parquet files in /tmp/sword_purchases.
# The compression table property key is "parquet.compression"; the original
# "parquet.compress" is not the key Parquet tooling reads.
sql_string = """
create external table if not exists default.hdfs_sword_purchase_events (
    raw_event string,
    timestamp string,
    Accept string,
    Host string,
    `User-Agent` string,
    event_type string
)
stored as parquet
location '/tmp/sword_purchases'
tblproperties ("parquet.compression"="SNAPPY")
"""
spark.sql(sql_string)

DataFrame[]

In [18]:
# Stop the streaming query held in `sink` (the handle returned by writeStream...start()).
sink.stop()