In [0]:
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd, numpy as np
from dataiku import pandasutils as pdu


# Read recipe input: the cleaned ping dataset "ping_data_clean" as a pandas frame.
ghost_trips_booking_pings_cleaned_transformed = dataiku.Dataset("ping_data_clean")
ghost_trips_booking_pings_cleaned_transformed_df = (
    ghost_trips_booking_pings_cleaned_transformed.get_dataframe()
)

In [0]:
ghost_trips_booking_pings_cleaned_transformed_df["label"].unique()  # distinct values of `label` (the train-mode target in assemble)

In [0]:
import math as m

In [0]:
# class DistanceFeatures:
def create_ping_features(df):
    """Engineer per-ping features from consecutive pings of the same order.

    Added columns (computed per ``order_no`` group, in current row order):
    - ``distance``: value from ``calculate_dist`` between this ping and the
      previous one (metres with its default radius); 0 for a group's first row
    - ``time_diff``: ``seconds`` elapsed since the previous ping; NaN for a
      group's first row
    - ``calculated_speed``: ``distance / time_diff`` with +/-inf mapped to NaN

    ``calculated_speed`` is the feature consumed by the aggregation step.

    NOTE: ``df`` itself is modified in place (it gains ``latlong``,
    ``distance``, ``time_diff`` and ``calculated_speed``); the returned frame
    is a copy without the helper ``latlong`` column.
    NOTE(review): differences are taken in the existing row order of each
    order; rows are presumably already sorted by ``seconds`` -- TODO confirm
    (``assemble`` only sorts after these features are computed).

    Args:
        df (pd.DataFrame): filtered ping df with ``order_no``, ``latitude``,
            ``longitude`` and ``seconds`` columns.

    Returns:
        pd.DataFrame: df with additional features `distance`, `time_diff`, `calculated_speed`
    """
    # Pair the coordinates so each order's pings can be walked pairwise.
    df["latlong"] = [pair for pair in zip(df["latitude"], df["longitude"])]
    df["distance"] = df.groupby("order_no")["latlong"].transform(
        calc_distance_between_pings
    )
    df["time_diff"] = df.groupby("order_no")["seconds"].diff()

    # Zero elapsed time gives +/-inf; treat that as "speed unknown".
    raw_speed = df["distance"] / df["time_diff"]
    df["calculated_speed"] = raw_speed.replace([np.inf, -np.inf], np.nan)

    return df.drop("latlong", axis=1)


def calc_distance_between_pings(coords):
    """Distance from each ping to the one before it.

    Important: all pings in ``coords`` must come from the same driver/order,
    otherwise the pairwise distances are meaningless.

    Args:
        coords (array-like): sequence of ``(latitude, longitude)`` tuples; a
            pandas Series/DataFrame is converted via ``.values``.

    Returns:
        list: ``[0, d(1, 0), d(2, 1), ...]`` where ``d(i, i-1)`` is
        ``calculate_dist(coords[i], coords[i - 1])``. The leading 0 is present
        even for empty input.
    """
    if isinstance(coords, (pd.DataFrame, pd.Series)):
        coords = coords.values

    distances = [0]
    distances.extend(
        calculate_dist(current, previous)
        for previous, current in zip(coords[:-1], coords[1:])
    )
    return distances


