## How to Work with Big Datasets on Kaggle Kernels


This particular competition is asking us to look and analyze some really big data sets. In its given form, it won't even load into pandas on the kaggle kernels. If you don't have a very fancy computer, it probably won't load on your either.

I am assuming I'm not the only one with limited computing power, and limited budget for running paid cloud setup. SO I've been looking into different methods to work with big datas on limited resources.

Below are some tips I collected while learning to get by with mostly Kaggle Kernels while not overloading its allocated RAM.

### Outline

- **Tip 1:** Deleting unused variables and gc.collect()
- **Tip 2:** Presetting the datatypes
- **Tip 3:** Importing selected rows of the file (including generating your own subsamples)
- **Tip 4:** Importing just selected columns
- **Tip 6:** Creative data processing
- **Tip 7:** Using Dask


In [6]:
import numpy as np
import pandas as pd
import datetime
import os
import time
import matplotlib.pyplot as plt
import seaborn as sns
import gc
%matplotlib inline

In [7]:
# make wider graphs

sns.set(rc = {'figure.figsize':(12,5)});
plt.figure(figsize = (12,5));

<Figure size 864x360 with 0 Axes>

## Tip 1: Deleting unused variables and gc.collect()

The thing about python is that once it loads something into RAM it doesn't really get rid of it effectively. So if you load a huge dataframe into pandas, and then make a copy of it and never use it again, that original dataframe will still be in your RAM. Eating away at your memory. Same goes for any other variables you create.

Therefore if you used up a dataframe (or other variable), get in the habit of deleting it.

For example, if you create a dataframe temp, extract some features and merge results to your main training set, temp will still be eating up space. You need to explicitly delete it by stating del temp. You also need to make sure that nothing else is referring to temp (you don't have any other variables bound to it).

Even after doing so there may still be residual memory usuage going on.

That's where the garbage collection module comes in. `import gc` at the beginning of your project, and then each time you want to clear up space put command `gc.collect()`.

It also helps to run `gc.collect()` after multiple `transformations/functions/copying` etc.. as all the little references/values accumulate.

In [8]:
# eg:
# import some file

temp = pd.read_csv('train_sample.csv')

# do something to the file

temp['os'] = temp['os'].astype('str')

In [9]:
# delete when no longer needed

del temp

# collect residual garbage

gc.collect()

140

## Tip 2: Presetting the datatypes

If you import data into CSV, python will do it's best to guess the datatypes, but it will tend to error on the side of allocating more space than necessary. So if you know in advance that your numbers are integers, and don't get bigger than certain values, set the datatypes at minimum requirements before importing.

In [10]:
dtypes = {
        'ip'            : 'uint32',
        'app'           : 'uint16',
        'device'        : 'uint16',
        'os'            : 'uint16',
        'channel'       : 'uint16',
        'is_attributed' : 'uint8',
        }

train = pd.read_csv('train_sample.csv', dtype = dtypes)


# Check dtypes

train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 8 columns):
 #   Column           Non-Null Count   Dtype 
---  ------           --------------   ----- 
 0   ip               100000 non-null  uint32
 1   app              100000 non-null  uint16
 2   device           100000 non-null  uint16
 3   os               100000 non-null  uint16
 4   channel          100000 non-null  uint16
 5   click_time       100000 non-null  object
 6   attributed_time  227 non-null     object
 7   is_attributed    100000 non-null  uint8 
dtypes: object(2), uint16(4), uint32(1), uint8(1)
memory usage: 2.8+ MB


## Tip 3: Importing selected rows of a csv files

**a) Select number of rows to import**

Instead of the default `pd.read_csv('filename')` you can use parameter `nrows` to specify number of rows to import. For example: `train = pd.read_csv('train.csv', nrows = 10000)` will only read the first 1000 rows (including the heading)

In [11]:
train = pd.read_csv('train.csv', nrows = 10000, dtype= dtypes)

