In [51]:
import json, requests
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import BadRequestError, NotFoundError
import warnings
warnings.filterwarnings("ignore")
from string import Template

In [2]:
# Station key -> URL of that station's BOM JSON observations feed.
# NOTE(review): the key carries a stray ")" ("Mel_Olympic_Park)"); it is kept
# verbatim because the fetch cell looks the URL up under exactly this key.
BOM = dict([
    ("Mel_Olympic_Park)", "https://reg.bom.gov.au/fwo/IDV60901/IDV60901.95936.json"),
])

In [3]:
# Fetch the current observations feed.
# - The timeout stops a stalled connection from hanging the kernel forever.
# - raise_for_status() reports HTTP errors (4xx/5xx) here, instead of as a
#   confusing failure later when the body is parsed as JSON.
# NOTE(review): "Mel_Olympic_Park)" has a stray ")"; it must match the key
# used where BOM is defined.
REQUEST_TIMEOUT_S = 30
response = requests.get(BOM["Mel_Olympic_Park)"], timeout=REQUEST_TIMEOUT_S)
response.raise_for_status()
response

<Response [200]>

In [9]:
# Header metadata and the data rows both sit under the "observations" key.
payload = response.json()
result = payload["observations"]

In [11]:
# Pull the station name and state out of the feed's first header record.
header = result["header"][0]
station_name, state = header["name"], header["state"]
print(" ".join((station_name, state)))

Melbourne (Olympic Park) Victoria


In [17]:
# Observation rows from the feed; the cell displays how many were returned.
data = result["data"]
n_observations = len(data)
n_observations

146

In [21]:
# Inspect the first observation as a labelled Series (field name -> value).
row = data[0]
row_pd = pd.Series(data=row, name="data")

In [22]:
# Fields kept from each raw observation; everything else in the feed is dropped.
useful_feature = [
    "wmo",
    "local_date_time_full",
    "lat",
    "lon",
    "apparent_t",
    "delta_t",
    "gust_kmh",
    "gust_kt",
    "air_temp",
    "dewpt",
    "press",
    "rain_trace",
    "rel_hum",
    "wind_spd_kmh",
    "wind_spd_kt",
    "wind_dir",
]

In [24]:
# Keep only the selected fields of the sample row (label-based selection).
clear_row = row_pd.loc[useful_feature]

In [27]:
import os

