In [1]:
import numpy as np
import pandas as pd
import benedict

import config

# Resolve the MongoDB handles from the project-local config module:
# client -> database -> the collection named by config.weathers_collection.
client = config.client
db = client[config.database]
col = db[config.weathers_collection]

In [2]:
# Notebook-wide pandas display settings: never truncate the text of a cell,
# and render floats with a thousands separator and two decimal places.
pd.options.display.max_colwidth = None
pd.set_option('display.float_format', '{:,.2f}'.format)

In [3]:
def update_keys(ref, check, as_kv_list=False):
    ''' Make sure that the keys for check are the same as those in ref.

    ``check`` is modified in place: keys that ``ref`` does not have are
    removed, and keys that only ``ref`` has are added with the value None
    (in the order they appear in ``ref``). Values already present in
    ``check`` are never changed.

    :param ref: the dictionary whose keys are to be referenced
    :type ref: dict, or list of (key, value) pairs when as_kv_list is True
    :param check: the dict to have its keys checked and updated
    :type check: dict, or list of (key, value) pairs when as_kv_list is True
    :param as_kv_list: treat ref and check as lists of (key, value) pairs
        instead of dicts
    :type as_kv_list: bool
    :returns: None; the result is the mutated ``check``
    '''

    if not as_kv_list:
        ref_keys = set(ref)
        # Snapshot the surplus keys first so the dict is not resized while
        # it is being iterated.
        for key in [k for k in check if k not in ref_keys]:
            check.pop(key)
        for key in ref:
            if key not in check:
                check[key] = None
        return

    # Pair lists cannot be subtracted or popped by key, so filter and extend
    # the list in place (slice assignment keeps the caller's list object).
    ref_order = [pair[0] for pair in ref]
    ref_keys = set(ref_order)
    check[:] = [pair for pair in check if pair[0] in ref_keys]
    present = {pair[0] for pair in check}
    for key in ref_order:
        if key not in present:
            check.append((key, None))
            present.add(key)
    return

def strip_keys(df):
    ''' Replace every dict cell of a pandas.DataFrame with the list of its
    values, leaving all other cells untouched.

    Each input row becomes one column of the result, named after that row's
    index label; the original column labels are not carried over (the result
    is indexed by position within the row).
    '''

    def values_only(cell):
        ''' Return a dict's values as a list; pass anything else through. '''
        return list(cell.values()) if isinstance(cell, dict) else cell

    stripped_rows = [
        pd.Series(
            [values_only(cell) for cell in row],
            name=row.name,
            dtype=object,
        )
        for _, row in df.iterrows()
    ]
    return pd.concat(stripped_rows, axis=1, ignore_index=False)

def compare_dicts(one, the_other, return_type='dict', as_kv_list=False):
    ''' Compare the values of two dicts, key by common key.

    For every key of ``one`` the entry of the result depends on the type of
    the value in ``one``:

    * int or float: the difference ``one[k] - the_other[k]``. When the value
      in ``the_other`` is not an int or float (for example None) the key is
      left out of the result.
    * str: 0 if the two strings are equal, 1 if they are different.
    * dict: the result of running this function on the two sub-dicts,
      returned as a list.
    * list: this function is run on each pair of elements (which therefore
      have to be dicts), stopping at the shorter list.
    * anything else (None, bool, ...): the flag value 0.

    A key of ``one`` that is missing from ``the_other`` is reported with a
    printed message and skipped.

    :params one, the_other: dictionaries with the same set of keys and sub-keys
    :type one, the_other: dict, or list of (key, value) pairs when
        as_kv_list is True
    :param return_type: 'dict' to get {key: delta}, 'list' to get only the
        deltas, in the key order of ``one``
    :type return_type: str
    :param as_kv_list: treat ``one`` and ``the_other`` as lists of
        (key, value) pairs; for a repeated key the first pair is used
    :type as_kv_list: bool
    :returns: the deltas, as a dict or as a list
    :raises ValueError: if return_type is neither 'dict' nor 'list'
    '''

    if return_type not in ('dict', 'list'):
        raise ValueError("return_type must be 'dict' or 'list'")

    if as_kv_list:
        # A list of pairs cannot be indexed by key, so build lookup dicts
        # once and share the comparison code with the plain-dict case.
        pairs_one, pairs_other = one, the_other
        one, the_other = {}, {}
        for key, value in pairs_one:
            one.setdefault(key, value)
        for key, value in pairs_other:
            the_other.setdefault(key, value)

    delta = {}  # The delta document. Contains all the forecast errors

    for k, v in one.items():
        try:
            # Check and compare dictionaries according to their value type.
            # Exact type checks are deliberate: bool is a subclass of int
            # but is treated as "anything else".
            if type(v) in (int, float):
                other = the_other[k]
                if type(other) in (int, float):
                    delta[k] = v - other
            elif type(v) == dict:
                delta[k] = compare_dicts(v, the_other[k], return_type='list')
            elif type(v) == str:
                delta[k] = 0 if v == the_other[k] else 1
            elif type(v) == list:
                delta[k] = [
                    compare_dicts(item, other_item)
                    for item, other_item in zip(v, the_other[k])
                ]
            else:
                delta[k] = 0
        except KeyError as e:
            print(f'missing key..... {e}')

    if return_type == 'dict':
        return delta
    return list(delta.values())

