In [12]:
import numpy as np
import psutil
import ray
from scipy import signal
from time import time
from multiprocessing import Pool
import pandas as pd

In [3]:
num_cpus = psutil.cpu_count(logical=False)
print("Number of physical CPU cores:{}".format(num_cpus))

Number of physical CPU cores:2


In [6]:
ray.init(num_cpus=num_cpus)
@ray.remote
def f(image,random_filter):
    return signal.convolve2d(image,random_filter)[::5, ::5]

def image_filter_experiment():
    filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]
    start = time()
    for _ in range(10):
        image = np.zeros((3000, 3000))
        image_id = ray.put(image)
        ray.get([f.remote(image_id, filters[i]) for i in range(num_cpus)])
    run_time = time() - start
    print("Ray uses time: {:.5f}s".format(run_time))

2019-06-24 01:55:15,763	INFO node.py:498 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-06-24_01-55-15_761187_3383/logs.
2019-06-24 01:55:15,877	INFO services.py:409 -- Waiting for redis server at 127.0.0.1:48306 to respond...
2019-06-24 01:55:16,000	INFO services.py:409 -- Waiting for redis server at 127.0.0.1:50373 to respond...
2019-06-24 01:55:16,007	INFO services.py:806 -- Starting Redis shard with 1.72 GB max memory.
2019-06-24 01:55:16,032	INFO node.py:512 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-06-24_01-55-15_761187_3383/logs.
2019-06-24 01:55:16,039	INFO services.py:1442 -- Starting the Plasma object store with 2.58 GB memory using /tmp.


In [9]:
image_filter_experiment()

Ray uses time: 10.66825s


In [14]:
def add_features(df):
    df['question_text'] = df['question_text'].apply(lambda x:str(x))
    df["lower_question_text"] = df["question_text"].apply(lambda x: x.lower())
    df['total_length'] = df['question_text'].apply(len)
    df['capitals'] = df['question_text'].apply(lambda comment: sum(1 for c in comment if c.isupper()))
    df['caps_vs_length'] = df.apply(lambda row: float(row['capitals'])/float(row['total_length']),
                                axis=1)
    df['num_words'] = df.question_text.str.count('\S+')
    df['num_unique_words'] = df['question_text'].apply(lambda comment: len(set(w for w in comment.split())))
    df['words_vs_unique'] = df['num_unique_words'] / df['num_words'] 
    df['num_exclamation_marks'] = df['question_text'].apply(lambda comment: comment.count('!'))
    df['num_question_marks'] = df['question_text'].apply(lambda comment: comment.count('?'))
    df['num_punctuation'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '.,;:'))
    df['num_symbols'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '*&$%'))
    df['num_smilies'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-)', ':)', ';-)', ';)')))
    df['num_sad'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-<', ':()', ';-()', ';(')))
    df["mean_word_len"] = df["question_text"].apply(lambda x: np.mean([len(w) for w in str(x).split()]))
    return df

def parallelize_dataframe(df, func, n_cores=4):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [15]:
# data can be found here: https://www.kaggle.com/c/quora-insincere-questions-classification/data
train_df = pd.read_csv("train.csv")

In [17]:
train_df.head()

Unnamed: 0,qid,question_text,target
0,00002165364db923c7e6,How did Quebec nationalists see their province...,0
1,000032939017120e6e44,"Do you have an adopted dog, how would you enco...",0
2,0000412ca6e4628ce2cf,Why does velocity affect time? Does velocity a...,0
3,000042bf85aa498cd78e,How did Otto von Guericke used the Magdeburg h...,0
4,0000455dfa3e01eae3af,Can I convert montra helicon D to a mountain b...,0


In [21]:
%%timeit 
train = parallelize_dataframe(train_df, add_features) 

44.4 s ± 597 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [22]:
%%timeit
train = add_features(train_df)

1min 25s ± 407 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
