# Prepare Data

![ML Pathway1](https://github.com/Adribom/Kaggle-Competition/assets/50799373/ac672943-c1dc-41e6-8890-66a0b5dcf2fe)

---

## Loading the data with only Pandas

In [1]:
import numpy as np
import matplotlib as plt
import pandas as pd
from functools import reduce

In [2]:
# # Check how many lines in train.csv
# with open('train.csv') as file:
#     n_rows = len(file.readlines())
# n_rows

The file has 55423857 lines. Only to count the number of lines, it took 5m 23.3s!

In [2]:
%%time
train_df_chunk1 = pd.read_csv('train.csv', nrows=100)
train_df_chunk1.columns

CPU times: total: 15.6 ms
Wall time: 120 ms


Index(['key', 'fare_amount', 'pickup_datetime', 'pickup_longitude',
       'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
       'passenger_count'],
      dtype='object')

#### Features
- **pickup_datetime** - timestamp value indicating when the taxi ride started.
- **pickup_longitude** - float for longitude coordinate of where the taxi ride started.
- **pickup_latitude** - float for latitude coordinate of where the taxi ride started.
- **dropoff_longitude** - float for longitude coordinate of where the taxi ride ended.
- **dropoff_latitude** - float for latitude coordinate of where the taxi ride ended.
- **passenger_count** - integer indicating the number of passengers in the taxi ride.

In [3]:
train_df_chunk1.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2009-06-15 17:26:21.0000001,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,2010-01-05 16:52:16.0000002,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,2011-08-18 00:35:00.00000049,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,2012-04-21 04:30:42.0000001,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,2010-03-09 07:51:00.000000135,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


In [4]:
train_df_chunk1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 8 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   key                100 non-null    object 
 1   fare_amount        100 non-null    float64
 2   pickup_datetime    100 non-null    object 
 3   pickup_longitude   100 non-null    float64
 4   pickup_latitude    100 non-null    float64
 5   dropoff_longitude  100 non-null    float64
 6   dropoff_latitude   100 non-null    float64
 7   passenger_count    100 non-null    int64  
dtypes: float64(5), int64(1), object(2)
memory usage: 6.4+ KB


The columns that have dtype64 doesn't need such precision. We can then resuce de quantity of memory used. As an example:

- int8 can store integers from -128 to 127.
- int16 can store integers from -32768 to 32767.
- int64 can store integers from -9223372036854775808 to 9223372036854775807.


In this case, we can do the following transformations, depending on the feature comlumn:
- float64 --> float32 | float16
- int64 --> int8
- object --> string[pyarrow]

In [3]:
# Conversão de valores para cada coluna
new_dtype = {'fare_amount' : 'float16',
             'pickup_datetime' : 'string[pyarrow]', 
             'pickup_longitude' : 'float32', 
             'pickup_latitude' : 'float32',
             'dropoff_longitude' : 'float32', 
             'dropoff_latitude' : 'float32', 
             'passenger_count' : 'int8'} 

furthermore, the column 'key' isn't necessary for training. 

In [4]:
# Lista das colunas que serão utilizadas no treino
req_cols = train_df_chunk1.columns.tolist()
req_cols.pop(0)
req_cols

['fare_amount',
 'pickup_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count']

Split the files and read them in chunks can also help optimze the process.

In [11]:
nrows=1000
train_df_chunk2 = pd.read_csv('train.csv', usecols=req_cols, dtype=new_dtype, nrows=1000)
train_df_chunk2.head()

Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,4.5,2009-06-15 17:26:21 UTC,-73.844315,40.721317,-73.841614,40.712276,1
1,16.90625,2010-01-05 16:52:16 UTC,-74.016045,40.711304,-73.979271,40.782005,1
2,5.699219,2011-08-18 00:35:00 UTC,-73.982735,40.761269,-73.991241,40.750561,2
3,7.699219,2012-04-21 04:30:42 UTC,-73.987129,40.733143,-73.99157,40.758091,1
4,5.300781,2010-03-09 07:51:00 UTC,-73.968094,40.768009,-73.956657,40.783764,1


In [9]:
train_df_chunk1.memory_usage(deep= True)

Index                  128
key                  84695
fare_amount           8000
pickup_datetime      80000
pickup_longitude      8000
pickup_latitude       8000
dropoff_longitude     8000
dropoff_latitude      8000
passenger_count       8000
dtype: int64

In [10]:
train_df_chunk2.memory_usage(deep= True)

Index                  128
fare_amount           2000
pickup_datetime      27000
pickup_longitude      4000
pickup_latitude       4000
dropoff_longitude     4000
dropoff_latitude      4000
passenger_count       1000
dtype: int64

In [11]:
%%time
train_tp = pd.read_csv('train.csv', usecols=req_cols, dtype=new_dtype, iterator=True, chunksize=10000)
train_df = pd.concat(train_tp, ignore_index=True)

In [12]:
# remover NaN
train_df.dropna(how = 'any', axis = 'rows')

Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,4.500000,2009-06-15 17:26:21 UTC,-73.844315,40.721317,-73.841614,40.712276,1
1,16.906250,2010-01-05 16:52:16 UTC,-74.016045,40.711304,-73.979271,40.782005,1
2,5.699219,2011-08-18 00:35:00 UTC,-73.982735,40.761269,-73.991241,40.750561,2
3,7.699219,2012-04-21 04:30:42 UTC,-73.987129,40.733143,-73.991570,40.758091,1
4,5.300781,2010-03-09 07:51:00 UTC,-73.968094,40.768009,-73.956657,40.783764,1
...,...,...,...,...,...,...,...
55423851,14.000000,2014-03-15 03:28:00 UTC,-74.005272,40.740028,-73.963280,40.762554,1
55423852,4.199219,2009-03-24 20:46:20 UTC,-73.957787,40.765530,-73.951637,40.773960,1
55423853,14.101562,2011-04-02 22:04:24 UTC,-73.970505,40.752323,-73.960541,40.797340,1
55423854,28.906250,2011-10-26 05:57:51 UTC,-73.980904,40.764629,-73.870605,40.773964,1


In [None]:
# On my computer, cant even finish the operation without running out of memory

def travel_distance(latitude_start, longitude_start, latitude_finish, longitude_finish):
    return np.sqrt(np.abs(latitude_finish-latitude_start)**2 + np.abs(longitude_finish-longitude_start)**2)

train_df['travel_distance'] = np.vectorize(travel_distance)(train_df['pickup_latitude'], train_df['pickup_longitude'], 
                                                            train_df['dropoff_latitude'], train_df['dropoff_longitude'])

train_df.sort_values(travel_distance)

## Dask (Parallel processing)

---

### Simply using Dask instead of Pandas

In [5]:
import os
import dask.dataframe as dd

In [9]:
%%time
df = dd.read_csv('train.csv', usecols=req_cols, dtype=new_dtype)

CPU times: total: 31.2 ms
Wall time: 257 ms


In [12]:
df.head()

Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,4.5,2009-06-15 17:26:21 UTC,-73.844315,40.721317,-73.841614,40.712276,1
1,16.90625,2010-01-05 16:52:16 UTC,-74.016045,40.711304,-73.979271,40.782005,1
2,5.699219,2011-08-18 00:35:00 UTC,-73.982735,40.761269,-73.991241,40.750561,2
3,7.699219,2012-04-21 04:30:42 UTC,-73.987129,40.733143,-73.99157,40.758091,1
4,5.300781,2010-03-09 07:51:00 UTC,-73.968094,40.768009,-73.956657,40.783764,1


In [13]:
df.isnull().sum().compute()

fare_amount            0
pickup_datetime        0
pickup_longitude       0
pickup_latitude        0
dropoff_longitude    376
dropoff_latitude     376
passenger_count        0
dtype: int64

Until this step, the operations have been very fast. Thats because dask only does the real computation when we specify it with .compute.

In [18]:
%%time
df.dropna(how='any').compute()

CPU times: total: 1min 51s
Wall time: 1min 37s


Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,4.500000,2009-06-15 17:26:21 UTC,-73.844315,40.721317,-73.841614,40.712276,1
1,16.906250,2010-01-05 16:52:16 UTC,-74.016045,40.711304,-73.979271,40.782005,1
2,5.699219,2011-08-18 00:35:00 UTC,-73.982735,40.761269,-73.991241,40.750561,2
3,7.699219,2012-04-21 04:30:42 UTC,-73.987129,40.733143,-73.991570,40.758091,1
4,5.300781,2010-03-09 07:51:00 UTC,-73.968094,40.768009,-73.956657,40.783764,1
...,...,...,...,...,...,...,...
622599,14.000000,2014-03-15 03:28:00 UTC,-74.005272,40.740028,-73.963280,40.762554,1
622600,4.199219,2009-03-24 20:46:20 UTC,-73.957787,40.765530,-73.951637,40.773960,1
622601,14.101562,2011-04-02 22:04:24 UTC,-73.970505,40.752323,-73.960541,40.797340,1
622602,28.906250,2011-10-26 05:57:51 UTC,-73.980904,40.764629,-73.870605,40.773964,1


In [15]:
df['abs_diff_longitude'] = (df.dropoff_longitude - df.pickup_longitude).abs()

ParserError: Error tokenizing data. C error: out of memory

Still out of memory error

### Utilizing graph of operantion and operation in chunks

The way Dask works involves two steps:

1. First, you setup a computation, internally represented as a graph of operations.
2. Then, you actually run the computation on that graph.

Rough rules of thumb for chunksize:

1. If you already created a prototype, which may not involve Dask at all, using a small subset of the data you intend to process, you’ll have a clear idea of what size of data can be processed easily for this workflow. You can use this knowledge to choose similar sized chunks in Dask.
2. Some people have observed that chunk sizes below 1MB are almost always bad. Chunk size between 100MB and 1GB are generally good, going over 1 or 2GB means you have a really big dataset and/or a lot of memory available per core,
3. Upper bound: Avoid too large task graphs. More than 10,000 or 100,000 chunks may start to perform poorly.
4. Lower bound: To get the advantage of parallelization, you need the number of chunks to at least equal the number of worker cores available (or better, the number of worker cores times 2). Otherwise, some workers will stay idle.
5. The time taken to compute each task should be much larger than the time needed to schedule the task. The Dask scheduler takes roughly 1 millisecond to coordinate a single task, so a good task computation time would be measured in seconds (not milliseconds).

In [5]:
import os
import dask.dataframe as dd

In [6]:
%%time

# Load the data with Dask instead of Pandas.
df = dd.read_csv(
    'train.csv', 
    usecols=req_cols, 
    dtype=new_dtype,
    blocksize=500 * 1024 * 1024, # 500MB chunks    
)

# Setup the calculation graph; unlike Pandas code,
# no work is done at this point:
def add_travel_vector_features(df):
    df['abs_diff_longitude'] = (df.dropoff_longitude - df.pickup_longitude).abs()
    df['abs_diff_latitude'] = (df.dropoff_latitude - df.pickup_latitude).abs()

# Each degree of latitude or longitude equals approximately 111.11 km, so a distance of 2 degrees for the taxi would be unlikely.
def calculation_graph1(df):
    df.dropna(how='any')
    add_travel_vector_features(df)
    df = df[(df['abs_diff_latitude'] < 2) & ((df['abs_diff_longitude'] < 2) & ((df['fare_amount'] <= 500) & ((df['fare_amount'] >= 2.5) & (df['passenger_count'] <= 5))))]
    return df

df = calculation_graph1(df)

# Actually run the computation, using 6 threads:
df = df.compute(num_workers=6)

CPU times: total: 1min 49s
Wall time: 2min 11s


In [None]:
df.to_csv('Processed_data/train_processed.csv')

In [8]:
def calculation_graph2(df):
    df['pickup_datetime'] = dd.to_datetime(df['pickup_datetime'], format=True)
    return df

df = calculation_graph2(df)

# Actually run the computation, using 6 threads:
df = df.compute(num_workers=6)

MemoryError: 

The function dd.to_datetime stills isn't optmizes enough for running on the entire dataset

In [19]:
df[['abs_diff_longitude', 'abs_diff_latitude']].sort_values('abs_diff_latitude', ascending=False)[:10]

Unnamed: 0,abs_diff_longitude,abs_diff_latitude
50124,1.372154,1.94614
75161,1.70433,1.934723
81217,0.002403,1.924252
56814,1.483332,1.904047
116669,0.226624,1.888096
13868,0.21653,1.887989
88742,1.185425,1.886124
115628,0.131516,1.881638
90896,0.003975,1.880322
77667,0.007141,1.876225


In [22]:
# The column pickup_datetime is in date format, but Pandas/Dask still needs to recognize that
df[['pickup_datetime']][:5]

Unnamed: 0,pickup_datetime
0,2009-06-15 17:26:21 UTC
1,2010-01-05 16:52:16 UTC
2,2011-08-18 00:35:00 UTC
3,2012-04-21 04:30:42 UTC
4,2010-03-09 07:51:00 UTC


In [25]:
df['pickup_datetime'] = dd.to_datetime(df['pickup_datetime'], format='%Y-%m-%d %H:%M:%S %Z')

KeyboardInterrupt: 