def tups_to_dict(tups):
    ''' Build a dictionary from an iterable of (key, value) tuples; when a
    key occurs more than once, its first value is the one that is kept.
    '''
    result = {}
    for key, value in tups:
        if key not in result:
            result[key] = value
    return result


In [4]:
def read_mongo_to_df(collection, filters={}, limit=None):
    ''' Run a find() on a MongoDB collection and load the matching documents
    into a pandas DataFrame, one row per document.

    "collection" must be a MongoDB client.database.collection object,
    "filters" a well formed mongo query, and "limit" the maximum number of
    documents to take from the cursor (None takes them all).
    '''

    cursor = collection.find(filters)
    limited_cursor = cursor[:limit]
    return pd.DataFrame.from_records(list(limited_cursor))

def records_to_rows(col, filters={}, limit=100):
    ''' Request records from the database collection and convert them to a
    pandas.DataFrame. All records are set with (flattened) keys as column
    names and '_id' as the index.

    :param col: the MongoDB collection to be read
    :type col: pymongo.collection.Collection
    :param filters: a well formed MongoDB query
    :type filters: dict
    :param limit: the maximum number of documents taken from the cursor
    :type limit: int
    :returns: one row per document; an empty DataFrame when the query
        matches nothing
    :rtype: pandas.DataFrame
    '''

    docs = col.find(filters, batch_size=100)[:limit]
    frames = []
    for row in docs:
        if not isinstance(row, dict):
            continue
        # Only the 'weather' entry is unwrapped, and only when it really is
        # a list; other list-valued fields no longer trigger (or re-trigger)
        # the unwrapping. An empty 'weather' list still raises IndexError.
        weather = row.get('weather')
        if isinstance(weather, list):
            row['weather'] = weather[0]
        # These next lines convert the dicts to benedicts before
        # flattening, sorting by keys, and then converting back to dicts.
        bene = benedict.benedict.flatten(row)
        flat_bene = benedict.benedict(bene)
        sorted_flat_bene = flat_bene.items_sorted_by_keys()
        sorted_flat_dict = tups_to_dict(sorted_flat_bene)
        # Store each document as a single-row pandas.DataFrame.
        frames.append(pd.DataFrame(sorted_flat_dict, index=[row['_id']]))
    if not frames:
        # pd.concat() raises on an empty list; an empty result is not an error.
        return pd.DataFrame()
    return pd.concat(frames)

