# Temporal Pattern Mining 

Implementación del proyecto de Big Data.


## Integrantes:
- Jose de Lama Zegarra
- Esteban Villacorta Garcia
- Juan Galvez Ccopa


## Link a Colab:

El notebook depende parcialmente de funcionalidad exclusiva de Google Colab (el montaje de Google Drive). Para usarlo, acceda al siguiente enlace:

https://colab.research.google.com/drive/1aMZVr_aSYxYiZBY2-_t2CYBR5fad9x4s?usp=sharing

El historial de cambios del notebook también está disponible en Colab.


## Dataset
El dataset puede descargarse del siguiente enlace:
https://s3.amazonaws.com/nist-netzero/2015-data-files/All-Subsystems-minute.csv

Este archivo debe subirse a la raíz de «Mi unidad» en Google Drive (`MyDrive/All-Subsystems-minute.csv`), que es la ruta desde la que el notebook lo lee.


In [None]:
%pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 43.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ef4d168f556aebeb4bea997aa7f4adf51f1ed9403ed4cac3864b9615ab3e0484
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
# Mount Google Drive so the dataset and the intermediate CSVs under /content/drive are reachable.
import google.colab.drive as drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
import os
import pandas as pd
import numpy as np
import random
import tqdm

from datetime import datetime
import matplotlib.pyplot as plt
import plotly.express as px

import pyspark, pyspark.sql.dataframe
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [None]:
# Output folder (relative to the Colab working directory) for the per-event CSVs.
os.makedirs(name="drive/MyDrive/bigdata", exist_ok=True)

In [None]:
# Local Spark session using 4 threads; its web UI is served on port 4050.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("SparkByExamples.com")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

# Read the data


In [None]:
# Load the raw minute-level CSV; without a schema every column arrives as a string.
data = spark.read.csv('/content/drive/MyDrive/All-Subsystems-minute.csv', header=True)

# Target type per column: float by default, except the timestamp and the weekday label.
types = {
    **{name: "float" for name in data.columns},
    "Timestamp": "timestamp",
    "DayOfWeek": "string",
}

data = data.select(*[F.col(name).cast(types[name]) for name in data.columns])
data.show()

+-------------------+---------------+-------------------------+------------------------+---------------------+----------------------------------+---------------------------------+--------------------+-------------------+--------------------+------------------------+-------------------+--------------------+-----------------------------+----------------------------+---------------------------------+-------------------------------+----------------------------+--------------------------------+------------------------------+---------------------------+-----------------------+-------------------+--------------------+----------------------+---------------------+---------------------+-----------------------+----------------------+----------------------+-----------------------+----------------------+----------------------+---------------------------+--------------------------+---------------------------+--------------------------+-------------------------+--------------------------+------------

In [None]:
# Total number of rows in the raw dataset (a Spark action: forces a full pass over the CSV).
# NOTE(review): N is not referenced by any later cell visible in this notebook.
N = data.count()

# Select boolean columns


