<a href="https://colab.research.google.com/github/epogrebnyak/rides/blob/master/rides.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Цели и набор гипотез

Цель - показать сходимость треков в разных условиях.

Параметры на входе:

- путь к каталогу с фалйами JSON
- какие типы автомобилей смотрим (автобусы, грузовые, легковые, спецтранспорт)

Алгоритмы поиска:

- непрерывные дистанции
- остановки

На выходе:

- список артефактов

Иcпользуемые гипотезы:

1. первая точка каждой зарегистрированной поездки - остановка с нулевой продолжительностью
1. треки разбиваются по суткам
1. "автобусы" - автомобили вместимостью не менее 8 человек

# Импорт зависимых модулей 

In [0]:
import os
import json

from pathlib import Path
from dataclasses import dataclass

import pandas as pd

# Создадим конфигурацию

Для нового датасета нужно предоставить адрес ссылки. 

Dropbox: в обычной ссылке надо заменить адрес с `www.dropbox.com` на `dl.dropboxusercontent.com`, чтобы она стала скачиваемой напрямую.

In [0]:
# Ссылка URL c данными и имя файла с данными
RAW_DATA_URL = ("https://dl.dropboxusercontent.com" 
                "/sh/hibzl6fkzukltk9/AABTFmhvDvxyQdUaBsKl4h59a/data_samples_json.zip")

Как исполнять программу - все удалить или использовать ранее созданные промежуточные файлы.

In [0]:
# Удалять ли все промежуточные файлы в начале запуска.
# Oбычно False, но для глубокого перезапуска - True
ERASE_EVERYTHING_AT_START = False


# Внутренние пути программы

Эти параметры не нужно менять.


In [0]:
# Каталог для данных
DATADIR = Path("data")  
DATADIR.mkdir(parents=False, exist_ok=True)

def datafile(filename):
    return str(DATADIR / filename)

class Filename:  
    VEHICLES = datafile("vehicles.csv")
    CSV_SOURCE = datafile("source.csv")  
    RAW_JSON_FOLDER = str (DATADIR / "jsons")
    RAW_ZIP_FILE = datafile("jsons.zip")

# Загрузить сырые данные по трекам поездок

In [0]:
# Google Colab позволяет комбинировать код python и команды linux

# Удалить все локальные файлы?
if ERASE_EVERYTHING_AT_START:
   !rm {Filename.RAW_ZIP_FILE}
   !rm -rf {Filename.RAW_JSON_FOLDER}
   !rm {Filename.VEHICLES}
   !rm {Filename.CSV_SOURCE}

In [0]:
# Загружаем файлы, если их нет
if not os.path.exists(Filename.RAW_ZIP_FILE):
  !wget -O {Filename.RAW_ZIP_FILE} {RAW_DATA_URL}
  !mkdir {Filename.RAW_JSON_FOLDER}
  !unzip -o {Filename.RAW_ZIP_FILE}  -d {Filename.RAW_JSON_FOLDER} > /dev/null 

In [0]:
# проверим наличие файлов 
!echo Raw file count:
!find {Filename.RAW_JSON_FOLDER} -type f | wc -l
!echo Sample files:
!ls {Filename.RAW_JSON_FOLDER} | head

Raw file count:
3464
Sample files:
01035dda-aeb4-11e9-80f2-10604ba895dc.json
01035ddd-aeb4-11e9-80f2-10604ba895dc.json
01035de0-aeb4-11e9-80f2-10604ba895dc.json
01035de3-aeb4-11e9-80f2-10604ba895dc.json
01035de6-aeb4-11e9-80f2-10604ba895dc.json
01035de9-aeb4-11e9-80f2-10604ba895dc.json
01ecaf60-dac1-11e9-80f2-10604ba895dc.json
01ecaf64-dac1-11e9-80f2-10604ba895dc.json
01ecaf67-dac1-11e9-80f2-10604ba895dc.json
01ecaf6b-dac1-11e9-80f2-10604ba895dc.json


