In [0]:
# Import the necessary functions for the pi tag compile job
import os
import warnings

import pandas as pd
from pyspark.sql.functions import *
# Use arrow conversion for spark DF to pandas DF (affects the toPandas() and
# createDataFrame(pandas_df) calls made throughout this notebook).
# NOTE(review): newer Spark releases (3.0+) name this setting
# "spark.sql.execution.arrow.pyspark.enabled" and keep the key below only as a
# deprecated alias — confirm against the cluster's Spark version before renaming.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


**Global Variables**
- `IWW_MASTER_TAGLIST`: This reads the table IWW_PI_MASTER_TAGLIST in the Azure SQL Database (edsqutarmsqluw2005). This table is a dimension table that stores the master list of PI tags that are in scope for the project, so tags can be added or removed in one location.
- `All_tag_value`: This reads the table IWW_ANALYTICS_TAGVALUE_OILSANDS_DA in the Azure SQL Database (edsqutarmsqluw2003). This table is a fact table that holds all the recorded tag values in PI that are streamed into Azure by Magnotix. This table only stores approximately the last week of values.

In [0]:
# Read the IWW master PI tag list and the high watermark from the IWW database.
#
# SECURITY: the SQL login is read from the IWW_SQL_USER / IWW_SQL_PASSWORD
# environment variables instead of being committed in the notebook. A missing
# variable raises KeyError while the reader options are being built, before
# .load() is called. The password that used to live in this cell should be
# treated as leaked and rotated.
# TODO: change to Active Directory authentication backed by a key vault.

# Reading IWW Master PI tag list
iww_tags_SDF = (
    spark.read.format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2005")
    .option("dbtable", "MASTER_TAG_LIST")
    .option("user", os.environ["IWW_SQL_USER"])
    .option("password", os.environ["IWW_SQL_PASSWORD"])
    .load()
)

# Using Pandas dataframes with arrow conversion
tag_list = iww_tags_SDF.toPandas()

# Reading last tag update time aka high watermark... this allows only loading in new values
last_tag_SDF = (
    spark.read.format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2005")
    .option("dbtable", "MAX_TAG_TIME")
    .option("user", os.environ["IWW_SQL_USER"])
    .option("password", os.environ["IWW_SQL_PASSWORD"])
    .load()
)

# Note: last_tag is a pandas DataFrame holding the MAX_TAG_TIME table, not a scalar
last_tag = last_tag_SDF.toPandas()

print("Last loaded tag timestamp: ", last_tag)

In [0]:
# Reading daily (DA) PI tag value list created by APM base plant project
# This means for daily PI reads... existing APM datafactory+databricks flows must be active
#
# SECURITY: the SQL login is read from the IWW_SQL_USER / IWW_SQL_PASSWORD
# environment variables instead of being committed in the notebook; a missing
# variable raises KeyError before the read is attempted.
all_tag_value_SDF = (
    spark.read.format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2003")
    .option("query", "SELECT * FROM [dbo].[APM_ANALYTICS_TAGVALUE_OILSANDS_DA]")
    .option("user", os.environ["IWW_SQL_USER"])
    .option("password", os.environ["IWW_SQL_PASSWORD"])
    .load()
)

# The whole source table is collected to the driver as a pandas frame here
all_tag_values = all_tag_value_SDF.toPandas()

# Joining on tags only in the IWW inventory list
iww_tag_values = pd.merge(tag_list, all_tag_values, on='TagName', how='inner')



In [0]:
# Newest TagTime among the joined IWW rows; a later cell stores it as the next watermark
max_time = iww_tag_values.loc[:, 'TagTime'].max()

print("Newest tag time from magnotix is: ", max_time)


In [0]:

# Drop tag values that have already been transferred to the IWW db.
#
# `last_tag` is the pandas frame read from MAX_TAG_TIME, so it is collapsed to
# a scalar first. Comparing the TagTime Series against the frame itself does
# not yield a row mask, and the comparison result must be used to select rows
# rather than being kept as the "trimmed" data.
# An empty (or null) watermark means nothing has been transferred yet, so every
# row is kept.
last_tag_time = last_tag.iloc[:, 0].max() if len(last_tag) else None
if last_tag_time is None or pd.isna(last_tag_time):
    iww_tag_values_trim = iww_tag_values.copy()
