<a href="https://colab.research.google.com/github/alessandro-rubin/databricks_training/blob/main/Find_consecutive_values.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AIM: to find samples with consecutive values of a signal in a dataframe with signals from many different vehicles

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=e94872ad9b56f039025b3bb9e129b90d3f0b5bcee932e792690529ba3089b518
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [50]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, lead
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pandas as pd
import datetime
from datetime import datetime
spark = SparkSession.builder.appName("example").getOrCreate()


def find_consecutive_intervals(df, signal_column, v_0, n):
    # Define a window specification to order rows by datetime
    window_spec = Window.orderBy("datetime")

    # Create a column that indicates when the signal is equal to v_0
    df = df.withColumn("is_v_0", (F.col(signal_column) == v_0).cast("integer"))

    # Create a column that assigns a group ID to consecutive rows with the same is_v_0 value
    df = df.withColumn(
        "group_id",
        F.sum("is_v_0").over(window_spec.rowsBetween(Window.unboundedPreceding, 0))
    )

    # Create a column that counts the number of consecutive v_0 rows within each group
    df = df.withColumn(
        "consecutive_count",
        F.when(F.col("is_v_0") == 1, F.sum("is_v_0").over(window_spec)).otherwise(0)
    )
    df.orderBy('vehicle','datetime').show()

    # Filter rows where consecutive_count is greater than or equal to n
    filtered_df = df.filter(F.col("consecutive_count") >= n)

    # Calculate the start and end of each interval and interval length in samples
    result_df = filtered_df.groupBy("group_id").agg(
        F.min("datetime").alias("start_datetime"),
        F.max("datetime").alias("end_datetime"),
        F.count("*").alias("interval_length_samples")
    )

    return result_df

def find_consecutive_intervals2(df, signal_column,vehicle_col,time_col, v_0, n):
    # Define a window specification to order rows by datetime
    window_spec = Window.orderBy(time_col)

    # Create a column that indicates when the signal is equal to v_0
    df = df.withColumn("is_v_0", (F.col(signal_column) == v_0).cast("integer"))

    # Create a column that assigns a group ID to consecutive rows with the same is_v_0 value
    df = df.withColumn(
        "group_id",
        -F.sum("is_v_0").over(window_spec) + F.row_number().over(window_spec)
    )

    # Create a column that counts the number of consecutive v_0 rows within each group
    df = df.withColumn(
        "consecutive_count",
        F.when(F.col("is_v_0") == 1, F.sum("is_v_0").over(Window.partitionBy(vehicle_col,"group_id"))).otherwise(0)
    )
    df.orderBy(vehicle_col,'datetime').show()

    # Filter rows where consecutive_count is greater than or equal to n
    filtered_df = df.filter(F.col("consecutive_count") >= n)
    filtered_df.show()
    # Calculate the start and end of each interval and interval length in samples
    result_df = filtered_df.groupBy("group_id").agg(
        F.min(time_col).alias("start_datetime"),
        F.max(time_col).alias("end_datetime"),
        F.count("*").alias("interval_length_samples"),
        F.first(vehicle_col)
    )

    return result_df
# Usage example:
# Assuming 'df' is your PySpark DataFrame
columns=["vehicle", "signal", "datetime"]


In [43]:
n_rows=1000
veh=[f'veh_{i}' for i in np.random.randint(1,3,n_rows)]
signal=np.maximum(np.random.randint(1,15,n_rows),10)
signal