# Типы автомобилей

Получаем константы `VEHICLES` и `VEHICLE_TYPES`.

In [0]:
class Folder:
    """Получение JSON файлов из каталога."""

    def __init__(self, directory_path: str):
        """Use *directory_path* as JSON file source."""
        self.directory = Path(directory_path)
        assert self.directory.is_dir()

    def filenames(self):
        return list(self.directory.glob("*.json"))

    def count(self):
        return len(self.filenames())

    def yield_jsons(self):
        for filename in self.filenames():
            yield self.load_json(filename)

    @staticmethod
    def load_json(filepath: str):
        """"Load a JSON file content"""
        with open(filepath, encoding="utf-8") as f:
            return json.load(f)

class Ride:
    def __init__(self, dict_):
        self.summary = dict_["info"]
        self.points = dict_["data"]

    @property
    def passengers(self):
        return self.summary["car_passengers"]

    @property
    def vehicle_type(self):
        return self.summary["category"]

    @property
    def duration(self):
        a, b = [to_date(self.summary[key]) for key in ("start_dt", "end_dt")]
        return round((b - a).total_seconds() / (60 * 60), 1)

    def route_locations(self):
        return [(p[2], p[1]) for p in self.points]

    def route_dataframe(self):
        df = pd.DataFrame(self.points, columns=KEYS)
        df.time = df.time.apply(to_date)
        return df

    @property
    def df(self):
        df = self.route_dataframe()["time lat lon".split()]
        df["dist"] = distance_deltas(df.lat, df.lon)  # km
        df["time_delta"] = df.time.diff().apply(lambda x: x.total_seconds())
        df["speed"] = (60 * 60) * (df["dist"] / df["time_delta"]).replace(
            [np.inf, -np.inf], np.nan
        )  # km/h
        df["elapsed_hours"] = (df.time - df.time[0]).apply(
            lambda x: x.total_seconds()
        ) / (
            60 * 60
        )  # hours
        return df

def get_vehicle_summary_raw(directory_path: str):
    rides = map(Ride, Folder(directory_path).yield_jsons())
    return pd.DataFrame([r.summary for r in rides])


def get_vehicle_summary(directory_path: str):
    """
    Use vehicles('data_samples_json').to_csv('vehicles.csv') to persist data.    
    """
    df = get_vehicle_summary_raw(directory_path)
    cols = ["category", "car_passengers", "cat_carry_weight"]
    return df.groupby("car_id").first()[cols].sort_values(cols)

@dataclass
class VehicleSummary:
    directory_path: str = Filename.RAW_JSON_FOLDER
    csv_path: str = Filename.VEHICLES

    def get(self):
        if not os.path.exists(self.csv_path):
          self.__make__()
        return self.__read__() 
    
    def __make__(self): 
        cars_df = get_vehicle_summary(self.directory_path)
        cars_df.to_csv(self.csv_path)

    def __read__(self):
        dtype = dict(car_id=str, category=str, car_passengers=int, 
                     cat_carry_weight=int)
        return pd.read_csv(self.csv_path, dtype=dtype)

def has(string):
    return lambda s: string in s


def vehicle_type(cars: pd.DataFrame):
    """Разобрать машины по типам:
        
        bus        
        passenger
        freight
        special
        
    """
    cars["type"] = "other"
    # bus
    cars.loc[cars.car_passengers >= 8, "type"] = "bus"
    # passenger
    a = (cars.category == "Специальный\\Автобус ") & (cars.type == "other")
    b = cars.category.apply(has("Легковой")) & (cars.type == "other")
    cars.loc[(a | b), "type"] = "passenger"
    # freight
    ix = cars.category.apply(has("Грузовой"))
    cars.loc[ix, "type"] = "freight"
    # special
    a = cars.category.apply(has("Специальный")) & (cars.type == "other")
    b = cars.category == "Строительный\\Автокран"
    cars.loc[(a | b), "type"] = "special"
    assert len(cars[cars.type == "other"].category.unique()) == 0
    return cars.type


