In [73]:
# S3 destination used by this notebook: the bucket name and the key prefix
# ("folder") under which the enriched output is written.
project_s3_bucket = "arable-adse-dev"
project_s3_folder = "rain_classification_april_2022"

In [74]:
import json
import os
import pickle
import pandas as pd
import numpy as np
from scipy import stats
import requests
import boto3
from boto3.session import Session
from utils import df_to_s3, df_from_s3
from calval_data_access.mark_data import get_db_data, get_user_db_creds

In [75]:
def get_lat_long_from_location(location_str):
    """Look up latitude, longitude and elevation for a location id.

    Results are memoised in ``location_cache.pkl`` in the current working
    directory. On a cache miss the ``model_data.location`` table is queried
    and the answer is written back to the cache file, including the all-NaN
    answer produced when the query returns no rows.

    Parameters
    ----------
    location_str : str
        Value matched against ``model_data.location.id``.

    Returns
    -------
    dict
        ``{"lat": ..., "long": ..., "elev": ...}``. Every value is ``np.nan``
        when the database has no row for ``location_str``.
    """
    cache_path = "location_cache.pkl"

    if os.path.isfile(cache_path):
        # NOTE(review): pickle.load can execute arbitrary code; this is only
        # safe while the cache file is written exclusively by this function.
        with open(cache_path, "rb") as handle:
            location_cache = pickle.load(handle)
    else:
        location_cache = {}

    if location_str in location_cache:
        return location_cache[location_str]

    db_creds = get_user_db_creds("jacob_goldberg", "als")
    # The id is spliced into the SQL text, so double any single quote to keep
    # it inside the string literal. Prefer a parameterised query if
    # get_db_data supports one -- TODO confirm.
    safe_id = str(location_str).replace("'", "''")
    query = f"SELECT lat, long, elev FROM model_data.location WHERE id ='{safe_id}'"
    db_response = get_db_data(db_creds, query)

    if db_response.empty:
        location_data = {"lat": np.nan, "long": np.nan, "elev": np.nan}
    else:
        first_row = db_response.iloc[0]
        location_data = {
            "lat": first_row["lat"],
            "long": first_row["long"],
            "elev": first_row["elev"],
        }

    # Persist the updated cache so later calls (and later sessions) skip the
    # database round trip for this id.
    location_cache[location_str] = location_data
    with open(cache_path, "wb") as handle:
        pickle.dump(location_cache, handle)

    return location_data

In [76]:
def get_ibm_creds():
    """Fetch the IBM API credentials from AWS Secrets Manager.

    Reads the secret named ``ibm-datasci`` through the default boto3
    credential chain and JSON-decodes its ``SecretString``.

    Returns
    -------
    The decoded JSON document (callers in this notebook index it with
    ``"history_on_demand"``).
    """
    # The original also built an unused boto3 Session; boto3.client already
    # uses the default session, so that dead local was dropped.
    secrets_client = boto3.client("secretsmanager")
    secret = secrets_client.get_secret_value(SecretId="ibm-datasci")
    return json.loads(secret["SecretString"])


def _get_hod(lat, long, start, end, key):
    """Issue a single History-on-Demand request and return it as a DataFrame.

    ``start`` and ``end`` are formatted with a fixed ``T0000`` time part, so
    only their calendar date reaches the API. The response is requested as
    CSV in metric units and parsed straight from the prepared URL.
    """
    stamp_format = "%Y%m%dT0000"
    query = dict(
        geocode=f"{lat},{long}",
        startDateTime=start.strftime(stamp_format),
        endDateTime=end.strftime(stamp_format),
        format="csv",
        units="metric",
        apiKey=key,
    )
    prepared = requests.Request(
        "GET",
        "https://api.weather.com/v3/wx/hod/r1/direct",
        params=query,
    ).prepare()
    return pd.read_csv(prepared.url)

In [77]:
def get_hod(lat, long, start, end, key=None):
    """
    Fetch History-on-Demand data for one point, splitting long date ranges.

    Wrapper around ``_get_hod`` that keeps every request below the API's
    response-size limit (noted as 9990 rows by the original author) by
    requesting at most ``call_limit_hours`` hours per call.

    Parameters
    ----------
    lat, long : float
        Coordinates forwarded to ``_get_hod``.
    start, end : datetime-like
        Bounds of the requested period; ``end - start`` must yield an object
        with a ``.days`` attribute.
    key : str, optional
        API key. When omitted it is read from ``get_ibm_creds()`` under
        ``"history_on_demand"``.

    Returns
    -------
    pandas.DataFrame
        The single response, or the row-wise concatenation of all chunk
        responses in chronological order.
    """
    if key is None:
        key = get_ibm_creds()["history_on_demand"]

    call_limit_hours = 9900
    hours_requested = (end - start).days * 24
    if hours_requested < call_limit_hours:
        return _get_hod(lat, long, start, end, key)

    chunk_starts = pd.date_range(start, end, freq=f"{call_limit_hours}h")
    # Each chunk runs to the next chunk's start; the last one runs to ``end``.
    chunk_ends = list(chunk_starts[1:]) + [end]
    # When ``end`` lands exactly on a chunk boundary the final window is empty
    # (start == end); skip it instead of sending a zero-length request.
    chunk_frames = [
        _get_hod(lat, long, chunk_start, chunk_end, key)
        for chunk_start, chunk_end in zip(chunk_starts, chunk_ends)
        if chunk_start < chunk_end
    ]
    return pd.concat(chunk_frames)

