In [3]:
from functools import partial
import os
from pyspark.sql import Row, Column

# Wikipedia "Neighborhoods in New York City" table exported as CSV; each line is
# a borough name followed by its neighborhoods (parsed by import_neighborhoods).
NEIGHBORHOODS_CSV_FPATH = 'data/wiki_Neighborhoods_in_New_York_City.csv'

def import_neighborhoods(fname, expected_lines=59):
    '''
    Import the Wikipedia "Neighborhoods in New York City" CSV.

    Each line is `Borough,Neighborhood,Neighborhood,...`; empty and
    whitespace-only fields are ignored.

    fname: path to the CSV file.
    expected_lines: number of '\\n'-separated lines the file must split into
        (a trailing newline counts as one extra, empty line). Defaults to 59,
        the length this notebook has always required. Pass None to skip the
        check.

    returns: dictionary mapping each upper-case borough name
        ('QUEENS', 'BROOKLYN', 'MANHATTAN', 'STATEN ISLAND', 'BRONX')
        to a set of upper-case neighborhood names. Each set also contains the
        borough's own name; MANHATTAN additionally contains 'NEW YORK'.

    raises: AssertionError if the line count differs from expected_lines;
        KeyError if one of the five boroughs is missing from the file.
    '''
    with open(fname, 'r') as f:
        raw = f.read()

    # Text-mode reads already translate '\r\n' to '\n' (universal newlines),
    # so split on '\n' rather than os.linesep: on Windows os.linesep is
    # '\r\n' and would leave the whole file as a single "line".
    lines = raw.split('\n')

    if expected_lines is not None:
        assert len(lines) == expected_lines, "Neighborhoods file not proper length"

    boroughs = {}
    for line in lines:
        fields = line.split(',')
        borough = fields[0].strip()
        hoods = boroughs.setdefault(borough, [])
        for field in fields[1:]:
            neighborhood = field.strip().upper()
            if neighborhood:  # strip first so whitespace-only fields are skipped
                hoods.append(neighborhood)

    # Canonical borough name -> (name as spelled in the file, extra aliases).
    canonical = {
        'QUEENS': ('Queens', ['QUEENS']),
        'BROOKLYN': ('Brooklyn', ['BROOKLYN']),
        'MANHATTAN': ('Manhattan', ['MANHATTAN', 'NEW YORK']),
        'STATEN ISLAND': ('Staten Island', ['STATEN ISLAND']),
        'BRONX': ('Bronx', ['BRONX']),
    }
    return {name: set(boroughs[file_name] + aliases)
            for name, (file_name, aliases) in canonical.items()}

def _city2borough(borough, neighborhoods):
    '''
    If borough is in the borough_list, return it
    Otherwise, apply the city-to-borough map
    '''
    orig_borough, city = borough_city_tuple
    borough_list = ['QUEENS','BROOKLYN','BRONX','STATEN ISLAND','MANHATTAN']
    try: #convert to upper if it's a string
        if str.upper(orig_borough) in borough_list:
            return (str.upper(orig_borough), city)
    except: #if orig_borough not a string, ignore error
        pass
    else: #if 
        #Return first borough that comes up
        for borough,hood_list in neighborhoods.items():
            try:
                if str.upper(city) in hood_list:
                    return (borough, city)
            except:
                return (None, city)
        return (None, city)

# Load the borough -> neighborhoods lookup once, then bind it into the mapper
# so Spark's .map() can call city2borough with just the record argument.
neighborhoods = import_neighborhoods(fname=NEIGHBORHOODS_CSV_FPATH)
city2borough = partial(_city2borough, neighborhoods=neighborhoods)

In [4]:
# Raw 311 export; the first row holds the column names. No schema is given,
# so every column is read as a string.
df = spark.read.option("header", True).csv("data/311-all.csv")

In [5]:
# reduced_df = df.select(
#     'Unique Key', 
#     'Created Date',
#     'Complaint Type',
#     'Incident Zip',
#     'Incident Address',
#     'City',
#     'Borough',
#     'Latitude',
#     'Longitude'
# )

# reduced_df.write.csv(...)

