<a href="https://colab.research.google.com/github/asleybach/googlecolab-notebooks/blob/main/script_gridduck.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Script Gridduck

In [119]:
pip install psycopg2-binary



In [152]:
import calendar
import json
import os

from datetime import datetime, timedelta, date
from dateutil.relativedelta import relativedelta

from time import gmtime, strftime
# from data_acquisition_API import GridDuck, process_readings_to_values
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import update
# from connectionDb import (
#    DATABASE_CONNECTION, ConsumptionResources,
#    Metadata
#)
from sqlalchemy import (
    create_engine, Column, BigInteger,
    String, Boolean, Text, Sequence, MetaData, Table,
    DateTime, Numeric
)
from sqlalchemy.ext.declarative import declarative_base
from getpass import getpass
import requests

In [153]:
# DATA PROCESSING FUNCTIONS
def process_readings_to_values(data_readings):
    """Return only the value column of a list of readings.

    Each reading is indexable; its third element (index 2) is the value.
    The index and timestamp elements are discarded. A falsy input (None or an
    empty list) yields an empty list.
    """
    if not data_readings:
        return []
    return [reading[2] for reading in data_readings]

In [154]:
# Definition of the tables in the consumption database
# Declarative base shared by the two ORM models below.
Base = declarative_base()


class Metadata(Base):
    """Descriptive data for one GridDuck resource (table ``consumption_metadata``)."""

    __tablename__ = 'consumption_metadata'

    id = Column(BigInteger, Sequence('metadata_id_seq'), primary_key=True)

    # Site ("ve") columns
    veId = Column(String(254))
    ve_name = Column(String(128))
    ve_active = Column(Boolean, unique=False)
    ve_city = Column(String(128))
    # (sic) the misspelling is part of the table schema, so it is kept as-is
    ve_contry = Column(String(128))
    postalCode = Column(String(128))

    # Resource columns
    resourceId = Column(String(254))
    resource_name = Column(String(128))
    resource_description = Column(Text)
    resource_active = Column(Boolean, unique=False)
    resource_classifier = Column(String(128))
    baseUnit = Column(String(32))


class ConsumptionResources(Base):
    """One consumption reading of one resource (table ``consumption_resourceconsumption``)."""

    __tablename__ = 'consumption_resourceconsumption'

    id = Column(BigInteger, Sequence('consumption_resourceconsumption_id_seq'), primary_key=True)
    timestamp = Column(DateTime)
    # Stored as text, not as a numeric column
    consumption = Column(String(254))
    # Holds the GridDuck resource id the reading belongs to
    reference_resource = Column(String(254))


# NOTE(review): DATABASE_CONNECTION is only created in the "start request session"
# cell further down; on a fresh top-to-bottom run this raises NameError, so run
# that cell first.
Base.metadata.create_all(
    bind=DATABASE_CONNECTION,
    tables=[Metadata.__table__, ConsumptionResources.__table__],
    checkfirst=True,
)

In [185]:
# start request session
# Prompt for the database URL so the credentials are not stored in the notebook.
_DATABASE_CONSUMPTION_URL = getpass('Enter db url value: ')
DATABASE_CONNECTION = create_engine(_DATABASE_CONSUMPTION_URL)

# Create the model tables as soon as the engine exists. This is idempotent
# (checkfirst=True), so it is harmless if the model cell already created them.
Base.metadata.create_all(bind=DATABASE_CONNECTION, checkfirst=True)

# ORM session for the consumption database (a single sessionmaker is enough;
# it was previously built twice and the first one was discarded).
Session = sessionmaker(bind=DATABASE_CONNECTION)
_session = Session()

# HTTP session reused by the GridDuck client for every API request.
session = requests.Session()

Enter db url value: ··········


