# Analysis of 2013 New York City Taxi Trips

The goal of this notebook is to explore the data on taxi trips in New York City during 2013 using the Dask library. To start, we initialize Dask and load the dataset.

In [1]:
from dask.distributed import Client

client = Client(n_workers=4)

In [2]:
import dask.bag as db
import os

# Glob pattern for the first file of the dataset (trip_data/*_1.csv)
files = os.path.join('trip_data', '*_1.csv')
# Read the matching file(s) line by line as a Dask bag (lazy: nothing is read yet)
b = db.read_text(files)
print(files)
print(b)

trip_data/*_1.csv
dask.bag<bag-from-delayed, npartitions=1>


Now, let's look at the first two lines of the first file.

In [3]:
b.take(2)

('medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude\n',
 '89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171\n')
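Each element of the bag is one raw line of text, header included. A minimal sketch, assuming we want to turn such lines into dictionaries with the standard-library `csv` module; the two sample lines are copied from the output above, and the helper `parse_line` is hypothetical.

```python
import csv

import dask.bag as db

# Header and one record, copied from the b.take(2) output above
lines = [
    "medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude\n",
    "89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171\n",
]
header = next(csv.reader([lines[0]]))


def parse_line(line):
    """Hypothetical helper: turn one raw CSV line into a dict keyed by the header."""
    values = next(csv.reader([line]))
    return dict(zip(header, values))


bag = db.from_sequence(lines, npartitions=1)
# Drop header lines (every file starts with one), then parse the remaining lines
records = bag.filter(lambda line: not line.startswith("medallion")).map(parse_line)
first = records.take(1)[0]
print(first["vendor_id"], first["trip_time_in_secs"])
```

All parsed values are still strings, which is one reason the dataframe interface used next is more convenient.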

However, since the files are in CSV format, they are easier to read and handle as dataframes. From here on we work with the `trip_data/*_8.csv` file(s).

In [4]:
import dask.dataframe as dd

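# The column names in these files start with a space, so the dtype keys must include it.
# These columns are forced to float64 so that missing values can be stored in them
df = dd.read_csv('trip_data/*_8.csv',
                 dtype={' passenger_count': 'float64',
                        ' rate_code': 'float64',
                        ' trip_time_in_secs': 'float64'})
df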

Dask DataFrame Structure (npartitions=27)
medallion            object
hack_license         object
vendor_id            object
rate_code            float64
store_and_fwd_flag   object
pickup_datetime      object
dropoff_datetime     object
passenger_count      float64
trip_time_in_secs    float64
trip_distance        float64
pickup_longitude     float64
pickup_latitude      float64
dropoff_longitude    float64
dropoff_latitude     float64


Now let's look at the first rows of the dataframe (`head()` takes them from its first partition).

In [5]:
df.head()

,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,3418135604CD3F357DD9577AF978C5C0,B25386A1F259C87449430593E904FDBC,VTS,1.0,,2013-08-30 07:57:00,2013-08-30 08:30:00,5.0,1980.0,14.58,-73.791359,40.645657,-73.922501,40.758766
1,6D3B2A7682C30DCF64F3F12976EF93B6,A603A9D5FAA46E8FF2A97A143328D938,CMT,1.0,N,2013-08-30 23:26:23,2013-08-30 23:46:01,2.0,1177.0,11.0,-73.862724,40.769062,-73.976845,40.764595
2,6D49E494913752B75B2685E0019FF3D5,3F0BFE90A5D71741840B25600A89E225,CMT,1.0,N,2013-08-30 09:18:10,2013-08-30 09:24:08,1.0,357.0,0.8,-73.991653,40.750324,-73.98642,40.742924
3,4C4A0AFC432A1A87E97ED8F18403FF6E,BA20A20E2CF85EF7B00162D711394C7E,CMT,1.0,N,2013-08-26 23:27:11,2013-08-26 23:42:49,4.0,938.0,7.7,-73.975372,40.756237,-73.867119,40.721886
4,1258CA1DF5E2A9E9A9F7848408A7AAEB,8C14DCF69CAA2A9A0DFAFD99E00536A1,CMT,1.0,N,2013-08-29 10:57:56,2013-08-29 11:19:06,2.0,1270.0,2.1,-73.99102,40.750912,-73.996727,40.767578


Next, let's check the data type of each column.

In [6]:
df.dtypes

medallion               object
 hack_license           object
 vendor_id              object
 rate_code             float64
 store_and_fwd_flag     object
 pickup_datetime        object
 dropoff_datetime       object
 passenger_count       float64
 trip_time_in_secs     float64
 trip_distance         float64
 pickup_longitude      float64
 pickup_latitude       float64
 dropoff_longitude     float64
 dropoff_latitude      float64
dtype: object

While we are at it, let's look at the last rows of the dataframe (`tail()` takes them from its last partition).

In [7]:
df.tail()

,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
300912,0076C8327A95E988E721AC33B0FA9D67,7F29DC4519B93DA549E67576A6F1FA50,CMT,1.0,N,2013-08-25 08:37:32,2013-08-25 08:41:16,1.0,224.0,0.8,-73.987053,40.755119,-73.977295,40.753895
300913,6D2A7531278E37E98994C8A28D5934A4,439BF2C0D49D6F06DDE1F9ED43982ABC,CMT,1.0,N,2013-08-22 02:19:12,2013-08-22 02:23:40,1.0,267.0,0.9,-73.995468,40.744133,-74.007576,40.752285
300914,79B960341B970153F8158B5CA93D9A79,F50D473528362D478240517E4754C617,CMT,1.0,N,2013-08-24 22:43:19,2013-08-24 22:48:07,1.0,287.0,1.2,-73.965034,40.755943,-73.978951,40.747639
300915,22CD971B4E1D4B5D82B129D0232648AB,1A85E6E33CF0F153BE0F1A7FD0421CD5,CMT,1.0,N,2013-08-24 23:31:31,2013-08-24 23:38:52,1.0,440.0,1.4,-73.872673,40.774246,-73.874741,40.756763
300916,5B45A2FD44F343,,,,,,,,,,,,,


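As we can see, the last record is truncated (the medallion is cut short and the remaining fields are NaN), so we remove missing values just as we would in pandas. Note that `dropna()` drops every row with at least one missing value, not only this truncated record.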

In [8]:
df = df.dropna()
df.tail()

,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
300911,05015CC5687C5A59BB9A1F8EB66A9DE7,88D37ED31BFD7A512DA4DA8B0D99B145,CMT,1.0,N,2013-08-21 19:25:21,2013-08-21 19:32:08,1.0,406.0,1.1,-74.004593,40.747158,-73.99308,40.757015
300912,0076C8327A95E988E721AC33B0FA9D67,7F29DC4519B93DA549E67576A6F1FA50,CMT,1.0,N,2013-08-25 08:37:32,2013-08-25 08:41:16,1.0,224.0,0.8,-73.987053,40.755119,-73.977295,40.753895
300913,6D2A7531278E37E98994C8A28D5934A4,439BF2C0D49D6F06DDE1F9ED43982ABC,CMT,1.0,N,2013-08-22 02:19:12,2013-08-22 02:23:40,1.0,267.0,0.9,-73.995468,40.744133,-74.007576,40.752285
300914,79B960341B970153F8158B5CA93D9A79,F50D473528362D478240517E4754C617,CMT,1.0,N,2013-08-24 22:43:19,2013-08-24 22:48:07,1.0,287.0,1.2,-73.965034,40.755943,-73.978951,40.747639
300915,22CD971B4E1D4B5D82B129D0232648AB,1A85E6E33CF0F153BE0F1A7FD0421CD5,CMT,1.0,N,2013-08-24 23:31:31,2013-08-24 23:38:52,1.0,440.0,1.4,-73.872673,40.774246,-73.874741,40.756763


Next, we examine the structure of the data as a Dask array using `df.values`.

In [9]:
df.values

           Array        Chunk
Bytes      unknown      unknown
Shape      (nan, 14)    (nan, 14)
Count      81 Tasks     27 Chunks
Type       object       numpy.ndarray


With this in place, we start querying the data and extracting information. First, we count the number of records.

In [10]:
%%time
len(df)

CPU times: user 7.41 s, sys: 944 ms, total: 8.35 s
Wall time: 1min 38s


5183794

Now we select the passenger-count column and compute the maximum number of passengers in a single trip.

In [11]:
%%time
pasajeros = df[' passenger_count']
pasajeros.max().compute()

CPU times: user 6.77 s, sys: 791 ms, total: 7.56 s
Wall time: 1min 26s


9.0

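We can also do the calculation in a single expression, and the wall time is slightly lower (1 min 20 s versus 1 min 26 s). Both cells describe the same computation, though, so the difference most likely reflects run-to-run variation.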

In [12]:
%%time
df[' passenger_count'].max().compute()

CPU times: user 6.55 s, sys: 894 ms, total: 7.45 s
Wall time: 1min 20s


9.0

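Now let's fix the column names: as the output shows, every name except `medallion` starts with a space, so we rename the columns we are going to use.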

In [13]:
df.columns.values

array(['medallion', ' hack_license', ' vendor_id', ' rate_code',
       ' store_and_fwd_flag', ' pickup_datetime', ' dropoff_datetime',
       ' passenger_count', ' trip_time_in_secs', ' trip_distance',
       ' pickup_longitude', ' pickup_latitude', ' dropoff_longitude',
       ' dropoff_latitude'], dtype=object)

In [14]:
df = df.rename(columns={' hack_license': 'hack_license',  ' trip_time_in_secs': 'trip_time_in_secs', ' trip_distance':'trip_distance'})

Now let's keep only the license and trip-time columns.

In [16]:
df2 = df[['hack_license','trip_time_in_secs']]
df2

Dask DataFrame Structure (npartitions=27)
hack_license         object
trip_time_in_secs    float64


In [17]:
df2.head()

,hack_license,trip_time_in_secs
1,A603A9D5FAA46E8FF2A97A143328D938,1177.0
2,3F0BFE90A5D71741840B25600A89E225,357.0
3,BA20A20E2CF85EF7B00162D711394C7E,938.0
4,8C14DCF69CAA2A9A0DFAFD99E00536A1,1270.0
5,95E1B89BC718FB1DC76B939937E043F3,1392.0


Next, let's find the longest trip for each driver, identified by `hack_license`.

In [10]:
%%time
df2.groupby("hack_license").trip_time_in_secs.max().compute()


CPU times: user 6.29 s, sys: 896 ms, total: 7.19 s
Wall time: 1min 18s


hack_license
0002555BBE359440D6CEB34B699D3932    3065.0
0008B3E338CE8C3377E071A4D80D3694    4519.0
000B8D660A329BBDBF888500E4BD8B98    5686.0
000C15EFBDF58CAED528C09E597484EC    3009.0
0011B1575B9F5398BBC0F27EA560D631    2961.0
                                     ...  
E0F8F0C8AF43ECC64472280CC007A91D     840.0
EE5989A60E797496121E1C4BD9E6A974    1089.0
F0934BDB0EBB147B32A821851F83FB0E    1570.0
F84A7C01588C593BFBF4CCF1DA52DBD2     234.0
FD959C247791E390F850ED2A979E82AE       9.0
Name: trip_time_in_secs, Length: 17035, dtype: float64
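A per-driver maximum can be computed partition by partition: each partition yields a partial maximum per license, and the partial results are merged with another maximum, which works because the maximum is associative. A pure-Python sketch of that split-and-combine idea, assuming each partition is a list of `(hack_license, trip_time_in_secs)` pairs; Dask's own groupby works on pandas objects and is more elaborate, and the shortened licenses and the helper `max_per_key` are hypothetical.

```python
def max_per_key(pairs):
    """Hypothetical helper: maximum value per key over an iterable of (key, value) pairs."""
    best = {}
    for key, value in pairs:
        if key not in best or value > best[key]:
            best[key] = value
    return best


# Three "partitions" of (license, trip time in seconds) pairs
partitions = [
    [("A603", 1177.0), ("3F0B", 357.0), ("A603", 2961.0)],
    [("3F0B", 938.0), ("8C14", 1270.0)],
    [("A603", 840.0), ("8C14", 9.0)],
]

# Step 1: partial maxima, computed independently for each partition
partials = [max_per_key(p) for p in partitions]

# Step 2: combine the partial results with the same operation
per_driver_max = max_per_key(pair for part in partials for pair in part.items())
print(per_driver_max)
```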

We can also compute this directly on `df`, without the intermediate `df2`, and the times are similar (1 min 19 s versus 1 min 18 s).

In [18]:
%%time
df.groupby("hack_license").trip_time_in_secs.max().compute()

CPU times: user 6.16 s, sys: 854 ms, total: 7.01 s
Wall time: 1min 19s


hack_license
0002555BBE359440D6CEB34B699D3932    3065.0
0008B3E338CE8C3377E071A4D80D3694    4519.0
000B8D660A329BBDBF888500E4BD8B98    5686.0
000C15EFBDF58CAED528C09E597484EC    3009.0
0011B1575B9F5398BBC0F27EA560D631    2961.0
                                     ...  
E0F8F0C8AF43ECC64472280CC007A91D     840.0
EE5989A60E797496121E1C4BD9E6A974    1089.0
F0934BDB0EBB147B32A821851F83FB0E    1570.0
F84A7C01588C593BFBF4CCF1DA52DBD2     234.0
FD959C247791E390F850ED2A979E82AE       9.0
Name: trip_time_in_secs, Length: 17035, dtype: float64

In [23]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
ERROR:asyncio:_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
