In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime
import re

In [2]:
# Spark configuration: application name plus a local master ("local[*]").
conf = SparkConf().setAppName("RDD_TextAnalysis").setMaster("local[*]")
# getOrCreate() returns the already-running context when there is one, so
# re-running this cell in a live kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Read the sensor log as an RDD of raw text lines.
raw_log_rdd = sc.textFile("/home/jovyan/3_5_Independent_work/sensor_logs.txt")

# Split every line into its "|"-separated fields. Blank / whitespace-only
# lines are dropped first: they would split into a single empty field and
# break the six-field unpacking performed later by convert().
log_fields_rdd = (
    raw_log_rdd
    .filter(lambda line: line.strip())
    .map(lambda line: line.split("|"))
)

# Preview the first six split records.
print(log_fields_rdd.take(6))



In [4]:
def convert(rdd):
    """Turn one split log record into a tuple with typed fields.

    Args:
        rdd: A sequence of exactly six strings, in the order
            (timestamp, sensor_id, temperature, pressure, status, error_code).
            Despite the name, this is a single record, not an RDD.

    Returns:
        A tuple (timestamp, sensor_id, temperature, pressure, status,
        error_code) where the timestamp is a ``datetime``, temperature and
        pressure are ``float`` or ``None``, and error_code is a string or
        ``None``. The literal string "NULL" is what maps to ``None``.

    Raises:
        ValueError: If the record does not have six fields, the timestamp
            does not match "%Y-%m-%d %H:%M:%S", or a numeric field cannot
            be parsed as a float.
    """
    raw_ts, sensor, raw_temp, raw_press, state, raw_code = rdd

    # Parse the timestamp text into a datetime object.
    parsed_ts = datetime.strptime(raw_ts, "%Y-%m-%d %H:%M:%S")

    # Numeric readings: "NULL" marks a missing value.
    if raw_temp == "NULL":
        temp_value = None
    else:
        temp_value = float(raw_temp)

    if raw_press == "NULL":
        press_value = None
    else:
        press_value = float(raw_press)

    # The error code stays a string; only the "NULL" marker becomes None.
    code_value = None if raw_code == "NULL" else raw_code

    return (parsed_ts, sensor, temp_value, press_value, state, code_value)

In [5]:
# Apply convert() to every split record. The result is cached because it is
# the input of several independent actions; without caching each action
# would re-read the source file and re-run the parsing.
transformed_rdd = log_fields_rdd.map(convert).cache()
# Preview the first five typed records.
print(transformed_rdd.take(5))



In [6]:
# Task 2 - number of records per status (OK, WARNING, ERROR),
# ordered from the most to the least frequent status.
count_status_all = (
    transformed_rdd
    .map(lambda record: (record[4], 1))             # (status, 1)
    .reduceByKey(lambda left, right: left + right)  # total per status
    .sortBy(lambda pair: pair[1], ascending=False)
)
for status_name, total in count_status_all.collect():
    print(f"{status_name}: {total}")

OK: 28
ERROR: 6


In [7]:
# Task 3 - number of ERROR records per sensor, computed in three ways.

# Variant 1: keep ERROR records, emit (sensor_id, 1), sum per sensor and
# sort by sensor id.
print("_" * 20, "Первый вариант", "_" * 20, '\n')
count_sensor_ERROR = (
    transformed_rdd
    .filter(lambda record: record[4] == "ERROR")
    .map(lambda record: (record[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
)
for sensor, total in count_sensor_ERROR.collect():
    print(f"{sensor}: {total}")

# Variant 2: keep only the sensor ids of ERROR records and let
# countByValue() do the counting on the driver.
print("_" * 20, "Второй вариант", "_" * 20, '\n')

count_sensor_ERROR = (
    transformed_rdd
    .filter(lambda record: record[4] == "ERROR")
    .map(lambda record: record[1])
)
result = dict(count_sensor_ERROR.countByValue())
for sensor, total in result.items():
    print(f"{sensor}: {total}")

# "All": count every (sensor_id, status) combination.
print("_" * 20, "Все", "_" * 20, '\n')
count_sensor_ERROR = (
    transformed_rdd
    .map(lambda record: ((record[1], record[4]), 1))
    .reduceByKey(lambda left, right: left + right)
)
for key, total in count_sensor_ERROR.collect():
    sensor, state = key
    # NOTE(review): ERROR combinations are printed twice - once in the short
    # "sensor: count" form and once in the full form below. Confirm this
    # duplication is intended.
    if state == "ERROR":
        print(f"{sensor}: {total}")
    print(f"{sensor} {state}: {total}")

____________________ Первый вариант ____________________ 

S103: 2
S104: 3
S105: 1
____________________ Второй вариант ____________________ 

S103: 2
S104: 3
S105: 1
____________________ Все ____________________ 

S101 OK: 11
S103: 2
S103 ERROR: 2
S102 OK: 7
S104 OK: 4
S104: 3
S104 ERROR: 3
S103 OK: 3
S105 OK: 3
S105: 1
S105 ERROR: 1


In [8]:
# Task 4 - mean temperature over the records that have a temperature value.
avg_temperature = (
    transformed_rdd
    .filter(lambda record: record[2] is not None)  # drop missing readings
    .map(lambda record: record[2])
    .mean()
)
print(f'{avg_temperature:.2f}')

25.84


In [9]:
# 5 - occurrences of each error code, sorted by code.
# Records whose error code is None are left out.
all_error = (
    transformed_rdd
    .filter(lambda record: record[5] is not None)
    .map(lambda record: (record[5], 1))
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
)
for code, total in all_error.collect():
    print(f"{code}: {total}")

E01: 2
E02: 2
E03: 2
E04: 2
E05: 2


In [10]:
# Records reduced to (timestamp, sensor_id, temperature, pressure) where all
# four fields are present, temperature is above 26 and pressure is above 10.
high_temp_and_press = (
    transformed_rdd
    .map(lambda record: record[:4])
    .filter(
        lambda row: all(field is not None for field in row)
        and row[2] > 26
        and row[3] > 10
    )
)
for moment, sensor, temp, press in high_temp_and_press.collect():
    print(f"{moment} {sensor} {temp} {press}")

2025-01-10 08:30:20 S104 27.0 10.3
2025-01-11 09:00:00 S104 27.2 10.4
2025-01-12 10:15:00 S102 26.3 10.2
2025-01-13 11:00:00 S104 27.3 10.6
2025-01-14 08:40:15 S101 26.1 10.6
2025-01-14 08:45:00 S102 26.4 10.4
2025-01-15 09:00:00 S104 27.5 10.7
2025-01-15 09:10:00 S101 26.2 10.7
2025-01-16 10:00:10 S104 27.6 10.8
2025-01-16 10:10:15 S101 26.3 10.8
2025-01-16 10:15:00 S102 26.6 10.5


In [11]:
# Shut down the SparkContext now that all analyses have run.
sc.stop()