In [180]:
# definition of the class corresponding to the interaction with the Gridduck api
class GridDuck:
    """Client for the GridDuck REST API.

    Uses module-level names defined in other notebook cells: ``session`` (the
    ``requests.Session`` used for every HTTP call), ``URL_GD`` (default API base
    URL) and ``process_readings_to_values``.
    """

    _MIN = 60
    _HOUR = 60 * 60
    # Granularity aliases accepted by _get_readings, in seconds. Any other value
    # (e.g. an int number of seconds) is passed to the API unchanged.
    _GRANULARITY_SECONDS = {
        "min": _MIN,
        "1min": _MIN,
        "2min": _MIN * 2,
        "5min": _MIN * 5,
        "10min": _MIN * 10,
        "15min": _MIN * 15,
        "30min": _MIN * 30,
        "H": _HOUR,
        "D": _HOUR * 24,
        "W": _HOUR * 24 * 7,
        "M": _HOUR * 24 * 30,
    }

    def __init__(self, token, url=None):
        """Store the token, check the connection and cache the asset metadata.

        Args:
            token: GridDuck bearer token.
            url: API base URL; defaults to the module-level ``URL_GD``.
        """
        self.token = token
        self.url = URL_GD if url is None else url

        self._test_connection()

        # Fetch the paginated asset list once and derive ids and site names from
        # it, instead of re-querying the API for each attribute.
        self.resources_metadata = self.get_resources_meta()
        self.resources = [r.get("id") for r in self.resources_metadata]
        self.names = self._site_names(self.resources_metadata)
        self.good_resources = []

    def _headers(self):
        """Return the Authorization header sent with every request."""
        return {"Authorization": f"Bearer {self.token}"}

    def _test_connection(self):
        """Check that the token can list assets and print the outcome.

        Returns:
            The decoded first page of ``/asset`` (may be empty).
        Raises:
            requests.HTTPError: for any non-2xx response.
        """
        url = f"{self.url}/asset?offset=0&limit=500"
        response = session.get(url, headers=self._headers())
        if response.status_code == 404:
            print(f"Url not found {url} -- Status code: {response.status_code} -- {response.text}")
        response.raise_for_status()
        data = response.json()
        if not data:
            print("ERROR - No connection, check API")
        else:
            print('API connection ok')
        return data

    @staticmethod
    def _site_names(resources):
        """Return the sorted unique site names; a missing name counts as 'NO_NAME'."""
        return sorted({r.get("site_name") or 'NO_NAME' for r in resources})

    def get_names(self):
        """Return the sorted list of unique site names (re-queries the API)."""
        return self._site_names(self.get_resources_meta())

    def get_resources_by_keyword(self, keyword):
        """Return the ids of resources whose site name contains ``keyword``.

        Prints how many resources and distinct sites matched. Returns an empty
        list when nothing matches.
        """
        names_matched = set()
        resources = []
        for res in self.resources_metadata:
            site_name = res.get("site_name") or 'NO_NAME'
            if keyword in site_name:
                names_matched.add(site_name)
                resources.append(res.get("id", "NO_ID"))

        # Report once, after every resource has been checked.
        if resources:
            print(f'Found {len(resources)} resources in {len(names_matched)} site/sites')
        else:
            print(f'Did NOT find any resources or matches for {keyword}')
        return resources

    def is_data_good(self, values, print_stats=False, return_stats=False):
        """Delegate to the module-level ``_is_data_good`` helper.

        NOTE(review): ``_is_data_good`` is not defined anywhere in this notebook;
        calling this raises NameError until it is provided.
        """
        return _is_data_good(values, print_stats, return_stats)

    # GD API SET OF METHODS 1 - GET RESOURCES - /asset?offset=0&limit=500

    def get_resources_meta(self) -> list:
        """Return the metadata dicts of every asset visible to the token.

        Follows the ``links.next`` pagination URLs until there are none left.
        If a later page fails, the error is printed and the assets collected so
        far are returned. Previous name: retrieve_all_resources_raw_version.
        """
        headers = self._headers()
        response = session.get(f"{self.url}/asset?offset=0&limit=500", headers=headers)
        response.raise_for_status()

        data = response.json()
        if not data:
            return []

        assets = list(data.get("results") or [])
        pagination_urls = data.get("links") or {}
        while pagination_urls.get("next") is not None:
            try:
                more, pagination_urls = self._get_resources_meta(pagination_urls["next"], headers)
            except Exception as err:
                print(f"error retrieve all in pagination: {err}")
                break
            assets.extend(more)
        return assets

    def _get_resources_meta(self, custom_url, headers):
        """Fetch one pagination page and return ``(results, links)``.

        Helper of get_resources_meta. Missing results/links become ``[]``/``{}``.
        """
        response = session.get(custom_url, headers=headers)
        response.raise_for_status()
        data = response.json() or {}
        return data.get("results") or [], data.get("links") or {}

    def get_resources_list(self) -> list:
        """Return the ids of all resources. Previous name: retrieve_all_resources."""
        return [r.get("id") for r in self.get_resources_meta()]

    def get_resources_meta_summary(self):
        """Return a condensed metadata dict for every resource.

        Previous name: retrieve_all_resources_metadata_GD.
        """
        return [
            {
                "id": r.get("id"),
                "name": (r.get("name") or "").strip(),
                "site_name": r.get("site_name"),
                "site_id": r.get("site_id"),
                "code": r.get("code"),
                "installed_at": r.get("created_stamp"),
                "status": r.get("status"),
            }
            for r in self.get_resources_meta()
        ]

    def get_resource_meta(self, res_id):
        """Return the cached full metadata of ``res_id``, or ``{}`` if unknown."""
        for resource in self.resources_metadata:
            if res_id == resource.get("id"):
                return resource
        return {}

    def get_resource_meta_summary(self, res_id):
        """Return the condensed metadata of ``res_id``, or ``{}`` if unknown.

        Builds the summary via get_resources_meta_summary (re-queries the API).
        """
        for resource in self.get_resources_meta_summary():
            if resource.get('id') == res_id:
                return resource
        return {}

    def get_resource(self, res_id: str) -> dict:
        """Fetch a single asset by id from ``/asset/<res_id>``."""
        custom_url = f"{self.url}/asset/{res_id}"
        response = session.get(custom_url, headers=self._headers())
        response.raise_for_status()
        return response.json()

    # CONSUMPTION READINGS
    def _get_readings(self, asset_id: str, timestamp_start, timestamp_end, granularity) -> list:
        """Fetch consumption for one asset between two epoch timestamps.

        ``granularity`` is a number of seconds or one of the aliases in
        ``_GRANULARITY_SECONDS`` ("min", "5min", "H", "D", "W", "M", ...).

        Returns:
            A list of ``[epoch, "YYYY-MM-DD HH:MM" (UTC), consumption]``.
        Previous name: _retrieve_readings_params_GD.
        """
        granularity = self._GRANULARITY_SECONDS.get(granularity, granularity)

        custom_url = (
            f"{self.url}/asset/{asset_id}/consumption/{timestamp_start}"
            f"/{timestamp_end}/{granularity}"
        )
        response = session.get(custom_url, headers=self._headers())
        if response.status_code == 400:
            print(f"Bad request {response.text}")
        response.raise_for_status()

        data = response.json()
        if not data:
            return []
        return [
            [
                rr.get('time'),
                datetime.utcfromtimestamp(rr.get('time')).strftime("%Y-%m-%d %H:%M"),
                rr.get('consumption'),
            ]
            for rr in data
        ]

    def get_readings(self, asset_id, start_date, period_days, granularity=3600) -> list:
        """Fetch readings one day per request over ``period_days`` days.

        Args:
            asset_id: GridDuck asset id.
            start_date: a datetime, e.g. ``datetime(2020, 11, 1)``; naive values
                are interpreted in local time.
            period_days: number of consecutive days to query.
            granularity: seconds per reading, or an alias accepted by _get_readings.

        Returns:
            The concatenated readings, or None if a request fails (the error is printed).
        """
        import time  # only gmtime/strftime are imported at file level

        day_seconds = 86400
        sleep_time = 0.01  # pause between API calls

        start_epoch = int(start_date.timestamp())
        day_start = start_epoch
        readings = []
        try:
            for day in range(period_days):
                day_start = start_epoch + day * day_seconds
                day_end = day_start + day_seconds
                readings.extend(
                    self._get_readings(asset_id, str(day_start), str(day_end), granularity)
                )
                time.sleep(sleep_time)
        except Exception as e:
            print(f"ERROR during resource {asset_id} on day {day_start}")
            print(e)
            return None
        return readings

    def get_readings_new(self, resid, prints=False):
        """Fetch daily readings of one resource for the prototyping period.

        The period is 2020-11-10 to 2020-12-28 (local time). Also checks the
        values with is_data_good.

        Returns:
            ``(values, readings)``, or None when no data came back.
        """
        timestamp_start = str(int(datetime(2020, 11, 10, 0, 0).timestamp()))
        timestamp_end = str(int(datetime(2020, 12, 28, 0, 0).timestamp()))
        granularity = 60 * 60 * 24

        data = self._get_readings(resid, timestamp_start, timestamp_end, granularity)
        if not data:
            print("Error reading data. Seems empty. Data:")
            print(data)
            return None

        values = process_readings_to_values(data)
        if not self.is_data_good(values, prints):
            print('-This resource does not have good data-')
        return values, data

    def save_readings(self, user_res_id, start_date, period_days, granularity=3600, filename_prefix="", filename_suffix=""):
        """Fetch readings and write them to ``<prefix><res_id><suffix>.csv``.

        ``start_date`` is a datetime such as ``datetime(2020, 11, 1)``. The
        suffix defaults to ``_<granularity>``.

        NOTE(review): ``process_readings_trim_start`` is not defined in this
        notebook; this method raises NameError until it is provided.
        """
        import pandas as pd  # not imported at file level

        if filename_suffix == "":
            filename_suffix = f"_{granularity}"
        filename = f"{filename_prefix}{user_res_id}{filename_suffix}.csv"

        readings = self.get_readings(user_res_id, start_date, period_days, granularity)
        readings = process_readings_trim_start(readings)
        df = pd.DataFrame(readings, columns=['datetime', 'date', 'data']).set_index('datetime')
        df.to_csv(filename)
        print(f"File saved: {filename}")