else:
    iww_tag_values_trim = iww_tag_values[iww_tag_values['TagTime'] > last_tag_time]

# If the join produced no rows, max_time is NaN/NaT; keep the previous
# watermark in that case so the stored value stays a usable timestamp.
new_watermark = max_time
if pd.isna(max_time) and last_tag_time is not None:
    new_watermark = last_tag_time

# Squash the float/double type-conversion warnings for these conversions only,
# instead of replacing warnings.showwarning for the rest of the kernel session.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")

    # Spark may be unable to infer a schema from an empty pandas frame, so the
    # Spark frame is only built when there are rows; the append cell checks
    # len(iww_tag_values_trim) before using it.
    if len(iww_tag_values_trim) > 0:
        iww_tag_values_trim_SDF = spark.createDataFrame(iww_tag_values_trim)
    else:
        iww_tag_values_trim_SDF = None

    # need to put max into a dataframe format for sql write
    max_time_df = pd.DataFrame([[new_watermark]], columns=['Time'])
    max_time_SDF = spark.createDataFrame(max_time_df)



In [0]:
# Writing the new most recent time to the watermark table (replaces MAX_TAG_TIME).
#
# NOTE(review): this runs before the new tag values are appended in the next
# cell, so if that append fails the watermark has already moved past the
# unwritten rows — consider writing the watermark last.
#
# SECURITY: the SQL login is read from the IWW_SQL_USER / IWW_SQL_PASSWORD
# environment variables instead of being committed in the notebook; a missing
# variable raises KeyError before the write is attempted.
(
    max_time_SDF.write.format("jdbc").mode("overwrite")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2005")
    .option("dbtable", "MAX_TAG_TIME")
    .option("user", os.environ["IWW_SQL_USER"])
    .option("password", os.environ["IWW_SQL_PASSWORD"])
    .save()
)

In [0]:
# Append every non-empty batch of new rows. The watermark has already been
# advanced by the previous cell, so skipping a small batch (the old `> 2`
# threshold) would drop those rows for good: the next run only looks past the
# new watermark.
#
# SECURITY: the SQL login is read from the IWW_SQL_USER / IWW_SQL_PASSWORD
# environment variables instead of being committed in the notebook; a missing
# variable raises KeyError before the write is attempted.
if len(iww_tag_values_trim) > 0:
  #Writing results to IWW Tag Value table for usage
  (
      iww_tag_values_trim_SDF.write.format("jdbc").mode("append")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2005")
      .option("dbtable", "PI_TAGVALUES_DA")
      .option("user", os.environ["IWW_SQL_USER"])
      .option("password", os.environ["IWW_SQL_PASSWORD"])
      .save()
  )

In [0]:
# Read in the tags from the database (PI_TAGVALUES in the IWW database).
#
# SECURITY: the SQL login is read from the IWW_SQL_USER / IWW_SQL_PASSWORD
# environment variables instead of being committed in the notebook; a missing
# variable raises KeyError before the read is attempted.
iww_tags = (
    spark.read.format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2005")
    .option("dbtable", "PI_TAGVALUES")
    .option("user", os.environ["IWW_SQL_USER"])
    .option("password", os.environ["IWW_SQL_PASSWORD"])
    .load()
)

# Using Pandas dataframes with arrow conversion.
# Note: this rebinds `tag_list`, which held the master tag list in the earlier
# cells; from here on it holds the PI_TAGVALUES rows.
tag_list = iww_tags.toPandas()

In [0]:
# Row and column counts of the frame, taken before any column is added
rows, cols = tag_list.shape

# One None per row, used as the initial content of the new columns
blanks = [None] * rows

# Add the sampling point and sampling type columns. Both are inserted at the
# same position `cols`, so 'Sampling Type' ends up in front of 'Sampling Point'.
for new_column in ('Sampling Point', 'Sampling Type'):
  tag_list.insert(loc=cols, column=new_column, value=blanks, allow_duplicates=False)

