In [1]:
# Yes we need both these imports
from pyspark.sql.types import *
from pyspark.sql.types import StructField, StructType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.udf import UserDefinedFunction
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Word2Vec, VectorAssembler
from pyspark.ml.clustering import KMeans

In [2]:
import re
import pandas as pd
try:
    from sparklingml.feature.python_pipelines import SpacyTokenizeTransformer
except Exception:
    # sparklingml is an optional dependency. Catch Exception rather than using
    # a bare ``except`` so that KeyboardInterrupt / SystemExit still propagate,
    # and bind the name to None so later cells can test for availability
    # instead of tripping over an undefined name.
    SpacyTokenizeTransformer = None
    print("Sparklingml not currently installed w/e")

Using backing jar /sparklingml/sparklingml/../target/scala-2.11/sparklingml-assembly-0.0.1-SNAPSHOT.jar


In [3]:
# Base URI prepended to the data paths read by this notebook. Note that it
# already ends with "/", so joining it with another "/" doubles the separator.
fs_prefix = "gs://boo-stuff/"

In [4]:
# Stop whatever session getOrCreate() hands back (an existing one, or a freshly
# created default). Presumably this is done so that the builder settings in the
# next cell apply to a brand-new session — TODO confirm.
SparkSession.builder.getOrCreate().stop()

In [5]:
# Build the Spark session used by the rest of the notebook: 20 executors with
# 16g each, dynamic allocation switched off, and the web UI enabled.
session = (SparkSession.builder
           .appName("whatDoesTheMailingListLookLike")
           .config("spark.executor.instances", "20")
           # NOTE(review): "0.25" reads like a fraction, but this setting is
           # documented as an amount of memory rather than a ratio — confirm
           # the intended value and unit.
           .config("spark.driver.memoryOverhead", "0.25")
           .config("spark.executor.memory", "16g")
           .config("spark.dynamicAllocation.enabled", "false")
           .config("spark.ui.enabled", "true")
          ).getOrCreate()
# Shortcut to the underlying SparkContext.
sc = session.sparkContext

In [6]:
# TODO: pandas UDF accelerate (but multiple pieces of information returned at the same time)
def lookup_sentiment(document):
    """Look up the VADER sentiment scores for a single document.

    Args:
        document: Text to score, or None (for example a null column value).

    Returns:
        Whatever ``SentimentIntensityAnalyzer.polarity_scores`` returns for
        ``document``, or None when ``document`` is None.
    """
    # Null rows have nothing to score; return before loading NLTK at all.
    if document is None:
        return None

    from nltk.sentiment.vader import SentimentIntensityAnalyzer

    # Hack to download if needed
    # TODO(holden): Consider broadcast variable?
    try:
        sid = SentimentIntensityAnalyzer()
    except LookupError:
        # The lexicon is not available locally yet: fetch it and retry once.
        import nltk
        nltk.download('vader_lexicon')
        sid = SentimentIntensityAnalyzer()

    # The analyzer built above is used directly; constructing it a second
    # time on every call only repeated the same work.
    return sid.polarity_scores(document)

In [7]:
# Struct returned by the sentiment UDF: one double field per score name.
sentiment_schema = StructType(
    [StructField(score_name, DoubleType())
     for score_name in ("neg", "neu", "pos", "compound")])

# Wrap lookup_sentiment as a UDF named "lookup_sentiment_2" that returns the
# struct defined above.
lookup_sentiment_udf = UserDefinedFunction(
    lookup_sentiment, sentiment_schema, "lookup_sentiment_2")

In [8]:
# Load the pre-processed mailing-list data. fs_prefix already ends in "/", so
# strip any trailing slash before joining to avoid a "//" in the resulting URI.
mailing_list_posts_mbox_df = session.read.parquet(
    "{0}/processed_mbox_data_9".format(fs_prefix.rstrip("/")))# previous 4

In [9]:
# Keep only rows whose project_name is "spark", spread them over 60
# partitions, and mark the result for caching.
spark_mailing_list_data = (
    mailing_list_posts_mbox_df
    .filter(mailing_list_posts_mbox_df["project_name"] == "spark")
    .repartition(60)
    .cache()
)

