In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Obtain the notebook's Spark session (getOrCreate reuses an active session
# if one exists), registered under the application name "ETL".
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

23/06/19 04:25:17 WARN Utils: Your hostname, codespaces-75f113 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
23/06/19 04:25:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/19 04:25:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Extract Data

In [3]:
# Relative locations of the three staged source files read by the extract step.
# The "equipament" spelling in the variable names is kept as-is because the
# following cells refer to these exact names.
equipament_path = "staging/equipment.json"  # equipment JSON file
equipament_sensors_path = "staging/equipment_sensors.csv"  # equipment/sensor CSV file
compressed_file_path = "staging/equipment_failure_sensors.txt.gz"  # .txt.gz failure-sensor log

In [16]:
# Explicit schema applied to the sensor/equipment CSV: two nullable integer columns.
schema = StructType([
    StructField("equipment_id", IntegerType(), True),
    StructField("sensor_id", IntegerType(), True),
])

# Failure log: loaded as plain text, so each input line becomes one row.
equipment_failure_sensors_df = spark.read.text(compressed_file_path)

# Sensor/equipment CSV: comma-separated with a header row, typed via `schema`.
equipament_sensors_df = spark.read.schema(schema).csv(
    equipament_sensors_path,
    sep=",",
    header=True,
)

# Equipment JSON: parsed in multi-line mode.
equipament_df = spark.read.json(equipament_path, multiLine=True)

# Preview the first five rows of each frame without truncating wide cells.
for preview_df in (equipment_failure_sensors_df, equipament_sensors_df, equipament_df):
    preview_df.show(5, truncate=False)

+--------------------------------------------------------------------------------------+
|value                                                                                 |
+--------------------------------------------------------------------------------------+
|[2021-05-18 0:20:48]\tERROR\tsensor[5820]:\t(temperature\t311.29, vibration\t6749.50) |
|[2021-05-18 0:20:48]\tERROR\tsensor[5820]:\t(temperature\t311.29, vibration\t6749.50) |
|[2021-06-14 19:46:9]\tERROR\tsensor[3359]:\t(temperature\t270.00, vibration\t-335.39) |
|[2020-09-27 22:55:11]\tERROR\tsensor[9503]:\t(temperature\t255.84, vibration\t1264.54)|
|[2019-02-9 20:56:4]\tERROR\tsensor[3437]:\t(temperature\t466.57, vibration\t-1865.26) |
+--------------------------------------------------------------------------------------+
only showing top 5 rows

+-------------+---------+
|equipament_id|sensor_id|
+-------------+---------+
|null         |null     |
|1            |4275     |
|2            |5212     |
|3            |738

In [107]:
# Rewrite slash-separated dates (yyyy/mm/d or yyyy/mm/dd) inside the raw line
# to the dash-separated form and append a " 00:00:00" time component.
normalized_value = regexp_replace(
    "value", r"(\d{4})/(\d{2})/(\d{1,2})", "$1-$2-$3 00:00:00"
)

# (output column, regex) pairs; capture group 1 of each regex is the field text.
field_patterns = [
    ("timestamp", r"^\[(.+)\]\t"),
    ("level", r"\]\t(\w+)\t"),
    ("sensor_id", r"sensor\[(\d+)\]"),
    ("temperature", r"temperature\t(-?[\d\.]+)"),
    ("vibration", r"vibration\t(-?[\d\.]+)"),
]

# Replace the single raw "value" column with one string column per parsed field.
# This cell overwrites its own input frame, so it cannot be re-run without
# re-running the cell that reads the raw text file first.
equipment_failure_sensors_df = (
    equipment_failure_sensors_df
    .withColumn("value", normalized_value)
    .select(*[
        regexp_extract("value", pattern, 1).alias(field_name)
        for field_name, pattern in field_patterns
    ])
)

equipment_failure_sensors_df.show(5, truncate=False)

+-------------------+-----+---------+-----------+---------+
|timestamp          |level|sensor_id|temperature|vibration|
+-------------------+-----+---------+-----------+---------+
|2021-05-18 0:20:48 |ERROR|5820     |311.29     |6749.50  |
|2021-05-18 0:20:48 |ERROR|5820     |311.29     |6749.50  |
|2021-06-14 19:46:9 |ERROR|3359     |270.00     |-335.39  |
|2020-09-27 22:55:11|ERROR|9503     |255.84     |1264.54  |
|2019-02-9 20:56:4  |ERROR|3437     |466.57     |-1865.26 |
+-------------------+-----+---------+-----------+---------+
only showing top 5 rows



In [108]:
# Give the parsed columns their proper types. Each withColumn call replaces
# the existing column of the same name with its cast version.
equipment_failure_sensors_df = (
    equipment_failure_sensors_df
    .withColumn("level", col("level").cast(StringType()))
    .withColumn("timestamp", col("timestamp").cast(TimestampType()))
    .withColumn("sensor_id", col("sensor_id").cast(IntegerType()))
    .withColumn("temperature", col("temperature").cast(DoubleType()))
    .withColumn("vibration", col("vibration").cast(DoubleType()))
)

# Print the resulting schema to confirm the column types.
equipment_failure_sensors_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- level: string (nullable = true)
 |-- sensor_id: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- vibration: double (nullable = true)



In [None]:
# Cast both id columns of the sensor/equipment frame to integers,
# in the order equipment_id then sensor_id.
for id_column in ("equipment_id", "sensor_id"):
    equipament_sensors_df = equipament_sensors_df.withColumn(
        id_column, col(id_column).cast(IntegerType())
    )

# Print the resulting schema to confirm the column types.
equipament_sensors_df.printSchema()

