__Note to self: run this notebook in Python 2, as Spark was having compatibility issues with Python 3.__

# Matching Tweets with BART Schedules

- [Reformatting BART Data](#section1)
    - [Creating Station Schedules Dictionary](#stations)
    
- [Using Spark to Filter Tweets](#spark)

- [Situational Demo](#demo)
    - [Matching Tweets to BART Data](#matching)

<a id='section1'></a>

### Reformatting BART Data

In [53]:
import json

What stops are there?

In [54]:
with open('bart_schedules_by_route.json') as f:
    scheds = f.read().split('\n')

In [55]:
print(len(scheds))
scheds = scheds[:-1]  # Removing last item in list, which is an empty string
print(len(scheds))

13
12


In [56]:
# Looking at the layout of one particular train's schedule
sched0 = json.loads(scheds[0])
sched0['route']['train'][0]['stop']

[{u'@bikeflag': u'1', u'@origTime': u'4:02 AM', u'@station': u'PITT'},
 {u'@bikeflag': u'1', u'@origTime': u'4:08 AM', u'@station': u'NCON'},
 {u'@bikeflag': u'1', u'@origTime': u'4:12 AM', u'@station': u'CONC'},
 {u'@bikeflag': u'1', u'@origTime': u'4:17 AM', u'@station': u'PHIL'},
 {u'@bikeflag': u'1', u'@origTime': u'4:20 AM', u'@station': u'WCRK'},
 {u'@bikeflag': u'1', u'@origTime': u'4:25 AM', u'@station': u'LAFY'},
 {u'@bikeflag': u'1', u'@origTime': u'4:30 AM', u'@station': u'ORIN'},
 {u'@bikeflag': u'1', u'@origTime': u'4:35 AM', u'@station': u'ROCK'},
 {u'@bikeflag': u'1', u'@origTime': u'4:38 AM', u'@station': u'MCAR'},
 {u'@bikeflag': u'1', u'@origTime': u'4:42 AM', u'@station': u'19TH'},
 {u'@bikeflag': u'1', u'@origTime': u'4:43 AM', u'@station': u'12TH'},
 {u'@bikeflag': u'1', u'@origTime': u'4:48 AM', u'@station': u'WOAK'},
 {u'@bikeflag': u'1', u'@origTime': u'4:55 AM', u'@station': u'EMBR'},
 {u'@bikeflag': u'1', u'@origTime': u'4:56 AM', u'@station': u'MONT'},
 {u'@b

__Creating a dictionary of the station abbreviations and names (for matching to tweets)__

In [57]:
# Creating a dictionary which will contain the abbreviated stop/station as the key and a full(er) name
# as the value. This will be helpful when searching for information about the stations in the tweets.

stations = {}
for sched in scheds:
    r = json.loads(sched)
    for train in r['route']['train']:
        for stop in train['stop']:
            if stop['@station'] in stations:
                pass
            else:
                stations[stop['@station']] = ''  # For now saving value as empty string, placeholder

In [58]:
stations

{u'12TH': '',
 u'16TH': '',
 u'19TH': '',
 u'24TH': '',
 u'ASHB': '',
 u'BALB': '',
 u'BAYF': '',
 u'CAST': '',
 u'CIVC': '',
 u'COLM': '',
 u'COLS': '',
 u'CONC': '',
 u'DALY': '',
 u'DBRK': '',
 u'DELN': '',
 u'DUBL': '',
 u'EMBR': '',
 u'FRMT': '',
 u'FTVL': '',
 u'GLEN': '',
 u'HAYW': '',
 u'LAFY': '',
 u'LAKE': '',
 u'MCAR': '',
 u'MLBR': '',
 u'MONT': '',
 u'NBRK': '',
 u'NCON': '',
 u'OAKL': '',
 u'ORIN': '',
 u'PHIL': '',
 u'PITT': '',
 u'PLZA': '',
 u'POWL': '',
 u'RICH': '',
 u'ROCK': '',
 u'SANL': '',
 u'SBRN': '',
 u'SFIA': '',
 u'SHAY': '',
 u'SSAN': '',
 u'UCTY': '',
 u'WCRK': '',
 u'WDUB': '',
 u'WOAK': ''}

In [59]:
stations['12TH'] = '12th st oakland ciy center'
stations['16TH'] = '16th st mission'
stations['19TH'] = '19th st oakland'
stations['24TH'] = '24th st mission'
stations['ASHB'] = 'ashby'
stations['BALB'] = 'balboa'
stations['BAYF'] = 'bayfair'
stations['CAST'] = 'castro valley'
stations['CIVC'] = 'civic center'
stations['COLM'] = 'colma'
stations['COLS'] = 'coliseum'
stations['CONC'] = 'concord'
stations['DALY'] = 'daly city'
stations['DBRK'] = 'downtown berkeley'
stations['DELN'] = 'el cerrito del norte'
stations['DUBL'] = 'dublin'
stations['EMBR'] = 'embarcadero'
stations['FRMT'] = 'fremont'
stations['FTVL'] = 'fruitvale'
stations['GLEN'] = 'glen park'
stations['HAYW'] = 'hayward'
stations['LAFY'] = 'lafayette'
stations['LAKE'] = 'lake merritt'
stations['MCAR'] = 'macarthur'
stations['MLBR'] = 'millbrae'
stations['MONT'] = 'montgomery'
stations['NBRK'] = 'north berkeley'
stations['NCON'] = 'north concord'
stations['OAKL'] = 'oakland'
stations['ORIN'] = 'orinda'
stations['PHIL'] = 'pleasant hill'
stations['PITT'] = 'pittsburgh'
stations['PLZA'] = 'el cerrito plaza'
stations['POWL'] = 'powell'
stations['RICH'] = 'richmond'
stations['ROCK'] = 'rockridge'
stations['SANL'] = 'san leandro'
stations['SBRN'] = 'san bruno'
stations['SFIA'] = 'san francisco airport'
stations['SHAY'] = 'south hayward'
stations['SSAN'] = 'south san francisco'
stations['UCTY'] = 'union city'
stations['WCRK'] = 'walnut creek'
stations['WDUB'] = 'west dublin'
stations['WOAK'] = 'west oakland'

^ With this dictionary, it will be easier to match BART stops and schedules up with tweets.

In [60]:
from collections import Counter, defaultdict

In [61]:
# First train, first station on route
print(sched0['route']['train'][0]['stop'][0]['@station'], sched0['route']['train'][0]['stop'][0]['@origTime'])
# Next train, same station
print(sched0['route']['train'][1]['stop'][0]['@station'], sched0['route']['train'][1]['stop'][0]['@origTime'])

(u'PITT', u'4:02 AM')
(u'PITT', u'4:17 AM')


<a id='stations'></a>

__Creating a dictionary of the schedule for each individual station__

In [62]:
station_sched = defaultdict(list)
for sched in scheds:
    r = json.loads(sched)
    for train in r['route']['train']:
        # Train's route (ie _____-bound train)
        started_at = train['stop'][0]['@station']
        final_dest = train['stop'][-1]['@station']
        route = '{0} - {1}'.format(started_at, final_dest)
        for stop in train['stop']:
            if '@origTime' in stop.keys():
                station_sched[stop['@station']].append((route, stop['@origTime']))
            else:
                station_sched[stop['@station']].append((route, 'None'))

In [63]:
station_sched['WOAK']

[('PITT - MLBR', u'4:48 AM'),
 ('PITT - MLBR', u'5:03 AM'),
 ('PITT - MLBR', u'5:18 AM'),
 ('PITT - MLBR', u'5:33 AM'),
 ('PITT - MLBR', u'5:48 AM'),
 ('PITT - MLBR', u'6:03 AM'),
 ('PITT - MLBR', u'6:18 AM'),
 ('PITT - MLBR', u'6:33 AM'),
 ('PITT - MLBR', u'6:42 AM'),
 ('PITT - MLBR', u'6:48 AM'),
 ('PITT - MLBR', u'6:57 AM'),
 ('PITT - MLBR', u'7:03 AM'),
 ('PITT - MLBR', u'7:12 AM'),
 ('PITT - MLBR', u'7:18 AM'),
 ('PITT - MLBR', u'7:27 AM'),
 ('PITT - MLBR', u'7:33 AM'),
 ('PITT - MLBR', u'7:38 AM'),
 ('PITT - MLBR', u'7:42 AM'),
 ('PITT - MLBR', u'7:48 AM'),
 ('PITT - MLBR', u'7:53 AM'),
 ('PITT - MLBR', u'7:57 AM'),
 ('PITT - MLBR', u'8:03 AM'),
 ('PITT - MLBR', u'8:08 AM'),
 ('PITT - MLBR', u'8:12 AM'),
 ('PITT - MLBR', u'8:18 AM'),
 ('PITT - MLBR', u'8:27 AM'),
 ('PITT - MLBR', u'8:33 AM'),
 ('PITT - MLBR', u'8:42 AM'),
 ('PITT - MLBR', u'8:48 AM'),
 ('PITT - MLBR', u'8:57 AM'),
 ('PITT - MLBR', u'9:03 AM'),
 ('PITT - MLBR', u'9:12 AM'),
 ('PITT - MLBR', u'9:18 AM'),
 ('PITT - 

<a id='spark'></a>

# Spark


- [Jump to RDD's](#rdd)

In [1]:
sc

<pyspark.context.SparkContext at 0x108f9e850>

In [2]:
from pyspark import SparkContext
import json

Gathering tweets from files:

In [3]:
import os
for root, dirs, files in os.walk('/Users/brynstark/Stark gU/DSCI6007-student/FinalProject_BrynStark/tweets/'):
    filenames = files
print(filenames)

['12th st oakland.json', 'bay area.json', 'oakland.json', 'san francisco.json']


In [4]:
all_tweets = []
# For each of the JSON files with Twitter data
for name in filenames:
    path = 'tweets/' + name  # filename.json of tweets
    with open(path) as f:
        tweets_lst = f.read().split('\n') # Splitting up JSON objects, separated by new-line, and adding to list.
        tweets_lst = tweets_lst[:-1]
    for tweet in tweets_lst[:-1]:
        j_tweet = json.loads(tweet)
        all_tweets.append(j_tweet)

Checking the list of all tweets collected:

In [5]:
len(all_tweets)

7137

In [6]:
all_tweets[0]

{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Mon Jul 11 22:27:05 +0000 2016',
 u'entities': {u'hashtags': [{u'indices': [10, 14], u'text': u'Job'}],
  u'symbols': [],
  u'urls': [{u'display_url': u'goo.gl/4wy4il',
    u'expanded_url': u'http://goo.gl/4wy4il',
    u'indices': [90, 113],
    u'url': u'https://t.co/9mo6PRU6uS'}],
  u'user_mentions': []},
 u'favorite_count': 0,
 u'favorited': False,
 u'filter_level': u'low',
 u'geo': None,
 u'id': 752630317219913729,
 u'id_str': u'752630317219913729',
 u'in_reply_to_screen_name': None,
 u'in_reply_to_status_id': None,
 u'in_reply_to_status_id_str': None,
 u'in_reply_to_user_id': None,
 u'in_reply_to_user_id_str': None,
 u'is_quote_status': False,
 u'lang': u'en',
 u'place': None,
 u'possibly_sensitive': False,
 u'retweet_count': 0,
 u'retweeted': False,
 u'source': u'<a href="http://twitterfeed.com" rel="nofollow">twitterfeed</a>',
 u'text': u'Find this #Job &amp;More Sales Center Associate - Carrier Enterprise, LLC - R

In [7]:
all_tweets[40]['text']

u'Hotels present-day berkeley, california reviewed: oLY'

<a id='cleaner_tweets'></a>

In [105]:
# Taking only the info we may need from the JSON object
cleaner_tweets = []
for tweet in all_tweets:
    cleaned = (tweet['text'], tweet['created_at'], tweet['user']['screen_name'], tweet['id'])
    cleaner_tweets.append(cleaned)

In [106]:
len(cleaner_tweets)

7137

<a id='rdd'></a>

__Putting all_tweets into Spark RDD:__

In [8]:
sc

<pyspark.context.SparkContext at 0x108f9e850>

In [9]:
rdd_tweets = sc.parallelize(all_tweets)

In [10]:
rdd_tweets.take(1)

[{u'contributors': None,
  u'coordinates': None,
  u'created_at': u'Mon Jul 11 22:27:05 +0000 2016',
  u'entities': {u'hashtags': [{u'indices': [10, 14], u'text': u'Job'}],
   u'symbols': [],
   u'urls': [{u'display_url': u'goo.gl/4wy4il',
     u'expanded_url': u'http://goo.gl/4wy4il',
     u'indices': [90, 113],
     u'url': u'https://t.co/9mo6PRU6uS'}],
   u'user_mentions': []},
  u'favorite_count': 0,
  u'favorited': False,
  u'filter_level': u'low',
  u'geo': None,
  u'id': 752630317219913729,
  u'id_str': u'752630317219913729',
  u'in_reply_to_screen_name': None,
  u'in_reply_to_status_id': None,
  u'in_reply_to_status_id_str': None,
  u'in_reply_to_user_id': None,
  u'in_reply_to_user_id_str': None,
  u'is_quote_status': False,
  u'lang': u'en',
  u'place': None,
  u'possibly_sensitive': False,
  u'retweet_count': 0,
  u'retweeted': False,
  u'source': u'<a href="http://twitterfeed.com" rel="nofollow">twitterfeed</a>',
  u'text': u'Find this #Job &amp;More Sales Center Associate 

New RDD keeping only the information that's relevant for now:

In [15]:
rdd_cleaner = rdd_tweets.map(lambda twt: (twt['text'], twt['created_at'], twt['user']['screen_name'], twt['id']))

Tweets that contain the string "bart" (not case sensitive, can be substring):

In [22]:
rdd_cleaner.filter(lambda (text, time, screen_name, tweet_id): 'bart' in text.lower()) \
          .count()

21

In [52]:
rdd_cleaner.filter(lambda (text, time, screen_name, tweet_id): 'bart' in text.lower()) \
           .sortBy(lambda (text, time, screen_name, tweet_id): time, ascending=False) \
           .take(5)

[(u'RT @SFBART: Delays on Oakland Airport Connector 7/12-14 for maintenance work. https://t.co/NIMf49WPns',
  u'Mon Jul 11 23:25:42 +0000 2016',
  u'rdparr1',
  752645067483983872),
 (u'Kevin Bartlett only gets upset with the rules when it has to do with Richmond',
  u'Mon Jul 11 23:18:40 +0000 2016',
  u'isdon_isgood',
  752643297437376512),
 (u'RT @SFBART: Delays on Oakland Airport Connector 7/12-14 for maintenance work. https://t.co/NIMf49WPns',
  u'Mon Jul 11 23:16:29 +0000 2016',
  u'PlanBayArea',
  752642747463372801),
 (u'RT @SFBART: Delays on Oakland Airport Connector 7/12-14 for maintenance work. https://t.co/NIMf49WPns',
  u'Mon Jul 11 23:15:17 +0000 2016',
  u'transit_tweets',
  752642446358487040),
 (u"Who in Hayward? I'm finna Bart over there and knock over some of those gyms",
  u'Mon Jul 11 23:14:36 +0000 2016',
  u'MayneCharacter',
  752642273272139777)]

Tweets about 12th St. Oakland:

In [18]:
rdd_cleaner.filter(lambda (text, time, screen_name, tweet_id): ('oakland' in text.lower()) and ('12th') in text.lower()) \
           .collect()

[(u'RT @lpnotes: Is anyone interested in going to a fun, laid-back, free music hackathon on July 24th @ 1PM in Oakland near the 12th St. BART?\u2026',
  u'Mon Jul 11 04:31:39 +0000 2016',
  u'cj_floats',
  752359674612060161),
 (u'7/10/16;\nsprints!!! woke up early to shower! then bussed to bal park station &amp; bart to 12th street oakland w/ Gordon then ubered to alameda',
  u'Mon Jul 11 04:33:40 +0000 2016',
  u'erickaxliu',
  752360183536361472)]

<a id='demo'></a>

## Situational Demo

__Let's say you know you're going to be heading into Downtown Oakland from the Coliseum... should you take BART or should you drive?__

In [83]:
rdd_cleaner.filter(lambda (text, time, screen_name, tweet_id): ('oakland' in text.lower()) and ('protest' in text.lower())) \
           .count()

24

In [82]:
rdd_cleaner.filter(lambda (text, time, screen_name, tweet_id): ('oakland' in text.lower()) and ('protest' in text.lower())) \
           .sortBy(lambda t: t[1], ascending=False) \
           .take(5)

[(u'RT @WORLDSTAR: 1000+ Protesters take over  I-880 in Oakland... \U0001f440 https://t.co/R9Uhjmhq3T',
  u'Mon Jul 11 17:16:29 +0000 2016',
  u'Afiaaaa_2x',
  752552152611033088),
 (u'RT @KenWayneKTVU: BREAKING: All lanes closed 880 by protesters downtown #Oakland. https://t.co/AHntCUVyOw',
  u'Mon Jul 11 17:14:54 +0000 2016',
  u'melxnchole',
  752551753514573828),
 (u'RT @WORLDSTAR: 1000+ Protesters take over  I-880 in Oakland... \U0001f440 https://t.co/R9Uhjmhq3T',
  u'Mon Jul 11 17:08:17 +0000 2016',
  u'notsavv',
  752550087440629760),
 (u'RT @hautedamn: "Peaceful" protestors shutting down a highway in Oakland, California. https://t.co/sxBesgFCkE',
  u'Mon Jul 11 16:59:29 +0000 2016',
  u'TrumpSupport13',
  752547872957034496),
 (u'To all protesters #BlackLivesMatter be safe tonight. I see you #Oakland #Inglewood -Twilight 22 - Electric Kingdom https://t.co/ufhrDhO3I6',
  u'Mon Jul 11 04:52:01 +0000 2016',
  u'dxoxos14',
  752364802576621568)]

__Okay, quick change of plans, you'll be taking BART instead of driving. Or maybe you'll BART on over to join the protest. Either way, we'll have to check the relevant BART schedules.__

<a id='matching'></a>

## Matching the BART Schedule with Tweets

I know I need to take the Richmond train to get from the Coliseum to 12th Street Oakland City Center.

In [87]:
station_sched['COLS']

[('FRMT - RICH', u'4:26 AM'),
 ('FRMT - RICH', u'4:41 AM'),
 ('FRMT - RICH', u'4:56 AM'),
 ('FRMT - RICH', u'5:11 AM'),
 ('FRMT - RICH', u'5:26 AM'),
 ('FRMT - RICH', u'5:41 AM'),
 ('FRMT - RICH', u'5:56 AM'),
 ('FRMT - RICH', u'6:11 AM'),
 ('FRMT - RICH', u'6:26 AM'),
 ('FRMT - RICH', u'6:41 AM'),
 ('FRMT - RICH', u'6:56 AM'),
 ('FRMT - RICH', u'7:11 AM'),
 ('FRMT - RICH', u'7:26 AM'),
 ('FRMT - RICH', u'7:41 AM'),
 ('FRMT - RICH', u'7:56 AM'),
 ('FRMT - RICH', u'8:11 AM'),
 ('FRMT - RICH', u'8:26 AM'),
 ('FRMT - RICH', u'8:41 AM'),
 ('FRMT - RICH', u'8:56 AM'),
 ('FRMT - RICH', u'9:11 AM'),
 ('FRMT - RICH', u'9:26 AM'),
 ('FRMT - RICH', u'9:41 AM'),
 ('FRMT - RICH', u'9:56 AM'),
 ('FRMT - RICH', u'10:11 AM'),
 ('FRMT - RICH', u'10:26 AM'),
 ('FRMT - RICH', u'10:41 AM'),
 ('FRMT - RICH', u'10:56 AM'),
 ('FRMT - RICH', u'11:11 AM'),
 ('FRMT - RICH', u'11:26 AM'),
 ('FRMT - RICH', u'11:41 AM'),
 ('FRMT - RICH', u'11:56 AM'),
 ('FRMT - RICH', u'12:11 PM'),
 ('FRMT - RICH', u'12:26 PM'),


In [118]:
# The user can enter their station, the train route, and where they're going,
# and the output will be the day's train schedule.

def bart_schedule(station, my_route, destination):
    my_station = station_sched[station]
    schedule = []
    for stop in my_station:
        route, time = stop
        if route == my_route:
            schedule.append(time)
    return schedule, destination

In [127]:
schedule, destination = bart_schedule('COLS', 'FRMT - RICH', 'oakland')

The following code uses cleaner_tweets, defined [above](#cleaner_tweets).

In [132]:
def bart_tweets(destination):
    relevant = []
    for tweet in cleaner_tweets:
        text, created_at, screen_name, twt_id = tweet
        if destination in text:
            relevant.append(tweet)
    return sorted(relevant, key=(lambda r: r[1]))

In [134]:
bart_tweets(destination)[:5]

[(u'At the Oakland 880 protest I was on the front line reminding all the "white allies" that if they were serious they\'d be on the front lines',
  u'Mon Jul 11 02:59:58 +0000 2016',
  u'SpecialKay00',
  752336603331252224),
 (u'RT @SheHatesJacoby: Woahhhh RT @ArsalaiH: #Oakland (7/7/2016) Highway 880 shutdown #AltonSterling #BlackLivesMatter https://t.co/SWb1mMNM2r',
  u'Mon Jul 11 03:01:11 +0000 2016',
  u'gayflappybird',
  752336910236971008),
 (u'RT @ArsalaiH: #Oakland (7/7/2016) Highway 880 shutdown #AltonSterling #BlackLivesMatter https://t.co/Oal2wrhoyS',
  u'Mon Jul 11 03:03:16 +0000 2016',
  u'ItsMe_LadyJazzy',
  752337431194660864),
 (u'RT @WORLDSTAR: 1000+ Protesters take over  I-880 in Oakland... \U0001f440 https://t.co/R9Uhjmhq3T',
  u'Mon Jul 11 03:04:06 +0000 2016',
  u'Smiley_darien',
  752337643690602496),
 (u'RT @SpecialKay00: At the Oakland 880 protest I was on the front line reminding all the "white allies" that if they were serious they\'d be o\u2026',
  u'Mon Jul 