In [1]:
from transformers import TextNormalizer, identity

from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier


def fit_naive_bayes(path, saveto=None, cv=12):

    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', MultinomialNB())
    ])

    if saveto is None:
        saveto = "naive_bayes_{}.pkl".format(time.time())

    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "naive bayes training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))


def fit_logistic_regression(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', LogisticRegression())
    ])

    if saveto is None:
        saveto = "logistic_regression_{}.pkl".format(time.time())

    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "logistic regression training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))


def fit_multilayer_perceptron(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', MLPClassifier(hidden_layer_sizes=(10,10), early_stopping=True))
    ])

    if saveto is None:
        saveto = "multilayer_perceptron_{}.pkl".format(time.time())

    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "multilayer perceptron training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))

In [3]:
from reader import PickledCorpusReader

import joblib
from sklearn.model_selection import cross_val_score


@timeit
def train_model(path, model, saveto=None, cv=12):
    # Load the corpus data and labels for classification
    corpus = PickledCorpusReader(path)
    X = documents(corpus)
    y = labels(corpus)

    # Compute cross validation scores
    scores = cross_val_score(model, X, y, cv=cv)

    # Fit the model on entire dataset
    model.fit(X, y)

    # Write to disk if specified
    if saveto:
        joblib.dump(model, saveto)
    # Return scores as well as training time via decorator
    return scores

NameError: name 'timeit' is not defined

In [4]:
def documents(corpus):
    return [
        list(corpus.docs(fileids=fileid))
        for fileid in corpus.fileids()
    ]


def labels(corpus):
    return [
        corpus.categories(fileids=fileid)[0]
        for fileid in corpus.fileids()
    ]

In [5]:
import time
from functools import wraps

def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        return result, time.time() - start
    return wrapper

