# Dask-ml
In this notebook we experiment with `dask-ml` and explore how it can be combined with TensorFlow estimators to build our model.

In [1]:
%matplotlib inline
# standard library
import itertools
import sys, os
import re
import glob

from collections import OrderedDict
from urllib.parse import urlparse

# pandas
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask


# numpy, matplotlib, seaborn
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# dask-ml
from dask_ml.preprocessing import StandardScaler, MinMaxScaler
from dask_ml.linear_model import PartialSGDClassifier

# tensorflow
import tensorflow as tf

# local imports
# make the project's ../src directory importable for local modules
sys.path.append(os.path.join(os.getcwd(), "../src"))

# this styling is purely my preference
# less chartjunk
# A single sns.set call is used on purpose: calling sns.set() after
# sns.set_context() re-applies the default context and silently discards the
# font_scale set earlier. The matplotlib rc key is 'lines.linewidth'
# ('line.linewidth' is not a valid key and was being ignored).
sns.set(context='notebook', style='ticks', palette='Set2',
        font_scale=1.5, rc={'lines.linewidth': 2.5})

In [2]:
# Spill-to-disk directory for dask. Defaults to the original hard-coded path
# but can be overridden with the PARTD_DIR environment variable so the
# notebook also runs on machines without that directory.
PARTD_DIR = os.environ.get('PARTD_DIR', '/home/jovyan/work/partd/')
dask.set_options(temporary_directory=PARTD_DIR)

<dask.context.set_options at 0x7f084ef28828>

In [3]:
# read the features: dd.read_csv only samples the file to infer dtypes,
# head() then materializes the first rows of the first partition
data_ddf = dd.read_csv(
    '../data/final/dragnet/dom-full-01.csv',
)
data_ddf.head()

Unnamed: 0,depth,sibling_pos,no_classes,id_len,class_len,no_children,text_len,descendant1_no_nodes,descendant1_no_children_avg,descendant1_id_len_avg,...,ancestor5_tag_h3,ancestor5_tag_maxamineignore,ancestor5_tag_a,ancestor5_tag_ifcommentsaccepted,ancestor5_tag_noindex,ancestor5_tag_property,ancestor5_tag_iframe,ancestor5_tag_http:,ancestor5_tag_bodyonload,content_label
0,3,21,0,0,0,0,0,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False
1,6,0,0,21,0,2,31,2,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False
2,8,19,0,0,0,0,16,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,True
3,5,18,0,0,0,0,11,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False
4,3,35,0,0,0,0,0,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False


## Data Processing
First we split the data into feature records and labels.

In [4]:
# separate the data into a label vector and a feature matrix (both lazy dask arrays)
label_col = 'content_label'
y_da = data_ddf[label_col].values
# NOTE(review): the 'Unnamed: 0' column (a saved index, visible in the head()
# output above) is not dropped and therefore ends up as a feature -- confirm.
X_da = data_ddf.drop(['url', 'path', label_col], axis=1).values

In [5]:
scaler = MinMaxScaler()  # instantiate a scaler
scaler.fit(X_da)  # fit the data

MinMaxScaler(columns=None, copy=True, feature_range=(0, 1))

In [None]:
# check the results
scaled_X_da = scaler.transform(X_da)

## Prediction
Now that the data is scaled, we feed it to a linear classifier trained with stochastic gradient descent (intended as a logistic regression) and see how it behaves.

In [None]:
# fraction of positive labels (mean of the boolean label array); it is used
# as the weight of class 0 in the classifier below
class_weight = da.mean(y_da).compute()
class_weight

In [None]:
%%time
# instantiate the classifier
model = PartialSGDClassifier(fit_intercept=False, class_weight={0: class_weight, 1:1-class_weight}, classes=[0,1], max_iter=500)
model.fit(scaled_X_da, y_da)

In [None]:
columns = tf.contrib.learn.infer_real_valued_columns_from_input(data_ddf.drop(['url', 'path', 'content_label'], axis=1).head())