In [None]:
# On/off status columns of the dataset; each one becomes an event type to mine.
boolean_columns = """
    DHW_StatusSolenoidColdMBATub
    DHW_StatusSolenoidColdMBAShower
    DHW_StatusSolenoidHotMBATub
    DHW_StatusSolenoidHotMBAShower
    Load_StatusApplianceCooktop
    Load_StatusApplianceDishwasher
    Load_StatusApplianceOven
    Load_StatusApplianceRangeHood
    Load_StatusLatentload
    Load_StatusPlugLoadBlender
    Load_StatusPlugLoadBR2Laptop
    Load_StatusPlugLoadBR3Laptop
    Load_StatusPlugLoadCanOpener
    Load_StatusPlugLoadCoffeeMaker
    Load_StatusPlugLoadDesktopPCMonitor
    Load_StatusPlugLoadFan
    Load_StatusPlugLoadHairDryerCurlIron
    Load_StatusPlugLoadHandMixer
    Load_StatusPlugLoadHeatingPad
    Load_StatusPlugLoadIron
    Load_StatusPlugLoadLRBlueRay
    Load_StatusPlugLoadLRTV
    Load_StatusPlugLoadMBRBlueRay
    Load_StatusPlugLoadMBRTV
    Load_StatusPlugLoadSlowCooker
    Load_StatusPlugLoadToaster
    Load_StatusPlugLoadToasterOven
    Load_StatusPlugLoadVacuum
    Load_StatusPlugLoadVideoGame
    Load_StatusBR4Lights
    Load_StatusMBALights
    Load_StatusMBRLights1
    Load_StatusMBRLights2
    Load_StatusBA2Lights
    Load_StatusBR2Lights
    Load_StatusBA1Lights
    Load_StatusBR3Lights
    Load_StatusSensHeatPrntBDOWN
    Load_StatusSensHeatPrntAUP
    Load_StatusKitchenLightsA
    Load_StatusSensHeatPrntBUP
    Load_StatusSensHeatChildAUP
    Load_StatusSensHeatChildBUP
    Load_StatusKitchenLightsB
    Load_StatusSensHeatChildBDOWN
    Load_StatusLRLights1
    Load_StatusSensHeatPrntADOWN
    Load_StatusSensHeatChildADOWN
    Load_StatusKitchenLightsC
    Load_StatusLRLights2
    Load_StatusDRLights
    Load_StatusLRLights3
    Load_StatusEntryHallLights
""".split()


In [None]:
# Column that orders the samples (cast to float together with every other numeric column).
TIME_COL = "TimeStamp_Count"
# Spark type of the Start/End fields in the interval schemas built later.
TIME_TYPE = FloatType()

In [None]:
# Narrow projection: the time column followed by every on/off status column.
df = data.select([TIME_COL, *boolean_columns])
# Also expose the projection to Spark SQL as the EVENTS view.
df.createOrReplaceTempView("EVENTS")


In [None]:
df.show(n=20, truncate=True)

+---------------+----------------------------+-------------------------------+---------------------------+------------------------------+---------------------------+------------------------------+------------------------+-----------------------------+---------------------+--------------------------+----------------------------+----------------------------+----------------------------+------------------------------+-----------------------------------+----------------------+------------------------------------+----------------------------+-----------------------------+-----------------------+----------------------------+-----------------------+-----------------------------+------------------------+-----------------------------+--------------------------+------------------------------+-------------------------+----------------------------+--------------------+--------------------+---------------------+---------------------+--------------------+--------------------+--------------------+---

In [None]:

def append_partition_index(partition_index, partition):
    """Prefix every item of a partition with the partition's index.

    Meant for ``RDD.mapPartitionsWithIndex``: each item (an iterable such as a
    tuple) is emitted lazily as the tuple ``(partition_index, *item)``.
    """
    yield from ((partition_index, *row) for row in partition)


In [None]:
import pyspark.sql.functions as f
from pyspark.sql.functions import lit, udf, col, concat
from pyspark.sql import Window

In [None]:
# Global time ordering. There is no partitionBy, so Spark evaluates these
# windows in a single partition (it logs a warning about it).
w = Window.orderBy(TIME_COL)

def is_on(x):
    """Return 1 when a status value equals 1.0, else 0 (None or NaN -> 0)."""
    return int(x == 1.0)

# Python-UDF form of ``is_on``. It is kept for compatibility, but
# ``process_event`` uses the equivalent native expression instead.
is_on_func = udf(is_on, IntegerType())