In [0]:
# Tuple of the sampling-point tokens that can appear inside a tag name.
# Order is kept as-is because the matching cell iterates these in sequence.
sampling_points = (
  '34DAF_OUT',
  'P32_32G40',
  'POND_4G',
  'POND_A_OUT',
  'PONDC',
  'PONDD_TO_B',
  'PONDE',
  'WW_PONDA_OUT',
  'WW_PONDE_OUT',
  'WW_PONDA',
  'WW_PONDE',
)

# Tuple of the sampling-type tokens that can appear inside a tag name
sampling_types = (
  'CL_AQ',
  'RECYCLE_R24HR_AVG',
  'RECYCLE_YSTD_MAX',
  'RECYCLE_YSTD_MIN',
  'CHLORIDE_AQ',
  'CHLORIDE_AQ_R1HR_AVG',
  'CHLORIDE_AQ_R24HR_AVG',
  'OG_24HR_AVG',
  'OG_R1HR_AVG',
  'OG_R24HR_AVG',
  'OG_YSTD_MAX',
  'OG_YSTD_MIN',
  'CHLORIDE_DELTA',
  'CHLORIDE_R1HR_AVG',
  'CHLORIDE_R24HR_AVG',
  'CHLORIDE_YSTD_MAX',
  'CHLORIDE_YSTD_MIN',
  'OG_HEX',
  'TSS_WATER',
  'OG',
  'COD',
  'TSS',
  'PH',
  'CHLORIDE',
)

In [0]:
# Iterate through the rows and pull out the sampling point and type from the
# tag name. Rows whose TagName is not a string are left untouched (both new
# columns keep their placeholder value).

# Display label for each raw sampling-point token found in a tag name
sampling_point_labels = {
  'P32_32G40': 'Pond C',
  'PONDC': 'Pond C',
  'POND_A_OUT': 'Pond A Outfall',
  'WW_PONDA_OUT': 'Pond A Outfall',
  'WW_PONDA': 'Pond A Outfall',
  'PONDE': 'Pond E Outfall',
  'WW_PONDE_OUT': 'Pond E Outfall',
  'WW_PONDE': 'Pond E Outfall',
  '34DAF_OUT': '34DAF',
  'POND_4G': 'Pond 4G',
  'PONDD_TO_B': 'Pond D to B',
}

for i, tag_name in tag_list['TagName'].items():
  if not isinstance(tag_name, str):
    continue

  # The last matching token wins; a token without a label is used verbatim.
  # When nothing matches, the column is set to the empty string.
  sampling_point = ''
  for point in sampling_points:
    if point in tag_name:
      sampling_point = sampling_point_labels.get(point, point)
  tag_list.at[i, 'Sampling Point'] = sampling_point

  # The longest matching token wins, so a specific type such as
  # 'OG_R24HR_AVG' is not overridden by a generic token ('OG') that is also a
  # substring of the tag name. Ties keep the token listed first.
  # A manual scan is used on purpose: the wildcard pyspark.sql.functions
  # import at the top of the notebook shadows the built-in max().
  matched_type = ''
  for tp in sampling_types:
    if tp in tag_name and len(tp) > len(matched_type):
      matched_type = tp

  # Compare by value; `is ''` tests object identity and is not a reliable
  # emptiness check.
  if matched_type == '':
    print('Unhandled sampling_type: ' + tag_name)
  else:
    sampling_type = matched_type.replace("_", " ")
    tag_list.at[i, 'Sampling Type'] = sampling_type

In [0]:
# Writing results to New IWW Tag Value table for usage (replaces
# PI_TAGVALUES_PROCESSED with the frame that now carries the sampling columns).
#
# SECURITY: the SQL login is read from the IWW_SQL_USER / IWW_SQL_PASSWORD
# environment variables instead of being committed in the notebook; a missing
# variable raises KeyError before the write is attempted.
iww_tag_values_SDF = spark.createDataFrame(tag_list)
(
    iww_tag_values_SDF.write.format("jdbc").mode("overwrite")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://sqlqutarmsvruw2003.database.windows.net;databaseName=edsqutarmsqluw2005")
    .option("dbtable", "PI_TAGVALUES_PROCESSED")
    .option("user", os.environ["IWW_SQL_USER"])
    .option("password", os.environ["IWW_SQL_PASSWORD"])
    .save()
)