def read_mongo_a(col, filters={}, limit=None):
    ''' Retrieve data from the Mongo database and transform it to a pandas
    DataFrame; return the DataFrame.

    Every document is flattened and sorted by key, and becomes one row of
    the result. The rows are not re-indexed, so each one keeps the label 0.

    :param col: the MongoDB collection to be read
    :type col: pymongo.collection.Collection
    :param filters: a well formed MongoDB query
    :type filters: dict
    :param limit: optional limiter to the number of documents retrieved
    :type limit: int
    :returns: one row per document; an empty DataFrame when the query
        matches nothing
    :rtype: pandas.DataFrame
    '''

    # Shorten the cursor length if limit is given, otherwise get everything;
    # transform the retrieved data to a pandas.DataFrame and return it.
    docs = col.find(filters)[:limit]
    weathers = []
    for doc in docs:
        if isinstance(doc, dict):
            # Only unwrap 'weather' itself, and only when it is a list; other
            # list-valued fields must not trigger the unwrapping.
            weather = doc.get('weather')
            if isinstance(weather, list):
                doc['weather'] = weather[0]
        # Convert the dict to a benedict, flatten it, sort it, convert it back
        # to a dict, and finally transform the dict to a DataFrame and append
        # it to a list to be concatenated together.
        bene = benedict.benedict(doc).flatten().items_sorted_by_keys()
        dic = tups_to_dict(bene)
        df = pd.DataFrame.from_dict(dic, orient='index')
        weathers.append(df.transpose())
    if limit:
        print(f'The length of your df has been limited to {limit}.')
    if not weathers:
        # pd.concat() raises on an empty list; an empty result is not an error.
        return pd.DataFrame()
    return pd.concat(weathers)


In [13]:
def find_item_with_kv_pair(series, key, value):
    '''Find and return the first item in a given pandas Series that has the
    given key-value pair.

    An item matches when it is a dict with ``item[key] == value``, or a list
    that contains a (key, value) pair as one of its elements. List elements
    that are not two-element tuples or lists are ignored.

    :param series: a pandas series
    :type series: pandas.Series
    :param key: the key the function should search for
    :type key: str
    :param value: the value the function should compare to
    :type value: anything that '==' can be used with

    :returns: the first matching item, or None when nothing matches
    :raises TypeError: if series is not a pandas.Series
    '''

    if not isinstance(series, pd.Series):
        raise TypeError("find_item_with_kv_pair() wants a pandas.Series.")

    for item in series:
        if isinstance(item, dict):
            if key in item and item[key] == value:
                return item
        elif isinstance(item, list):
            for elem in item:
                # Skip plain values (numbers, strings, ...) instead of
                # failing on elem[0] / elem[1].
                if not isinstance(elem, (tuple, list)) or len(elem) < 2:
                    continue
                if elem[0] == key and elem[1] == value:
                    return item
    return None

def flatten_to_series(df):
    ''' Flatten a DataFrame into one long column.

    The rows are walked in order and every cell is emitted as its own entry,
    labelled with the column name followed by the row label (both converted
    to strings and concatenated). Note that the result is a one-column
    pandas.DataFrame, not a pandas.Series.

    :param df: the dataframe to be flattened
    :type df: pandas.DataFrame
    '''

    labels = []
    values = []
    for row_label, row in df.iterrows():
        for column, value in zip(row.index, row):
            labels.append(str(column) + str(row_label))
            values.append(value)
    return pd.DataFrame(values, index=labels)

def flatten_to_single_row(df):
    ''' A function to convert a DataFrame to a single row DataFrame.

    The index of ``df`` is first turned into ordinary column(s), so every
    row is numbered 0, 1, 2, ... Each cell then becomes one column of the
    single result row, named by the string concatenation of the column name
    and the row number. The DataFrame that is passed in is left unchanged.

    :param df: the dataframe to be flattened
    :type df: pandas.DataFrame
    :returns: a DataFrame with exactly one row
    :rtype: pandas.DataFrame
    '''

    # Work on a re-indexed copy: resetting the index in place would silently
    # alter the caller's frame (and add another column on every repeat call).
    flat_source = df.reset_index()
    labels = []
    values = []
    for row_number, row in flat_source.iterrows():
        for column, value in zip(row.index, row):
            labels.append(str(column) + str(row_number))
            values.append(value)
    return pd.DataFrame(values, index=labels).transpose()

def flat_and_concat(flist):
    ''' Flatten a list of DataFrames and concat the flattened versions
    together and return as a single DataFrame.

    Items of the list that are not DataFrames are skipped.

    :param flist: A list of pandas.DataFrames.
    :type flist: list
    :returns: one row per DataFrame in flist; an empty DataFrame when flist
        holds no DataFrame at all
    :rtype: pandas.DataFrame
    :raises TypeError: if flist is not a list
    '''

    if not isinstance(flist, list):
        raise TypeError('flat_and_concat() has to have a list of DataFrames.')
    # The module is imported as `pd`; the bare name `pandas` is not defined.
    flattened = [
        flatten_to_single_row(item)
        for item in flist
        if isinstance(item, pd.DataFrame)
    ]
    if not flattened:
        # pd.concat() raises on an empty list.
        return pd.DataFrame()
    return pd.concat(flattened)

