In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
# The SparkSession `spark` is never created in this notebook; it is expected to be
# injected by the kernel/environment (presumably a pyspark-enabled kernel -- confirm).
# Leaving it as the last expression lets Jupyter render its repr.
spark

In [3]:
# Batch (non-streaming, via spark.read) read of every record on the Kafka topic
# "events", from the earliest offset up to the latest offset at query time.
kafka_read_options = {
    "kafka.bootstrap.servers": "kafka:29092",
    "subscribe": "events",
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
}
raw_events = spark.read.format("kafka").options(**kafka_read_options).load()

In [4]:
# Peek at the first rows (truncated cells); key/value are still binary here,
# which is why the payload is rendered as hex bytes.
raw_events.show(n=20, truncate=True)

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 65 76 65 6...|events|        0|     0|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     1|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     2|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     3|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     4|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     5|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     6|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     7|2021-12-07 03:06:...|            0|
|null|[7B 22 65 76 65 6...|events|        0

In [5]:
# Kafka source schema: `key` and `value` come back as binary, so the payload has to
# be cast to string before it can be parsed as JSON.
raw_events.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [136]:
# Number of records returned by the batch read (earliest -> latest offsets).
# raw_events is not cached, so every action like this re-reads the topic.
raw_events.count()

780

In [13]:
# Decode the binary Kafka payload into JSON text; the resulting column keeps the
# name `value`.
payload_as_text = raw_events["value"].cast("string")
all_events = raw_events.select(payload_as_text)
all_events.show()

+--------------------+
|               value|
+--------------------+
|{"event_type": "a...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
|{"event_type": "c...|
+--------------------+
only showing top 20 rows



In [15]:
# Inspect one decoded event (row index 6). take(7) fetches only the first seven rows;
# collect() would pull the whole topic to the driver just to read a single record.
json.loads(all_events.take(7)[6].value)

{'event_type': 'check',
 'neighboring_bombs': 35,
 'outcome': 'hit_mine',
 'session_id': 'ac796b7b-cf99-4aad-bb93-2c87a46e946a',
 'x_coord': 34,
 'y_coord': 98}

In [16]:
# Event types expected on the topic -- presumably mirrors what the producer emits
# (TODO confirm). NOTE(review): currently unused; the filter below only handles 'check'.
events_list = [
    "a_startup_event",
    "check",
    "flag",
    "solution",
]

In [7]:
name = 'check'


def make_event_type_filter(event_type):
    """Build a boolean Spark UDF that tests whether a JSON event has a given type.

    Args:
        event_type: Value the decoded event's ``event_type`` field must equal.

    Returns:
        A ``'boolean'`` UDF taking one string column of JSON text. It yields True
        when the text decodes to a JSON object whose ``event_type`` equals
        ``event_type``. It yields False otherwise, including for null values,
        malformed JSON, JSON that is not an object, and objects without an
        ``event_type`` key. Previously these raised inside the executor and
        failed the whole job.
    """
    @udf('boolean')
    def _matches(event_as_json):
        try:
            event = json.loads(event_as_json)
        except (TypeError, ValueError):
            # None -> TypeError; malformed text -> JSONDecodeError (a ValueError).
            return False
        return isinstance(event, dict) and event.get('event_type') == event_type

    return _matches


# Bind the event type explicitly instead of letting the UDF look up the global
# `name` whenever Spark serializes it, so reassigning `name` later cannot silently
# change (or silently fail to change) what `test` filters on.
test = make_event_type_filter(name)

In [27]:
# Keep only the events accepted by the `test` UDF (the 'check' type): decode the
# payload into a string column named `stats` and carry the record timestamp,
# cast to string, alongside it.
check_events = (
    raw_events
    .select(
        raw_events.value.cast('string').alias('stats'),
        raw_events.timestamp.cast('string'),
    )
    .where(test('stats'))
)

In [29]:
def _check_event_to_row(record):
    """Flatten one check event into a Row: the Kafka timestamp plus every JSON field.

    NOTE(review): if a payload ever carries its own "timestamp" key, Row(...) raises
    TypeError for the duplicate keyword argument.
    """
    payload = json.loads(record.stats)
    return Row(timestamp=record.timestamp, **payload)


extracted_check_events = check_events.rdd.map(_check_event_to_row).toDF()

In [30]:
extracted_check_events.show()

+----------+-----------------+--------+--------------------+--------------------+-------+-------+
|event_type|neighboring_bombs| outcome|          session_id|           timestamp|x_coord|y_coord|
+----------+-----------------+--------+--------------------+--------------------+-------+-------+
|     check|               38|hit_mine|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     29|     95|
|     check|               31|    safe|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     52|     37|
|     check|               45|hit_mine|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     35|     43|
|     check|               25|    safe|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     58|     57|
|     check|               12|    safe|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     76|     87|
|     check|               35|hit_mine|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     34|     98|
|     check|               14|    safe|ac796b7b-cf99-4aa...|2021-12-07 03:06:...|     56|     71|
|     check|        

In [31]:
# Persist the flattened check events as parquet, replacing any previous output.
extracted_check_events.write.parquet('/tmp/check_cell', mode='overwrite')

In [32]:
# Reload the persisted check events from parquet.
check = spark.read.format('parquet').load('/tmp/check_cell')

In [33]:
# Expose the parquet-backed check events to Spark SQL as `check_event_table`.
# createOrReplaceTempView is the supported replacement for registerTempTable
# (deprecated since Spark 2.0); it replaces any existing view with the same name.
check.createOrReplaceTempView('check_event_table')

In [34]:
# Query the registered view through Spark SQL and collect the result into pandas
# for display.
check_event_query = "select * from check_event_table"
spark.sql(check_event_query).toPandas()

Unnamed: 0,event_type,neighboring_bombs,outcome,session_id,timestamp,x_coord,y_coord
0,check,38,hit_mine,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.339,29,95
1,check,31,safe,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.351,52,37
2,check,45,hit_mine,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.362,35,43
3,check,25,safe,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.373,58,57
4,check,12,safe,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.385,76,87
5,check,35,hit_mine,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.395,34,98
6,check,14,safe,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.407,56,71
7,check,19,safe,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.417,73,36
8,check,18,safe,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.428,78,17
9,check,3,hit_mine,ac796b7b-cf99-4aad-bb93-2c87a46e946a,2021-12-07 03:06:54.439,89,22
