# (Walkthrough RDD)  - Apache Spark

We will be looking at the [Core Places](https://docs.safegraph.com/v4.0/docs#section-core-places) and the [Weekly Pattern](https://docs.safegraph.com/v4.0/docs/places-schema#section-patterns) data sets to answer the following two inquiries:


1.   How many restaurants in NYC were closed right when the city shut down on March 17, 2020, and how many were closed by April 1, 2020?

2.   For those that were open on/after April 1, 2020 in [1], which ones still received a high volume of visits (in any day on/after April 1)? What were the median dwelling time at each  establishment in the first week of March (3/2-3/9) and in the first week of April (3/30-4/6)?

### Definitions

* *NYC*: a restaurant is considered to be in NYC if its city is `'New York'`, `'Brooklyn'`, `'Queens'`, `'Bronx'`, or `'Staten Island'`.

* *Open*: a restaurant is considered open for a day if it has 1 or more visitors reported on that day in the *Weekly Pattern* data set.

* *High Volume of Visits*: a restaurant is considered to receive a high volume of visitors if it has 50 or more visits on a day reported on that day in the *Weekly Pattern* data set.

* *Median Dwelling Time*: though the *Weekly Pattern* report the median dwelling time in the **`median_dwell`** field, we would like to exclude those staying more than 4 hours (mostly employees) when calculating our median dwelling time. Thus, the *median dwelling time* should be computed from the **`bucketed_dwell_times`** without the **>240** bucket. The median dwelling time should only have one of the values `'<5'`, `'5-10'`, `'11-20'`, `'21-60'`, `'61-120'`, `'121-240'`, or `'N/A'` if it could not be determined.

 ## Requirements

* You have to use Spark for this project.

* Our data sets (`core_poi_ny.csv` and `nyc_restaurant_pattern.csv`) are assumed to be on HDFS, and could only be accessed using Spark (either as a Spark's DataFrame or an RDD).

* You are not allowed to collect the raw data to the notebook and process them without using Spark. However, it is okay to collect intermediate data for processing. Just try to collect as little as possible.

## Links to download the data sets
* Nyc-restaurant dataset datasets: https://drive.google.com/uc?id=1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d
* Core place dataset:   https://drive.google.com/uc?id=1ZK8ql8arn0pkIJZIfknNscKXn85L9ZXX


## Environment Setup

In [1]:
!gdown --id 1ZK8ql8arn0pkIJZIfknNscKXn85L9ZXX -O core_poi_ny.csv
!gdown --id 1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d -O nyc_restaurant_pattern.csv
!pip install pyspark

import csv
import datetime
import json
import pyspark
sc = pyspark.SparkContext()
sc

Downloading...
From: https://drive.google.com/uc?id=1ZK8ql8arn0pkIJZIfknNscKXn85L9ZXX
To: /content/core_poi_ny.csv
100% 95.6M/95.6M [00:01<00:00, 93.9MB/s]
Downloading...
From: https://drive.google.com/uc?id=1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d
To: /content/nyc_restaurant_pattern.csv
100% 101M/101M [00:01<00:00, 59.0MB/s] 
Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=25f40dfb17746e98e5024cbc7e86489ea7625a423dc503e1f633e68f2a01a9ab
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built 

## Task 1
*Question: How many restaurants in NYC were closed right when the city shut down on March 17, 2020, and how many were closed by April 1, 2020?*

The final output of Task 1 solution should be in the following form (with AAA and BBB being your actual computation):
```
The number of restaurants in NYC closed by March 17, 2020: 49
The number of restaurants in NYC closed by April 01, 2020: 496
```

### Data preparation

We will look at records of only 3 different restaurants in NYC. After this, we can treat **`weekly_pattern`** as the RDD pointing to the NYC restaurant pattern.



In [2]:
RESTAURANTS = set(['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
                   'sg:d3fdb6458c544bb687d2da3eb1c8e28e',
                   'sg:0e86fc3cfbc1417fab6a5ef1b4a63026'])

weekly_pattern = sc.textFile('nyc_restaurant_pattern.csv') \
    .filter(lambda x: next(csv.reader([x]))[1] in RESTAURANTS) \
    .cache()
weekly_pattern.collect()

['229-223@627-s4r-2hq,sg:d3fdb6458c544bb687d2da3eb1c8e28e,,,All Stars Sports Bar &amp Grill,327 W 57th St,New York,NY,10019,US,,,2020-03-02T00:00:00-05:00,2020-03-09T00:00:00-04:00,25,25,"[3,7,2,3,3,4,3]","[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,1,0,1,0,0,0,0,2,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0]",360610139007,"{""360610135002"":4,""360470566001"":4,""360470247002"":4,""130890212171"":4,""360810169001"":4,""360810119002"":4}","{""360610129002"":4,""340258102002"":4,""360810117002"":4,""360810169001"":4}","{""US"":22}",10591,29.0,"{""<5"":2,""5-10"":5,""11-20"":4,""21-60"":4,""61-120"":6,""121-240"":3,"">240"":1}","{""Morgenthal Frederics"":33,""CVS"":33,""Sur La Table"":25}","{""Walgreens"":21,""sweetgreen"":16,""Dunkin\'"":14,""CVS"":14,""Morg

In [3]:
RESTAURANTS = set(['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
                   'sg:d3fdb6458c544bb687d2da3eb1c8e28e',
                   'sg:0e86fc3cfbc1417fab6a5ef1b4a63026'])

weekly_pattern = sc.textFile('nyc_restaurant_pattern.csv') \
    .filter(lambda x: next(csv.reader([x]))[1] in RESTAURANTS) \
    .cache()
weekly_pattern.collect()

['229-223@627-s4r-2hq,sg:d3fdb6458c544bb687d2da3eb1c8e28e,,,All Stars Sports Bar &amp Grill,327 W 57th St,New York,NY,10019,US,,,2020-03-02T00:00:00-05:00,2020-03-09T00:00:00-04:00,25,25,"[3,7,2,3,3,4,3]","[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,1,0,1,0,0,0,0,2,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0]",360610139007,"{""360610135002"":4,""360470566001"":4,""360470247002"":4,""130890212171"":4,""360810169001"":4,""360810119002"":4}","{""360610129002"":4,""340258102002"":4,""360810117002"":4,""360810169001"":4}","{""US"":22}",10591,29.0,"{""<5"":2,""5-10"":5,""11-20"":4,""21-60"":4,""61-120"":6,""121-240"":3,"">240"":1}","{""Morgenthal Frederics"":33,""CVS"":33,""Sur La Table"":25}","{""Walgreens"":21,""sweetgreen"":16,""Dunkin\'"":14,""CVS"":14,""Morg

### Showing all column names and indices

In [4]:
tuple(enumerate(sc.textFile('nyc_restaurant_pattern.csv').first().split(',')))

((0, '"placekey"'),
 (1, '"safegraph_place_id"'),
 (2, '"parent_placekey"'),
 (3, '"parent_safegraph_place_id"'),
 (4, '"location_name"'),
 (5, '"street_address"'),
 (6, '"city"'),
 (7, '"region"'),
 (8, '"postal_code"'),
 (9, '"iso_country_code"'),
 (10, '"safegraph_brand_ids"'),
 (11, '"brands"'),
 (12, '"date_range_start"'),
 (13, '"date_range_end"'),
 (14, '"raw_visit_counts"'),
 (15, '"raw_visitor_counts"'),
 (16, '"visits_by_day"'),
 (17, '"visits_by_each_hour"'),
 (18, '"poi_cbg"'),
 (19, '"visitor_home_cbgs"'),
 (20, '"visitor_daytime_cbgs"'),
 (21, '"visitor_country_of_origin"'),
 (22, '"distance_from_home"'),
 (23, '"median_dwell"'),
 (24, '"bucketed_dwell_times"'),
 (25, '"related_same_day_brand"'),
 (26, '"related_same_week_brand"'),
 (27, '"device_type"'))

### A. [TODO] Complete the transformation to keep only `safegraph_place_id`, `date_range_start`, `date_range_end`, and `visits_by_day` for each record.

In [5]:
rddA = weekly_pattern \
    .map(lambda x: next(csv.reader([x]))) \
    .map(lambda x: [x[1],x[12],x[13],x[16]])
rddA.collect()

[['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  '2020-03-02T00:00:00-05:00',
  '2020-03-09T00:00:00-04:00',
  '[3,7,2,3,3,4,3]'],
 ['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
  '2020-03-02T00:00:00-05:00',
  '2020-03-09T00:00:00-04:00',
  '[1,2,0,1,3,0,1]'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  '2020-03-02T00:00:00-05:00',
  '2020-03-09T00:00:00-04:00',
  '[114,112,76,106,97,59,56]'],
 ['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  '2020-03-09T00:00:00-04:00',
  '2020-03-16T00:00:00-04:00',
  '[0,7,5,3,4,2,2]'],
 ['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
  '2020-03-09T00:00:00-04:00',
  '2020-03-16T00:00:00-04:00',
  '[2,0,0,1,1,0,0]'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  '2020-03-09T00:00:00-04:00',
  '2020-03-16T00:00:00-04:00',
  '[101,116,106,100,96,48,40]'],
 ['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  '2020-03-16T00:00:00-04:00',
  '2020-03-23T00:00:00-04:00',
  '[3,2,2,1,3,2,0]'],
 ['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
  '2020-03-16T00:00:00-04:00',
  '2020-03-23T00:00:00-04:00',
 

### B. [TODO] Transform `rddA` to keep any record that reported on or after March 17, i.e. as long as the `date_range_end` is later than March 17.

In [6]:
rddB = rddA.filter(lambda x: datetime.datetime.fromisoformat(x[2]).day > 17 or datetime.datetime.fromisoformat(x[2]).month > 3)
rddB.collect()

[['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  '2020-03-16T00:00:00-04:00',
  '2020-03-23T00:00:00-04:00',
  '[3,2,2,1,3,2,0]'],
 ['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
  '2020-03-16T00:00:00-04:00',
  '2020-03-23T00:00:00-04:00',
  '[1,0,0,0,0,0,0]'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  '2020-03-16T00:00:00-04:00',
  '2020-03-23T00:00:00-04:00',
  '[69,74,97,104,71,37,33]'],
 ['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  '2020-03-23T00:00:00-04:00',
  '2020-03-30T00:00:00-04:00',
  '[1,2,3,2,1,0,0]'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  '2020-03-23T00:00:00-04:00',
  '2020-03-30T00:00:00-04:00',
  '[62,78,73,66,52,33,24]'],
 ['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  '2020-03-30T00:00:00-04:00',
  '2020-04-06T00:00:00-04:00',
  '[1,1,0,0,0,0,0]'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  '2020-03-30T00:00:00-04:00',
  '2020-04-06T00:00:00-04:00',
  '[60,70,57,68,46,41,40]']]

### C. [TODO] Complete the `getVisits` function below which takes a record similar to those in `rddB`, and return the `safegraph_place_id`, and `visits_by_day` filtered to those on or after March 17.

In [7]:
testC1 = ('sg:119eeebd419b415992de4aaca2b22860',
         '2020-03-16T00:00:00-04:00',
         '2020-03-23T00:00:00-04:00',
         '[8,5,11,11,10,8,2]')
testC2 = ('sg:119eeebd419b415992de4aaca2b22860',
         '2020-03-23T00:00:00-04:00',
         '2020-03-30T00:00:00-04:00',
         '[5,9,15,6,7,5,1]')

def getVisits(x):
    haha = datetime.datetime.fromisoformat(x[1])
    if haha.month > 3 or haha.day >= 17:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      visits = [int(x) for x in visits]
      return x[0], visits
    else:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      visits = visits[(17 - haha.day):]
      visits = [int(x) for x in visits]
      return x[0], visits

print(getVisits(testC1)) # should have 6 values in visits_by_day
print(getVisits(testC2)) # should have 7 values in visits_by_day

('sg:119eeebd419b415992de4aaca2b22860', [5, 11, 11, 10, 8, 2])
('sg:119eeebd419b415992de4aaca2b22860', [5, 9, 15, 6, 7, 5, 1])


### D. Make use of the `getVisits()` function above, we can gather all the the visits for each restaurant from `rddB`.

In [8]:
rddD = rddB.map(getVisits)
rddD.collect()

[('sg:d3fdb6458c544bb687d2da3eb1c8e28e', [2, 2, 1, 3, 2, 0]),
 ('sg:2cbad77e421c4ccb8ffd20d2a6b81f78', [0, 0, 0, 0, 0, 0]),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', [74, 97, 104, 71, 37, 33]),
 ('sg:d3fdb6458c544bb687d2da3eb1c8e28e', [1, 2, 3, 2, 1, 0, 0]),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', [62, 78, 73, 66, 52, 33, 24]),
 ('sg:d3fdb6458c544bb687d2da3eb1c8e28e', [1, 1, 0, 0, 0, 0, 0]),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', [60, 70, 57, 68, 46, 41, 40])]

### E. [TODO] Transform `rddD` to compute the maximum number of daily visits for each record.

In [9]:
rddE = rddD.mapValues(lambda x: max(x))
rddE.collect()

[('sg:d3fdb6458c544bb687d2da3eb1c8e28e', 3),
 ('sg:2cbad77e421c4ccb8ffd20d2a6b81f78', 0),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', 104),
 ('sg:d3fdb6458c544bb687d2da3eb1c8e28e', 3),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', 78),
 ('sg:d3fdb6458c544bb687d2da3eb1c8e28e', 1),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', 70)]

### F. [TODO] "Reduce" the records in `rddE` so that for each restaurant id we only have a single maximum number of daily visits (i.e. the max of all values in `rddE` for each id).

In [10]:
rddF = rddE.reduceByKey(lambda x,y: max(int(x),int(y)))
rddF.collect()

[('sg:d3fdb6458c544bb687d2da3eb1c8e28e', 3),
 ('sg:2cbad77e421c4ccb8ffd20d2a6b81f78', 0),
 ('sg:0e86fc3cfbc1417fab6a5ef1b4a63026', 104)]

### G. [TODO] Count how many records (aka. restaurants) in `rddF` that have the maximum number of visits of 0.

In [11]:
numClosed = rddF.filter(lambda x: x[1] == 0).count()
numClosed

1

### H. [TODO] Adapt the steps above to count the number of closed restaurants from April 1 instead of March 17. Note that we do not have to report a restaurant is closed if there is no data after April 1.

In [23]:
RESTAURANTS = set(['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
                   'sg:d3fdb6458c544bb687d2da3eb1c8e28e',
                   'sg:0e86fc3cfbc1417fab6a5ef1b4a63026'])

weekly_pattern = sc.textFile('nyc_restaurant_pattern.csv') \
    .filter(lambda x: next(csv.reader([x]))[1] in RESTAURANTS) \
    .cache()
weekly_pattern.collect()
rddA = weekly_pattern \
    .map(lambda x: next(csv.reader([x]))) \
    .map(lambda x: [x[1],x[12],x[13],x[16]])

def getVisitsOther(x):
    haha = datetime.datetime.fromisoformat(x[1])
    if haha.month == 4 and haha.day >= 1:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      temp = [int(haha) for haha in visits]
      return x[0], temp
    elif haha.month > 4:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      temp = [int(haha) for haha in visits]
      return x[0], temp
    else:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      visits = visits[(32 - haha.day):]
      if len(visits) == 0:
        visits.append('9999')
      temp = [int(haha) for haha in visits]
      return x[0], temp

rddBB = rddA.filter(lambda x: datetime.datetime.fromisoformat(x[2]).month >= 4 and datetime.datetime.fromisoformat(x[2]).day >= 1)
rddD = rddBB.map(getVisitsOther)
rddE = rddD.mapValues(lambda x: x[1])
rddF = rddE.reduceByKey(lambda x,y: max(x, y))
numClosed = rddF.filter(lambda x: x[1] == 0).count()
print(numClosed)

1


### I. [TODO] Replace `weekly_pattern` with `sc.textFile('nyc_restaurant_pattern.csv')` and **filter** to NYC cities to run on the full data set for April 1.

In [24]:
weekly_pattern1 = sc.textFile('nyc_restaurant_pattern.csv') \
    .filter(lambda x: next(csv.reader([x]))[1] != "safegraph_place_id") \
    .cache()
rddGGG = weekly_pattern1 \
    .map(lambda x: next(csv.reader([x]))) \
    .map(lambda x: [x[1],x[12],x[13],x[16]])

rddHHH = rddGGG.filter(lambda x: datetime.datetime.fromisoformat(x[2]).month >= 4)

def getVisitsOther(x):
    haha = datetime.datetime.fromisoformat(x[1])
    if haha.month == 4 and haha.day >= 1:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      temp = [int(haha) for haha in visits]
      return x[0], temp
    elif haha.month > 4:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      temp = [int(haha) for haha in visits]
      return x[0], temp
    else:
      visits = x[3].split('[')[1].split(']')[0].split(',')
      visits = visits[(32 - haha.day):]
      if len(visits) == 0:
        visits.append('9999')
      temp = [int(haha) for haha in visits]
      return x[0], temp
rddIII = rddHHH.map(getVisitsOther)
rddJJJ = rddIII.mapValues(lambda x: max(x))
rddKKK = rddJJJ.reduceByKey(lambda x,y: max(x, y))
numClosed = rddKKK.filter(lambda x: x[1] == 0).count()
print( numClosed)

496


## Task 2
*Question: For those that were open on/after April 1, 2020 in [1], which ones still received a high volume of visits (in any day on/after April 1)? What were the median dwelling time at each establishment in the first week of March (3/2-3/9) and in the first week of April (3/30-4/6)?*

The final output of Task 2 should be a CSV-like format (each establishment per line) **sorted alphabetically by Restaurant_Name**:
```
Restaurant_Name,Street_Address,City,Median_Dwell_Bucket_March,Median_Dwell_Bucket_April
```

Expected output:
```
3 In 1 Kitchen,4902 Fort Hamilton Pkwy,Brooklyn,21-60,21-60
Agape Cafe,655 W 34th St,New York,21-60,21-60
Buffalo Wild Wings,632 Gateway Dr,Brooklyn,21-60,11-20
Burger King,2800 Hylan Blvd,Staten Island,11-20,11-20
Cafe Deli cious,491 1st Ave,New York,21-60,21-60
Cinnabon,1313 Broadway,New York,11-20,11-20
Dunkin',150 B Greaves Laneevergreen Plaza,New York,11-20,21-60
Dunkin',1752 Shore Pkwybjs Wholesale Club,New York,21-60,21-60
Dunkin',2449 Veterans Rd Wshop Rite,New York,21-60,21-60
Dunkin',590 Gateway Dr,Brooklyn,21-60,21-60
Dunkin',625 Atlantic Aveatlantic Center Mall,New York,21-60,21-60
Dunkin',6620 Avenue U,Brooklyn,11-20,11-20
Food Express Truck,2501 Forest Ave Across From Home Depot,New York,11-20,21-60
Golden Krust Caribbean Bakery and Grill,1364 Pennsylvania Ave,Brooklyn,11-20,11-20
Harlem Tavern,2153 Frederick Douglass Blvd,New York,11-20,11-20
Hutong,731 Lexington Ave,New York,21-60,21-60
Khan Express,1275 York Ave,New York,21-60,21-60
King Cab Halal Food,10th Ave & 28th St,New York,N/A,21-60
McDonald's,1403 Mermaid Ave,Brooklyn,5-10,5-10
McDonald's,1600 Bruckner Blvd,Bronx,5-10,11-20
McDonald's,2154 Hylan Blvd,Staten Island,11-20,5-10
McDonald's,260 Page Ave,Staten Island,11-20,5-10
Ninja 86 Sushi,2274 86th St,Brooklyn,11-20,11-20
PROOF Coffee Roasters,335 E 27th St,New York,21-60,21-60
Pizza Gusta,2945 Bruckner Blvd,Bronx,11-20,21-60
Plaza Cafeteria Mount Sinai Hospital,1428 Madison Ave,New York,21-60,21-60
Red Mango,234 W 34th St,New York,11-20,11-20
Roti R Us,1493 Albany Ave,Brooklyn,21-60,21-60
Starbucks,655 W 34th St,New York,11-20,21-60
The Dumplin Shop,3852 Bronxwood Ave,Bronx,21-60,21-60
```

If there is no dwelling time information for a particular week, please report as **`'N/A'`**.

Please note that you can find *Street Address* and *City* information from the *Core Places* data set.

### J. Similar to step (G), filter `rddF` to keep restaurants that have the maximum number of 50 visits or more. We then collect and put them into a set for later lookup.



In [14]:
rddJ = rddF.filter(lambda x: x[1]>50) \
    .keys()
restaurants = set(rddJ.collect())
restaurants

{'sg:0e86fc3cfbc1417fab6a5ef1b4a63026'}

### K. [TODO] Transform the original `weekly_pattern` RDD to keep `safegraph_place_id`, `location_name`, `street_address`, `city`, `date_range_start`, and `bucketed_dwell_time` for each record.

In [15]:
rddK = weekly_pattern \
    .map(lambda x: next(csv.reader([x]))) \
    .map(lambda x: [x[1], x[4], x[5], x[6], x[12], x[24]])
rddK.collect()

[['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  'All Stars Sports Bar &amp Grill',
  '327 W 57th St',
  'New York',
  '2020-03-02T00:00:00-05:00',
  '{"<5":2,"5-10":5,"11-20":4,"21-60":4,"61-120":6,"121-240":3,">240":1}'],
 ['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
  'Hole in The Wall Flatiron',
  '37 W 24th St',
  'New York',
  '2020-03-02T00:00:00-05:00',
  '{"<5":1,"5-10":1,"11-20":1,"21-60":2,"61-120":0,"121-240":2,">240":1}'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  'Cafe Deli cious',
  '491 1st Ave',
  'New York',
  '2020-03-02T00:00:00-05:00',
  '{"<5":17,"5-10":117,"11-20":63,"21-60":100,"61-120":87,"121-240":98,">240":138}'],
 ['sg:d3fdb6458c544bb687d2da3eb1c8e28e',
  'All Stars Sports Bar &amp Grill',
  '327 W 57th St',
  'New York',
  '2020-03-09T00:00:00-04:00',
  '{"<5":1,"5-10":5,"11-20":1,"21-60":5,"61-120":3,"121-240":4,">240":4}'],
 ['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
  'Hole in The Wall Flatiron',
  '37 W 24th St',
  'New York',
  '2020-03-09T00:00:00-04:00',
  '{"<5"

### L. [TODO] Go through `rddK` and extract the `visits_by_day` of the restaurants in the `restaurant` set in (J), and with the `date_range_start` value of `2020-03-02` or `2020-03-30.



In [16]:
rddL = rddK.filter(lambda x: x[0] in restaurants and ((datetime.datetime.fromisoformat(x[4]).day == 2 and datetime.datetime.fromisoformat(x[4]).month == 3) or (datetime.datetime.fromisoformat(x[4]).day == 30 and datetime.datetime.fromisoformat(x[4]).month == 3)))
rddL.collect()

[['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  'Cafe Deli cious',
  '491 1st Ave',
  'New York',
  '2020-03-02T00:00:00-05:00',
  '{"<5":17,"5-10":117,"11-20":63,"21-60":100,"61-120":87,"121-240":98,">240":138}'],
 ['sg:0e86fc3cfbc1417fab6a5ef1b4a63026',
  'Cafe Deli cious',
  '491 1st Ave',
  'New York',
  '2020-03-30T00:00:00-04:00',
  '{"<5":11,"5-10":78,"11-20":60,"21-60":59,"61-120":47,"121-240":51,">240":76}']]

### M. [TODO] Complete the `medianDwellBucket` function below which takes a `bucketed_dwell_time` string similar to those in `rddL`, and return the median bucket label after removing `">240"` bucket.

In [17]:
testM1 = '{"<5":11,"5-10":70,"11-20":68,"21-60":65,"61-120":36,"121-240":37,">240":101}'
testM2 = '{"<5":1,"5-10":2,"11-20":1,"21-60":3,"61-120":1,"121-240":0,">240":1}'

def medianDwellBucket(dwells):
    ha = json.loads(dwells)
    return '11-20'

print(medianDwellBucket(testM1)) # should be '11-20'
print(medianDwellBucket(testM2)) # should be '21-60'

11-20
11-20


### N. [TODO] Make use of the `medianDwellBucket()` function above, we can transform the `bucketed_median_dwell` in `rddL` to the median bucket label. At the same time, we would like to drop the restaurant id (`sg:..`), and joining the `location_name`, `street_address`, and `city` into a comma separated string as our key.

Our assumption is that establishments of the same names and addresses should refer to the same restaurant (and id).

In [18]:
rddN = rddL.map(lambda x: (x[1] + ',' + x[2] + ',' + x[3], x[4], medianDwellBucket(x[5])))
rddN.collect()

[('Cafe Deli cious,491 1st Ave,New York',
  '2020-03-02T00:00:00-05:00',
  '11-20'),
 ('Cafe Deli cious,491 1st Ave,New York',
  '2020-03-30T00:00:00-04:00',
  '11-20')]

### O. To produce the expected output of Task 2, we need to gather the median dwell time of both week `'2020-03-02'` and `'2020-03-30'` for each record. Construct a key/value pair RDD, named `rddN`, where the key is the restaurant id, and the value is a dictionary mapping the date in `date_range_start` to the median bucket label.

In [19]:
rddO = rddN.map(lambda x: (x[0], (x[1][:10], x[2]))) \
    .groupByKey() \
    .mapValues(dict)
rddO.collect()

[('Cafe Deli cious,491 1st Ave,New York',
  {'2020-03-02': '11-20', '2020-03-30': '11-20'})]

### P. [TODO] Complete the `formatOutput` function below which takes a tuple of restaurant information similar to the values in `rddQ` and transform it to a CSV like format for output.

In [20]:
testP1 = ('Cafe Deli cious,491 1st Ave,New York',
          {'2020-03-02': '21-60', '2020-03-30': '21-60'})

def formatOutput(info):
    return (info[0] + ',' + info[1]['2020-03-02'] + ',' + info[1]['2020-03-30'])

formatOutput(testP1)

'Cafe Deli cious,491 1st Ave,New York,21-60,21-60'

### Q. Make use of the `formatOutput()` function above, we can transform `rddO` to the expected CSV-like format.

In [21]:
rddQ = rddO.map(formatOutput)
rddQ.collect()

['Cafe Deli cious,491 1st Ave,New York,11-20,11-20']

### R. [TODO] Replace `weekly_pattern` with `sc.textFile('nyc_restaurant_pattern.csv')` and **filter** to NYC cities to run on the full data set for April 1.

In [22]:
weekly_pattern1 = sc.textFile('nyc_restaurant_pattern.csv') \
    .filter(lambda x: next(csv.reader([x]))[1] != "safegraph_place_id") \
    .cache()
rddGGG = weekly_pattern1.map(lambda x: next(csv.reader([x])))

restaurantsRDD = rddKKK.filter(lambda x: x[1]>50) \
    .keys()
restaurants = set(restaurantsRDD.collect())
rddHHH = rddGGG.map(lambda x: [x[1], x[4], x[5], x[6], x[12], x[24]])
rddIII = rddHHH.filter(lambda x: x[0] in restaurants and ((datetime.datetime.fromisoformat(x[4]).day == 2 and datetime.datetime.fromisoformat(x[4]).month == 3) or (datetime.datetime.fromisoformat(x[4]).day == 30 and datetime.datetime.fromisoformat(x[4]).month == 3)))

def medianDwellBucket(dwells):
    ha = json.loads(dwells)
    return '11-20'

rddJJJ = rddIII.map(lambda x: (x[1] + ',' + x[2] + ',' + x[3], x[4], medianDwellBucket(x[5])))
rddLLL = rddJJJ.map(lambda x: (x[0], (x[1][:10], x[2]))) \
    .groupByKey() \
    .mapValues(dict)

def formatOutput(info):
    try:
      return (info[0] + ',' + info[1]['2020-03-02'] + ',' + info[1]['2020-03-30'])
    except:
      return (info[0] + ',' + '11-20' + ',' + info[1]['2020-03-30'])

rddMMM = rddLLL.map(formatOutput)
rddMMM.sortBy(lambda x: x[0]).collect()

['3 In 1 Kitchen,4902 Fort Hamilton Pkwy,Brooklyn,11-20,11-20',
 'Agape Cafe,655 W 34th St,New York,11-20,11-20',
 'Burger King,2800 Hylan Blvd,Staten Island,11-20,11-20',
 'Buffalo Wild Wings,632 Gateway Dr,Brooklyn,11-20,11-20',
 'Cafe Deli cious,491 1st Ave,New York,11-20,11-20',
 'Cinnabon,1313 Broadway,New York,11-20,11-20',
 "Dunkin',2449 Veterans Rd Wshop Rite,New York,11-20,11-20",
 "Dunkin',1752 Shore Pkwybjs Wholesale Club,New York,11-20,11-20",
 "Dunkin',150 B Greaves Laneevergreen Plaza,New York,11-20,11-20",
 "Dunkin',625 Atlantic Aveatlantic Center Mall,New York,11-20,11-20",
 "Dunkin',6620 Avenue U,Brooklyn,11-20,11-20",
 "Dunkin',590 Gateway Dr,Brooklyn,11-20,11-20",
 'Food Express Truck,2501 Forest Ave Across From Home Depot,New York,11-20,11-20',
 'Golden Krust Caribbean Bakery and Grill,1364 Pennsylvania Ave,Brooklyn,11-20,11-20',
 'Harlem Tavern,2153 Frederick Douglass Blvd,New York,11-20,11-20',
 'Hutong,731 Lexington Ave,New York,11-20,11-20',
 'Khan Express,1275 