In [34]:
import datetime
import asyncio

import httpx
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.server_api import ServerApi
import pandas as pd
import tqdm # for status-bar
import anyio # for parallel-processes

from sp_project.data_scraping.openweather_api_client import OpenWeatherClient

In [35]:
import os
import warnings

# OpenWeather API keys, tried in order by run_the_program (it moves on to the
# next key when a request with the current key fails).
# The keys are read from the OPENWEATHER_API_KEYS environment variable
# (comma-separated) rather than hard-coded. Keys written into the notebook leak
# whenever it is shared or committed.
api_keys = [
    key.strip()
    for key in os.environ.get("OPENWEATHER_API_KEYS", "").split(",")
    if key.strip()
]
if not api_keys:
    warnings.warn("No OpenWeather API keys found: set OPENWEATHER_API_KEYS (comma-separated).")

In [36]:
# Named sampling points, one grid row per latitude band.
# run_the_program unpacks each tuple unchanged into
# OpenWeatherClient.historic_station_at(*coords).
# The first component looks like a longitude and the second like a latitude
# (degrees) -- NOTE(review): confirm against the client's signature.
coordinates = dict(
    # row at 46.22
    grid00=(7.10, 46.22),
    grid01=(7.66, 46.22),
    grid02=(8.79, 46.22),
    # row at 46.62
    grid03=(6.53, 46.62),
    grid04=(7.10, 46.62),
    grid05=(7.66, 46.62),
    grid06=(8.23, 46.62),
    grid07=(8.79, 46.62),
    grid08=(9.36, 46.62),
    grid09=(9.92, 46.62),
    # row at 47.02
    grid10=(7.10, 47.02),
    grid11=(7.66, 47.02),
    grid12=(8.23, 47.02),
    grid13=(8.79, 47.02),
    grid14=(9.36, 47.02),
    # row at 47.41
    grid15=(7.10, 47.41),
    grid16=(7.66, 47.41),
    grid17=(8.23, 47.41),
    grid18=(8.79, 47.41),
    grid19=(9.36, 47.41),
)

In [37]:
async def check_data_in_DB(collection, lon, lat, dt, timelimit:datetime.timedelta=datetime.timedelta(minutes=5)):
    """Check whether `collection` already holds an entry for a location and time.

    A document matches when its `lon` and `lat` equal the given values and its
    `dt` lies in the closed interval [dt - timelimit, dt + timelimit]
    (default: +/- 5 minutes).

    Returns True if a matching document exists, otherwise False.
    """
    # NOTE(review): lon/lat are matched by exact equality, so they must be stored
    # with exactly the same float values that are queried here.
    query = {
        "lon": lon,
        "lat": lat,
        "dt": {"$lte": dt + timelimit, "$gte": dt - timelimit},
    }
    # Only existence matters, so fetch just `_id` instead of the full document.
    result = await collection.find_one(query, projection={"_id": True})
    return result is not None

In [38]:
async def get_datapoints_from_OW(location, dt):
    """Fetch OpenWeather history for `location` at `dt` and flatten it into records.

    Awaits `location.historic(dt)` and expects a mapping with a "data" list of
    dicts. Every other top-level field of the response is copied into each of
    those dicts, overwriting same-named keys. The dicts are modified in place and
    returned as a list.
    """
    response = await location.historic(dt)
    # Fields shared by every record: everything except the "data" list itself.
    shared_fields = {key: value for key, value in response.items() if key != "data"}
    flattened = []
    for record in response["data"]:
        record.update(shared_fields)
        flattened.append(record)
    return flattened

In [39]:
async def insert_data_in_DB(collection, data:list[dict]):
    """Upsert each record of `data` into `collection`, one at a time.

    A record is identified by its (lon, lat, dt) triple. An existing document
    with exactly that triple is replaced by the record; otherwise the record is
    inserted.
    """
    for record in data:
        identity = {"lon": record["lon"], "lat": record["lat"], "dt": record["dt"]}
        await collection.replace_one(identity, record, upsert=True)