def process_event(C):
    """Collapse the on/off column ``C`` of ``df`` into contiguous "on" intervals.

    Returns a DataFrame with one row per maximal run of consecutive rows (in
    ``TIME_COL`` order) where ``C == 1.0``, with these columns:
    ``isOnGroup`` (running id of the run), ``start`` / ``end`` (min / max
    ``TIME_COL`` of the run) and ``EventName`` (the literal column name ``C``).
    """
    # Native equivalent of ``is_on``: 1 where C == 1.0, else 0. A null comparison
    # falls through to ``otherwise(0)``, exactly like is_on(None). This avoids
    # shipping every row through a Python worker.
    on_flag = F.when(F.col(C) == 1.0, 1).otherwise(0)
    # True on the first "on" row of a run: the previous row is off, or there is none.
    starts_run = (F.lag(C, default=0).over(w) == 0) & (F.col(C) == 1)
    return (
        df.select(F.col(TIME_COL), on_flag.alias(C))
        .withColumn("isNewOn", starts_run.cast(IntegerType()))
        .filter(F.col(C) == 1)
        # Running count of run starts = id of the run each "on" row belongs to.
        .withColumn("isOnGroup", F.sum("isNewOn").over(w))
        .drop("isNewOn")
        .groupby("isOnGroup")
        .agg(
            F.min(TIME_COL).alias("start"),
            F.max(TIME_COL).alias("end"))
        .withColumn('EventName', F.lit(C))
    )
    

In [None]:
# Cache `df` once. Every process_event() call scans it, and without caching each
# iteration re-reads and re-casts the whole source CSV from Drive.
df.cache()
try:
    for column in tqdm.tqdm(boolean_columns):
        test = process_event(column)
        # "overwrite" lets the cell be re-run without emptying the folder first.
        test.write.mode("overwrite").format("csv").save(f"drive/MyDrive/bigdata/{column}")
finally:
    df.unpersist()

100%|██████████| 53/53 [13:23<00:00, 15.16s/it]


In [None]:
# #  better?
# from functools import reduce
# events_per_column = [process_event(column) for column in tqdm.tqdm(boolean_columns)]
# all_events = reduce(lambda a, b: a.union(b), events_per_column)
# all_events = all_events.cache()
# all_events.write.format("csv").save(f"all_events")

In [None]:
# Number of Start-range partitions ("time slices"); an event's support is the
# fraction of these slices in which it occurs.
PARTITIONS = 32
SIGMA = 0.8            # minimum support threshold
DELTA = 0.8            # minimum confidence threshold
# NOTE(review): not referenced in the visible cells; presumably a maximal time
# gap of 4 hours expressed in minute ticks. TODO confirm.
MAXIMAL_DT = 4 * 60.0

In [None]:
# Layout of the headerless per-event CSVs. Presumably the process_event output:
# run id, start time, end time, event (column) name.
schema = (
    StructType()
    .add("InstanceId", StringType(), True)
    .add("Start", TIME_TYPE, True)
    .add("End", TIME_TYPE, True)
    .add("EventName", StringType(), True)
)

# Only the first 20 event columns are mined; keep just name and interval bounds.
pre_dseq = (
    spark.read
    .csv([f"drive/MyDrive/bigdata/{c}" for c in boolean_columns[:20]], header=False, schema=schema)
    .select("EventName", "Start", "End")
)

In [None]:
pre_dseq.show(n=20, truncate=True)

+--------------------+------+------+
|           EventName| Start|   End|
+--------------------+------+------+
|DHW_StatusSolenoi...| 211.0| 211.0|
|DHW_StatusSolenoi...| 240.0| 240.0|
|DHW_StatusSolenoi...| 365.0| 376.0|
|DHW_StatusSolenoi...| 378.0| 378.0|
|DHW_StatusSolenoi...| 392.0| 399.0|
|DHW_StatusSolenoi...| 401.0| 401.0|
|DHW_StatusSolenoi...| 405.0| 412.0|
|DHW_StatusSolenoi...| 414.0| 414.0|
|DHW_StatusSolenoi...| 418.0| 418.0|
|DHW_StatusSolenoi...| 946.0| 954.0|
|DHW_StatusSolenoi...| 972.0| 979.0|
|DHW_StatusSolenoi...|1062.0|1062.0|
|DHW_StatusSolenoi...|1077.0|1077.0|
|DHW_StatusSolenoi...|1197.0|1197.0|
|DHW_StatusSolenoi...|1292.0|1292.0|
|DHW_StatusSolenoi...|1317.0|1317.0|
|DHW_StatusSolenoi...|1332.0|1332.0|
|DHW_StatusSolenoi...|1628.0|1628.0|
|DHW_StatusSolenoi...|1657.0|1657.0|
|DHW_StatusSolenoi...|1780.0|1787.0|
+--------------------+------+------+
only showing top 20 rows



