# Simple data analysis with Apache Spark


In [4]:
# Do an initial test of Spark to make sure it works.
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext('local[*]')
# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)
sc.stop()


Now that we have checked that PySpark is up and running let's start to do some processing using a csv file generated previously using fb_post.py. For instructions on using fb_scrapper please see the readme file.

In [5]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import regexp_replace, trim, col, lower 
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer

sc = pyspark.SparkContext('local[*]')
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('160558090672531_facebook_statuses.csv')
df.show()
# Drop the null crap 
df = df.na.drop(subset=["status_message"])
#Remove punctation from status messages
df2 = df.select(regexp_replace("status_message", "\p{Punct}", "").alias("status_message"))
df2.show()
messages = df2.select("status_message")
messages.show()
#Tokenize and remove stop words
tokenizer = Tokenizer(inputCol="status_message", outputCol="filtered")
filterw = tokenizer.transform(messages)
filterw.show()
remover = StopWordsRemover(inputCol="filtered", outputCol="filtered1")
filtered_final = remover.transform(filterw)
filtered_final.show()
messages = filtered_final.select("filtered1")
#Convert to RDD
message_rdd=messages.rdd


+--------------------+--------------------+----------+-----------+--------------------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|           status_id|      status_message| link_name|status_type|         status_link|   status_published|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|
+--------------------+--------------------+----------+-----------+--------------------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|160558090672531_1...|So, rain is in th...|      null|     status|                null|2016-12-22 13:34:30|            4|          15|         0|        4|        0|       0|        0|       0|         0|
|160558090672531_1...|                null|      null|     status|                null|2016-12-22 21:38:02|           15|           3|         0|       14|        0|       1|      

Now we are going to do a word count with the rdd that we just created. 


In [60]:
#WordCount with RDD. Normally this would be used in conjunction with NLP to extract trending topics.
from operator import add
statuses = message_rdd.flatMap(lambda x: x)
words = statuses.flatMap(lambda x: x)
#se = statuses.flatMap(String)
counts = words.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)               
output = counts.takeOrdered(9, key = lambda x: -x[1])
print(output)
output = counts.sortByKey().collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
#Possibly save to HDFS 
#classic.saveAsTextFile("keys.txt")



[('', 752), ('boat', 169), ('anyone', 144), ('one', 116), ('get', 93)]
: 752
00: 1
01754: 1
02oct16: 1
0415: 1
0903: 1
0906: 1
0911: 1
0921: 1
0924: 1
0925: 2
1: 6
10: 13
100: 8
1000: 4
100000: 1
1000am: 1
1012: 1
1030: 1
1080hd: 2
10am2pm: 1
10ft: 1
10hours: 1
10th: 1
11: 6
110: 1
1100: 3
111130: 1
1130: 3
115: 1
119: 1
11am: 1
11ish: 1
12: 6
120: 1
1200: 6
123: 1
1230: 1
12th: 1
12v: 2
13: 6
1300: 2
14: 7
140: 1
142: 1
14225: 4
146ft: 1
15: 9
150: 1
1500: 1
15000cfs: 1
1540: 2
16: 1
165: 2
166: 1
167: 2
1675200: 1
169: 2
16ft: 1
17: 3
17000: 1
1726: 1
175: 2
176: 1
17a: 1
18: 4
185: 1
18901: 1
19: 1
190: 1
190200lbs: 1
1926: 1
1968: 1
1978: 1
1980: 2
1983: 1
1988: 1
1990: 1
1995: 1
1996: 1
19days: 1
1ft: 1
1oz: 1
1st: 2
2: 17
20: 6
200: 1
2000gph: 1
2002: 1
2007: 1
2009: 1
200lbs: 3
201: 1
2011…thanks: 1
2012: 1
2015: 2
2016: 6
20160531: 1
2017: 4
2028: 1
206: 1
22: 1
2200: 1
220lbs: 1
222: 1
23: 3
23rd: 1
24: 1
244: 1
245: 1
248: 1
249: 1
25: 2
250: 2
25000: 1
25th: 1
26: 1
27: 3
27

In [6]:
# Filter to extract known names of rivers and get useful URLS 
df2 = df.na.drop(subset=["status_link"])
df3 = df2.select("status_link")
df2.show()

