In [11]:
import asyncio
import bisect
import json
import logging
import os
import sys

import pandas as pd
import matplotlib.pyplot as plt

# XXX temporary hack to import from src
try:
    # When executed from a file, derive "<parent of this file's directory>/src".
    path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "src")
except NameError:
    # `__file__` is not defined when this runs as a notebook cell; only that
    # case should trigger the fallback, so other errors are no longer swallowed.
    # Fall back to the developer checkout and expand the leading "~".
    path = os.path.expanduser("~/gith/domschl/indrajala/python_indrajala/src")
# Re-running this cell must not pile up duplicate entries on sys.path.
if path not in sys.path:
    sys.path.append(path)

from indralib.indra_event import IndraEvent
from indralib.indra_client import IndraClient
from indralib.indra_downloader import IndraDownloader

In [12]:
# Root logger setup for the notebook: INFO and above, each record rendered as
# "<timestamp> <level> <logger name> <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)


In [13]:

# Load every dataset described under geodata/data_sources (using geodata/cache
# as the download cache) and print a short preview of each one.
dl = IndraDownloader(cache_dir="geodata/cache")
dfs = dl.get_datasets(data_sources_dir="geodata/data_sources")
for df_name in dfs:
    entry = dfs[df_name]  # holds the "metadata" and "data" parts of one dataset
    print("-----------------------------------------------")
    print(df_name)
    print(entry["metadata"])
    print(entry["data"].head())
print(f"Number of datasets: {len(dfs)}")


2023-06-06 17:26:21,679 INFO Downloader processing: geodata/data_sources/MaunaLoaCO2MonthlyMean.toml
2023-06-06 17:26:22,627 INFO Downloader Read https://gml.noaa.gov/webdata/ccgg/trends/co2/co2_mm_mlo.csv from cache at geodata/cache/co2_mm_mlo.csv
2023-06-06 17:26:22,628 INFO Downloader Creating dataset MaunaLoaMonthlyCO2
2023-06-06 17:26:22,641 INFO Downloader processing: geodata/data_sources/EuropeanMeanTemperatureSinceRomanTime_EuroMed2k.toml
2023-06-06 17:26:24,481 INFO Downloader Read https://www.ncei.noaa.gov/pub/data/paleo/pages2k/EuroMed2k/eujja_2krecon_nested_cps.txt from cache at geodata/cache/eujja_2krecon_nested_cps.txt
2023-06-06 17:26:24,482 INFO Downloader Creating dataset euromed2k
2023-06-06 17:26:24,490 INFO Downloader processing: geodata/data_sources/LawDome2006.toml
2023-06-06 17:26:25,670 INFO Downloader Read https://www.ncei.noaa.gov/pub/data/paleo/icecore/antarctica/law/law2006.txt from cache at geodata/cache/law2006.txt
2023-06-06 17:26:25,670 INFO Downloader C

