###Libraries

In [1]:
%matplotlib inline
import numpy as np
import scipy as sp
import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import pandas as pd
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
pd.set_option('display.notebook_repr_html', True)
import seaborn as sns
sns.set_style("whitegrid")
sns.set_context("poster")

### METHOD 1: Using Pandas  Dataframe

In [2]:
#'''
# METHOD 1: convert csv file to pandas dataframe
colnames=['bounding_box','city','country','text','tid','timestamp','uid']
pandas_df = pd.read_csv('./tweets.csv', delimiter=',', skiprows=0, names=colnames, usecols=[1,2,3,4,5,6,7])
pandas_df.head(5)
pandas_df.country.head()
#'''

0    United States
1    United States
2           Brasil
3    United States
4           México
Name: country, dtype: object

#### Add state info

In [3]:
from geopy.geocoders import Nominatim
geolocator = Nominatim()
import ast

In [4]:
def find_tweet_address_v1(gps_polygon_text):
    gps_polygon_dict = ast.literal_eval(gps_polygon_text)
    location_dict = None
    longitude =  gps_polygon_dict['coordinates'][0][0][0]
    latitude =  gps_polygon_dict['coordinates'][0][0][1]
    tweetlocation = geolocator.reverse((latitude, longitude))
    tweetaddress = (tweetlocation.address)
    tweetaddress_fields = tweetaddress.split(',')
    country = tweetaddress_fields[-1].strip()
    try:
        street = tweetaddress_fields[0].strip()
        city = tweetaddress_fields[1].strip()
        county = tweetaddress_fields[2].strip()
        state = tweetaddress_fields[3].strip()
        zipcode = tweetaddress_fields[4].strip()
    except:
        street = tweetaddress_fields[0].strip()
        city = tweetaddress_fields[1].strip()
        county = ''
        state = ''
        zipcode = ''      
    location_dict = dict(street=street, city=city, county=county, state=state,zipcode=zipcode, country=country)
    return location_dict['state']
       
#example_input = "{u'type': u'Polygon', u'coordinates': [[[-122.436232, 47.495315], [-122.436232, 47.734561], [-122.224973, 47.734561], [-122.224973, 47.495315]]]}"
#locinfo = find_tweet_address(example_input)
#print locinfo

In [5]:
def find_tweet_address_v2(gps_polygon_text):
    #location_dict = None
    gps_polygon_fields = gps_polygon_text.split(':')
    longitude = gps_polygon_fields[2].split(',')[0].strip().replace("[", '')
    latitude = gps_polygon_fields[2].split(',')[1].strip().replace("]", '')
    tweetlocation = geolocator.reverse((latitude, longitude))
    tweetaddress = (tweetlocation.address)
    tweetaddress_fields = tweetaddress.split(',')
    country = tweetaddress_fields[-1].strip()
    try:
        #street = tweetaddress_fields[0].strip()
        #city = tweetaddress_fields[1].strip()
        #county = tweetaddress_fields[2].strip()
        state = tweetaddress_fields[3].strip()
        #zipcode = tweetaddress_fields[4].strip()
    except:
        #street = tweetaddress_fields[0].strip()
        #city = tweetaddress_fields[1].strip()
        #county = ''
        state = ''
        #zipcode = ''      
    #location_dict = dict(street=street, city=city, county=county, state=state,zipcode=zipcode, country=country)
    #return location_dict['state']
    return state


In [6]:
usdf = pandas_df[pandas_df.country == "United States"].head(100)
usdf['state'] = usdf.bounding_box.apply(lambda x: find_tweet_address_v2(x))  
usdf.head(5)

Unnamed: 0,bounding_box,city,country,text,tid,timestamp,uid,state
0,"{u'type': u'Polygon', u'coordinates': [[[-118....",Huntington Beach,United States,In need,of back rub,6.693751e+17,1448426000000.0,California
1,"{u'type': u'Polygon', u'coordinates': [[[-82.8...",Wheelersburg,United States,And I'm always tired but never of you,669375138958307328,1448426000000.0,3165522000.0,Ohio
3,"{u'type': u'Polygon', u'coordinates': [[[-117....",Rubidoux,United States,Lugo: I want to order pizza \rMe: let's do it ...,669375137716637696,1448426000000.0,2260613000.0,California
6,"{u'type': u'Polygon', u'coordinates': [[[-86.5...",Murfreesboro,United States,@_Shawncameron slides in dm 🤗😋 JK JK 😂🙃,669375139050627072,1448426000000.0,158503500.0,Tennessee
7,"{u'type': u'Polygon', u'coordinates': [[[-90.4...",Michigan,United States,@devilchasnme @SheriffClarke WE HAD SOME 11TH ...,669375139172261888,1448426000000.0,22429860.0,52768


##METHOD 2: Using Spark

In [7]:
#create spark dataframe
import findspark
findspark.init()
print findspark.find()
import pyspark
conf = (pyspark.SparkConf()
    .setMaster('local')
    .setAppName('pyspark')
    .set("spark.executor.memory", "4g"))
