In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, DoubleType, TimestampType


# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Twitter-Streaming")
    .getOrCreate()
)

In [2]:
# Explicit schema for the Twitter parquet data. By default, Spark's file-based
# stream sources expect the schema to be supplied up front. Every column is
# declared nullable.
schema_twitter = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("user_id", LongType()),
        ("user_name", StringType()),
        ("verified", BooleanType()),
        ("followers_count", IntegerType()),
        ("friends_count", IntegerType()),
        ("user_created_at", TimestampType()),
        ("tweet_id", LongType()),
        ("tweet_message", StringType()),
        ("tweet_created_at", TimestampType()),
    ]
])

# Streaming DataFrame over the parquet data at "twitter.parquet", read with the schema above.
parquetFile = (
    spark.readStream
    .format("parquet")
    .schema(schema_twitter)
    .load("twitter.parquet")
)

In [3]:
# Keep only the rows whose tweet_message column is not null.
twitter_msg = parquetFile.dropna(subset=["tweet_message"])

In [4]:

# Output to memory to count rows.
# Sink the cleaned stream (`twitter_msg`, rows with a null tweet_message dropped
# in the previous cell) into an in-memory table named "tweets_count", so the
# table can be queried with spark.sql while the stream is running.
# The query has no aggregation, so "append" is the appropriate output mode for
# the memory sink. For non-aggregated queries "update" is equivalent to append,
# but "update" is not a documented mode for this sink.
queryStreamMem = twitter_msg \
                 .writeStream \
                 .format("memory") \
                 .queryName("tweets_count") \
                 .outputMode("append") \
                 .start()

In [17]:
# Show how many rows the "tweets_count" memory table holds at this moment.
tweet_count_sql = "select count(1) as qty from tweets_count"
spark.sql(tweet_count_sql).show()

+----+
| qty|
+----+
|6886|
+----+



In [None]:
from time import sleep
from IPython.display import clear_output

# Seconds to wait between two polls of the in-memory table.
POLL_SECONDS = 5

# Print the row count of the "tweets_count" memory table every POLL_SECONDS
# for as long as the query that feeds it (queryStreamMem) is running.
# Interrupt the kernel (KeyboardInterrupt) to stop the stream and end the loop.
try:
    run = 1
    # Watch this specific query rather than "any active stream". Otherwise an
    # unrelated running stream would keep this loop alive forever.
    while queryStreamMem.isActive:
        clear_output(wait=True)
        print("Run:{}".format(run))

        # Count the rows collected so far by the memory sink.
        spark.sql("select count(1) as qty from tweets_count").show()

        sleep(POLL_SECONDS)
        run += 1

    # The query stopped by itself: report why instead of exiting silently.
    failure = queryStreamMem.exception()
    if failure is not None:
        print("'tweets_count' query failed: {}".format(failure))
    else:
        print("'tweets_count' query is no longer active.")

except KeyboardInterrupt:
    # Stop the streaming query so it does not keep running in the background.
    queryStreamMem.stop()

    print("stream process interrupted")