# Importing libraries and loading training data

In [3]:
#mounting google drive
from google.colab import drive
drive.mount('/gdrive')

import numpy as np
import pandas as pd
import dask.dataframe as dd
import pickle
import timeit
import gc

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /gdrive


In [5]:
#connecting to kaggle API and loading datasets
from google.colab import files
files.upload()
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 /root/.kaggle/kaggle.json

!kaggle competitions download -c talkingdata-adtracking-fraud-detection
!unzip train.csv.zip

Saving kaggle.json to kaggle.json
Downloading train.csv.zip to /content
 99% 1.20G/1.21G [00:11<00:00, 69.4MB/s]
100% 1.21G/1.21G [00:11<00:00, 113MB/s] 
Downloading train_sample.csv.zip to /content
  0% 0.00/1.08M [00:00<?, ?B/s]
100% 1.08M/1.08M [00:00<00:00, 108MB/s]
Downloading test.csv.zip to /content
 93% 154M/165M [00:03<00:00, 47.7MB/s]
100% 165M/165M [00:03<00:00, 53.5MB/s]
Downloading sample_submission.csv.zip to /content
 90% 36.0M/39.8M [00:00<00:00, 74.5MB/s]
100% 39.8M/39.8M [00:00<00:00, 133MB/s] 
Downloading test_supplement.csv.zip to /content
 95% 479M/504M [00:08<00:00, 77.8MB/s]
100% 504M/504M [00:08<00:00, 64.9MB/s]
Archive:  train.csv.zip
  inflating: mnt/ssd/kaggle-talkingdata2/competition_files/train.csv  


## Loading dask dataframe

Instead of reading the CSV with pandas via pd.read_csv(), import dask.dataframe as dd and call dd.read_csv(). Let's see how fast it runs:

In [8]:
dtypes = {
        'ip'            : 'uint32',
        'app'           : 'uint16',
        'device'        : 'uint16',
        'os'            : 'uint16',
        'channel'       : 'uint16',
        'click_id'      : 'uint32'
        }

start = timeit.default_timer()
train = dd.read_csv('mnt/ssd/kaggle-talkingdata2/competition_files/train.csv', dtype=dtypes, usecols=(['ip', 'app', 'device', 'os', 'channel', 'click_time']))
stop = timeit.default_timer()
print('Time: ', stop - start) 

train['hour'] = dd.to_datetime(train['click_time']).dt.hour.astype('uint8')
train['day'] = dd.to_datetime(train['click_time']).dt.day.astype('uint8')

Time:  0.09457860999998502


Less than one second! Super fast! But this doesn't mean that you can treat it exactly like a pandas df. It's not actually "in" your session yet: dd.read_csv only sets up the read lazily, which is why the call returns so quickly. From what I understand, dask splits the data into a bunch of smaller dataframes (split by row) and spreads the work across the cores on your computer. So when you do manipulate the data, it can run that command on many of the smaller dataframes in parallel.

In [0]:
train.npartitions

118

So essentially dask has split our huge 182 million row dataframe into 118 partitions of about 1.5 million rows each. Let's take a look at one of them:

In [0]:
#pull in first partition
start = timeit.default_timer()

part = train.partitions[0].compute()

stop = timeit.default_timer()
print('Time: ', stop - start) 

#taking a look at partition
print('Number of rows in part:', len(part))
display(part.head(5))
display(type(part))

gc.collect()

Time:  1.575964417999785
Number of rows in part: 1568734

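|   | ip | app | device | os | channel | click_time | hour | day |
|---|---|---|---|---|---|---|---|---|
| 0 | 83230 | 3 | 1 | 13 | 379 | 2017-11-06 14:32:21 | 14 | 6 |
| 1 | 17357 | 3 | 1 | 19 | 379 | 2017-11-06 14:33:34 | 14 | 6 |
| 2 | 35810 | 3 | 1 | 13 | 379 | 2017-11-06 14:34:12 | 14 | 6 |
| 3 | 45745 | 14 | 1 | 13 | 478 | 2017-11-06 14:34:52 | 14 | 6 |
| 4 | 161007 | 3 | 1 | 13 | 379 | 2017-11-06 14:35:08 | 14 | 6 |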


pandas.core.frame.DataFrame

And now this partition is in your session like a normal pandas df, and it only took about 1.5 seconds. So when you want to do something concrete, such as grouping and calculating a summary statistic, you have to call compute() as above. There's another thing to notice though:

In [0]:
part['day'].value_counts()

6    1568734
Name: day, dtype: int64

All of these clicks are from day 6. Because the partitions are split row-wise and the training data is ordered by click time, almost all clicks in one partition will likely be from the same day, so you won't get a representative sample of the data just by pulling in a few partitions.

Let's take a look at trying to find the length of a dask df. 

In [0]:
start = timeit.default_timer()
len(train)
stop = timeit.default_timer()
print('Time: ', stop - start) 

Time:  167.05913564599905


It takes freaking FOREVER. I think that's because dask has to read and process every partition of the file just to count the rows. It took me a while to understand why these seemingly simple commands took so freaking long. So if you want to know the dimensions of a dask df, I've found it's easier to look at how many partitions there are and how many rows are in one partition. The last thing about partitions is that you can change how many there are.

I read online that it's good to partition your df so each partition is about 100MB. Here's some code that gives the total size of the df in bytes, which you can divide by 100MB to figure out roughly how many partitions you need: df.memory_usage().sum().compute()
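The arithmetic is simple enough to sketch. The helper name partitions_for and the 5 GiB total below are made up for illustration; in practice the total would be whatever df.memory_usage().sum().compute() returns:

```python
import math

# ~100MB per partition, the rule of thumb mentioned above.
TARGET_PARTITION_BYTES = 100 * 1024**2


def partitions_for(total_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Number of partitions needed so each one holds roughly target_bytes."""
    return max(1, math.ceil(total_bytes / target_bytes))


# Hypothetical total size: pretend the dataframe occupies 5 GiB in memory.
example_total_bytes = 5 * 1024**3
n_parts = partitions_for(example_total_bytes)
print(n_parts)
```

The result is what you would pass as npartitions when repartitioning.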

You can repartition a dask df like this:

In [0]:
train=train.repartition(npartitions=50)
part_2=train.partitions[0].compute()
display(len(part_2))
del part_2
gc.collect()

3135920

146

It makes sense that our partitions are much larger since we went from 118 dataframes to 50 dataframes. So each partition is about 3 million rows now. 

# Feature engineering

I've been using Dask to do feature engineering on the entire dataset. So below I'm grouping the entire training dataset by 'ip' and 'app' and getting the number of clicks for each ip-app combination. 

In [19]:
#trying to get an idea of how long it takes to get value counts by grouping

start = timeit.default_timer()
grouped=train.groupby(['ip', 'app']).size().compute()
grouped=grouped.reset_index()
stop = timeit.default_timer()

print('Time: ', stop - start)  

display(grouped.head(5))

Time:  208.61606336699992


|   | ip | app | 0 |
|---|---|---|---|
| 0 | 9 | 3 | 802 |
| 1 | 9 | 9 | 403 |
| 2 | 9 | 12 | 538 |
| 3 | 9 | 18 | 339 |
| 4 | 10 | 12 | 174 |


So it's not fast, but I wouldn't be able to create feature aggregations using the entire dataset if I tried to load the train dataset as I normally would. With the entire df in memory, as soon as I tried to manipulate it at all, it would crash my session. So dask makes it possible to use the entire train dataset!
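Once the grouped counts have been computed they're a plain pandas object, so turning them into a feature is ordinary pandas. Here's a sketch on toy data; the column name ip_app_count and the merge back onto the clicks are my own illustration, not something from the steps above:

```python
import pandas as pd

# Toy clicks (made-up values) standing in for the training data.
clicks = pd.DataFrame({"ip": [9, 9, 9, 10, 10], "app": [3, 3, 12, 12, 12]})

# Count clicks per ip-app pair and give the count column a readable name
# instead of the default 0.
counts = clicks.groupby(["ip", "app"]).size().reset_index(name="ip_app_count")

# Attach the count to every click as a new feature column.
features = clicks.merge(counts, on=["ip", "app"], how="left")
print(features)
```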

**Note: do not try to do a bunch of computations at once.** Maybe there is a better way to do this, but basically when you run a command on a dask object, it doesn't run immediately unless you call compute(). Instead, the commands build up in a computation graph. Then when you finally run .compute(), your session can crash because it's running all of the commands that had built up!

I usually run one thing at a time, and pickle EVERYTHING as often as possible so I don't lose as much when my session crashes.
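One way to make the pickling habit less tedious is a small checkpoint helper. This is just a sketch: load_or_compute, expensive_step and the file name are hypothetical names I made up, and the dictionary stands in for a real result such as a grouped dataframe:

```python
import os
import pickle
import tempfile


def load_or_compute(path, compute_fn):
    """Return the pickled result at `path`, computing and saving it first if missing."""
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = compute_fn()
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result


calls = []  # records how many times the expensive step actually runs


def expensive_step():
    # Stand-in for a slow step such as a groupby followed by .compute().
    calls.append(1)
    return {"ip_app_pairs": 5}


checkpoint = os.path.join(tempfile.mkdtemp(), "grouped.pkl")  # hypothetical file name
first = load_or_compute(checkpoint, expensive_step)   # computes and writes the pickle
second = load_or_compute(checkpoint, expensive_step)  # reads the pickle, skips the work
print(first == second, len(calls))
```

After a crash, rerunning the cell picks the result up from disk instead of recomputing it.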