def make_instants(df, _return=True):
    ''' Turn the weathers collection DataFrame into a DataFrame of instants.

    The frame must carry an index level named 'timeplace'. All rows that
    share a timeplace are flattened into one single row, and those rows are
    stacked. With _return=True the stacked frame is returned; otherwise it
    is written to 'instants.npy' in the working directory and None is
    returned.
    '''

    flattened = [
        flatten_to_single_row(df.loc[timeplace])
        for timeplace in df.index.unique(level='timeplace')
    ]
    if not _return:
        np.save('instants.npy', pd.concat(flattened))
        return
    return pd.concat(flattened)

def make_inst(df):
    ''' Build the instants DataFrame from a DataFrame of weather documents.

    Step through each row of the DataFrame and skip it when it holds 37 or
    fewer non-null cells. For every kept row: drop the null cells, unwrap a
    list-valued 'weather' entry of each dict to its first element, and call
    update_keys() so the row's observation document ends up with the same
    keys as each forecast ('cast') document. Every cell is then flattened
    with benedict and sorted by key. The resulting rows are concatenated,
    written to 'instants.npy' in the working directory, and returned.

    :param df: DataFrame whose cells are documents; presumably dicts with a
        'type' key of 'obs' or 'cast' (confirm against the caller)
    :type df: pandas.DataFrame
    :returns: one row per kept input row
    :rtype: pandas.DataFrame
    :raises ValueError: from pandas.concat when no row passes the count check
    '''

    instants = []
    for _, cells in df.iterrows():
        if cells.count() <= 37:
            continue
        cells = cells.dropna()
        obs = find_item_with_kv_pair(cells, 'type', 'obs')
        # Series.items() works on every pandas version; iteritems() was
        # removed in pandas 2.0.
        for _, item in cells.items():
            if isinstance(item, dict):
                # Only unwrap 'weather' itself, and only when it is a list.
                weather = item.get('weather')
                if isinstance(weather, list):
                    item['weather'] = weather[0]
                if obs is not None and item.get('type') == 'cast':
                    update_keys(item, obs)
            elif isinstance(item, list) and obs is not None:
                # NOTE(review): update_keys() is called here without
                # as_kv_list=True although item is a list -- confirm the
                # intended shape of list items.
                if item[0] == 'cast':
                    update_keys(item, obs)
        # These next lines convert the dicts to benedicts before flattening,
        # sorting by keys, and then converting back to dicts.
        flat_data = cells.apply(benedict.benedict.flatten)
        sorted_items = flat_data.apply(benedict.benedict.items_sorted_by_keys)
        flat_sorted_data = sorted_items.apply(tups_to_dict)
        instants.append(flat_sorted_data)
    instants = pd.concat(instants, axis=1, ignore_index=False).transpose()
    np.save('instants.npy', instants)
    return instants

def make_data(series):
    ''' Take a pandas.Series and return a new pandas.Series holding, for
    each item that is a dict, the list of that dict's values (keys are
    stripped). Items that are not dicts are left out.

    The result keeps the name of the input Series and is numbered from 0.

    :param series: the Series whose dict items are to be stripped
    :type series: pandas.Series
    :returns: a Series of lists, dtype object
    :rtype: pandas.Series
    '''

    # Series.items() works on every pandas version; iteritems() was removed
    # in pandas 2.0.
    data = [
        list(item.values())
        for _, item in series.items()
        if isinstance(item, dict)
    ]
    return pd.Series(data, name=series.name, dtype=object)