In [10]:
# Print a tabular preview of the filtered mailing-list rows.
spark_mailing_list_data.show()

+------------+--------+-------+------------+---------------+--------+--------------------+--------------------+----------------+--------------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+
|project_name|box_type|mbox_id|backend_name|backend_version|category|                data|              origin|perceval_version|                 tag|          timestamp|   updated_on|                uuid|                from|from_processed_email|                body|           post_date|          message_id|         in_reply_to|content_language|
+------------+--------+-------+------------+---------------+--------+--------------------+--------------------+----------------+--------------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------

In [11]:
def extract_date_from_email_datefield(datefield):
    """Return the leading date/time portion of an email ``Date`` header.

    The longest prefix made only of whitespace, word characters, ``,`` and
    ``:`` is returned verbatim, so ``'Fri, 6 Nov 2015 19:09:30 +0000'``
    becomes ``'Fri, 6 Nov 2015 19:09:30 '`` (note the trailing space).

    Args:
        datefield: Raw header value; may be None.

    Returns:
        The matched prefix as a string, or None when the input is None, is
        not a string, or does not start with a matching character.
    """
    import re

    # Non-string input (including None) cannot be matched. The previous bare
    # ``except`` turned these into None too, so that result is kept.
    if not isinstance(datefield, str):
        return None
    # Raw string so ``\s`` / ``\w`` are regex escapes rather than invalid
    # Python string escapes.
    regex_str = r"^([\s\w,:]+)([+-]|).*$"
    match = re.search(regex_str, datefield)
    if match is None:
        return None
    # WIP: the extracted prefix is still returned as text; turning it into a
    # datetime (e.g. with datetime.strptime) has not been done yet.
    return match.group(1)


# Wrap the header parser as a string-returning UDF for DataFrame expressions.
extract_date_from_email_datefield_udf = UserDefinedFunction(
    extract_date_from_email_datefield,
    returnType=StringType(),
    name="extract_date_from_email_datefield")

# Register the same UDF under its name with the JVM session's UDF registry.
session.catalog._jsparkSession.udf().registerPython(
    "extract_date_from_email_datefield", extract_date_from_email_datefield_udf._judf)

In [12]:
# For five rows, show side by side: the raw post_date header, the prefix
# extracted by the UDF, and Spark's built-in to_date() applied to the raw header.
# NOTE(review): in the recorded output to_date() is None for every row shown,
# presumably because the header is not in a format to_date() parses by
# default — confirm.
spark_mailing_list_data.select(spark_mailing_list_data.post_date,
                               extract_date_from_email_datefield_udf(spark_mailing_list_data.post_date),
                               to_date(spark_mailing_list_data.post_date)).take(5)

[Row(post_date='Wed, 12 Feb 2014 04:42:57 +0000 (UTC)', extract_date_from_email_datefield(post_date)='Wed, 12 Feb 2014 04:42:57 ', to_date(`post_date`)=None),
 Row(post_date='Mon, 17 Feb 2014 11:36:16 +0000 (UTC)', extract_date_from_email_datefield(post_date)='Mon, 17 Feb 2014 11:36:16 ', to_date(`post_date`)=None),
 Row(post_date='Tue, 25 Feb 2014 02:17:33 +0000 (UTC)', extract_date_from_email_datefield(post_date)='Tue, 25 Feb 2014 02:17:33 ', to_date(`post_date`)=None),
 Row(post_date='Fri, 6 Nov 2015 19:09:30 +0000', extract_date_from_email_datefield(post_date)='Fri, 6 Nov 2015 19:09:30 ', to_date(`post_date`)=None),
 Row(post_date='Tue, 1 Jul 2014 15:21:24 +0530', extract_date_from_email_datefield(post_date)='Tue, 1 Jul 2014 15:21:24 ', to_date(`post_date`)=None)]

In [13]:
# Split the list into replies (in_reply_to is set) and thread starters
# (in_reply_to is null). Each side gets an alias so its columns can be
# addressed by qualified name after the self-join below.
mailing_list_posts_in_reply_to = spark_mailing_list_data.filter(spark_mailing_list_data.in_reply_to.isNotNull()).alias("mailing_list_posts_in_reply_to")
initial_posts = spark_mailing_list_data.filter(spark_mailing_list_data.in_reply_to.isNull()).alias("initial_posts")