In [None]:
estimator = tf.contrib.learn.LinearClassifier(
    feature_columns=columns)

In [None]:
sk_estimator = tf.contrib.learn.SKCompat(estimator)
sk_estimator

In [None]:
from sklearn.datasets import load_iris

# Small toy dataset used to try the tensorflow estimators before the real features.
iris = load_iris()

# np.c_ column-concatenates the feature matrix with the target vector; the
# resulting columns are then renamed to short names.
data1 = pd.DataFrame(data=np.c_[iris['data'], iris['target']],
                     columns=iris['feature_names'] + ['target'])
data1.columns = ['slen', 'swid', 'plen', 'pwid', 'target']
# np.c_ upcasts the integer class ids to float; the TF classifiers below
# expect integer class ids, so cast the target back to int.
data1['target'] = data1['target'].astype(int)
data1_ddf = dd.from_pandas(data1, npartitions=10)

In [None]:
# Input function over the in-memory iris frame. It is defined here because
# this cell and the cells below use it. Labels are cast to int because the
# classifier expects integer class ids.
df_input_fn = tf.estimator.inputs.pandas_input_fn(
    x=data1.drop('target', axis=1),
    y=data1['target'].astype(int),
    shuffle=True)
columns = tf.contrib.learn.infer_real_valued_columns_from_input_fn(df_input_fn)

In [None]:
# Iris has more than two classes. LinearClassifier assumes a binary problem
# unless n_classes is given, so derive it from the labels.
estimator = tf.contrib.learn.LinearClassifier(
    feature_columns=columns, n_classes=data1['target'].nunique())

In [None]:
sk_estimator = tf.contrib.learn.SKCompat(estimator)
estimator.fit(input_fn=df_input_fn)

In [None]:
sk_estimator.fit(x=data1_ddf.drop('target', axis=1).compute(), y=data1_ddf['target'].compute())

In [None]:
# pandas_input_fn needs in-memory pandas objects, so compute the dask frame
# first (despite the *_ddf names, x_ddf and y_ddf are pandas objects)
x_ddf = data1_ddf.drop(['target'], axis=1).compute()
y_ddf = data1_ddf['target'].compute()
ddf_input_fn = tf.estimator.inputs.pandas_input_fn(
    x=x_ddf,
    y=y_ddf,
    queue_capacity=1000,
    shuffle=False,
)

In [None]:
# inspect the input function object
ddf_input_fn

In [None]:
# first partition of the iris frame as a dask.delayed object
a = data1_ddf.to_delayed()[0]

In [None]:
# materialize that single partition
a.compute()

In [None]:
# same for the first partition of the dragnet features (rebinds `a`)
a = data_ddf.to_delayed()[0]
a.compute()

In [None]:
# Standardize the feature columns (z-score). A new name is used instead of
# overwriting data_ddf, so the cell can be re-run (dropping the columns a
# second time would raise) and data_ddf keeps its original columns.
features_ddf = data_ddf.drop(['url', 'path', 'content_label'], axis=1)
feature_std = features_ddf.std()
# constant columns have std 0 -> divide by 1 instead of producing NaN/inf
feature_std = feature_std.where(feature_std != 0, 1)
scaled_ddf = (features_ddf - features_ddf.mean()) / feature_std

In [None]:
# first partition of the standardized frame as a dask.delayed object
first_delay = scaled_ddf.to_delayed()[0]

In [None]:
# materialize it to eyeball the scaled values
first_delay.compute()

In [None]:
# and the second partition
scaled_ddf.to_delayed()[1].compute()

In [None]:
# try tensorflow's dask helper on two feature columns
tf.contrib.learn.extract_dask_data(data_ddf[['depth', 'sibling_pos']])

In [None]:
# tiny toy frame (a string column and an int column on a daily index) to see
# what extract_dask_data does with a multi-partition dask frame
toy_index = pd.date_range(start="20100101", periods=6)
df = pd.DataFrame(dict(a=list("aabbcc"), b=list(range(6))), index=toy_index)
ddf = dd.from_pandas(df, npartitions=3)
tf.contrib.learn.extract_dask_data(ddf)