array([10, 13, 10, 10, 11, 10, 12, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10,
       13, 10, 10, 10, 10, 11, 13, 10, 11, 10, 10, 11, 10, 10, 10, 13, 12,
       10, 10, 14, 10, 10, 10, 10, 10, 11, 10, 10, 10, 10, 10, 14, 10, 10,
       10, 10, 10, 10, 10, 12, 10, 10, 10, 10, 12, 10, 14, 10, 10, 11, 10,
       14, 10, 10, 10, 10, 10, 10, 10, 14, 13, 10, 10, 14, 10, 10, 10, 10,
       10, 10, 10, 10, 10, 10, 14, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10,
       10, 12, 10, 10, 10, 13, 10, 14, 10, 10, 13, 10, 10, 10, 10, 10, 10,
       10, 10, 11, 10, 11, 11, 10, 10, 10, 10, 11, 10, 10, 10, 11, 10, 10,
       10, 10, 10, 10, 10, 10, 13, 10, 10, 11, 10, 10, 14, 11, 10, 10, 14,
       10, 10, 12, 10, 10, 10, 10, 10, 10, 11, 10, 10, 10, 10, 10, 10, 10,
       10, 10, 10, 10, 13, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10,
       10, 10, 10, 10, 10, 10, 11, 13, 10, 14, 10, 10, 10, 13, 10, 10, 12,
       10, 12, 13, 10, 10, 10, 11, 10, 10, 10, 10, 10, 10, 10, 11, 10, 10,
       10, 10, 10, 10, 10

In [44]:

datelist = pd.date_range(datetime.today(), periods=n_rows,freq='S').tolist()


In [52]:
data=[(veh[i],float(signal[i]),datetime.strftime(datelist[i],'%Y-%m-%d %H:%M:%S') ) for i in range(n_rows) ]

In [53]:
df = spark.createDataFrame(data, columns)
df.show()
n=7

+-------+------+-------------------+
|vehicle|signal|           datetime|
+-------+------+-------------------+
|  veh_1|  10.0|2023-09-13 18:32:27|
|  veh_1|  13.0|2023-09-13 18:32:28|
|  veh_2|  10.0|2023-09-13 18:32:29|
|  veh_2|  10.0|2023-09-13 18:32:30|
|  veh_1|  11.0|2023-09-13 18:32:31|
|  veh_1|  10.0|2023-09-13 18:32:32|
|  veh_1|  12.0|2023-09-13 18:32:33|
|  veh_1|  10.0|2023-09-13 18:32:34|
|  veh_2|  10.0|2023-09-13 18:32:35|
|  veh_1|  10.0|2023-09-13 18:32:36|
|  veh_2|  10.0|2023-09-13 18:32:37|
|  veh_1|  10.0|2023-09-13 18:32:38|
|  veh_2|  10.0|2023-09-13 18:32:39|
|  veh_1|  10.0|2023-09-13 18:32:40|
|  veh_1|  10.0|2023-09-13 18:32:41|
|  veh_1|  10.0|2023-09-13 18:32:42|
|  veh_2|  10.0|2023-09-13 18:32:43|
|  veh_2|  13.0|2023-09-13 18:32:44|
|  veh_1|  10.0|2023-09-13 18:32:45|
|  veh_1|  10.0|2023-09-13 18:32:46|
+-------+------+-------------------+
only showing top 20 rows



In [47]:
result = find_consecutive_intervals(df,'signal',6, n)
result.show()


+-------+------+-------------------+------+--------+-----------------+
|vehicle|signal|           datetime|is_v_0|group_id|consecutive_count|
+-------+------+-------------------+------+--------+-----------------+
|  veh_1|  10.0|2023-09-13 18:32:27|     0|       0|                0|
|  veh_1|  13.0|2023-09-13 18:32:28|     0|       0|                0|
|  veh_1|  11.0|2023-09-13 18:32:31|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:32|     0|       0|                0|
|  veh_1|  12.0|2023-09-13 18:32:33|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:34|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:36|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:38|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:40|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:41|     0|       0|                0|
|  veh_1|  10.0|2023-09-13 18:32:42|     0|       0|                0|
|  veh

In [54]:

result = find_consecutive_intervals2(df,'signal','vehicle','datetime',10, n)
result.show()

+-------+------+-------------------+------+--------+-----------------+
|vehicle|signal|           datetime|is_v_0|group_id|consecutive_count|
+-------+------+-------------------+------+--------+-----------------+
|  veh_1|  10.0|2023-09-13 18:32:27|     1|       0|                1|
|  veh_1|  13.0|2023-09-13 18:32:28|     0|       1|                0|
|  veh_1|  11.0|2023-09-13 18:32:31|     0|       2|                0|
|  veh_1|  10.0|2023-09-13 18:32:32|     1|       2|                1|
|  veh_1|  12.0|2023-09-13 18:32:33|     0|       3|                0|
|  veh_1|  10.0|2023-09-13 18:32:34|     1|       3|                6|
|  veh_1|  10.0|2023-09-13 18:32:36|     1|       3|                6|
|  veh_1|  10.0|2023-09-13 18:32:38|     1|       3|                6|
|  veh_1|  10.0|2023-09-13 18:32:40|     1|       3|                6|
|  veh_1|  10.0|2023-09-13 18:32:41|     1|       3|                6|
|  veh_1|  10.0|2023-09-13 18:32:42|     1|       3|                6|
|  veh