In [1]:
import os
import sys
import warnings
import numpy as np
import pandas as pd
import time
import datetime as dt
import re

from IPython.display import display, HTML, clear_output
from IPython.core.interactiveshell import InteractiveShell
import ipywidgets as widgets

from matplotlib import pyplot as plt
import seaborn as sns

# SettingWithCopyWarning is publicly exposed in pandas.errors (pandas >= 1.5);
# fall back to the older private location for earlier pandas versions.
try:
    from pandas.errors import SettingWithCopyWarning
except ImportError:
    from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
# NOTE(review): this silences *every* warning for the rest of the session, which
# also makes the targeted filter above redundant — confirm that this is intended.
warnings.filterwarnings('ignore')
# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
# Set the width of the notebook page container.
display(HTML("<style>.container { width:79% !important; }</style>"))

# easier-to-read notebook:
from IPython.display import display, HTML, clear_output
pd.options.display.max_columns = 70
pd.options.display.max_rows = 500
%load_ext autoreload
%autoreload 2

# Developer-specific setup: the block below only runs when at least one sys.path
# entry matches the pattern. When it does not run, SqlImporter, `c` and the path
# constants are NOT defined, and the query cells further down fail with NameError.
# NOTE(review): the pattern reads as if '.*maoz.*' was intended — confirm before changing.
r = re.compile('.*maoz*.')
if any(r.match(path_entry) for path_entry in sys.path):
    PHYTECH_DRIVE_PATH = os.environ['PHYTECH_DRIVE_PATH']
    # os.path.join inserts the separator only when needed, so the paths are valid
    # whether or not PHYTECH_DRIVE_PATH ends with a slash (the previous string
    # concatenation produced '<drive>GitHub' when it did not).
    GITHUB_PATH = os.path.join(PHYTECH_DRIVE_PATH, 'GitHub')
    DATA_WD = os.path.join(PHYTECH_DRIVE_PATH, 'Data Integrity', 'SM anomalies', 'data')
    CERT_PATH = os.path.join(PHYTECH_DRIVE_PATH, 'Data')
    # Make the modules stored under CERT_PATH importable.
    if CERT_PATH not in sys.path:
        sys.path.append(CERT_PATH)
    from sql_import_export import SqlImporter
    import cert_aws as c
    # Importer bound to the research database; later cells rebind `sql_importer`.
    sql_importer = SqlImporter(database=c.database_research, user=c.user_research, password=c.password_research,
                               host=c.host_research, port=c.port_research, verbose=True)

local aws_cert.py loaded


In [2]:
import project_class_data_extract
from logic_parameters import default_latitude, default_height
from tqdm import tqdm_notebook
#import common_db as cdb

- Instantiate the `Project` class.
- Load the data: `project.load_sm_project_data()`
- Apply the transformers to the data (for example, to fill missing values): `project.apply_transformers()`
- Group the time series (TS) by depth: `project.group_data_to_depths()`

In [3]:
# --- Thresholds read by find_not_responding_events / get_project_results ---
# Hourly soil-moisture diff a probe must reach to count as responding:
SM_HOURLY_DIFF_FIRST_DEPTH = 0.5     # applied to the first depth of a project
SM_HOURLY_DIFF_SECOND_DEPTH = 0.25   # applied to every other depth
# Irrigation events whose amount is not above this value are ignored.
MIN_IRR_AMOUNT = 1
# Not referenced by any of the cells visible in this notebook.
MAX_RESPONDING_EVENTS = 1
# Fraction of the hourly-diff threshold above which a weak peak is tagged as a low response.
LOW_RESPONSE_FACTOR = 0.5

# --- Run state shared by the cells below ---
#project_list = [852093, 871812, 851995, 852015]
failed_list = list()          # ids of projects that could not be analysed
summary_df = pd.DataFrame()   # accumulates one row per analysed project

# Analysis window as ISO 'YYYY-MM-DD' strings: from 8 days ago up to yesterday.
start_date = (dt.date.today() - dt.timedelta(days=8)).isoformat()
yesterday = (dt.date.today() - dt.timedelta(days=1)).isoformat()

