In [1]:
dbutils.widgets.text("device_name","I:IB","Device")
dbutils.widgets.text("max_interval",".15","Maximum Lag Interval")

In [2]:
import datetime
import math
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth, hour, minute, second, col, substring_index, length, pow, max, lag, expr, row_number, desc, unix_timestamp, avg
from pyspark.sql.window import Window

In [3]:
device_name = dbutils.widgets.get('device_name')
max_interval = round((float(dbutils.widgets.get('max_interval'))+1)/1000, 4)

if max_interval < 10:
  bin_size = 10
else:
  bin_size = int(math.floor(max_interval / 10.0)) * 10

In [4]:
print(max_interval)
print(bin_size)
print(device_name)

In [5]:
device1 = spark.read.format("delta").load('/mnt/delta/'+str(device_name))

timedf = spark.sql("SELECT goldFile,goldTS FROM global_temp.accelbase")

In [6]:
look_back = 'INTERVAL '+str(max_interval)+' second'

testingdf = (timedf.join(device1.hint("range_join", bin_size),
                           (device1.time<=timedf.goldTS) & (device1.time>=(timedf.goldTS - expr(str(look_back)))),"inner"))

#display(testingdf)

goldFile,goldTS,file,time,value
MLEventData_1583910003.0025566_From_EventC_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.148+0000,MLParamData_1583906408.4261804_From_MLrn_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.015+0000,7033.23361377
MLEventData_1583910003.0025566_From_EventC_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.148+0000,MLParamData_1583906408.4261804_From_MLrn_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.082+0000,6977.38644603
MLEventData_1583910003.0025566_From_EventC_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.148+0000,MLParamData_1583906408.4261804_From_MLrn_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.147+0000,6261.59665404


In [7]:
windowSpec = (
  Window 
    .partitionBy(testingdf.goldTS)
    .orderBy(testingdf.time.desc())
)

In [8]:
test2 = (testingdf.withColumn("rowno",row_number().over(windowSpec))
         .filter(col("rowno") == 1)
         .drop("rowno")
        )

test2.cache()

#display(test2)

goldFile,goldTS,file,time,value
MLEventData_1583910003.0025566_From_EventC_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.148+0000,MLParamData_1583906408.4261804_From_MLrn_2020-03-10 00:00:00_to_2020-03-11 00:00:00.h5,2020-03-10T05:00:00.147+0000,6261.59665404


In [9]:
test3 = (test2.withColumn("timediff",
                         unix_timestamp('goldTS')
                         + substring_index('goldTS', '.', -1).cast('long')/pow(10,length(substring_index('goldTS', '.', -1))) 
                         - (unix_timestamp('time') 
                            + substring_index('time', '.', -1).cast('long')/pow(10,length(substring_index('time', '.', -1))))
                        )
       )

display(test3)

goldFile,goldTS,file,time,value,timediff
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:05.856+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:05.795+0000,503.540037,0.0609998703002929
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:13.320+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:13.262+0000,450.59203917,0.0570008754730224
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:26.581+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:26.531+0000,7033.53878955,0.0500001907348632
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:29.514+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:29.464+0000,5977.32541497,0.0490009784698486
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:36.911+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:00:36.865+0000,2259.36888723,0.0460000038146972
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:05.570+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:05.531+0000,503.540037,0.0390000343322753
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:09.170+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:09.131+0000,3185.57737953,0.0390000343322753
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:46.037+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:46.003+0000,4115.14280541,0.0330009460449218
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:53.439+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:53.406+0000,593.87206788,0.0329999923706054
MLEventData_1575360002.723245_From_EventC_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:55.506+0000,MLParamData_1575356421.3855522_From_MLrn_2019-12-02 00:00:00_to_2019-12-03 00:00:00.h5,2019-12-02T06:01:55.472+0000,7033.53878955,0.0330009460449218


In [10]:
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = false")
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true")

In [11]:
test3.write.format("delta").mode("append").save("/mnt/gmps/delta/silver/"+str(device_name))

In [12]:
dbutils.notebook.exit(str(device_name)+" successfully executed.")

I:MDAT40 successfully executed.