In [324]:
import geopandas as gpd
import shapely
from shapely.geometry import Point

import os
import pandas as pd
import numpy as np
import csv
import pickle
from datetime import datetime, date, time

#### This notebook is for testing PySpark algorithms/strategy. A non-notebook python script will be generated separately for use on NYU's Dumbo cluster.

In [9]:
# get a sparkContext as sc
# Runs pyspark's interactive-shell bootstrap script inside this kernel. The
# banner it prints reports a SparkSession available as 'spark'.
# Requires the SPARK_HOME environment variable (KeyError if it is unset).
# NOTE: execfile is a Python 2 builtin, so this cell needs a Python 2 kernel.
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:43:17)
SparkSession available as 'spark'.


In [45]:
# sanity check: display the SparkContext to confirm it exists in this kernel
sc

<pyspark.context.SparkContext at 0x111b1bcd0>

In [175]:
# Load the two pickled inputs from ./data.
# NOTE(review): pickle.load can execute arbitrary code while deserialising,
# so only load pickle files that come from a trusted source.

# region_shapes: table of region geometries (indexed by 'EIA_Region' further down)
with open('./data/region_shapes.pickle', 'rb') as handle:
    region_shapes = pickle.load(handle)

# anoms_key: records that are read below as (region, datetime) pairs
with open('./data/anoms.pickle', 'rb') as handle:
    anoms_key = pickle.load(handle)

In [176]:
# Index the geometries by region name so a region's shape can be looked up by label.
# The guard keeps this cell idempotent: once 'EIA_Region' has become the index it is
# no longer a column, and calling set_index on it again would raise KeyError.
if 'EIA_Region' in region_shapes.columns:
    region_shapes = region_shapes.set_index('EIA_Region', drop=True)
region_shapes