In [40]:
async def run_the_program(collection, locations, start_time, end_time):
    """Fetch OpenWeather history for every location and 4-hour slot and store it.

    For each key in the global `api_keys`:
      * build 4-hour timestamps from `start_time` (floored to a 4-hour boundary)
        to `end_time`, and queue one (location, timestamp) job per location,
        newest timestamp first;
      * five workers skip jobs already present in `collection`
        (check_data_in_DB), fetch the rest (get_datapoints_from_OW) and upsert
        them (insert_data_in_DB);
      * on the first failed request the key is abandoned. The next key resumes
        from `start_time` up to one day after the failing timestamp.
    Stops as soon as a key completes without a failure.

    Parameters
    ----------
    collection : async MongoDB collection used for lookups and upserts.
    locations : dict mapping a name to a coordinate tuple that is unpacked into
        OpenWeatherClient.historic_station_at.
    start_time, end_time : datetime-like bounds of the time range.
    """
    start_time = pd.Timestamp(start_time).floor("4H")

    for api_key in api_keys:
        timestamps_list = pd.date_range(start_time, end_time, freq="4H")

        counter = 0             # datapoints fetched and inserted with this key
        limit_reached = False   # set by a worker when a request fails
        total = len(locations)*len(timestamps_list)
        pbar = tqdm.tqdm(total=total) # Progress-Bar
        send_stream, receive_stream = anyio.create_memory_object_stream()

        # Where the next API key has to resume. It stays equal to start_time if
        # every request made with this key succeeded.
        restart_end_time = start_time

        async def handle(receive_stream):
            """Worker: fetch and store every received job that is not yet in the DB."""
            nonlocal counter, limit_reached, restart_end_time
            async with receive_stream:
                async for location, timestamp in receive_stream:
                    if limit_reached:
                        # Another worker failed with this key; stop consuming.
                        return
                    if not await check_data_in_DB(collection, lon=location.lon, lat=location.lat, dt=timestamp):
                        async with anyio.CancelScope(shield=True):
                            # Shielded: an external cancellation (e.g. another task
                            # failing) does not interrupt a fetch+insert in progress.
                            try:
                                result = await get_datapoints_from_OW(location, timestamp)
                            except Exception as ex:
                                # Any failure is treated as the key's limit being hit.
                                # This includes timeouts such as the ReadTimeout
                                # errors in this notebook's output.
                                limit_reached = True
                                # Jobs are queued newest first, so later timestamps are
                                # (nearly) all handled. The next key restarts from
                                # start_time up to here. The +1 day keeps the failing day
                                # in range; max() keeps the latest failure when several
                                # workers fail.
                                restart_end_time = max(restart_end_time, timestamp+datetime.timedelta(days=1))
                                print(f'OneCallAPI reached limit at {counter=} and {timestamp=}: {ex!r}')
                                return
                            await insert_data_in_DB(collection, result)
                        counter+=1
                    pbar.update()

        try:
            async with OpenWeatherClient(
                api_key = api_key
            ) as OWclient:
                # One job per (timestamp, location), newest timestamp first.
                # A single flat iterable lets one `break` stop all enqueueing.
                jobs = (
                    (OWclient.historic_station_at(*loc_coord), timestamp)
                    for timestamp in (ts.to_pydatetime() for ts in reversed(timestamps_list))
                    for loc_coord in locations.values()
                )
                async with anyio.create_task_group() as task_group:
                    for _ in range(5):
                        task_group.start_soon(handle, receive_stream.clone())
                    receive_stream.close()
                    async with send_stream:
                        for job in jobs:
                            try:
                                await send_stream.send(job)
                            except (anyio.BrokenResourceError, anyio.ClosedResourceError):
                                # Every worker has stopped; skip all remaining jobs.
                                break
        finally:
            pbar.close()

        end_time = restart_end_time
        print(f'Fetched {len(locations)} Locations and {len(timestamps_list)} Timepoints and added {counter}/{total} Elements to Database ')

        if restart_end_time == start_time:
            break  # this key finished without a failure: nothing left to fetch
        await anyio.sleep(10)  # pause before trying the next API key
    else:
        # Loop ran out of keys without a clean pass (or there were no keys).
        print(f'All API keys used up; data between {start_time} and {end_time} may still be missing')

In [41]:
uri = "mongodb+srv://scientificprogramming:***REMOVED***@scientificprogramming.nzfrli0.mongodb.net/test"
DBclient = AsyncIOMotorClient(uri, server_api=ServerApi('1'))
db = DBclient.data
collection = db.openweather

end_time = datetime.datetime.now().astimezone() - datetime.timedelta(days=80)
start_time = end_time - datetime.timedelta(days=400)

await run_the_program(collection, coordinates, start_time=start_time, end_time=end_time)

 27%|█████████▌                          | 12797/48020 [02:04<04:28, 131.18it/s]

OneCallAPI reached limit at counter=3 and timestamp=datetime.datetime(2022, 10, 30, 20, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST')): ReadTimeout('')
OneCallAPI reached limit at counter=3 and timestamp=datetime.datetime(2022, 10, 30, 20, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST')): ReadTimeout('')


 27%|█████████▊                           | 12810/48020 [02:15<06:11, 94.88it/s]

OneCallAPI reached limit at counter=5 and timestamp=datetime.datetime(2022, 10, 30, 16, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST')): ReadTimeout('')
Fetched 20 Locations and 2401 Timepoints and added 5/48020 Elements to Database 



  0%|▏                                      | 146/35360 [00:01<06:25, 91.35it/s]

OneCallAPI reached limit at counter=5 and timestamp=datetime.datetime(2022, 10, 30, 16, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST')): ReadTimeout('')
OneCallAPI reached limit at counter=5 and timestamp=datetime.datetime(2022, 10, 30, 16, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST')): ReadTimeout('')


  0%|▏                                      | 158/35360 [00:09<35:54, 16.34it/s]


Fetched 20 Locations and 1768 Timepoints and added 8/35360 Elements to Database 