In [12]:
train.head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
0,83230,3,1,13,379,2017-11-06 14:32:21,,0
1,17357,3,1,19,379,2017-11-06 14:33:34,,0
2,35810,3,1,13,379,2017-11-06 14:34:12,,0
3,45745,14,1,13,478,2017-11-06 14:34:52,,0
4,161007,3,1,13,379,2017-11-06 14:35:08,,0


In [13]:
train.shape

(10000, 8)

**b) Simple row skip (with or without headings)**

You can also specify number of rows to skip (skiprows), if you, for example want 1 million rows after the first 5 million: `train = pd.read_csv('train.csv', skiprows = 5000000, nrows = 1000000)`. This however will ignore the first line with headers. Instead you can pass in range of rows to skip, that will not include the first row (Indexed [0]).

In [14]:
# plain skipping looses heading info. It's OK for files that don't
# headings
# or dataframe you'll be linking together, or where you make your own
# custom headings...

train = pd.read_csv('train.csv', skiprows = 5000000, nrows= 1000000, 
                    header = None, dtype = dtypes)

train.head()

Unnamed: 0,0,1,2,3,4,5,6,7
0,37363,12,1,25,245,2017-11-06 20:27:57,,0
1,89913,2,1,9,469,2017-11-06 20:27:57,,0
2,114235,26,1,13,121,2017-11-06 20:27:57,,0
3,18839,3,1,19,442,2017-11-06 20:27:57,,0
4,105580,15,1,13,245,2017-11-06 20:27:57,,0


In [15]:
train.shape

(1000000, 8)

In [16]:
# but if you want to import the heading form the original file
# skip first 5mil rows, but use the first row for heading

train = pd.read_csv('train.csv', skiprows = range(1,5000000), 
                   nrows = 1000000, dtype = dtypes)

train.head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
0,37363,12,1,25,245,2017-11-06 20:27:57,,0
1,89913,2,1,9,469,2017-11-06 20:27:57,,0
2,114235,26,1,13,121,2017-11-06 20:27:57,,0
3,18839,3,1,19,442,2017-11-06 20:27:57,,0
4,105580,15,1,13,245,2017-11-06 20:27:57,,0


In [17]:
train.shape

(1000000, 8)

**c) Picking which rows to skip (Make a list of what you Don't want)

This is how you can do your own random sampling

Since 'skiprows' can take in a list of rows you want to skip, you can make a list of random rows you want to input i.e you can sample your data anyway you like!

Recall how many rows the train set in TalkingData has:

In [None]:
import subprocess

#from https://stackoverflow.com/questions/845058/how-to-get-line-count-cheaply-in-python , Olafur's answer

def file_len(fname):
    p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE, 
                                              stderr=subprocess.PIPE)
    result, err = p.communicate()
    if p.returncode != 0:
        raise IOError(err)
    return int(result.strip().split()[0])

lines = file_len('train.csv')
print('Number of lines in "train.csv" is:', lines)

Let's say you want to pull a random sample of 1 million lines out of the total dataset. That means that you want a list of lines - 1 - 1000000 random numbers ranging from 1 to 184903891.

Note: generating such long list also takes a lot of space and some time. Be patient and make sure to use del and gc.collect() when done!

In [None]:
#generate list of lines to skip
skiplines = np.random.choice(np.arange(1, lines), size=lines-1-1000000, replace=False)

#sort the list
skiplines=np.sort(skiplines)

In [None]:
#check our list
print('lines to skip:', len(skiplines))
print('remaining lines in sample:', lines-len(skiplines), '(remember that it includes the heading!)')

###################SANITY CHECK###################
#find lines that weren't skipped by checking difference between each consecutive line
#how many out of first 100000 will be imported into the csv?
diff = skiplines[1:100000]-skiplines[2:100001]
remain = sum(diff!=-1)
print('Ratio of lines from first 100000 lines:',  '{0:.5f}'.format(remain/100000) ) 
print('Ratio imported from all lines:', '{0:.5f}'.format((lines-len(skiplines))/lines) )

Now let's import the randomly selected 1 million rows