# Left-outer join every thread starter to the replies whose in_reply_to equals
# its message_id. Despite the variable name, no filtering for "no replies"
# happens here: starters that do have replies are still part of this join.
posts_without_replies = initial_posts.join(
        mailing_list_posts_in_reply_to,
        [col("mailing_list_posts_in_reply_to.in_reply_to") == initial_posts.message_id],
        "left_outer")


In [14]:
# Count joined rows whose reply-side message_id is null, i.e. thread starters
# for which the left-outer join found no reply.
posts_without_replies.filter(col("mailing_list_posts_in_reply_to.message_id").isNull()).count()

8536

In [15]:
# Count posts per day, where the day is the `timestamp` column passed through
# from_unixtime() and truncated with date_trunc("dd", ...).
# NOTE(review): the recorded result puts all 101321 rows on a single day
# (2018-08-07), so `timestamp` looks like a fetch/ingest time rather than the
# time the mail was sent — confirm; `post_date` may be the column wanted here.
posts_by_date_count = spark_mailing_list_data.select(
    date_trunc("dd", from_unixtime(spark_mailing_list_data.timestamp)).alias("date")) \
    .groupBy("date").count()

In [16]:
# Convert the per-day counts to a pandas DataFrame for display.
posts_by_date_count.toPandas()

Unnamed: 0,date,count
0,2018-08-07,101321


In [39]:
# Spark's built-in tokenizer is always constructed and serves as the fallback.
builtin_tokenizer = Tokenizer(inputCol="body", outputCol="body_tokens")

# The sparklingml import in this notebook is best-effort, so look the class up
# defensively: the name may be undefined (or None) if that import failed.
_spacy_tokenizer_cls = globals().get("SpacyTokenizeTransformer")
spacy_tokenizer = (
    _spacy_tokenizer_cls(inputCol="body", outputCol="body_tokens")
    if _spacy_tokenizer_cls is not None else None)

# Prefer the spaCy tokenizer when it could be constructed, else fall back.
# NOTE(review): the pipeline fits recorded in this notebook fail on the
# executors with "Can't find model 'en'". Until that model is available on the
# workers, `tokenizer = builtin_tokenizer` is the working choice.
tokenizer = spacy_tokenizer if spacy_tokenizer is not None else builtin_tokenizer

In [18]:
# TODO - UDF to extract & UDF to detect programming language & UDF to extract files in a stack trace


In [19]:
def extract_links(body):
    """Return every http/https URL found in ``body``.

    A URL starts at ``http://`` or ``https://`` and runs up to, but not
    including, the next whitespace character or the end of the line.

    Args:
        body: Message text, or None.

    Returns:
        List of URL strings in order of appearance; empty when ``body`` is
        None or contains no links.
    """
    import re
    # A null body has no links; return an empty list rather than raising.
    if body is None:
        return []
    link_regex_str = r'(http(|s)://(.*?))([\s\n]|$)'
    return [found.group(1)
            for found in re.finditer(link_regex_str, body, re.MULTILINE)]

def extract_domains(links):
    """Map each link to its domain (network location without a leading ``www.``).

    Args:
        links: Iterable of URL strings, or None.

    Returns:
        List with one entry per input link: the domain string, or None for a
        link that is not a string or cannot be parsed. Empty when ``links``
        is None.
    """
    import re
    from urllib.parse import urlparse

    # We want to drop www and any extra trailing whitespace left on the netloc.
    domain_regex_str = r'^(www\.|)(.*?)\s*$'

    def extract_domain(link):
        """Return the domain of one link, or None when it cannot be extracted."""
        if not isinstance(link, str):
            return None
        try:
            nloc = urlparse(link).netloc
        except ValueError:
            # urlparse rejects some malformed URLs (e.g. unbalanced brackets).
            return None
        match = re.search(domain_regex_str, nloc)
        return match.group(2) if match is not None else None

    # A null link list yields no domains rather than raising.
    if links is None:
        return []
    return [extract_domain(link) for link in links]

