<a href="https://colab.research.google.com/github/Erike-Simon/CESAR-AED/blob/main/ProcDados_structured_streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install --upgrade pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=ca1ac9902e8309452586c2118aa67e1dc6afb2b3bf44488b4de9ea5d271e88df
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.sql import Window

In [None]:
# Criando um cluster local com 2 workers, 1 cores por worker e 3GB de RAM por worker

spark = SparkSession.builder\
    .master("local-cluster[2, 1, 3072]")\
    .getOrCreate()
spark

In [None]:
ROOT_DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/proc-dados-larga-escala/data"

In [None]:
# Parar spark session
# spark.stop()

## Exemplo de Structured Streaming usando Complete Mode

### Estrutura básica do Streaming

In [None]:
lines = spark.readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 10000)\
    .load()
print(type(lines))

lineCounts = lines.groupBy('value')\
    .count()
print(type(lineCounts))

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    df.show()

query = lineCounts.writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()
print(type(query))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.streaming.query.StreamingQuery'>


In [None]:
query.stop()

### Exemplo word count com Streaming

In [None]:
lines = spark.readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 10000)\
    .load()

words = lines.select(
    F.explode(
        F.split(lines['value'], ' ')
    ).alias('word')
)

wordCounts = words.groupBy('word').count()

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    df.show()

query = wordCounts.writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()

In [None]:
query.stop()

## Exemplo de Structured Streaming usando a coluna Timestamp

In [None]:
lines = spark.readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 10000)\
    .option('includeTimestamp', 'true')\
    .load()

words = lines.select(
    'timestamp',
    F.explode(
        F.split(lines['value'], ' ')
    ).alias('word')
)

wordCounts = words.groupBy('timestamp', 'word').count()

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    df.show()

query = wordCounts.writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()

In [None]:
query.stop()

## Exemplo de Structured Streaming usando Window

In [None]:
lines = spark.readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 10000)\
    .option('includeTimestamp', 'true')\
    .load()

words = lines.select(
    'timestamp',
    F.explode(
        F.split(lines['value'], ' ')
    ).alias('word')
)

wordCounts = words\
    .groupBy(
        F.window('timestamp', '10 seconds', '10 seconds'),
        'word'
    ).count()\
    .orderBy(F.asc('window.start'), F.asc('count'))

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    print(df.toPandas())

query = wordCounts.writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()

In [None]:
query.stop()

## Exemplo de Structured Streaming usando Update Mode

In [None]:
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 10000)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words
words = lines.select(
    'timestamp',
    F.explode(
        F.split(lines['value'], ' ')
    ).alias('word')
)

wordCounts = words\
    .groupBy(
        F.window('timestamp', '10 seconds', '10 seconds'),
        'word'
    ).count()

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    print(df.toPandas())

query = wordCounts\
    .writeStream\
    .outputMode('update')\
    .foreachBatch(foreach_batch_function)\
    .start()

In [None]:
query.stop()

## Exemplo de Structured Streaming usando Append Mode

In [None]:
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 10000)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words
words = lines.select(
    'timestamp',
    F.explode(
        F.split(lines['value'], ' ')
    ).alias('word')
)

wordCounts = words\
    .withWatermark('timestamp', '10 seconds')\
    .groupBy(
        F.window('timestamp', '10 seconds', '10 seconds'),
        'word'
    ).count()

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    print(df.toPandas())

query = wordCounts\
    .writeStream\
    .outputMode('append')\
    .foreachBatch(foreach_batch_function)\
    .start()

In [None]:
query.stop()

## Exemplos de Structured Streaming usando arquivos CSV

Os "output modes" (modos de saída) são uma característica importante do Structured Streaming no Apache Spark que define como os resultados do processamento de streaming são escritos para um destino de saída. Existem três modos de saída principais:

1. **Complete Mode (Modo Completo):** Neste modo, o resultado completo de cada microbacth é escrito para o destino de saída. Isso significa que o resultado reflete o estado completo do conjunto de dados no final de cada microbacth, o que pode incluir dados duplicados se o mesmo dado tiver sido processado várias vezes devido a falhas ou reexecuções. É útil para casos em que você precisa de uma visão completa e atualizada do estado atual do stream em cada etapa.

2. **Append Mode (Modo Adição):** Neste modo, apenas os novos resultados de cada microbacth são escritos para o destino de saída. Isso significa que apenas as novas linhas ou eventos que foram processados desde a última execução são adicionados ao destino de saída. Se o mesmo dado for processado novamente, ele não será incluído no resultado, evitando duplicatas. É útil quando você está apenas interessado nos novos dados adicionados ao stream e não precisa manter o estado completo.

