importando as bibliotecas que vão ser usadas

In [1]:
try:
    import patoolib
    import pandas as pd
    import os
    import pyspark
    import sys
    from delta import *
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    from datetime import datetime, timedelta
except Exception as e:
    raise ValueError(e)

configurando variáveis de ambiente para usar o Apache Spark e o Java JDK

In [2]:
# Pasta onde o spark está instalado
path_spark = 'C:/apps/spark-3.3.2-bin-hadoop3'

# Pasta onde o Java JDK está instalado
path_java = 'C:/Program Files/Microsoft/jdk-11.0.16.8-hotspot'

# Definir como variáveis de ambiente
os.environ['SPARK_HOME']  = path_spark
os.environ['HADOOP_HOME'] = path_spark
os.environ['JAVA_HOME'] = path_java
os.environ['PATH'] += os.pathsep + f'{path_spark}/bin'
os.environ['PATH'] += os.pathsep + path_java

iniciando sessão spark

In [3]:
try:
    builder = pyspark.sql.SparkSession.builder.appName("spark_session") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.databricks.delta.schema.autoMerge.enabled","true")
    global spark
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
except Exception as e:
    print("Erro ao iniciar sessão")
    raise ValueError(e)

Extrai os arquivos .rar

In [4]:

arquivo_rar = 'equipment_failure_sensors.rar'
diretorio_destino = r'C:\Users\gabriel.cardoso\Desktop\Gabriel\1.Shape\challenge-de-interview\arquivo_extraido/'  
patoolib.extract_archive(arquivo_rar, outdir=diretorio_destino)

patool: Extracting equipment_failure_sensors.rar ...
patool: running "C:\Program Files\7-Zip\7z.EXE" x -oC:\Users\gabriel.cardoso\Desktop\Gabriel\1.Shape\challenge-de-interview\arquivo_extraido/ -- equipment_failure_sensors.rar


lê o arquivo txt extraido

In [4]:
df_txt = spark.read.text('arquivo_extraido\equpment_failure_sensors.txt')
df_txt.show(truncate = False)