In [8]:
#project_results = find_not_responding_events(project_data)
# Build the list of project ids to analyse. The query joins projects_metadata with
# soil_sensors_metadata and project_irrigation_spans_v2 and keeps projects that:
#   - have a time_zone ending in 'Los_Angeles',
#   - belong to season 2023,
#   - have a type_id in the listed set,
#   - have an irrigation span starting on or after `start_date` (config cell),
#   - are active.
# `start_date` is interpolated into the SQL text; it is generated by this notebook,
# not taken from user input.
query = f"""
    SELECT distinct(pm.project_id)
    FROM projects_metadata pm
    JOIN soil_sensors_metadata ssm
    ON pm.project_id = ssm.project_id
    JOIN project_irrigation_spans_v2 pis
    ON pis.project_id = pm.project_id
    WHERE time_zone like '%Los_Angeles'
    AND season = 2023
    AND type_id IN (90, 91, 92, 98, 117, 118, 124, 127, 135, 137)
    AND start_date >= CAST((CAST('{start_date}' AS timestamp)) AS date)
    AND active=true
    ORDER BY project_id
    """
# Rebinds `sql_importer` (first created in the setup cell) to a production-database
# importer; the connection settings come from the `c` (cert_aws) module.
sql_importer = SqlImporter(query=query, database=c.database_production, user=c.user_production, password=c.password_production,
                                host=c.host_production, port=c.port_production, verbose=True)

# Run the query; the result is exposed on `sql_importer.data`.
sql_importer.get_data()
project_list = sql_importer.data
project_list

Loaded table with 2293 lines from projects_metadata


Unnamed: 0,project_id
0,848545
1,848547
2,848548
3,849303
4,849304
...,...
2288,877954
2289,878012
2290,878013
2291,878566


In [None]:
# Analyse the first 50 projects of `project_list` and collect one summary row per
# project. Requires load_project_data and get_project_results (defined in later
# cells of this notebook) to have been executed first.
# Per-project frames are gathered in a list and concatenated once after the loop,
# instead of re-concatenating the growing summary_df on every iteration.
project_frames = []
for p_id in project_list['project_id'][:50]:
    #print(list(project_list['project_id']).index(p_id))
    try:
        project_data = load_project_data(project_id=p_id, min_date=start_date,
                      max_date=yesterday, min_depth=10, max_depth=91, debug=False)
        if not project_data.valid_project:
            failed_list.append(p_id)
            continue
        if len(project_data.df_irrigation) == 0:
            continue  # nothing to evaluate without irrigation events
        project_frames.append(get_project_results(project_data))
    except Exception as e:
        # Best effort: report the failing project and carry on with the rest.
        print(p_id, e)
        failed_list.append(p_id)

if project_frames:
    # summary_df may already hold rows from an earlier run of this cell; keep them,
    # but do not feed an empty frame into concat.
    summary_df = pd.concat(([] if summary_df.empty else [summary_df]) + project_frames, axis=0)

if summary_df.empty:
    # Without any rows the columns used below do not exist.
    print("No project results to display")
else:
    # Projects where more than half of the irrigation events got no sensor response.
    display(summary_df[(summary_df.not_responding_events_count / summary_df.irrigation_events) > 0.5])

In [118]:
# Spot-check a single project. test_project_data is defined in a later cell of this
# notebook, so that cell has to be executed before this one.
test_project_data(850176)

https://app.phytech.com/14596/79944/850176


('project_data.valid_project', False)