In [None]:
# inspect the toy dask frame
ddf

In [None]:
# compare with the iris dask frame
data1_ddf

In [None]:
# re-run the extraction on the toy frame (same call as two cells above)
tf.contrib.learn.extract_dask_data(ddf)

## Trying out the TensorFlow input pipeline

In [None]:
sess = tf.InteractiveSession()  # initialize the session (installed as the default session)

In [None]:
# initialize the filenames queue from all cleaneval CSVs matching the glob
filenames = tf.matching_files('../data/final/cleaneval/dom-full-*.csv')
filenames_queue =  tf.train.string_input_producer(filenames, capacity=2)
filenames_queue

In [None]:
reader = tf.TextLineReader(skip_header_lines=1)  # initialize the reader; skips each file's header row
key, value = reader.read(filenames_queue)

# inspect them: key identifies the record, value is the raw CSV line
key, value

In [None]:
# initialize the queue runners
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

# test the output
print(sess.run(key))
print(sess.run(value)) 
coord.request_stop()
coord.join(threads)  # join it

In [None]:
# lazily read all cleaneval CSVs with dask; its column dtypes are used below
ddf = dd.read_csv('../data/final/cleaneval/dom-full-*.csv')


In [None]:
ddf.dtypes

In [None]:
# one default value per column, each wrapped in a list as tf.decode_csv expects;
# object dtypes construct None, so they fall back to the empty string
default_vals = []
for col_dtype in ddf.dtypes:
    zero_value = col_dtype.type()
    default_vals.append(['' if zero_value is None else zero_value])
default_vals[-10:]

In [None]:
def make_csv_decoder(input_tensor, dtypes, **kwargs):
    """Return the list of decoded column tensors for the CSV line(s) in `input_tensor`.

    `dtypes` is an iterable of numpy/pandas dtypes, one per CSV column (e.g.
    the `.dtypes` of a dask/pandas frame read from the same files). They are
    used to build the `record_defaults` passed to `tf.decode_csv`:

    * bool and object columns are decoded as strings,
    * integer columns are decoded as floats,
    * all other columns use their dtype's zero value.

    Bool columns are afterwards converted from the strings 'True'/'False' to
    the floats 1.0/0.0. Extra keyword arguments go to `tf.decode_csv`.
    """
    dtypes = list(dtypes)  # iterated twice below; also accepts generators

    # infer the record defaults (each one must be wrapped in a list)
    default_values = []
    for dtype in dtypes:
        if dtype.name in ('bool', 'object'):
            default = ''  # read as string; bools are converted below
        else:
            default = dtype.type()
            if np.issubdtype(type(default), np.integer):
                default = 0.0  # decode ints as floats
        default_values.append([default])
    decoded_tensors = tf.decode_csv(input_tensor, default_values, **kwargs)

    # Replace bools with their float conversion. tf.cast works for a single
    # line as well as a batch of lines; tf.where with scalar branches requires
    # a scalar condition and breaks on batched input.
    for i, dtype in enumerate(dtypes):
        if dtype.name == 'bool':
            is_true = tf.equal(decoded_tensors[i], tf.constant('True'))
            decoded_tensors[i] = tf.cast(is_true, tf.float32)

    return decoded_tensors

# reuse the dtypes of the cleaneval dask frame read above instead of
# re-reading (and re-sampling) every CSV file
make_csv_decoder(value, ddf.dtypes)[-10:]

