# Module Code

In [54]:
import json, glob, boto3, os
import pdb
import pandas as pd
from fastparquet import write, ParquetFile
import pytz
import s3fs
import botocore
from collections import Iterable
from typing import List, Union
from copy import deepcopy
import warnings

In [5]:
session = boto3.Session()
BUCKET_NAME = 'snowbot-pv'

# S3 Connect
s3 = session.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)

In [72]:
# parquet engines don't handle shifted timezones
TZ = pytz.timezone('America/Vancouver')

DATA_DIR = "../data/"
MERGED_JSON_FILENAME = "merged_file.json"
merged_json_file = DATA_DIR + MERGED_JSON_FILENAME

# Used for weather data in jsons_to_df()
weather_meta_fields = [
    'newSnow', 'last24Hours', 'last48Hours', 'last7Days', 'midMountainBase',
    'resortID'
]
weather_record_path = ['weather', 'weatherForecast']
weather_meta = [['weather', i] for i in weather_meta_fields]
weather_meta.append('timestamp')

# Used for lift and terrain status in jsons_to_df()
# Important to set categories because when writing incrementally to parquet, some increments
# may not include all statuses.  Manually setting the categories avoids errors due to
# different catergory indexing between increments.
status_cat_dtype = pd.api.types.CategoricalDtype(categories=['X', 'H', 'O'],
                                                 ordered=True)
groomed_cat_dtype = pd.api.types.CategoricalDtype(categories=['No', 'Yes'],
                                                  ordered=True)

# Column dtypes that are to be set for each dataframe
df_dtypes = {
    "lifts": {
        'liftID': 'category',
        'resortID': 'category',
        'liftName': 'category',
        'status': status_cat_dtype,
        'timeToRide': 'object'
    },
    'terrain': {
        'runID': 'category',
        'resortID': 'category',
        'groomed': groomed_cat_dtype,
        'runName': 'category',
        'runType': 'category',
        'status': status_cat_dtype,
        'terrainName': 'category'
    },
    'weather': {
        'resortID': 'category',
        'forecast.dayDescription': 'object',
        'forecast.daycode': 'category',
        'forecast.forecastString': 'object',
        'forecast.iconName': 'object',
        'forecast.summaryDescription': 'object',
        'forecast.temperatureHigh': 'object',
        'forecast.temperatureLow': 'object',
        'weather.last24Hours': 'object',
        'weather.last48Hours': 'object',
        'weather.last7Days': 'object',
        'weather.midMountainBase': 'object',
        'weather.newSnow': 'object'
    }
}


def flatten(items):
    """Yield items from any nested iterable"""
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
            for sub_x in flatten(x):
                yield sub_x
        else:
            yield x


# The columns that serve to identify records for each topic
topic_ID_col_names = {
    'lifts': ['resortID', 'liftName'],
    'terrain': ['resortID', 'runID', 'terrainName'],
    'weather': 'resortID',
    'all_topics': 'timestamp'
}
# All of the column names that serve to identify records in at least one of the topics
all_ID_col_names = set(flatten(topic_ID_col_names.values()))

In [91]:
# from https://alexwlchan.net/2019/07/listing-s3-keys/
def get_matching_s3_objects(bucket, prefix="", suffix=""):
    """
    Generate objects in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch objects whose key starts with
        this prefix (optional).
    :param suffix: Only fetch objects whose keys end with
        this suffix (optional).
    """
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")

    kwargs = {'Bucket': bucket}

    # We can pass the prefix directly to the S3 API.  If the user has passed
    # a tuple or list of prefixes, we go through them one by one.
    if isinstance(prefix, str):
        prefixes = (prefix, )
    else:
        prefixes = prefix

    for key_prefix in prefixes:
        kwargs["Prefix"] = key_prefix

        for page in paginator.paginate(**kwargs):
            try:
                contents = page["Contents"]
            except KeyError:
                return

            for obj in contents:
                key = obj["Key"]
                if key.endswith(suffix):
                    yield obj


def get_matching_s3_keys(bucket, prefix="", suffix=""):
    """
    Generate the keys in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch keys that start with this prefix (optional).
    :param suffix: Only fetch keys that end with this suffix (optional).
    """
    for obj in get_matching_s3_objects(bucket, prefix, suffix):
        yield obj["Key"]


def merge_matching_jsons_on_s3(save_file, prefix="", suffix=""):
    """Merges json files on S3 that match the suffix into a new json and save it
    as the save_file on S3."""

    result = []

    for f in get_matching_s3_keys(BUCKET_NAME, prefix=prefix, suffix=suffix):

        # Write the file from S3 into a local temp file
        with open('temp', 'wb') as tfw:
            bucket.download_fileobj(f, tfw)

        # Append the local temp file into the result list
        with open('temp', 'rb') as tfr:
            result.append(json.load(tfr))

    os.remove("temp")

    # Fill the output file with the merged content
    with open(save_file, "w") as outfile:
        json.dump(result, outfile)

# TBD: more efficient to go straight to df w/o saving json to file


def set_df_datatypes(df, topic):
    """Set the datatypes for a df according to the topic that
    it represents."""
    df = df.astype(df_dtypes[topic])
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    return df