In [88]:
def test_project_data(p_id, debug=False):
    """Load one project for the last week and run the not-responding analysis on it.

    Parameters
    ----------
    p_id : project id passed to ``load_project_data``.
    debug : when true, ``load_project_data`` runs in debug mode and the irrigation
        events above ``MIN_IRR_AMOUNT`` are displayed for valid projects.

    Returns
    -------
    The single-row DataFrame produced by ``get_project_results`` when the project
    is valid; otherwise the tuple ``('project_data.valid_project', <flag>)``.
    A project that fails to load is reported the same way, with ``False``.
    """
    yesterday = (dt.date.today() - dt.timedelta(days=1)).strftime("%Y-%m-%d")
    start_date = (dt.date.today() - dt.timedelta(days=8)).strftime("%Y-%m-%d")
    try:
        project_data = load_project_data(project_id=p_id, min_date=start_date,
                      max_date=yesterday, min_depth=10, max_depth=91, debug=debug)
    except Exception as e:
        # Nothing was loaded, so there is no project object to inspect. Previously
        # execution continued and failed on the unbound `project_data`.
        print(e)
        return ('project_data.valid_project', False)

    print(project_data.app_link)
    if project_data.valid_project and debug:
        display(project_data.df_irrigation[project_data.df_irrigation.amount > MIN_IRR_AMOUNT])

    if project_data.valid_project:
        # The analysis itself always runs with debug output switched off.
        return get_project_results(project_data, False)
    return ('project_data.valid_project', project_data.valid_project)

In [111]:
# Preview the per-project summary (shown as it is *before* the column below is added).
summary_df.head()
# Share of irrigation events per project that got no sensor response.
# This adds a column to summary_df in place.
# NOTE(review): the column name contains a space ('percent_not responding'), unlike
# the other snake_case columns — confirm before renaming, consumers may rely on it.
summary_df['percent_not responding'] = summary_df.not_responding_events_count / summary_df.irrigation_events

Unnamed: 0,project_id,sensor_id,probe_depths,irrigation_events,not_responding_events_count,event_timestamp,support_status,support_updated_at,remarks,link,timezone,max_moisture_diff,days_since_task_complete
0,848547,974505,"[16.0, 31.0]",1,1,{2023-05-01 09:19:00},,,"{, Not responding to irrigation}",https://app.phytech.com/13909/78721/848547,America/Los_Angeles,,
0,848548,952232,"[16.0, 31.0]",1,0,{},,,{},https://app.phytech.com/13909/78722/848548,America/Los_Angeles,,
0,849303,948251,"[16.0, 31.0]",1,0,{},,,{},https://app.phytech.com/14031/78561/849303,America/Los_Angeles,,
0,849304,971593,"[16.0, 31.0]",1,0,{},new,2023-04-30 00:15:10.953637,{},https://app.phytech.com/14031/78561/849304,America/Los_Angeles,,
0,849305,971740,"[16.0, 31.0]",1,0,{},new,2023-04-26 00:45:00.311411,{},https://app.phytech.com/14031/78561/849305,America/Los_Angeles,,


0         1.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         1.0
0    0.666667
0         0.0
0         0.0
0         0.5
0         0.0
0         0.0
0         1.0
0         0.0
0    0.444444
0         0.2
0         0.0
0         0.0
0         1.0
0         0.0
0         0.0
0         0.0
0    0.666667
0        0.25
0         0.0
0         0.5
0         0.0
0         0.0
0         0.0
0         1.0
0         1.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         0.0
0         1.0
0    0.666667
0         0.0
0         1.0
0    0.666667
0         1.0
0         1.0
0    0.333333
0         0.0
0         0.0
0         1.0
0         0.4
0    0.166667
0    0.166667
0         1.0
0         1.0
0         1.0
0         1.0
0    0.333333
0         1.0
0         1.0
0         1.0
0         0.0
0         0.5
0         1.0
dtype: object

In [97]:
# Scratch cell: small experiments with timestamps and dict handling.
dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
import time

# time.time() returns the current time in seconds since the epoch
time.time()
input_dict = {'2015-01-01': {'time': '8', 'capacity': '5'},
  '2015-01-02': {'time': '8', 'capacity': '7'},
  '2015-01-03': {'time': '8', 'capacity': '8'}}
# Compare capacities as integers: the previous string comparison (`< '8'`) was
# lexicographic, so e.g. '10' would have been treated as smaller than '8'.
max_capacity_below_8 = max(int(day['capacity']) for day in input_dict.values() if int(day['capacity']) < 8)
max_capacity_below_8
d = {}
d.update({'a':1})
d
d.values()


'2023-05-09 12:59:19'

1683626359.6398451

7

{'a': 1}

dict_values([1])