def make_data_df(df):
    ''' Create the DataFrame that will contain the data to be used as the
    Data dataset to go along with the Target dataset.

    Go through the given DataFrame row by row; when the row holds an
    observation (an item with 'type' == 'obs'), remove the first dict of
    that type from the row. The dicts that are left are reduced to their
    value lists by make_data(). The result is written to
    'forecast_values.npy' in the working directory and returned.

    :param df: DataFrame whose cells are documents; presumably the output of
        make_inst() (confirm against the caller)
    :type df: pandas.DataFrame
    :returns: one row per input row
    :rtype: pandas.DataFrame
    '''

    data = []
    for _, cells in df.iterrows():
        obs = find_item_with_kv_pair(cells, 'type', 'obs')
        if obs is not None:
            # Series.items() works on every pandas version; iteritems() was
            # removed in pandas 2.0.
            for label, item in cells.items():
                # .get() keeps dicts without a 'type' key from raising.
                if isinstance(item, dict) and item.get('type') == 'obs':
                    cells.pop(label)
                    # Stop right away: the Series was just modified.
                    break
        data.append(make_data(cells))
    data_df = pd.concat(data, axis=1, ignore_index=False).transpose()
    np.save('forecast_values.npy', data_df)
    return data_df

    

def make_deltas(series):
    ''' Take a pandas.Series and compare the observation item to each of the
    forecast items (dict comparisons) and return a pandas.Series of
    comparison results.

    The observation is the first item found with 'type' == 'obs'. For every
    forecast item ('type' == 'cast') a copy of the observation is given the
    same keys as that forecast with update_keys(), and the two are compared
    with compare_dicts(). When the Series holds no observation the result is
    empty.

    :param series: one row of documents
    :type series: pandas.Series
    :returns: one list of deltas per forecast item, named like the input
    :rtype: pandas.Series
    '''

    deltas = []
    obs = find_item_with_kv_pair(series, 'type', 'obs')
    if obs is None:
        return pd.Series(deltas, name=series.name, dtype=object)

    for item in series:
        if isinstance(item, dict):
            is_cast = item.get('type') == 'cast'
        elif isinstance(item, list):
            # NOTE(review): update_keys() is called below without
            # as_kv_list=True even for list items -- confirm the intended
            # shape of list items.
            is_cast = bool(item) and item[0] == 'cast'
        else:
            is_cast = False
        if not is_cast:
            continue
        # update_keys() rewrites its second argument in place. Align a copy,
        # so keys dropped for one forecast are still there for the next one
        # and the observation stored in the Series stays intact.
        aligned_obs = obs.copy()
        update_keys(item, aligned_obs)
        deltas.append(compare_dicts(aligned_obs, item, return_type='list'))
    return pd.Series(deltas, name=series.name, dtype=object)

def make_deltas_df(df):
    ''' Build the complete deltas DataFrame.

    make_deltas() is run on every row of the given DataFrame; the resulting
    Series are joined side by side and transposed, so each input row yields
    one row of deltas. The result is written to 'delta_values.npy' in the
    working directory and returned.
    '''

    # The list is seeded with an empty DataFrame, followed by one "deltas"
    # Series per row; everything is concatenated in a single call.
    pieces = [pd.DataFrame()]
    pieces.extend(make_deltas(cells) for _, cells in df.iterrows())
    deltas_df = pd.concat(pieces, axis=1, ignore_index=False).transpose()
    np.save('delta_values.npy', deltas_df)
    return deltas_df


In [6]:
# Load up to 100,000 documents from the collection, one flattened row per
# document, indexed by '_id'.
records_df = records_to_rows(col, limit=100000)

In [7]:
# Sanity check: (rows, columns) of the freshly loaded frame.
records_df.shape

(100000, 29)

In [20]:
# NOTE(review): the saved output of this cell (In [20]) lists 18 columns,
# while .shape above reports 29 -- it was evidently executed after the cell
# below that drops columns in place. Re-run top to bottom to refresh it.
records_df.columns

Index(['clouds_all', 'instant', 'location_lat', 'location_lon',
       'main_feels_like', 'main_grnd_level', 'main_humidity', 'main_pressure',
       'main_sea_level', 'main_temp', 'main_temp_kf', 'main_temp_max',
       'main_temp_min', 'type', 'visibility', 'wind_deg', 'wind_speed',
       'rain_3h'],
      dtype='object')

In [8]:
# NOTE: this is an alias, not a copy -- rdf and records_df are the same
# object, so every in-place change made to rdf below also changes records_df.
rdf = records_df
rdf.shape

