In [1]:

# Now, we will focus on feature extraction and engineering in order to ready the
# data for the clustering algorithm run. We instantiate the Spark Context and read
# the Twitter dataset into a Spark dataframe. We will then successively tokenize the tweet text data, 
# apply a hashing Term frequency algorithm to the tokens, 
# and  nally apply the Inverse Document Frequency algorithm and rescale the data. 
# The code is as follows:


import pandas as pd
from pyspark import SQLContext

# CSV to Pandas DF
csv_in = "/home/carl/spark/examples/carl_Spark/data/mllib/twtr15053001-5.csv"
pddf_in = pd.read_csv(csv_in, sep=',', encoding='utf-8')
sqlContext = SQLContext(sc)

# Pandas DF to Spark DF
spdf_02 = sqlContext.createDataFrame(pddf_in[['id', 'user_id', 'user_name']])

spdf_02.show()

+----------+----------------+--------------+
|        id|         user_id|     user_name|
+----------+----------------+--------------+
|3168398228|Who What Wear AU|Kendall Jenner|
+----------+----------------+--------------+



In [4]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="user_name", outputCol="tokens")
tokensData = tokenizer.transform(spdf_02)


In [5]:
# Applying Hashing TF to the tokens
hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures", numFeatures=2000)
featuresData = hashingTF.transform(tokensData)

featuresData.take(1)

[Row(id=3168398228, user_id=u'Who What Wear AU', user_name=u'Kendall Jenner', tokens=[u'kendall', u'jenner'], rawFeatures=SparseVector(2000, {203: 1.0, 1234: 1.0}))]

In [6]:
# Applying IDF(Inverse Document Frequency) to the raw features and rescale the data
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featuresData)
rescaledData = idfModel.transform(featuresData)

for features in rescaledData.select('features').take(3):
    print(features)

rescaledData.take(2)

Row(features=SparseVector(2000, {203: 0.0, 1234: 0.0}))


[Row(id=3168398228, user_id=u'Who What Wear AU', user_name=u'Kendall Jenner', tokens=[u'kendall', u'jenner'], rawFeatures=SparseVector(2000, {203: 1.0, 1234: 1.0}), features=SparseVector(2000, {203: 0.0, 1234: 0.0}))]

In [7]:
rs_pddf = rescaledData.toPandas()
rs_pddf.count()

id             1
user_id        1
user_name      1
tokens         1
rawFeatures    1
features       1
dtype: int64

In [8]:
feat_lst = rs_pddf.features.tolist()
feat_lst

[SparseVector(2000, {203: 0.0, 1234: 0.0})]

In [9]:
# We will use the K-Means algorithm against the Twitter dataset. 
# As an unlabeled and shuf ed bag of tweets, we want to see if the Apache Spark tweets are grouped in a single cluster. 
# From the previous steps, 
# the TF-IDF sparse vector of features is converted into an RDD that will be the input to the Spark MLlib program.
# We initialize the K-Means model with 5 clusters, 10 iterations of 10 runs:

from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt

in_Data = sc.parallelize(feat_lst)

in_Data.take(3)




[SparseVector(2000, {203: 0.0, 1234: 0.0})]

In [10]:
in_Data.count()
# parsedData = in_Data.map(lambda line: array([Vendor(line)]))

1

In [11]:
# Build the model
clusters = KMeans.train(in_Data, 2, maxIterations=10, runs=10, initializationMode="random")

# Evaluate clustering by computing within Set Sum of Squared Errors



Py4JJavaError: An error occurred while calling o131.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 80, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1115)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:545)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:534)
	at org.apache.spark.mllib.clustering.KMeans.initRandom(KMeans.scala:363)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:254)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:219)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:201)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainKMeansModel(PythonMLLibAPI.scala:367)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/carl/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 80, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([xx*2 for x in (point - center)]))

WSSSE = in_Data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

In [10]:
print(sc)

<pyspark.context.SparkContext object at 0x7f7eed939e90>
