# 1. Set up spark context and SparkSession

In [61]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


# 2. Load dataset

In [62]:
raw_data = spark.read.csv('InputFile.txt', inferSchema=True, sep='\t',header=True).toDF('words')

In [63]:
raw_data.count()

100000

In [64]:
raw_data.distinct().count()

93405

# 3.  Clean and Manipulation

In [65]:
u_raw_data= raw_data.distinct()

In [66]:
type(u_raw_data)

pyspark.sql.dataframe.DataFrame

In [67]:
u_raw_data.show(4, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                                                                                                                    |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[ACTH Syndrome, Ectopic]  [Adrenal Gland Neoplasms]  [Adrenocorticotropic Hormone]  [Corticotropin-Releasing Hormone]  [Cushing Syndrome]  [Dexamethasone]  [Diagnosis, Differential]  [Humans]  [Hydrocortisone]                                  

In [68]:
## define udf function
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def keywords_parser(s):
    s = s.replace(',', '')
    s1 = s.strip('[|]')
    s2 = re.split('\]\s+\[', s1)
    return [''.join(y.split()) for y in s2]

In [69]:
%time keywords_parser_udf = udf(keywords_parser, ArrayType(StringType()))

CPU times: user 1.79 ms, sys: 523 µs, total: 2.31 ms
Wall time: 11.6 ms


In [70]:
df = u_raw_data.select(keywords_parser_udf(u_raw_data.words)).toDF('term')

# 4. Build corpus and document-term matrix

In [71]:
from pyspark.ml.feature import CountVectorizer
#count_vectorizer = CountVectorizer(vocabSize=pow(2,4),inputCol='term', outputCol='features')
count_vectorizer = CountVectorizer(vocabSize=166,inputCol='term',outputCol='features')
countVectorizer_mod = count_vectorizer.fit(df)
countVectorizer_twitter = countVectorizer_mod.transform(df)

### 4.1 build corpus

In [72]:
voca = countVectorizer_mod.vocabulary
len(voca)

166

In [73]:
with open("voca.txt", "w") as f:
    for s in voca:
        f.write(str(s) +"\n")

### 4.2 build document-term matrix

In [74]:
df = countVectorizer_twitter.select('features')

In [75]:
df.count()

93405

In [76]:
df.show(truncate=False)

+-----------------------------------------------------------+
|features                                                   |
+-----------------------------------------------------------+
|(166,[0,26],[1.0,1.0])                                     |
|(166,[0,2,13,86,95,160],[1.0,1.0,1.0,1.0,1.0,1.0])         |
|(166,[1,22,24,115],[1.0,1.0,1.0,1.0])                      |
|(166,[0,28],[1.0,1.0])                                     |
|(166,[],[])                                                |
|(166,[0,3,4,44,152],[1.0,1.0,1.0,1.0,1.0])                 |
|(166,[0,2,3,4,5,9],[1.0,1.0,1.0,1.0,1.0,1.0])              |
|(166,[0,2,3,20,39],[1.0,1.0,1.0,1.0,1.0])                  |
|(166,[1,43,115],[1.0,1.0,1.0])                             |
|(166,[0,28,141],[1.0,1.0,1.0])                             |
|(166,[0,101],[1.0,1.0])                                    |
|(166,[1,18,39],[1.0,1.0,1.0])                              |
|(166,[0,80,84],[1.0,1.0,1.0])                              |
|(166,[0

In [77]:
type(df)

pyspark.sql.dataframe.DataFrame

In [78]:
from pyspark.ml.linalg import Vectors, DenseMatrix

In [79]:
dA = df.rdd.map(lambda vector: Vectors.dense(vector[0].toArray()))

In [80]:
type(dA)

pyspark.rdd.PipelinedRDD

In [81]:
dA.collect()

[DenseVector([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
 DenseVector([1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 

In [53]:
from pyspark.mllib.linalg.distributed import *

def as_block_matrix(rdd, rowsPerBlock=93405, colsPerBlock=166):
    return IndexedRowMatrix(
        rdd.zipWithIndex().map(lambda xi: IndexedRow(xi[1], xi[0]))
    ).toBlockMatrix(rowsPerBlock, colsPerBlock)

In [54]:
matrixA = as_block_matrix(dA)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 51.0 failed 1 times, most recent failure: Lost task 0.0 in stage 51.0 (TID 2662, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark/python/pyspark/rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-53-f3925d8264fa>", line 5, in <lambda>
  File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/distributed.py", line 314, in __init__
    self.vector = _convert_to_vector(vector)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 80, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark/python/pyspark/rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-53-f3925d8264fa>", line 5, in <lambda>
  File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/distributed.py", line 314, in __init__
    self.vector = _convert_to_vector(vector)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 80, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


# 6. Build term-term matrix

In [31]:
%time A = mat.transpose().dot(mat)

CPU times: user 20.4 ms, sys: 32 µs, total: 20.5 ms
Wall time: 19.8 ms


In [41]:
B = A.toarray()

In [56]:
type(B)

numpy.ndarray

In [44]:
B.shape

(166, 166)

# 7. Extract the frequency large than 50

In [72]:
import numpy as np
import pandas as pd
uptri = np.triu(B, 1)

In [73]:
uptri[uptri>50]

array([  5275.,  17556.,  14052., ...,     76.,     87.,     71.])

In [82]:
tempM = np.where( uptri > 50 )

In [122]:
index = pd.DataFrame(list(tempM)).transpose()

In [124]:
value = pd.DataFrame(list(uptri[uptri>50]))

In [140]:
vocab = pd.DataFrame(voca)
vocab.head()

Unnamed: 0,0
0,Humans
1,Animals
2,Female
3,Male
4,Adult


In [136]:
indexFreq = pd.concat([index,value],axis=1)
indexFreq.columns = ['index1','index2','freq']

In [138]:
indexFreq.head()

Unnamed: 0,index1,index2,freq
0,0,1,5275
1,0,2,17556
2,0,3,14052
3,0,4,10401
4,0,5,8189


In [153]:
join1 = indexFreq.join(vocab, on='index2',how='left')
join1.columns = ['index1','index2','freq','term.x']

In [154]:
join2 = join1.join(vocab, on='index1',how='left')
join2.columns = ['index1','index2','freq','term.x','term.y']

In [155]:
join2.head()

Unnamed: 0,index1,index2,freq,term.x,term.y
0,0,1,5275,Animals,Humans
1,0,2,17556,Female,Humans
2,0,3,14052,Male,Humans
3,0,4,10401,Adult,Humans
4,0,5,8189,MiddleAged,Humans


In [165]:
output = join2[['term.x','term.y','freq']]

In [163]:
output.head()

Unnamed: 0,term.x,term.y,freq
0,Animals,Humans,5275
1,Female,Humans,17556
2,Male,Humans,14052
3,Adult,Humans,10401
4,MiddleAged,Humans,8189