def jsons_to_df(jsons, record_path, meta='timestamp'):
    """Convert a json containing one or more timestamps to a dataframe."""
    if record_path == 'weather':
        # Deal with the nested object that the weather data uses to store the weather forecast
        df = pd.json_normalize(jsons, record_path=weather_record_path,
                               meta=weather_meta, record_prefix='forecast.')
        df.rename(columns={"weather.resortID": "resortID"}, inplace=True)
    else:
        df = pd.json_normalize(jsons, record_path=record_path,
                               meta=meta)

    df = set_df_datatypes(df, record_path)
    return df


def load_json_as_df(merged_json_file, record_path):
    """Load json file containing one or more timestamps as a dataframe."""
    with open(merged_json_file, "r") as f:
        d = json.load(f)
        df = jsons_to_df(d, record_path)
        return df


def get_data_changes(df, topic, keep_oldest=False):
    """
    Filter out rows that do not represent changed data.

    Parameters
    ----------
    df : pandas.DataFrame
        Includes 'timestamp' identifying and data columns.  Lists data for each timestamp.
    keep_oldest : boolean
        Indicates if the returned DataFrame should keep the oldest record for each entity (i.e.
        lift, resort, tor terrain) even if an entity has no data changes.  This is so that the
        earliest data for each entity is not lost, and all entities are listed the returned DataFrame
        even if their data has not changed.  Use `False` when there is just one DataFrame to process.
        Use `True` is cases where the data changes will be appended to an existing dataframe that
        already has at least one row for each entity.

    Returns
    -------
    pandas.DataFrame
        Only includes the rows from the original dataframe where there was a change to new values
        in the data columns.
    """
    ID_columns = topic_ID_col_names[topic]
    data_columns = [c for c in df.columns if c not in all_ID_col_names]

    def filter_for_data_changes(df, keep_oldest=keep_oldest):
        """Filter out rows where data is unchanged for adjacent timestamps.
        Required to handle cases when there are > 2 rows per entity.
        """
        keep_idx = df[data_columns].ne(df[data_columns].shift()).any(
            axis=1).values[1:]  # True for rows with data changes
        changed_rows = df.reset_index(drop=True).drop(index=0)[keep_idx]

        if keep_oldest:
            firstrow = df.loc[df['timestamp'].idxmin()]
            keep_df = firstrow.to_frame().T.append(changed_rows)
        else:
            keep_df = changed_rows

        return keep_df

    # Drop any rows that are complete duplicates so that conditional evaluation will
    # work.  This is required for Peak 2 Peak Gondola because it is duplicated in the
    # lifts data.  Maybe others as well.
    df.drop_duplicates(inplace=True)

    # 1 means that there were up to 2 rows fond per group
    if df.groupby(ID_columns, group_keys=False).cumcount().max() < 2:
        # Most efficient method.  Only works if there are 2 or less rows per entity.
        subset = df.columns.drop('timestamp')
        df = df.sort_values('timestamp')

        if keep_oldest:
            df = df.drop_duplicates(subset=subset, keep='first')
        else:
            df = df.drop_duplicates(subset=subset, keep=False)
            df = df.drop_duplicates(subset=ID_columns, keep='last')

    else:
        # Less efficient method.  Required if there are > 2 rows per entity.
        df = df.sort_values('timestamp').groupby(ID_columns, group_keys=False)\
               .apply(filter_for_data_changes)\
               .reset_index(drop=True)

    
    records_are_unique(df, include_timestamp_in_colnames(ID_columns))
    df = set_df_datatypes(df, topic)

    return df


def records_are_unique(df: pd.DataFrame, record_id_cols: List[str]) -> bool:
    """Check if records in df can be uniquely identified using record_id_cols
    and raise warning if they are not."""
    are_unique = df.set_index(record_id_cols).index.is_unique
    if not are_unique:
        warnings.warn(f"Records in dataframe are not uniquely identified by {record_id_cols}")
    return are_unique


def include_timestamp_in_colnames(col_names: Union[List[str], str]) -> List[str]:
    """Returns a list of strings which includes 'timestamp' in addition to the list
    or sting given for `col_names`.
    
    >>> include_timestamp_in_colnames(topic_ID_col_names['terrain'])
    ['resortID', 'runID', 'terrainName', 'timestamp']
    """
    col_names = deepcopy(col_names)
    if type(col_names) == str : col_names = [col_names]
    col_names.extend(['timestamp'])
    return col_names

# Process lift json fies

In [None]:
def get_status_durations(lifts_df):
    '''Calculate values and add columns for the time difference between the
    timestamp for the current status and the timestamp for the next status
    for each lift:
    `time_diff` column: Gives the duration that the lift was in the status indicated in the `status` column.
    `time_diff_seconds` column: `time_diff` converted to seconds.
    
    lifts_status_changes_df should be TBD
    '''
    # TBD: optimize if needed via # 3 under:
    # https://towardsdatascience.com/pandas-tips-and-tricks-33bcc8a40bb9
    df = lifts_df.sort_values(by=['resortID', 'liftID', 'timestamp'])
    df['time_diff'] = df.groupby(['resortID', 'liftID'])['timestamp'].diff(1).shift(-1)

    # Fill in the durations which will be missing for the most recent status changes
    missing_time_diffs_idx = df.loc[(df['time_diff'].isnull()) & (
        df['timestamp'] >= df['timestamp'].min()), 'timestamp'].index.values

    df.loc[missing_time_diffs_idx, 'time_diff'] = df['timestamp'].max(
    ) - df.loc[missing_time_diffs_idx, 'timestamp']

    # Convert to seconds
    df['time_diff_seconds'] = df['time_diff'].dt.total_seconds()

    return df