In [6]:
import logging

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(processName)-10s %(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

In [7]:
def run_parallel(path):
    tasks = [
        fit_naive_bayes, fit_logistic_regression, fit_multilayer_perceptron,
    ]

    logger.info("beginning parallel tasks")
    start = time.time()

    procs = []
    for task in tasks:
        proc = mp.Process(name=task.__name__, target=task, args=(path,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

    delta = time.time() - start
    logger.info("total parallel fit time: {:0.2f} seconds".format(delta))


if __name__ == '__main__':
    run_parallel("corpus/")

MainProcess 2020-08-14 05:09:00 beginning parallel tasks


NameError: name 'mp' is not defined

In [8]:
class ParallelPreprocessor(Preprocessor):

    def on_result(self, result):
        self.results.append(result)

    def transform(self, tasks=None):
        [...]

        # Reset the results
        self.results = []

        # Create a multiprocessing pool
        tasks = tasks or mp.cpu_count()
        pool  = mp.Pool(processes=tasks)

        # Enqueue tasks on the multiprocessing pool and join
        for fileid in self.fileids():
            pool.apply_async(
                self.process, (fileid,), callback=self.on_result
            )

        # Close the pool and join
        pool.close()
        pool.join()

        return self.results

NameError: name 'Preprocessor' is not defined

In [9]:
from pyspark import SparkConf, SparkContext

APP_NAME = "My Spark Application"


def main(sc):
    # Define RDDs and apply operations and actions to them.


if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)

IndentationError: expected an indented block (<ipython-input-9-f7af06e2aa9c>, line 10)

In [10]:
corpus = sc.wholeTextFiles("hobbies/*/*.txt")
print(corpus.take(1))

NameError: name 'sc' is not defined

In [11]:
import json

corpus = sc.wholeTextFiles("corpus/*.jsonl")
corpus = corpus.flatMap(
    lambda d: [
        json.loads(line)
        for line in d[1].split("\n")
        if line
    ]
)

NameError: name 'sc' is not defined

In [12]:
import os
from operator import itemgetter

def parse_label(path):
    # Returns the name of the directory containing the file
    return os.path.basename(os.path.dirname(path))


data = sc.wholeTextFiles("hobbies/*/*.txt")
labels = data.map(itemgetter(0)).map(parse_label)

NameError: name 'sc' is not defined

In [13]:
from operator import add

label_count = labels.map(lambda l: (l, 1)).reduceByKey(add)
for label, count in label_count.collect():
    print("{}: {}".format(label, count))

AttributeError: 'function' object has no attribute 'map'

In [14]:
# Load data from disk
corpus = sc.wholeTextFiles("hobbies/*/*.txt")

# Parse the label from the text path
corpus = corpus.map(lambda d: (parse_label(d[0]), d[1]))

# Create the dataframe with two columns
df = spark.createDataFrame(corpus, ["category", "text"])

NameError: name 'sc' is not defined

In [15]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

APP_NAME = "My Spark Text Analysis"


def main(sc, spark):
    # Define DataFrames and apply ML estimators and transformers


if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)

    # Build SparkSQL Session
    spark = SparkSession(sc)

    # Execute Main functionality
    main(sc, spark)

IndentationError: expected an indented block (<ipython-input-15-3c37979073f2>, line 11)

In [16]:
from pyspark.ml.feature import RegexTokenizer

# Create the RegexTokenizer
tokens = RegexTokenizer(
    inputCol="text", outputCol="tokens",
    pattern="\\w+", gaps=False, toLowercase=True)

# Transform the corpus
corpus = tokens.transform(corpus)

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

In [17]:
def make_vectorizer(stopwords=True, tfidf=True, n_features=5000):
    # Creates a vectorization pipeline that starts with tokenization
    stages = [
        Tokenizer(inputCol="text", outputCol="tokens"),
    ]

    # Append stopwords to the pipeline if requested
    if stopwords:
        stages.append(
            StopWordsRemover(
                caseSensitive=False, outputCol="filtered_tokens",
                inputCol=stages[-1].getOutputCol(),
            ),
        )

    # Create the Hashing term frequency vectorizer
    stages.append(
        HashingTF(
            numFeatures=n_features,
            inputCol=stages[-1].getOutputCol(),
            outputCol="frequency"
        )
    )

    # Append the IDF vectorizer if requested
    if tfidf:
        stages.append(
            IDF(inputCol=stages[-1].getOutputCol(), outputCol="tfidf")
        )

    # Return the completed pipeline
    return Pipeline(stages=stages)

In [18]:
from tabulate import tabulate
from pyspark.ml import Pipeline
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import Word2Vec, Tokenizer

# Create the vector/cluster pipeline
pipeline = Pipeline(stages=[
    Tokenizer(inputCol="text", outputCol="tokens"),
    Word2Vec(vectorSize=7, minCount=0, inputCol="tokens", outputCol="vecs"),
    BisectingKMeans(k=10, featuresCol="vecs", maxIter=10),
])

# Fit the model
model = pipeline.fit(corpus)
corpus = model.transform(corpus)

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

In [19]:
# Retrieve stages
bkm = model.stages[-1]
wvec = model.stages[-2]

# Evaluate clustering
cost = bkm.computeCost(corpus)
sizes = bkm.summary.clusterSizes

NameError: name 'model' is not defined

In [20]:
# Get the text representation of each cluster
table = [["Cluster", "Size", "Terms"]]
for ci, c in enumerate(bkm.clusterCenters()):
    ct = wvec.findSynonyms(c, 7)
    size = sizes[ci]
    terms = " ".join([row.word for row in ct.take(7)])
    table.append([ci, size, terms])

# Print the results
print(tabulate(table))
print("Sum of square distance to center: {:0.3f}".format(cost))

NameError: name 'bkm' is not defined

In [21]:
from pyspark.ml.feature import StringIndexer

# Create the vectorizer
vector = make_vectorizer().fit(corpus)

# Index the labels of the classification
labelIndex = StringIndexer(inputCol="label", outputCol="indexedLabel")
labelIndex = labelIndex.fit(corpus)

# Split the data into training and test sets
training, test = corpus.randomSplit([0.8, 0.2])

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

In [22]:
from pyspark.ml.classification import LogisticRegression

model = Pipeline(stages=[
    vector, labelIndex, clf
]).fit(training)

# Make predictions
predictions = model.transform(test)
predictions.select("prediction", "indexedLabel", "tfidf").show(5)

NameError: name 'vector' is not defined

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="f1"
)

score = evaluator.evaluate(predictions)
print("F1 Score: {:0.3f}".format(score))

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

In [24]:
# Vectorize the corpus on the cluster
vector = make_vectorizer().fit(corpus)
corpus = vector.transform(corpus)

# Get the sample from the dataset
sample = corpus.sample(False, 0.1).collect()
X = [row['tfidf'] for row in sample]
y = [row['label'] for row in sample]

# Train a Scikit-Learn Model
clf = AdaBoostClassifier()
clf.fit(X, y)

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

In [25]:
# Broadcast the Scikit-Learn Model to the cluster
clf = sc.broadcast(clf)

# Create accumulators for correct vs incorrect
correct = sc.accumulator(0)
incorrect = sc.accumulator(1)

NameError: name 'sc' is not defined

In [26]:
def make_accuracy_closure(model, correct, incorrect):
    # model should be a broadcast variable
    # correct and incorrect should be accumulators
    def inner(rows):
        X = []
        y = []

        for row in rows:
            X.append(row['tfidf'])
            y.append(row['label'])

        yp = model.value.predict(X)
        for yi, ypi in zip(y, yp):
            if yi == ypi:
                correct.add(1)
            else:
                incorrect.add(1)
    return inner

In [27]:
# Create the accuracy closure
accuracy = make_accuracy_closure(clf, incorrect, correct)

# Compute the number incorrect and correct
corpus.foreachPartition(accuracy)

accuracy = float(correct.value) / float(correct.value + incorrect.value)
print("Global accuracy of model was {}".format(accuracy))

NameError: name 'clf' is not defined