# Parallel Processing with Dask 

## Dask Bags for Unstructured Data

In [None]:
# Import the Dask bag subpackage as db
import dask.bag as db

# Build a Dask bag with three partitions from the in-memory list of reviews
review_bag = db.from_sequence(reviews_list, npartitions=3)

# Pull a single element out of the bag and display it
first_review = review_bag.take(1)
print(first_review)

In [None]:
# Build one bag from every .txt file under data/tripadvisor_hotel_reviews
review_bag = db.read_text("data/tripadvisor_hotel_reviews/*.txt")

# Set up the count of elements in the bag (nothing is evaluated yet)
review_count = review_bag.count()

# Run the computation, then print the resulting total
total_reviews = review_count.compute()
print(total_reviews)

In [None]:
# Lower-case every review so that the match below ignores capitalisation
lowercase_reviews = review_bag.str.lower()

# For each review, tally how often the substring 'excellent' occurs
excellent_counts = lowercase_reviews.str.count("excellent")

# Fetch the tallies for the first 10 reviews and print them
first_ten_counts = excellent_counts.take(10)
print(first_ten_counts)

In [None]:
# Import the json package
import json

# Load every JSON file under data/politicians as raw text
text_bag = db.read_text("data/politicians/*.json")

# Parse each JSON string with json.loads
dict_bag = text_bag.map(json.loads)

# Fetch the first parsed record and print it as an example
example_record = dict_bag.take(1)
print(example_record)


In [None]:
# Report how many records the bag holds before filtering
n_all_records = dict_bag.count().compute()
print(n_all_records)

# Keep only the records accepted by the has_birth_date() function
filtered_bag = dict_bag.filter(has_birth_date)

# Report how many records remain after filtering
n_kept_records = filtered_bag.count().compute()
print(n_kept_records)

In [None]:
# `import dask.bag as db` binds only the name `db`, so the top-level package
# must be imported explicitly before dask.compute() can be called below.
import dask

# Select the 'birth_date' from each dictionary in the bag
birth_date_bag = filtered_bag.pluck("birth_date")

# Extract the year as an integer from the first four characters of each
# birth_date string
birth_year_bag = birth_date_bag.map(lambda x: int(x[:4]))

# Set up the min, max and mean birth years (not evaluated until compute)
min_year = birth_year_bag.min()
max_year = birth_year_bag.max()
mean_year = birth_year_bag.mean()

# Evaluate all three results in a single dask.compute() call and print them
print(dask.compute(min_year, max_year, mean_year))

### Converting unstructured data to DataFrame 

In [None]:
def extract_url(x):
    """Return a copy of record ``x`` with its first link's URL under 'url'.

    Parameters
    ----------
    x : dict
        Record with a 'links' entry whose first item has a 'url' key.

    Returns
    -------
    dict
        Shallow copy of ``x`` with an added (or replaced) 'url' key. The
        input record is left unmodified, so the function has no side effects
        and can safely be re-applied to the same data.

    Raises
    ------
    KeyError
        If 'links' is missing from ``x`` or 'url' is missing from the first link.
    IndexError
        If 'links' is empty.
    """
    # Build a new dict instead of assigning into x, so the caller's record
    # is not mutated.
    return {**x, 'url': x['links'][0]['url']}
  
# Map extract_url over the bag so that every record carries a 'url' key
dict_bag = dict_bag.map(extract_url)

# Fetch the first transformed record and print it
first_record = dict_bag.take(1)
print(first_record)

In [None]:
def select_keys(dictionary, keys_to_keep):
    """Build a new dict holding only the ``keys_to_keep`` entries of ``dictionary``.

    Entries appear in the order given by ``keys_to_keep``; the input
    dictionary is not modified. A key that is absent from ``dictionary``
    raises ``KeyError``.
    """
    return {key: dictionary[key] for key in keys_to_keep}

# Reduce every record to the 4 required keys with select_keys
# NOTE(review): select_keys raises KeyError when a key is absent, and dict_bag
# is not the birth-date-filtered bag - confirm every record has all four keys.
filtered_bag = dict_bag.map(
    select_keys,
    keys_to_keep=['gender', 'name', 'birth_date', 'url'],
)

# Turn the restructured bag into a DataFrame
df = filtered_bag.to_dataframe()

# Show the first few rows of the DataFrame
df_preview = df.head()
print(df_preview)

### Using any data in Dask bags

In [None]:
# Import NumPy, the scipy module for .wav files, and Dask
import numpy as np
from scipy.io import wavfile
import dask 
import dask.bag as db

def load_wav(filename):
    """Read a .wav file and bundle its contents together with its name.

    Returns a dict with three entries: 'filename' (the argument as given),
    'audio' (the sample data returned by ``wavfile.read``) and
    'sample_frequency' (the sampling rate returned by ``wavfile.read``).
    """
    # wavfile.read gives back the sampling rate first, then the samples
    rate_and_samples = wavfile.read(filename)
    return {
        'filename': filename,
        'audio': rate_and_samples[1],
        'sample_frequency': rate_and_samples[0],
    }

