In [0]:
dbutils.widgets.text("device_name","I:IB","Device")
dbutils.widgets.text("max_interval",".15","Maximum Lag Interval")

In [0]:
import datetime
import math
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth, hour, minute, second, col, substring_index, length, pow, max, lag, expr, row_number, desc, unix_timestamp, avg
from pyspark.sql.window import Window

In [0]:
device_name = dbutils.widgets.get('device_name')
max_interval = round((float(dbutils.widgets.get('max_interval'))+1)/1000, 4)

if max_interval < 10:
  bin_size = 10
else:
  bin_size = int(math.floor(max_interval / 10.0)) * 10

In [0]:
print(max_interval) #made it a lot smaller... than it is: 599995.9990978241 so it wasn't tied to multiple gold timesteps ALTHOUGH we will want that later
print(bin_size)
print(device_name)

In [0]:
device1 = spark.read.format("delta").load('/mnt/gmps/delta/just_in_case/'+str(device_name))
timedf = spark.sql("SELECT goldFile,goldTS FROM global_temp.accelbase")

In [0]:
display(device1)

time,B|GMPSSC
2019-06-02T05:00:00.098+0000,407.0
2019-06-02T05:09:58.462+0000,407.0
2019-06-02T05:19:56.636+0000,407.0
2019-06-02T05:29:54.755+0000,407.0
2019-06-02T05:39:52.855+0000,407.0
2019-06-02T05:49:50.907+0000,407.0
2019-06-02T05:59:49.880+0000,407.0
2019-06-02T06:09:48.005+0000,407.0
2019-06-02T06:19:46.098+0000,407.0
2019-06-02T06:29:44.282+0000,407.0


In [0]:
display(timedf.orderBy('goldTS'))

goldFile,goldTS
MLEventData_1561985088.8237517_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.046+0000
MLEventData_1561990903.4338753_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.046+0000
MLEventData_1561985088.8237517_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.112+0000
MLEventData_1561990903.4338753_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.112+0000
MLEventData_1561985088.8237517_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.179+0000
MLEventData_1561990903.4338753_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.179+0000
MLEventData_1561990903.4338753_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.246+0000
MLEventData_1561985088.8237517_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.246+0000
MLEventData_1561985088.8237517_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.312+0000
MLEventData_1561990903.4338753_From_EventC_2019-05-02 00:00:00_to_2019-05-03 00:00:00.h5,2019-05-02T00:00:00.312+0000


In [0]:
actual_max_interval = 45*60

In [0]:
look_back = 'INTERVAL '+str(actual_max_interval)+' second'

testingdf = (timedf.join(device1.hint("range_join", bin_size),
                           (device1.time<=timedf.goldTS) & (device1.time>=(timedf.goldTS - expr(str(look_back)))),"inner"))

#display(testingdf)

In [0]:
windowSpec = (
  Window 
    .partitionBy(testingdf.goldTS)
    .orderBy(testingdf.time.desc())
)

In [0]:
test2 = (testingdf.withColumn("rowno",row_number().over(windowSpec))
         .filter(col("rowno") == 1)
         .drop("rowno")
        )

test2.cache()

#display(test2)

In [0]:
test3 = (test2.withColumn("timediff",
                         unix_timestamp('goldTS')
                         + substring_index('goldTS', '.', -1).cast('long')/pow(10,length(substring_index('goldTS', '.', -1))) 
                         - (unix_timestamp('time') 
                            + substring_index('time', '.', -1).cast('long')/pow(10,length(substring_index('time', '.', -1))))
                        )
       )

display(test3)

goldFile,goldTS,time,B|GMPSSC,timediff
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:02.085+0000,2019-06-02T05:00:00.098+0000,407.0,1.9860010147094729
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:03.351+0000,2019-06-02T05:00:00.098+0000,407.0,3.2520010471343994
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:16.814+0000,2019-06-02T05:00:00.098+0000,407.0,16.715000867843628
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:17.347+0000,2019-06-02T05:00:00.098+0000,407.0,17.248000860214233
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:19.214+0000,2019-06-02T05:00:00.098+0000,407.0,19.11500096321106
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:22.146+0000,2019-06-02T05:00:00.098+0000,407.0,22.04700088500977
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:41.673+0000,2019-06-02T05:00:00.098+0000,407.0,41.57400107383728
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:02.664+0000,2019-06-02T05:00:00.098+0000,407.0,62.565001010894775
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:12.593+0000,2019-06-02T05:00:00.098+0000,407.0,72.49400091171265
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:32.452+0000,2019-06-02T05:00:00.098+0000,407.0,92.35300087928772