def contains_python_stack_trace(body):
    """Report whether ``body`` contains a Python traceback header.

    Args:
        body: Message text, or None.

    Returns:
        True when the literal text ``Traceback (most recent call last)``
        occurs in ``body``; False otherwise, including when ``body`` is None.
    """
    # A null body cannot contain a traceback; avoid the TypeError from ``in``.
    if body is None:
        return False
    return "Traceback (most recent call last)" in body



def contains_probably_java_stack_trace(body):
    """Heuristically report whether ``body`` contains a Java stack trace.

    The pattern looks for a line that mentions ``Exception`` and ends with
    ``:``, followed (after at most three intervening lines) by one or more
    ``at ...(...)`` frames.

    Args:
        body: Message text, or None.

    Returns:
        True when the pattern matches somewhere in ``body``; False otherwise,
        including when ``body`` is None.
    """
    # Look for something based on regex
    # Tried https://stackoverflow.com/questions/20609134/regular-expression-optional-multiline-java-stacktrace - more msg looking
    # Tried https://stackoverflow.com/questions/3814327/regular-expression-to-parse-a-log-file-and-find-stacktraces
    # Yes the compile is per call, but it's cached so w/e
    import re
    # A null body cannot contain a trace; avoid the TypeError from re.search.
    if body is None:
        return False
    # NOTE(review): as written, the exception line must end with ":" right
    # before the newline, so headers without a trailing colon are not matched
    # — confirm that this is intended.
    stack_regex_str = r'^\s*(.+Exception.*):\n(.*\n){0,3}?(\s+at\s+.*\(.*\))+'
    return re.search(stack_regex_str, body, re.MULTILINE) is not None


def contains_exception_in_task(body):
    """Report whether ``body`` contains an executor task-failure log line.

    Args:
        body: Message text, or None.

    Returns:
        True when the literal text ``ERROR Executor: Exception in task``
        occurs in ``body``; False otherwise, including when ``body`` is None.
    """
    # A null body cannot contain the marker; avoid the TypeError from ``in``.
    if body is None:
        return False
    # Look for a line along the lines of ERROR Executor: Exception in task
    return "ERROR Executor: Exception in task" in body
    

In [20]:
def _make_registered_udf(func, return_type):
    """Wrap ``func`` as a UDF named after it and register that UDF by name.

    Args:
        func: Plain Python function to wrap; its ``__name__`` is the UDF name.
        return_type: Spark SQL data type returned by ``func``.

    Returns:
        The ``UserDefinedFunction`` wrapper, for use in DataFrame expressions.
    """
    udf_name = func.__name__
    wrapped = UserDefinedFunction(func, return_type, udf_name)
    # Hand the wrapper's ``_judf`` to the JVM session's UDF registry under the
    # same name.
    session.catalog._jsparkSession.udf().registerPython(udf_name, wrapped._judf)
    return wrapped


# Link helpers return arrays of strings.
extract_links_udf = _make_registered_udf(extract_links, ArrayType(StringType()))
extract_domains_udf = _make_registered_udf(extract_domains, ArrayType(StringType()))

# Stack-trace detectors return booleans.
contains_python_stack_trace_udf = _make_registered_udf(
    contains_python_stack_trace, BooleanType())
contains_probably_java_stack_trace_udf = _make_registered_udf(
    contains_probably_java_stack_trace, BooleanType())
contains_exception_in_task_udf = _make_registered_udf(
    contains_exception_in_task, BooleanType())

We could make this a transformer stage, but I'm lazy so we'll just use a UDF directly.

In [21]:
# Add the derived columns: the links found in each body, plus three 0.0/1.0
# indicator columns produced by the stack-trace detectors. Each boolean is cast
# to double first and named afterwards, so the output column name is set
# explicitly on the final expression instead of relying on an alias nested
# inside the cast being carried through.
annotated_spark_mailing_list_data = spark_mailing_list_data.select(
    "*",
    extract_links_udf(spark_mailing_list_data.body).alias("links_in_email"),
    contains_python_stack_trace_udf(spark_mailing_list_data.body).cast("double").alias("contains_python_stack_trace"),
    contains_probably_java_stack_trace_udf(spark_mailing_list_data.body).cast("double").alias("contains_java_stack_trace"),
    contains_exception_in_task_udf(spark_mailing_list_data.body).cast("double").alias("contains_exception_in_task"))