3. **Update Mode (Modo Atualização):** Este modo é semelhante ao modo completo, mas apenas as linhas que foram atualizadas desde a última execução são escritas para o destino de saída. Isso é útil quando você está interessado apenas nas linhas que foram modificadas ou atualizadas no stream. No entanto, o modo de atualização só funciona se o seu processamento de streaming incluir operações que podem ser identificadas como "upserts" ou atualizações no conjunto de dados.

A escolha do modo de saída depende dos requisitos específicos do seu aplicativo e do tipo de análise que você está realizando. Cada modo tem suas vantagens e limitações, e é importante selecionar o modo mais apropriado com base nas necessidades do seu aplicativo e nos requisitos de saída.

#### Exemplo com dados de bitcoin

In [None]:
df = spark.read.csv(
    f"{ROOT_DATA_PATH}/bigfile.csv",
    schema="Height INTEGER, \
      Input STRING, \
      Output STRING, \
      Sum STRING, \
      Time TIMESTAMP"
)
df.printSchema()

root
 |-- Height: integer (nullable = true)
 |-- Input: string (nullable = true)
 |-- Output: string (nullable = true)
 |-- Sum: string (nullable = true)
 |-- Time: timestamp (nullable = true)



In [None]:
df.show(3)

+------+--------------------+--------------------+------+-------------------+
|Height|               Input|              Output|   Sum|               Time|
+------+--------------------+--------------------+------+-------------------+
|   546|['1DZTzaBHUDM7T3Q...|['1KAD5EnzzLtrSo2...|['25']|2009-01-15 06:08:20|
|   546|['1KAD5EnzzLtrSo2...|['1KAD5EnzzLtrSo2...|['25']|2009-01-15 06:08:20|
|   546|['1KAD5EnzzLtrSo2...|['1DZTzaBHUDM7T3Q...|['25']|2009-01-15 06:08:20|
+------+--------------------+--------------------+------+-------------------+
only showing top 3 rows



In [None]:
%%time

df.select(F.sum('Height')).show()

+-------------+
|  sum(Height)|
+-------------+
|2589395531434|
+-------------+

CPU times: user 309 ms, sys: 60 ms, total: 369 ms
Wall time: 1min 4s


In [None]:
%%time

df.count()

CPU times: user 85.3 ms, sys: 5.56 ms, total: 90.9 ms
Wall time: 16.1 s


13494203

In [None]:
inputStream = spark.readStream.csv(
    f"{ROOT_DATA_PATH}/bitcoin",
    schema="Height INTEGER, \
      Input STRING, \
      Output STRING, \
      Sum STRING, \
      Time TIMESTAMP"
)

# inputStream = inputStream.select(F.sum('Height'))
inputStream = inputStream.select(F.count('Height')) # Modifica o DataFrame de streaming 'inputStream' usando a função
                                                    # 'select' para calcular a contagem de valores na coluna "Height".

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    print(df.toPandas())

# 1. Configura a escrita do DataFrame de streaming para um sink (destino) usando 'writeStream';
# 2. O método 'outputMode('update')' define o modo de saída como "update", o que significa que
#    apenas as linhas atualizadas desde a última execução serão escritas no sink;
# 3. Aplica a função 'foreachBatch' para cada microbacth de dados;
# 4. Inicia a consulta de streaming usando 'start()'. Isso inicia o processo de execução do fluxo.
#    Durante a execução, o Spark divide os dados de entrada em microbatches e aplica as transformações
#    definidas no inputStream a cada microbatch.
query = inputStream\
    .writeStream\
    .outputMode('update')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(480)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
query.stop()

#### Qual é a quantidade total de eventos de cada anúncio nos últimos 10 segundos? Calcular a cada 10 segundos e use update mode

O método **window** é usado para definir janelas de tempo no Spark Streaming. Ele permite agrupar os dados em intervalos específicos de tempo para análise. Aqui está uma explicação detalhada dos parâmetros do método window:

* **colName:** Este é o nome da coluna que contém os timestamps dos eventos. No seu caso, é 'timestamp', indicando que a janela de tempo será baseada nos timestamps presentes nessa coluna.

* **windowDuration:** Este parâmetro especifica o tamanho da janela de tempo. No seu código, é definido como '10 seconds', o que significa que cada janela de tempo terá uma duração de 10 segundos.

* **slideDuration (opcional):** Este parâmetro especifica o intervalo de tempo entre o início de uma janela e a próxima. No seu código, também é definido como '10 seconds', o que significa que as janelas são deslocadas a cada 10 segundos.

In [None]:
inputStream = spark.readStream.csv(
    f"{ROOT_DATA_PATH}/ad_action_exercises",
    schema="timestamp TIMESTAMP, \
      user_id STRING, \
      action STRING, \
      adId STRING, \
      campaignId STRING"
)

# 1. Os dados do streaming são agrupados por janelas de tempo
#    de 10 segundos (usando a função 'window', segundo parâmetro), com sliding
#    também de 10 segundos (terceiro parâmetro) e também pelo campo 'adId';
# 2. Em seguida,a contagem de registros é feita para cada grupo pelo 'count()'.
inputStream = inputStream\
    .groupBy(
        F.window('timestamp', '10 seconds', '10 seconds'),
        'adId'
    ).count()

def foreach_batch_function(df, epoch_id):
    df = df.orderBy('window.start')
    print(epoch_id)
    print(df.toPandas())

query = inputStream\
    .writeStream\
    .outputMode('update')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(60)

0
                                        window     adId  count
0   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_06   9840
1   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_05   8681
2   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_09   9212
3   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_03   8241
4   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_02   9187
5   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_07   9507
6   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_08   8164
7   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_10   5374
8   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_04   8840
9   (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_01   7864
10  (2023-09-01 02:42:10, 2023-09-01 02:42:20)  adId_04   7975
11  (2023-09-01 02:42:10, 2023-09-01 02:42:20)  adId_02   8281
12  (2023-09-01 02:42:10, 2023-09-01 02:42:20)  adId_03   7535
13  (2023-09-01 02:42:10, 2023-09-01 02:42:20)  adId_09   8186
14  (2023-09-01 02:42:10, 2023-09-01 02:42:20)  adId_

False

Ao observar as contagens de registros para cada janela de tempo, você está vendo quantos registros foram observados em intervalos de 10 segundos para cada 'adId'. Como o Spark Streaming processa os dados em batches, a mudança na época indica que um novo batch de dados foi processado para essa janela de tempo específica, e o resultado é a contagem de registros para esse batch em particular

In [None]:
query.stop()

#### Quais são os top 3 anúncios e intervalos com mais eventos considerando todos os intervalos de janela? Calcule com uma janela de 10 segundos e periodicidade de 10 segundos e use complete mode.



In [None]:
inputStream = spark.readStream.csv(
    f"{ROOT_DATA_PATH}/ad_action_exercises",
    schema="timestamp TIMESTAMP, \
      user_id STRING, \
      action STRING, \
      adId STRING, \
      campaignId STRING"
)

inputStream = inputStream\
    .groupBy(
        F.window('timestamp', '10 seconds', '10 seconds'),
        'adId'
    ).count()\
    .orderBy(F.desc('count'))\
    .limit(3)

def foreach_batch_function(df, epoch_id):
    print(epoch_id)
    print(df.toPandas())

query = inputStream\
    .writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(30)

0
                                       window     adId  count
0  (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_06   9840
1  (2023-09-01 02:42:20, 2023-09-01 02:42:30)  adId_06   9731
2  (2023-09-01 02:42:00, 2023-09-01 02:42:10)  adId_07   9507


False

In [None]:
query.stop()

#### Quais são os top 3 anúncios com mais eventos dos últimos 10 segundos em cada intervalo de janela? Calcule a cada 10 segundos e use o complete mode

In [None]:
inputStream = spark.readStream.csv(
    f"{ROOT_DATA_PATH}/ad_action_exercises",
    schema="timestamp TIMESTAMP, \
      user_id STRING, \
      action STRING, \
      adId STRING, \
      campaignId STRING"
)

inputStream = inputStream\
    .groupBy(
        F.window('timestamp', '10 seconds', '10 seconds'),
        'adId'
    ).count()

def foreach_batch_function(df, epoch_id):
    window_group = Window.partitionBy('window.start')\
        .orderBy(F.desc('count'))
    df = df.withColumn('rank', F.row_number().over(window_group))\
        .where(F.col('rank') <= 3)\
        .drop('rank')\
        .orderBy(F.asc('window.start'))
    print(epoch_id)
    print(df.toPandas())

query = inputStream\
    .writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(40)

0


False

In [None]:
query.stop()

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 120, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 117, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-6-f2eddeb85302>", line 24, in foreach_batch_function
    print(df.toPandas())
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/pandas/conversion.py", line 202, in toPandas
    rows = self.collect()
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/dataframe.py", line 1257, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1322, 