def calculate_dist(loc_next, loc_current, R=6373.0):
    """Great-circle (haversine) distance between two coordinates.

    Args:
        loc_next (tuple): ``(latitude, longitude)`` in degrees.
        loc_current (tuple): ``(latitude, longitude)`` in degrees.
        R (float, optional): Earth's radius. Defaults to 6373.0, i.e. km.

    Returns:
        float: distance between `loc_next` and `loc_current`, equal to
        ``R * central_angle * 1000``. With the default km radius this is
        metres. NaN if any coordinate is NaN.
    """
    lat1, lon1 = loc_current
    lat2, lon2 = loc_next
    lat1, lon1, lat2, lon2 = map(m.radians, [lat1, lon1, lat2, lon2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = m.sin(dlat / 2) ** 2 + m.cos(lat1) * m.cos(lat2) * m.sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, and then sqrt(1 - a) raises "math domain error". The comparison
    # is False for NaN, so missing coordinates still propagate as NaN.
    if a > 1.0:
        a = 1.0
    c = 2 * m.atan2(m.sqrt(a), m.sqrt(1 - a))
    return R * c * 1000


# class CoordsFeatures:

def create_coord_features(df):
    """Engineer features based on latitude/longitude movement.

    Adds, per ``order_no``:
    - `lat_change`, `long_change` (see ``position_change``)
    - `weak_dir_change`, `strong_dir_change` (see ``dir_change``)

    NOTE: the position-change columns are added to ``df`` in place.

    Args:
        df (pd.DataFrame): filtered df

    Returns:
        pd.DataFrame: df with additional features
    """
    return create_dir_changes(create_position_changes(df))


def create_position_changes(df):
    """Add per-order ``lat_change`` and ``long_change`` columns to ``df`` in place.

    Each column holds ``position_change`` of the matching coordinate within
    its ``order_no`` group. Returns the same (mutated) ``df``.
    """
    for source_column, target_column in (
        ("latitude", "lat_change"),
        ("longitude", "long_change"),
    ):
        df[target_column] = df.groupby("order_no")[source_column].transform(
            position_change
        )
    return df


def position_change(coord):
    """Sign of the change of a latitude or longitude sequence.

    For each element after the first:

        NaN   if it or its predecessor is NaN
        0     if the coordinate is unchanged
        -1    if the coordinate increased
        1     if the coordinate decreased

    NOTE: an *increase* is encoded as -1. Downstream "up" percentages count
    the value 1, so "up" corresponds to a decreasing raw coordinate.

    Args:
        coord (array-like): list of latitudes or longitudes; a pandas
            Series/DataFrame is converted via ``.values``.

    Returns:
        list: coord changes, starting with 0 (also for empty input).
    """
    if isinstance(coord, (pd.DataFrame, pd.Series)):
        coord = coord.values

    changes = [0]
    for previous, current in zip(coord[:-1], coord[1:]):
        if np.isnan(current) or np.isnan(previous):
            changes.append(np.nan)
        elif current == previous:
            changes.append(0)
        elif current > previous:
            changes.append(-1)
        else:
            changes.append(1)
    return changes


def create_dir_changes(df):
    """Add per-order ``weak_dir_change`` and ``strong_dir_change`` columns.

    A temporary ``lat_long_change`` column of ``(lat_change, long_change)``
    tuples is added to ``df`` in place. It feeds ``dir_change`` for each
    order and is absent from the returned copy.
    """
    df["lat_long_change"] = list(zip(df["lat_change"], df["long_change"]))
    for target_column, strength in (
        ("weak_dir_change", "weak"),
        ("strong_dir_change", "strong"),
    ):
        df[target_column] = df.groupby("order_no")["lat_long_change"].transform(
            lambda pairs, kind=strength: dir_change(pairs, kind)
        )
    return df.drop(["lat_long_change"], axis=1)


def dir_change(coords_changes, change="weak"):
    """Direction-change indicator between consecutive pings.

    Compares each ``(lat_change, long_change)`` pair with the previous one:

    - weak (``change == "weak"``): 1 if lat_change OR long_change differs, else 0
    - strong (any other value): 1 if lat_change AND long_change both differ, else 0

    If either pair contains NaN, the entry is NaN.

    Args:
        coords_changes (array-like): sequence of ``(lat_change, long_change)``
            tuples; a pandas Series/DataFrame is converted via ``.values``.
        change (str, optional): ``"weak"`` or anything else for strong.
            Defaults to ``"weak"``.

    Returns:
        list: direction-change indicators, starting with 0 (also for empty input).
    """
    if isinstance(coords_changes, (pd.DataFrame, pd.Series)):
        coords_changes = coords_changes.values

    flags = [0]
    for previous, current in zip(coords_changes[:-1], coords_changes[1:]):
        if coords_changes_isnull(current) or coords_changes_isnull(previous):
            flags.append(np.nan)
            continue

        lat_now, long_now = current
        lat_before, long_before = previous
        lat_moved = lat_now != lat_before
        long_moved = long_now != long_before

        if change == "weak":
            flags.append(1 if (lat_moved or long_moved) else 0)
        else:
            flags.append(1 if (lat_moved and long_moved) else 0)

    return flags


def coords_changes_isnull(coords_changes):
    """True when either component of a ``(lat_change, long_change)`` pair is NaN."""
    lat_change, long_change = coords_changes
    return any(np.isnan(part) for part in (lat_change, long_change))

In [0]:
ping_features = create_ping_features(df=ghost_trips_booking_pings_cleaned_transformed_df)  # NOTE: also adds the feature columns to the raw frame in place

In [0]:
ping_features.info()  # dtypes / non-null counts of the per-ping features

In [0]:
coord_features = create_coord_features(ping_features)  # build on ping_features explicitly instead of relying on create_ping_features having mutated the raw frame; NOTE: adds columns to ping_features in place

In [0]:
coord_features.info()  # dtypes / non-null counts after adding the coordinate-change features

In [0]:
def assemble(disaggregated_df, key, mode="train"):
    """Aggregate ping-level features into one feature row per ``key``.

    Only ``key == "order_no"`` is implemented. If mode == `train`, the
    per-order ``label`` is appended and ``order_no`` is moved from the index
    into a column. In any other mode (e.g. ``"test"``) no `label` column is
    appended and the result stays indexed by ``order_no``.

    Args:
        disaggregated_df (pd.DataFrame): ping-level df carrying the columns
            used by the feature helpers, plus ``label`` in train mode.
        key (str): key pivot to aggregate the features; must be ``"order_no"``.
        mode (str, optional): ``"train"`` to append labels. Defaults to ``"train"``.

    Returns:
        pd.DataFrame: aggregated df with the columns of ``aggregated_features(key)``.

    Raises:
        ValueError: if ``key`` is not ``"order_no"``.
    """
    if key != "order_no":
        # Without this guard, an empty list of frames would reach reduce() and
        # fail with an error unrelated to the real cause.
        raise ValueError(
            "Unsupported aggregation key {!r}; only 'order_no' is implemented".format(key)
        )

    # Helpers such as _time_on_point_accept read the last matching ping, so the
    # pings of each order must be in time order.
    disaggregated_df = disaggregated_df.sort_values(
        ["order_no", "seconds"]
    ).reset_index(drop=True)

    # Both frames are indexed by order_no. join_df_on_index uses an inner join
    # by default, so orders missing from either frame are dropped here (e.g.
    # orders with no OTW_PICKUP pings).
    aggregated_df = join_df_on_index(
        [
            assemble_TripLevelFeatures(disaggregated_df),
            assemble_TripLevelDriverStatusFeatures(disaggregated_df),
        ]
    )

    # NOTE: Need to beware of the index -- order_no is still the index here.
    aggregated_df = aggregated_df[aggregated_features(key)]

    if mode == "train":
        # NOTE(review): assumes one label per order. If an order has several
        # distinct labels, drop_duplicates keeps them all and the merge
        # duplicates that order's row -- TODO confirm labels are unique.
        booking_labels = (
            disaggregated_df[["order_no", "label"]]
            .drop_duplicates()
            .set_index("order_no")
        )
        aggregated_df = aggregated_df.merge(
            booking_labels, left_index=True, right_index=True
        ).reset_index()

    return aggregated_df


def join_df_on_index(aggregated_dfs, how="inner"):
    """Join several `aggregated_df` frames on their index.

    Args:
        aggregated_dfs (iterable): non-empty list of pd.DataFrame
            `aggregated_df`. In this notebook all are indexed by ``order_no``.
        how (str, optional): join type forwarded to ``pd.merge``. Defaults to
            ``"inner"`` (the original behavior), under which rows whose index is
            missing from any frame are dropped.

    Returns:
        pd.DataFrame: joined df (the single input itself if only one is given).

    Raises:
        ValueError: if ``aggregated_dfs`` is empty.
    """
    # `reduce` is not a builtin on Python 3; the original relied on it being one.
    from functools import reduce

    aggregated_dfs = list(aggregated_dfs)
    if not aggregated_dfs:
        raise ValueError("join_df_on_index needs at least one DataFrame")

    return reduce(
        lambda left, right: pd.merge(
            left, right, how=how, left_index=True, right_index=True
        ),
        aggregated_dfs,
    )


def concatenate_dfs(dfs, columns_if_empty):
    """Stack ``dfs`` row-wise with a fresh RangeIndex.

    If ``dfs`` is empty, return an empty frame with ``columns_if_empty`` instead
    (``pd.concat`` refuses an empty list).
    """
    if len(dfs) > 0:
        stacked = pd.concat(dfs, axis=0)
        return stacked.reset_index(drop=True)
    return pd.DataFrame([], columns=columns_if_empty)

In [0]:
def assemble_TripLevelFeatures(disaggregated_df, suffix=None):
    """Whole-trip features per ``order_no`` (all pings, any driver status).

    Concatenates the per-order statistics column-wise, then adds
    ``ping_sequence_avg_speed`` via ``generate_interaction_features``.

    Args:
        disaggregated_df (pd.DataFrame): ping-level df.
        suffix: accepted for signature compatibility; not used.

    Returns:
        pd.DataFrame: indexed by ``order_no``.
    """
    df = disaggregated_df
    lat_long_cols = ["lat_change", "long_change"]

    pieces = [
        trip_statistics(df, "calculated_speed"),
        trip_statistics(df, "altitude_in_meters"),
        trip_statistics(df, "accuracy_in_meters"),
        # "measured_speed" statistics are currently disabled.
        trip_statistics(df, "weak_dir_change", ["mean"]),
        trip_statistics(df, "strong_dir_change", ["mean"]),
        trip_statistics(df, "distance", ["sum"]),
        trip_statistics(df, "latitude", ["mean"]),
        trip_statistics(df, "longitude", ["mean"]),
        trip_statistics(df, "time_diff"),
        trip_statistics(df, "lat_change", ["std"]),
        trip_dir_change_perc(df, lat_long_cols, "up"),
        trip_dir_change_perc(df, lat_long_cols, "down"),
        trip_duration(df),
        trip_pings_count(df),
        trip_pings_overlapping_count(df),
        time_on_point_accept(df),
    ]

    return generate_interaction_features(pd.concat(pieces, axis=1))


def trip_statistics(disaggregated_df, column, stats=None):
    """Per-order summary statistics of ``column``.

    Output columns are named ``ping_sequence_<column>_<stat>``, in ``stats`` order.

    Args:
        disaggregated_df (pd.DataFrame): ping-level df with ``order_no``.
        column (str): column to summarise.
        stats (list, optional): pandas aggregation names. Defaults to
            ``["mean", "median", "max", "min", "std"]``. A fresh list is built on
            each call instead of sharing a mutable default.

    Returns:
        pd.DataFrame: indexed by ``order_no``. If ``column`` is entirely null
        (which includes an empty frame), every statistic is null and the index
        holds the frame's unique ``order_no`` values.
    """
    if stats is None:
        stats = ["mean", "median", "max", "min", "std"]
    stats = list(stats)

    names = {stat: "ping_sequence_{}_{}".format(column, stat) for stat in stats}

    if disaggregated_df[column].isnull().all():
        # Build the columns from `stats`, not dict.values(), so the order is
        # deterministic even where dicts are unordered (Python < 3.7).
        return pd.DataFrame(
            None,
            columns=[names[stat] for stat in stats],
            index=pd.Index(disaggregated_df["order_no"].unique(), name="order_no"),
        )

    return (
        disaggregated_df.groupby("order_no")[column]
        .agg(stats)
        .rename(columns=names)
    )


def trip_dir_change_perc(disaggregated_df, columns, mode="up"):
    """Per-order share of pings whose change indicator equals the ``mode`` value.

    ``mode == "up"`` counts entries equal to 1; any other mode counts -1.
    NOTE: ``position_change`` encodes an *increase* of the raw coordinate as
    -1, so "up" here counts decreases of latitude/longitude.

    NaN indicators count as misses but stay in the denominator
    (share = matching pings / all pings of the order).

    Args:
        disaggregated_df (pd.DataFrame): ping-level df with ``order_no`` and ``columns``.
        columns (list): indicator columns, e.g. ``["lat_change", "long_change"]``.
        mode (str, optional): ``"up"`` or ``"down"``. Defaults to ``"up"``.

    Returns:
        pd.DataFrame: indexed by ``order_no``, with one
        ``ping_sequence_<col>_<mode>_perc`` column per input column.
    """
    columns = list(columns)
    val = 1 if mode == "up" else -1
    names = {
        column: "ping_sequence_{}_{}_perc".format(column, mode)
        for column in columns
    }
    # Vectorised form of sum(x == val) / len(x) per order. Unlike a per-group
    # apply, it still yields the named (empty) columns when there are no rows.
    hits = (disaggregated_df[columns] == val).astype(float)
    df = (
        hits.groupby(disaggregated_df["order_no"])
        .mean()
        .rename(columns=names)
    )
    return df


def trip_duration(disaggregated_df):
    """Per-order span of ``seconds`` (latest minus earliest ping), NaNs ignored.

    Returns:
        pd.Series: indexed by ``order_no``, named ``ping_sequence_total_duration``.
    """
    seconds_by_order = disaggregated_df.groupby("order_no")["seconds"]
    duration = seconds_by_order.max() - seconds_by_order.min()
    duration.name = "ping_sequence_total_duration"
    return duration


def trip_pings_count(disaggregated_df):
    """Number of pings (rows) per ``order_no``, as a Series named ``ping_sequence_count``."""
    return disaggregated_df.groupby("order_no").size().rename("ping_sequence_count")


def generate_interaction_features(aggregated_df):
    """Add ``ping_sequence_avg_speed`` = total distance / total duration, in place.

    A zero duration (e.g. a single-ping trip) would give +/-inf. It is mapped
    to NaN, matching how ``create_ping_features`` treats ``calculated_speed``.

    Args:
        aggregated_df (pd.DataFrame): must contain ``ping_sequence_distance_sum``
            and ``ping_sequence_total_duration``.

    Returns:
        pd.DataFrame: the same ``aggregated_df`` with the new column.
    """
    avg_speed = (
        aggregated_df["ping_sequence_distance_sum"]
        / aggregated_df["ping_sequence_total_duration"]
    )
    aggregated_df["ping_sequence_avg_speed"] = avg_speed.replace(
        [np.inf, -np.inf], np.nan
    )
    return aggregated_df


def time_on_point_accept(disaggregated_df):
    """Per-order result of ``_time_on_point_accept``.

    Returns:
        pd.Series: indexed by ``order_no``, named
        ``seconds_on_same_latlong_as_ping_bef_accept``.
    """
    # TODO: Add unit tests
    per_order = disaggregated_df.groupby("order_no").apply(_time_on_point_accept)
    per_order.name = "seconds_on_same_latlong_as_ping_bef_accept"
    return per_order


def _time_on_point_accept(pings):
    # TODO: Add unit test

    available_pings = pings[pings.driver_status == "AVAILABLE"]
    if available_pings.empty:
        return np.nan

    available_ping = available_pings.iloc[-1]
    lat_accept = available_ping["latitude"]
    long_accept = available_ping["longitude"]
    seconds_accept = available_ping["seconds"]

    suspicious_pings = pings[
        (pings["latitude"] == lat_accept)
        & (pings["longitude"] == long_accept)
        & (pings["driver_status"].isin(["OTW_PICKUP", "OTW_DROPOFF"]))
    ]

    if suspicious_pings.empty:
        return 0
    return suspicious_pings["seconds"].iloc[-1] - seconds_accept


def trip_dir_change_mean(aggregated_df):
    """Add ``ping_sequence_dir_change_mean`` (row-wise ``_trip_dir_change_mean``) in place.

    Returns the same ``aggregated_df``.
    """
    # TODO: Add unit tests
    combined = aggregated_df.apply(_trip_dir_change_mean, axis=1)
    aggregated_df["ping_sequence_dir_change_mean"] = combined
    return aggregated_df


def _trip_dir_change_mean(row):
    # TODO: Add unit tests
    Xw = row["ping_sequence_weak_dir_change_mean"]
    Xs = row["ping_sequence_strong_dir_change_mean"]
    if Xs >= 0.86:
        return 0
    elif Xs > 0.6 and Xs <= 0.8:
        w = 0.6
    else:
        w = 1.25
    return (w * Xw) + (1 - w) * Xs


def trip_pings_overlapping_count(disaggregated_df):
    """Per-order count of long runs of identical consecutive ``distance`` values.

    See ``_trip_pings_overlapping_count`` for the run definition.

    Returns:
        pd.Series: indexed by ``order_no``, named ``ping_sequence_overlapping_count``.
    """
    # TODO: Add unit tests
    counts = disaggregated_df.groupby("order_no")["distance"].apply(
        _trip_pings_overlapping_count
    )
    counts.name = "ping_sequence_overlapping_count"
    return counts


def _trip_pings_overlapping_count(distance):
    # TODO: Add unit tests
    rank_ping_dist = (distance != distance.shift()).cumsum()
    return (rank_ping_dist.value_counts() >= 15).sum()

# @staticmethod
# def _trip_pings_overlapping_count(group):
#     # TODO: Add unit tests
#     group["rank_ping_dist"] = (
#         group["distance"] != group["distance"].shift()
#     ).cumsum()
#     rank_dist = group["rank_ping_dist"].unique()
#     overlapping_count = 0
#     for y in rank_dist:
#         if group[group["rank_ping_dist"] == y].shape[0] >= 15:
#             overlapping_count += 1
#     return overlapping_count

In [0]:
def assemble_TripLevelDriverStatusFeatures(disaggregated_df):
    """Per-order features computed separately on before-accept and during-pickup pings.

    Returns the two group frames (columns suffixed ``_before_accept`` /
    ``_during_pickup``) joined on their ``order_no`` index.
    """
    subsets = separate_pings_by_driver_status(disaggregated_df)
    group_names = ("before_accept", "during_pickup")
    return join_df_on_index(
        [
            assemble_for_group(subset, group=name)
            for subset, name in zip(subsets, group_names)
        ]
    )


def assemble_for_group(disaggregated_df, group):
    """Per-order features for one driver-status subset of pings.

    Both groups get the same features except for the accuracy statistics:
    ``before_accept`` uses min and mean, ``during_pickup`` uses min only.
    ``before_accept`` additionally gets ``ping_sequence_dir_change_mean``.
    Every resulting column is suffixed with ``"_" + group``.

    Args:
        disaggregated_df (pd.DataFrame): the ping subset for ``group``.
        group (str): ``"before_accept"`` or ``"during_pickup"``.

    Returns:
        pd.DataFrame: indexed by ``order_no``.

    Raises:
        ValueError: for any other ``group``.
    """
    if group == "before_accept":
        accuracy_stats = ["min", "mean"]
    elif group == "during_pickup":
        accuracy_stats = ["min"]
    else:
        raise ValueError(
            "Unknown group {!r}; expected 'before_accept' or 'during_pickup'".format(group)
        )

    # The feature set shared by both groups; only the accuracy stats differ.
    aggregated_dfs = [
        trip_statistics(disaggregated_df, "accuracy_in_meters", accuracy_stats),
        trip_statistics(disaggregated_df, "weak_dir_change", ["mean"]),
        trip_statistics(disaggregated_df, "strong_dir_change", ["mean"]),
        trip_dir_change_perc(disaggregated_df, ["lat_change", "long_change"], "up"),
        trip_dir_change_perc(disaggregated_df, ["lat_change", "long_change"], "down"),
    ]
    aggregated_df = pd.concat(aggregated_dfs, axis=1)

    if group == "before_accept":
        aggregated_df = trip_dir_change_mean(aggregated_df)

    suffix = "_" + group
    return aggregated_df.rename(
        columns={col: col + suffix for col in list(aggregated_df.columns)}
    )


def separate_pings_by_driver_status(disaggregated_df):
    """Split pings into the subsets used for driver-status features.

    NOTE: Assume that for every order, the pings would come in the order

        AVAILABLE/UNAVAILABLE -> OTW_PICKUP -> OTW_DROPOFF

    Returns:
        tuple: ``(pings_before_accept, pings_during_pickup)``. The first holds
        AVAILABLE/UNAVAILABLE pings and the second OTW_PICKUP pings; each gets
        a fresh RangeIndex. OTW_DROPOFF pings appear in neither.
    """
    status = disaggregated_df["driver_status"]
    before_accept_mask = status.isin(["AVAILABLE", "UNAVAILABLE"])
    during_pickup_mask = status.isin(["OTW_PICKUP"])

    return (
        disaggregated_df[before_accept_mask].reset_index(drop=True),
        disaggregated_df[during_pickup_mask].reset_index(drop=True),
    )

In [0]:
def aggregated_features(key=None):
    """Ordered feature columns kept after aggregating on ``key``.

    ``key=None`` is treated as ``"order_no"``. Any other key has no feature
    set, and ``None`` is returned.

    Args:
        key (str, optional): aggregation key. Defaults to ``None``.

    Returns:
        list or None: column names, in output order.
    """
    if key is None:
        key = "order_no"
    if key != "order_no":
        return None

    # Overall (whole ping sequence). Disabled candidates, named as
    # trip_statistics / trip_dir_change_perc / generate_interaction_features
    # would generate them:
    #   ping_sequence_{calculated_speed,altitude_in_meters,accuracy_in_meters}_
    #       {mean,median,max,min,std}
    #   ping_sequence_measured_speed_* (its statistics are not computed at all)
    #   ping_sequence_{weak,strong}_dir_change_mean
    #   ping_sequence_{lat,long}_change_{up,down}_perc
    #   ping_sequence_avg_speed
    overall = [
        "ping_sequence_lat_change_std",
        "seconds_on_same_latlong_as_ping_bef_accept",
        "ping_sequence_overlapping_count",
    ]
    before_accept = [
        "ping_sequence_accuracy_in_meters_min_before_accept",
        "ping_sequence_accuracy_in_meters_mean_before_accept",
        "ping_sequence_weak_dir_change_mean_before_accept",
        "ping_sequence_strong_dir_change_mean_before_accept",
    ]
    during_pickup = [
        "ping_sequence_accuracy_in_meters_min_during_pickup",
    ]
    return overall + before_accept + during_pickup

In [0]:
trip_level_features = assemble(coord_features, key="order_no", mode="train")  # aggregate per order_no and attach `label`

In [0]:
# Write recipe output: the trip-level feature table built by `assemble` above.
ghost_trip_features_df = trip_level_features

ghost_trip_features = dataiku.Dataset("revelio_production_features")
ghost_trip_features.write_with_schema(ghost_trip_features_df)