In [1]:
import os
import sys
sys.path.append("../")

import pandas as pd
import numpy as np
import datetime as dt
import re

from src.utils.parser import parse_config
from src.utils.vault import get_secrets
from src.utils.processing import downcast
import snowflake.connector

config = parse_config(os.path.abspath(os.path.join(os.getcwd(), "../src/settings.yml")))
parquet_file = "../data/raw"

In [2]:
data_config = get_secrets("snowflake")
data_config.update(config["snowflake"]["data"])
snowflake_ctx = snowflake.connector.connect(**data_config)

In [3]:
last_processed = "2000-1-1 15:35:04.518 UTC"
# last_processed = pd.read_parquet("../data/raw", columns=["processed_on"]).max()[0]
last_processed = dt.datetime.strptime(last_processed, "%Y-%m-%d %H:%M:%S.%f %Z").date()
from_date = last_processed - dt.timedelta(days=last_processed.weekday())

channel_like = "register://electricity/0/activepower/%?avg=15"
# channel_like = "register://electricity/0/activepower/l_?avg=15"

query = f"""
SELECT
    t0.BOXID AS BOXID,
    t0.CHANNELID AS CHANNELID,
    YEAROFWEEKISO(t0.DATUMTIJD) AS YEAR,
    WEEKISO(t0.DATUMTIJD) AS WEEK,
    CURRENT_TIMESTAMP AS PROCESSED_ON,
    MAX(t0.WAARDE) AS MAX_VALUE,
    MIN(t0.WAARDE) AS MIN_VALUE
FROM {data_config["database"]}.{data_config["schema"]}.{data_config["table"]} t0
    WHERE t0.CHANNELID LIKE '{channel_like}'
     AND t0.DATUMTIJD >= DATE('{from_date}')
     AND t0.DATUMTIJD < DATEADD(DAY, -DAYOFWEEKISO(CURRENT_DATE), CURRENT_DATE)
     AND t0.BOXID IN ('075.547-1', '069.509-1')
GROUP BY t0.BOXID, t0.CHANNELID, YEAROFWEEKISO(t0.DATUMTIJD), WEEKISO(t0.DATUMTIJD)
"""

In [4]:
%%time
df_query = pd.read_sql(sql=query, con=snowflake_ctx)

CPU times: user 87 ms, sys: 128 ms, total: 215 ms
Wall time: 1min


In [6]:
pattern = channel_like\
    .replace("?", "\?")\
    .replace("/", "\/")\
    .replace("%", "(sumli|l[1,2,3])")
df_query["L"] = df_query["CHANNELID"].str.extract(pattern)

In [7]:
df_query = df_query.apply(downcast, try_numeric=True, category=True)

In [8]:
df_query.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 588 entries, 0 to 587
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype              
---  ------        --------------  -----              
 0   BOXID         588 non-null    category           
 1   CHANNELID     588 non-null    category           
 2   YEAR          588 non-null    uint16             
 3   WEEK          588 non-null    uint8              
 4   PROCESSED_ON  588 non-null    datetime64[ns, UTC]
 5   MAX_VALUE     588 non-null    float32            
 6   MIN_VALUE     588 non-null    float32            
 7   L             588 non-null    category           
dtypes: category(3), datetime64[ns, UTC](1), float32(2), uint16(1), uint8(1)
memory usage: 13.2 KB


In [12]:
df_query.to_parquet(parquet_file, partition_cols=["BOXID", "L"])

In [13]:
df_query

Unnamed: 0,BOXID,CHANNELID,YEAR,WEEK,PROCESSED_ON,MAX_VALUE,MIN_VALUE,L
0,075.547-1,register://electricity/0/activepower/l2?avg=15,2021,1,2021-09-02 09:50:35.731000+00:00,55.125271,10.691280,l2
1,075.547-1,register://electricity/0/activepower/l2?avg=15,2020,52,2021-09-02 09:50:35.731000+00:00,53.982140,13.882600,l2
2,075.547-1,register://electricity/0/activepower/l2?avg=15,2020,20,2021-09-02 09:50:35.731000+00:00,38.883331,-5.431684,l2
3,075.547-1,register://electricity/0/activepower/l2?avg=15,2020,16,2021-09-02 09:50:35.731000+00:00,40.508598,-3.709962,l2
4,075.547-1,register://electricity/0/activepower/l2?avg=15,2020,46,2021-09-02 09:50:35.731000+00:00,50.619869,1.707081,l2
...,...,...,...,...,...,...,...,...
583,075.547-1,register://electricity/0/activepower/l3?avg=15,2019,7,2021-09-02 09:50:35.731000+00:00,44.156551,8.423216,l3
584,069.509-1,register://electricity/0/activepower/l3?avg=15,2021,32,2021-09-02 09:50:35.731000+00:00,0.195667,-53.594250,l3
585,069.509-1,register://electricity/0/activepower/sumli?avg=15,2021,2,2021-09-02 09:50:35.731000+00:00,0.000000,0.000000,sumli
586,069.509-1,register://electricity/0/activepower/l3?avg=15,2021,2,2021-09-02 09:50:35.731000+00:00,0.000000,0.000000,l3


In [14]:
cols = ["BOXID", "VALUE", "YEAR", "WEEK", "TOP", "BOTTOM", "L"]
df_read = (
    pd.read_parquet(parquet_file, columns=cols)
    .query("TOP == 1 & L == 'sumli'")
    .drop(columns=["TOP", "BOTTOM", "L"])
)
    

KeyboardInterrupt: 

In [None]:
df_read.info()

## get meta data about parquet for sake of speed

In [10]:
import pyarrow.parquet as pq
metadata = pq.read_metadata("../data/raw/BOXID=001.622-1/L=l1/7c3663dfe57547cb8fc54c33857b6caf.parquet")

# parquet_file = pq.ParquetFile("../data/raw/BOXID=001.622-1/L=l1/7c3663dfe57547cb8fc54c33857b6caf.parquet")
# metadata = parquet_file.metadata
# metadata.row_group(0).column(6)

## see parquet engine for possibilities to read _COMMON_METEDATA
https://arrow.apache.org/docs/python/parquet.html?highlight=pyarrow%20parquet%20partition

SyntaxError: invalid syntax (<ipython-input-10-972a6d8acd5f>, line 9)