In [107]:
def get_project_results(project_data, debug=False):
    """Summarise the not-responding analysis of one project as a single-row DataFrame.

    Runs ``find_not_responding_events`` on ``project_data`` and combines its result
    with the project's identifiers, app link, timezone and sensor support status.

    Parameters
    ----------
    project_data : loaded project object (see ``load_project_data``).
    debug : forwarded to ``find_not_responding_events``.

    Returns
    -------
    pandas.DataFrame with exactly one row and the columns listed in ``cols`` below.
    """
    project_results = find_not_responding_events(project_data, debug)
    # Only irrigation events above the minimum amount are counted.
    df_irr = project_data.df_irrigation[project_data.df_irrigation.amount > MIN_IRR_AMOUNT]

    project_dict = {'project_id': project_data.project_id,
                    'sensor_id': project_data.sensor_id,
                    'probe_depths': project_results.get('probe_depths'),
                    'irrigation_events': len(df_irr),
                    'not_responding_events_count': project_results['events_count'],
                    'link': project_data.app_link,
                    'timezone': project_data['timezone']}

    events_details = project_results['events_details']
    if not events_details:  # empty dict = no irrigation event was evaluated
        project_dict.update({'max_SM_diff': None,
                             'event_timestamp': None,
                             'remarks': None})
    else:
        # Collect the distinct remarks of all probes. The summary values are the
        # same for every depth, so they are computed once here rather than being
        # re-assigned on each iteration of a per-depth loop.
        remarks = {events_details[depth]['remarks'] for depth in project_results.get('probe_depths')}
        project_dict.update({'max_SM_diff': max(details['probe SM diff'] for details in events_details.values()),
                             'event_timestamp': project_results.get('event_timestamp'),
                             'remarks': remarks})

    # Find support status information
    # NOTE(review): the guard reads `sensor_support_status` while the data comes
    # from `sensor_support_status_dict` — confirm both attributes exist on Project.
    if not project_data.sensor_support_status:
        sensor_status = {'status': None, 'updated_at': None, 'days_since_task_complete': None}
    else:
        # Work on a copy so the record stored on the project object is not modified.
        sensor_status = dict(project_data.sensor_support_status_dict[0])
        sensor_status['days_since_task_complete'] = (dt.datetime.today().date() -
                                                     sensor_status['updated_at'].date()).days

    project_dict.update({'support_status': sensor_status['status'],
                         'support_updated_at': sensor_status['updated_at'],
                         'days_since_task_complete': sensor_status['days_since_task_complete']})

    cols = ['project_id', 'sensor_id', 'probe_depths', 'irrigation_events',
            'not_responding_events_count', 'max_SM_diff', 'event_timestamp',
            'support_status', 'support_updated_at', 'days_since_task_complete',
            'remarks', 'link', 'timezone']
    # One object-typed column keyed by field name, transposed into a single row.
    # Every column in `cols` is always present in project_dict, so no empty
    # template frame has to be concatenated first.
    projects_df = pd.DataFrame.from_dict(project_dict, orient='index').T[cols]

    return projects_df