def get_vehicle_dataframe():
    cars_df = VehicleSummary().get()
    cars_df["type"] = vehicle_type(cars_df)
    cars_df["qty"] = 1
    return cars_df


VEHICLES = get_vehicle_dataframe()
VEHICLE_TYPES = VEHICLES.type.unique().tolist()
assert set(VEHICLE_TYPES) == set(['bus', 'freight', 'passenger', 'special'])

# Функции расстояний

In [0]:
import pandas as pd
import numpy as np
from geopy.distance import great_circle


class City:
    Gvardeisk = (54 + 39 / 60, 21 + 4 / 60)  # Ориентир для центирования карты
    Kaliningrad = (54 + 43 / 60, 20 + 30 / 60)


def _distance(a, b):
    # great_circle - менее точен, но быстрее чем geopy.distance.distance
    # https://geopy.readthedocs.io/en/stable/#module-geopy.distance
    return great_circle(a, b)

def distance_km(a: tuple, b: tuple):
    return _distance(a, b).km


def safe_distance(lat, lon, prev_lat, prev_lon):
    a = (lat, lon)
    b = (prev_lat, prev_lon)
    if a == b:
        return 0
    try:
        return distance_km(a, b)
    except ValueError:
        return np.nan


# Функции создания датафрейма

- make_raw_csv()
- get_dataframe_from_raw_csv()

In [0]:
import os
from dataclasses import dataclass
from itertools import islice
from time import time
from functools import wraps
from typing import Optional

from tqdm import tqdm
import pandas as pd

def timing(f):
    @wraps(f)
    def wrap(*args, **kwargs):
        ts = time()
        result = f(*args, **kwargs)
        te = time()
        print(f"{f.__name__}() took {te-ts:2.2f} sec")
        return result

    return wrap


def add_distance(rows):
    prev_row = {"ride": ""}
    for row in rows:
        if row["ride"] == prev_row["ride"]:
            row["dist"] = safe_distance(
                row["lat"], row["lon"], prev_row["lat"], prev_row["lon"]
            )
        else:
            row["dist"] = 0
        prev_row = row
        yield row


class Points:
    """Получение точек треков из каталога."""

    columns = ["time", "lon", "lat", "car", "ride"]
    dtypes = dict(time=int, lon=float, lat=float, car=str, ride=str, dist=float)

    def __init__(self, directory_path: str):
        self.directory_path = directory_path

    def iterate(self):
        for dict_ in Folder(self.directory_path).yield_jsons():
            for p in dict_["data"]:
                car = dict_["info"]["car_id"]
                ride = dict_["info"]["id"]
                yield (p[0], p[1], p[2], car, ride)

    @classmethod
    def to_dict(cls, values):
        return dict((k, v) for k, v in zip(cls.columns, values))

    def islice(self, n: int, a: int):
        return islice(self.iterate(), a, n)

    def _get_iterator(self, n, skip):
        if n is None:
            return self.iterate()
        else:
            return self.islice(n + skip, skip)

    @timing
    def raw_dataframe(self, n=None, skip=0):
        gen = self._get_iterator(n, skip)
        gen = tqdm(gen, unit=" rows")
        return pd.DataFrame(data=gen, columns=self.columns)

    @timing
    def dataframe_with_distances(self, n=None, skip=0):
        gen = self._get_iterator(n, skip)
        gen = map(self.to_dict, gen)
        gen = add_distance(gen)
        gen = tqdm(gen, unit=" rows")
        return pd.DataFrame(gen)


def to_date(x: int):
    return pd.Timestamp(x, unit="s")


@timing
def set_time(df):
    df["time"] = df.time.apply(to_date)


@timing
def set_date(df):
    df["date"] = df["time"].apply(lambda x: x.date())


