In [1]:
# Parallel Computing

import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Data Ingestion 
import pandas as pd
# Text Processing 
import re 
from nltk.corpus import stopwords
import string

import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\canut\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [2]:
n_workers =  mp.cpu_count()*2

print(f"{n_workers} workers are available")

8 workers are available


In [3]:
%%time
file_name= r"C:/Users/canut/Desktop/SQL/DATA/US_Accidents.csv" # ubuuntu

# file_name= r"/home/dani/Desktop/US_Accidents.csv" # ubuuntu
df = pd.read_csv(file_name)
df =df.sample(frac=0.5, replace=True, random_state=1)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

Shape:(1422671, 47)

Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
       'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
       'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
       'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')

CPU times: total: 33.8 s
Wall time: 36.1 s


In [6]:
def clean_text(text): 
  # Remove stop words
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word 
  not in stops])
  # Remove Special Characters
  text = text.translate(str.maketrans('', '', string.punctuation))
  return re.sub(' +',' ', text)

### Pandas operation

In [8]:
%%time
tqdm.pandas()
df['Description_P'] = df['Description'].progress_apply(clean_text)

df.head()

  0%|          | 0/1422671 [00:00<?, ?it/s]

### JOBlib

In [4]:
def text_parallel_clean(array):
  return Parallel(
      n_jobs=n_workers, backend="multiprocessing")(
          delayed(clean_text)(text) for text in tqdm(array))

In [7]:
%%time
df['Description'] = text_parallel_clean(df['Description'])

  0%|          | 0/1422671 [00:00<?, ?it/s]

### Joblib with batches

In [10]:
def proc_batch(batch):
  return [clean_text(text) for text in batch]

def batch_file(array,n_workers):
  file_len = len(array)
  batch_size = round(file_len / (2*n_workers))
  return [
      array[ix:ix + batch_size] for ix in tqdm(range(0, file_len, batch_size))]

batches = batch_file(df['Description_P'],n_workers)


  0%|          | 0/16 [00:00<?, ?it/s]

  array[ix:ix + batch_size] for ix in tqdm(range(0, file_len, batch_size))]


In [11]:
%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(proc_batch) (batch) for batch in tqdm(batches))


df['Description_P'] = [j for i in batch_output for j in i]

  0%|          | 0/16 [00:00<?, ?it/s]

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



CPU times: user 2.63 s, sys: 958 ms, total: 3.59 s
Wall time: 2min 35s


### Concurrent

In [20]:
%%time
from tqdm.contrib.concurrent import process_map
# batch = round(len(df)/(n_workers*3))
batch = 500


df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

  0%|          | 0/1422671 [00:00<?, ?it/s]

CPU times: user 7.28 s, sys: 1.41 s, total: 8.69 s
Wall time: 2min 24s