In [0]:
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = false")
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true")

test3.write.format("delta").mode("append").save("/mnt/gmps/delta/silver/B|GMPSSC_again")

In [0]:
#test3.write.format("delta").mode("append").save("/mnt/gmps/delta/silver/"+str(device_name)+"_complete")

In [0]:
dbutils.notebook.exit(str(device_name)+" successfully executed.")

I:MDAT40 successfully executed.

In [0]:
%fs
ls /mnt/gmps/delta/silver/B|GMPSSC_again

path,name,size
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/_delta_log/,_delta_log/,0
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00000-3d0b8c0c-9449-49b1-8dd7-9d11d52db7ed-c000.snappy.parquet,part-00000-3d0b8c0c-9449-49b1-8dd7-9d11d52db7ed-c000.snappy.parquet,128841077
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00000-8d510e83-d09f-47f1-bb93-a0818931642c-c000.snappy.parquet,part-00000-8d510e83-d09f-47f1-bb93-a0818931642c-c000.snappy.parquet,14635606
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00001-755da43f-3286-4181-a242-4967c18142aa-c000.snappy.parquet,part-00001-755da43f-3286-4181-a242-4967c18142aa-c000.snappy.parquet,128905144
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00001-d8965084-0dc8-4880-9af0-b6b4b2533555-c000.snappy.parquet,part-00001-d8965084-0dc8-4880-9af0-b6b4b2533555-c000.snappy.parquet,14640774
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00002-84f5f169-ba4a-414d-ab49-6024f38b9940-c000.snappy.parquet,part-00002-84f5f169-ba4a-414d-ab49-6024f38b9940-c000.snappy.parquet,14660654
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00002-bf63be90-b65e-4e6a-95a8-bb3c650a0b5a-c000.snappy.parquet,part-00002-bf63be90-b65e-4e6a-95a8-bb3c650a0b5a-c000.snappy.parquet,128930626
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00003-3e289247-ced6-4146-aceb-6909bf23fb19-c000.snappy.parquet,part-00003-3e289247-ced6-4146-aceb-6909bf23fb19-c000.snappy.parquet,14644038
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00003-5c3160f3-a950-4724-8a4b-d7a8a4cc0e77-c000.snappy.parquet,part-00003-5c3160f3-a950-4724-8a4b-d7a8a4cc0e77-c000.snappy.parquet,128955463
dbfs:/mnt/gmps/delta/silver/B|GMPSSC_again/part-00004-0ac1d043-1d26-4507-a956-eb4632e9b0ea-c000.snappy.parquet,part-00004-0ac1d043-1d26-4507-a956-eb4632e9b0ea-c000.snappy.parquet,14670393


In [0]:
a = spark.read.format("delta").load('/mnt/gmps/delta/silver/B|GMPSSC_again')

goldFile,goldTS,time,B|GMPSSC,timediff
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:40.140+0000,2019-06-02T05:00:00.098+0000,407.0,40.04100108146668
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:00:50.203+0000,2019-06-02T05:00:00.098+0000,407.0,50.10400104522705
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:05.663+0000,2019-06-02T05:00:00.098+0000,407.0,65.56400108337402
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:22.255+0000,2019-06-02T05:00:00.098+0000,407.0,82.15600109100342
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:36.317+0000,2019-06-02T05:00:00.098+0000,407.0,96.21800088882446
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:40.516+0000,2019-06-02T05:00:00.098+0000,407.0,100.41700100898744
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:01:57.179+0000,2019-06-02T05:00:00.098+0000,407.0,117.08000087738036
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:02:03.045+0000,2019-06-02T05:00:00.098+0000,407.0,122.94600105285645
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:02:36.647+0000,2019-06-02T05:00:00.098+0000,407.0,156.5480010509491
MLEventData_1561846766.0503128_From_EventC_2019-06-02 00:00:00_to_2019-06-03 00:00:00.h5,2019-06-02T05:03:03.248+0000,2019-06-02T05:00:00.098+0000,407.0,183.1490008831024


In [0]:
a.count()