In [None]:
# One (Key, Borough, City) row per record with the normalized borough.
# `as_rdd` was never defined (NameError on a fresh kernel); take the RDD from
# `df` directly, as clean_borough does. city2borough is expected to map
# (key, borough, city) -> (key, borough, city).
df_alt = (
    df.rdd
      .map(lambda row: (row['Unique Key'], row['Borough'], row['City']))
      .map(city2borough)
      .map(lambda rec: Row(Key=rec[0], Borough=rec[1], City=rec[2]))
      .toDF()
)

In [None]:
from pyspark.sql.types import IntegerType

# Both key columns start out as strings; cast them to int so the SQL join in
# the next cell compares like types, then expose both frames as temp views.
df, df_alt = (
    df.withColumn('Unique Key', df['Unique Key'].cast(IntegerType())),
    df_alt.withColumn('Key', df_alt['Key'].cast(IntegerType())),
)

df.createOrReplaceTempView('origin_df')
df_alt.createOrReplaceTempView('new_df')

In [None]:
# Attach the normalized borough (as Borough_2) to every original record.
# Query through the `spark` session used to load the CSV; the legacy
# `sqlContext` global is deprecated and is not defined anywhere in this notebook.
new_df = spark.sql("""
        SELECT 
            origin_df.*, 
            new_df.Borough AS Borough_2 
        FROM 
            origin_df 
            INNER JOIN new_df ON origin_df.`Unique Key` = new_df.`Key`
    """)

new_df.show()

In [None]:
# Replace the raw Borough column with the normalized one. Guarded on
# Borough_2 so that re-running this cell does not drop the already-cleaned
# Borough column.
if 'Borough_2' in new_df.columns:
    new_df = new_df.drop('Borough').withColumnRenamed('Borough_2', 'Borough')

In [None]:
def clean_borough(df):
    '''
    Replace the Borough column of a 311 DataFrame with a normalized borough.

    Each record's (Unique Key, Borough, City) is passed through the
    notebook-level `city2borough` mapper, and the resulting borough is joined
    back onto `df` by key.

    df: Spark DataFrame with at least 'Unique Key', 'Borough' and 'City'
        columns.

    returns: a new DataFrame with all columns of `df`. 'Unique Key' is cast to
        int, and 'Borough' (now the last column) holds the normalized value.
        The join is an inner join, so rows whose 'Unique Key' is null or
        fails the int cast are dropped.
    '''
    # IntegerType lives in pyspark.sql.types; importing it from pyspark.sql
    # raises ImportError.
    from pyspark.sql.types import IntegerType

    # Normalize borough per record; city2borough is expected to map
    # (key, borough, city) -> (key, borough, city).
    borough_df = (
        df.rdd.map(lambda row: (row['Unique Key'], row['Borough'], row['City']))
              .map(city2borough)
              .map(lambda rec: Row(Key=rec[0], Borough=rec[1], City=rec[2]))
              .toDF()
    )

    # Cast both join keys to the same type before joining.
    df = df.withColumn('Unique Key', df['Unique Key'].cast(IntegerType()))
    borough_df = borough_df.withColumn('Key', borough_df['Key'].cast(IntegerType()))

    # Keep only the key and the new borough, renamed so it cannot clash with
    # df's own Borough column. Joining with the DataFrame API (instead of temp
    # views + sqlContext) means this function no longer needs a `sqlContext`
    # global and no longer overwrites session-wide views.
    new_boroughs = borough_df.select('Key', borough_df['Borough'].alias('Borough_2'))
    joined = df.join(new_boroughs, df['Unique Key'] == new_boroughs['Key'], 'inner')

    # Same shape as the previous SQL: all of df's columns, with Borough
    # replaced by the normalized value and moved to the end.
    return (
        joined.drop('Key')
              .drop('Borough')
              .withColumnRenamed('Borough_2', 'Borough')
    )

In [None]:
# Run the end-to-end borough clean-up on the loaded 311 data.
new_df = clean_borough(df=df)

In [None]:
# Print the physical plan Spark will run for the cleaned frame.
new_df.explain(extended=False)