In [116]:
def find_not_responding_events(project_data, debug=False):
    """Find irrigation events to which the project's soil-moisture probes did not respond.

    For every irrigation event with ``amount > MIN_IRR_AMOUNT`` and every probe
    depth, the soil-moisture (SM) readings from 1 hour before the event start until
    4 hours after its end are inspected. A probe is flagged as not responding when
    its largest hourly SM diff in that window is below the depth threshold AND either
    the project carries the "not_responding" flag or the probe started the window
    more than 0.5 below its local saturation.

    Parameters
    ----------
    project_data : loaded project object (see ``load_project_data``).
    debug : print intermediate values while processing.

    Returns
    -------
    dict with the keys
        'probe_depths'    list of the unique depths found in ``df_sm_data``
        'events_count'    number of irrigation events with at least one probe
                          not responding
        'event_timestamp' set of start times of those events (only present when at
                          least one irrigation event was processed)
        'events_details'  per-depth dict with 'irrigation_events', 'events dates',
                          'probe SM diff' and 'remarks'

    Notes
    -----
    The per-depth entry in 'events_details' is rebuilt for every irrigation event,
    so after the loop it describes the last processed event only.
    An SM window without any readings makes ``min``/``max`` raise ValueError.
    """
    # initialize
    not_responding_SM_sensors_project_dict, probe_events_dict = {}, {}
    event_timestamp = set()
    probe_depths = project_data.df_sm_data.depth_cm.unique()
    total = 0
    not_responding_SM_sensors_project_dict.update({'probe_depths': list(probe_depths),
                                                   'events_count': total,
                                                   'events_details': {}})

    # Events at or below the minimum amount are dropped here, so no further
    # per-row amount check is needed inside the loop.
    df_irr = project_data.df_irrigation[project_data.df_irrigation.amount > MIN_IRR_AMOUNT]
    for irr_event_counter, row in df_irr.iterrows():

        if debug:
            # `row` is the current event; indexing df_irr with the row label would
            # be a column lookup and fail.
            print(f"irrigation event: {row}")

        irr_start = df_irr.start[irr_event_counter]
        irr_end = df_irr.end[irr_event_counter]

        not_responding = False
        # Number of probes not responding to THIS event. It has to be reset per
        # event, not per depth, otherwise it can never reach len(probe_depths)
        # when there is more than one depth.
        counter = 0
        probe_dict = None  # stays None when the project has no depths at all

        for probe_depth_index, probe_depth in enumerate(probe_depths):
            probe_events_list = []

            if debug:
                print("\nprobe_depth", probe_depth)

            probe_dict = {}
            probe_local_saturation = project_data.local_saturation_by_depth[probe_depth_index][1]
            if project_data.multi_depths_sm[probe_depth].empty:  # no SM data for this depth
                probe_dict.update({'irrigation_events': len(df_irr),
                                   'events dates': [],
                                   'probe SM diff': 0,
                                   'remarks': f"|missing SM data for {probe_depth}"})
                probe_events_dict[probe_depth] = probe_dict
                continue  # check next depth

            df = project_data.multi_depths_sm[probe_depth].reset_index(drop=False)
            df['date'] = df.local_time.dt.date
            # SM readings from 1 hour before the event start to 4 hours after its end.
            df_irr_span = df[(df.local_time > irr_start - pd.Timedelta(hours=1))
                             & (df.local_time < irr_end + pd.Timedelta(hours=4))]
            if debug:
                print(irr_start, irr_end, df_irr_span)

            ProbeMinMoisture, ProbeMaxMoisture = min(df_irr_span.sm_val), max(df_irr_span.sm_val)
            max_hourly_diff = max(df_irr_span.sm_diff)
            # Highest of the first three readings = moisture level at the window start.
            initial_sm = max(df_irr_span.sm_val.iloc[:3])

            # The first depth uses its own threshold, every other depth the second one.
            SM_HOURLY_DIFF = SM_HOURLY_DIFF_FIRST_DEPTH if probe_depth_index == 0 else SM_HOURLY_DIFF_SECOND_DEPTH
            probe_dict['remarks'] = ''
            # A probe responds when at least one hourly diff reaches SM_HOURLY_DIFF.
            if max_hourly_diff < SM_HOURLY_DIFF:
                # A probe that is already near local saturation cannot rise much,
                # so it is only flagged when it started clearly below saturation
                # (or when the project is explicitly flagged).
                if ("not_responding" in project_data.flag) or (initial_sm < probe_local_saturation - 0.5):
                    counter += 1
                    not_responding = True
                    probe_events_list.append((probe_depth, irr_start))
                    event_timestamp.add(irr_start)
                    probe_dict['remarks'] = '|Not responding to irrigation'
                    # Low response: a peak is visible but stays below the threshold.
                    if max_hourly_diff > SM_HOURLY_DIFF * LOW_RESPONSE_FACTOR:
                        probe_dict['remarks'] += '|Low sensor respnse'
                else:
                    probe_dict['remarks'] += '|probe_local_saturation'

                probe_dict['irrigation_events'] = len(df_irr)
                probe_dict['events dates'] = probe_events_list
                probe_dict['probe SM diff'] = ProbeMaxMoisture - ProbeMinMoisture
                probe_events_dict[probe_depth] = probe_dict
            else:
                if debug:
                    print(f"responding well, {irr_start}")
                probe_dict.update({'irrigation_events': len(df_irr),
                                   'events dates': [],
                                   'probe SM diff': ProbeMaxMoisture - ProbeMinMoisture,
                                   'remarks': ''})
                probe_events_dict[probe_depth] = probe_dict

            if debug:
                print(f"max hourly diff: {max_hourly_diff}",
                      f"probe local saturation: {probe_local_saturation}",
                      f"initial probe SM: {initial_sm}")

        # All probes are NOT responding in this irrigation span: tag the entry of
        # the last processed depth (the remarks of all depths are merged downstream).
        if probe_dict is not None and counter == len(probe_depths):
            probe_dict['remarks'] += '|All probes not responding'

        if not_responding:
            total += 1

        if debug:
            print(f"""total: {total} ,{irr_start} finished\n######################""")
        not_responding_SM_sensors_project_dict.update({'probe_depths': list(probe_depths),
                                                       'events_count': total,
                                                       'event_timestamp': event_timestamp,
                                                       'events_details': probe_events_dict})

    return not_responding_SM_sensors_project_dict