In [22]:
# Mark the annotated DataFrame for caching; the cell displays the returned
# DataFrame's schema.
annotated_spark_mailing_list_data.cache()

DataFrame[project_name: string, box_type: string, mbox_id: string, backend_name: string, backend_version: string, category: string, data: map<string,string>, origin: string, perceval_version: string, tag: string, timestamp: double, updated_on: double, uuid: string, from: string, from_processed_email: string, body: string, post_date: string, message_id: string, in_reply_to: string, content_language: string, links_in_email: array<string>, contains_python_stack_trace: double, contains_java_stack_trace: double, contains_exception_in_task: double]

In [23]:
# Print a tabular preview that now includes the derived columns.
annotated_spark_mailing_list_data.show()

+------------+--------+-------+------------+---------------+--------+--------------------+--------------------+----------------+--------------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+---------------------------+-------------------------+--------------------------+
|project_name|box_type|mbox_id|backend_name|backend_version|category|                data|              origin|perceval_version|                 tag|          timestamp|   updated_on|                uuid|                from|from_processed_email|                body|           post_date|          message_id|         in_reply_to|content_language|      links_in_email|contains_python_stack_trace|contains_java_stack_trace|contains_exception_in_task|
+------------+--------+-------+------------+---------------+--------+--------------------+----------

In [24]:
# Two more derived columns:
#   domain_links    - the domains of the links found in each email
#   is_thread_start - 1.0 when in_reply_to is null, otherwise 0.0
further_annotated = (
    annotated_spark_mailing_list_data
    .withColumn(
        "domain_links",
        extract_domains_udf(annotated_spark_mailing_list_data["links_in_email"]))
    .withColumn(
        "is_thread_start",
        isnull(annotated_spark_mailing_list_data["in_reply_to"]).cast(DoubleType()))
)
# Long story, allow mixed UDF types
further_annotated.cache()
further_annotated.count()

101321

In [25]:
# Body text features: hash the tokens into term-frequency vectors
# (numFeatures=10000), then re-weight them with IDF.
body_hashing = HashingTF(
    numFeatures=10000,
    inputCol="body_tokens",
    outputCol="raw_body_features",
)
body_idf = IDF(
    inputCol="raw_body_features",
    outputCol="body_features",
)

In [26]:
# Alternative body featurizer (Word2Vec with vectorSize=5). It writes to the
# same "body_features" column as body_idf and is not among the stages of the
# pipelines assembled later in this notebook.
body_word2Vec = Word2Vec(vectorSize=5, minCount=0, numPartitions=10, inputCol="body_tokens", outputCol="body_features")

In [27]:
# Link-domain features: hash the domain list into term-frequency vectors
# (numFeatures=100), then re-weight them with IDF.
domains_hashing = HashingTF(
    numFeatures=100,
    inputCol="domain_links",
    outputCol="raw_domain_features",
)
domains_idf = IDF(
    inputCol="raw_domain_features",
    outputCol="domain_features",
)

In [28]:
# Concatenate the text features, the indicator columns and the domain features
# into the single "features" vector column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        "body_features",
        "contains_python_stack_trace",
        "contains_java_stack_trace",
        "contains_exception_in_task",
        "is_thread_start",
        "domain_features",
    ],
)

In [29]:
# KMeans with k=2 over the "features" column; the seed is fixed at 42.
kmeans = KMeans(featuresCol="features", k=2, seed=42)

In [30]:
# Feature preparation: tokenize, TF-IDF the body, TF-IDF the link domains,
# then assemble everything into one vector.
dataprep_pipeline = Pipeline(stages=[
    tokenizer,
    body_hashing, body_idf,
    domains_hashing, domains_idf,
    assembler,
])

# Full pipeline: feature preparation followed by clustering.
pipeline = Pipeline(stages=[dataprep_pipeline, kmeans])

