<a href="https://colab.research.google.com/github/MarkStephens060482/Big-Data-projects/blob/main/Concurrent_API_calls.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Concurrent Historical Weather API requests
Retrieves an individual historical weather observation for each incident, based on its latitude/longitude coordinates and the date and time at which it occurred.

1. Dataframe with LATITUDE, LONGITUDE, DATE and TIME features is split into batches of specified **batch_size**.
2. API keys are rotated among the batches.
3. One at a time, each batch is passed to the **get_weather_in_batches()** function, under a maximum concurrency limit.
4. The program then loops through each row in the batch and an API query is built based on the feature values.
5. The API call is performed. If any API call in the batch fails because of the API service's fair-use constraints, the whole batch is retried, up to the specified **max_retries**.
6. The results from the API call are converted to json and added to a list.
7. This is performed asynchronously for all batches and the results are gathered and awaited upon until all batches are completed.
8. All API results in JSON in each batch are flattened into a single list.
9. The complete list is converted to DataFrame and is joined to the original DataFrame in correct order, and is written to CSV.

In [None]:
import math, time
import pandas as pd
import csv
import httpx
import asyncio
import sys
import json
import datetime as dt

# API service used by this notebook: https://www.visualcrossing.com/weather-api

# Global configuration shared by the functions in this file.
config = {
    # Maximum number of batches allowed in flight at once; tune to the
    # service's fair-use limits.
    "max_concurrent_batches": 5,
    # "time_period_minutes": 10,  # (disabled) tune to the fair-use limits
    # Number of DataFrame rows per batch; tune to system performance.
    "batch_size": 200,
    # Number of additional attempts a batch gets after its first attempt.
    "max_retries": 3,
    # Placeholders: replace with the API keys from your own service
    # subscription account(s).
    "api_keys": ["api_key_1", "api_key_2", "api_key_3"],
    # Requests per second; used to derive the pause between requests.
    "max_rate_limit": 20,
}

# Define the function that performs the API calls to get historical weather data.
async def get_weather_in_batches(batch, api_key, max_retries):
    """Fetch the weather conditions for every row of one batch.

    One GET request is issued per row, sequentially, with a pause of
    ``1 / config['max_rate_limit']`` seconds after each request. The batch is
    all-or-nothing: if any request fails, the results gathered so far are
    discarded.

    Args:
        batch: DataFrame slice; each row is turned into a request URL by
            ``create_query``.
        api_key: API key sent as the ``key`` query parameter.
        max_retries: Number of additional attempts made after the first one
            when the service answers with HTTP 429.

    Returns:
        A list with the ``currentConditions`` entry of each JSON response, in
        row order, or an empty list if the batch could not be completed.
    """
    # The query parameters are identical for every request in the batch, so
    # they are built once instead of once per row.
    url_params = {
        'unitGroup': 'metric',  # units of the output
        'contentType': 'json',
        'include': 'current',  # include the current conditions section
        # NOTE(review): 25000 is presumably metres (25 km); the previous
        # comment said 2 km - confirm against the API documentation.
        'maxDistance': '25000',
        'maxStations': '3',  # number of stations to interpolate from
        'aggregateHours': '0',
        'aggregateMinutes': '15',
        'combinationMethod': 'aggregate',
        'key': api_key,
    }
    # Pause between consecutive requests, derived from the maximum rate limit.
    delay = 1 / config['max_rate_limit']

    async with httpx.AsyncClient() as client:
        for _attempt in range(max_retries + 1):
            try:
                batch_results = []
                for row in batch.to_dict(orient="records"):
                    response = await client.get(create_query(row), params=url_params)
                    # Raise an exception if the response status code is an error
                    response.raise_for_status()
                    batch_results.append(response.json()['currentConditions'])
                    await asyncio.sleep(delay)
                return batch_results
            except httpx.HTTPStatusError as e:
                # Only status errors carry a response to inspect.
                if e.response.status_code == 429:
                    print("Fair use limit exceeded. Retrying the batch...")
                    # Retry the entire batch
                    continue
                print(f"HTTP error occurred: {e}")
                break  # Break the retry loop for other HTTP status errors
            except httpx.HTTPError as e:
                # Transport-level failures (timeouts, connection errors, ...)
                # have no ``response`` attribute; reading it here would raise
                # AttributeError out of the handler.
                print(f"HTTP error occurred: {e}")
                break
            except Exception as e:
                print(f"Exception occurred: {e}")
                break  # Break the retry loop for other errors

    # Return empty results if all retries failed
    return []

# Build the request URL for a single row.
def create_query(row):
    """Return the timeline API URL for one row.

    The URL is the service base URL followed by
    ``<LATITUDE>,<LONGITUDE>/<ACCIDENT_DATE>T<ACCIDENT_TIME>``, using the
    values stored under those keys in *row*. Adapt the keys to the structure
    of your own DataFrame.
    """
    endpoint = 'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/'
    return "{}{},{}/{}T{}".format(
        endpoint,
        row['LATITUDE'],
        row['LONGITUDE'],
        row['ACCIDENT_DATE'],
        row['ACCIDENT_TIME'],
    )