In [None]:
def make_csv_col_tensors(csv_pattern=None, csv_files=None, shuffle=True, num_epochs=10, csv_decoder_kwargs=None):
    """Return a dict mapping each CSV column name to its decoded tensor.

    The input files come from `csv_pattern` (a glob, takes precedence) or
    from `csv_files` (an explicit list of paths). Lines are read with a
    `TextLineReader` that skips each file's header line and are decoded with
    `make_csv_decoder`, using the column names/dtypes dask infers from the
    same files.

    `shuffle` specifies whether the order of the files in the filename queue
    is shuffled (lines within a file are not shuffled).
    `num_epochs` specifies how many times every file is yielded; a non-None
    value creates a local epoch counter, so run
    `tf.local_variables_initializer()` before starting the queue runners.
    `csv_decoder_kwargs` is forwarded to `make_csv_decoder` / `tf.decode_csv`.

    Raises ValueError if neither source is given or `csv_files` is empty.
    """
    if csv_decoder_kwargs is None:
        csv_decoder_kwargs = {}  # avoid a shared mutable default

    # read all the files fitting the specification
    if csv_pattern is not None:
        filenames = tf.matching_files(csv_pattern)
        metadata_source = csv_pattern
    elif csv_files is not None:
        csv_files = list(csv_files)
        if not csv_files:
            raise ValueError('csv_files must not be empty')
        # hardcoded files; string_input_producer accepts a list of strings
        filenames = csv_files
        metadata_source = csv_files
    else:
        # raise if no file source is specified
        raise ValueError('either csv_files or csv_pattern has to be specified')

    filenames_queue = tf.train.string_input_producer(filenames, shuffle=shuffle, num_epochs=num_epochs)

    reader = tf.TextLineReader(skip_header_lines=1)  # initialize the reader
    _, value = reader.read(filenames_queue)

    # read the metadata (column names and dtypes) from the same files
    ddf = dd.read_csv(metadata_source)
    decoded_tensors = make_csv_decoder(value, ddf.dtypes, **csv_decoder_kwargs)

    return dict(zip(ddf.columns, decoded_tensors))

# build the column tensors for all cleaneval CSVs (defaults: shuffled files, 10 epochs)
tens_dict = make_csv_col_tensors('../data/final/cleaneval/dom-full-*.csv')
tens_dict['url']

In [None]:
def pack_features_and_labels(col_dict, feature_cols, label_cols):
    """Stack selected column tensors into one feature tensor and one label tensor.

    col_dict : dict mapping column name -> tensor (e.g. the output of
        `make_csv_col_tensors`).
    feature_cols, label_cols : iterables of column names. The stacked tensors
        follow the order of these lists, not the iteration order of `col_dict`.

    Returns a `(features, labels)` tuple of `tf.stack`-ed tensors.
    Raises KeyError listing every requested name missing from `col_dict`
    (such names used to be dropped silently).
    """
    feature_cols = list(feature_cols)
    label_cols = list(label_cols)
    missing = [name for name in feature_cols + label_cols if name not in col_dict]
    if missing:
        raise KeyError('columns not found in col_dict: {}'.format(missing))

    feature_tensors = [col_dict[name] for name in feature_cols]
    label_tensors = [col_dict[name] for name in label_cols]

    # stack them
    return tf.stack(feature_tensors), tf.stack(label_tensors)

# pack four feature columns and the content label from the column tensors
features, labels = pack_features_and_labels(tens_dict, ['depth', 'sibling_pos', 'text_len', 'descendant2_id_len_avg'], ['content_label'])
features, labels

In [None]:
# shuffle it
data_batch, label_batch = tf.train.shuffle_batch([features, labels], batch_size=100, capacity=2000, min_after_dequeue=1000)
data_batch, label_batch

In [None]:
# a separate name keeps the InteractiveSession `sess` from above usable
with tf.Session() as batch_sess:
    # variables must be initialized otherwise it fails
    # (num_epochs in the filename queue creates a local epoch counter)
    batch_sess.run(tf.local_variables_initializer())
    batch_sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=batch_sess, coord=coord)
    try:
        # Fetch data and labels in ONE run call per batch: every run of a
        # shuffle_batch output dequeues a new batch, so separate calls would
        # print the features of one batch next to the labels of another.
        for _ in range(2):  # test the output, then again
            batch_data, batch_labels = batch_sess.run([data_batch, label_batch])
            print(batch_data)
            print(batch_labels)
    finally:
        # finish: stop and join the threads even if a run call fails
        coord.request_stop()
        coord.join(threads)