## Whistler Lifts

In [None]:
merge_matching_jsons_on_s3(suffix="lifts.json", save_file=merged_json_file)

In [None]:
whis_lifts_df = load_json_as_df(merged_json_file, 'lifts')

whis_lifts_status_changes_df = get_data_changes(whis_lifts_df, 'lifts', keep_oldest=True)

In [None]:
lifts_status_changes_df

**NOTE:** `timeToRide` is just the time is takes to ride the lift, not the current wait time:

In [None]:
whis_lifts_df.groupby("liftName")['timeToRide'].unique()

In [None]:
whis_lifts_df

In [None]:
whis_lifts_df = get_status_durations(whis_lifts_status_changes_df)

# Uses local date formatting, otherwise Tableau will mix up month and day
# alternatively, can export to json:
# lifts_status_changes_df.to_json(DATA_DIR + "lifts_status_changes.json", orient='table')
whis_lifts_df.to_csv(DATA_DIR + "whis_lifts_status_changes.csv", date_format='%c')


In [None]:
# add:
# 
# daily: for each chair calculate most open status of the day: O > H > X
# Days since each chair was last seen open with timestamp of most recent open time.
# snowfall since last open
# save data for other mountains

### Loop through JSON files for all topics

In [None]:
for topic in ['lifts', 'terrain', 'weather']:
    merge_matching_jsons_on_s3(suffix=topic + ".json", save_file=merged_json_file)
    df = load_json_as_df(merged_json_file, topic)
    status_changes_df = get_data_changes(df, topic, keep_oldest=True)
    
    if topic == 'lifts':
        get_status_durations(status_changes_df).to_csv(DATA_DIR + 'whis_lifts_status_changes.csv', date_format='%c')
    else:
        status_changes_df.to_csv(DATA_DIR + 'whis_' + topic + '_status_changes.csv', date_format='%c')

# Storage options testing

In [None]:
df.to_pickle(DATA_DIR + "df_test.pkl")

In [None]:
from fastparquet import write

# parquet engines don't handle shifted timezones
import pytz
TZ = pytz.timezone('America/Vancouver')
df['timestamp'] = df.timestamp.dt.tz_convert(pytz.utc)

In [None]:
# Note: May need snappy-python as a req to run on AWS Lambda
df.to_parquet(DATA_DIR + "df_test.parquet", engine='fastparquet')

In [None]:
load_df = pd.read_parquet(DATA_DIR + "df_test.parquet")
load_df['timestamp'] = load_df.timestamp.dt.tz_convert(TZ) # convert back to correct timezone


In [None]:
#TBD convert back to correct datatypes
load_df.dtypes

In [None]:
df.to_csv(DATA_DIR + "df_test.csv")

Test file size results:
- json: 800 Kb?
- csv: 474 Kb
- pickle: 145 Kb
- parquet: 15 Kb

## Delta Lake Notes

Requires apache spark instance.  For future use, could set one up to work with lambda using https://aws.amazon.com/emr/features/spark/?

Otherwise databricks (similar to QxMD project)

# Parquet on S3

For all topics from the EpicMix API.  Compare most recent topic data from json on S3 and if the data has changes, append the changes to parquet file on S3.

### Module Code

In [86]:
import requests
from datetime import datetime
fs = s3fs.S3FileSystem()
myopen = fs.open
nop = lambda *args, **kwargs: None

HISTORY_SUFFIX = '_history_DEV.parquet'
PRIOR_SUFFIX = '_prior_DEV.json'


def write_dataframe_to_parquet_on_s3(df, topic, fname):
    """ Write a dataframe to a Parquet file on S3.  Creates a new parquet file if one
    doesn't already exist.
    """

    def write_parquet(df, fname, app=True):

        output_file = f"s3://{BUCKET_NAME}/{fname}"
        write(output_file,
              df,
              # partition_on=['timestamp'],
              file_scheme='hive',
              append=app,  # need to remove or catch exception to work when file doesn't exist
              open_with=myopen,
              mkdirs=nop)
        print(f"Writing {len(df)} records to {fname}.")

    # Unshift the timezone because parquet engines don't handle shifted timezones
    df.loc[:, 'timestamp'] = df.loc[:, 'timestamp'].dt.tz_convert(pytz.utc)

    s3_object = bucket.Object(fname)

    if not list(bucket.objects.filter(Prefix=fname)):
        print(f"File {fname} not found.  Creating new file.")
        # Keep oldest record for each entity because creating new file
        df = get_data_changes(df, topic=topic, keep_oldest=True)
        write_parquet(df, fname, app=False)

    else:
        print(f"File {fname} found on S3.")
        df = get_data_changes(df, topic=topic, keep_oldest=False)
        write_parquet(df, fname, app=True)


def filter_resort(data, resortID: int = None) -> dict:
    """Filter for a specific resort."""
    if resortID:
        return data["resortID"] == resortID
    else:
        return data