# Connection settings can be overridden through environment variables, so
# credentials never have to be edited into (and committed with) the notebook.
# The fallbacks reproduce the original local-development setup.
ES_HOST = os.environ.get("ES_HOST", "https://127.0.0.1:9200")
ES_USER = os.environ.get("ES_USER", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD", "elastic")

# verify_certs=False disables TLS certificate verification (presumably for a
# self-signed local cluster -- confirm). Do not reuse against a remote cluster.
client = Elasticsearch(
    ES_HOST,
    verify_certs=False,
    basic_auth=(ES_USER, ES_PASSWORD),
    request_timeout=300,
)
index_list = client.cat.indices(format="json")
index_list



ListApiResponse([{'health': 'green', 'status': 'open', 'index': 'vic_roads_crash', 'uuid': 'uecBFYBZTM2oRVi8sAyrpA', 'pri': '3', 'rep': '1', 'docs.count': '78586', 'docs.deleted': '0', 'store.size': '43.1mb', 'pri.store.size': '21.5mb'}, {'health': 'green', 'status': 'open', 'index': 'population', 'uuid': 'YNyjimPARPKVHdry7UAsOg', 'pri': '3', 'rep': '1', 'docs.count': '3040', 'docs.deleted': '0', 'store.size': '1.4gb', 'pri.store.size': '742.2mb'}])

In [49]:
# Index definition for the weather documents: 3 primary shards with one
# replica each, plus an explicit Elasticsearch type for every stored field.
BOM_mappings = {
    "settings": {
        "index": {"number_of_shards": 3, "number_of_replicas": 1},
    },
    "mappings": {
        "properties": {
            # (field name, Elasticsearch field type) pairs.
            field: {"type": es_type}
            for field, es_type in (
                ("STATION", "keyword"),
                ("STATE", "keyword"),
                ("DATE", "date"),
                ("Location", "geo_point"),
                ("wmo", "integer"),
                ("air_temp", "double"),
                ("apparent_t", "double"),
                ("dewpt", "double"),
                ("rel_hum", "integer"),
                ("delta_t", "double"),
                ("wind_dir", "keyword"),
                ("wind_spd_kmh", "integer"),
                ("wind_spd_kt", "integer"),
                ("gust_kmh", "integer"),
                ("gust_kt", "integer"),
                ("press", "double"),
                ("rain_trace", "double"),
            )
        },
    },
}

In [81]:
index_name = "bom_weather"
# Create the index only when it is missing. Checking first, rather than treating
# every BadRequestError as "already exists", means a genuinely bad request
# (e.g. an invalid mapping) now surfaces as an error instead of a misleading message.
if client.indices.exists(index=index_name):
    print("The index has been already in Elastic Search")
else:
    client.indices.create(index=index_name, body=BOM_mappings)
client.indices.exists(index=index_name)

HeadApiResponse(True)

In [82]:
def parse_time(obj: str) -> str:
    """Convert a compact ``YYYYMMDDhhmmss`` timestamp into ISO-8601 text.

    Args:
        obj: Timestamp as 14 digits, e.g. ``"20240430143000"``. The notebook
            passes the feed's ``local_date_time_full`` field here.

    Returns:
        The same timestamp formatted as ``"YYYY-MM-DDThh:mm:ss"`` (no UTC offset).

    Raises:
        ValueError: if ``obj`` is not exactly 14 ASCII digits, or does not denote
            a real calendar date/time (e.g. month 13). The previous slicing-only
            version silently produced malformed strings in those cases.
        TypeError: if ``obj`` is not a string.
    """
    import re
    from datetime import datetime

    if re.fullmatch(r"[0-9]{14}", obj) is None:
        raise ValueError(f"expected a YYYYMMDDhhmmss timestamp, got {obj!r}")
    # Validation only: raises ValueError for impossible dates/times.
    datetime.strptime(obj, "%Y%m%d%H%M%S")
    # NOTE(review): the source field is named "local_..." and no offset is
    # emitted, so Elasticsearch will presumably store these as UTC -- confirm.
    return f"{obj[0:4]}-{obj[4:6]}-{obj[6:8]}T{obj[8:10]}:{obj[10:12]}:{obj[12:14]}"

# Normalise every raw observation: keep only the selected fields, then rewrite
# the compact timestamp into ISO-8601 form.
processed_data = []
for row in data:
    row_pd = pd.Series(row, name="data")
    clear_row = row_pd.loc[useful_feature]
    clear_row.at["local_date_time_full"] = parse_time(clear_row.at["local_date_time_full"])
    processed_data += [clear_row]

In [83]:
query_template = Template('''
{
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "DATE": "$DATE"
                    }
                },
                {
                    "match": {
                        "STATION": "$STATION"
                    }
                }
            ]
        }
    }
}
''')
# Probe: look up one known observation for this station.
PROBE_DATE = "2024-04-30T14:30:00"
# The template splices values between double quotes, so they are JSON-escaped
# first (json.dumps(...)[1:-1] drops the surrounding quotes it adds). Without this,
# a `"` or `\` in the value would produce invalid JSON or alter the query.
expr = query_template.substitute({
    "STATION": json.dumps(station_name)[1:-1],
    "DATE": json.dumps(PROBE_DATE)[1:-1],
})
body = json.loads(expr)
res = client.search(index=index_name, body=body)

In [84]:
# How many stored documents matched the search that was just run.
hit_count = res["hits"]["total"]["value"]
hit_count

0

In [85]:
# Field labels carried by the processed records (all built from useful_feature).
first_record = processed_data[0]
first_record.index

Index(['wmo', 'local_date_time_full', 'lat', 'lon', 'apparent_t', 'delta_t',
       'gust_kmh', 'gust_kt', 'air_temp', 'dewpt', 'press', 'rain_trace',
       'rel_hum', 'wind_spd_kmh', 'wind_spd_kt', 'wind_dir'],
      dtype='object')

In [86]:
# JSON text template for one observation document, filled by string substitution.
# Placeholders wrapped in quotes ($station, $state, $local_date_time_full,
# $wind_dir) become JSON strings. All other placeholders are spliced in as raw
# text, so their values must render as valid JSON literals (numbers).
# NOTE(review): a missing value that renders as e.g. "None" or "nan" in an
# unquoted slot, or a `"` / `\` inside a quoted slot, yields invalid JSON, and
# json.loads will then fail on the substituted text.
doc_template = Template('''{
            "STATION": "$station",
            "STATE": "$state",
            "DATE": "$local_date_time_full",
            "Location":{"lat":$lat, "lon":$lon},
            "wmo": $wmo,
            "air_temp": $air_temp,
            "apparent_t": $apparent_t,
            "dewpt": $dewpt,
            "rel_hum": $rel_hum,
            "delta_t": $delta_t,
            "wind_dir": "$wind_dir",
            "wind_spd_kmh": $wind_spd_kmh,
            "wind_spd_kt": $wind_spd_kt,
            "gust_kmh": $gust_kmh,
            "gust_kt": $gust_kt,
            "press": $press,
            "rain_trace": $rain_trace
        }''')

In [87]:
def para2dict(row, station=None, state_name=None):
    """Build the substitution mapping for the JSON document template.

    Args:
        row: One processed observation (a ``pd.Series`` indexed by field name).
        station: Station name to store. Defaults to the notebook-level
            ``station_name`` read from the feed header.
        state_name: State to store. Defaults to the notebook-level ``state``.

    Returns:
        A dict mapping every label of ``row``, plus ``"station"`` and ``"state"``,
        to a value that is safe to splice into a JSON text template:

        * ``None`` and NaN become the text ``null``. They previously rendered as
          ``None`` / ``nan``, which made ``json.loads`` fail in numeric slots.
          In a quoted slot this now yields the string "null".
        * Strings are JSON-escaped, without surrounding quotes (the template
          supplies those), so quotes or backslashes cannot break the document.
        * Everything else (numbers) is returned unchanged.
        Row fields come first; "station" and "state" override same-named fields.
    """
    import json
    import math

    def _as_json_fragment(value):
        # Missing value -> JSON null literal.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "null"
        # String -> escaped body of a JSON string (quotes stripped).
        if isinstance(value, str):
            return json.dumps(value)[1:-1]
        return value

    if station is None:
        station = station_name
    if state_name is None:
        state_name = state
    fields = {idx: _as_json_fragment(row[idx]) for idx in row.index}
    header_fields = {
        "station": _as_json_fragment(station),
        "state": _as_json_fragment(state_name),
    }
    return {**fields, **header_fields}

In [88]:
from tqdm.auto import tqdm

# The query template splices values between double quotes, so they are
# JSON-escaped first; a `"` or `\` would otherwise break the query JSON.
escaped_station = json.dumps(station_name)[1:-1]
new_docs = 0
for record in tqdm(processed_data, desc="indexing"):
    date = record["local_date_time_full"]
    # Skip observations already stored for this station/timestamp, so re-running
    # the notebook does not duplicate documents.
    # NOTE(review): documents indexed moments earlier may not yet be searchable
    # (refresh interval), so same-run duplicates are not guarded -- confirm.
    expr = query_template.substitute({"STATION": escaped_station, "DATE": json.dumps(date)[1:-1]})
    query_body = json.loads(expr)
    res = client.search(index=index_name, body=query_body)
    if res["hits"]["total"]["value"] == 0:
        expr = doc_template.substitute(para2dict(record))
        doc_body = json.loads(expr)
        client.index(index=index_name, body=doc_body)
        new_docs += 1
# Number of documents added by this run (0 on an idempotent re-run).
new_docs

100%|██████████| 146/146 [00:31<00:00,  4.58it/s]
