# Imports

In [47]:
import datetime
import pandas
import numpy
import dask
import dask.dataframe as dd
from dask.distributed import LocalCluster, Client
import dask.array as da

#### Initialize dask local cluster

In [2]:
cluster = LocalCluster()
client = Client(cluster)
client

2022-08-26 11:28:30,352 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-s3rkvr61', purging
2022-08-26 11:28:30,352 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-e4o83bid', purging
2022-08-26 11:28:30,352 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-npuq_0y0', purging
2022-08-26 11:28:30,352 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-yjz4lc7r', purging
2022-08-26 11:28:30,352 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-6yt_nf_t', purging
2022-08-26 11:28:30,353 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-fiafljnr', purging
2022-08-26 11:28:30,353 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-7kpw1dg7', purging

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 62.82 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34231,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 62.82 GiB

0,1
Comm: tcp://127.0.0.1:34373,Total threads: 2
Dashboard: http://127.0.0.1:41055/status,Memory: 15.70 GiB
Nanny: tcp://127.0.0.1:41817,
Local directory: /tmp/dask-worker-space/worker-i0l5aymi,Local directory: /tmp/dask-worker-space/worker-i0l5aymi

0,1
Comm: tcp://127.0.0.1:40217,Total threads: 2
Dashboard: http://127.0.0.1:33247/status,Memory: 15.70 GiB
Nanny: tcp://127.0.0.1:37413,
Local directory: /tmp/dask-worker-space/worker-0y800adh,Local directory: /tmp/dask-worker-space/worker-0y800adh

0,1
Comm: tcp://127.0.0.1:39003,Total threads: 2
Dashboard: http://127.0.0.1:39965/status,Memory: 15.70 GiB
Nanny: tcp://127.0.0.1:43819,
Local directory: /tmp/dask-worker-space/worker-ccv9kxfr,Local directory: /tmp/dask-worker-space/worker-ccv9kxfr

0,1
Comm: tcp://127.0.0.1:36077,Total threads: 2
Dashboard: http://127.0.0.1:45807/status,Memory: 15.70 GiB
Nanny: tcp://127.0.0.1:44081,
Local directory: /tmp/dask-worker-space/worker-jkk8kxl4,Local directory: /tmp/dask-worker-space/worker-jkk8kxl4


# Variables

*TRANSFER_DELTA* - timedelta, during which the difference between adjacent trips is considered a transfer

In [3]:
TRANSFER_DELTA = datetime.timedelta(minutes=20)

# Initial dataset things

### Convert dataset from csv to parquet (do not run until you dont have parquet data!!!)

#### Load dataset from csv 

In [60]:
initial_trips = dd.read_csv("trips_04.csv",
                                on_bad_lines='skip',
                                parse_dates=['start_trip_dttm', 'day'],
                                dtype={
                                    "hash_ticket_uid": "string",
                                    "start_latitude": "float32",
                                    "start_longitude": "float32",
                                    "start_entrance_id": "UInt32",
                                    "start_entrance_nm": "string",
                                    "start_station_id": "UInt32",
                                    "start_station_nm": "string",
                                    "start_line_id": "UInt32",
                                    "start_line_nm": "string",
                                    "start_stop_id": "UInt32",
                                    "start_stop_nm": "string",
                                    "vehicle_type": "string",
                                    "end_station_id": "UInt32",
                                    "end_station_nm": "string",
                                    "end_line_id": "UInt32",
                                    "end_line_nm": "string",
                                    "end_stop_id": "UInt32",
                                    "end_stop_nm": "string",
                                    "end_latitude": "float32",
                                    "end_longitude": "float32",
                                    "route_num": "string"
                                })

#### Convert dataset to parquet

In [61]:
initial_trips.to_parquet('data/trips_04.parquet', engine='pyarrow')



### Read data from parquet

In [67]:
trips_df = dd.read_parquet('data/trips_04.parquet', engine='pyarrow',
                           columns=["hash_ticket_uid", "start_trip_dttm",
                                    "end_trip_dttm", "end_latitude"])

#### Check dataset info

In [68]:
trips_df

Unnamed: 0_level_0,hash_ticket_uid,start_trip_dttm,end_trip_dttm,end_latitude
npartitions=53,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,string,"datetime64[ns, UTC+03:00]",object,float32
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [6]:
trips_df.groupby("hash_ticket_uid").get_group("700702083475E41178E1F18CC7926A16").compute()



Unnamed: 0,hash_ticket_uid,start_trip_dttm,end_trip_dttm
0,700702083475E41178E1F18CC7926A16,2022-04-04 14:56:40+03:00,2022-04-04T15:05:39.000+03:00
173208,700702083475E41178E1F18CC7926A16,2022-04-04 15:08:23+03:00,2022-04-04T15:13:53.000+03:00
145193,700702083475E41178E1F18CC7926A16,2022-04-04 11:34:28+03:00,2022-04-04T12:07:56.000+03:00
145381,700702083475E41178E1F18CC7926A16,2022-04-04 16:17:42+03:00,2022-04-04T16:21:42.000+03:00
147676,700702083475E41178E1F18CC7926A16,2022-04-04 16:48:08+03:00,2022-04-04T17:04:18.000+03:00
126396,700702083475E41178E1F18CC7926A16,2022-04-04 14:01:12+03:00,2022-04-04T14:21:06.000+03:00
47408,700702083475E41178E1F18CC7926A16,2022-04-04 13:29:04+03:00,2022-04-04T13:58:54.000+03:00
20821,700702083475E41178E1F18CC7926A16,2022-04-04 12:20:28+03:00,2022-04-04T12:37:01.000+03:00
22463,700702083475E41178E1F18CC7926A16,2022-04-04 15:32:57+03:00,2022-04-04T15:45:57.000+03:00
23009,700702083475E41178E1F18CC7926A16,2022-04-04 17:35:20+03:00,2022-04-04T17:40:57.000+03:00


# Group trips into complete rides

## **WARNING** THIS WOULD TAKE A LOT OF TIME (about an hour)

In [None]:
print(1)

def trips_to_rides(trips_part: list) -> list[list]:
    if trips_part.empty:
        return pandas.DataFrame()
    if trips_part.shape[0] > 1:
        trips_part = trips_part.sort_values(by="start_trip_dttm")
        temp = []
        for index, obj in enumerate(trips_part.iterrows()):
            df_index, trip = obj
            if isinstance(trip["end_trip_dttm"], float):
                continue
            trip["end_trip_dttm"] = pandas.to_datetime(trip["end_trip_dttm"])
            if index + 1 == trips_part.shape[0]:
                break
            diff = (trips_part.iloc[index + 1]["start_trip_dttm"] - trip["end_trip_dttm"])
            if diff <= TRANSFER_DELTA:
                temp.append(trip)
                temp.append(trips_part.iloc[index + 1])
            else:
                if temp:
                    return pandas.DataFrame(data=temp)
                else:
                    return pandas.DataFrame()
            
rides = trips_df.groupby("hash_ticket_uid").apply(trips_to_rides).compute()

1


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  rides = trips_df.groupby("hash_ticket_uid").apply(trips_to_rides).compute()