In [None]:
train = pd.read_csv('../input/train.csv', 
                    skiprows=skiplines, 
                    dtype=dtypes)

train.head()

Delete the now useless list and any other garbaged generated along the way 

In [20]:


gc.collect()

221

## Tip 4: Importing in batches and processing each individually


We know that the proportion of clicks that was attributed is very low. So let's say we want to look at all of the same time. We don't know what rows they are, and we can't load the whole data and filter. But we can load in chuncks, extract from each chunk what we need and get rid of everything else.

The idea is simple. You specify size of chunk (number of lines) you want pandas to import at a time. Then you do some kind of processing on it. Then pandas imports the next chunk, until there are no more lines left.

So below I import one million rows, extract only rows that have "is_attributed" == 1 (i.e app was downloaded) and then merge these results into common dataframe for further inspection.

In [22]:
# set up an empty dataframe

df_converted = pd.DataFrame()

# we are going to work with chunks of size 1 million rows
chunksize = 10 ** 6

# in each chunk, filter for values that have 'is_attributed' == 1, 
# and merge these values into one dataframe

for chunk in pd.read_csv('train.csv', chunksize = chunksize, dtype = dtypes):
    filtered = (chunk[(np.where(chunk['is_attributed'] == 1, True, False))])
    df_converted = pd.concat([df_converted, filtered], ignore_index = True, )


Let's see what we've got:

In [23]:
df_converted.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 456846 entries, 0 to 456845
Data columns (total 8 columns):
 #   Column           Non-Null Count   Dtype 
---  ------           --------------   ----- 
 0   ip               456846 non-null  uint32
 1   app              456846 non-null  uint16
 2   device           456846 non-null  uint16
 3   os               456846 non-null  uint16
 4   channel          456846 non-null  uint16
 5   click_time       456846 non-null  object
 6   attributed_time  456846 non-null  object
 7   is_attributed    456846 non-null  uint8 
dtypes: object(2), uint16(4), uint32(1), uint8(1)
memory usage: 12.6+ MB


In [24]:
df_converted.head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
0,204158,35,1,13,21,2017-11-06 15:41:07,2017-11-07 08:17:19,1
1,29692,9,1,22,215,2017-11-06 16:00:02,2017-11-07 10:05:22,1
2,64516,35,1,13,21,2017-11-06 16:00:02,2017-11-06 23:40:50,1
3,172429,35,1,46,274,2017-11-06 16:00:03,2017-11-07 00:55:29,1
4,199085,35,1,13,274,2017-11-06 16:00:04,2017-11-06 23:04:54,1


Perfect! Now we know that in our entire dataset there are 456846 samples of conversions. We can explore this subset for patterns, anomalies, etc...

Using analogous method you can explore specific ips, apps, devices, etc combinations, devices, etc...

## Tip 5: Importing just selected columns

If you want to analyze just some specific feature, you can import just the selected columns.

For example, lets say we want to analyze clicks by ips. Or conversions by ips

Importing just 2 fields as opposed to full table just may fit in our RAM.

In [25]:
# wanted columns

columns = ['ip','click_time','is_attributed']
dtypes = {
        'ip'            : 'uint32',
        'is_attributed' : 'uint8',
        }

ips_df = pd.read_csv('train.csv', usecols=columns, dtype=dtypes)

Let's see what we've got.

In [26]:
print(ips_df.info())
ips_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 184903890 entries, 0 to 184903889
Data columns (total 3 columns):
 #   Column         Dtype 
---  ------         ----- 
 0   ip             uint32
 1   click_time     object
 2   is_attributed  uint8 
dtypes: object(1), uint32(1), uint8(1)
memory usage: 2.2+ GB
None


Unnamed: 0,ip,click_time,is_attributed
0,83230,2017-11-06 14:32:21,0
1,17357,2017-11-06 14:33:34,0
2,35810,2017-11-06 14:34:12,0
3,45745,2017-11-06 14:34:52,0
4,161007,2017-11-06 14:35:08,0


The dataframe is big (over 2GG, but manageable).

