In [1]:
import configparser
import findspark
import os
import pyspark
import time

from functools import reduce
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.streaming import StreamingContext

In [2]:
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail fast here with a clear message instead of
# a confusing KeyError on config['LOCAL'] in the next cell.
if not config.read('config.ini'):
    raise FileNotFoundError('config.ini not found or unreadable')

['config.ini']

In [3]:
# Kafka connection settings and the polling interval (seconds passed to
# time.sleep in the polling loop), all read from the [LOCAL] section.
local_settings = config['LOCAL']
TOPIC, SERVER = local_settings['TOPIC'], local_settings['SERVER']
INTERVAL = int(local_settings['INTERVAL'])

In [4]:
# Use findspark to locate a Spark installation for this kernel.
# NOTE(review): findspark.init() is normally called *before* `import pyspark`,
# but pyspark is already imported in the first cell. The session log shows the
# ivy jar under .env/.../site-packages/pyspark, which suggests pyspark is
# pip-installed and this call may be unnecessary. Confirm before removing.
findspark.init()

In [5]:
# Kafka source connector for Structured Streaming. It must be in the
# environment before the SparkSession (next cell) launches its JVM, which is
# when the --packages coordinate is resolved (see the ivy log there).
KAFKA_PACKAGE = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1'
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_PACKAGE} pyspark-shell'

In [6]:
# Single SparkSession for the notebook; getOrCreate() returns the existing
# session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .appName('Machine-IoT-Monitor')
    .getOrCreate()
)

23/02/09 21:11:18 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.0.16 instead (on interface wlp4s0)
23/02/09 21:11:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/cfascina/Projetos/Machine-IoT-Monitor/.env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/cfascina/.ivy2/cache
The jars for the packages stored in: /home/cfascina/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6d0fadef-f50e-4e64-8689-578ea0824664;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: 

23/02/09 21:11:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/09 21:11:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
# Expected fields of each JSON message on the topic, in column order.
# Every field is declared nullable; `timestamp` is kept as a plain string.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('id_machine', StringType()),
        ('temperature', IntegerType()),
        ('rpm', IntegerType()),
        ('timestamp', StringType()),
    )
])

In [8]:
# Unbounded streaming source: subscribe to TOPIC on the SERVER broker(s).
df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', SERVER)
    .option('subscribe', TOPIC)
    .load()
)

In [9]:
# Keep only the message payload, decoded to a string, under the name `value`.
df = df.selectExpr('CAST(value AS STRING) AS value')

In [10]:
# Parse each JSON payload with `schema`, then flatten the parsed struct into
# top-level columns (id_machine, temperature, rpm, timestamp).
df_streaming = df.select(
    from_json(col('value'), schema).alias('jsonData')
).select('jsonData.*')


In [11]:
# Query/table name suffixed with the local time (HHMMSS) so a re-run in the
# same session is unlikely to collide with an earlier run's memory table.
temp_table = 'machine_data_' + time.strftime('%H%M%S')

In [12]:
# Start the streaming query: each micro-batch's parsed rows are appended to an
# in-memory table named `temp_table`, which spark.sql can query while it runs.
streaming = (
    df_streaming.writeStream
    .format('memory')
    .outputMode('append')
    .queryName(temp_table)
    .start()
)

23/02/09 21:11:23 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-90aa2047-4c8f-4d57-9d11-8803e61e32f2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/02/09 21:11:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [13]:
# Poll the in-memory table 5 times, INTERVAL seconds apart, printing each
# snapshot. Each snapshot is cumulative (it contains every row received so
# far). Spark DataFrames are lazy, so `logs` holds queries, not frozen results.
logs = []

try:
    for _ in range(5):
        result = spark.sql(f"SELECT id_machine, temperature, rpm, timestamp from {temp_table}")
        logs.append(result)
        result.show()
        time.sleep(INTERVAL)
finally:
    # Stop the query even if a snapshot query fails or the cell is interrupted;
    # otherwise the started stream keeps running in the background.
    streaming.stop()

+----------+-----------+---+---------+
|id_machine|temperature|rpm|timestamp|
+----------+-----------+---+---------+
+----------+-----------+---+---------+



                                                                                

+----------+-----------+----+--------------------+
|id_machine|temperature| rpm|           timestamp|
+----------+-----------+----+--------------------+
|   MACH-10|        147|1613|2023-02-09 21:11:...|
|   MACH-10|        123|2306|2023-02-09 21:11:...|
+----------+-----------+----+--------------------+

+----------+-----------+----+--------------------+
|id_machine|temperature| rpm|           timestamp|
+----------+-----------+----+--------------------+
|   MACH-10|        147|1613|2023-02-09 21:11:...|
|   MACH-10|        123|2306|2023-02-09 21:11:...|
|   MACH-10|        140|1621|2023-02-09 21:12:...|
|   MACH-10|        102|1968|2023-02-09 21:12:...|
|   MACH-10|        110|2034|2023-02-09 21:12:...|
+----------+-----------+----+--------------------+

+----------+-----------+----+--------------------+
|id_machine|temperature| rpm|           timestamp|
+----------+-----------+----+--------------------+
|   MACH-10|        147|1613|2023-02-09 21:11:...|
|   MACH-10|        123|2306|

In [14]:
# Shallow copy of the collected snapshot DataFrames (a list, used for the union below).
dfs = list(logs)

In [15]:
# Combine the collected snapshots into one DataFrame for export.
# The snapshots are cumulative (each one reads the whole in-memory table), so
# a plain UNION ALL repeats every row once per snapshot; distinct() collapses
# those repeats. NOTE: it would also merge two genuinely identical readings.
if not dfs:
    raise ValueError('no snapshots were collected; nothing to export')
dfs = reduce(DataFrame.union, dfs).distinct()

In [16]:
# Export as a single CSV part file with a header row under ./logs.
# The default save mode errors if ./logs already exists, which breaks
# re-running the notebook; overwrite replaces the previous export instead.
dfs.coalesce(1).write.mode('overwrite').option('header', True).csv('logs')