
# Extract data from TimescaleDB, aggregate it and load the aggregated data back to TimescaleDB. 

This notebook shows you how to import and aggregate data from JDBC TimescaleDB database back into a TimescaleDB database using Python.

## Step 0: Set Timezone & Get the parameters

In [0]:
# set the timezone of the spark session
# otherwise the timezone information of data from TimescaleDB disapper
spark.conf.set("spark.sql.session.timeZone", "Asia/Bangkok")

# the resample period has to be in minute
resample_period = dbutils.widgets.get("resample_period")
if not isinstance(resample_period, int):
    resample_period = int(resample_period)

table_name = dbutils.widgets.get("table_name")
destination_table_name = dbutils.widgets.get("destination_table_name")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3736157526499560>, line 6[0m
[1;32m      3[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.sql.session.timeZone[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mAsia/Bangkok[39m[38;5;124m"[39m)
[1;32m      5[0m [38;5;66;03m# the resample period has to be in minute[39;00m
[0;32m----> 6[0m resample_period [38;5;241m=[39m dbutils[38;5;241m.[39mwidgets[38;5;241m.[39mget([38;5;124m"[39m[38;5;124mresample_period[39m[38;5;124m"[39m)
[1;32m      7[0m [38;5;28;01mif[39;00m [38;5;129;01mnot[39;00m [38;5;28misinstance[39m(resample_period, [38;5;28mint[39m):
[1;32m      8[0m     resample_period [38;5;241m=[39m [38;5;28mint[39m(resample_period)

File [0;32m/databricks/python_shell/dbruntime/WidgetHandlerImpl.py:42[0m, in [0;36m


## Step 1: Connection information

First define some variables to programmatically create these connections.

In [0]:
driver = "org.postgresql.Driver"

database_host = "alto-workshop-timescaledb.postgres.database.azure.com"
database_port = "5432" # update if you use a non-default port
database_name = "postgres" # eg. postgres
user = "solemnLizard"
password = "af6f4b55-48e0-4fe1-a2b6-67869a28776e"

url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"

print(url)

jdbc:postgresql://alto-workshop-timescaledb.postgres.database.azure.com:5432/postgres


## Step 2: Constructing the filter

We will filter out the data between certain time periods.

For now, we will get the current timestamp with ```pendulum``` library. In this case, we can't really do retry or backfilling. We may need to improve this somehow. 

In [0]:
import pendulum

end = pendulum.now(tz='Asia/Bangkok')
# check if the end is every resample_period minutes
if end.minute % resample_period == 0:
    # change the seconds and micro seconds to 0
    end = end.set(second=0, microsecond=0)
else:
    # if not, round it down to the nearest resample_period minutes
    end = end.subtract(minutes=end.minute % resample_period)
    # change the seconds and micro seconds to 0
    end = end.set(second=0, microsecond=0)

# start is resample_period minutes before end
start = end.subtract(minutes=resample_period)

extract_query = f"""(SELECT * FROM {table_name} WHERE timestamp >= '{start}' AND timestamp < '{end}') as filtered_data"""

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-3736157526499557>, line 5[0m
[1;32m      3[0m end [38;5;241m=[39m pendulum[38;5;241m.[39mnow(tz[38;5;241m=[39m[38;5;124m'[39m[38;5;124mAsia/Bangkok[39m[38;5;124m'[39m)
[1;32m      4[0m [38;5;66;03m# check if the end is every resample_period minutes[39;00m
[0;32m----> 5[0m [38;5;28;01mif[39;00m end[38;5;241m.[39mminute [38;5;241m%[39m resample_period [38;5;241m==[39m [38;5;241m0[39m:
[1;32m      6[0m     [38;5;66;03m# change the seconds and micro seconds to 0[39;00m
[1;32m      7[0m     end [38;5;241m=[39m end[38;5;241m.[39mset(second[38;5;241m=[39m[38;5;241m0[39m, microsecond[38;5;241m=[39m[38;5;241m0[39m)
[1;32m      8[0m [38;5;28;01melse[39;00m:
[1;32m      9[0m     [38;5;66;03m# if not, round it down to the nearest resample_period minut


## Step 2: Reading the data

We will extract the data from **TimescaleDB** with the above filter applied.

In [0]:
source_table = (spark.read
    .format("jdbc")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", extract_query)
    .option("user", user)
    .option("password", password)
    .load()
)

In [0]:
# view the dataframe
if not source_table.isEmpty():
  display(source_table)
else:
  print(f"There is no data between {start} and {end}")

timestamp,device_id,aggregation_type,datapoint,value
2023-09-28T13:59:00.000+0700,eb27641363d2b2a091jdar,mode_1min,online_status,"""online"""
2023-09-28T13:59:00.000+0700,eb27641363d2b2a091jdar,mode_1min,presence_state,"""unoccupied"""
2023-09-28T13:59:00.000+0700,eb27641363d2b2a091jdar,mean_1min,sensitivity,100.0
2023-09-28T13:59:00.000+0700,eba63b92a045a9e8dbibaj,mean_1min,noise,72.0
2023-09-28T13:59:00.000+0700,eba63b92a045a9e8dbibaj,mean_1min,temperature,27.0
2023-09-28T13:59:00.000+0700,eba63b92a045a9e8dbibaj,mean_1min,co2,421.0
2023-09-28T13:59:00.000+0700,eba63b92a045a9e8dbibaj,mean_1min,pm25,2.0
2023-09-28T13:59:00.000+0700,eba63b92a045a9e8dbibaj,mean_1min,illuminance,40.0
2023-09-28T13:59:00.000+0700,eba63b92a045a9e8dbibaj,mean_1min,humidity,75.0
2023-09-28T13:58:00.000+0700,eb27641363d2b2a091jdar,mode_1min,online_status,"""online"""


## Step 3: Aggregate the data

We will aggregate the data based on resample_period.

Currently we only support the ```mean``` function for "numeric" values and ```mode``` for "string" values.

In [0]:
if source_table.isEmpty():
  print(f"There is no data between {start} and {end}")
else:
  from pyspark.sql import functions as F

  # convert to value that can be used in spark window function
  resample_period = '{} minutes'.format(resample_period)

  # separate the df based on the datatype of the "value" column
  numeric_df = source_table.filter(source_table["value"].cast("double").isNotNull())
  string_df  = source_table.filter(source_table["value"].cast("double").isNull())

In [0]:
from pyspark.sql.functions import mean, avg, floor, window, min, mode, lit

if source_table.isEmpty():
  print(f"There is no data between {start} and {end}")
else:
  if numeric_df.isEmpty():
    pass
  else:
    window_spec = window("timestamp", f"{resample_period}")
    numeric_df = numeric_df.groupBy("device_id", "datapoint", window_spec).agg(
      mean("value").alias("value"),
      min("timestamp").alias("timestamp")
    )
    numeric_df = numeric_df.withColumn("aggregation_type", lit(f"mean_{resample_period}"))

  if string_df.isEmpty():
    pass
  else:
    window_spec = window("timestamp", f"{resample_period}")
    string_df = string_df.groupBy("device_id", "datapoint", window_spec).agg(
      mode("value").alias("value"),
      min("timestamp").alias("timestamp")
    )
    string_df = string_df.withColumn("aggregation_type", lit(f"mode_{resample_period}"))

In [0]:
if source_table.isEmpty():
  print(f"There is no data between {start} and {end}")
else:
  if numeric_df.isEmpty() and string_df.isEmpty():
    df_to_export = None
  elif not numeric_df.isEmpty() and string_df.isEmpty():
    df_to_export = numeric_df
  elif numeric_df.isEmpty() and not string_df.isEmpty():
    df_to_export = string_df
  elif not numeric_df.isEmpty() and not string_df.isEmpty():
    # concat the two dataframes
    df_to_export = numeric_df.union(string_df)
  if df_to_export is None:
    pass
  else:
    df_to_export = df_to_export.select("timestamp","device_id", "aggregation_type", "datapoint", "value")
    display(df_to_export)

timestamp,device_id,aggregation_type,datapoint,value
2023-09-28T13:55:00.000+0700,eba63b92a045a9e8dbibaj,mean_5 minutes,illuminance,40.666666666666664
2023-09-28T13:55:00.000+0700,eba63b92a045a9e8dbibaj,mean_5 minutes,temperature,27.0
2023-09-28T13:55:00.000+0700,eba63b92a045a9e8dbibaj,mean_5 minutes,co2,422.0
2023-09-28T13:55:00.000+0700,eb27641363d2b2a091jdar,mean_5 minutes,sensitivity,100.0
2023-09-28T13:55:00.000+0700,eba63b92a045a9e8dbibaj,mean_5 minutes,noise,71.66666666666667
2023-09-28T13:55:00.000+0700,eba63b92a045a9e8dbibaj,mean_5 minutes,humidity,75.0
2023-09-28T13:55:00.000+0700,eba63b92a045a9e8dbibaj,mean_5 minutes,pm25,2.0
2023-09-28T13:55:00.000+0700,eb27641363d2b2a091jdar,mode_5 minutes,online_status,"""online"""
2023-09-28T13:55:00.000+0700,eb27641363d2b2a091jdar,mode_5 minutes,presence_state,"""unoccupied"""


## Step 4: Write data to TimescaleDB

In [0]:
if source_table.isEmpty():
  print(f"There is no data between {start} and {end}")
else:
  if df_to_export is None:
    pass
  else:
    if not df_to_export.isEmpty():
      df_to_export.write.jdbc(
        url=url,
        table=destination_table_name,
        mode='overwrite',
        properties= {
          'user': user,
          'password': password
        }
      )