In [1]:
from pyspark.sql import SparkSession

In [2]:
SPARK_MASTER="spark://cm1:7077"
APP_NAME="Exemplo Spark, Kafka e ElasticSearch"

KAFKA_HOST="cm3:9092"

KAFKA_TOPIC_IN="student-a190020377-entrada"
KAFKA_TOPIC_OUT="student-a190020377-saida"

INTERVAL = "10 seconds"

### Inicialização da sessão

In [3]:
spark = SparkSession.builder \
        .master(SPARK_MASTER) \
        .appName(APP_NAME) \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/07 20:25:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Inscrição no topico

In [4]:
messages = spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", KAFKA_HOST) \
                .option("subscribe", KAFKA_TOPIC_IN) \
                .option('includeTimestamp', 'true') \
                .load()

### Contagem das palavras

In [5]:
from pyspark.sql.functions import explode, split, col, upper, window, to_json, struct, lit

#### Separação das linhas em palavras

In [6]:
words = messages \
            .select(
                explode(split(col("value"), "\s+")).alias("word"),
                messages.timestamp
            ).select(
                upper(col("word")).alias('word'), 
                col("timestamp")
            )
words

DataFrame[word: string, timestamp: timestamp]

#### Agrupamento das palavras

In [7]:
wordCounts = words.withWatermark("timestamp", INTERVAL) \
                    .groupBy(
                        window(words.timestamp, INTERVAL, INTERVAL),
                        "timestamp",
                        "word"
                    ) \
                    .count()
wordCounts

DataFrame[window: struct<start:timestamp,end:timestamp>, timestamp: timestamp, word: string, count: bigint]

## Escrita no terminal

In [13]:
wct = wordCounts.writeStream \
                .format('console') \
                .outputMode("update") \
                .start()

23/02/07 20:14:06 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fbade2bf-bd7d-40ab-9b1a-1f6332b05d74. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/02/07 20:14:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+---------+----+-----+
|window|timestamp|word|count|
+------+---------+----+-----+
+------+---------+----+-----+



## Conversão em JSON

In [8]:
wordCountsJson = wordCounts.select(
                                lit('1').alias('key'),
                                to_json(struct("word", "count")).alias('value')
                            )
wordCountsJson

DataFrame[key: string, value: string]

### Escrita no terminal do JSON

In [9]:
wcj = wordCountsJson.writeStream \
                .format('console') \
                .outputMode("update") \
                .trigger(processingTime=INTERVAL) \
                .start()

23/02/07 20:17:54 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9c4800fe-ff56-40fb-ad28-57e905e64679. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/02/07 20:17:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


[Stage 1:>                                                        (0 + 0) / 200]

## Envio para topico do Kafka

In [9]:
wck = wordCountsJson \
        .writeStream \
        .outputMode("update") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_HOST) \
        .option('topic', KAFKA_TOPIC_OUT) \
        .option('checkpointLocation', '/tmp/spark-a190020377') \
        .trigger(processingTime=INTERVAL) \
        .start()

23/02/07 20:25:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

23/02/07 20:26:00 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 10563 milliseconds


                                                                                

## Finalização

In [10]:
wct.stop()
wcj.stop()
wck.stop

NameError: name 'wct' is not defined

In [11]:
spark.stop()