def not_silent(data_dict, threshold=100):
    """Return True when the recording's mean absolute amplitude exceeds ``threshold``.

    Parameters
    ----------
    data_dict : dict
        Record whose 'audio' entry holds the sample data (array-like).
    threshold : float, optional
        Mean absolute amplitude above which the audio counts as not silent.
        Defaults to 100.

    Returns
    -------
    bool
        False for empty audio or audio at or below the threshold.
    """
    # Widen to float before abs(): on int16 samples abs(-32768) wraps back to
    # -32768, which would drag the mean down for full-scale negative samples.
    amplitudes = np.abs(np.asarray(data_dict['audio'], dtype=np.float64))

    # An empty recording is treated as silent; this also avoids the NaN and
    # RuntimeWarning that the mean of an empty array produces.
    if amplitudes.size == 0:
        return False

    return bool(amplitudes.mean() > threshold)

In [None]:
# Convert the list of .wav filenames (wavfiles) into a Dask bag
filename_bag = db.from_sequence(wavfiles)

# Map the load_wav() function over the bag, so each filename becomes the
# dictionary that load_wav() returns for it
loaded_audio_bag = filename_bag.map(load_wav)

Alternative version

In [None]:
# Build one delayed load_wav() call per file.
# The loop variable must not be named `wavfile`: that would rebind the
# scipy.io.wavfile module name that load_wav() looks up when the delayed
# calls are eventually executed, and wavfile.read() would then fail.
delayed_loaded_audio = [
    dask.delayed(load_wav)(wav_path) for wav_path in wavfiles
]

In [None]:
# Assemble a Dask bag from the list of delayed load_wav() results
loaded_audio_bag = db.from_delayed(delayed_loaded_audio)

# Keep only the recordings that not_silent() accepts
filtered_audio_bag = loaded_audio_bag.filter(not_silent)

# Run the peak_frequency function on every remaining recording
audio_and_freq_bag = filtered_audio_bag.map(peak_frequency)

# Drop the 'audio' entry from each record with delete_dictionary_entry
final_bag = audio_and_freq_bag.map(
    delete_dictionary_entry,
    key_to_drop='audio',
)

# Convert to a DataFrame, run the computation, and print the result
lazy_df = final_bag.to_dataframe()
df = lazy_df.compute()
print(df)

Nice work! An important part of this calculation was removing the audio data after we had performed our calculations on it. The audio data is much larger than the information we actually want to extract from it, so it is important to drop it before we run the compute method. You might notice that the notes and frequencies here are the 6 standard guitar strings.

## Dask ML and final pieces

### Using processes and threads

In [None]:
# Import Client and LocalCluster
from dask.distributed import Client, LocalCluster

# Thread-based local cluster (processes=False): 4 workers, 1 thread each
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=False)

# Connect a client to that cluster
client = Client(cluster)

In [None]:
from dask.distributed import Client

# Pass the cluster settings straight to Client instead of building a
# LocalCluster object first
client = Client(n_workers=4, threads_per_worker=1, processes=False)

### Training ML models on big datasets

The input variables are available as dask_X and contain a few numeric columns, such as the song's tempo and danceability. The target values are available as dask_y and are the popularity score of each song.

In [None]:
# Import the SGDRegressor and the Incremental wrapper
from sklearn.linear_model import SGDRegressor
from dask_ml.wrappers import Incremental

# Plain scikit-learn SGD regressor, left at its default settings
model = SGDRegressor()

# Wrap the regressor in Incremental so it can be used with Dask,
# scoring with negative mean squared error
dask_model = Incremental(
    model,
    scoring="neg_mean_squared_error",
)

# Fit the wrapped model on the Dask inputs and targets
dask_model.fit(dask_X, dask_y)

Great work! Whenever you run the .fit() method, Dask optimizes the computation by copying the model to the process or thread where the data is, rather than copying the data into the main process which holds the model. It can take a long time to copy information, and the model is much smaller than the dataset, so this is much more efficient.

In [None]:
# Make 5 passes over the training data, calling partial_fit on each pass
for epoch in range(5):
    dask_model.partial_fit(dask_X, dask_y)

# Set up the predictions of the fitted model on the inputs
y_pred_delayed = dask_model.predict(dask_X)

# Run the computation to obtain the predicted values
y_pred_computed = y_pred_delayed.compute()

print(y_pred_computed)

Fantastic! That is a well-fit model. If you were just to use the .fit() method 5 times, your code would run, but you wouldn't get more accurate predictions on each loop repetition. When .fit() is run, the model is reset back to an unfitted state and refit to the data, so you start from scratch each time. Using the .partial_fit() method allows us to pick up fitting from where we left off and refine the previous loop's fitting.

### Preprocessing big datasets

#### Lazily transforming training data

In [None]:
# Import the StandardScaler class
from dask_ml.preprocessing import StandardScaler

# Select the input columns
feature_columns = [
    'duration_ms',
    'explicit',
    'danceability',
    'acousticness',
    'instrumentalness',
    'tempo',
]
X = dask_df[feature_columns]

# Select the target variable (list indexing keeps 'popularity' as a
# one-column selection rather than a single column)
y = dask_df[['popularity']]

# Create the scaler and fit it on the input columns
scaler = StandardScaler()
scaler.fit(X)

# Replace X with its scaled version and display it
X = scaler.transform(X)
print(X)

Well done! You may have noticed that X is still a Dask DataFrame even after being transformed. However, you have already had to load all the data in X once so that you could fit the scaler.

#### Lazy train-test split 

In [None]:
# Import the train_test_split function
from dask_ml.model_selection import train_test_split

# Rescale the target values by dividing them by 100
y = y / 100

# Shuffle the rows and hold out 20% of them as the test set
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    shuffle=True,
)

print(X_train)