In [186]:
# gridDuck token
# Base URL of the GridDuck API; the GridDuck class reads this global.
URL_GD = "https://v1.api.gridduck.com"
# Prompt for the token so it is never stored in the notebook.
TOKEN_GD = getpass('Enter gridDuck token: ')
gd = GridDuck(TOKEN_GD)

Enter gridDuck token: ··········
API connection ok


In [193]:
## Handling of the data obtained corresponding to the resources
all_resources = gd.resources_metadata
# Number of past months whose consumption is loaded
month_period = 3
# Today at midnight as a naive datetime. Built with datetime.combine instead of
# round-tripping through ctime() and strptime('%c'): '%c' is locale-dependent,
# so that round-trip may fail to parse outside the C locale.
now = date.today()
date_now = datetime.combine(now, datetime.min.time())
# Start of the window: month_period months before today (relativedelta clamps
# the day to the target month's length)
start_date = date_now - relativedelta(months=month_period)
# Days in start_date's month plus one extra day (NOTE(review): confirm the +1 is intended)
period_days = calendar.monthrange(start_date.year, start_date.month)[1] + 1
# Granularity alias passed to GridDuck.get_readings
period = 'min'

In [188]:
# Column-oriented buffers: every key maps to its own, initially empty, list.
data_consumption = {
    column: [] for column in ('timestamp', 'consumption', 'reference_resource')
}

