In [1]:
import requests
from getpass import getpass
import pandas as pd
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch, helpers

In [2]:
def connect_to_nasa():
    url = "https://api.nasa.gov/neo/rest/v1/feed"
    nasa_api_key = getpass("NASA API Key: ")
    today = datetime.now()
    params = {
        "api_key": nasa_api_key,
        "start_date": today - timedelta(days=7),
        "end_date": datetime.now(),
    }
    return requests.get(url, params).json()

In [3]:
response = connect_to_nasa()

NASA API Key:  ········


In [4]:
def create_df(response):
    all_objects = []
    for date, objects in response["near_earth_objects"].items():
        for obj in objects:
            obj["close_approach_date"] = date
            all_objects.append(obj)
    df = pd.json_normalize(all_objects)
    return df.drop("close_approach_data", axis=1)

In [5]:
df = create_df(response)
df.head()

Unnamed: 0,id,neo_reference_id,name,nasa_jpl_url,absolute_magnitude_h,is_potentially_hazardous_asteroid,is_sentry_object,close_approach_date,links.self,estimated_diameter.kilometers.estimated_diameter_min,estimated_diameter.kilometers.estimated_diameter_max,estimated_diameter.meters.estimated_diameter_min,estimated_diameter.meters.estimated_diameter_max,estimated_diameter.miles.estimated_diameter_min,estimated_diameter.miles.estimated_diameter_max,estimated_diameter.feet.estimated_diameter_min,estimated_diameter.feet.estimated_diameter_max,sentry_data
0,2434786,2434786,434786 (2006 PW),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,18.16,False,False,2024-08-20,http://api.nasa.gov/neo/rest/v1/neo/2434786?ap...,0.620233,1.386883,620.233153,1386.883492,0.385395,0.861769,2034.885738,4550.142836,
1,3756789,3756789,(2016 PO),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,25.1,False,False,2024-08-20,http://api.nasa.gov/neo/rest/v1/neo/3756789?ap...,0.025384,0.05676,25.383703,56.759685,0.015773,0.035269,83.279868,186.219446,
2,3781330,3781330,(2017 QX35),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,22.24,False,False,2024-08-20,http://api.nasa.gov/neo/rest/v1/neo/3781330?ap...,0.094745,0.211856,94.744711,211.855615,0.058872,0.131641,310.842239,695.064376,
3,54203000,54203000,(2018 UY37),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,22.53,False,False,2024-08-20,http://api.nasa.gov/neo/rest/v1/neo/54203000?a...,0.0829,0.18537,82.900085,185.370226,0.051512,0.115184,271.981915,608.170051,
4,54317182,54317182,(2022 UG2),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,27.11,False,False,2024-08-20,http://api.nasa.gov/neo/rest/v1/neo/54317182?a...,0.010059,0.022493,10.059004,22.492617,0.00625,0.013976,33.001982,73.794676,


In [6]:
df = df.drop(['links.self', 'sentry_data'], axis=1)

In [7]:
df.isnull().values.any()

False

In [8]:
def connect_to_elastic():
    elasticsearch_endpoint = getpass("Host Address: ")
    elasticsearch_api_key = getpass("Elastic API Key: ")
    return Elasticsearch(hosts=elasticsearch_endpoint, api_key=elasticsearch_api_key)

In [9]:
es = connect_to_elastic()

Host Address:  ········
Elastic API Key:  ········


In [10]:
index_name = "data_loading_talk_nyc"
es.indices.create(index=index_name)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'data_loading_talk_nyc'})

In [11]:
def doc_generator(df, index_name):
    for index, document in df.iterrows():
        yield {
            "_index": index_name,
            "_id": f"{document['id']}",
            "_source": document.to_dict(),
        }

In [12]:
helpers.bulk(es, doc_generator(df, index_name))

(114, [])

In [13]:
for success, info in helpers.parallel_bulk(es, doc_generator(df, 'parallel_bulk_nyc'), thread_count=4):
    if not success:
        print('A document failed:', info)

In [14]:
def updated_last(es, index_name):
    query = {
        "size": 0,
        "aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
    }
    response = es.search(index=index_name, body=query)
    last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
    datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
    return datetime_obj.strftime("%Y-%m-%d")

In [15]:
last_update_date = updated_last(es, index_name)
print(last_update_date)

2024-08-27


In [16]:
def update_new_data(df, es, last_update_date, index_name):
    if isinstance(last_update_date, str):
        last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")

    last_update_date = pd.Timestamp(last_update_date).normalize()

    if not df.empty and "close_approach_date" in df.columns:
        df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])

    today = pd.Timestamp(datetime.now().date()).normalize()

    if df is not None and not df.empty:
        update_range = df.loc[
            (df["close_approach_date"] > last_update_date)
            & (df["close_approach_date"] < today)
        ]
        if not update_range.empty:
            helpers.bulk(es, doc_generator(update_range, index_name))
        else:
            print("No new data to update.")
    else:
        print("The DataFrame is None.")

In [17]:
try:
    if df is None:
        raise ValueError("DataFrame is None. There may be a problem.")
    update_new_data(df, es, last_update_date, index_name)
    print(updated_last(es, index_name))
except Exception as e:
    print(f"An error occurred: {e}")

No new data to update.
2024-08-27