Now let's say you want to generate counts of ips frequencies (maybe to use as a feature in a model).

The regular groupby method will crush the system. So you'd have to be creative about how to do it.

Which is what the next section is about.

## Tip 6: Creative data processing

The kernel cannot handle groupby on the whole dataframe. But it can do it in sections. For example:

In [27]:
# processing part of the table is not a problem

ips_df[0:100][['ip','is_attributed']].groupby('ip', 
                                              as_index = False).count()[:10]

Unnamed: 0,ip,is_attributed
0,1025,1
1,2805,1
2,3425,1
3,3653,1
4,6242,1
5,7755,1
6,16760,1
7,17357,1
8,18787,1
9,23550,1


So you can calculate counts in batches, merge them and sum up to total counts. (Take a bit of time, but works)

In [29]:
size = 100000
all_rows = len(ips_df)
num_parts = all_rows // size


# generate the first batch
ip_counts = ips_df[0:size][['ip','is_attributed']].groupby('ip', as_index = False).count()

#add remaining batches
for p in range(1,num_parts):
    start = p*size
    end = p*size + size
    if end < all_rows:
        group = ips_df[start:end][['ip', 'is_attributed']].groupby('ip', as_index=False).count()
    else:
        group = ips_df[start:][['ip', 'is_attributed']].groupby('ip', as_index=False).count()
    ip_counts = ip_counts.merge(group, on='ip', how='outer')
    ip_counts.columns = ['ip', 'count1','count2']
    ip_counts['counts'] = np.nansum((ip_counts['count1'], ip_counts['count2']), axis = 0)
    ip_counts.drop(columns=['count1', 'count2'], axis = 0, inplace=True)

In [30]:
#see what we've got:
ip_counts.head()

Unnamed: 0,ip,counts
0,31,1039.0
1,36,6743.0
2,60,670.0
3,81,519.0
4,92,4457.0


In [31]:
ip_counts.sort_values('counts', ascending=False)[:20]

Unnamed: 0,ip,counts
490,5348,1238701.0
483,5314,1171427.0
6336,73516,770418.0
6331,73487,763823.0
4619,53454,498171.0
9852,114276,427449.0
2282,26995,401484.0
8200,95766,378684.0
1486,17149,310985.0
9041,105475,302180.0


In [32]:
np.sum(ip_counts['counts'])

184900000.0

Let's say you wanted proportion of conversions by ip, get the sums of conversions and then devide them by counts...

In [33]:
size=100000
all_rows = len(ips_df)
num_parts = all_rows//size