In [14]:
def find_not_responding_events_old(project_data, debug=False):
    """Earlier variant of ``find_not_responding_events`` (kept for reference).

    Iterates depth-first: for every probe depth it walks over all rows of
    ``project_data.df_irrigation`` and, for rows with ``amount > MIN_IRR_AMOUNT``,
    checks the SM readings around the irrigation span for a response.

    Returns a dict with the keys 'probe_depths', 'event_timestamp' and
    'events_details' (per depth: 'remarks', 'irrigation_events',
    'not_responding_events_count', 'events dates').

    NOTE(review): the returned dict has no 'events_count' key and the per-depth
    entries have no 'probe SM diff' key, both of which ``get_project_results``
    reads, so this variant cannot be plugged into that function as-is.
    """
    not_responding_SM_sensors_project_dict,probe_events_dict = {},{}
    event_timestamp = set()
    probe_depths = project_data.df_sm_data.depth_cm.unique()
    for probe_depth in probe_depths:
        probe_events_list = []
        
        if debug:
            print("\nprobe_depth", probe_depth)
            
        # One dict per depth; 'remarks' is overwritten on every qualifying event,
        # so it ends up describing the last one.
        probe_dict = {}
        counter = 0
        probe_depth_index = list(probe_depths).index(probe_depth)
        
        for irr_event_counter,row in project_data.df_irrigation.iterrows():
            #probe_depth = project_data.df_sm_data.depth_cm.unique()[0]
            probe_local_saturation = project_data.local_saturation_by_depth[probe_depth_index][1]
            df = project_data.multi_depths_sm[probe_depth].reset_index(drop=False)
            df['date'] = df.local_time.dt.date

            if row['amount'] > MIN_IRR_AMOUNT:
                #################################
                # Not responding conditions:
                #
                # time frame: irrigation span -1hr / +4hr (see the Timedelta values below)
                # largest soil moisture hourly diff < SM_HOURLY_DIFF according to depth
                # initial probe moisture is less than local saturation minus 0.5
                #################################
                df_irr_span = df[(df.local_time > project_data.df_irrigation.start[irr_event_counter] - pd.Timedelta(hours=1)) 
                                 & (df.local_time < project_data.df_irrigation.end[irr_event_counter] + pd.Timedelta(hours=4))
                                ]
                if debug:
                    display(project_data.df_irrigation.start[irr_event_counter],project_data.df_irrigation.end[irr_event_counter],
                            df_irr_span)
                
                
                # The first depth uses its own threshold, every other depth the second one.
                SM_HOURLY_DIFF = SM_HOURLY_DIFF_FIRST_DEPTH if probe_depth_index==0 else SM_HOURLY_DIFF_SECOND_DEPTH
                probe_dict['remarks'] = ''
                # check if probe responding = at least one hourly diff above SM_HOURLY_DIFF
                if max(df_irr_span.sm_diff) < SM_HOURLY_DIFF:
                    # check if the probe initial moisture is near local saturation 
                    if ("not_responding" in project_data.flag) | (max(df_irr_span.sm_val.iloc[:3]) < probe_local_saturation - 0.5):
                        counter+=1 # Not responding to irrigation
                        # Assigned but not used anywhere else in this function.
                        not_responding_sensor_id = int(project_data.df_sm_data[project_data.df_sm_data['depth_cm']==probe_depth].loc[0,'sensor_id'])
                        probe_events_list.append((probe_depth,project_data.df_irrigation.start[irr_event_counter]))
                        event_timestamp.add(project_data.df_irrigation.start[irr_event_counter])
                        probe_dict['remarks'] = 'Not responding to irrigation'
                # Find sensors with low response (short peak in sensor graph)
                # This replaces (rather than extends) a 'Not responding' remark set just above.
                    if (max(df_irr_span.sm_diff) > SM_HOURLY_DIFF*LOW_RESPONSE_FACTOR) & (max(df_irr_span.sm_diff) < SM_HOURLY_DIFF):
                        probe_dict['remarks'] = 'Low sensor respnse'

                if debug:
                    display(f"max hourly diff: {max(df_irr_span.sm_diff)}",
                            f"probe local saturation: {probe_local_saturation}",
                            f"initial probe moisture: {max(df_irr_span.sm_val.iloc[:3])}")
                    
            # Refreshed after every irrigation row, including rows below the minimum amount.
            probe_dict['irrigation_events'] = len(project_data.df_irrigation)
            probe_dict['not_responding_events_count'] = counter
            probe_dict['events dates'] = probe_events_list
            probe_events_dict[probe_depth] = probe_dict

    not_responding_SM_sensors_project_dict.update({'probe_depths':list(probe_depths),
                                                 'event_timestamp':event_timestamp,
                                                 'events_details':probe_events_dict})
    return(not_responding_SM_sensors_project_dict)