sc = pyspark.SparkContext(conf=conf)

/usr/local/opt/apache-spark/libexec


In [8]:
import sys
rdd = sc.parallelize(xrange(10),10)
rdd.map(lambda x: sys.version).collect()

['2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x86_64)| (default, Oct 19 2015, 18:31:17) \n[GCC 4.2.1 (Apple Inc. build 5577)]',
 '2.7.10 |Anaconda 2.3.0 (x8

In [9]:
# read csv to spark RDD then creat spark dataframe from it
alltweets_rdd = sc.textFile('./tweets.csv')
alltweets_rdd = alltweets_rdd.map(lambda line: line.split(","))
print "Total number of tweets: ", alltweets_rdd.count()
alltweets_rdd.take(2)

Total number of tweets:  37519


[[u'0',
  u'"{u\'type\': u\'Polygon\'',
  u" u'coordinates': [[[-118.082615",
  u' 33.628991]',
  u' [-118.082615',
  u' 33.756093]',
  u' [-117.91485',
  u' 33.756093]',
  u' [-117.91485',
  u' 33.628991]]]}"',
  u'Huntington Beach',
  u'United States',
  u'In need',
  u' of back rub',
  u'669375138903666692',
  u'1448426444160',
  u'1611125173'],
 [u'1',
  u'"{u\'type\': u\'Polygon\'',
  u" u'coordinates': [[[-82.87387",
  u' 38.688052]',
  u' [-82.87387',
  u' 38.76256]',
  u' [-82.811927',
  u' 38.76256]',
  u' [-82.811927',
  u' 38.688052]]]}"',
  u'Wheelersburg',
  u'United States',
  u"And I'm always tired but never of you",
  u'669375138958307328',
  u'1448426444173',
  u'3165522051']]

In [10]:
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
sqlsc=SQLContext(sc)

In [11]:
#create spark dataframe from pandas dataframe of csv file
#alltweets_df = sqlsc.createDataFrame(pandas_df)
#alltweets_df.printSchema()
#alltweets_df.show(5)
#ustweets_df = alltweets_df[alltweets_df['country'] == "United States"]
#ustweets_df = alltweets_df.filter(alltweets_df['country'] == "United States")
#print "Count of All tweets =", alltweets_df.count()
#print "Count of US tweets:", ustweets_df.count()
#ustweets_df.show(5)

In [12]:
#create spark dataframe from rdd of csv file
alltweets_rdd = sc.textFile('./tweets.csv')
alltweets_rdd = alltweets_rdd.map(lambda line: line.split(","))
header = alltweets_rdd.first()
#alltweets_rdd = alltweets_rdd.filter(lambda line:line != header)
#alltweets_df0 = alltweets_rdd.map(lambda line: Row(bounding_box=line[1:10], city=line[10], country=line[11], 
#                              text=line[12:-3], tid=line[-3], timestamp=line[-2], uid=line[-1])).toDF()
alltweets_df0 = alltweets_rdd.map(lambda line: Row(bounding_box=line[1:10], city=line[10], country=line[11], 
                              text=line[12:-3], tid=line[-3], timestamp=line[-2], uid=line[-1]))
alltweets_df = sqlsc.createDataFrame(alltweets_df0)
#alltweets_df.registerTempTable("alltweets_df")
alltweets_df.printSchema()

print type(alltweets_rdd)
print alltweets_rdd.count()
print type(alltweets_df)

root
 |-- bounding_box: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tid: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- uid: string (nullable = true)

<class 'pyspark.rdd.PipelinedRDD'>
37519
<class 'pyspark.sql.dataframe.DataFrame'>


In [13]:
# RDD actions like count() or take() do not seem to work 
# for the alltweets_df object. Is it because dataset is too big?
# Not sure why the following return an IndexError: "list index out of range"  
#alltweets_df.count()
#alltweets_df.take(2)
#alltweets_df.show(5)

In [14]:
#alltweets_df.head()
alltweets_df.first()

Row(bounding_box=[u'"{u\'type\': u\'Polygon\'', u" u'coordinates': [[[-118.082615", u' 33.628991]', u' [-118.082615', u' 33.756093]', u' [-117.91485', u' 33.756093]', u' [-117.91485', u' 33.628991]]]}"'], city=u'Huntington Beach', country=u'United States', text=[u'In need', u' of back rub'], tid=u'669375138903666692', timestamp=u'1448426444160', uid=u'1611125173')

In [15]:
alltweets_df.select("text").first()

Row(text=[u'In need', u' of back rub'])

In [16]:
tweet_states = alltweets_df.map(lambda r: find_tweet_address_v2(r.bounding_box))

In [17]:
tweet_states.take(3)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 17, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-12-b5c86b140645>", line 8, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:119)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:111)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-12-b5c86b140645>", line 8, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:119)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:111)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
