In [2]:
# Cria a seção a ser utilizada para estabelecer a conexão
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
from pyspark.sql.functions import desc, asc

spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

In [3]:
# Definindo um esquema para os dados
from pyspark.sql.types import StructType

userSchema = StructType().add("timestamp", "timestamp").add("word", "string")

In [5]:
# Ciria um dataframe que será responsável ler cada um das linhas recebidas dos arquivos adicionados no diretório
files_diir = spark.readStream\
        .format("csv")\
        .schema(userSchema)\
        .option('includeTimestamp', 'true')\
        .option("header", "true")\
        .option("sep", ";")\
        .option("maxFilesPerTrigger", 1)\
        .load("*.csv")

In [6]:
# Imprimir o schema
files_diir.printSchema

<bound method DataFrame.printSchema of DataFrame[timestamp: timestamp, word: string]>

In [7]:
# Divide as linhas recebidas em cada palavras
words = files_diir.select(explode(
        split(files_diir.word, " ")).alias("word"), files_diir.timestamp)

In [8]:
words.isStreaming

True

In [9]:
# Agrupa os dados através da janela de tempo e computa sobre cada um dos grupos
windowedCounts = words.groupBy(
        window(words.timestamp, "10 Minutes", "5 Minutes"),
        words.word
).count().sort(asc("word"))

In [None]:
# Define a consulta query e como deve ser realizada  saída (sink) para o stream criado
query = windowedCounts \
        .writeStream \
        .outputMode("complete")\
        .format("console")\
        .option('truncate', 'false')\
        .start()
query.awaitTermination()
# Aguarda até que a "streaming query" termine