# Activity 3.2, part 2: windowed word counts over a live tweet stream

In [2]:
import sys
!conda install --yes --prefix {sys.prefix} -c conda-forge pyspark

Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, window, to_timestamp, explode, split, col
from pyspark.sql.types import StructType, StructField, StringType

In [4]:
# Start (or reuse) the Spark session for this activity.
# The tweet timestamp pattern used later ('EEE MMM dd HH:mm:ss ZZZZ yyyy') relies on
# day-of-week ('E') parsing and RFC 822 'Z' offsets. Spark 3.x's default datetime
# parser rejects 'E' in parsing functions such as `to_timestamp`. The LEGACY time
# parser policy restores the pre-3.0 SimpleDateFormat-based parser, which accepts
# this pattern.
spark = (
    SparkSession.builder
    .appName('Packt')
    .config('spark.sql.legacy.timeParserPolicy', 'LEGACY')
    .getOrCreate()
)

In [5]:
# Unbounded stream of text lines read from a TCP socket on localhost:1234.
# Each line lands in a column named `value`, which the JSON-parsing cell reads.
raw_stream = (
    spark.readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 1234)
    .load()
)

In [6]:
# Pattern for the tweet `created_at` string, presumably of the form
# 'Wed Oct 10 20:19:24 +0000 2018' (TODO: confirm against the feed).
# NOTE: Spark 3.x's default parser does not accept 'E' for parsing; this pattern
# needs spark.sql.legacy.timeParserPolicy=LEGACY on Spark 3.x.
tweet_datetime_format = 'EEE MMM dd HH:mm:ss ZZZZ yyyy'

# JSON schema for incoming tweets. Only the two fields used downstream are
# declared, both as nullable strings.
schema = StructType([
    StructField('created_at', StringType(), nullable=True),
    StructField('text', StringType(), nullable=True),
])

In [7]:
# Parse each raw socket line (the `value` column) as JSON using `schema`,
# exposing the result as a single struct column named `tweet`.
tweet_stream = raw_stream.select(
    from_json(col('value'), schema).alias('tweet')
)

In [8]:
# One row per (event timestamp, word) pair. The tweet's `created_at` string is
# parsed with `tweet_datetime_format`, and its text is split into words.
# The regex '\s+' (rather than a single space) treats runs of spaces, tabs and
# newlines as one separator. `split` still yields an empty token when the text
# starts or ends with whitespace, so empty words are filtered out rather than
# counted.
timed_stream = (
    tweet_stream
    .select(
        to_timestamp('tweet.created_at', tweet_datetime_format).alias('timestamp'),
        explode(split('tweet.text', r'\s+')).alias('word'),
    )
    .where(col('word') != '')
)

In [9]:
# Group words into 10-minute tumbling windows on the event timestamp, with a
# 1-minute watermark for late data.
# NOTE(review): the query is started in 'complete' output mode, in which Spark
# keeps all aggregation state, so this watermark will not let old windows be
# dropped. Confirm whether that is intended.
windowed = (
    timed_stream
    .withWatermark('timestamp', '1 minute')
    .groupBy(window('timestamp', '10 minutes'), 'word')
)

In [10]:
# Count each word per window. Windows are sorted descending (latest first) and,
# within a window, words are sorted by ascending count.
# NOTE(review): ascending counts list the rarest words first. If the goal is the
# most frequent words, `col('count').desc()` may be what was meant; confirm.
counts_per_window = (
    windowed
    .count()
    .orderBy(col('window').desc(), col('count'))
)

In [None]:
# Print the full, untruncated result table to the console for each
# micro-batch, then block this cell until the query stops.
# Sorting a streaming aggregation is only supported in 'complete' output mode.
query = (
    counts_per_window.writeStream
    .outputMode('complete')
    .format('console')
    .option('truncate', False)
    .start()
)
query.awaitTermination()