In [None]:
def consolidate_ranges(partition):
    """Reshape ``(EventName, Start, End, ...)`` rows into ``(EventName, (Start, End))``.

    Meant for ``RDD.mapPartitions``. Fields after the third one are ignored.
    """
    for row in partition:
        event_name, start, end = row[0], row[1], row[2]
        yield event_name, (start, end)

# One "on" interval: start and end time.
IntervalType = (
    StructType()
    .add("Start", TIME_TYPE, True)
    .add("End", TIME_TYPE, True)
)

# dseq rows: (partition id, event name, interval).
schema_dseq = (
    StructType()
    .add("ID", LongType(), False)
    .add("Event", StringType(), False)
    .add("Interval", IntervalType, False)
)

# dev rows: event, {partition id: [intervals]}, presence bitmap over partitions.
schema_dev = (
    StructType()
    .add("Event", StringType(), False)
    .add("Intervals", MapType(LongType(), ArrayType(IntervalType)), False)
    .add("Bitfield", ArrayType(LongType()), False)
)

# _1Freq rows: a dev row plus its support.
schema_1Freq = (
    StructType()
    .add("Event", StringType(), False)
    .add("Intervals", MapType(LongType(), ArrayType(IntervalType)), False)
    .add("Bitfield", ArrayType(LongType()), False)
    .add("Support", FloatType(), False)
)

# _2FreqPairs rows: both _1Freq rows, the AND-ed bitmap, joint support, confidence.
schema_2FreqPairs = (
    StructType()
    .add("Event_1", schema_1Freq, False)
    .add("Event_2", schema_1Freq, False)
    .add("Bitfield", ArrayType(LongType()), False)
    .add("Support", FloatType(), False)
    .add("Confidence", FloatType(), False)
)



In [None]:
# Range-partition the intervals by start time into PARTITIONS slices, sort each
# slice by start, and emit (partition id, event, (start, end)) tuples.
dseq = (
    pre_dseq
    .repartitionByRange(PARTITIONS, "Start")
    .sortWithinPartitions("Start")
    .rdd
    .mapPartitions(consolidate_ranges)
    .mapPartitionsWithIndex(append_partition_index)
)

In [None]:
dseqDF = spark.createDataFrame(dseq, schema=schema_dseq)
dseqDF.show(n=20, truncate=True)

64

In [None]:
def event_in_front(x):
    """Re-key a dseq tuple by event: ``(pid, event, interval) -> (event, {pid: [interval]})``."""
    partition_id, event, interval = x
    return event, {partition_id: [interval]}

def merge_dols(dol1, dol2):
    """Merge two dicts of lists, concatenating the lists stored under shared keys.

    Neither input is modified. For a shared key, ``dol1``'s items come first.
    """
    keys = set(dol1).union(dol2)
    empty = []
    return {k: dol1.get(k, empty) + dol2.get(k, empty) for k in keys}

def build_bitmap(dol, n_partitions=None):
    """0/1 presence bitmap of ``dol``'s keys over partition ids ``0..n_partitions-1``.

    ``n_partitions`` defaults to the global ``PARTITIONS``. Keys outside that
    range are ignored.
    """
    if n_partitions is None:
        n_partitions = PARTITIONS
    return [int(i in dol) for i in range(n_partitions)]
    