In [None]:
# Pin the equipment frame's column types: integer id, string group and name.
equipament_df = (
    equipament_df
    .withColumn("equipment_id", col("equipment_id").cast(IntegerType()))
    .withColumn("group_name", col("group_name").cast(StringType()))
    .withColumn("name", col("name").cast(StringType()))
)

# Print the resulting schema to confirm the column types.
equipament_df.printSchema()

In [None]:
# Persist the three typed frames to the raw layer as Parquet, replacing any
# output left by a previous run. Written in the order listed.
raw_outputs = (
    (equipment_failure_sensors_df, "raw/equipment_failure_sensors.parquet"),
    (equipament_sensors_df, "raw/equipament_sensors.parquet"),
    (equipament_df, "raw/equipament.parquet"),
)
for raw_frame, target_path in raw_outputs:
    raw_frame.write.mode("overwrite").parquet(target_path)

                                                                                

## Star Schema

In [80]:
# Reload the three frames from the raw-layer Parquet outputs so the
# star-schema step starts from the persisted data.
equipment_failure_sensors_df = spark.read.parquet(
    "raw/equipment_failure_sensors.parquet"
)
equipament_sensors_df = spark.read.parquet(
    "raw/equipament_sensors.parquet"
)
equipament_df = spark.read.parquet(
    "raw/equipament.parquet"
)


In [115]:
# Fact table: left-join every failure reading to the sensor/equipment frame on
# sensor_id, so readings without a matching sensor row are kept with a null
# equipment_id. (Only these two frames are joined here.)
failures_with_equipment = equipment_failure_sensors_df.join(
    equipament_sensors_df, "sensor_id", "left"
)
fact_table = failures_with_equipment.select(
    "sensor_id", "equipment_id", "timestamp", "level", "temperature", "vibration"
)

fact_table.show(5, truncate=False)

# Write the fact table as Parquet partitioned by equipment and sensor,
# replacing any previous output.
fact_writer = fact_table.write.mode("overwrite").partitionBy("equipment_id", "sensor_id")
fact_writer.parquet("prepared/equipment_failure.parquet")

                                                                                

+---------+------------+-------------------+-----+-----------+---------+
|sensor_id|equipment_id|timestamp          |level|temperature|vibration|
+---------+------------+-------------------+-----+-----------+---------+
|5820     |2           |2021-05-18 00:20:48|ERROR|311.29     |6749.5   |
|5820     |2           |2021-05-18 00:20:48|ERROR|311.29     |6749.5   |
|3359     |12          |2021-06-14 19:46:09|ERROR|270.0      |-335.39  |
|9503     |10          |2020-09-27 22:55:11|ERROR|255.84     |1264.54  |
|3437     |6           |2019-02-09 20:56:04|ERROR|466.57     |-1865.26 |
+---------+------------+-------------------+-----+-----------+---------+
only showing top 5 rows



                                                                                

In [118]:
# Equipment dimension: the id plus its two descriptive attributes.
dim_columns = ["equipment_id", "group_name", "name"]
equipament_dim = equipament_df.select(*dim_columns)

equipament_dim.show(5, truncate=False)

# Persist the dimension to the prepared layer, replacing any previous output.
equipament_dim.write.mode("overwrite").parquet("prepared/equipament.parquet")

+------------+----------+--------+
|equipment_id|group_name|name    |
+------------+----------+--------+
|1           |FGHQWR2Q  |5310B9D7|
|2           |VAPQY59S  |43B81579|
|3           |FGHQWR2Q  |E1AD07D4|
|4           |9N127Z5P  |ADE40E7F|
|5           |9N127Z5P  |78FFAD0C|
+------------+----------+--------+
only showing top 5 rows



## Analytics

In [None]:
# Number of failure rows for each distinct log level.
failures_by_level = equipment_failure_sensors_df.groupBy("level").count()
failures_by_level.show()

[Stage 36:>                                                         (0 + 1) / 1]

+-------+-------+
|  level|  count|
+-------+-------+
|  ERROR|4749475|
+-------+-------+



In [None]:
# Total number of rows in the failure frame.
total_failures = equipment_failure_sensors_df.count()
print("Total equipment failures: ", total_failures)

                                                                                

[Stage 39:>                                                         (0 + 1) / 1]

Total equipment failures:  5000001


                                                                                

In [None]:
# Attach the owning equipment to every failure reading.
#
# The failure frame exposes its sensor key as `sensor_id` (it has no `sensor`
# column), so the join is done by the shared column name. Referencing
# `equipment_failure_sensors_df.sensor` fails on a fresh kernel.
# A left join keeps readings whose sensor has no row in the sensor/equipment
# frame; those rows get a null equipment_id.
# Only the columns needed for the per-equipment analysis are kept.
df = (
    equipment_failure_sensors_df
    .join(equipament_sensors_df, "sensor_id", "left")
    .select("timestamp", "level", "equipment_id")
)

df.show(5, truncate=False)

+-------------------+-----+------------+
|timestamp          |level|equipment_id|
+-------------------+-----+------------+
|2021-05-18 00:20:48|ERROR|2           |
|2021-05-18 00:20:48|ERROR|2           |
|2021-06-14 19:46:09|ERROR|12          |
|2020-09-27 22:55:11|ERROR|10          |
|2019-02-09 20:56:04|ERROR|6           |
+-------------------+-----+------------+
only showing top 5 rows



In [None]:
# Failure count per equipment. No ordering is applied, so the five rows shown
# are simply the first five groups returned.
failures_per_equipment = df.groupBy("equipment_id").count()
failures_per_equipment.show(5, truncate=False)

[Stage 45:>                                                         (0 + 1) / 1]

+------------+------+
|equipment_id|count |
+------------+------+
|12          |357179|
|1           |357220|
|13          |357528|
|6           |356084|
|3           |357627|
+------------+------+
only showing top 5 rows



                                                                                