# Data Hub to Spark DataFrame Example

## Install the Data Hub Python Library

In [None]:
# Upgrade pip, then install (or upgrade) the Data Hub sample client library used below.
!python -m pip install --upgrade pip
!python -m pip install  adh_sample_library_preview --upgrade

## Imports

In [None]:
from adh_sample_library_preview import ADHClient
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from itertools import repeat
import json
import os
import pandas as pd
import time

## Create a Data Hub client and specify the data to pull

In [None]:
def _key_vault_secret(key):
  """Return the secret stored under `key` in the Databricks "key-vault" secret scope."""
  return dbutils.secrets.get(scope = "key-vault", key = key)

# Data Hub client: API version, tenant id, Data Hub URL, client id, client secret.
client = ADHClient(
  'v1',
  _key_vault_secret("DatabricksTenantId"),
  'https://uswe.datahub.connect.aveva.com',
  _key_vault_secret("DatabricksClientId"),
  _key_vault_secret("DatabricksclientSecret"))
namespace_id = _key_vault_secret("DatabricksNamespaceId")

# NOTE(review): the unit of request_timeout is defined by ADHClient (presumably seconds) - confirm.
client.request_timeout = 3000

# Data View to query, the time window to cover, and the interpolation spacing.
data_view_id = 'Wind Turbine Analysis'
start_index, end_index = '2022-04-01T00:00:00.000Z', '2022-05-01T00:00:00.000Z'
interval = timedelta(minutes=1)

## Write a function to pull data from a Data View into a Spark DataFrame

In [None]:
def retryWithBackoff(fn, *args, **kwargs):
  """Call fn(*args, **kwargs) until it succeeds and return its result.

  Any exception raised by fn is treated as transient. The call is retried after an
  exponentially growing delay (1, 2, 4, ... seconds), capped at one hour per wait.
  Retries continue indefinitely, so a permanent failure (e.g. bad credentials)
  blocks the caller forever.

  Args:
    fn: the callable to invoke.
    *args, **kwargs: forwarded unchanged to fn on every attempt.

  Returns:
    Whatever fn returns on its first successful call (including None).
  """
  max_delay = 3600  # upper bound on a single wait between attempts, in seconds
  failures = 0
  while True:
    try:
      # Return on the first successful call; a None result is a valid result, not a retry signal.
      return fn(*args, **kwargs)
    except Exception as error:
      print(f'request failed retrying... ({error!r})')
      # 2 ** failures grows without bound; cap it so a long outage never waits more than max_delay.
      time.sleep(min(2 ** failures, max_delay))
      failures += 1

def dataViewRequest(client, namespace_id, data_view_id, start_index, end_index, interval, cache):
  """Fetch interpolated Data View data for one time range as a single Spark DataFrame.

  The first page is requested with the Data View parameters. Every following page is
  requested through the continuation URL returned with the previous page. Each request
  goes through retryWithBackoff.

  All pages are collected first and parsed by one spark.read.json call. Spark therefore
  infers a single schema for the whole result instead of one schema per page. Per-page
  DataFrames can end up with different columns or types (for instance when a page has
  no records), and DataFrame.union, which matches columns by position, then fails or
  misaligns them.

  Args:
    client: the Data Hub client whose DataViews.getDataInterpolated is called.
    namespace_id, data_view_id: identify the Data View to query.
    start_index, end_index: bounds of the requested range, passed through to the service.
    interval: interpolation spacing, passed through to the service.
    cache: cache behaviour passed through to the service (this notebook uses 'Refresh').

  Returns:
    A Spark DataFrame built from the JSON of every page, in page order.
  """
  data_page, next_page, _ = retryWithBackoff(client.DataViews.getDataInterpolated, namespace_id, data_view_id, start_index=start_index, end_index=end_index, interval = interval, cache = cache)
  pages = [json.dumps(data_page)]

  # Follow the continuation link until the service reports no further page.
  while next_page:
    data_page, next_page, _ = retryWithBackoff(client.DataViews.getDataInterpolated, url = next_page)
    pages.append(json.dumps(data_page))

  # One parse over all pages gives one consistent schema for the whole range.
  return spark.read.json(sc.parallelize(pages))

## Simple data request

In [None]:
# Pull the whole window in one (paginated) request, then show the row count and a preview.
df = dataViewRequest(
  client, namespace_id, data_view_id,
  start_index, end_index, interval,
  cache='Refresh',
)
print(df.count())
df.show()

## Parallel data request

In [None]:
# Number of worker threads: mirrors ThreadPoolExecutor's default max_workers formula.
# os.cpu_count() may return None, so fall back to 1 CPU in that case.
number_of_executors = min(32, (os.cpu_count() or 1) + 4)

# Split the time range from start_index to end_index into chunk boundaries spaced a whole
# number of intervals apart, aiming for roughly number_of_executors chunks.
intervals = pd.date_range(start=start_index, end=end_index, freq=interval)
# round() can produce 0 for short ranges, and a zero slice step raises ValueError.
step = max(1, round(len(intervals) / (number_of_executors + 1)))
intervals = intervals[0::step]
print(intervals)

# Convert the boundaries to request ranges. Each chunk ends one interval before the next
# chunk starts, so no timestamp is requested twice; the final chunk always runs through end_index.
# NOTE(review): these strings carry no 'Z'/offset, unlike start_index/end_index - confirm Data Hub reads them as UTC.
start_indexes = [i.strftime("%Y-%m-%dT%H:%M:%S") for i in intervals[:-1]]
end_indexes = [(i - interval).strftime("%Y-%m-%dT%H:%M:%S") for i in intervals[1:]]
if start_indexes:
  end_indexes[-1] = end_index
else:
  # Fewer than two boundaries (range too short to split): fetch it as a single chunk.
  start_indexes, end_indexes = [start_index], [end_index]

with ThreadPoolExecutor(max_workers=number_of_executors) as pool:
  # For each start_index/end_index pair, pull that chunk into a data frame on a worker thread;
  # pool.map yields the results in the same order as the chunks.
  data_frames = pool.map(dataViewRequest, repeat(client), repeat(namespace_id), repeat(data_view_id), start_indexes, end_indexes, repeat(interval), repeat('Refresh'))

  # Combine all the per-chunk data frames into one.
  df = next(data_frames)
  for data_frame in data_frames:
    df = df.union(data_frame)

print(df.count())
df.show()