In [412]:
# Look up the most recently updated work-order line item for the sensor of the
# project currently held in `project_data` (a global left over from an earlier
# cell, so this cell depends on which project was loaded last).
# The status filter inside the SQL is commented out, so items of any status are returned.
query = f"""
            SELECT serial_number as sensor_id, updated_at, status
            FROM work_order_line_items
            WHERE serial_number = '{project_data.sensor_id}'
            --AND status not in ('closed','completed')
            order by updated_at desc
            limit 1
        """
# Rebinds `sql_importer`; this query targets `c.database_ruby_production` with the
# production user/host settings from the `c` (cert_aws) module.
sql_importer = SqlImporter(query=query, database=c.database_ruby_production, user=c.user_production, password=c.password_production, host=c.host_production, port=c.port_production, verbose=True)

# Run the query and show the resulting table.
sql_importer.get_data()
sql_importer.data

Loaded table with 1 lines from work_order_line_items



Unnamed: 0,sensor_id,updated_at,status
0,934741,2023-04-27 08:52:33.235331,new


In [49]:
def load_project_data(project_id, min_date, max_date, min_depth=10, max_depth=91, debug=False):
    """Build a ``Project`` and load everything the not-responding analysis needs.

    Parameters
    ----------
    project_id : id of the project to load.
    min_date, max_date : bounds of the period to load, passed on to ``Project``.
    min_depth, max_depth : depth range of interest, passed on to ``Project``.
    debug : passed on to ``Project``; also prints the loaded SM data at the end.

    Returns
    -------
    The ``Project`` instance. It is returned early, without the later loading
    steps, when no depths are found (``valid_project`` is then set to False) or
    when ``valid_project`` is false after loading the SM data.
    """
    from logic_parameters import default_latitude, default_height
    from project_class_data_extract import Project
    project = Project(
        project_id=project_id,
        min_depth=min_depth,
        # Honour the caller's max_depth; it used to be silently replaced by
        # logic_parameters.default_max_depth.
        max_depth=max_depth,
        min_date=min_date,
        max_date=max_date,
        debug=debug)

    project.load_project_metadata()
    project.get_sm_depths()

    if len(project.depths_found) == 0:
        project.valid_project = False
        return project

    # Load all depths that were found; narrow min/max_depth here to load fewer.
    project.load_sm_project_data(min_depth=project.depths_found[0], max_depth=project.depths_found[-1])
    if not project.valid_project:
        return project
    #project.load_project_weather_data(future=14)  # load the weather date until max_date + 14 days.
    project.apply_transformers()
    project.group_data_to_depths()
    project.load_irrigation_spans()
    project.find_probe_local_saturation()

    # Fall back to the configured defaults when the project has no latitude/height.
    project.meta_data = {'project_id': project.project_id,
                         'latitude': project.latitude or default_latitude,
                         'height': project.height or default_height,
                         'app_link': project.app_link}
    project.sensor_support_status_dict = project.get_sensor_support_status()
    if debug:
        print(project.df_sm_data)
    return project

