# Initialize spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming import StreamingContext

spark = SparkSession \
    .builder \
    .appName("CloudProject") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/26 20:39:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/26 20:39:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
from pyspark.sql.types import *
scc = StreamingContext(spark,4)

# Initialize data streaming

In [3]:
MONITOR_SOURCE='/home/guglielmo/universita/cloud_computing/progetto/code/mock-data/monitor'
P_SENSORS_SOURCE='/home/guglielmo/universita/cloud_computing/progetto/code/mock-data/measurements/pollution'
T_SENSORS_SOURCE='/home/guglielmo/universita/cloud_computing/progetto/code/mock-data/measurements/temperature'

### Initialize monitor data streaming

In [4]:
schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("device_health", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("area", StringType(), True),
    StructField("customer", StringType(), True),
])
monitor_stream = spark.readStream.schema(schema).format("json").option('path',MONITOR_SOURCE).load()
monitor_stream

DataFrame[device_id: string, device_health: int, type: string, area: string, customer: string]

In [5]:
monitor_stream.isStreaming

True

### Initialize sensors data streaming

In [6]:
schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("measured", TimestampType(), True),
    StructField("arrived", TimestampType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("temperature", IntegerType(), True)
])
t_sensors_stream = spark.readStream.schema(schema).format("json").option('path',T_SENSORS_SOURCE).load()
t_sensors_stream

DataFrame[device_id: string, measured: timestamp, arrived: timestamp, humidity: int, temperature: int]

In [7]:
schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("measured", TimestampType(), True),
    StructField("arrived", TimestampType(), True),
    StructField("CO2_level", IntegerType(), True)
])
p_sensors_stream = spark.readStream.schema(schema).format("json").option('path',P_SENSORS_SOURCE).load()
p_sensors_stream

DataFrame[device_id: string, measured: timestamp, arrived: timestamp, CO2_level: int]

In [8]:
print(monitor_stream.isStreaming, t_sensors_stream.isStreaming, p_sensors_stream.isStreaming)

True True True


### Cleanse data

In [9]:
import dateutil.parser

# Remove duplicates
t_query = t_sensors_stream.dropDuplicates(["device_id", "measured"])

# Remove values arrived more than one day late
t_query = t_query.filter("datediff(arrived, measured) < 1")

### Aggregate data

In [10]:
m_query = monitor_stream

final = t_query.join(m_query, on="device_id")

In [13]:
m_stream = m_query \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

t_stream = t_query \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

def process(row):
    row = row.asDict()
    row['measured'] = str(row['measured'])
    row['arrived'] = str(row['arrived'])
    print(row)
    

final.writeStream.foreach(process).start()

final_stream = final \
    .writeStream \
    .foreach(process) \
    .start()

'''p_sensors_query = p_sensors_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()'''

-------------------------------------------
Batch: 0
-------------------------------------------


22/06/26 20:41:01 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9e935ed5-4ada-45d7-aa4f-f2b3cf02c0a0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/26 20:41:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/06/26 20:41:01 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b83c3d7f-c78b-4802-afb8-5f9dee9a581c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/26 20:41:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not support

'p_sensors_query = p_sensors_stream     .writeStream     .outputMode("append")     .format("console")     .start()'

+----------+-------------+-----------+-----------+-------------+
| device_id|device_health|       type|       area|     customer|
+----------+-------------+-----------+-----------+-------------+
|t-17231251|            0|temperature|residential|   AB-Service|
|t-67464570|            3|temperature|residential|   AB-Service|
|t-48882030|            0|temperature| industrial|Atlanta Group|
|p-34605465|           10|  pollution| industrial|Atlanta Group|
|p-45626401|            2|  pollution| industrial|Atlanta Group|
|t-68775576|            8|temperature| industrial|   AB-Service|
|p-51135281|            9|  pollution|residential|Atlanta Group|
|p-86670867|           10|  pollution| industrial|Atlanta Group|
|p-36725626|            4|  pollution|residential|Atlanta Group|
|t-77155876|            1|temperature|residential|Atlanta Group|
|t-43260871|            1|temperature|residential|   AB-Service|
|p-88074073|            1|  pollution|residential|Atlanta Group|
|p-56728681|            0

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-------------------+-------------------+--------+-----------+
|device_id|           measured|            arrived|humidity|temperature|
+---------+-------------------+-------------------+--------+-----------+
|        2|2022-06-17 00:00:00|2022-06-17 00:00:00|       4|          6|
|        1|2022-06-01 00:00:00|2022-06-01 00:00:00|       6|          6|
+---------+-------------------+-------------------+--------+-----------+



{'device_id': '1', 'measured': '2022-06-01 00:00:00', 'arrived': '2022-06-01 00:00:00', 'humidity': 6, 'temperature': 6, 'device_health': 0, 'type': 'temperature', 'area': 'residential', 'customer': 'AB-Service'}
{'device_id': '2', 'measured': '2022-06-17 00:00:00', 'arrived': '2022-06-17 00:00:00', 'humidity': 4, 'temperature': 6, 'device_health': 3, 'type': 'temperature', 'area': 'residential', 'customer': 'AB-Service'}
{'device_id': '1', 'measured': '2022-06-01 00:00:00', 'arrived': '2022-06-01 00:00:00', 'humidity': 6, 'temperature': 6, 'device_health': 0, 'type': 'temperature', 'area': 'residential', 'customer': 'AB-Service'}
{'device_id': '2', 'measured': '2022-06-17 00:00:00', 'arrived': '2022-06-17 00:00:00', 'humidity': 4, 'temperature': 6, 'device_health': 3, 'type': 'temperature', 'area': 'residential', 'customer': 'AB-Service'}
                                                                                

In [12]:
m_stream.stop()
t_stream.stop()
# p_sensors_query.stop()
final_stream.stop()