# Metadata of resources that are not CONNECTED, keyed like the Metadata columns.
data_resources_inactive = {
    column: []
    for column in (
        'veId', 've_name', 've_active', 've_city', 've_contry', 'postalCode',
        'resourceId', 'resource_name', 'resource_description',
        'resource_active', 'resource_classifier', 'baseUnit',
    )
}

In [None]:
## Verification and loading of metadata for all available active resources


def _metadata_fields(resource):
    """Map one GridDuck asset dict onto Metadata column values (without resourceId)."""
    is_connected = resource.get('status') == 'CONNECTED'
    return {
        'veId': resource.get('site_id'),
        've_name': resource.get('site_name'),
        've_active': is_connected,
        # NOTE(review): city and country are both filled from site_name — confirm.
        've_city': resource.get('site_name'),
        've_contry': resource.get('site_name'),
        # NOTE(review): postalCode is filled from the asset's 'sku' — confirm.
        'postalCode': resource.get('sku'),
        'resource_name': resource.get('name'),
        'resource_description': resource.get('gateway_name'),
        'resource_active': is_connected,
        'resource_classifier': resource.get('user_state'),
        'baseUnit': "Kwh",
    }


def _store_readings(resource_id, readings):
    """Insert each non-zero reading of one resource unless its timestamp is already stored."""
    for reading in readings:
        time_consumption = reading[1]
        consumption_value = reading[2]
        if consumption_value is None or consumption_value == 0:
            continue
        # Query the database session; `session` is the HTTP requests.Session.
        already_stored = _session.query(ConsumptionResources).filter(
            ConsumptionResources.timestamp == time_consumption,
            ConsumptionResources.reference_resource == resource_id,
        ).first()
        if already_stored:
            print(F"The consumption data already exists. Date exists: {time_consumption} and resource exists: {resource_id}")
            continue
        _session.add(ConsumptionResources(
            timestamp=time_consumption,
            consumption=consumption_value,
            reference_resource=resource_id,
        ))
        _session.commit()
        print(F"The resource data is saved in the consumption table: {resource_id}, Date: {time_consumption}")