In [41]:
# NOTE(review): in the part of the notebook visible here, `test` is only
# assigned in a cell further down (In [31]), so on a top-to-bottom run this
# cell raises NameError. The run recorded below got past that but failed on the
# executors with "Can't find model 'en'".
dataprep_pipeline.fit(test)

Py4JJavaError: An error occurred while calling o429.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 4 times, most recent failure: Lost task 0.3 in stage 32.0 (TID 635, holden-magic-sw-56d6.c.boos-demo-projects-are-rad.internal, executor 11): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 82, in verify_result_length
    result = f(*a)
  File "/usr/lib/spark/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/sparklingml/sparklingml/transformation_functions.py", line 118, in inner
    nlp = SpacyMagic.get(lang)
  File "/sparklingml/sparklingml/transformation_functions.py", line 92, in get
    cls._spacys[lang] = spacy.load(lang)
  File "/opt/conda/lib/python3.6/site-packages/spacy/__init__.py", line 18, in load
    return util.load_model(name, **overrides)
  File "/opt/conda/lib/python3.6/site-packages/spacy/util.py", line 119, in load_model
    raise IOError(Errors.E050.format(name=name))
OSError: [E050] Can't find model 'en'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:171)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 82, in verify_result_length
    result = f(*a)
  File "/usr/lib/spark/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/sparklingml/sparklingml/transformation_functions.py", line 118, in inner
    nlp = SpacyMagic.get(lang)
  File "/sparklingml/sparklingml/transformation_functions.py", line 92, in get
    cls._spacys[lang] = spacy.load(lang)
  File "/opt/conda/lib/python3.6/site-packages/spacy/__init__.py", line 18, in load
    return util.load_model(name, **overrides)
  File "/opt/conda/lib/python3.6/site-packages/spacy/util.py", line 119, in load_model
    raise IOError(Errors.E050.format(name=name))
OSError: [E050] Can't find model 'en'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:171)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
# Debugging aid: check whether the selected tokenizer is a pyspark.ml
# Transformer instance.
from pyspark.ml.pipeline import Transformer
isinstance(tokenizer, Transformer)

In [31]:
# Ten-row cached sample of the annotated data for quick pipeline experiments.
test = further_annotated.limit(10).cache()
# Row count of the sample (10 in the recorded run).
test.count()

10

In [37]:
# Fit the full pipeline (feature preparation + KMeans) on the ten-row sample.
# The run recorded below failed on the executors with "Can't find model 'en'".
test_model = pipeline.fit(test)

Py4JJavaError: An error occurred while calling o429.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 (TID 627, holden-magic-sw-56d6.c.boos-demo-projects-are-rad.internal, executor 11): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 82, in verify_result_length
    result = f(*a)
  File "/usr/lib/spark/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/sparklingml/sparklingml/transformation_functions.py", line 118, in inner
    nlp = SpacyMagic.get(lang)
  File "/sparklingml/sparklingml/transformation_functions.py", line 92, in get
    cls._spacys[lang] = spacy.load(lang)
  File "/opt/conda/lib/python3.6/site-packages/spacy/__init__.py", line 18, in load
    return util.load_model(name, **overrides)
  File "/opt/conda/lib/python3.6/site-packages/spacy/util.py", line 119, in load_model
    raise IOError(Errors.E050.format(name=name))
OSError: [E050] Can't find model 'en'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:171)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 82, in verify_result_length
    result = f(*a)
  File "/usr/lib/spark/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/sparklingml/sparklingml/transformation_functions.py", line 118, in inner
    nlp = SpacyMagic.get(lang)
  File "/sparklingml/sparklingml/transformation_functions.py", line 92, in get
    cls._spacys[lang] = spacy.load(lang)
  File "/opt/conda/lib/python3.6/site-packages/spacy/__init__.py", line 18, in load
    return util.load_model(name, **overrides)
  File "/opt/conda/lib/python3.6/site-packages/spacy/util.py", line 119, in load_model
    raise IOError(Errors.E050.format(name=name))
OSError: [E050] Can't find model 'en'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:171)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [38]:
# Run `test_model` over `test` and pull the transformed rows into pandas.
# NOTE(review): `test_model` is presumably assigned by an earlier fit cell; the
# traceback above shows a fit call failing, which would explain the NameError
# reported for this cell — confirm.
test_result = test_model.transform(test)
test_result.toPandas()