def get_data(filter_topic: Union[str, List] = None, filter_resortID: int = None) -> dict:
    """Get data from EpicMix API. Defaults to all resorts.  Option to filter for a
    specific resort or topic.
    """
    API_URL = 'http://www.epicmix.com/vailresorts/sites/epicmix/api/mobile/'
    # keys are used in the requests, the values and used in the response
    DATA_LIST = {'lifts': 'lifts',
                 'weather': 'snowconditions', 'terrain': 'terrains'}
    json_data = dict()

    # Create lists to filter by topic
    if filter_topic is not None:
        filtered_data_list = {k: v for k,
                              v in DATA_LIST.items() if k in filter_topic}
    else:
        filtered_data_list = DATA_LIST

    for d, name in filtered_data_list.items():
        res = requests.get(API_URL + d + '.ashx')
        res.raise_for_status()
        data = json.loads(res.text)[name]
        data = list(filter(lambda x: filter_resort(x, filter_resortID), data))
        json_data[d] = json.dumps(
            {'timestamp': str(datetime.now(TZ)), d: data})

    return json_data


def s3_object_exists(fname):
    """Check if an s3 object exists.  Returns `True` if the object exists."""
    try:
        bucket.Object(fname)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print(f"{fname} doesn't exist")
        else:
            raise
    return True


def load_dataframe_from_parquet_on_s3(fname):
    """ Load a dataframe from a Parquet file on S3. """
    if s3_object_exists(fname):
        read_file = f"s3://{BUCKET_NAME}/{fname}"
        pf = ParquetFile(read_file, open_with=myopen)
        df = pf.to_pandas()

        # Reshift the timezone because parquet engines don't handle shifted timezones
        df.loc[:, 'timestamp'] = df.loc[:, 'timestamp'].dt.tz_convert(TZ)

        return df


class api_data():
    def __init__(self, topic: str, current_json: str):
        self.topic = topic
        self.current_json = current_json
        # May not exist yet
        self.prior_fname = topic + PRIOR_SUFFIX
        self.prior_object = bucket.Object(self.prior_fname)
        self.check_prior_object()

    def check_prior_object(self):
        """Get prior data json"""
        try:
            self.prior_object.load()
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print(f"Prior json for {self.topic} doesn't exist")
                self.prior_exists = False
            else:
                # Something else has gone wrong.
                raise
        else:
            self.prior_exists = True
        return self.prior_exists

    def get_prior_data_json(self):
        """Get prior data json from S3."""
        if self.prior_exists == True:
            prior = self.prior_object.get()['Body'].read().decode('utf-8')
            self.prior_json = json.loads(prior)
            print(f"Loaded prior {self.topic} json data from S3")
            return self.prior_json
        else:
            print(f"Prior json for {self.topic} doesn't exist")

    def data_changed(self):
        """Compare current data json with prior data json without their timestamps.  The timestamps
        on the current json will always be more recent even when none of the other data has changed.
        """
        if self.prior_json[self.topic] == self.current_json[self.topic]:
            print(
                f"No differences between current and prior {self.topic} data were found.")
            return False
        else:
            print(
                f"Found differences between current and prior {self.topic} data.")
            return True

    def save_prior_data(self):
        """Save the current data as prior data on S3."""
        bucket.put_object(Key=self.prior_fname,
                          Body=bytes(json.dumps(self.current_json).encode('UTF-8')))


class ParquetWriter():
    """Identifies new data and writes it to Parquet file on S3."""

    def __init__(self):
        # Get current data
        self.data_current_all = get_data()  # String.

    def write_new_data_all(self):
        """Writes new data for each type (i.e. 'lift', 'weather', 'terrian')
        of data returned by the API.
        """
        for topic in self.data_current_all:
            current_json = json.loads(self.data_current_all[topic])
            data = api_data(topic, current_json)
            self.write_new_data(data)

    def write_new_data(self, api_data):
        """If current data has changed since the last update of Parquet file is, add it
        to the Parquet file.  Save the current data as json to serve as the prior for
        the next comparison.
        """

        if api_data.prior_exists:
            api_data.get_prior_data_json()
            if api_data.data_changed():
                # Get a df with the chages between the prior and current json data
                df = jsons_to_df(
                    [api_data.prior_json, api_data.current_json], record_path=api_data.topic)
                write_dataframe_to_parquet_on_s3(
                    df, api_data.topic, api_data.topic + HISTORY_SUFFIX)

                # save current data json as prior
                api_data.save_prior_data()
                print(
                    f"Replaced data in {api_data.prior_object.key} with current data.")
        else:
            print(f"Prior json for {api_data.topic} doesn't exist")
            # Create the prior file
            api_data.save_prior_data()
            print(f"Created {api_data.prior_fname}")
        print('\n')

In [87]:
%%time
pr = ParquetWriter()
pr.write_new_data_all()

Loaded prior lifts json data from S3
No differences between current and prior lifts data were found.


Loaded prior weather json data from S3
Found differences between current and prior weather data.
File weather_history_DEV.parquet found on S3.
Writing 2 records to weather_history_DEV.parquet.
Replaced data in weather_prior_DEV.json with current data.


Loaded prior terrain json data from S3
No differences between current and prior terrain data were found.


CPU times: user 289 ms, sys: 35.8 ms, total: 325 ms
Wall time: 2.77 s


In [None]:
import time
while True:
    print('\n\n\n' + time.ctime() + ':\n---------------------')
    pr = ParquetWriter()
    pr.write_new_data_all()
    time.sleep(3600)

**Warnings**

