In [1]:
# imports
import pandas as pd

from datetime import datetime, date

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from models import Equipment, EquipmentLog, EquipmentSensor

# SQLAlchemy URL of the SQLite database; three slashes = path relative to the working directory.
DATABASE_URI = "sqlite:///database.sqlite"

In [24]:
# Start (or reuse) a local Spark session running one worker thread per logical core,
# then create the SQLAlchemy engine used to read the SQLite tables.
# NOTE(review): getOrCreate() returns an already-running session if one exists, in which
# case the driver-memory setting below cannot take effect — restart the kernel to change it.
spark = (
    SparkSession.builder
    .appName("shape_challenge")
    .master("local[*]")
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

# Only surface ERROR-level Spark logs so cell outputs stay readable.
spark.sparkContext.setLogLevel("ERROR")

engine = create_engine(DATABASE_URI, echo=False)

In [3]:
# Load the Equipment and EquipmentSensor tables into Spark DataFrames.
def _orm_to_rows(instances):
    """Convert mapped ORM instances into a list of Spark Rows.

    Each Row is built from the instance's ``__dict__``; keys starting with "_"
    (e.g. SQLAlchemy's internal instance state) are dropped, so only the loaded
    public attributes become columns.
    """
    return [
        Row(**{key: value for key, value in instance.__dict__.items() if not key.startswith("_")})
        for instance in instances
    ]


# NOTE(review): spark.createDataFrame cannot infer a schema from an empty list, so an
# empty table here will raise — pass an explicit schema if that can happen.
with Session(engine) as session:
    e_df = spark.createDataFrame(_orm_to_rows(session.query(Equipment).all()))
    s_df = spark.createDataFrame(_orm_to_rows(session.query(EquipmentSensor).all()))

In [4]:
# Parse the sensor failure log into a Spark DataFrame: one Row per log line, with the
# columns produced by EquipmentLog.parse_log_line.
LOG_FILE_PATH = "./resource/equipment_failure_sensors.txt"

with open(LOG_FILE_PATH, "r") as log_file:
    # Iterate the file object lazily rather than calling readlines(): the log has millions
    # of lines, and readlines() would keep every raw line in memory alongside the Rows.
    l_df = spark.createDataFrame(
        [Row(**EquipmentLog.parse_log_line(log_line)) for log_line in log_file]
    )

In [5]:
# Attach sensor data (joined on sensor_id) and then equipment data (joined on equipment_id)
# to every log entry. Left joins keep log entries whose sensor/equipment is not found;
# their looked-up columns are null.
# Every analysis cell below runs an action on m_df, so cache it instead of recomputing
# both joins for each action. Note: re-running this cell caches a fresh copy.
m_df = (
    l_df
    .join(s_df, on="sensor_id", how="left")
    .join(e_df, on="equipment_id", how="left")
    .cache()
)

##### Total number of equipment failures


In [6]:
# One row per parsed log line.
# NOTE(review): this counts every log line as a failure — confirm the log holds no non-failure entries.
total_failures = m_df.count()
total_failures

                                                                                

5000001

##### Total equipment failures by equipment name

In [7]:
# Failure count per equipment name; log entries whose sensor/equipment was not
# matched by the left joins would be grouped under a null name.
(
    m_df
    .groupBy(F.col("name"))
    .agg(F.count(F.lit(1)).alias("count"))
    .show()
)

                                                                                

+--------+------+
|    name| count|
+--------+------+
|3329175B|356736|
|2C195700|357229|
|9AD15F7E|356084|
|ADE40E7F|357618|
|CF304D24|357179|
|E1AD07D4|357627|
|4E834E81|357528|
|78FFAD0C|357521|
|5310B9D7|357220|
|86083278|356855|
|43B81579|356846|
|98B84035|358414|
|09C37FB8|357701|
|E54B5C3A|355443|
+--------+------+



##### Which equipment name had most failures?

In [8]:
# Equipment name(s) with the highest failure count. Filtering on the maximum (instead of
# orderBy(...).limit(1)) shows every name that ties for first, rather than an arbitrary one.
failures_by_name = m_df.groupBy("name").count()
max_failures = failures_by_name.agg(F.max("count")).first()[0]
failures_by_name.filter(F.col("count") == max_failures).show()

                                                                                

+--------+------+
|    name| count|
+--------+------+
|98B84035|358414|
+--------+------+



                                                                                

##### What is the average number of failures per equipment in each equipment group, ordered by number of failures in ascending order?

In [9]:
# Average failures per equipment within each group: count failures for every
# (group, equipment) pair, then average those counts per group, lowest first.
# NOTE(review): m_df is driven by the log, so equipment with no logged failures never
# appears and is not averaged in as 0 — confirm that is the intended definition.
failures_per_equipment = m_df.groupBy("group_name", "name").count()
avg_failures_per_group = failures_per_equipment.groupBy("group_name").agg(
    F.avg("count").alias("avg_count")
)
avg_failures_per_group.orderBy(F.asc("avg_count")).show()

                                                                                

+----------+---------+
|group_name|avg_count|
+----------+---------+
|  FGHQWR2Q|356867.25|
|  PA92NCXZ| 356892.5|
|  VAPQY59S| 356937.0|
|  Z9K1SAP4| 357528.0|
|  9N127Z5P| 357569.5|
|  NQWPA8D3| 357634.5|
+----------+---------+



                                                                                

##### Rank the sensors that present the most errors for each equipment name within an equipment group.

In [23]:
# Rank sensors by error count within each equipment (partitioned by group and equipment
# name) and keep ranks up to TOP_N_SENSORS. rank() gives tied sensors the same rank, so
# every sensor tied at the cutoff is kept. Set TOP_N_SENSORS > 1 to see a longer ranking.
TOP_N_SENSORS = 1

sensor_window = Window.partitionBy("group_name", "name").orderBy(F.desc("count"))
top_sensors = (
    m_df.groupBy("group_name", "name", "sensor_id")
    .count()
    .withColumn("sensor_rank", F.rank().over(sensor_window))
    .filter(F.col("sensor_rank") <= TOP_N_SENSORS)
    # Secondary keys make the row order deterministic when counts tie.
    .orderBy(F.desc("count"), "group_name", "name", "sensor_id")
)
# Show every row: ties (or TOP_N_SENSORS > 1) can exceed show()'s 20-row default.
top_sensors.show(top_sensors.count())

                                                                                

+----------+--------+---------+-----+-----------+
|group_name|    name|sensor_id|count|sensor_rank|
+----------+--------+---------+-----+-----------+
|  FGHQWR2Q|E1AD07D4|     9349|  604|          1|
|  NQWPA8D3|98B84035|     4990|  582|          1|
|  NQWPA8D3|98B84035|     9777|  582|          1|
|  VAPQY59S|43B81579|     1127|  580|          1|
|  FGHQWR2Q|5310B9D7|     7150|  579|          1|
|  PA92NCXZ|09C37FB8|     9400|  577|          1|
|  VAPQY59S|2C195700|     7966|  575|          1|
|  Z9K1SAP4|4E834E81|     6639|  573|          1|
|  NQWPA8D3|86083278|     8992|  572|          1|
|  9N127Z5P|78FFAD0C|      582|  570|          1|
|  FGHQWR2Q|E54B5C3A|     9664|  570|          1|
|  VAPQY59S|3329175B|     5162|  567|          1|
|  FGHQWR2Q|CF304D24|     9343|  566|          1|
|  9N127Z5P|ADE40E7F|     4860|  562|          1|
|  PA92NCXZ|9AD15F7E|     8680|  561|          1|
+----------+--------+---------+-----+-----------+

