In [1]:
#https://www.kaggle.com/szelee/how-to-import-a-csv-file-of-55-million-rows
import pandas as pd
import dask.dataframe as dd
import os
from tqdm import tqdm
TRAIN_PATH = 'train.csv'

In [2]:
%%time
# Assume we only know that the csv file is somehow large, but not the exact size
# we want to know the exact number of rows

# Method 1, using file.readlines. Takes about 20 seconds.
with open(TRAIN_PATH) as file:
    n_rows = len(file.readlines())
    
print(f'Exact number of rows: {n_rows}')


Exact number of rows: 55423857
CPU times: user 20.9 s, sys: 23 s, total: 43.9 s
Wall time: 54.6 s


In [None]:
%%time

# Method 2 by @danlester, using wc unix command. Takes only 3 seconds!
s = !wc -l {TRAIN_PATH}

# add one because the file isn't quite correctly formatted as a CSV, should have a final newline char
n_rows = int(s[0].split(' ')[0])+1

print (f'Exact number of rows: {n_rows}')


In [7]:
%%time

# Same method but more 'pythonic'
import subprocess

def file_len(fname):
    p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE, 
                                              stderr=subprocess.PIPE)
    result, err = p.communicate()
    if p.returncode != 0:
        raise IOError(err)
    return int(result.strip().split()[0])+1

n_rows = file_len(TRAIN_PATH)
print (f'Exact number of rows: {n_rows}')

Exact number of rows: 55423857
CPU times: user 3.27 ms, sys: 15.5 ms, total: 18.8 ms
Wall time: 6.65 s


In [8]:
# Peep at the training file header
df_tmp = pd.read_csv(TRAIN_PATH,nrows=5)
df_tmp.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2009-06-15 17:26:21.0000001,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,2010-01-05 16:52:16.0000002,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,2011-08-18 00:35:00.00000049,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,2012-04-21 04:30:42.0000001,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,2010-03-09 07:51:00.000000135,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


In [9]:
df_tmp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 8 columns):
key                  5 non-null object
fare_amount          5 non-null float64
pickup_datetime      5 non-null object
pickup_longitude     5 non-null float64
pickup_latitude      5 non-null float64
dropoff_longitude    5 non-null float64
dropoff_latitude     5 non-null float64
passenger_count      5 non-null int64
dtypes: float64(5), int64(1), object(2)
memory usage: 400.0+ bytes


We might not need float64 (16 decimal places) for the longitude and latitude values. float32 (7 decimal places) might be just enough.

In [10]:
# Set columns to most suitable type to optimize for memory usage
traintypes = {'fare_amount': 'float32',
              'pickup_datetime': 'str', 
              'pickup_longitude': 'float32',
              'pickup_latitude': 'float32',
              'dropoff_longitude': 'float32',
              'dropoff_latitude': 'float32',
              'passenger_count': 'uint8'}

cols = list(traintypes.keys())

In [11]:
chunksize = 5_000_000 # 5 million rows at one go. Or try 10 million

In [12]:
%%time
df_list = [] # list to hold the batch dataframe

for df_chunk in tqdm(pd.read_csv(TRAIN_PATH,usecols=cols,dtype=traintypes,chunksize=chunksize)):
    # Neat trick from https://www.kaggle.com/btyuhas/bayesian-optimization-with-xgboost
    # Using parse_dates would be much slower!
    df_chunk['pickup_datetime'] = df_chunk['pickup_datetime'].str.slice(0,16)
    df_chunk['pickup_datetime'] = pd.to_datetime(df_chunk['pickup_datetime'],utc=True,format='%Y-%m-%d %H:%M')
    
    # Can process each chunk of dataframe here
    # clean_data(), feature_engineer(),fit()
    
    # Alternatively, append the chunk to list and merge all
    df_list.append(df_chunk)
    
    

12it [02:22,  9.26s/it]

CPU times: user 2min 2s, sys: 18.9 s, total: 2min 20s
Wall time: 2min 22s





In [13]:
# Merge all dataframes into one dataframe
train_df = pd.concat(df_list)

# Delete the dataframe list to release memory
del df_list

# See what we have loaded
train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55423856 entries, 0 to 55423855
Data columns (total 7 columns):
fare_amount          float32
pickup_datetime      datetime64[ns, UTC]
pickup_longitude     float32
pickup_latitude      float32
dropoff_longitude    float32
dropoff_latitude     float32
passenger_count      uint8
dtypes: datetime64[ns, UTC](1), float32(5), uint8(1)
memory usage: 1.5 GB


In [17]:
display(train_df.head())
display(train_df.tail())

Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,4.5,2009-06-15 17:26:00+00:00,-73.844315,40.721317,-73.841614,40.712276,1
1,16.9,2010-01-05 16:52:00+00:00,-74.016045,40.711304,-73.979271,40.782005,1
2,5.7,2011-08-18 00:35:00+00:00,-73.982735,40.761269,-73.991241,40.750561,2
3,7.7,2012-04-21 04:30:00+00:00,-73.987129,40.733143,-73.99157,40.758091,1
4,5.3,2010-03-09 07:51:00+00:00,-73.968094,40.768009,-73.956657,40.783764,1


Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
55423851,14.0,2014-03-15 03:28:00+00:00,-74.005272,40.740028,-73.96328,40.762554,1
55423852,4.2,2009-03-24 20:46:00+00:00,-73.957787,40.76553,-73.951637,40.77396,1
55423853,14.1,2011-04-02 22:04:00+00:00,-73.970505,40.752323,-73.960541,40.79734,1
55423854,28.9,2011-10-26 05:57:00+00:00,-73.980904,40.764629,-73.870605,40.773964,1
55423855,7.5,2014-12-12 11:33:00+00:00,-73.969719,40.797668,-73.970886,40.783314,1