See https://github.com/dask/fastparquet/issues/477 for fastparquet warnings about `RangeIndex._start, RangeIndex._stop, RangeIndex._step`


    /Users/paul/anaconda3/lib/python3.7/site-packages/ipykernel_launcher.py:90: FutureWarning: pandas.io.json.json_normalize is deprecated, use pandas.json_normalize instead
    /Users/paul/anaconda3/lib/python3.7/site-packages/fastparquet/writer.py:655: FutureWarning: RangeIndex._start is deprecated and will be removed in a future version. Use RangeIndex.start instead
      index_cols = [{'name': index_cols.name, 'start': index_cols._start,
    /Users/paul/anaconda3/lib/python3.7/site-packages/fastparquet/writer.py:656: FutureWarning: RangeIndex._stop is deprecated and will be removed in a future version. Use RangeIndex.stop instead
      'stop': index_cols._stop, 'step': index_cols._step,
    /Users/paul/anaconda3/lib/python3.7/site-packages/fastparquet/writer.py:656: FutureWarning: RangeIndex._step is deprecated and will be removed in a future version. Use RangeIndex.step instead
      'stop': index_cols._stop, 'step': index_cols._step,

### Load parquet and save as .csv

In [None]:
parq_df = load_dataframe_from_parquet_on_s3('lifts' + HISTORY_SUFFIX)
lifts_status_changes_parq_df = get_status_durations(parq_df)
lifts_status_changes_parq_df.to_csv(
    DATA_DIR + "lifts_status_changes_parq.csv", date_format='%c')

### Testing

In [9]:
TEST_DATA_DIR = "../data/test/"
TEST_VALIDATION_DATA_DIR = TEST_DATA_DIR + "valid/"
lifts_json_test_file = TEST_DATA_DIR + "lifts_test.json"
terrain_json_test_file = TEST_DATA_DIR + "terrain_test.json"
weather_json_test_file = TEST_DATA_DIR + "weather_test.json"
merged_lifts_json_test_file = TEST_DATA_DIR + "merged_lifts_test.json"
merged_terrain_json_test_file = TEST_DATA_DIR + "merged_terrain_test.json"
merged_weather_json_test_file = TEST_DATA_DIR + "merged_weather_test.json"
merged_whis_lifts_json_test_file = TEST_DATA_DIR + "merged_whis_lifts_test.json"

#### Find column combinations to identify entities

In [10]:
from itertools import combinations

# Move to EDA notebook
# Do we really need to use all the terrain columns in topic_ID_col_names to uniquely identify each run?
ID_cols = ['resortID', 'runID', 'runName', 'terrainName']
df = load_json_as_df(terrain_json_test_file, 'terrain')

for combo in combinations(ID_cols, len(ID_cols)-1):
    print(f"Combo: {combo}\tDuplicates: {df.duplicated(combo).sum()}")
    
print(f"Combo: {ID_cols}\tDuplicates: {df.duplicated(ID_cols).sum()}")

Combo: ('resortID', 'runID', 'runName')	Duplicates: 1
Combo: ('resortID', 'runID', 'terrainName')	Duplicates: 0
Combo: ('resortID', 'runName', 'terrainName')	Duplicates: 2
Combo: ('runID', 'runName', 'terrainName')	Duplicates: 0
Combo: ['resortID', 'runID', 'runName', 'terrainName']	Duplicates: 0


We will always get one duplicate lift entry because the Peak 2 Peak Gondola is returned twice in the API data:

In [11]:
ID_cols = ['resortID', 'liftName', 'liftID']
df = load_json_as_df(lifts_json_test_file, 'lifts')

for combo in combinations(ID_cols, len(ID_cols)-1):
    print(f"Combo: {combo}\tDuplicates: {df.duplicated(combo).sum()}")

print(f"Combo: {ID_cols}\tDuplicates: {df.duplicated(ID_cols).sum()}")

Combo: ('resortID', 'liftName')	Duplicates: 1
Combo: ('resortID', 'liftID')	Duplicates: 2
Combo: ('liftName', 'liftID')	Duplicates: 1
Combo: ['resortID', 'liftName', 'liftID']	Duplicates: 1


#### Tests

In [12]:
import unittest

In [92]:
# new

# TBD add test descriptions?


class TestNotebook(unittest.TestCase):

    topic_names = ['lifts', 'terrain', 'weather']

    def test_get_data(self):
        self.assertEqual(
            list(get_data(filter_topic=['lifts']).keys()), ['lifts'],
            'Returned dictionary was not filtered for the right topic'
        )
        self.assertEqual(
            list(get_data(filter_topic=['lifts', 'terrain']).keys()),
            ['lifts', 'terrain'],
            'Returned dictionary was not filtered for the right topics'
        )
        self.assertEqual(
            set(get_data().keys()), {'lifts', 'weather', 'terrain'},
            'Returned dictionary was not filtered for all topics'
        )

    def test_get_data_changes(self):

        # 'lifts' needs to be listed twice under 'topic' because there are two tests
        # run on lift data.  The test using merged_whis_lifts_json_test_file has more than
        # 2 timepoints in order to test the special code that is used to handle that case.
        tests_df = pd.DataFrame({
            'test_file': [merged_whis_lifts_json_test_file,
                          merged_lifts_json_test_file,
                          merged_terrain_json_test_file,
                          merged_weather_json_test_file],
            'topic': ['lifts', 'lifts', 'terrain', 'weather'],
            'validation_fname_prefix': ['get_data_changes_merged_whis_lifts',
                                        'get_data_changes_merged_lifts',
                                        'get_data_changes_merged_terrain',
                                        'get_data_changes_merged_weather']
        })

        for row in tests_df.iterrows():

            test_file = row[1]['test_file']
            df = load_json_as_df(test_file, row[1]['topic'])
            df = df.sample(frac=1)  # Shuffle the data

            for keep_oldest in [True, False]:
                if keep_oldest == True:
                    validation_fname_suffix = '_keep_oldest_valid.json'
                else:
                    validation_fname_suffix = '_drop_oldest_valid.json'

                tested_df = get_data_changes(
                    df, row[1]['topic'], keep_oldest=keep_oldest)

                valid_file = TEST_VALIDATION_DATA_DIR + \
                    row[1]['validation_fname_prefix'] + validation_fname_suffix
                valid_df = pd.read_pickle(valid_file)

                #  Sort and reindex before comparison because we are not testing the indexes
                # or row orders that the functions return.
                tested_df.sort_values(
                    tested_df.columns.to_list(), ignore_index=True, inplace=True)
                valid_df.sort_values(
                    valid_df.columns.to_list(), ignore_index=True, inplace=True)

                pd.testing.assert_frame_equal(
                    tested_df, valid_df,
                    f"Result from {test_file} did not match validation dataframe {valid_file}."
                )

    def test_ID_col_names(self):
        """Make sure that records can be uniquely identified by using ID columns for each topic
        (in combination with timestamp)"""
        files = [merged_lifts_json_test_file,
                 merged_terrain_json_test_file, merged_weather_json_test_file]

        for file, topic in zip(files, self.topic_names):

            df = load_json_as_df(file, topic)

            # Drop any rows that are complete duplicates. This is required for the Peak 2 Peak
            # Gondola because it is duplicated in the lifts data.  Maybe others as well.
            df.drop_duplicates(inplace=True)
            
            record_id_cols = include_timestamp_in_colnames(topic_ID_col_names[topic])
            self.assertTrue(records_are_unique(df, record_id_cols),
                            f"{record_id_cols} are not sufficient to uniquely identify the {topic} records.")

    
# To add:    
# test records_are_unique() raises warning
    
    
    # Assert df has no NaN or NaTs:
    # assert parq_df.isnull().sum().sum() == 0


unittest.main(argv=[''], verbosity=2, exit=False)

test_ID_col_names (__main__.TestNotebook)
Make sure that records can be uniquely identified by using ID columns for each topic ... ok
test_get_data (__main__.TestNotebook) ... ok
test_get_data_changes (__main__.TestNotebook) ... ok

----------------------------------------------------------------------
Ran 3 tests in 3.942s

OK


<unittest.main.TestProgram at 0x11c860a58>

In [95]:
# TBD: test dtypes if not covered by type hint testing
test_json = load_prior_json_from_s3('lifts')
jsons_to_df(test_json, 'lifts')#.dtypes

# TBD Make sure categories are complete... by sorting columns?

Unnamed: 0,liftID,resortID,liftName,status,timeToRide,timestamp
0,2,1,Avanti Express Lift #2,O,7,2020-02-14 17:43:53.685619-08:00
1,3,1,Wildwood Express #3,O,5,2020-02-14 17:43:53.685619-08:00
2,4,1,Mountain Top Express #4,O,4,2020-02-14 17:43:53.685619-08:00
3,5,1,High Noon Express #5,O,6,2020-02-14 17:43:53.685619-08:00
4,6,1,Riva Bahn Express #6,O,9,2020-02-14 17:43:53.685619-08:00
...,...,...,...,...,...,...
318,15,17,Sunshine Quad,O,6,2020-02-14 17:43:53.685619-08:00
319,4,17,Glades Peak Quad,X,8,2020-02-14 17:43:53.685619-08:00
320,16,17,Green Ridge Triple,X,8,2020-02-14 17:43:53.685619-08:00
321,6,17,Orion's Belt Carpet,X,1,2020-02-14 17:43:53.685619-08:00


In [96]:
topic = 'terrain'

# test that datatypes are correct if not covered by hint testing?
parq_df = load_dataframe_from_parquet_on_s3(topic + HISTORY_SUFFIX)
parq_df.dtypes

runID                                   category
resortID                                category
groomed                                 category
runName                                 category
runType                                 category
status                                  category
terrainName                             category
timestamp      datetime64[ns, America/Vancouver]
dtype: object

In [97]:
# test that the status categories are complete for lifts and terrain
# maybe like this or direct access the index
test_terrain = load_dataframe_from_parquet_on_s3(topic + HISTORY_SUFFIX)
#test_terrain.status.unique()

In [98]:
parq_df = set_df_datatypes(parq_df, topic)
parq_df.dtypes

runID                                   category
resortID                                category
groomed                                 category
runName                                 category
runType                                 category
status                                  category
terrainName                             category
timestamp      datetime64[ns, America/Vancouver]
dtype: object

In [99]:

# Sort by timestamp and ID columns
record_id_cols = include_timestamp_in_colnames(topic_ID_col_names[topic])
print(f"sorted by {record_id_cols}")
parq_df.sort_values(record_id_cols)

sorted by ['resortID', 'runID', 'terrainName', 'timestamp']


Unnamed: 0_level_0,runID,resortID,groomed,runName,runType,status,terrainName,timestamp
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
127,10,1,No,Apres Vous,3,O,Back Bowls,2020-02-13 15:06:38.957648-08:00
171,10,1,Yes,Big Rock Park,2,O,Blue Sky Basin,2020-02-13 15:06:38.957648-08:00
153,10,1,No,Bolshoi Ballroom,3,O,China Bowl,2020-02-13 15:06:38.957648-08:00
0,10,1,No,Blue Ox,3,O,Golden Peak,2020-02-13 15:06:38.957648-08:00
87,10,1,No,Baccarat,2,O,Lionshead,2020-02-13 15:06:38.957648-08:00
...,...,...,...,...,...,...,...,...
1911,1749,17,No,Whispering Pines,2,O,Main Face,2020-02-13 15:06:38.957648-08:00
1912,1750,17,No,Whistler,2,O,Main Face,2020-02-13 15:06:38.957648-08:00
1913,1751,17,Yes,Zip,1,O,Main Face,2020-02-13 15:06:38.957648-08:00
1914,1752,17,No,Broken Arrow,3,X,Main Face,2020-02-13 15:06:38.957648-08:00


In [None]:
# For example code: Show data for whistler
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    display(parq_df.sort_values(sort).query('resortID == 13'))

In [None]:
# For example code: Filter dataframe by date
parq_df = parq_df[parq_df['timestamp'] > '2020-02-08']

In [None]:
# Example of a entities that has the same runName and resortID, but different terrainNames:
parq_df.query('runName == "Bear Paw"')

We can uniquely identify each run via a combination of: `resortID`, `runID` and `terrainName`:

In [89]:
# we don't need to include `runName`:
assert(
    parq_df.drop_duplicates(subset=['resortID', 'runID', 'terrainName', 'timestamp'], keep=False).equals(
    parq_df.drop_duplicates(subset=['resortID', 'runID', 'runName', 'terrainName', 'timestamp'], keep=False))
)

TBD: testing
- no duplicate rows
- no duplicate information in adjacent rows by time

In [None]:
# Load live whistler parquet data

%time parq_df = load_dataframe_from_parquet_on_s3(HISTORY_FNAME)

lifts_status_changes_parq_df = get_status_durations(parq_df)
lifts_status_changes_parq_df.to_csv(DATA_DIR + "lifts_status_changes_parq.csv", date_format='%c')
lifts_status_changes_parq_df

In [None]:
parq_df.status.cat.categories

In [None]:
print(*parq_df.status)

In [None]:
parq_df.sort_values(["liftName", "timestamp"])

### Issue when running get_status_durations(parq_df)
Resulting in error:

    ~/anaconda3/lib/python3.7/site-packages/pandas/core/arrays/categorical.py in from_codes(cls, codes, categories, ordered, dtype)
        705 
        706         if len(codes) and (codes.max() >= len(dtype.categories) or codes.min() < -1):
    --> 707             raise ValueError("codes need to be between -1 and " "len(categories)-1")
        708 
        709         return cls(codes, dtype=dtype, fastpath=True)

    ValueError: codes need to be between -1 and len(categories)-1


Same error seen when running `parq_df[['status']].sort_values(by=['status'])`

This was caused by missing categories (`H`) in the `status` column (and maybe others)

#### Code to inspect issue:

In [None]:
# Test for issue
parq_df[['status']].sort_values(by=['status'])

In [101]:
parq_df.status.cat.categories

Index(['X', 'H', 'O'], dtype='object')

In [116]:
# All the category codes present in the column
print(*parq_df.status.cat.codes.unique())

2 0


In [103]:
len(parq_df.status.cat.codes)

2200

In [104]:
# Should be false
parq_df.status.cat.codes.max() >= len(parq_df.status.dtype.categories)

False

In [105]:
# Should be false
parq_df.liftName.cat.codes.min() < -1

AttributeError: 'DataFrame' object has no attribute 'liftName'

In [106]:
for c in parq_df.columns:
    print(parq_df[c].cat.categories)

Int64Index([  10,   11,   12,   13,   14,   15,   16,   17,   18,   19,
            ...
            1744, 1745, 1746, 1747, 1748, 1749, 1750, 1751, 1752, 1753],
           dtype='int64', length=610)
Int64Index([1, 2, 3, 4, 5, 7, 9, 12, 13, 14, 15, 16, 17], dtype='int64')
Index(['No', 'Yes'], dtype='object')
Index(['$100 Saddle', '1/2 Load', '1/4 Load', '10th Mountain',
       '117th Street Small/Medium Rail Garden', '18' Half Pipe', '1876',
       '1st Bowl', '2 Lift Line', '3 Kings',
       ...
       'Yarrow', 'Yellow Brick Road', 'Yonder', 'Yonder Gully', 'Zachary',
       'Zeke’s Road', 'Zig Zag', 'Zip', 'Zoom Room', 'Zot'],
      dtype='object', length=1844)
Int64Index([0, 1, 2, 3, 4, 5, 7], dtype='int64')
Index(['X', 'H', 'O'], dtype='object')
Index(['7th Heaven', 'A51 Terrain Park', 'Arrowhead', 'Bachelor Gulch',
       'Back Bowls', 'Back Side', 'Backside', 'Beaver Creek',
       'Big Red - Franz's - Garbanzo', 'Birds Of Prey', 'Blue Sky Basin',
       'Bonanza/McConkey's/Pione

AttributeError: Can only use .cat accessor with a 'category' dtype

In [110]:
parq_df["timestamp"] = pd.to_datetime(pd.Series(np.asarray(parq_df["timestamp"])))

In [None]:
read_file = f"s3://{BUCKET_NAME}/{fname}.parquet"
pf = ParquetFile(read_file, open_with=myopen)

# Check the categories for a specific row group
pf.grab_cats(columns='status', row_group_index=1)

In [None]:
# If partitioning by column, gives known values for each column
pf.cats

#### Possible solutions
1. Remove partitioning by date column when writing to parquet
**2. Set status categories manually via `set_categories`. (and any other columns with the same issue.  See https://github.com/dask/dask/issues/2944**
3. Leave problem columns as text-based when writing and loading from parquet

# Testing timestamps for file loading

In [None]:
read_file = f"s3://{BUCKET_NAME}/{fname}.parquet"
pf = ParquetFile(read_file, open_with=myopen)
test = pf.to_pandas()["timestamp"]

In [None]:
# If needed: to convert for categorical datetime to regular datetime
df["timestamp"] = pd.to_datetime(pd.Series(np.asarray(df["timestamp"])))

In [None]:
test.dt = test.dt.tz_convert(tz= 'America/Vancouver')

/Users/paul/anaconda3/lib/python3.7/site-packages/pandas/core/series.py:597: FutureWarning: Converting timezone-aware DatetimeArray to timezone-naive ndarray with 'datetime64[ns]' dtype. In the future, this will return an ndarray with 'object' dtype where each element is a 'pandas.Timestamp' with the correct 'tz'.
	To accept the future behavior, pass 'dtype=object'.
	To keep the old behavior, pass 'dtype="datetime64[ns]"'.


more info: https://pandas-docs.github.io/pandas-docs-travis/whatsnew/v0.24.0.html#converting-timezone-aware-series-and-index-to-numpy-arrays

In [None]:
load_dataframe_from_parquet_on_s3(fname).dtypes

### Testing local parquet saves

In [None]:
def save_parquet(df, fname):
    # parquet engines don't handle shifted timezones
    df.loc[:, 'timestamp'] = df.loc[:, 'timestamp'].dt.tz_convert(pytz.utc)

    # Note: May need snappy-python as a req to run on AWS Lambda
    df.to_parquet(DATA_DIR + fname + '.parquet',
                  engine='fastparquet',
                  partition_on=['timestamp'],
                  file_scheme='mixed')

In [None]:
save_parquet(df[0:3].copy(), 'wb_lifts_history')

In [None]:
df.iloc[20:22, :].copy().to_parquet(DATA_DIR + 'wb_lifts_history' + '.parquet',
              engine='fastparquet',
              partition_on=['timestamp'],
              file_scheme='mixed',
              append=True)
# Catch exception that is doesn't exist here

In [None]:
# todo: change time_diff to "duration"
# test on lambda
# make datatype dict for and general set datatypes function


# Utilities

In [94]:
def load_prior_json_from_s3(topic: str) -> dict:
    """E.g. load_prior_json_from_s3('weather')"""
    prior_object = bucket.Object(topic + PRIOR_SUFFIX)
    prior = prior_object.get()['Body'].read().decode('utf-8')
    return json.loads(prior)

In [None]:
def dataframe_difference(df1, df2, which=None):
    """Find rows which are different between two DataFrames."""
    comparison_df = df1.merge(df2,
                              indicator=True,
                              how='outer')
    if which is None:
        diff_df = comparison_df[comparison_df['_merge'] != 'both']
    else:
        diff_df = comparison_df[comparison_df['_merge'] == which]
    return diff_df



In [None]:
# TBD: Add to scraping 
# check that weather_prior_json['weather'][0].keys() matches list of expected columns (in case new ones are in use)
# Add as exception handling for the other topics as well
load_prior_json_from_s3('weather')['weather'][0].keys()

In [None]:
# load a json from S3
terrain_prior_json = load_prior_json_from_s3('terrain')

# Convert json to a dataframe normally wer can use jsons_to_df()
terrain_prior_df = pd.json_normalize(
    data=terrain_prior_json,
    record_path=['terrain'],
    meta='timestamp'
)

terrain_prior_df.columns

In [None]:
def del_lifts_history():
    bucket.Object(HISTORY_FNAME).delete()

def del_lifts_prior():
    bucket.Object(PRIOR_STATUS_FNAME).delete()

In [None]:
del_lifts_history()
del_lifts_prior()

### S3 Object Deletion

In [None]:
# Delete prior jsons
bucket.Object('lifts' + PRIOR_SUFFIX).delete()
bucket.Object('terrain' + PRIOR_SUFFIX).delete()
bucket.Object('weather' + PRIOR_SUFFIX).delete()

In [None]:
# Delete parquet history files
bucket.objects.filter(Prefix='lifts' + HISTORY_SUFFIX + '/').delete()
bucket.objects.filter(Prefix='terrain' + HISTORY_SUFFIX + '/').delete()
bucket.objects.filter(Prefix='weather' + HISTORY_SUFFIX + '/').delete()

# Notes
- Terrain data: runs need to be identified via combination of `resortID`, `runName` and `runID`
- There are run IDs that repeat for the same resort (e.g. for Vail resortID == 1, runID == 10
- TBD: Are the combination of `resortID`, `runID`, and `runType` always unique?

## To do
- return data object with filter_by_resort() method
- live_data class?  Subclass for each subject?