José Luis García Nava
DEPFIE-SCOM
Cloud-based Implementation of Distributed Machine Learning Algorithms for Time Series Forecasting
Parquet Archive as Time Series Database Management System
Updated from MIRD-related research to Spark 3

In [4]:
# change timestamp from string to datetime
# from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import to_timestamp

In [22]:
# root directory of the source Parquet archive; the device/resolution loop in this notebook
# reads <SOURCE_PARQUET_PATH>/<resolution>/<device>.parquet from it
SOURCE_PARQUET_PATH = '/home/developer/On_Premises/Data_Lake/complete_20160101000000_to_20180809114000'

In [23]:
# root directory for the per-date Parquet archives; the device/resolution loop in this notebook
# writes to <DEST_PARQUET_PATH>/<resolution>/<device>.parquet/<date>
DEST_PARQUET_PATH = '/home/developer/On_Premises/MIRD_ROOT/data/raw'

In [39]:
# device identifiers to process; each one names a <device>.parquet archive under the source path
devices = ['CPE42015', 'CPE42025']

In [40]:
# resolutions to process; each one names a subdirectory under both the source and destination roots
resolutions = ['ten_min', 'hourly', 'daily']

In [41]:
# Split every device/resolution archive into one Parquet directory per calendar date.
#   source:      <SOURCE_PARQUET_PATH>/<resolution>/<device>.parquet        (multiple dates)
#   destination: <DEST_PARQUET_PATH>/<resolution>/<device>.parquet/<date>   (a single date)
for device in devices:
    for resolution in resolutions:
        # build a path to the source Parquet archive, per device/resolution (it has multiple-date information)
        path = '{}/{}/{}.parquet'.format(SOURCE_PARQUET_PATH, resolution, device)
        # read the DataFrame into Spark
        raw_df = spark.read.parquet(path)
        # multidate_df is a Spark DataFrame with timestamp as datetime
        # use a Spark DataFrame operation instead of a Spark SQL query
        multidate_df = raw_df.select(
            to_timestamp(raw_df.timestamp, 'yyyy-MM-dd HH:mm:ss').alias('timestamp'),
            'Van', 'Vbn', 'Vcn', 'Vav',
            'ia', 'ib', 'ic', 'iav',
            'kw', 'kvar', 'kwan', 'kwbn', 'kwcn', 'kvaran', 'kvarbn', 'kvarcn',
            'f', 'fp', 'thdvan', 'thdvbn', 'thdvcn', 'thdia', 'thdib', 'thdic',
            'desbV', 'desbI',
            'kwhE', 'kwhR', 'kvarhDel', 'kvarhrec', 'kvarhq3', 'kvarhq4',
        )

        # the same DataFrame is queried once for the list of dates and then once per date;
        # cache it so the source archive is not re-read and re-parsed on every query
        multidate_df.cache()
        try:
            # create the temporary view on the Spark DataFrame
            multidate_df.createOrReplaceTempView('multidate_df_view')

            # a DataFrame with all the dates in the multidate DataFrame
            # rows whose timestamp could not be parsed are null; they can never match a
            # per-date filter below, so leave them out of the list of dates as well
            date_df = spark.sql('select substring(timestamp, 1, 10) as date \
                                 from multidate_df_view \
                                 where timestamp is not null \
                                 group by substring(timestamp, 1, 10) \
                                 order by substring(timestamp, 1, 10)')

            # all dates available for a given device/resolution, as a list
            # (already in ascending order because of the ORDER BY in the query)
            dates_list = [row.date for row in date_df.collect()]

            # now iterate on dates, per device/resolution
            for date in dates_list:
                # build the query string, and change timestamp to string for persistence
                # order by the full timestamp: every selected row shares the same date, so
                # ordering by the date substring would leave the rows of the day unordered
                query_string = 'select string(timestamp) as timestamp, \
                                       Van, Vbn, Vcn, Vav, \
                                       ia, ib, ic, iav, \
                                       kw, kvar, kwan, kwbn, kwcn, kvaran, kvarbn, kvarcn, \
                                       f, fp, \
                                       thdvan, thdvbn, thdvcn, thdia, thdib, thdic, \
                                       desbV, desbI, \
                                       kwhE, kwhR, kvarhDel, kvarhrec, kvarhq3, kvarhq4 \
                                from multidate_df_view \
                                where substring(timestamp, 1, 10) = "{}" \
                                order by timestamp'.format(date)

                # get the readings of a single date from the original DataFrame
                buffer_df = spark.sql(query_string)
                # build a path to the directory where date-based parquet archives reside
                archive_string = '{}/{}/{}.parquet/{}'.format(DEST_PARQUET_PATH, resolution, device, date)
                # now persist buffer_df, with a given max number of partitions
                # (default save mode: the write fails if the destination already exists)
                buffer_df.coalesce(2).write.parquet(archive_string)
        finally:
            # release the cached data before moving on to the next device/resolution
            multidate_df.unpersist()

In [16]:
# read some DataFrame to verify it: pick the device/resolution/date of one persisted archive
# NOTE(review): 'CPE04025' is not in the `devices` list defined in this notebook
# ('CPE42015', 'CPE42025') -- confirm this archive exists under DEST_PARQUET_PATH
device = 'CPE04025'
resolution = 'daily'
date = '2017-03-10'

In [19]:
# same layout as the archives persisted per date: <dest>/<resolution>/<device>.parquet/<date>
test_path = '{}/{}/{}.parquet/{}'.format(DEST_PARQUET_PATH, resolution, device, date)

In [20]:
# load the single-date archive back from disk
test_df = spark.read.parquet(test_path)

In [21]:
# verify the persisted DataFrame: print up to 200 rows of the timestamp and two other persisted columns
test_df.select('timestamp', 'kw', 'thdia').show(200)

+-------------------+-------+-----+
|          timestamp|     kw|thdia|
+-------------------+-------+-----+
|2017-03-10 00:00:00|512.734| 6.08|
|2017-03-10 00:10:00|498.602| 6.34|
|2017-03-10 00:20:00|485.091| 6.65|
|2017-03-10 00:30:00| 489.55| 6.32|
|2017-03-10 00:40:00|476.283| 6.61|
|2017-03-10 00:50:00|471.961|  6.7|
|2017-03-10 01:00:00|473.202| 6.49|
|2017-03-10 01:10:00|472.595| 6.32|
|2017-03-10 01:20:00|450.274| 6.45|
|2017-03-10 01:30:00|445.939| 6.64|
|2017-03-10 01:40:00|455.401| 6.62|
|2017-03-10 01:50:00| 464.06| 6.31|
|2017-03-10 02:00:00|451.797| 6.51|
|2017-03-10 02:10:00|452.021| 6.66|
|2017-03-10 02:20:00| 446.47| 6.62|
|2017-03-10 02:30:00|436.802| 6.83|
|2017-03-10 02:40:00|435.781| 6.82|
|2017-03-10 02:50:00|431.806| 6.77|
|2017-03-10 03:00:00|440.792| 6.93|
|2017-03-10 03:10:00|428.536| 7.31|
|2017-03-10 03:20:00|417.271| 7.56|
|2017-03-10 03:30:00|412.985| 7.72|
|2017-03-10 03:40:00|416.334| 7.51|
|2017-03-10 03:50:00|416.121| 7.33|
|2017-03-10 04:00:00|426.672

In [None]:
# ToDo: trim this dataset to 2018-07-31 23:50:00 so that its length covers whole months only