In [None]:
# One row per event: (event, {partition id: [intervals]}, presence bitmap).
dev = (
    dseq
    .map(event_in_front)
    .reduceByKey(merge_dols)
    .map(lambda pair: (*pair, build_bitmap(pair[1])))
)

In [None]:
devDF = spark.createDataFrame(dev, schema=schema_dev)
devDF.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+
|               Event|           Intervals|            Bitfield|
+--------------------+--------------------+--------------------+
|Load_StatusLatent...|{0 -> [{997.0, 14...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{1087.0, 1...|[1, 1, 1, 1, 1, 1...|
|DHW_StatusSolenoi...|{0 -> [{918.0, 92...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{1087.0, 1...|[1, 1, 1, 1, 1, 1...|
|Load_StatusApplia...|{0 -> [{1059.0, 1...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{1097.0, 1...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{368.0, 38...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{1087.0, 1...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{1297.0, 1...|[1, 1, 1, 1, 1, 1...|
|Load_StatusApplia...|{0 -> [{1205.0, 1...|[1, 1, 1, 1, 1, 1...|
|Load_StatusApplia...|{0 -> [{7662.0, 7...|[1, 1, 1, 1, 1, 1...|
|Load_StatusPlugLo...|{0 -> [{398.0, 40...|[1, 1, 1, 1, 1, 1...|
|DHW_StatusSolenoi...|{0 

In [None]:
def single_support(x):
    """Support of one event: the share of the PARTITIONS slices in which it occurs.

    ``x`` is a dev row ``(event, intervals, bitmap)``; only ``x[2]`` is read.
    """
    bitmap = x[2]
    return sum(bitmap) / PARTITIONS


In [None]:
# Append each event's support and keep the frequent ones (support >= SIGMA).
_1Freq = (
    dev
    .map(lambda row: (*row, single_support(row)))
    .filter(lambda row: row[3] >= SIGMA)
)

In [None]:
_1FreqDF = spark.createDataFrame(_1Freq, schema=schema_1Freq)
_1FreqDF.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+-------+
|               Event|           Intervals|            Bitfield|Support|
+--------------------+--------------------+--------------------+-------+
|Load_StatusLatent...|{0 -> [{997.0, 14...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusPlugLo...|{0 -> [{1087.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|DHW_StatusSolenoi...|{0 -> [{918.0, 92...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusPlugLo...|{0 -> [{1087.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusApplia...|{0 -> [{1059.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusPlugLo...|{0 -> [{1097.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusPlugLo...|{0 -> [{368.0, 38...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusPlugLo...|{0 -> [{1087.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusPlugLo...|{0 -> [{1297.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusApplia...|{0 -> [{1205.0, 1...|[1, 1, 1, 1, 1, 1...|    1.0|
|Load_StatusApplia...|{0 -> [{7662.0, 7...|[1, 1, 1

In [None]:
def bitmap_and(b1, b2):
    """Element-wise AND of two 0/1 bitmaps (truncated to the shorter one, like ``zip``)."""
    return [a & b for a, b in zip(b1, b2)]

def add_merged_bitmap(x):
    """Extend a pair of _1Freq rows with the AND of their bitmaps.

    ``x`` is ``(row1, row2)``, each row being ``(event, intervals, bitmap, support)``.
    Returns ``(row1, row2, bitmap1 AND bitmap2)``.
    """
    row1, row2 = x[0], x[1]
    return row1, row2, bitmap_and(row1[2], row2[2])

def add_support_conf_2pairs(x, n_partitions=None):
    """Append the joint support and the confidence to an ``add_merged_bitmap`` result.

    support(E1, E2) = (#slices where both occur) / n_partitions
    confidence      = support(E1, E2) / max(support(E1), support(E2))

    ``n_partitions`` defaults to the global ``PARTITIONS``. Confidence is 0.0
    when both single supports are 0 (instead of raising ZeroDivisionError).
    """
    if n_partitions is None:
        n_partitions = PARTITIONS
    row1, row2, joint_bitmap = x[0], x[1], x[2]
    joint_support = sum(joint_bitmap) / n_partitions
    max_single = max(row1[3], row2[3])
    confidence = joint_support / max_single if max_single else 0.0
    return (*x, joint_support, confidence)

In [None]:
# Every ordered pair of frequent events, extended with the AND of both bitmaps
# and then with the joint support and confidence.
# NOTE(review): cartesian() also yields self-pairs (e, e) and both (a, b) and
# (b, a); confirm self-pairs are meant to reach the 2-pattern stage.
_2FreqPairs = (
    _1Freq
    .cartesian(_1Freq)
    .map(add_merged_bitmap)
    .map(add_support_conf_2pairs)
)

In [None]:
_2FreqPairsDF = spark.createDataFrame(_2FreqPairs, schema=schema_2FreqPairs)
_2FreqPairsDF.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+-------+----------+
|             Event_1|             Event_2|            Bitfield|Support|Confidence|
+--------------------+--------------------+--------------------+-------+----------+
|{Load_StatusLaten...|{Load_StatusLaten...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusLaten...|{Load_StatusPlugL...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusPlugL...|{Load_StatusLaten...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusPlugL...|{Load_StatusPlugL...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusLaten...|{DHW_StatusSoleno...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusLaten...|{Load_StatusPlugL...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusPlugL...|{DHW_StatusSoleno...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusPlugL...|{Load_StatusPlugL...|[1, 1, 1, 1, 1, 1...|    1.0|       1.0|
|{Load_StatusLaten...|{Load_StatusAppli...|[1, 1, 1, 1, 1, 1...|    1.0|    

In [None]:
def create_2_patterns(x):
    """Flatten a _2FreqPairs row into a 2-pattern tuple.

    Input:  ``((n1, i1, b1, s1), (n2, i2, b2, s2), b12, s12, c12)``
    Output: ``(n1, n2, i1, i2, s1, s2, s12, c12)``; the bitmaps are dropped.
    """
    (name1, intervals1, _, sup1), (name2, intervals2, _, sup2), _, joint_sup, conf = x
    return name1, name2, intervals1, intervals2, sup1, sup2, joint_sup, conf


In [None]:
# Keep 2-patterns that are frequent and confident.
# Tuple layout from create_2_patterns: (n1, n2, i1, i2, s1, s2, s12, conf).
# The joint support s12 (index 6) must itself reach SIGMA. conf >= DELTA only
# implies s12 >= DELTA * max(s1, s2), which can be below SIGMA.
_2FreqPatterns = (
    _2FreqPairs
    .map(create_2_patterns)
    .filter(lambda p: p[4] >= SIGMA and p[5] >= SIGMA and p[6] >= SIGMA and p[7] >= DELTA)
)

In [None]:
# Explicit schema, as in the other *DF cells. Inferring it from the first row
# fails ("RDD is empty") when no 2-pattern passes the filters, and it would
# name the columns _1.._8.
schema_2FreqPatterns = StructType([
    StructField("Event_1", StringType(), False),
    StructField("Event_2", StringType(), False),
    StructField("Intervals_1", MapType(LongType(), ArrayType(IntervalType)), False),
    StructField("Intervals_2", MapType(LongType(), ArrayType(IntervalType)), False),
    StructField("Support_1", FloatType(), False),
    StructField("Support_2", FloatType(), False),
    StructField("Support", FloatType(), False),
    StructField("Confidence", FloatType(), False),
])

# Separate name so the RDD `_2FreqPatterns` is not overwritten and the cell can be re-run.
_2FreqPatternsDF = _2FreqPatterns.toDF(schema=schema_2FreqPatterns)
_2FreqPatternsDF.show()