In [19]:
%%time
# Save into feather format, about 1.5Gb. 
train_df.to_feather('nyc_taxi_data_raw.feather')

CPU times: user 2.59 s, sys: 3.19 s, total: 5.78 s
Wall time: 3.62 s


In [23]:
%%time
# load the same dataframe next time directly, without reading the csv file again!
import feather
train_df_new = feather.read_dataframe('nyc_taxi_data_raw.feather')

CPU times: user 1.4 s, sys: 3.75 s, total: 5.15 s
Wall time: 5.83 s


In [29]:
train_df_new.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55423856 entries, 0 to 55423855
Data columns (total 7 columns):
fare_amount          float32
pickup_datetime      datetime64[ns, UTC]
pickup_longitude     float32
pickup_latitude      float32
dropoff_longitude    float32
dropoff_latitude     float32
passenger_count      uint8
dtypes: datetime64[ns, UTC](1), float32(5), uint8(1)
memory usage: 1.5 GB


## Using Dask
With Dask and its dataframe construct, you set up the dataframe must like you would in pandas but rather than loading the data into pandas, this approach keeps the dataframe as a sort of ‘pointer’ to the data file and doesn’t load anything until you specifically tell it to do so.

Source: http://pythondata.com/dask-large-csv-python/

In [31]:
%%time
# dask's read_csv takes no time at all!
ddf = dd.read_csv(TRAIN_PATH,usecols=cols,dtype=traintypes)

CPU times: user 73.1 ms, sys: 31.5 ms, total: 105 ms
Wall time: 141 ms


In [32]:
# no info?
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 7 entries, fare_amount to passenger_count
dtypes: object(1), float32(5), uint8(1)

In [33]:
# nothing to describe?
ddf.describe()

Unnamed: 0_level_0,fare_amount,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,float32,float32,float32,float32,float32,uint8
,...,...,...,...,...,...


In [34]:
%%time
# dask is lazy. It only works when it is asked explicitly with compute()
ddf.describe().compute()

CPU times: user 3min 6s, sys: 1min 3s, total: 4min 9s
Wall time: 1min 42s


Unnamed: 0,fare_amount,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
count,55423860.0,55423860.0,55423860.0,55423480.0,55423480.0,55423860.0
mean,11.3457,-72.16512,39.76592,-72.16639,39.76695,1.68538
std,20.69598,13.85106,10.39707,13.78469,10.38557,1.308445
min,-300.0,-3442.06,-3492.264,-3442.025,-3547.887,0.0
25%,6.0,-73.992,40.7351,-73.99122,40.73441,1.0
50%,8.5,-73.98151,40.75299,-73.97995,40.7533,1.0
75%,12.9,-73.96649,40.76725,-73.96356,40.76818,2.0
max,93963.36,3457.626,3408.79,3457.622,3537.133,208.0


In [35]:
%%time
# Again, it only works when it is asked :)
len(ddf)

CPU times: user 2min 16s, sys: 43.8 s, total: 3min
Wall time: 1min 31s


55423856

In [36]:
del ddf

In [37]:
%%time
# using dask read_csv followed by compute() to create a panda dataframe
ddf_pd = dd.read_csv(TRAIN_PATH,usecols=cols,dtype=traintypes).compute()

# RangeIndex is an optimized version of Int64Index that can represent a monotonic ordered set
# Source: https://pandas-docs.github.io/pandas-docs-travis/advanced.html#int64index-and-rangeindex
# Furthermore, without conversion, the resulting dataframe takes up more memory usage (1.9GB)
ddf_pd.index = pd.RangeIndex(start=0,stop=len(ddf_pd))

CPU times: user 2min 19s, sys: 56.8 s, total: 3min 15s
Wall time: 1min 51s


In [38]:
%%time
ddf_pd['pickup_datetime'] = ddf_pd['pickup_datetime'].str.slice(0,16)
ddf_pd['pickup_datetime'] = pd.to_datetime(ddf_pd['pickup_datetime'],utc=True,format='%Y-%m-%d %H:%M')

CPU times: user 41.2 s, sys: 55 s, total: 1min 36s
Wall time: 2min 5s


In [39]:
%%time
ddf_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55423856 entries, 0 to 55423855
Data columns (total 7 columns):
fare_amount          float32
pickup_datetime      datetime64[ns, UTC]
pickup_longitude     float32
pickup_latitude      float32
dropoff_longitude    float32
dropoff_latitude     float32
passenger_count      uint8
dtypes: datetime64[ns, UTC](1), float32(5), uint8(1)
memory usage: 1.5 GB
CPU times: user 1.55 s, sys: 7.59 s, total: 9.13 s
Wall time: 10.6 s
