# Weather Forecast Sample Data Ingest
This template fetches the daily weather forecast for a list of zip codes and persists the data to the Metis Machine data store.
It uses two external APIs: zipcodeapi.com to look up the geographic coordinates of a zip code, and Dark Sky to fetch the weather forecast for those coordinates. The API keys are read from the `ZIPCODEAPI_KEY` and `DARKSKY_KEY` environment variables.

In [1]:
import os
from datetime import datetime, timezone

import pandas as pd
import requests

In [2]:
# Import only the SDK entry point rather than `import *`, so it is obvious where `Skafos` comes from
from skafossdk import Skafos

print('initializing the SDK connection')
# this handle is used at the end of the notebook to persist the forecast rows
skafos = Skafos()

initializing the SDK connection
2018-01-05 20:04:04,223 - skafossdk.data_engine - INFO - DataEngine Connection Opened


In [4]:
# Dark Sky forecast API: the key must be present in the environment (KeyError otherwise)
weather_api_key = os.environ["DARKSKY_KEY"]
weather_api_url = "https://api.darksky.net/forecast/"

# zipcodeapi.com (zip code -> longitude/latitude): the key must be present in the environment as well
location_api_key = os.environ["ZIPCODEAPI_KEY"]
location_api_url = "https://www.zipcodeapi.com/rest/"

2018-01-05 20:04:05,201 - skafossdk.monitor - INFO - Monitor Connection Opened


In [5]:
def get_location_for_zip(zipcode, timeout=30):
    """ Look up the geographic coordinates of a zip code via the zipcodeapi.com endpoint
        Args:
            zipcode (int or str): the requested location by zip code, cast to string if not already
            timeout (float): seconds to wait on the HTTP request before giving up
        Returns:
            dict: the decoded JSON response; callers read the keys 'lng' and 'lat' from it
        Raises:
            requests.HTTPError: if the API answers with a 4xx/5xx status
    """
    zip_path = '/info.json/' + str(zipcode).strip() + '/degrees'
    url = location_api_url + location_api_key + zip_path
    # log the request with the API key masked so the secret never ends up in saved notebook output
    print('fetching {}'.format(location_api_url + '[REDACTED]' + zip_path))
    response = requests.get(url, timeout=timeout)
    # fail here on an error status instead of handing back an error payload without 'lng'/'lat'
    response.raise_for_status()
    return response.json()

In [6]:
def get_forecast(lon, lat, timeout=30):
    """ Fetch the weather forecast for a coordinate pair via the darksky.net endpoint
        Args:
            lon (float): longitude (x coordinate) to request weather forecast for
            lat (float): latitude (y coordinate) to request weather forecast for
            timeout (float): seconds to wait on the HTTP request before giving up
        Returns:
            dict: the darksky forecast json as a dictionary
        Raises:
            requests.HTTPError: if the API answers with a 4xx/5xx status
    """
    # note the order: the URL takes latitude first, then longitude
    coordinates_path = '/{},{}'.format(lat, lon)
    url = weather_api_url + weather_api_key + coordinates_path
    # log the request with the API key masked so the secret never ends up in saved notebook output
    print('fetching {}'.format(weather_api_url + '[REDACTED]' + coordinates_path))
    response = requests.get(url, timeout=timeout)
    # fail here on an error status instead of handing back an error payload without forecast data
    response.raise_for_status()
    return response.json()

In [7]:
# Zip codes to fetch forecasts for, kept as strings; add entries to ingest more locations
location_zipcodes = [
    "23250",  # Richmond Airport
]

In [8]:
def _utc_naive(epoch_seconds):
    """ Convert UNIX epoch seconds to a timezone-naive datetime expressed in UTC
        Args:
            epoch_seconds (int or float): seconds since the UNIX epoch
        Returns:
            datetime: naive datetime whose fields are in UTC
    """
    # datetime.fromtimestamp() without tz converts to the machine's local timezone, which made the
    # rows depend on where the notebook runs; pin to UTC and drop tzinfo to keep the values naive
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).replace(tzinfo=None)


