#Script Case Shape - Amanda Louise Costa Nascimento

In [None]:
!pip install pyspark



In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_utc_timestamp, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_extract, to_timestamp
import argparse
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_utc_timestamp, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Inicialize a sessão do Spark
spark = SparkSession.builder \
    .appName("Case Shape Amanda Nascimento") \
    .getOrCreate()


# Definindo o esquema para o arquivo de equipment
equipment_schema = StructType([
    StructField("equipment_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("group_name", StringType(), True)
])

# Carregando o arquivo de log
log_df1 = spark.read.text("/content/equipment_failure_sensors.txt")

# Extrair informações usando expressões regulares
extracted_df = log_df1.select(
    regexp_extract("value", r"\[(.*?)\]", 1).alias("data_hora"),
    regexp_extract("value", r"sensor\[(\d+)\]", 1).alias("sensor_id"),
    regexp_extract("value", r"temperature\s+(-?\d+\.\d+)", 1).alias("temp_sensor"),
    regexp_extract("value", r"vibration\s+(-?\d+\.\d+)", 1).alias("vibration_sensor")
)
# Converter a coluna 'data_hora' para o formato de timestamp
processed_df = extracted_df.withColumn("data_hora", to_timestamp("data_hora", "yyyy-MM-dd HH:mm:ss"))

# Convertendo o timestamp para UTC
log_df = processed_df.withColumn("timestamp", from_utc_timestamp(col("data_hora"), "GMT"))
# Renomeie a coluna sensor_id de log_df para evitar ambiguidade
log_df = log_df.withColumnRenamed("sensor_id", "log_sensor_id")

# Carregando o arquivo de relacionamento entre sensores e equipamentos
sensor_equipment_df1 = spark.read.csv("/content/equipment_sensors.csv", header=True)

# Carregando o arquivo JSON com o esquema especificado
equipment_df = spark.read.option("multiline","true").schema(equipment_schema).json("/content/equipment.json")

# Junção dos dataframes para obter o nome do equipamento com base no sensor
sensor_equipment_df = sensor_equipment_df1.join(equipment_df, sensor_equipment_df1.equipment_id == equipment_df.equipment_id, "left") \
    .select(sensor_equipment_df1["*"], equipment_df["name"], equipment_df["group_name"])

# Aplicando as consultas solicitadas

# 1. Falhas totais de equipamentos que aconteceram
total_failures = log_df.count()

# 2. Nome do equipamento com mais falhas
most_failed_equipment = log_df.join(sensor_equipment_df, log_df.log_sensor_id == sensor_equipment_df.sensor_id) \
    .groupBy("name") \
    .count() \
    .orderBy(col("count").desc()) \
    .select("name") \
    .first()[0]

# 3. Quantidade média de falhas em todos os grupos de equipamentos, ordenada pelo número de falhas em ordem crescente
average_failures_per_group = log_df.join(sensor_equipment_df, log_df.log_sensor_id == sensor_equipment_df.sensor_id) \
    .groupBy("group_name") \
    .count() \
    .orderBy(col("count").asc()) \
    .select("group_name", (F.col("count") / total_failures).alias("qt_media_falhas"))

# 4. Sensores que apresentam maior número de erros por nome de equipamento em um grupo de equipamentos
sensors_with_most_errors = log_df.join(sensor_equipment_df, log_df.log_sensor_id == sensor_equipment_df.sensor_id) \
    .groupBy("group_name", "name", "sensor_id") \
    .count() \
    .orderBy(col("count").desc()) \
    .select("group_name", "name", "sensor_id", "count")

# Mostrando os resultados
print("1. Falhas totais de equipamentos que aconteceram:", total_failures)
print("2. Nome do equipamento com mais falhas:", most_failed_equipment)
print("3. Quantidade média de falhas em todos os grupos de equipamentos:")
average_failures_per_group.show()
print("4. Sensores que apresentam maior número de erros por nome de equipamento em um grupo de equipamentos:")
sensors_with_most_errors.show()

# Encerrando a sessão do Spark
#spark.stop()

1. Falhas totais de equipamentos que aconteceram: 77272
2. Nome do equipamento com mais falhas: CF304D24
3. Quantidade média de falhas em todos os grupos de equipamentos:
+----------+-------------------+
|group_name|    qt_media_falhas|
+----------+-------------------+
|  Z9K1SAP4|0.07151361424578113|
|  NQWPA8D3|0.14107309245263486|
|  PA92NCXZ|0.14135780101459777|
|  9N127Z5P| 0.1423672222797391|
|  VAPQY59S|0.21469613831659592|
|  FGHQWR2Q| 0.2889921316906512|
+----------+-------------------+

4. Sensores que apresentam maior número de erros por nome de equipamento em um grupo de equipamentos:
+----------+--------+---------+-----+
|group_name|    name|sensor_id|count|
+----------+--------+---------+-----+
|  NQWPA8D3|98B84035|     8001|   21|
|  FGHQWR2Q|5310B9D7|     8917|   20|
|  VAPQY59S|3329175B|     7169|   20|
|  PA92NCXZ|09C37FB8|     4003|   19|
|  FGHQWR2Q|E54B5C3A|     2826|   19|
|  Z9K1SAP4|4E834E81|     1850|   19|
|  VAPQY59S|2C195700|     2742|   18|
|  FGHQWR2Q|E1AD

In [8]:
pip install black

Collecting black
  Downloading black-24.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
Collecting mypy-extensions>=0.4.3 (from black)
  Downloading mypy_extensions-1.0.0-py3-none-any.whl (4.7 kB)
Collecting pathspec>=0.9.0 (from black)
  Downloading pathspec-0.12.1-py3-none-any.whl (31 kB)
Installing collected packages: pathspec, mypy-extensions, black
Successfully installed black-24.3.0 mypy-extensions-1.0.0 pathspec-0.12.1


In [9]:
!black /content/case_shape.py

[1mreformatted /content/case_shape.py[0m

[1mAll done! ✨ 🍰 ✨[0m
[34m[1m1 file [0m[1mreformatted[0m.


In [10]:
!apt-get install -y git

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git is already the newest version (1:2.34.1-1ubuntu1.10).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [24]:
!git config --global user.name "amanda-louise"
!git config --global user.email "amandalouise@id.uff.br"

Reinitialized existing Git repository in /content/.git/


In [None]:
!git init
!git add case_shape.py
!git commit -m "Adding reformatted python"
!git branch -M main
!git remote add origin https://github.com/amanda-louise/case-shape.git
!git push -u origin main

In [None]:
!git status