Unnamed: 0_level_0,geometry
EIA_Region,Unnamed: 1_level_1
California,(POLYGON ((-114.2997240000745 34.1588979998338...
Carolinas,(POLYGON ((-77.94237900003289 36.5445129996888...
Central,(POLYGON ((-102.5477049998308 46.2827879998399...
Florida,(POLYGON ((-82.11678699982296 24.5491439996071...
Mid Atlantic,(POLYGON ((-84.82944799969027 38.8971309999546...
Midwest,(POLYGON ((-89.87456199992369 34.3342880000746...
New England,(POLYGON ((-68.36510700029049 44.1014639998632...
New York,(POLYGON ((-72.16159800009964 41.1895390001785...
Northwest,(POLYGON ((-122.5850379996938 48.3951660000205...
Southern,(POLYGON ((-88.08681199961534 30.2598640003873...


In [178]:
# Tabulate the anomaly records with named columns so they can be grouped by region.
anoms_df = pd.DataFrame(anoms_key, columns=['region', 'datetime'])
# preview the first rows only
anoms_df.head()

Unnamed: 0,region,datetime
0,Central,2015-12-03
1,Central,2016-01-06
2,Central,2016-01-11
3,Central,2016-01-12
4,Central,2016-01-13


In [180]:
def parse_bq_date(stamp):
    """
    Turn a date-like object into the integer YYYYMMDD form used for
    Google BigQuery date filtering.

    ARGS
    - stamp: any object exposing integer .year, .month and .day attributes
      (e.g. datetime.date, datetime.datetime)

    RETURNS
    - int such as 20151203; month and day are zero-padded to two digits,
      the year is written as-is
    """
    return int('%d%02d%02d' % (stamp.year, stamp.month, stamp.day))

In [None]:
# unique YYYYMMDD integers across every anomaly record (element 1 is the datetime),
# for use in the BigQuery date filter
sql_dates = list({parse_bq_date(anom[1]) for anom in anoms_key})

In [181]:
# Build the lookup used by the Spark job:
#   region name -> (list of unique YYYYMMDD anomaly dates, region geometry)
anoms_dict = {}
for region, days in anoms_df.groupby('region'):
    # .at is the label-based scalar accessor; it replaces DataFrame.get_value,
    # which is deprecated and no longer exists in pandas >= 1.0
    anoms_dict[region] = (list(set(map(parse_bq_date, days.datetime))),
                          region_shapes.at[region, 'geometry'])

In [268]:
# write the anoms_dict to file, so we can use on compute cluster later
# (binary mode is required for pickle; HIGHEST_PROTOCOL picks the newest format
# this interpreter supports, so the reader must support that protocol too)
with open('./data/anoms_dict.pickle', 'wb') as handle:
    pickle.dump(anoms_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [269]:
# Input locations for the event records.
# LOCAL_PATH is a glob over gzipped files; it is defined here but not read below.
LOCAL_PATH = './data/anoms*.gz'
# TEST_PATH is the single CSV file this notebook actually reads.
TEST_PATH = './data/anomstest.csv'
#HDFS_PATH = ''

# RDD with one element per line of the file
anoms = sc.textFile(TEST_PATH)

In [270]:
# convert all lines to UTF-8 encoding
# NOTE(review): presumably needed because the Python 2 csv module works on byte
# strings rather than unicode objects — confirm before porting to Python 3.
anoms = anoms.map(lambda line: line.encode('utf-8'))

In [271]:
# get field indexes for RDD
# (splits the first line — the CSV header — on commas and numbers each column name;
# these positions are the event[...] indexes used when parsing rows)
list(enumerate(anoms.first().split(',')))

[(0, 'SQLDATE'),
 (1, 'Actor1Name'),
 (2, 'Actor1Type1Code'),
 (3, 'EventCode'),
 (4, 'QuadClass'),
 (5, 'AvgTone'),
 (6, 'ActionGeo_Fullname'),
 (7, 'ActionGeo_Lat'),
 (8, 'ActionGeo_Long'),
 (9, 'Actor1Geo_Fullname'),
 (10, 'Actor1Geo_Lat'),
 (11, 'Actor1Geo_Long')]

In [272]:
# remove header row
# (the header's first column is 'SQLDATE', so any line starting with 'SQL' is dropped)
anoms = anoms.filter(lambda line: not line.startswith('SQL'))

In [273]:
# peek at the first few raw CSV lines after the header has been removed
anoms.take(5)

['20151218,,,043,1,0,"Akron, Ohio, United States",41.0814,-81.519,,,',
 '20150816,,,112,3,-12.3076923076923,"Akron, Ohio, United States",41.0814,-81.519,,,',
 '20160124,,,046,1,-0.89285714285715,"Akron, Ohio, United States",41.0814,-81.519,,,',
 '20160104,,,040,1,0.98792535675083,"Dover, Ohio, United States",40.5206,-81.474,,,',
 '20151001,,,050,1,-2.13333333333333,"Utica, Ohio, United States",39.4976,-84.1602,,,']

In [275]:
def get_point(lng, lat):
    """
    Build a shapely Point from a longitude / latitude pair.

    Both arguments are passed through float(), so numeric strings are accepted;
    float() raises ValueError for text that is not a number (e.g. an empty field).
    Longitude is the first Point coordinate and latitude the second.
    """
    x = float(lng)
    y = float(lat)
    return Point(x, y)

In [280]:
def parse_events(events):
    """
    Partition-level parser: keep only the events that fall on one of a region's
    anomaly dates AND are located inside that region's geometry.

    ARGS
    - events: iterable of raw CSV lines (one event per line), as handed to
      mapPartitions

    YIELDS
    - (region, (date, actorname, actortype, eventcode, quadclass, tone))
      once for every region whose date set and geometry both match the event

    Rows that cannot be used are skipped silently: blank or short rows, a
    non-numeric date, unparseable coordinates, quadclass or tone.
    """
    # Build the lookup once per partition. Set membership is O(1), whereas the
    # date lists stored in anoms_dict would be scanned linearly for every event.
    regions = [(region, set(dates), shape)
               for region, (dates, shape) in anoms_dict.items()]

    # for each event record
    for event in csv.reader(events):
        try:
            sqldate = int(event[0])
        except (ValueError, IndexError):
            # blank line or malformed datestamp: nothing to match against
            continue

        # created lazily so coordinates are only parsed for rows whose date matters,
        # and only once per row rather than once per region
        point = None
        for region, dates, shape in regions:
            # check if datestamp within the regional set of interesting dates
            if sqldate not in dates:
                continue
            try:
                if point is None:
                    point = get_point(event[8], event[7])
                # check if event location took place within region
                if point.within(shape):
                    yield (region,              # region
                           (sqldate,            # date
                            str(event[1]),      # actorname
                            str(event[2]),      # actortype
                            str(event[3]),      # eventcode
                            int(event[4]),      # quadclass
                            float(event[5])))   # tone
            except (ValueError, IndexError):
                # a field of this row is unusable; it would fail the same way
                # for every other region, so move on to the next row
                break

In [283]:
# map by partition for the initial reduction (heaviest lifting is here)
# parse_events receives an iterator over a whole partition's lines and yields
# (region, (date, actorname, actortype, eventcode, quadclass, tone)) records
reduced_anoms = anoms.mapPartitions(parse_events)

In [284]:
# peek at the first few (region, event-fields) records produced by parse_events
reduced_anoms.take(10)

[('Texas', (20161220, '', '', '112', 3, -7.02875399361022)),
 ('Texas', (20170429, '', '', '112', 3, -11.6022099447514)),
 ('Northwest', (20161215, '', '', '130', 3, -4.31654676258993)),
 ('Texas', (20170109, '', '', '0874', 2, 0.0)),
 ('Texas', (20160811, '', '', '042', 1, -1.34408602150538)),
 ('Texas', (20170109, '', '', '060', 2, -5.34883720930232)),
 ('Texas', (20160811, '', '', '052', 1, 0.73529411764706)),
 ('Texas', (20151012, '', '', '036', 1, 4.29553264604811)),
 ('Texas', (20170108, '', '', '036', 1, 0.95969289827255)),
 ('Texas', (20161017, '', '', '042', 1, 1.47639739285009))]

In [333]:
def skip_missing(line, col_index):
    """
    Filter predicate for (region, fields) records.

    Returns True when the field at position col_index of the fields tuple is
    anything other than the empty string, False when it is ''.
    """
    field = line[1][col_index]
    return not field == ''

In [334]:
def concat_for_count(line, col_index):
    """
    Turn a (region, fields) record into a countable pair.

    The key is the region and the chosen field's string form joined by '_';
    the value is always 1 so the pairs can be summed per key.
    """
    key = '_'.join([line[0], str(line[1][col_index])])
    return (key, 1)

In [335]:
def resplit_regions(line):
    """
    Undo the 'region_value' key concatenation after counting.

    ARGS
    - line: ('<region>_<value>', count)

    RETURNS
    - (region, (value, count))

    Only the FIRST underscore is treated as the separator, so a value that itself
    contains underscores is returned whole instead of being cut at its first '_'.
    Raises IndexError if the key contains no underscore at all.
    """
    parts = line[0].split('_', 1)
    return (parts[0], (parts[1], line[1]))

In [344]:
def get_top_n(v, n):
    """
    Return the n pairs with the largest second element, biggest first.

    ARGS
    - v: any iterable of (value, count) pairs
    - n: maximum number of pairs to return

    Pairs with equal counts keep the order in which they appeared in v.
    """
    ranked = list(v)
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:n]

In [345]:
def get_top_values(col_index, n):
    """
    Rank the most frequent values of one event field within each region.

    ARGS
    - col_index: position of the field inside the event tuple
        1=actorname
        2=actortype
        3=eventcode
        4=quadclass
    - n: how many of the top values to keep per region

    RETURNS
    - collected list of (region, [(value, count), ...]) with the counts in
      descending order
    """
    # drop records whose chosen field is empty
    present = reduced_anoms.filter(lambda event: skip_missing(event, col_index))
    # count occurrences of every 'region_value' key
    keyed = present.map(lambda event: concat_for_count(event, col_index))
    totals = keyed.reduceByKey(lambda x, y: x + y)
    # back to region keys, then gather every (value, count) for a region together
    per_region = totals.map(resplit_regions).groupByKey()
    top = per_region.mapValues(lambda pairs: get_top_n(pairs, n))
    return top.collect()

In [347]:
# get average TONE, by region
(reduced_anoms
    .filter(lambda event: skip_missing(event, 5))
    # pair every tone with a count of 1 so sum and count reduce together
    .map(lambda event: (event[0], (event[1][5], 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    # mean = summed tone / number of events
    .mapValues(lambda v: v[0] / v[1])
    .collect())

[('Central', -2.5778306850976005),
 ('Southern', -2.329364268487252),
 ('New England', -1.5525235745149324),
 ('Florida', -2.388332896634119),
 ('California', -1.0773653697289698),
 ('TVA', -1.1582477193554037),
 ('Texas', -1.9694371392691459),
 ('Southwest', -1.6412355170788393),
 ('Northwest', -1.5152019623052377),
 ('Carolinas', -1.1544674277886582)]

In [346]:
# top 3 actor names (field 1) per region
get_top_values(1,3)

[('Central', [('UNITED STATES', 2), ('OKLAHOMA', 1), ('POLICE', 1)]),
 ('Southern', [('UNITED STATES', 3), ('SRI LANKA', 1), ('SWEDEN', 1)]),
 ('New England', [('INDUSTRY', 1), ('COLLEGE', 1), ('OBAMA', 1)]),
 ('Florida', [('UNITED STATES', 6), ('CHINA', 2), ('CUBAN', 2)]),
 ('California', [('UNITED STATES', 8), ('CHINA', 2), ('COMPANIES', 1)]),
 ('TVA', [('NASHVILLE', 6), ('UNITED STATES', 3), ('COMPANY', 1)]),
 ('Texas', [('UNITED STATES', 2), ('CHINESE', 1), ('PROFESSOR', 1)]),
 ('Southwest', [('MEXICO', 5), ('UNITED STATES', 4), ('NEW YORK', 1)]),
 ('Northwest', [('UNITED STATES', 12), ('DENVER', 4), ('POLICE', 4)]),
 ('Carolinas', [('UNITED STATES', 4), ('FIRE MARSHAL', 2), ('ARMY', 1)])]

In [342]:
# top 3 actor types (field 2) per region
get_top_values(2, 3)

[('Central', [('LEG', 1), ('ELI', 1), ('GOV', 1)]),
 ('Southern', [('COP', 1), ('GOV', 1), ('EDU', 1)]),
 ('New England', [('GOV', 2), ('EDU', 1), ('BUS', 1)]),
 ('Florida', [('GOV', 5), ('LEG', 2), ('JUD', 1)]),
 ('California', [('JUD', 2), ('CVL', 2), ('GOV', 2)]),
 ('TVA', [('ELI', 1), ('LEG', 1), ('BUS', 1)]),
 ('Texas', [('EDU', 2), ('REF', 1)]),
 ('Carolinas', [('GOV', 2), ('COP', 1), ('MIL', 1)]),
 ('Northwest', [('COP', 5), ('BUS', 4), ('MED', 3)]),
 ('Southwest', [('COP', 2), ('BUS', 1), ('GOV', 1)])]

In [329]:
# counts of quad classes (field 4) per region, up to 5 classes each
get_top_values(4, 5)

[('Central', [('1', 15), ('3', 4), ('4', 1)]),
 ('Southern', [('1', 34), ('4', 8), ('3', 2), ('2', 1)]),
 ('New England', [('1', 3), ('4', 3), ('3', 1), ('2', 1)]),
 ('Florida', [('1', 56), ('4', 14), ('3', 7), ('2', 7)]),
 ('California', [('1', 156), ('4', 38), ('2', 20), ('3', 17)]),
 ('TVA', [('1', 20), ('4', 5), ('2', 3), ('3', 2)]),
 ('Texas', [('1', 52), ('4', 7), ('2', 7), ('3', 5)]),
 ('Southwest', [('1', 37), ('3', 10), ('4', 7), ('2', 4)]),
 ('Northwest', [('1', 69), ('4', 19), ('3', 10), ('2', 5)]),
 ('Carolinas', [('1', 27), ('4', 6), ('3', 3), ('2', 1)])]

In [330]:
# top 5 event codes (field 3) per region
get_top_values(3, 5)

[('Central', [('042', 4), ('040', 3), ('036', 2), ('043', 2), ('112', 2)]),
 ('Southern', [('042', 13), ('040', 6), ('043', 4), ('173', 3), ('051', 3)]),
 ('New England', [('061', 1), ('186', 1), ('160', 1), ('010', 1), ('046', 1)]),
 ('Florida', [('020', 11), ('043', 10), ('042', 8), ('040', 7), ('173', 5)]),
 ('California',
  [('042', 38), ('043', 27), ('040', 22), ('051', 17), ('036', 16)]),
 ('TVA', [('043', 5), ('042', 4), ('040', 3), ('190', 3), ('020', 3)]),
 ('Texas', [('042', 12), ('043', 11), ('036', 10), ('040', 7), ('190', 5)]),
 ('Carolinas', [('043', 6), ('042', 4), ('036', 4), ('051', 3), ('040', 3)]),
 ('Northwest',
  [('042', 16), ('043', 11), ('040', 10), ('173', 8), ('020', 6)]),
 ('Southwest', [('042', 11), ('040', 9), ('190', 6), ('051', 4), ('010', 4)])]

Desired output

For each region:
- most common actors, actor type, and event codes
- counts of quad classes
- average tone value