def forecast_rows(zipcodes):
    """ Map a list of zipcodes into individual data rows containing forecast data per day
        This is a generator: rows are produced lazily, and the API calls happen while iterating.
        All datetimes in a row are timezone-naive and expressed in UTC.
        Args:
            zipcodes (list(str)): locations to fetch weather forecasts for
        Yields:
            dict: one data row per forecast day, per location
        Raises:
            KeyError: if an API response lacks one of the fields read below
    """
    # one fetch timestamp shared by every row produced from this call
    date_fetched = datetime.now(timezone.utc).replace(tzinfo=None)
    for zipcode in zipcodes:
        ll = get_location_for_zip(zipcode)
        forecast = get_forecast(ll['lng'], ll['lat'])
        for day in forecast['daily']['data']:
            # NOTE(review): 'date' is the UTC calendar day of day['time']; presumably this matches
            # the location's local day for the zip codes ingested here - confirm before adding
            # locations far east of UTC
            yield {
                'source': 'Darksky',
                'date_fetched': date_fetched,
                'date': _utc_naive(day['time']),
                'zipcode': zipcode,
                'latitude': ll['lat'],
                'longitude': ll['lng'],
                'tmax': day['temperatureHigh'],
                'tmin': day['temperatureLow'],
                'humidity': day['humidity'],
                'wind_speed': day['windSpeed'],
                'pressure': day['pressure'],
                'precip_total': day['precipIntensityMax'],
                'precip_prob': day['precipProbability'],
                'sunrise': _utc_naive(day['sunriseTime']),
                'sunset': _utc_naive(day['sunsetTime']),
                'cloud_cover': day['cloudCover'],
                'heat_index': day['apparentTemperatureHigh']
            }

In [9]:
# forecast_rows yields one dictionary per forecast day; collect them all and build the Pandas dataframe in one go
forecast_data = pd.DataFrame(list(forecast_rows(location_zipcodes)))

fetching https://www.zipcodeapi.com/rest/[REDACTED]/info.json/23250/degrees
fetching https://api.darksky.net/forecast/[REDACTED]/37.504787,-77.320651


In [10]:
# cast datetimes to just date for persisting to the database
# pd.to_datetime(...).dt.date also accepts values that are already dates, so this cell can be
# re-run safely (calling .date() on an already-converted column raised AttributeError)
forecast_data['date'] = pd.to_datetime(forecast_data['date']).dt.date
forecast_data['date_fetched'] = pd.to_datetime(forecast_data['date_fetched']).dt.date

In [11]:
# sanity check: eyeball the first three rows to confirm the data looks as expected
forecast_data.head(3)

Unnamed: 0,cloud_cover,date,date_fetched,heat_index,humidity,latitude,longitude,precip_prob,precip_total,pressure,source,sunrise,sunset,tmax,tmin,wind_speed,zipcode
0,0.0,2018-01-05,2018-01-05,16.6,0.46,37.504787,-77.320651,0.14,0.0023,1021.53,Darksky,2018-01-05 12:25:32,2018-01-05 22:06:06,24.93,6.13,6.6,23250
1,0.0,2018-01-06,2018-01-05,8.41,0.51,37.504787,-77.320651,0.12,0.0003,1031.82,Darksky,2018-01-06 12:25:31,2018-01-06 22:06:58,20.21,4.51,8.03,23250
2,0.16,2018-01-07,2018-01-05,18.87,0.51,37.504787,-77.320651,0.05,0.0002,1036.41,Darksky,2018-01-07 12:25:29,2018-01-07 22:07:51,24.01,16.07,2.02,23250


### Persist forecast data
Save these forecast data for later use via the Skafos SDK. This requires specifying a schema for how we want to store these records.

In [12]:
# Column types below are SQL (strictly speaking CQL) type names, not python types
schema = {
    "table_name": "weather_forecast_by_zip",
    "options": {
        "primary_key": ["date", "date_fetched", "zipcode", "source"],
        "order_by": ["date_fetched desc"],
    },
    "columns": {
        # provenance and key columns
        "source": "text",
        "date_fetched": "date",
        "date": "date",
        "zipcode": "text",
        # location
        "latitude": "float",
        "longitude": "float",
        # daily measurements
        "tmax": "float",
        "tmin": "float",
        "humidity": "float",
        "wind_speed": "float",
        "pressure": "float",
        "precip_total": "float",
        "precip_prob": "float",
        "sunrise": "timestamp",
        "sunset": "timestamp",
        "cloud_cover": "float",
        "heat_index": "float",
    },
}

In [13]:
# keep only rows without missing values, then convert them to a list of per-row dictionaries
data_out = (
    forecast_data
    .dropna()
    .to_dict(orient="records")
)

In [16]:
# Hand the schema and the row dictionaries to the SDK's save call and keep what .result() returns.
# NOTE(review): presumably .result() blocks until the write has completed - confirm against the SDK docs
dataresult = skafos.engine.save(schema, data_out).result()

2018-01-05 20:07:25,260 - skafossdk.data_engine - INFO - Saving Data ...
2018-01-05 20:07:25,262 - skafossdk.data_engine - INFO - Sending msg...


In [17]:
# display the response from the save call to check that it reports success
dataresult

{'data': [{'success': True}]}