+--------------------+--------------------+--------------------+-----------+--------------------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|           status_id|      status_message|           link_name|status_type|         status_link|   status_published|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|
+--------------------+--------------------+--------------------+-----------+--------------------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|160558090672531_1...|              Zoomie|                null|      video|https://www.faceb...|2016-12-21 19:12:13|           55|          13|         4|       50|        0|       5|        0|       0|         0|
|160558090672531_1...|Someone don't see...|                null|      video|https://www.faceb...|2016-12-21 17:40:37|           34|         

Okay so now we want to get the images and or videos. Uses for the images might include image search for our database (with the river extracted using NLP) or computer

In [15]:
#Now let's try to use these urls to get their the respective images or videos 
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
df2.createOrReplaceTempView("posts")
sqlDF = spark.sql("SELECT * FROM posts WHERE status_link LIKE '%photo%'")
sqlDF.show()
statusRDD = sqlDF.select('status_link').rdd
urls = statusRDD.flatMap(lambda x: x)
print(urls.take(3))
#Now let's save the images possibly for a Computer Vision application





+--------------------+--------------------+--------------------+-----------+--------------------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|           status_id|      status_message|           link_name|status_type|         status_link|   status_published|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|
+--------------------+--------------------+--------------------+-----------+--------------------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|160558090672531_1...|Doopey, Mr. Magoo...|                null|      photo|https://www.faceb...|2016-12-21 18:35:29|            9|           0|         0|        9|        0|       0|        0|       0|         0|
|160558090672531_1...|we needed more sa...|                null|      photo|https://www.faceb...|2016-12-21 18:25:58|           11|         

In [8]:
#More Natural Language Processing using NLP. Lemmatization, Chunking, and Tagging. 
import nltk
from nltk.stem.wordnet import WordNetLemmatizer
messages = filtered_final.select("filtered")
message_rdd=messages.rdd
message_rdd = message_rdd.flatMap(lambda x:x)
print(message_rdd.first())
pos_statuses = message_rdd.map(nltk.pos_tag)
print(pos_statuses.take(5))




['so,', 'rain', 'is', 'in', 'the', 'forecast', 'for', 'saturday', 'and', 'saturday', 'night,', 'which', 'means', 'we', 'may', 'get', 'some', 'natural', 'flow', 'for', 'christmas.', 'would', 'anyone', 'be', 'down', 'to', 'paddle', 'if', 'that', 'happens?', 'like', 'little', 'or', 'tellico', 'or', 'something?', 'rich,', 'holly,', 'alex,', 'aaaaaaanyone?']
[[('so,', 'NN'), ('rain', 'NN'), ('is', 'VBZ'), ('in', 'IN'), ('the', 'DT'), ('forecast', 'NN'), ('for', 'IN'), ('saturday', 'JJ'), ('and', 'CC'), ('saturday', 'JJ'), ('night,', 'NN'), ('which', 'WDT'), ('means', 'VBZ'), ('we', 'PRP'), ('may', 'MD'), ('get', 'VB'), ('some', 'DT'), ('natural', 'JJ'), ('flow', 'NN'), ('for', 'IN'), ('christmas.', 'NN'), ('would', 'MD'), ('anyone', 'NN'), ('be', 'VB'), ('down', 'VBN'), ('to', 'TO'), ('paddle', 'VB'), ('if', 'IN'), ('that', 'DT'), ('happens?', 'VBZ'), ('like', 'JJ'), ('little', 'JJ'), ('or', 'CC'), ('tellico', 'NN'), ('or', 'CC'), ('something?', 'NN'), ('rich,', 'NN'), ('holly,', 'NN'), ('a

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 20, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\pyspark\rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-8-51a3a2c0bf6e>", line 12, in <lambda>
  File "C:\Users\img\Anaconda3\lib\site-packages\nltk\stem\wordnet.py", line 40, in lemmatize
    lemmas = wordnet._morphy(word, pos)
  File "C:\Users\img\Anaconda3\lib\site-packages\nltk\corpus\reader\wordnet.py", line 1708, in _morphy
    if form in exceptions:
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\img\Documents\spark-2.0.2-bin-hadoop2.7\python\pyspark\rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-8-51a3a2c0bf6e>", line 12, in <lambda>
  File "C:\Users\img\Anaconda3\lib\site-packages\nltk\stem\wordnet.py", line 40, in lemmatize
    lemmas = wordnet._morphy(word, pos)
  File "C:\Users\img\Anaconda3\lib\site-packages\nltk\corpus\reader\wordnet.py", line 1708, in _morphy
    if form in exceptions:
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
