# Interpolate big data time series in native PySpark
Databricks PySpark notebook showing how big data time series can be resampled and linearly interpolated in Spark without using UDFs.

In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import timeit

In [0]:
# Size of the synthetic data set
nsensors = 10        # number of simulated sensors
ntimestamps = 1000   # random measurements generated per sensor

# Spacing of the resampled time grid, in seconds (60 * 60 = one hour)
resample_interval = 60 * 60

# Time window from which the random measurement timestamps are drawn
interval_from, interval_to = '2021-01-01 00:00:00', '2021-01-07 00:00:00'

# Create test data

In [0]:
# Build synthetic test data: for every sensor, draw ntimestamps random timestamps
# between interval_from and interval_to and attach a sine-based value whose
# magnitude and period depend on the sensor id.
# NOTE: rand() is called without a seed, so the generated data is not reproducible across runs.
df_sensors = spark.range(nsensors).selectExpr('id+1 as SensorId')
df_time = spark.range(ntimestamps).selectExpr('id as Range')
df_test = (
           # Every sensor gets ntimestamps rows. crossJoin states the intended Cartesian
           # product explicitly; a full outer join without a join condition only produced
           # it implicitly (and pads with NULLs when one side is empty).
           df_sensors.crossJoin(df_time)
           # Uniformly distributed random point in time (whole seconds since the Unix epoch)
           .withColumn('UnixTimestamp', F.expr(f"round(rand()*(unix_timestamp('{interval_to}')-unix_timestamp('{interval_from}'))+unix_timestamp('{interval_from}'))"))
           .withColumn('Timestamp', F.expr("to_timestamp(UnixTimestamp)"))
           # Sine wave with amplitude SensorId and 1/SensorId periods over the whole window (3.14 approximates pi)
           .withColumn('Value', F.expr(f"SensorId * sin(2*3.14/SensorId*(UnixTimestamp - unix_timestamp('{interval_from}'))/(unix_timestamp('{interval_to}')-unix_timestamp('{interval_from}')))"))
           # Helper columns are no longer needed
           .drop('UnixTimestamp')
           .drop('Range')
           # Two random draws of one sensor can land on the same second: keep one row per (SensorId, Timestamp)
           .dropDuplicates(['SensorId', 'Timestamp'])
          )

In [0]:
# Mark the test data for caching so that later actions can reuse it instead of regenerating it
df_test.persist()

In [0]:
# Action that forces df_test to be computed; the result is the number of generated measurements
df_test.count()

# Resample + interpolate in PySpark

In [0]:
# Resample every sensor onto a regular time grid (multiples of resample_interval seconds
# since the Unix epoch) and linearly interpolate the value at each grid point from the
# two measurements that enclose it. Result columns: SensorId, Timestamp, Value.
df_pyspark = (
      df_test
      # Pair every measurement ("Next") with the preceding measurement of the same sensor ("Previous")
      .selectExpr(
        "SensorId",
        "LAG(Timestamp) OVER (PARTITION BY SensorId ORDER BY Timestamp ASC) as PreviousTimestamp",
        "Timestamp as NextTimestamp",
        "LAG(Value) OVER (PARTITION BY SensorId ORDER BY Timestamp ASC) as PreviousValue",
        "Value as NextValue",
        # True for the pair that starts at the sensor's very first measurement
        "(ROW_NUMBER() OVER (PARTITION BY SensorId ORDER BY Timestamp ASC)) = 2 as IsFirstInterval",
      )
      # Grid points enclosed by the pair: round the start up and the end down to the nearest grid boundary
      .withColumn("PreviousTimestampRoundUp", F.expr(f"to_timestamp(ceil(unix_timestamp(PreviousTimestamp)/{resample_interval})*{resample_interval})"))
      .withColumn("NextTimestampRoundDown", F.expr(f"to_timestamp(floor(unix_timestamp(NextTimestamp)/{resample_interval})*{resample_interval})"))
      # Skip pairs that enclose no grid point (both measurements lie inside the same resample interval).
      # This also drops each sensor's first measurement, whose "Previous" columns are NULL.
      .filter("PreviousTimestampRoundUp<=NextTimestampRoundDown")
      # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
      .withColumn("Timestamp", F.expr(f"explode(sequence(PreviousTimestampRoundUp, NextTimestampRoundDown, interval {resample_interval} second)) as Timestamp"))
      # A measurement lying exactly on a grid point ends one pair and starts the next, so both
      # pairs generate that grid point. Emit it only once: drop it at the start of a pair,
      # except for the sensor's first pair, where no earlier pair exists to emit it.
      .filter("Timestamp > PreviousTimestamp OR IsFirstInterval")
      # Linear interpolation between the previous and the next measurement
      .selectExpr(
        "SensorId",
        "Timestamp", 
        """(unix_timestamp(Timestamp)-unix_timestamp(PreviousTimestamp))
            /(unix_timestamp(NextTimestamp)-unix_timestamp(PreviousTimestamp))
            *(NextValue-PreviousValue) 
            +PreviousValue
            as Value"""
      )
)

In [0]:
%%timeit  -n 1 -r 1
# Time one single execution (1 loop, 1 repeat) of the resample + interpolate job; count() forces it to run
df_pyspark.count()

In [0]:
# Show the raw (irregularly spaced) measurements of sensor 1
display(df_test.filter("SensorId = 1"))

SensorId,Timestamp,Value
1,2021-01-06T04:27:36.000+0000,-0.7547981969481278
1,2021-01-06T03:40:11.000+0000,-0.7769529701063022
1,2021-01-05T18:20:47.000+0000,-0.9625922633825796
1,2021-01-05T18:57:17.000+0000,-0.9550659303542672
1,2021-01-05T14:21:56.000+0000,-0.994926442678897
1,2021-01-05T11:00:33.000+0000,-0.9989604854205512
1,2021-01-05T11:01:03.000+0000,-0.9989769860464404
1,2021-01-01T10:18:31.000+0000,0.4345782877579138
1,2021-01-01T13:19:17.000+0000,0.5488271655921787
1,2021-01-02T19:18:53.000+0000,0.9497979608635498


In [0]:
# Show the resampled + interpolated series of sensor 1 in chronological order.
# The column name is spelled exactly as in the schema (SensorId) so the filter does not
# depend on case-insensitive column resolution.
display(df_pyspark.filter("SensorId = 1").orderBy("Timestamp"))

SensorId,Timestamp,Value
1,2021-01-01T01:00:00.000+0000,0.0435972546365942
1,2021-01-01T02:00:00.000+0000,0.0871106264202934
1,2021-01-01T03:00:00.000+0000,0.1304601733297302
1,2021-01-01T04:00:00.000+0000,0.1735543558075046
1,2021-01-01T05:00:00.000+0000,0.2163240596842531
1,2021-01-01T06:00:00.000+0000,0.2586893132677063
1,2021-01-01T07:00:00.000+0000,0.300554481709672
1,2021-01-01T08:00:00.000+0000,0.341852960663266
1,2021-01-01T09:00:00.000+0000,0.3824993800164611
1,2021-01-01T10:00:00.000+0000,0.4224176616541389