(100000, 29)

In [9]:
# NOTE(review): import placed mid-notebook; it belongs with the other imports
# in the first cell.
import pinky

# Columns that are removed before the instants are built.
drop_cols = ['_id',
 'dt',
 'dt_txt',
 'pop',
 'sys_pod',
 'weather_description',
 'weather_icon',
 'weather_id',
 'weather_main',
]
# Every step below mutates rdf in place (and records_df with it, since both
# names refer to the same object). The cell is therefore not re-runnable: a
# second run fails because the columns are already dropped / already in the
# index.
rdf.drop(columns=drop_cols, inplace=True)
# pinky.favor is a project-local helper; presumably it normalizes the
# tt_inst value -- TODO confirm what trans=False selects.
rdf['tt_inst'] = rdf.loc[:, 'tt_inst'].apply(pinky.favor, trans=False)
# Index by (timeplace, tt_inst); make_instants() relies on the 'timeplace'
# index level.
rdf.set_index(['timeplace', 'tt_inst'], inplace=True)
rdf.sort_index(inplace=True)
rdf.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,clouds_all,instant,location_lat,location_lon,main_feels_like,main_grnd_level,main_humidity,main_pressure,main_sea_level,main_temp,main_temp_kf,main_temp_max,main_temp_min,type,visibility,wind_deg,wind_speed,rain_3h
timeplace,tt_inst,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
dnh00s0000001599825600,270000,34,1599825600,33.77,-84.35,298.11,984,82,1018,1018,296.46,0,296.46,296.46,cast,10000,73,2.96,
dnh00s0000001599825600,280800,87,1599825600,33.77,-84.35,298.93,984,82,1018,1018,296.75,0,296.75,296.75,cast,10000,71,2.4,
dnh00s0000001599825600,291600,87,1599825600,33.77,-84.35,298.93,984,82,1018,1018,296.75,0,296.75,296.75,cast,10000,71,2.4,
dnh00s0000001599825600,302400,9,1599825600,33.77,-84.35,298.27,985,76,1018,1018,297.11,0,297.11,297.11,cast,10000,61,3.27,
dnh00s0000001599825600,313200,9,1599825600,33.77,-84.35,298.27,985,76,1018,1018,297.11,0,297.11,297.11,cast,10000,61,3.27,


In [10]:
# Shape after the column drop and re-indexing above.
rdf.shape

(100000, 18)

In [11]:
# NOTE(review): repeats the rdf.head() that ends the drop/re-index cell
# above; this cell can be removed.
rdf.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,clouds_all,instant,location_lat,location_lon,main_feels_like,main_grnd_level,main_humidity,main_pressure,main_sea_level,main_temp,main_temp_kf,main_temp_max,main_temp_min,type,visibility,wind_deg,wind_speed,rain_3h
timeplace,tt_inst,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
dnh00s0000001599825600,270000,34,1599825600,33.77,-84.35,298.11,984,82,1018,1018,296.46,0,296.46,296.46,cast,10000,73,2.96,
dnh00s0000001599825600,280800,87,1599825600,33.77,-84.35,298.93,984,82,1018,1018,296.75,0,296.75,296.75,cast,10000,71,2.4,
dnh00s0000001599825600,291600,87,1599825600,33.77,-84.35,298.93,984,82,1018,1018,296.75,0,296.75,296.75,cast,10000,71,2.4,
dnh00s0000001599825600,302400,9,1599825600,33.77,-84.35,298.27,985,76,1018,1018,297.11,0,297.11,297.11,cast,10000,61,3.27,
dnh00s0000001599825600,313200,9,1599825600,33.77,-84.35,298.27,985,76,1018,1018,297.11,0,297.11,297.11,cast,10000,61,3.27,


In [None]:
# records_df is the same object as rdf, so it carries the
# (timeplace, tt_inst) index set in place above; make_instants() needs the
# 'timeplace' index level.
inst_df = make_instants(records_df)

In [17]:
# Peek at the instants: one row per timeplace, columns named
# <field><row number within the timeplace>.
inst_df.head()