try:
    for resource in all_resources:
        resource_id = resource.get('id')

        if resource.get('status') != 'CONNECTED':
            print(F"Resource not active >> : {resource_id}")
            inactive_row = dict(_metadata_fields(resource), resourceId=resource_id)
            for column, column_values in data_resources_inactive.items():
                column_values.append(inactive_row[column])
            continue

        # Reset the loading window for every resource. period_days must be
        # recomputed too, otherwise it keeps the value left by the previous
        # resource's monthly loop.
        start_date = date_now - relativedelta(months=month_period)
        period_days = calendar.monthrange(start_date.year, start_date.month)[1] + 1
        fields = _metadata_fields(resource)

        existing = _session.query(Metadata).filter(Metadata.resourceId == resource_id).first()
        if existing:
            # update(table, values=...) is legacy API (removed in SQLAlchemy 2.0);
            # the generative .where().values() form works on 1.x and 2.x.
            _session.execute(
                update(Metadata).where(Metadata.resourceId == resource_id).values(fields)
            )
            _session.commit()
            # NOTE(review): consumption readings are only loaded for newly
            # registered resources; known resources only get a metadata refresh — confirm.
            continue

        _session.add(Metadata(resourceId=resource_id, **fields))
        _session.commit()
        print(F"The resource data is saved in the metadata table: {resource_id}, Date: {start_date}")

        for _ in range(month_period):
            reading = gd.get_readings(resource_id, start_date, period_days, period)
            values = process_readings_to_values(reading)
            is_good_data = gd.is_data_good(values, False)
            # get_readings returns None on failure; never iterate it in that case.
            if reading and is_good_data:
                _store_readings(resource_id, reading)
            start_date = start_date + relativedelta(months=1)
            period_days = calendar.monthrange(start_date.year, start_date.month)[1] + 1
finally:
    _session.close()