#generate the first batch
ip_sums = ips_df[0:size][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()

#add remaining batches
for p in range(1,num_parts):
    start = p*size
    end = p*size + size
    if end < all_rows:
        group = ips_df[start:end][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
    else:
        group = ips_df[start:][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
    ip_sums = ip_sums.merge(group, on='ip', how='outer')
    ip_sums.columns = ['ip', 'sum1','sum2']
    ip_sums['conversions_per_ip'] = np.nansum((ip_sums['sum1'], ip_sums['sum2']), axis = 0)
    ip_sums.drop(columns=['sum1', 'sum2'], axis = 0, inplace=True)

In [34]:
ip_sums.head(10)

Unnamed: 0,ip,conversions_per_ip
0,31,3.0
1,36,6.0
2,60,2.0
3,81,0.0
4,92,3.0
5,95,2.0
6,122,4.0
7,126,7.0
8,151,5.0
9,169,1.0


In [35]:
#check proportion (we calculated earlier how many rows of data had conversions)
np.sum(ip_sums['conversions_per_ip'])/184900000

0.002470773391022174

What if we want proportion of conversions to click per ip?



In [36]:
ip_conversions=ip_counts.merge(ip_sums, on='ip', how='left')
ip_conversions.head()

Unnamed: 0,ip,counts,conversions_per_ip
0,31,1039.0,3.0
1,36,6743.0,6.0
2,60,670.0,2.0
3,81,519.0,0.0
4,92,4457.0,3.0


In [37]:
ip_conversions['converted_ratio']=ip_conversions['conversions_per_ip']/ip_conversions['counts']
ip_conversions[:10]

Unnamed: 0,ip,counts,conversions_per_ip,converted_ratio
0,31,1039.0,3.0,0.002887
1,36,6743.0,6.0,0.00089
2,60,670.0,2.0,0.002985
3,81,519.0,0.0,0.0
4,92,4457.0,3.0,0.000673
5,95,2676.0,2.0,0.000747
6,122,4213.0,4.0,0.000949
7,126,3017.0,7.0,0.00232
8,151,2956.0,5.0,0.001691
9,169,1619.0,1.0,0.000618


In [38]:
#some cleanup
del ip_conversions
del ip_sums
del ips_df
del df_converted
del train
gc.collect()

121

## Tip 7: Using Dask


**DASK**

**What is it?**

A python library for parallel computing that can work on a single notebook or large cluster

**What does it do?**

- it parallizes NumPy and Pandas
- makes it possible to work on larger-than-memory datasets
- in case of single machine uses its own task scheduler to get the most out of your machine (kaggle kernels are multicore, so there is definetly room for improvement)
- BASICALLY IT WILL MAKE SOME COMPUTATIONS FIT RAM, AND WILL DO IT FASTER

**Its limitations?**

- it's still relativelty early in development
- it doesn't have all of Panda's options/functions/features. Only the most common ones.
- many operations that require setting the index are still computationally expensive

First you'll need to import the library.


In [39]:
import dask
import dask.dataframe as dd

There are different sections to Dask, but for this case you'll likely just use Dask DataFrames.

Here are some basics from the developers:

`A Dask DataFrame is a large parallel dataframe composed of many smaller Pandas dataframes, split along the index. These pandas dataframes may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask dataframe operation triggers many operations on the constituent Pandas dataframes.`

(https://dask.pydata.org/en/latest/dataframe.html)

For convenience and Dask.dataframe copies the Pandas API. Thus commands look and feel familiar.

What DaskDataframes can do? -they are very fast on most commonly used set of Pandas API

below is taken directly from: https://dask.pydata.org/en/latest/dataframe.html

**Trivially parallelizable operations (fast):**

- Elementwise operations: df.x + df.y, df * df
- Row-wise selections: df[df.x > 0]
- Loc: df.loc[4.0:10.5]
- Common aggregations: df.x.max(), df.max()
- Is in: df[df.x.isin([1, 2, 3])]
- Datetime/string accessors: df.timestamp.month


**Cleverly parallelizable operations(fast):**

- groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()
- groupby-apply on index: df.groupby(['idx', 'x']).apply(myfunc), where idx is the index level name
- value_counts: df.x.value_counts()
- Drop duplicates: df.x.drop_duplicates()
- Join on index: dd.merge(df1, df2, left_index=True, right_index=True) or dd.merge(df1, df2, on=['idx', 'x']) where idx is the index name for both df1 and df2
- Join with Pandas DataFrames: dd.merge(df1, df2, on='id')
- Elementwise operations with different partitions / divisions:df1.x + df2.y
- Datetime resampling: df.resample(...)
- Rolling averages: df.rolling(...)
- Pearson Correlations: df[['col1', 'col2']].corr()


**Notes/observations:**

- To actually get results of many of the above functions you have to add .compute() at the end. eg, for value_counts would be: df.x.value_counts().compute(). This hikes up RAM use a lot. I believe it's because .compute() gets the data into pandas format, with all the accompanying overhead. (Please correct me if wrong).

- I've been playing with dask for the past little while here on Kaggle Kernels, and while they can load full data and do some nice filtering, many actual operations do hike up RAM to extreme and even crush the system. For example, after loading 'train' dataframe, just getting len(train) hiked RAM up to 9GB. So be careful... Use a lot of gc.collect() and other techniques for making data smaller. So far I find dask most useful for filtering (selecting rows with specified features).



Now let's see some examples.

First, let's load the big train data:


In [41]:
# Loading in the train data
dtypes = {'ip':'uint32',
          'app': 'uint16',
          'device': 'uint16',
          'os': 'uint16',
          'channel': 'uint16',
          'is_attributed': 'uint8'}

train = dd.read_csv("train.csv", dtype=dtypes, parse_dates=['click_time', 'attributed_time'])
train.head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
0,83230,3,1,13,379,2017-11-06 14:32:21,NaT,0
1,17357,3,1,19,379,2017-11-06 14:33:34,NaT,0
2,35810,3,1,13,379,2017-11-06 14:34:12,NaT,0
3,45745,14,1,13,478,2017-11-06 14:34:52,NaT,0
4,161007,3,1,13,379,2017-11-06 14:35:08,NaT,0


In [42]:
train.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 8 entries, ip to is_attributed
dtypes: datetime64[ns](2), uint16(4), uint32(1), uint8(1)

In [43]:
len(train)

184903890

In [44]:
train.columns

Index(['ip', 'app', 'device', 'os', 'channel', 'click_time', 'attributed_time',
       'is_attributed'],
      dtype='object')

Let's see how it works for selecting data subsets:

In [45]:
#select only rows 'is_attributed'==1
train[train['is_attributed']==1].head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
103,204158,35,1,13,21,2017-11-06 15:41:07,2017-11-07 08:17:19,1
1504,29692,9,1,22,215,2017-11-06 16:00:02,2017-11-07 10:05:22,1
1798,64516,35,1,13,21,2017-11-06 16:00:02,2017-11-06 23:40:50,1
2102,172429,35,1,46,274,2017-11-06 16:00:03,2017-11-07 00:55:29,1
3056,199085,35,1,13,274,2017-11-06 16:00:04,2017-11-06 23:04:54,1


In [46]:
#select only data attributed after 2017-11-06 
train[train['attributed_time']>='2017-11-07 00:00:00'].head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
103,204158,35,1,13,21,2017-11-06 15:41:07,2017-11-07 08:17:19,1
1504,29692,9,1,22,215,2017-11-06 16:00:02,2017-11-07 10:05:22,1
2102,172429,35,1,46,274,2017-11-06 16:00:03,2017-11-07 00:55:29,1
3220,82917,19,0,24,210,2017-11-06 16:00:04,2017-11-07 00:21:50,1
5438,24200,19,88,24,213,2017-11-06 16:00:07,2017-11-07 04:18:51,1


Now we'll do some heavier lifting.

Let's get counts by ip

In [47]:
ip_counts = train.ip.value_counts().compute()
ip_counts[:20]

5348      1238734
5314      1171448
73516      770451
73487      763854
53454      498186
114276     427453
26995      401495
95766      378693
17149      310996
105475     302192
100275     276799
43793      261970
105560     260049
86767      257649
111025     247187
137052     217614
201182     212448
5178       211556
49602      200053
5147       197994
Name: ip, dtype: int64

In [48]:
#clean up to free up space
#for future work, you can export data you generated to CSVs so you don't have to make it
#all over again
del ip_counts
gc.collect()

47

Let's now try to get mean conversion by channel:

In [49]:
channel_means = train[['channel','is_attributed']].groupby('channel').mean().compute()
channel_means[:20]

Unnamed: 0_level_0,is_attributed
channel,Unnamed: 1_level_1
0,0.077345
3,0.000413
13,0.000121
15,0.000173
17,0.000188
18,0.000525
19,0.000224
21,0.140053
22,0.001292
24,0.00017


Unfortunately as_index=False does not appear to be implemented in Dask yet... You'd have to do manual manipulation to get the channel into a column... For example like this:

In [50]:
channel_means=channel_means.reset_index()
channel_means[:20]

Unnamed: 0,channel,is_attributed
0,0,0.077345
1,3,0.000413
2,13,0.000121
3,15,0.000173
4,17,0.000188
5,18,0.000525
6,19,0.000224
7,21,0.140053
8,22,0.001292
9,24,0.00017
