In [1]:
import dask.dataframe as dd
import pandas as pd
import json, ast
from haversine import haversine, Unit

For each restaurant (‘Restaurants_in_Durham_County_NC.csv) with “status”=“ACTIVE” and “"rpt_area_desc"="Food Service", show the number of foreclosures (‘durham-nc-foreclosure-2006-2016’) within a radius of 1 mile of the restaurant’s coordinates.

In [2]:
def format_data(file):
    with open(file, 'r') as f:
        data = json.load(f)
    return pd.json_normalize(data)

restaurants = dd.from_pandas(format_data('shared/hw2/Restaurants_in_Durham_County_NC.json'))
foreclosure = dd.from_pandas(format_data('shared/hw2/durham-nc-foreclosure-2006-2016.json'))
print(restaurants.head(10))
print(foreclosure.head(10))

          datasetid                                  recordid  \
0  restaurants-data  1644654b953d1802c3c941211f61be1f727b2951   
1  restaurants-data  93573dbf8c9e799d82c459e47de0f40a2faa47bb   
2  restaurants-data  0d274200c7cef50d05dd633bc5da896a69e1bb99   
3  restaurants-data  cf3e0b175a6ebad2a9aa8edb4b3985c6391ed31d   
4  restaurants-data  e796570677f7c39cc90d4870f192a9beb928560a   
5  restaurants-data  90cdb7722ea7d4ffde931d1b9c45fec2ad97378e   
6  restaurants-data  a777677a2690466efa03b77e411566eecd76d26f   
7  restaurants-data  f8e7a2243f9edb2cd5d08c13add936bd43eca09c   
8  restaurants-data  b13c50f485bd92b042fd065523c58e5480531f9b   
9  restaurants-data  f71fd314be631c0fe1226cf4485385d992ea45b6   

            record_timestamp fields.status         fields.geolocation  \
0  2017-07-13T09:15:31-04:00        ACTIVE  [35.9207272, -78.9573299]   
1  2017-07-13T09:15:31-04:00        ACTIVE  [36.0467802, -78.8895483]   
2  2017-07-13T09:15:31-04:00        ACTIVE  [35.9182655, -78.9593

In [3]:
res_filtered = restaurants[(restaurants['fields.status'] == 'ACTIVE') & (restaurants['fields.rpt_area_desc'] == 'Food Service')]
print(res_filtered.head(10))

           datasetid                                  recordid  \
0   restaurants-data  1644654b953d1802c3c941211f61be1f727b2951   
1   restaurants-data  93573dbf8c9e799d82c459e47de0f40a2faa47bb   
2   restaurants-data  0d274200c7cef50d05dd633bc5da896a69e1bb99   
3   restaurants-data  cf3e0b175a6ebad2a9aa8edb4b3985c6391ed31d   
10  restaurants-data  6c31810f57775266e2d88f58a38ab21bffc8e267   
11  restaurants-data  3920e7cdf00cbfb2c2e1d40296ac0d1151939fb0   
12  restaurants-data  4a803ed1eb23942b7360fbac36efa80d3f599ef6   
13  restaurants-data  f89ee52d319aab04c91b097353ac853a38c85043   
14  restaurants-data  eaf7b61888aad93d1a132e628998258615d8aff7   
19  restaurants-data  290c641ae1be2b74ddb841e7ccded4f5b3144149   

             record_timestamp fields.status         fields.geolocation  \
0   2017-07-13T09:15:31-04:00        ACTIVE  [35.9207272, -78.9573299]   
1   2017-07-13T09:15:31-04:00        ACTIVE  [36.0467802, -78.8895483]   
2   2017-07-13T09:15:31-04:00        ACTIVE  [35.91

In [4]:
foreclosure = foreclosure.compute()
foreclosure = pd.DataFrame(foreclosure['geometry.coordinates'])
print(foreclosure.head(10))

        geometry.coordinates
0  [-78.8922549, 36.0013755]
1    [-78.895396, 35.995797]
2   [-78.8950321, 35.995413]
3  [-78.8873774, 35.9957683]
4    [-78.888343, 35.993026]
5     [-78.888092, 35.99217]
6   [-78.886681, 35.9865799]
7  [-78.8806365, 35.9815968]
8    [-78.874621, 35.992438]
9    [-78.869642, 35.985903]


In [5]:
def foreclosure_count(row, foreclosures):
    count = 0
    try:
        float_list = ast.literal_eval(row['geometry.coordinates'])
        r_lat, r_lon = float_list[0], float_list[1]
    except:
        count = 0 
        return count
        
    for index, row in foreclosures.iterrows():
        # Access geocode data for each foreclosure entry.
        coords = row['geometry.coordinates']
        try:
            float_list = ast.literal_eval(coords)
            f_lat, f_lon = float_list[0], float_list[1]
        except:
            continue
        distance = haversine((r_lat, r_lon), (f_lat, f_lon), unit=Unit.MILES)    
        if distance <= 1:  # 1 mile in km
            count += 1
    return count

res_filtered['count_closed'] = res_filtered.apply(lambda row: foreclosure_count(row, foreclosure), axis=1)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=(None, 'int64'))



In [6]:
# Sort the DataFrame by 'foreclosure_count' in descending order.
sorted_restaurants_df = res_filtered.sort_values(by='count_closed', ascending=False)

# Print the top 10 results from the sorted DataFrame.
print(sorted_restaurants_df[['fields.premise_name', 'count_closed']].head(10))

          fields.premise_name  count_closed
316    LOS PRIMOS SUPERMARKET           764
988   BURTON SCHOOL LUNCHROOM           760
1118         TATER BREAD CAFE           757
1443   AMERICAN LEGION POST 7           752
1767   SAVE-A-LOT MEAT MARKET           742
1164          THE COTTON ROOM           742
205     QUICK LUNCH RICO TACO           735
1002        TOWN DELI GROCERY           734
280              JC'S KITCHEN           721
1777  GLOBAL SCHOLARS ACADEMY           719
