In [1]:
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

print(os.environ.get('PYSPARK_DRIVER_PYTHON'))
print(os.environ.get('PYSPARK_PYTHON'))

/usr/bin/python3
/usr/bin/python3


In [2]:
import pandas as pd
import numpy as np
import pyspark as spark
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("Cover")\
    .getOrCreate()

In [23]:
from string import punctuation
from itertools import chain
from collections import Counter
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import udf, col, explode, monotonically_increasing_id, when, split, lower

class Cover:
    def __init__(self, window_size=5, min_occurrence_count=1):
        self.window_size = window_size
        self.min_occurrence_count = min_occurrence_count
        self.transformed_data = []
        self.corpus = None
    
    def import_data(self, filename):
        self.corpus = spark.read.format("csv").option("header", "True").option("mode", "DROPMALFORMED").load(filename)
        print("Corpus has {} documents".format(self.corpus.count()))
        
    def fit_transform(self, column_name):
        if self.corpus is None:
            print()
        else:
            tokenise = udf(lambda x: x.lower().translate(str.maketrans('','',punctuation)).split(' ') if x else [], ArrayType(StringType()))
            tokenised_dataframe = self.corpus.withColumn('tokens', tokenise(column_name).alias('tokens'))

            words_dataframe = tokenised_dataframe.withColumn('word', explode(col('tokens')))\
                                 .groupBy('word')\
                                 .count()\
                                 .sort('count', ascending=True)
            
            #Need to find a way to automatically assign ID's from 1 - vocab size
            words_with_id_dataframe = words_dataframe.withColumn('id', monotonically_increasing_id() + 1)

            filtered_words_with_id_dataframe = words_with_id_dataframe.withColumn('id', when(words_with_id_dataframe['count'] <= self.min_occurrence_count, 0).otherwise(words_with_id_dataframe.id))
            
            token_to_id = filtered_words_with_id_dataframe.rdd.map(lambda r : (r.word,r.id)).collectAsMap()
            
            print("There are {} unique words".format(len(token_to_id)))
            
            get_id = udf(lambda x: [token_to_id[word] for word in x], ArrayType(StringType()))
            transformed_dataframe = tokenised_dataframe.withColumn('transform', get_id('tokens').alias('transform'))
            
            print("Transformed tokens to id!".format(len(token_to_id)))
            
            ngrams = udf(lambda x: ngramsa(x), ArrayType(StringType()))
            
            matrix = transformed_dataframe.withColumn("matrix", ngrams("transform").alias('matrix'))
            
            matrix.select('matrix').show(50)
                
    def build_cooccur_matrix(self):
        ij_list = []
        cooccur_matrix = np.fromiter(())

    def get_ngramso(indexes):
        ngrams = defaultdict(lambda: 0)
        for i, left_index in enumerate(indexes):
            window = indexes[i + 1:i + 3 + 1]
            for distance, right_index in enumerate(window):
                ngrams[frozenset((left_index, right_index))] += (distance + 1)
        return ngrams

    def ngramsa(indexes):
        ngrams = []
        for i, left_index in enumerate(indexes):
            window = indexes[i + 1:i + 3 + 1]
            for distance, right_index in enumerate(window):
                ngrams.append([left_index, right_index, distance + 1])
        return ngrams

In [18]:
from itertools import islice
from collections import defaultdict
sentence = "Hi my name is, hi my age is"
sentence_list = sentence.lower().split(' ')

word_to_index = list(set(sentence_list))
transform_sentence_list = [word_to_index.index(word) for word in sentence_list]

In [24]:
import time
filename = '/opt/training/data/raw/billboard_lyrics_1964-2015.csv'
column_name = 'lyrics'
cover = Cover(min_occurrence_count=5)

start_time = time.time()
cover.import_data(filename)
cover.fit_transform(column_name)
end_time = time.time()

print("Time taken is {}".format(end_time-start_time))

Corpus has 5100 documents
There are 42181 unique words
Transformed tokens to id!


Py4JJavaError: An error occurred while calling o693.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 52.0 failed 1 times, most recent failure: Lost task 0.0 in stage 52.0 (TID 3640, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 75, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 67, in <lambda>
    return lambda *a: g(f(*a))
  File "/usr/local/lib/python3.5/dist-packages/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-23-37f210f9c1d3>", line 44, in <lambda>
NameError: name 'ngramsa' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 75, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 67, in <lambda>
    return lambda *a: g(f(*a))
  File "/usr/local/lib/python3.5/dist-packages/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-23-37f210f9c1d3>", line 44, in <lambda>
NameError: name 'ngramsa' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
dic = {'a': 1}

dic['a'] += 1
dic['a'] += 2

print(dic['a'])

In [None]:
from nltk.corpus import gutenberg

cover = Cover()
texts = gutenberg.sents('shakespeare-macbeth.txt')
sentences = [" ".join(list_of_words) for list_of_words in texts]

start_time = time.time()
data = cover.fit_transform(sentences)
end_time = time.time()
print("Time taken is {}".format(end_time-start_time))
print(data[1000])