def split_df_chunks(data_df, chunk_size):
    """Split *data_df* into consecutive slices of at most *chunk_size* items.

    Args:
        data_df: Any object supporting ``len()`` and slice indexing (the
            callers in this file pass a DataFrame).
        chunk_size: Maximum number of rows per chunk; must be positive.

    Returns:
        A list of slices in original order. The last slice may be shorter
        than *chunk_size*; an empty input yields an empty list.

    Raises:
        ValueError: If *chunk_size* is not positive. A negative size would
            otherwise silently produce no chunks at all.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    return [
        data_df[start : start + chunk_size]
        for start in range(0, len(data_df), chunk_size)
    ]


# Function to process the DataFrame rows asynchronously
async def process_batches(data_df, api_keys, max_retries, batch_size):
    """Run the weather lookups for *data_df* in concurrent batches.

    The frame is split into batches of *batch_size* rows, API keys are
    assigned to the batches round-robin, and every batch is scheduled as its
    own task. At most ``config['max_concurrent_batches']`` batches call the
    API at the same time.

    Args:
        data_df: DataFrame whose rows are to be looked up.
        api_keys: Non-empty sequence of API keys to rotate through.
        max_retries: Passed through to ``get_weather_in_batches``.
        batch_size: Number of rows per batch.

    Returns:
        A list with one entry per batch, in batch order. Each entry is the
        value returned by ``get_weather_in_batches`` for that batch, or the
        exception object if the batch task raised. An empty frame yields an
        empty list.
    """
    total_length = len(data_df)
    semaphore = asyncio.Semaphore(config['max_concurrent_batches'])

    # split df into batches
    batches = split_df_chunks(data_df, batch_size)

    request_number = 0
    start_time = time.perf_counter()

    async def run_batch(batch, api_key):
        nonlocal request_number
        # Hold the semaphore for the whole API round trip so that it really
        # caps the number of batches in flight.
        async with semaphore:
            batch_results = await get_weather_in_batches(batch, api_key, max_retries)
        # Count only rows that actually produced a result; a failed batch
        # comes back empty.
        request_number += len(batch_results)
        print(f"Processed {request_number*100/total_length: 0.2f}% of total requests successfully.")
        return batch_results

    # Schedule every batch first, sequentially assigning an API key based on
    # the batch number, and only then wait; awaiting inside the scheduling
    # loop would serialise the batches.
    tasks = [
        asyncio.create_task(run_batch(batch, api_keys[batch_count % len(api_keys)]))
        for batch_count, batch in enumerate(batches)
    ]

    # Wait for all tasks to complete; results keep the order of `tasks`.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    elapsed_time = time.perf_counter() - start_time
    print(f"Total time taken to process batches: {elapsed_time: 0.4f} seconds")

    return results


### Main Code ###
async def main(df):
    """Fetch the weather data for every row of *df* and join it to *df*.

    Args:
        df: DataFrame with the columns that ``create_query`` reads.

    Returns:
        A new DataFrame: the columns of *df* followed by the columns built
        from the API results, row for row. Rows that belong to a batch which
        failed hold missing values in the weather columns.
    """
    start_time = time.perf_counter()
    batch_size = config['batch_size']

    # Perform asynchronous processing of the frame in batches
    results = await process_batches(df,
                                    api_keys=config['api_keys'],
                                    max_retries=config['max_retries'],
                                    batch_size=batch_size)

    # Flatten the per-batch results into one list with exactly one entry per
    # input row. A batch that failed (exception object, empty list, or wrong
    # length) is replaced by empty records so that the rows of later batches
    # do not shift onto the wrong source rows.
    flattened_results = []
    for batch_start, batch_results in zip(range(0, len(df), batch_size), results):
        expected_rows = min(batch_size, len(df) - batch_start)
        if isinstance(batch_results, BaseException) or len(batch_results) != expected_rows:
            print(f"Batch starting at row {batch_start} failed; "
                  f"filling {expected_rows} rows with missing values.")
            batch_results = [{} for _ in range(expected_rows)]
        flattened_results.extend(batch_results)

    # Create a DataFrame from the flattened results
    df_with_results = pd.DataFrame(flattened_results)
    # pd.concat(axis=1) aligns on the index, so give the results the index of
    # the source rows instead of a fresh 0..n-1 range.
    df_with_results.index = df.index

    # Append the new DataFrame to the original DataFrame
    output_df = pd.concat([df, df_with_results], axis=1)
    end_time = time.perf_counter()
    time_duration = end_time - start_time
    print(f'Total computation time is : {time_duration: 0.1f} seconds')
    if len(df_with_results):
        # Guarded: an empty input has no rows to average over.
        print(f'Average computation time is : {time_duration/len(df_with_results): 0.1f} seconds per row')
    return output_df

# Entry point: run the whole pipeline and keep the joined result in `output_df`.
if __name__ == '__main__':

  # NOTE(review): top-level `await` only works where the runtime supports it
  # (e.g. a notebook cell); in a plain .py script this line is a SyntaxError.
  # `weather_df` is not defined in the code shown here - it is presumably
  # loaded in an earlier cell; confirm before running.
  output_df = await main(weather_df)
