In [1]:
import time

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Attach to the already-running Spark session if there is one; otherwise
# build a new session with default settings.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [11]:
# Batch-read a single sample file. No schema is supplied here, so Spark
# derives one from the JSON records themselves.
events = spark.read.json(path="event_1.json")

In [12]:
# Preview the sample: up to 20 rows, with long cell values truncated
# (these are the defaults of show(), spelled out for the reader).
events.show(n=20, truncate=True)

+--------------------+--------+--------------------+--------------------+------------+----------------+-------------+--------------------+----------------+----------+--------+-------------+----------+--------------------+----------------+----------+--------------+
|         description|duration|           event_url|               group|          id|maybe_rsvp_count|        mtime|                name|payment_required|rsvp_limit|  status|         time|utc_offset|               venue|venue_visibility|visibility|yes_rsvp_count|
+--------------------+--------+--------------------+--------------------+------------+----------------+-------------+--------------------+----------------+----------+--------+-------------+----------+--------------------+----------------+----------+--------------+
|<p>May you have a...|    null|http://www.meetup...|{{2, career/busin...|lnsdslyvpbfc|               0|1478608237521|HAPPY THANKSGIVIN...|               0|        20|upcoming|1479922200000| -21600000|     

In [13]:
# List the top-level column names of the sample events DataFrame.
events.columns

['description',
 'duration',
 'event_url',
 'group',
 'id',
 'maybe_rsvp_count',
 'mtime',
 'name',
 'payment_required',
 'rsvp_limit',
 'status',
 'time',
 'utc_offset',
 'venue',
 'venue_visibility',
 'visibility',
 'yes_rsvp_count']

In [14]:
# Print the schema of the sample DataFrame as a tree, including the fields
# nested inside struct columns such as `group`.
events.printSchema()

root
 |-- description: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- event_url: string (nullable = true)
 |-- group: struct (nullable = true)
 |    |-- category: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- shortname: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- group_lat: double (nullable = true)
 |    |-- group_lon: double (nullable = true)
 |    |-- group_photo: struct (nullable = true)
 |    |    |-- highres_link: string (nullable = true)
 |    |    |-- photo_id: long (nullable = true)
 |    |    |-- photo_link: string (nullable = true)
 |    |    |-- thumb_link: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- join_mode: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- urlname: string (nullable = true)
 |-- id: st

In [16]:
# Keep the schema obtained from the sample file so it can be handed to the
# streaming reader, which is given an explicit schema below.
event_schema = events.schema

In [26]:
# Configure a streaming reader with the schema captured from the sample file
# and limit each trigger to a single new file.
events_reader = spark.readStream.schema(event_schema).option("maxFilesPerTrigger", 1)

# Treat every JSON file under events/ as streaming input.
stream = events_reader.json('events/')

In [28]:
# Aggregate the stream: number of events per `payment_required` value,
# with the largest count listed first.
payment_counts = stream.groupBy("payment_required").count()
events_group = payment_counts.orderBy(F.col("count").desc())

In [32]:
# Tunables for this demo cell.
QUERY_NAME = "events_group"   # also the name of the in-memory table queried below
POLL_ATTEMPTS = 50            # how many times to look at the result table
POLL_INTERVAL_SECONDS = 0.5   # pause between two looks

# A query name must be unique among the active queries of a session, so
# re-running this cell while an earlier run is still active would fail on
# start(). Stop any leftover query with the same name first to keep the
# cell re-runnable.
for active_query in spark.streams.active:
    if active_query.name == QUERY_NAME:
        active_query.stop()

# Start the streaming aggregation and expose the full, continuously updated
# result as an in-memory table named QUERY_NAME.
query = (
    events_group.writeStream.queryName(QUERY_NAME)
    .format("memory")
    .outputMode("complete")
    .start()
)
# Uncomment this line to continuously accept a stream of data
# (blocks the cell until the query terminates).
# query.awaitTermination()

# Poll the in-memory table for a bounded amount of time and print it whenever
# it holds at least one row.
for _ in range(POLL_ATTEMPTS):
    snapshot = spark.sql(f"SELECT * FROM {QUERY_NAME}")
    # take(1) is enough to know whether any result rows exist yet; a full
    # count() is not needed for an emptiness check.
    if snapshot.take(1):
        snapshot.show()
    time.sleep(POLL_INTERVAL_SECONDS)

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+-----+

+----------------+-----+
|payment_required|count|
+----------------+-----+
|               0|    6|
+----------------+

In [33]:
# Inspect the streaming query's current status (message and data/trigger flags).
query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [31]:
# Stop the streaming query started above.
query.stop()