@timing
def set_time_delta(df):
    df["time_delta"] = df.time.diff()
    # Первая точка любой поездки считается остановкой.
    # Время пребывания в этой остановке равно 0.
    if "prev_ride" not in df.columns:
        df["prev_ride"] = df.ride.shift(+1)
    ix = df.ride != df.prev_ride
    df.loc[ix, "time_delta"] = 0
    assert df.time_delta.min() == 0
    del df["prev_ride"]
    return df


@timing
def merge_with_car_types(df):
    return df.merge(
        VEHICLES[["car_id", "type"]], left_on="car", right_on="car_id"
    ).drop(columns="car_id")


@timing
def to_canonic(raw_df):
    # add new data
    df = set_time_delta(raw_df)
    df = merge_with_car_types(df)
    # convert and decorate
    set_time(df)
    set_date(df)
    return df

@dataclass
class LocalFile
    path: str 

    def warn_file_exists(self):
       print(f"File {self.path} already exists.")

    def raise_path_invalid(self):
       raise FileNotFoundError(f"Provided path not valid: {self.path}")

    def touch(self):
        try:
            pd.read_csv(self.csvpath, nrows=10)
        except OSError:
            self.raise_path_invalid()

@dataclass
class RawData:
    source_folder: str = Filename.RAW_JSON_FOLDER
    csvpath: str = Filename.CSV_SOURCE
    nrows: Optional[int] = None
    skip: int = 0

    def get_dataframe(self) -> pd.DataFrame:
        p = Points(self.source_folder)
        return p.dataframe_with_distances(self.nrows, self.skip)

    def to_csv(self, force=False) -> None:
        if not os.path.exists(self.csvpath) or force is True:
            df = self.get_dataframe()
            df.to_csv(self.csvpath, index=False)
        else:
            LocalFile(self.csvpath).warn_file_exists()

    @timing
    def from_cached_file(self) -> pd.DataFrame:
        LocalFile(self.csvpath).touch()
        return pd.read_csv(
            self.csvpath, nrows=self.nrows, header=0, dtype=Points.dtypes
        )


def make_raw_csv(force=False):
    """Переписать треки из файлов JSON в CSV файл.
    Примерное время исполнения: от 5-6 минут.
    """
    RawData().to_csv(force)


@timing
def get_dataframe_from_jsons(
    nrows: Optional[int] = None, 
    source_folder: str = Filename.RAW_JSON_FOLDER
):
    """Получить типовой (канонический) набор данных из файлов JSON.
    Это более медленный способ, он занимает 5-6 минут.
    """
    print("Reading from JSON files...")
    raw_df = RawData(nrows=nrows, source_folder=source_folder).get_dataframe()
    print("Finished reading JSON files, creating dataframe...")
    return to_canonic(raw_df)


@timing
def get_dataframe_from_raw_csv(
    nrows: Optional[int] = None, 
    path: str = Filename.CSV_SOURCE,
):
    """Получить типовой (канонический) набор данных из сохраненного 
    файла CSV. Это более быстрый способ, до 1-1.5 минуты.
    
    Файл должен быть подготовлен командой `make_raw_csv()`.
    """
    print("Reading raw CSV file...")
    raw_df = RawData(csvpath=path, nrows=nrows).from_cached_file()
    print("Finished reading raw CSV file", path)
    print("Creating dataframe...")
    return to_canonic(raw_df)


def describe(df, stops_df):
    r = len(stops_df) / len(df) * 100
    print(
        f"""Points considered: {len(df):>9}
Stops: {len(stops_df):>9} ({r:.1f})%"""
    )
    return len(stops_df), len(df), r

SyntaxError: ignored

# Получить полный набор треков 

Создаем переменную `df_full`. 

Переменную также можно проверить на корректную длину. Проверка убрана из кода, потому что заранее длина фрейма на новых данных неизвестна.

```
assert df_full.shape == (9065205, 9)
```

In [0]:
make_raw_csv(force=ERASE_EVERYTHING_AT_START)
try:
  df_full  
except NameError:  
  df_full = get_dataframe_from_raw_csv()
df_full.head()

In [0]:
df_full.describe()