In [78]:
# Devices to enrich: device id -> [latitude, longitude, window start, window end].
devdict = {
    "C005285": [
        40.4621,
        -74.2927,
        pd.Timestamp("2022-09-01"),
        pd.Timestamp("2022-09-10"),
    ],
    "C007978": [
        29.634,
        -90.835,
        pd.Timestamp("2022-07-25"),
        pd.Timestamp("2022-07-30"),
    ],
    "C003188": [
        36.605,
        -97.489,
        pd.Timestamp("2022-11-01"),
        pd.Timestamp("2022-11-05"),
    ],
}

In [85]:
# Fetch History-on-Demand data for every device in ``devdict`` and tag each
# row with the device id it was requested for.
device_frames = []
for device_id, (lat, lon, start_date, end_date) in devdict.items():
    print(device_id,lat, lon, start_date, end_date)
    # key=None lets get_hod look the API key up itself.
    device_hod = get_hod(lat, lon, start_date, end_date, key=None)
    device_hod["device"] = device_id
    device_frames.append(device_hod)

# Concatenate once instead of re-copying a growing frame on every iteration;
# an empty ``devdict`` still yields an empty DataFrame.
result = pd.concat(device_frames) if device_frames else pd.DataFrame()
    

C005285 40.4621 -74.2927 2022-09-01 00:00:00 2022-09-10 00:00:00
C007978 29.634 -90.835 2022-07-25 00:00:00 2022-07-30 00:00:00
C003188 36.605 -97.489 2022-11-01 00:00:00 2022-11-05 00:00:00


In [86]:
# Keep only the columns needed downstream, exposing the timestamp as "time".
result = (
    result
    .rename(columns={"validTimeUtc": "time"})
    .loc[
        :,
        [
            "time",
            "device",
            "latitude",
            "longitude",
            "precip1Hour",
            "relativeHumidity",
            "temperature",
        ],
    ]
)

In [87]:
# Sanity check: list the device ids present in the combined frame.
result["device"].unique()

array(['C005285', 'C007978', 'C003188'], dtype=object)

In [88]:
# Write the enriched frame to the project's S3 folder.
enriched_s3_key = f"{project_s3_folder}/ibm_enrichment/ibm_rh_enriched.csv"
df_to_s3(result, key=enriched_s3_key, bucket=project_s3_bucket)

Uploaded file to s3://arable-adse-dev/rain_classification_april_2022/ibm_enrichment/ibm_rh_enriched.csv


In [44]:
# def fetch_all_ibm_data():
#     # Generate locations and times for data fetch from Mark data
#     joined_mark_data = df_from_s3(
#         f"{project_s3_folder}/training_data/cleaned_joined_data.parquet",
#         project_s3_bucket,
#         format="parquet",
#     )

#     max_time = (
#         joined_mark_data[["time", "site_id"]]
#         .groupby("site_id")
#         .max()
#         .rename(columns={"time": "end"})
#     )
#     min_time = (
#         joined_mark_data[["time", "site_id"]]
#         .groupby("site_id")
#         .min()
#         .rename(columns={"time": "start"})
#     )
#     site_time_ranges = max_time.join(min_time)

#     # sort of painfully slow way to get most common location for site, but
#     # it works so leaving for now
#     location_keys = (
#         joined_mark_data[["site_id", "location"]]
#         .groupby("site_id")
#         .agg(lambda x: stats.mode(x)[0][0])
#     )

#     location_keys["lat"] = location_keys["location"].apply(
#         lambda x: get_lat_long_from_location(x)["lat"]
#     )
#     location_keys["long"] = location_keys["location"].apply(
#         lambda x: get_lat_long_from_location(x)["long"]
#     )

#     time_range_and_location = location_keys.join(site_time_ranges).reset_index()
#     print(f" Fetching IBM Data for:{time_range_and_location}")

#     for i, row in time_range_and_location.iterrows():
#         lat = row.lat
#         long = row.long
#         site = row.site_id
#         start = row.start
#         end = row.end

#         print(f"{site}, {lat},{long},{start},{end}")
#         try:
#             ibm_hod = get_hod(lat, long, start, end)

#             df_to_s3(
#                 ibm_hod,
#                 f"{project_s3_folder}/training_data/ibm/{site}.parquet",
#                 project_s3_bucket,
#                 format="parquet",
#             )
#         except Exception as e:
#             print(f"ERROR FOR {site}: {e}")
#             continue