# Homework 2 - MapReduce

This homework explores [Safegraph data](https://www.safegraph.com/covid-19-data-consortium) to better understand how NYC responded to the COVID-19 pandemic. We will look at the [Places](https://docs.safegraph.com/docs/core-places) data set and the [Weekly Pattern](https://docs.safegraph.com/docs/weekly-patterns) data set to answer the following question:

> How many restaurants were closed from 03/17/20 (when the lockdown started), and how many were closed from 04/01/20?

### Notes

* *NYC*: we only consider restaurants in NYC, meaning those whose city is listed as `'New York'`, `'Brooklyn'`, `'Queens'`, `'Bronx'`, or `'Staten Island'` (this misses many Queens restaurants whose city is listed under a name other than `'Queens'`).

* *Closed*: a restaurant is closed for the listed period if there were visits to the restaurant before the listed date (e.g. 03/17/20) but none afterwards. Note that if a restaurant is closed for an entire week, the *Weekly Pattern* data set contains no record for that week (rather than seven zeros, `[0,0,0,0,0,0,0]`).
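
The *Closed* rule can be sketched in plain Python as follows. This is only an illustration under stated assumptions: the helper name `closed_from` and the toy records are hypothetical, each weekly record is taken to be a `(week_start, visits_by_day)` pair, and "closed from" a date is read as "at least one visit before that date and none on or after it".

```python
from datetime import date, timedelta

def closed_from(weekly_records, cutoff):
    """Return True if there were visits before `cutoff` and none on or after it.

    weekly_records: iterable of (week_start: date, visits_by_day: list of 7 ints).
    Weeks with no visits at all are simply absent, as in the Weekly Pattern data.
    """
    visits_before = 0
    visits_from_cutoff = 0
    for week_start, visits_by_day in weekly_records:
        for offset, count in enumerate(visits_by_day):
            day = week_start + timedelta(days=offset)
            if day < cutoff:
                visits_before += count
            else:
                visits_from_cutoff += count
    return visits_before > 0 and visits_from_cutoff == 0

# Toy data, made up for illustration (not taken from the real file).
cutoff = date(2020, 3, 17)
closed_place = [
    (date(2020, 3, 9), [3, 4, 2, 5, 6, 7, 1]),
    (date(2020, 3, 16), [2, 0, 0, 0, 0, 0, 0]),  # last visits on 03/16
]
open_place = closed_place + [(date(2020, 3, 23), [0, 1, 0, 0, 0, 0, 0])]

print(closed_from(closed_place, cutoff))  # True
print(closed_from(open_place, cutoff))    # False
```

Because fully closed weeks are missing from the data, the check only ever needs to look at the weeks that are present.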

### Requirements: 
You must use MRJob and MapReduce in a similar fashion as in the Lab.
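
As a reminder of what a single MapReduce step does, the map, shuffle, and reduce phases can be imitated in plain Python. The sketch below is not MRJob code and does not use its API; `run_step`, `latest_week_mapper`, and `latest_week_reducer` are hypothetical names, and the records are toy data.

```python
from collections import defaultdict

def run_step(records, mapper, reducer):
    """Imitate one MapReduce step on an in-memory list of (key, value) pairs."""
    groups = defaultdict(list)
    # Map phase: each input pair may emit any number of intermediate pairs.
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            # Shuffle phase: collect values that share the same key.
            groups[out_key].append(out_value)
    # Reduce phase: each key is reduced independently, here in sorted key order.
    output = []
    for out_key in sorted(groups):
        output.extend(reducer(out_key, groups[out_key]))
    return output

def latest_week_mapper(_, record):
    place, week_start, visits = record
    yield place, (week_start, visits)

def latest_week_reducer(place, weeks):
    # ISO dates sort chronologically as strings, so max() picks the last week.
    yield place, max(weeks)

# Toy input: (line number, record) pairs, made up for illustration.
records = [
    (0, ("place-a", "2020-03-09", [1, 2, 0, 0, 0, 0, 0])),
    (1, ("place-b", "2020-03-16", [4, 0, 0, 0, 0, 0, 0])),
    (2, ("place-a", "2020-03-16", [1, 0, 0, 0, 0, 0, 0])),
]
result = run_step(records, latest_week_mapper, latest_week_reducer)
print(result)
```

A multi-step job simply feeds the output pairs of one step into the next step as its input.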

### INPUT:
To make it easier, we have already joined (and filtered) the two provided data sets into `nyc_restaurant_pattern.csv`, which contains the visit patterns of all NYC restaurants. In other words, you only need to deal with a single input file, `nyc_restaurant_pattern.csv`, and do not need to fetch the original Safegraph data.
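
Each line of the input file is a quoted CSV record, so it is safer to parse it with the `csv` module than to split on commas. The sketch below builds one made-up line in that layout and reads back the fields relevant to this homework. The column positions are taken from the header row of `nyc_restaurant_pattern.csv`; the key, city, and values in the sample line are invented for illustration, and the timestamp format is an assumption.

```python
import csv
from io import StringIO

# Column positions taken from the header row of nyc_restaurant_pattern.csv.
PLACEKEY, CITY, DATE_RANGE_START, VISITS_BY_DAY = 0, 6, 12, 16

# Build one made-up line in the same quoted-CSV layout (28 columns).
fields = [""] * 28
fields[PLACEKEY] = "hypothetical-key"
fields[CITY] = "Brooklyn"
fields[DATE_RANGE_START] = "2020-03-16T00:00:00-04:00"  # assumed format
fields[VISITS_BY_DAY] = "[5,0,0,0,0,0,0]"
buffer = StringIO()
csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="").writerow(fields)
line = buffer.getvalue()

# A naive split breaks the bracketed list apart at its inner commas,
# so it yields more than 28 pieces.
print(len(line.split(",")))

# csv.reader respects the quotes and returns exactly 28 fields.
row = next(csv.reader(StringIO(line)))
week_start = row[DATE_RANGE_START][:10]  # keep only YYYY-MM-DD
visits_by_day = [int(v) for v in row[VISITS_BY_DAY][1:-1].split(",")]
print(row[PLACEKEY], row[CITY], week_start, visits_by_day)
```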

### OUTPUT:
Your MRJob only needs to output two rows as follows, each consisting of a label (e.g. `"The number ..."`) and a count (e.g. `"49"`):
```
"The number of restaurants in NYC closed from March 17, 2020" "49"
"The number of restaurants in NYC closed from April 01, 2020" "496"
```
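
The quoting in these rows depends on how each key and value is serialized. As a rough sketch, assuming the job writes each pair as a JSON-encoded key, a tab, and a JSON-encoded value (which is how MRJob's default JSON output protocol behaves), a count yielded as an integer prints without quotes and a count yielded as a string prints with them. `format_row` is a hypothetical helper for illustration, not part of MRJob.

```python
import json

def format_row(key, value):
    """Serialize one output pair as JSON key, tab, JSON value."""
    return json.dumps(key) + "\t" + json.dumps(value)

label = "The number of restaurants in NYC closed from March 17, 2020"
print(format_row(label, 49))    # integer count: value printed as 49
print(format_row(label, "49"))  # string count: value printed as "49"
```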

## Download Data and Packages

In [1]:
!gdown --id 1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d -O nyc_restaurant_pattern.csv
!curl -L "https://drive.google.com/uc?id=1TVhZgb1SWZbQB21J1hadcW-AIMnRiCL4&confirm=t" -o mapreduce.py 
!pip install mrjob

!head -n 3 nyc_restaurant_pattern.csv

Downloading...
From: https://drive.google.com/uc?id=1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d
To: /content/nyc_restaurant_pattern.csv
100% 101M/101M [00:00<00:00, 188MB/s] 
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  2663  100  2663    0     0   7007      0 --:--:-- --:--:-- --:--:--  7007
"placekey","safegraph_place_id","parent_placekey","parent_safegraph_place_id","location_name","street_address","city","region","postal_code","iso_country_code","safegraph_brand_ids","brands","date_range_start","date_range_end","raw_visit_counts","raw_visitor_counts","visits_by_day","visits_by_each_hour","poi_cbg","visitor_home_cbgs","visitor_daytime_cbgs","visitor_country_of_origin","distance_from_home","median_dwell","bucketed_dwell_times","related_same_day_brand","related_same_week_brand","device_type"
22f-225@6

# Task 1
You must complete the **MRHW2** class below (which inherits from MRJob), and your code must run with **mr.runJob()** from the provided **mapreduce.py** module. The expected output is:
```
"The number of restaurants in NYC closed from March 17, 2020" "49"
"The number of restaurants in NYC closed from April 01, 2020" "496"
```

In [2]:
import csv
import datetime
import json
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from io import StringIO

################################
### YOUR WORK SHOULD BE HERE ###
################################
class MRHW2(MRJob):
    '''
    PLEASE COMPLETE THIS CLASS. THIS SHOULD BE THE ONLY PLACE THAT YOU CAN EDIT.
    THE INPUT OF YOUR MAPREDUCE JOB WOULD BE LINE OF TEXT WITHOUT '\n'.
    '''


    def mapper_1(self, _, line):
        # csv.reader handles quoted fields that contain commas (e.g. visits_by_day).
        reader = csv.reader(StringIO(line), delimiter=',')
        for row in reader:
            if row[0] == 'placekey':
                continue  # skip the header row if it reaches the mapper
            place_key = row[0]
            city = row[6]
            date_range_start = row[12][0:10]  # keep only the YYYY-MM-DD part
            # visits_by_day is stored as text such as "[1,2,3,4,5,6,7]"
            visits_by_day = [int(i) for i in row[16][1:-1].split(',')]
            if city in ['New York', 'Brooklyn', 'Queens', 'Bronx', 'Staten Island']:
                yield place_key, (date_range_start, visits_by_day)

    def reducer_1(self, key1, value1):
        # Keep the most recent week per restaurant; ISO dates compare
        # chronologically, so max() selects the latest date_range_start.
        yield key1, max(value1)

    def mapper_2(self, key1, value1):
        start = value1[0]
        visits1 = sum(value1[1][1:])  # visits from the 2nd day of the week onward
        visits2 = sum(value1[1][2:])  # visits from the 3rd day of the week onward

        # Last reported week starts 03/16 with no visits from 03/17 onward.
        if start == '2020-03-16' and visits1 == 0:
            yield ('The number of restaurants in NYC closed from March 17, 2020', 1)
        # Last reported week starts 03/30 with no visits from 04/01 onward.
        if start == '2020-03-30' and visits2 == 0:
            yield ('The number of restaurants in NYC closed from April 01, 2020', 1)

    def reducer_2(self, key2, value2):
        # Add up the 1s emitted for each label.
        yield key2, sum(value2)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_1, reducer=self.reducer_1),
            MRStep(mapper=self.mapper_2, reducer=self.reducer_2),
        ]

###################################
### DO NOT EDIT BELOW THIS LINE ###
###################################
job = MRHW2(args=[])
with open('nyc_restaurant_pattern.csv', 'r') as fi:
  next(fi)
  output = list(mr.runJob(enumerate(map(lambda x: x.strip(), fi)), job))

print(len(output))
output

2


[('The number of restaurants in NYC closed from April 01, 2020', 496),
 ('The number of restaurants in NYC closed from March 17, 2020', 49)]

# Task 2
Convert the MRJob class from Task 1 into a stand-alone `BDM_HW2_NetID.py` file that can be run directly with `python`, as in Labs 3 and 4.


In [4]:
!python BDM_HW2_xx2208.py nyc_restaurant_pattern.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/BDM_HW2_xx2208.root.20220308.175637.562374
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/BDM_HW2_xx2208.root.20220308.175637.562374/output
Streaming final output from /tmp/BDM_HW2_xx2208.root.20220308.175637.562374/output...
"The number of restaurants in NYC closed from March 17, 2020"	49
"The number of restaurants in NYC closed from April 01, 2020"	496
Removing temp directory /tmp/BDM_HW2_xx2208.root.20220308.175637.562374...