-----------------------------------------------
MaunaLoaMonthlyCO2
{'title': 'Mauna Loa CO2 monthly mean data', 'description': 'Data from March 1958 through April 1974 have been obtained by C. David Keeling of the Scripps Institution of Oceanography (SIO) and were obtained from the Scripps website (scrippsco2.ucsd.edu). Monthly mean CO2 constructed from daily mean values Scripps data downloaded from http://scrippsco2.ucsd.edu/data/atmospheric_co2 Monthly values are corrected to center of month based on average seasonal cycle. Missing days can be asymmetric which would produce a high or low bias. Missing months have been interpolated, for NOAA data indicated by negative stdev and uncertainty. We have no information for SIO data about Ndays, stdv, unc so that they are also indicated by negative numbers', 'authors': ['Dr. Pieter Tans', 'Dr. Ralph Keeling'], 'publication_date': '2022-06', 'last_update': '2022-06', 'publisher': 'Dr. Pieter Tans, NOAA/GML (gml.noaa.gov/ccgg/trends/) and Dr. 

In [14]:
import calendar
from datetime import timedelta, datetime


def convert_partial_year(number, leap_aware=False):
    """Convert a fractional ("decimal") year such as 1958.2027 into a datetime.

    The integer part selects the year; the fractional part is scaled by the
    length of a year in days and added to January 1st, 00:00 of that year.

    Args:
        number: Year with a fractional part (float-like), e.g. 1958.2027.
        leap_aware: When False (default) every year is treated as 365 days
            long, which is the historical behaviour of this helper. Keep the
            default when comparing against timestamps that were produced with
            it, since the notebook's duplicate check uses a very small
            tolerance. When True, leap years are scaled by 366 days, which
            avoids an error of up to one day late in a leap year.

    Returns:
        A naive `datetime` inside the given year.
    """
    year = int(number)
    days_in_year = 366 if (leap_aware and calendar.isleap(year)) else 365
    offset = timedelta(days=(number - year) * days_in_year)
    return datetime(year, 1, 1) + offset

In [3]:
cl = IndraClient(config_file="ws_indra.toml", verbose = True)
await cl.init_connection(verbose=True)

<websockets.legacy.client.WebSocketClientProtocol at 0x1463fb250>

In [29]:
async def async_entire_history(domain, client=None, time_jd_start=-1000000000):
    """Request the stored history of `domain`, by default from the very beginning.

    Args:
        domain: Event domain to query.
        client: Object with an awaitable `get_wait_history(domain, start)`
            method. Defaults to the notebook-global `cl`, so existing
            one-argument calls behave exactly as before.
        time_jd_start: Start of the requested interval, passed through as the
            second argument of `get_wait_history`. The default is the value
            this helper has always used to ask for the entire history.

    Returns:
        Whatever `get_wait_history` returns for the request.
    """
    if client is None:
        # Looked up at call time so the currently connected global client is used.
        client = cl
    return await client.get_wait_history(domain, time_jd_start)

In [50]:
df = dfs["MaunaLoaMonthlyCO2"]['data']
print(df.head())
md = dfs["MaunaLoaMonthlyCO2"]['metadata']
print(md['indra_id'], md['indra_domain'])
domain = md['indra_domain'].replace("{indra_id}", md['indra_id'])
print(domain)

existing_data = await async_entire_history(domain)
print(f"Start, existing data: {len(existing_data)} records")
# Enum rows:
dups=0
for i, row in df.iterrows():
    # get column 'decimal year':
    # print(f"{i}: {row['year']}-{row['month']}  z{row['decimal date']}, {row['average']}")
    # convert fractional year into datetime:
    dt = convert_partial_year(row['decimal date'])
    jd = IndraEvent.datetime2julian(dt)
    found = False
    for (j, e) in enumerate(existing_data):
        # print(jd,e[0])
        if abs(jd-e[0])<0.0001:
            # print(f"Found existing data for {jd}")
            found = True
            dups += 1
            break
    if found:
        continue
    ie = IndraEvent()
    ie.domain = domain
    ie.from_id = "py/MaunaLoaMonthlyCO2"
    ie.to_scope = "public"
    ie.time_jd_start = jd
    ie.data_type = "number/float/co2"
    ie.data = json.dumps(row['average'])
    print("Send new: ", ie.to_json())
    # await cl.send_event(ie)
print(f"Already existing data: {dups} records, new: {len(df)-dups} records")

2023-06-06 17:45:34,722 INFO IndraClient Sending: {"domain": "$trx/db/req/event/history", "from_id": "ws/python", "uuid4": "d4a5b553-a8c1-421c-9e8f-b9e010e733fb", "to_scope": "", "time_jd_start": 2460103.2399852113, "data_type": "eventrequest", "data": "{\"domain\": \"$event/geodata/historical/manualoaco2monthlymean\", \"time_jd_start\": -1000000000, \"time_jd_end\": null, \"max_count\": null, \"mode\": \"Interval\"}", "auth_hash": "", "time_jd_end": null}
2023-06-06 17:45:34,737 INFO IndraClient Received message: {"domain":"Ws.1/127.0.0.1:49882","from_id":"SQLx.1","uuid4":"d4a5b553-a8c1-421c-9e8f-b9e010e733fb","to_scope":"$event/geodata/historical/manualoaco2monthlymean","time_jd_start":2460102.1566520217,"data_type":"vector/tuple/jd/float","data":"[[2436279.4855,315.7],[2436310.5105,317.45],[2436340.5135000004,317.51],[2436371.502,317.24],[2436401.5050000004,315.86],[2436432.4935,314.93],[2436463.4820000003,313.2],[2436493.485,312.43],[2436524.51,313.33],[2436554.5130000003,314.67],[

   year  month  decimal date  average  deseasonalized  ndays  sdev  unc
0  1958      3     1958.2027   315.70          314.43    NaN   NaN  NaN
1  1958      4     1958.2877   317.45          315.16    NaN   NaN  NaN
2  1958      5     1958.3699   317.51          314.71    NaN   NaN  NaN
3  1958      6     1958.4548   317.24          315.14    NaN   NaN  NaN
4  1958      7     1958.5370   315.86          315.18    NaN   NaN  NaN
manualoaco2monthlymean $event/geodata/historical/{indra_id}
$event/geodata/historical/manualoaco2monthlymean
Future:  <Future pending>
from_json:  {"domain": "Ws.1/127.0.0.1:49882", "from_id": "SQLx.1", "uuid4": "d4a5b553-a8c1-421c-9e8f-b9e010e733fb", "to_scope": "$event/geodata/historical/manualoaco2monthlymean", "time_jd_start": 2460102.1566520217, "data_type": "vector/tuple/jd/float", "data": "[[2436279.4855,315.7],[2436310.5105,317.45],[2436340.5135000004,317.51],[2436371.502,317.24],[2436401.5050000004,315.86],[2436432.4935,314.93],[2436463.4820000003,313.2

In [28]:
# Echo the event domain currently held in `domain`.
domain

'$event/geodata/historical/manualoaco2monthlymean'

In [31]:
# Fetch the stored history for `domain` and keep the result for inspection.
data = await async_entire_history(domain)


2023-06-06 17:36:25,999 INFO IndraClient Sending: {"domain": "$trx/db/req/event/history", "from_id": "ws/python", "uuid4": "0408da50-0be0-46c2-9d93-e6be639ff19c", "to_scope": "", "time_jd_start": 2460103.233634257, "data_type": "eventrequest", "data": "{\"domain\": \"$event/geodata/historical/manualoaco2monthlymean\", \"time_jd_start\": -1000000000, \"time_jd_end\": null, \"max_count\": null, \"mode\": \"Interval\"}", "auth_hash": "", "time_jd_end": null}
2023-06-06 17:36:26,016 INFO IndraClient Received message: {"domain":"Ws.1/127.0.0.1:49882","from_id":"SQLx.1","uuid4":"0408da50-0be0-46c2-9d93-e6be639ff19c","to_scope":"$event/geodata/historical/manualoaco2monthlymean","time_jd_start":2460102.1503010723,"data_type":"vector/tuple/jd/float","data":"[[2436279.4855,315.7],[2436310.5105,317.45],[2436340.5135000004,317.51],[2436371.502,317.24],[2436401.5050000004,315.86],[2436432.4935,314.93],[2436463.4820000003,313.2],[2436493.485,312.43],[2436524.51,313.33],[2436554.5130000003,314.67],[2

Future:  <Future pending>
from_json:  {"domain": "Ws.1/127.0.0.1:49882", "from_id": "SQLx.1", "uuid4": "0408da50-0be0-46c2-9d93-e6be639ff19c", "to_scope": "$event/geodata/historical/manualoaco2monthlymean", "time_jd_start": 2460102.1503010723, "data_type": "vector/tuple/jd/float", "data": "[[2436279.4855,315.7],[2436310.5105,317.45],[2436340.5135000004,317.51],[2436371.502,317.24],[2436401.5050000004,315.86],[2436432.4935,314.93],[2436463.4820000003,313.2],[2436493.485,312.43],[2436524.51,313.33],[2436554.5130000003,314.67],[2436585.5015,315.58],[2436616.49,316.48],[2436644.4855,316.65],[2436675.5105,317.72],[2436705.5135000004,318.29],[2436736.502,318.15],[2436766.5050000004,316.54],[2436797.4935,314.8],[2436828.4820000003,313.84],[2436858.485,313.33],[2436889.51,314.81],[2436919.5130000003,315.58],[2436950.4650000003,316.43],[2436981.3805,316.98],[2437010.2885000003,317.58],[2437041.204,319.03],[2437071.134,320.04],[2437102.0494999997,319.59],[2437131.9795000004,318.18],[2437162.8949

In [33]:
# Display the fetched history records.
# NOTE(review): this echoes the complete list; consider a slice such as
# data[:10] to keep the saved notebook output short.
data

[[2436279.4855, 315.7],
 [2436310.5105, 317.45],
 [2436340.5135000004, 317.51],
 [2436371.502, 317.24],
 [2436401.5050000004, 315.86],
 [2436432.4935, 314.93],
 [2436463.4820000003, 313.2],
 [2436493.485, 312.43],
 [2436524.51, 313.33],
 [2436554.5130000003, 314.67],
 [2436585.5015, 315.58],
 [2436616.49, 316.48],
 [2436644.4855, 316.65],
 [2436675.5105, 317.72],
 [2436705.5135000004, 318.29],
 [2436736.502, 318.15],
 [2436766.5050000004, 316.54],
 [2436797.4935, 314.8],
 [2436828.4820000003, 313.84],
 [2436858.485, 313.33],
 [2436889.51, 314.81],
 [2436919.5130000003, 315.58],
 [2436950.4650000003, 316.43],
 [2436981.3805, 316.98],
 [2437010.2885000003, 317.58],
 [2437041.204, 319.03],
 [2437071.134, 320.04],
 [2437102.0494999997, 319.59],
 [2437131.9795000004, 318.18],
 [2437162.8949999996, 315.9],
 [2437193.8104999997, 314.17],
 [2437223.704, 313.83],
 [2437254.6195000005, 315.0],
 [2437284.5494999997, 316.19],
 [2437316.5015, 316.89],
 [2437347.49, 317.7],
 [2437375.4855, 318.54],