+---------------------------------------------------------------------------------------+
|value                                                                                  |
+---------------------------------------------------------------------------------------+
|[2021-05-18 0:20:48]\tERROR\tsensor[5820]:\t(temperature\t311.29, vibration\t6749.50)  |
|[2021-05-18 0:20:48]\tERROR\tsensor[5820]:\t(temperature\t311.29, vibration\t6749.50)  |
|[2021-06-14 19:46:9]\tERROR\tsensor[3359]:\t(temperature\t270.00, vibration\t-335.39)  |
|[2020-09-27 22:55:11]\tERROR\tsensor[9503]:\t(temperature\t255.84, vibration\t1264.54) |
|[2019-02-9 20:56:4]\tERROR\tsensor[3437]:\t(temperature\t466.57, vibration\t-1865.26)  |
|[2019-02-6 6:19:34]\tERROR\tsensor[2958]:\t(temperature\t143.02, vibration\t-4993.13)  |
|[2019-08-10 20:23:22]\tERROR\tsensor[3743]:\t(temperature\t249.81, vibration\t8925.85) |
|[2021-03-25 14:39:49]\tERROR\tsensor[6282]:\t(temperature\t475.57, vibration\t-2859.46)|
|[2020-05-

Faz um filtro no que está entre os parênteses e criar uma nova coluna

In [11]:
df_filter = df_txt.withColumn("temperatureEvibration", regexp_extract(col("value"), r'\((.*?)\)', 1))
df_filter = df_filter.withColumn("value", trim(regexp_extract(col("value"), r'([^\(]*)', 1)))
df_filter.show(truncate=False)

+---------------------------------------------+----------------------------------------+
|value                                        |temperatureEvibration                   |
+---------------------------------------------+----------------------------------------+
|[2021-05-18 0:20:48]\tERROR\tsensor[5820]:\t |temperature\t311.29, vibration\t6749.50 |
|[2021-05-18 0:20:48]\tERROR\tsensor[5820]:\t |temperature\t311.29, vibration\t6749.50 |
|[2021-06-14 19:46:9]\tERROR\tsensor[3359]:\t |temperature\t270.00, vibration\t-335.39 |
|[2020-09-27 22:55:11]\tERROR\tsensor[9503]:\t|temperature\t255.84, vibration\t1264.54 |
|[2019-02-9 20:56:4]\tERROR\tsensor[3437]:\t  |temperature\t466.57, vibration\t-1865.26|
|[2019-02-6 6:19:34]\tERROR\tsensor[2958]:\t  |temperature\t143.02, vibration\t-4993.13|
|[2019-08-10 20:23:22]\tERROR\tsensor[3743]:\t|temperature\t249.81, vibration\t8925.85 |
|[2021-03-25 14:39:49]\tERROR\tsensor[6282]:\t|temperature\t475.57, vibration\t-2859.46|
|[2020-05-15 17:30:17

Faz a subistituição de \t por , na coluna value

In [13]:
df_substituindoT = df_filter.withColumn("value", regexp_replace(col("value"), r'\t', ','))
df_substituindoT.show(truncate=False)

+------------------------------------------+----------------------------------------+
|value                                     |temperatureEvibration                   |
+------------------------------------------+----------------------------------------+
|[2021-05-18 0:20:48],ERROR,sensor[5820]:, |temperature\t311.29, vibration\t6749.50 |
|[2021-05-18 0:20:48],ERROR,sensor[5820]:, |temperature\t311.29, vibration\t6749.50 |
|[2021-06-14 19:46:9],ERROR,sensor[3359]:, |temperature\t270.00, vibration\t-335.39 |
|[2020-09-27 22:55:11],ERROR,sensor[9503]:,|temperature\t255.84, vibration\t1264.54 |
|[2019-02-9 20:56:4],ERROR,sensor[3437]:,  |temperature\t466.57, vibration\t-1865.26|
|[2019-02-6 6:19:34],ERROR,sensor[2958]:,  |temperature\t143.02, vibration\t-4993.13|
|[2019-08-10 20:23:22],ERROR,sensor[3743]:,|temperature\t249.81, vibration\t8925.85 |
|[2021-03-25 14:39:49],ERROR,sensor[6282]:,|temperature\t475.57, vibration\t-2859.46|
|[2020-05-15 17:30:17],ERROR,sensor[2477]:,|temperatur

Dividindo a coluna temperatureEvibration criada anteriormente em duas colunas diferentes, chamadas temperature e vibration

In [16]:
df_column_div = df_substituindoT.withColumn("temperature", split("temperatureEvibration", ",")[0]) \
                    .withColumn("vibration", split("temperatureEvibration", ",")[1]) \
                    .drop("temperatureEvibration")
df_column_div.show(truncate=False)

+------------------------------------------+-------------------+--------------------+
|value                                     |temperature        |vibration           |
+------------------------------------------+-------------------+--------------------+
|[2021-05-18 0:20:48],ERROR,sensor[5820]:, |temperature\t311.29| vibration\t6749.50 |
|[2021-05-18 0:20:48],ERROR,sensor[5820]:, |temperature\t311.29| vibration\t6749.50 |
|[2021-06-14 19:46:9],ERROR,sensor[3359]:, |temperature\t270.00| vibration\t-335.39 |
|[2020-09-27 22:55:11],ERROR,sensor[9503]:,|temperature\t255.84| vibration\t1264.54 |
|[2019-02-9 20:56:4],ERROR,sensor[3437]:,  |temperature\t466.57| vibration\t-1865.26|
|[2019-02-6 6:19:34],ERROR,sensor[2958]:,  |temperature\t143.02| vibration\t-4993.13|
|[2019-08-10 20:23:22],ERROR,sensor[3743]:,|temperature\t249.81| vibration\t8925.85 |
|[2021-03-25 14:39:49],ERROR,sensor[6282]:,|temperature\t475.57| vibration\t-2859.46|
|[2020-05-15 17:30:17],ERROR,sensor[2477]:,|temperatur

Retirando a tabulação das colunas temperature e vibration para manter somente os valores que vão ser usados

In [17]:
df_removeTab = df_column_div.withColumn("temperature", split("temperature", "\t")[1])
df_removeTab = df_removeTab.withColumn("vibration", split("vibration", "\t")[1])
df_removeTab.show(truncate=False)

+------------------------------------------+-----------+---------+
|value                                     |temperature|vibration|
+------------------------------------------+-----------+---------+
|[2021-05-18 0:20:48],ERROR,sensor[5820]:, |311.29     |6749.50  |
|[2021-05-18 0:20:48],ERROR,sensor[5820]:, |311.29     |6749.50  |
|[2021-06-14 19:46:9],ERROR,sensor[3359]:, |270.00     |-335.39  |
|[2020-09-27 22:55:11],ERROR,sensor[9503]:,|255.84     |1264.54  |
|[2019-02-9 20:56:4],ERROR,sensor[3437]:,  |466.57     |-1865.26 |
|[2019-02-6 6:19:34],ERROR,sensor[2958]:,  |143.02     |-4993.13 |
|[2019-08-10 20:23:22],ERROR,sensor[3743]:,|249.81     |8925.85  |
|[2021-03-25 14:39:49],ERROR,sensor[6282]:,|475.57     |-2859.46 |
|[2020-05-15 17:30:17],ERROR,sensor[2477]:,|200.20     |-6866.76 |
|[2020-12-11 11:52:47],ERROR,sensor[3838]:,|330.33     |-6170.30 |
|[2019-04-16 8:28:34],ERROR,sensor[4031]:, |310.94     |7200.50  |
|[2020-10-1 20:8:30],ERROR,sensor[6848]:,  |495.47     |7626.3

1. Dividindo a coluna value e usando a virgula pra separar as colunas desejadas
2. Extraindo os numeros dos sensores que estão dentro de [] 

In [18]:
df_final = df_removeTab.withColumn("value_array", split(col("value"), ","))
df_final = df_final.withColumn("timestamp", df_final["value_array"][0]) \
                   .withColumn("error_type", df_final["value_array"][1]) \
                   .withColumn("sensor_info", df_final["value_array"][2]) \
                   .withColumn("temperature", df_final["temperature"].cast("float")) \
                   .withColumn("vibration", df_final["vibration"].cast("float"))
df_final = df_final.drop("value_array", "value")
df_final = df_final.withColumn("sensor_id", regexp_extract("sensor_info", "\\[(\\d+)\\]", 1))
df_final = df_final.drop("sensor_info")
df_final.show(truncate=False)


+-----------+---------+---------------------+----------+---------+
|temperature|vibration|timestamp            |error_type|sensor_id|
+-----------+---------+---------------------+----------+---------+
|311.29     |6749.5   |[2021-05-18 0:20:48] |ERROR     |5820     |
|311.29     |6749.5   |[2021-05-18 0:20:48] |ERROR     |5820     |
|270.0      |-335.39  |[2021-06-14 19:46:9] |ERROR     |3359     |
|255.84     |1264.54  |[2020-09-27 22:55:11]|ERROR     |9503     |
|466.57     |-1865.26 |[2019-02-9 20:56:4]  |ERROR     |3437     |
|143.02     |-4993.13 |[2019-02-6 6:19:34]  |ERROR     |2958     |
|249.81     |8925.85  |[2019-08-10 20:23:22]|ERROR     |3743     |
|475.57     |-2859.46 |[2021-03-25 14:39:49]|ERROR     |6282     |
|200.2      |-6866.76 |[2020-05-15 17:30:17]|ERROR     |2477     |
|330.33     |-6170.3  |[2020-12-11 11:52:47]|ERROR     |3838     |
|310.94     |7200.5   |[2019-04-16 8:28:34] |ERROR     |4031     |
|495.47     |7626.35  |[2020-10-1 20:8:30]  |ERROR     |6848  

In [19]:
df_final.printSchema()

root
 |-- temperature: float (nullable = true)
 |-- vibration: float (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- error_type: string (nullable = true)
 |-- sensor_id: string (nullable = true)



Começando a leitura do arquivo Json


In [27]:
df_json = pd.read_json("equipment.JSON")
df_json


Unnamed: 0,equipment_id,name,group_name
0,1,5310B9D7,FGHQWR2Q
1,2,43B81579,VAPQY59S
2,3,E1AD07D4,FGHQWR2Q
3,4,ADE40E7F,9N127Z5P
4,5,78FFAD0C,9N127Z5P
5,6,9AD15F7E,PA92NCXZ
6,7,E54B5C3A,FGHQWR2Q
7,8,86083278,NQWPA8D3
8,9,3329175B,VAPQY59S
9,10,98B84035,NQWPA8D3


Lendo o arquivo CSV

In [29]:
nome_arquivo_csv = 'equipment_sensors.csv'
df_csv = pd.read_csv(nome_arquivo_csv)
df_csv

Unnamed: 0,equipment_id,sensor_id
0,1,4275
1,2,5212
2,3,7381
3,4,396
4,5,1645
...,...,...
9995,14,7672
9996,1,1302
9997,2,434
9998,3,5440


1. Salvando os arquivos em parquet, o parquet será usando para a criação das tempView que vão ser usandas no spark.sql

In [30]:
df_json.to_parquet("parquet\equipments.parquet")
df_csv.to_parquet("parquet\equipment_sensors.parquet")

lendo os arquivos parquet que foram salvos anterior mente e criando as tempView que vão ser usadas

In [31]:
spark.read.parquet("parquet\equipments.parquet").createOrReplaceTempView("arquivo_json")
spark.read.parquet("parquet\equipment_sensors.parquet").createOrReplaceTempView("arquivo_csv")
df_final.createOrReplaceTempView("arquivo_txt")

Começando os select que vão ser usados para responder as 4 questões

In [32]:
spark.sql(
    """
    SELECT
        sensor_id,
        timestamp,
        error_type,
        vibration,
        temperature    
    FROM arquivo_txt as t
    """
).show(truncate=False)

+---------+---------------------+----------+---------+-----------+
|sensor_id|timestamp            |error_type|vibration|temperature|
+---------+---------------------+----------+---------+-----------+
|5820     |[2021-05-18 0:20:48] |ERROR     |6749.5   |311.29     |
|5820     |[2021-05-18 0:20:48] |ERROR     |6749.5   |311.29     |
|3359     |[2021-06-14 19:46:9] |ERROR     |-335.39  |270.0      |
|9503     |[2020-09-27 22:55:11]|ERROR     |1264.54  |255.84     |
|3437     |[2019-02-9 20:56:4]  |ERROR     |-1865.26 |466.57     |
|2958     |[2019-02-6 6:19:34]  |ERROR     |-4993.13 |143.02     |
|3743     |[2019-08-10 20:23:22]|ERROR     |8925.85  |249.81     |
|6282     |[2021-03-25 14:39:49]|ERROR     |-2859.46 |475.57     |
|2477     |[2020-05-15 17:30:17]|ERROR     |-6866.76 |200.2      |
|3838     |[2020-12-11 11:52:47]|ERROR     |-6170.3  |330.33     |
|4031     |[2019-04-16 8:28:34] |ERROR     |7200.5   |310.94     |
|6848     |[2020-10-1 20:8:30]  |ERROR     |7626.35  |495.47  

validando a quantidade de registros totais

In [33]:
spark.sql(
    """
    SELECT count(*)  
    FROM arquivo_txt as t
    """
).show(truncate=False)

+--------+
|count(1)|
+--------+
|5000001 |
+--------+



validando quantos registros duplicdos tem na tabela

In [34]:
spark.sql(
    """
    WiTH duplicadas as (
    SELECT sensor_id, timestamp, error_type, vibration, temperature, COUNT(*)
    FROM arquivo_txt
    GROUP BY sensor_id, timestamp, error_type, vibration, temperature 
    HAVING COUNT(*) > 1
    )
    Select * from duplicadas
    """
).show(truncate=False)


+---------+--------------------+----------+---------+-----------+--------+
|sensor_id|timestamp           |error_type|vibration|temperature|count(1)|
+---------+--------------------+----------+---------+-----------+--------+
|5820     |[2021-05-18 0:20:48]|ERROR     |6749.5   |311.29     |2       |
+---------+--------------------+----------+---------+-----------+--------+



excluindo os registros duplicados para poder trabalhar com a tabela final

In [35]:
spark.sql(
    """
    with deduplicate as (
    SELECT
        sensor_id,
        ROW_NUMBER() OVER (PARTITION BY sensor_id, timestamp, error_type,vibration, temperature    ORDER BY timestamp asc) AS row_number,
        timestamp,
        error_type,
        vibration,
        temperature    
    FROM arquivo_txt as t
    ),
    dup as (
        select * from deduplicate
        where row_number = 1
    )
    select * from dup
    """
).createOrReplaceTempView("arquivo_txt_deduplicado")

Respondendo as questões:

Total equipment failures that happened?

In [36]:
spark.sql(
    """
    with qtd_de_erros as (
    SELECT
        sensor_id,
        timestamp,
        error_type,
        vibration,
        temperature    
    FROM arquivo_txt_deduplicado as t
    WHERE error_type = 'ERROR'
   )
   select count(*) from qtd_de_erros
    """
).show(truncate=False)

+--------+
|count(1)|
+--------+
|4749474 |
+--------+



criando os joins entre as tempView pra conseguir chegar nos names e group_names de cada sensor

In [37]:
spark.sql(
    """
    SELECT
        t.sensor_id,
        timestamp,
        error_type,
        vibration,
        temperature,
        c.equipment_id  
    FROM arquivo_txt_deduplicado as t
    Left join arquivo_csv c on t.sensor_id = c.sensor_id
    """
).createOrReplaceTempView("txt_join_csv")

In [38]:
spark.sql(
    """
    SELECT
        j.name,
        j.group_name,
        sensor_id,
        t.equipment_id,
        timestamp,
        error_type,
        vibration,
        temperature 
    FROM txt_join_csv as t
    left join arquivo_json j on j.equipment_id = t.equipment_id
    """
).createOrReplaceTempView("names_e_groupNames")

In [39]:
spark.sql(
    """
    SELECT
        name,
        group_name,
        sensor_id,
        equipment_id,
        timestamp,
        error_type,
        vibration,
        temperature 
    FROM names_e_groupNames as n
    """
).show(truncate=False)

+--------+----------+---------+------------+---------------------+----------+---------+-----------+
|name    |group_name|sensor_id|equipment_id|timestamp            |error_type|vibration|temperature|
+--------+----------+---------+------------+---------------------+----------+---------+-----------+
|98B84035|NQWPA8D3  |1        |10          |[2019-01-1 0:16:36]  |ERROR     |2905.12  |58.97      |
|98B84035|NQWPA8D3  |1        |10          |[2019-01-18 21:27:9] |ERROR     |-5357.64 |370.79     |
|98B84035|NQWPA8D3  |1        |10          |[2019-01-19 11:35:34]|ERROR     |3476.94  |464.17     |
|98B84035|NQWPA8D3  |1        |10          |[2019-01-20 2:46:19] |ERROR     |2250.63  |307.37     |
|98B84035|NQWPA8D3  |1        |10          |[2019-02-1 17:14:15] |ERROR     |3745.17  |273.32     |
|98B84035|NQWPA8D3  |1        |10          |[2019-02-10 17:52:12]|ERROR     |2752.82  |20.37      |
|98B84035|NQWPA8D3  |1        |10          |[2019-02-2 1:12:18]  |ERROR     |-6122.21 |163.8      |


Which equipment name had most failures?

In [42]:
spark.sql(
    """
    SELECT
        name,
        COUNT(*) as total_falhas
    FROM names_e_groupNames
    WHERE error_type = 'ERROR'
    GROUP BY name
    ORDER BY total_falhas DESC
    LIMIT 1
    """
).show(truncate=False)

+--------+------------+
|name    |total_falhas|
+--------+------------+
|2C195700|343800      |
+--------+------------+



Average amount of failures across equipment group, ordered by the number of failures in ascending order?

In [43]:
spark.sql(
    """
    SELECT group_name,
        sensor_id,
        equipment_id,
        count(1) as c
    FROM names_e_groupNames
    WHERE error_type = 'ERROR'
    GROUP BY group_name, sensor_id, equipment_id
    ORDER BY group_name desc
    """
).createOrReplaceTempView("media_falhas")

In [44]:
spark.sql(
    """
    SELECT 
        AVG(c) as media_falhas,
        group_name
    FROM media_falhas
    GROUP BY group_name
    """
).show(truncate=False)

+------------------+----------+
|media_falhas      |group_name|
+------------------+----------+
|499.3943557370962 |FGHQWR2Q  |
|499.7354458364038 |PA92NCXZ  |
|500.8073529411765 |Z9K1SAP4  |
|500.5375          |9N127Z5P  |
|499.53538311371403|VAPQY59S  |
|500.834680382072  |NQWPA8D3  |
+------------------+----------+



Rank the sensors which present the most number of errors by equipment name in an equipment group.

In [45]:
spark.sql(
    """
    SELECT
        group_name,
        sensor_id,
        name as equipment_name,
        COUNT(*) as total_erros
    FROM names_e_groupNames
    WHERE error_type = 'ERROR'
    GROUP BY group_name, sensor_id, name
    ORDER BY equipment_name, group_name, total_erros desc
    """
).show(truncate=False)

+----------+---------+--------------+-----------+
|group_name|sensor_id|equipment_name|total_erros|
+----------+---------+--------------+-----------+
|PA92NCXZ  |9400     |09C37FB8      |577        |
|PA92NCXZ  |1358     |09C37FB8      |573        |
|PA92NCXZ  |8631     |09C37FB8      |563        |
|PA92NCXZ  |862      |09C37FB8      |560        |
|PA92NCXZ  |3070     |09C37FB8      |559        |
|PA92NCXZ  |8752     |09C37FB8      |559        |
|PA92NCXZ  |1125     |09C37FB8      |556        |
|PA92NCXZ  |4494     |09C37FB8      |556        |
|PA92NCXZ  |9285     |09C37FB8      |549        |
|PA92NCXZ  |2778     |09C37FB8      |548        |
|PA92NCXZ  |6948     |09C37FB8      |548        |
|PA92NCXZ  |470      |09C37FB8      |548        |
|PA92NCXZ  |6156     |09C37FB8      |547        |
|PA92NCXZ  |6189     |09C37FB8      |546        |
|PA92NCXZ  |6880     |09C37FB8      |546        |
|PA92NCXZ  |9958     |09C37FB8      |545        |
|PA92NCXZ  |798      |09C37FB8      |545        |


encerra a sessão spark

In [46]:
try:
    spark.stop()
    print('SparkSession encerrada')
except Exception as e:
    print('Erro ao encerrar a SparkSession')
    raise ValueError(e)

SparkSession encerrada