NameError: name 'test_model' is not defined

In [None]:
# Fit the data-prep pipeline on the annotated frame, then apply the fitted
# pipeline to that same frame.
data_prep_model = dataprep_pipeline.fit(further_annotated)
preped_data = data_prep_model.transform(further_annotated)
# count() is an action, so this both runs the transformation and displays the
# number of prepared rows.
preped_data.count()

In [None]:
# NOTE(review): this rebinds `kmeans` to whatever fit() returns, and the next
# cell relies on that. Presumably `kmeans` was the estimator before this cell,
# in which case re-running the cell would call fit() on the fitted model — confirm.
kmeans = kmeans.fit(preped_data)

In [None]:
# Transform the prepared data with `kmeans` (rebound to the fit() result in
# the preceding cell).
result = kmeans.transform(preped_data)

In [None]:
# Rank rows inside each predicted cluster by their body text, then bring a
# small sample back to the driver: rank <= 10 per cluster, capped at 100 rows.
window = Window.partitionBy(result.prediction).orderBy(result.body)

(
    result
    .select('*', rank().over(window).alias('rank'))
    .filter(col('rank') <= 10)
    .limit(100)
    .toPandas()
)

In [None]:
# Count the rows whose prediction is anything other than 0.
result.filter(result.prediction != 0).count()

In [None]:
def process(k):
    """Fit KMeans with ``k`` clusters on ``preped_data`` and summarise the result.

    Args:
        k: Number of clusters to fit.

    Returns:
        A tuple ``(k, kmeans_model, result, non_zero, num_per_group)`` where
        ``kmeans_model`` is the fitted model, ``result`` is ``preped_data``
        transformed by that model, ``num_per_group`` is the collected list of
        ``(prediction, count)`` rows and ``non_zero`` is the number of rows
        whose prediction is not 0.
    """
    kmeans = KMeans(featuresCol="features", k=k, seed=42)
    kmeans_model = kmeans.fit(preped_data)
    result = kmeans_model.transform(preped_data)
    num_per_group = result.select(result.prediction).groupBy(result.prediction).count().collect()
    # Derive the non-zero total from the already-collected per-cluster counts
    # instead of launching a second full-scan count job over `result`.
    # Rows are indexed by position because `row.count` would resolve to the
    # tuple method rather than the "count" field.
    non_zero = sum(row[1] for row in num_per_group if row[0] != 0)
    return (k, kmeans_model, result, non_zero, num_per_group)
# Sweep over a range of cluster counts, keeping each run's summary tuple in
# the same order as the k values listed here (later cells index into it).
results = [
    process(k)
    for k in [2, 3, 4, 5, 8, 9, 10, 15, 20, 25, 50, 100, 150, 200, 250]
]

In [None]:
# Display the raw list of tuples returned by process(), one per k.
results

In [None]:
# Pair each k (element 0) with a dict built from that run's collected
# per-prediction counts (element 4).
num_predictions_by_k = [(run[0], dict(run[4])) for run in results]

In [None]:
# Display the per-k prediction count dictionaries.
num_predictions_by_k

In [None]:
# Pair k (element 0) with the non-zero row count (element 3), skipping the
# first eight runs so only the entries from index 8 onward are kept.
num_non_zero_by_k = [(run[0], run[3]) for run in results[8:]]

In [None]:
# Display the (k, non-zero count) pairs.
num_non_zero_by_k

In [None]:
# Element 2 of the run at index 13 of `results`: the transformed DataFrame for
# k=200 (the value at index 13 of the k list passed to process).
# Note this rebinds `result`, which earlier cells assigned from a different fit.
result = results[13][2]
result

In [None]:
# Element 4 of the run at index 13 (k=200): the collected per-prediction row counts.
group_sizes = results[13][4]
group_sizes

In [None]:
# Fitted model from the run at index 13 (k=200). process() returns
# (k, kmeans_model, result, non_zero, num_per_group), so the model lives at
# index 1; index 0 is only the integer k.
kmeans = results[13][1]
kmeans