Unnamed: 0,tt_inst0,clouds_all0,instant0,location_lat0,location_lon0,main_feels_like0,main_grnd_level0,main_humidity0,main_pressure0,main_sea_level0,...,main_sea_level10,main_temp10,main_temp_kf10,main_temp_max10,main_temp_min10,type10,visibility10,wind_deg10,wind_speed10,rain_3h10
0,270000,34,1599825600,33.77,-84.35,298.11,984,82,1018,1018,...,1018.0,297.52,0.0,297.52,297.52,cast,10000.0,83.0,1.32,
0,280800,92,1599836400,33.77,-84.35,301.3,985,73,1018,1018,...,1018.0,302.46,0.0,302.46,302.46,cast,10000.0,98.0,1.69,
0,291600,91,1599847200,33.77,-84.35,304.34,984,58,1017,1017,...,1016.0,305.02,0.0,305.02,305.02,cast,10000.0,107.0,1.36,
0,302400,83,1599858000,33.77,-84.35,303.42,981,60,1014,1014,...,,,,,,,,,,
0,313200,89,1599868800,33.77,-84.35,300.79,982,72,1015,1015,...,,,,,,,,,,


In [18]:
# Summary statistics; the saved output shows count/unique/top/freq rows.
inst_df.describe()

Unnamed: 0,tt_inst0,clouds_all0,instant0,location_lat0,location_lon0,main_feels_like0,main_grnd_level0,main_humidity0,main_pressure0,main_sea_level0,...,main_sea_level10,main_temp10,main_temp_kf10,main_temp_max10,main_temp_min10,type10,visibility10,wind_deg10,wind_speed10,rain_3h10
count,16054,16054,16054,16054.0,16054.0,16054.0,16054,16054,16054,16054,...,1545,1545.0,1545,1545.0,1545.0,1545,1545,1545,1545.0,489.0
unique,15,101,15,32.0,40.0,1598.0,143,47,7,7,...,4,865.0,1,865.0,865.0,1,3,289,185.0,179.0
top,313200,100,1599868800,33.9,-84.0,297.01,988,96,1016,1016,...,1018,301.33,0,301.33,301.33,cast,10000,146,1.17,0.29
freq,1080,2456,1080,592.0,480.0,42.0,593,1316,5045,5045,...,687,9.0,1545,9.0,9.0,1545,1543,22,30.0,10.0


In [15]:
# List every column name of the instants frame, one per line.
for column_name in inst_df.columns:
    print(column_name)

tt_inst0
clouds_all0
instant0
location_lat0
location_lon0
main_feels_like0
main_grnd_level0
main_humidity0
main_pressure0
main_sea_level0
main_temp0
main_temp_kf0
main_temp_max0
main_temp_min0
type0
visibility0
wind_deg0
wind_speed0
rain_3h0
tt_inst1
clouds_all1
instant1
location_lat1
location_lon1
main_feels_like1
main_grnd_level1
main_humidity1
main_pressure1
main_sea_level1
main_temp1
main_temp_kf1
main_temp_max1
main_temp_min1
type1
visibility1
wind_deg1
wind_speed1
rain_3h1
tt_inst2
clouds_all2
instant2
location_lat2
location_lon2
main_feels_like2
main_grnd_level2
main_humidity2
main_pressure2
main_sea_level2
main_temp2
main_temp_kf2
main_temp_max2
main_temp_min2
type2
visibility2
wind_deg2
wind_speed2
rain_3h2
tt_inst3
clouds_all3
instant3
location_lat3
location_lon3
main_feels_like3
main_grnd_level3
main_humidity3
main_pressure3
main_sea_level3
main_temp3
main_temp_kf3
main_temp_max3
main_temp_min3
type3
visibility3
wind_deg3
wind_speed3
rain_3h3
tt_inst4
clouds_all4
instant4
lo

In [None]:
# NOTE(review): `inst` is not assigned in any visible cell (only inst_df
# is), so this raises NameError on a fresh kernel. Presumably it should be
# the result of make_inst(...) -- confirm and add the defining cell.
make_data_df(inst)

In [None]:
# NOTE(review): `inst` is not assigned in any visible cell (only inst_df
# is), so this raises NameError on a fresh kernel. Presumably it should be
# the result of make_inst(...) -- confirm and add the defining cell.
make_deltas_df(inst)