In [29]:
def transform_data(project_data, depth, max_datetime, max_history_days):
    """Run the DataTransformer pipeline for one depth and package its output.

    ``project_data`` is accessed as a mapping with the keys 'meta_data',
    'multi_depths_sm' and 'weather_data'; the transformed data is stored back into
    it under 'model_data'.

    Returns a dict with 'meta_data' and 'model_data' plus the run settings
    'depth', 'max_datetime' and 'max_history_days' as held by the transformer.
    """
    from config.logic_parameters import max_hours_after_peak, rise_th, peak_th

    # Named `transformer` so the notebook's `dt` (datetime) alias is not shadowed.
    transformer = DataTransformer(
        project_data['meta_data'],
        project_data['multi_depths_sm'],
        project_data['weather_data'],
        depth=depth,
        max_history_days=max_history_days,
        max_datetime=max_datetime,
        rise_th=rise_th,
        peak_th=peak_th,
    )

    # Pipeline steps, in the order the transformer expects them.
    transformer.collect_historical_peak2rise_events()
    transformer.calc_sm_diffs()
    transformer.extract_data_from_p2r(max_hours_after_peak=max_hours_after_peak)
    transformer.add_lower_depth_data()
    transformer.add_weather_data()

    # Keep the result on the project mapping as well as in the returned dict.
    project_data.update({'model_data': transformer.data})

    return {
        'meta_data': project_data['meta_data'],
        'model_data': project_data['model_data'],
        'depth': transformer.depth,
        'max_datetime': transformer.max_datetime,
        'max_history_days': transformer.max_history_days,
    }

In [None]:
def load_irrigation_spans(self):
        """Load the project's irrigation spans into ``self.df_irrigation``.

        Queries the table named by ``DB_TABLES['project_irrigation_spans']`` for
        ``self.project_id`` with ``start_date`` between ``self.min_date`` and
        ``self.max_date`` (inclusive). The millisecond epoch columns ``start_ts`` /
        ``end_ts`` are converted in SQL to timestamps in ``self.timezone``.

        NOTE(review): this takes ``self`` and looks like a method pasted in from
        the Project class for reference — confirm; ``DB_TABLES`` is not defined in
        this notebook.
        """
        # The instance values are interpolated directly into the SQL text.
        # NOTE(review): fine for internally generated ids/dates; use bound
        # parameters if any of them can come from user input.
        query = f"""
            SELECT project_id, amount, start_ts, end_ts, psi, start_date, 
            timezone('{self.timezone}', to_timestamp(start_ts / 1000)) start_lt,
            timezone('{self.timezone}', to_timestamp(end_ts / 1000)) end_lt
            FROM {DB_TABLES['project_irrigation_spans']}
            WHERE project_id = {self.project_id} 
            AND start_date >= CAST((CAST('{self.min_date}' AS timestamp)) AS date)
            AND start_date <= CAST((CAST('{self.max_date}' AS timestamp)) AS date)
        """

        # The connection string is read from the DATABASE_URL environment variable.
        sql_importer = SqlImporter(query, conn_str=os.environ['DATABASE_URL'], verbose=self.debug)
        sql_importer.get_data()
        
        # Work on a copy and drop every row that has a missing value in any column.
        self.df_irrigation = sql_importer.data.copy().dropna()
        # Expose the local-time columns under the names the analysis code uses.
        self.df_irrigation.rename(columns={'start_lt': 'start', 'end_lt': 'end'}, inplace=True)