In [None]:
import pandas as pd

In [None]:
# Widen pandas' column display so long text cells are shown up to 1000 characters.
pd.set_option('display.max_colwidth', 1000)

In [None]:
# Peek at ten rows of `data` alongside the assigned prediction.
result.select("data", "prediction").limit(10).toPandas()

In [None]:
# Sample up to ten rows where contains_python_stack_trace equals 1.
(
    result
    .filter(result.contains_python_stack_trace == 1)
    .select("data", "prediction")
    .limit(10)
    .toPandas()
)

In [None]:
# Sample up to ten rows where contains_java_stack_trace equals 1.
(
    result
    .filter(result.contains_java_stack_trace == 1)
    .select("data", "prediction")
    .limit(10)
    .toPandas()
)

In [None]:
def fetch_for_group(group, max_rows=100):
    """Fetch a pandas sample of the rows assigned to one cluster.

    Args:
        group: An object exposing a ``prediction`` attribute (for example an
            entry of ``group_sizes``) identifying the cluster to pull.
        max_rows: Upper bound on the number of rows brought back to the
            driver. Defaults to 100, the limit that used to be hard-coded.

    Returns:
        The output of ``toPandas()`` for at most ``max_rows`` rows of the
        global ``result`` whose ``prediction`` equals ``group.prediction``.
    """
    group_id = group.prediction
    return result.filter(result.prediction == group_id).limit(max_rows).toPandas()

# Fetch a sample for every group whose second field (the row count) exceeds 10,
# preserving the order of `group_sizes`.
all_the_results = [
    fetch_for_group(group)
    for group in group_sizes
    if group[1] > 10
]

In [None]:
# Raise the column display width further (5000 characters) before reading the
# fetched message bodies below.
pd.set_option('display.max_colwidth', 5000)

In [None]:
# `body` column of the first fetched group's sample.
all_the_results[0]["body"]

In [None]:
# `prediction` column of the first fetched group's sample.
all_the_results[0]["prediction"]

In [None]:
# `body` column of the second fetched group's sample.
all_the_results[1]["body"]

In [None]:
# Prediction value at index 0 of the second fetched group's sample.
all_the_results[1]["prediction"][0]

In [None]:
# Body text at index 6 of the third fetched group's sample.
all_the_results[2]["body"][6]

In [None]:
# Prediction value at index 2 of the third fetched group's sample.
all_the_results[2]["prediction"][2]

In [None]:
# Body text at index 5 of the fourth fetched group's sample.
all_the_results[3]["body"][5]

In [None]:
# Prediction value at index 3 of the fourth fetched group's sample.
all_the_results[3]["prediction"][3]

In [None]:
# Body text at index 5 of the fifth fetched group's sample.
all_the_results[4]["body"][5]

In [None]:
# Prediction value at index 0 of the fifth fetched group's sample.
all_the_results[4]["prediction"][0]

In [42]:
import spacy

In [43]:
# Download and link the "en" model into this kernel's Python environment (see
# the linking output below).
# NOTE(review): the E050 "Can't find model 'en'" error in the traceback above
# was raised inside a Spark Python worker, so the executors presumably need the
# model installed as well — confirm.
spacy.cli.download("en")


[93m    Linking successful[0m
    /opt/conda/lib/python3.6/site-packages/en_core_web_sm -->
    /opt/conda/lib/python3.6/site-packages/spacy/data/en

    You can now load the model via spacy.load('en')



In [44]:
# Load the freshly linked "en" model in the driver process.
sp_model = spacy.load("en")

In [45]:
# NOTE: this fails with the PicklingError shown below. Broadcasting pickles the
# value, and the loaded spaCy model contains a local function
# ('FeatureExtracter.<locals>.feature_extracter_fwd') that pickle cannot serialize.
sc.broadcast(sp_model)

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 83, in dump
    pickle.dump(value, f, 2)
AttributeError: Can't pickle local object 'FeatureExtracter.<locals>.feature_extracter_fwd'


PicklingError: Could not serialize broadcast: AttributeError: Can't pickle local object 'FeatureExtracter.<locals>.feature_extracter_fwd'