# Covid19 MapReduce Exercises

## Prepare the Dataset

This data file contains information about COVID-19 from January 21, 2020, to May 13, 2022, across various counties in the United States. The data was collected by *The New York Times*, based on reports from state and local health agencies. The original file can be found on Kaggle: *NY-TIMES COVID-19 USA dataset*, and it is distributed under a Creative Commons Attribution-NonCommercial 4.0 International (CC BY-NC) license.

The file used here is a modified version of the original: records with null values were removed during preprocessing, and the FIPS column was dropped because it is not needed for these exercises.
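The preprocessing script itself is not part of this notebook. As a rough sketch of what such a cleanup could look like, the function below drops one column and skips incomplete records using only the standard library. The name `clean_rows`, the header layout, and the sample rows are assumptions made for illustration, not the code that produced the file.

```python
import csv
import io

def clean_rows(lines, drop_column="fips"):
    """Yield data rows without `drop_column`, skipping incomplete records."""
    reader = csv.reader(lines)
    header = next(reader)                      # assumed: the raw file has a header row
    drop_index = header.index(drop_column)
    for row in reader:
        kept = [value for i, value in enumerate(row) if i != drop_index]
        # Skip rows with a wrong number of fields or with any empty field
        if len(kept) != len(header) - 1 or any(v.strip() == "" for v in kept):
            continue
        yield kept

# Made-up sample: the second record has an empty 'deaths' field
sample = io.StringIO(
    "date,county,state,fips,cases,deaths\n"
    "2020-01-21,Snohomish,Washington,00000,1,0\n"
    "2020-03-01,Unknown,Rhode Island,,2,\n"
)
cleaned = list(clean_rows(sample))
print(cleaned)
```

The header row is consumed and not emitted, which matches a file that is later read with explicit column names.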

The file contains 2,445,227 records with the following fields:

- **date**: The date the data was recorded.
- **county**: The county where cases and deaths were recorded.
- **state**: The state where the county is located.
- **cases**: The cumulative number of cases reported up to that date.
- **deaths**: The cumulative number of deaths reported up to that date.

Link: [https://github.com/Vega90/datasets/raw/main/covid19-NY.zip](https://github.com/Vega90/datasets/raw/main/covid19-NY.zip)

### Download the Dataset

In [1]:
from zipfile import ZipFile

# Create a folder to save the file
! mkdir -p /media/notebooks/covid19

# Download the file and save it
! wget https://github.com/Vega90/datasets/raw/main/covid19-NY.zip \
-O /media/notebooks/covid19/covid19-NY.zip

# Unzip the file (the context manager closes the archive automatically)
with ZipFile('/media/notebooks/covid19/covid19-NY.zip') as archivo:
    archivo.extractall('/media/notebooks/covid19/')

# Delete the zip file
! rm /media/notebooks/covid19/covid19-NY.zip

--2024-08-28 11:49:16--  https://github.com/Vega90/datasets/raw/main/covid19-NY.zip
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/Vega90/datasets/main/covid19-NY.zip [following]
--2024-08-28 11:49:16--  https://raw.githubusercontent.com/Vega90/datasets/main/covid19-NY.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24556797 (23M) [application/zip]
Saving to: ‘/media/notebooks/covid19/covid19-NY.zip’


2024-08-28 11:49:20 (18.6 MB/s) - ‘/media/notebooks/covid19/covid19-NY.zip’ saved [24556797/24556797]



### Upload it to HDFS

In [2]:
# Upload the file to HDFS 
! hdfs dfs -mkdir /covid19
! hdfs dfs -put /media/notebooks/covid19/covid19-NY.csv /covid19
! hdfs dfs -ls /covid19

Found 1 items
-rw-r--r--   3 root supergroup   90199922 2024-08-28 11:49 /covid19/covid19-NY.csv


### Create the folder for the exercises

In [3]:
import os
# Create the folder for the exercises
! mkdir -p /media/notebooks/covid19/mapreduce

os.chdir("/media/notebooks/covid19/mapreduce")
! pwd

/media/notebooks/covid19/mapreduce


### Pandas dataframe to help us understand the data

We load the file into a pandas DataFrame to explore the data and to check each exercise by comparing the MapReduce output with the result computed in pandas.

In [4]:
import pandas as pd

# Column names (passed explicitly because the CSV file has no header row)
columnas = ['date', 'county', 'state', 'cases', 'deaths']

# Read the CSV to a dataframe
dataframe = pd.read_csv('/media/notebooks/covid19/covid19-NY.csv', names=columnas)

dataframe.head()

| | date | county | state | cases | deaths |
|---|---|---|---|---|---|
| 0 | 2020-01-21 | Snohomish | Washington | 1 | 0 |
| 1 | 2020-01-22 | Snohomish | Washington | 1 | 0 |
| 2 | 2020-01-23 | Snohomish | Washington | 1 | 0 |
| 3 | 2020-01-24 | Cook | Illinois | 1 | 0 |
| 4 | 2020-01-24 | Snohomish | Washington | 1 | 0 |


## Exercise 1 - Counties Counter

The first exercise counts the number of unique values in the county column, which tells us how many distinct county names appear in the data. Counties that share a name in different states are counted only once.

**MAP Process # 01**

In [5]:
%%writefile 01_mapper.py
#!/usr/bin/env python3

import sys

# Read each line from standard input (STDIN)
for linea in sys.stdin:
  # Strip leading and trailing whitespace
  fila = linea.strip()
  # Split each line into columns
  campos = fila.split(',')

  # Emit the county name as input for the reducer
  condado = campos[1]
  print(f"{condado}")

Writing 01_mapper.py
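Splitting on `,` works as long as no field contains a comma. If the file had quoted fields with embedded commas, the column positions would shift. A minimal sketch of a more defensive approach, using the standard `csv` module, is shown below. The function name `extract_county` and the sample row are hypothetical and only illustrate the idea.

```python
import csv
import io

def extract_county(lines):
    """Yield the county (second column) of each CSV line, honoring quotes."""
    for fields in csv.reader(lines):
        if len(fields) > 1:          # skip blank or malformed lines
            yield fields[1]

# Made-up row whose county contains a comma inside quotes
sample = io.StringIO('2020-01-21,"Example, County",Some State,1,0\n')
counties = list(extract_county(sample))
print(counties)  # ['Example, County']
```

In a mapper, `sys.stdin` could be passed in place of the `StringIO` object.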


**REDUCE Process # 01**

In [6]:
%%writefile 01_reducer.py
#!/usr/bin/env python3
import sys

# We use a set to avoid duplicates
condados = {linea.strip() for linea in sys.stdin}

# Total number of counties
print(f"Total number of counties:\t{len(condados)}")

Writing 01_reducer.py
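The reducer above keeps every distinct county name in memory. Because the reducer input arrives sorted by key (here through `sort`, and in Hadoop through the shuffle phase), the distinct values can also be counted by comparing each key with the previous one. The following is a sketch of that alternative, not the notebook's reducer. `count_distinct_sorted` is a hypothetical name.

```python
def count_distinct_sorted(keys):
    """Count distinct values in an iterable that is already sorted."""
    total = 0
    previous = None
    for key in keys:
        key = key.strip()
        if key != previous:      # a new key starts a new group
            total += 1
            previous = key
    return total

# Sorted sample, shaped like the lines a reducer would receive
sorted_keys = ["Cook\n", "Cook\n", "Snohomish\n", "Snohomish\n", "Snohomish\n"]
distinct = count_distinct_sorted(sorted_keys)
print(distinct)  # 2
```

This version uses constant memory, but like the set-based reducer it only gives a global count when a single reducer processes all keys.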


In [7]:
# Run map-reduce and display the result to make sure that the files are correctly defined
! cat /media/notebooks/covid19/covid19-NY.csv \
| python3 01_mapper.py | sort | python3 01_reducer.py > salida-01

! cat salida-01

Total number of counties:	1856


**Hadoop Streaming Execution # 01**

In [8]:
! hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.4.0.jar \
-files 01_mapper.py,01_reducer.py \
-mapper 01_mapper.py \
-reducer 01_reducer.py \
-input /covid19/covid19-NY.csv \
-output /covid19/mapreduce/01_salida

packageJobJar: [/tmp/hadoop-unjar3530511188227510689/] [] /tmp/streamjob6913980281073132578.jar tmpDir=null
2024-08-28 11:50:03,080 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:50:03,360 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:50:03,897 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1724833657149_0030
2024-08-28 11:50:05,289 INFO mapred.FileInputFormat: Total input files to process : 1
2024-08-28 11:50:05,741 INFO mapreduce.JobSubmitter: number of splits:2
2024-08-28 11:50:07,134 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1724833657149_0030
2024-08-28 11:50:07,134 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-08-28 11:50:07,962 INFO conf.Configuration: resource-types.xml not found
2024-08-28 11:50:07,963 INFO resource.ResourceUtils
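The streaming invocations in this notebook differ only in the exercise number. As a convenience, the argument list could be built by a small helper like the one sketched below. The function `streaming_command` is hypothetical. The paths and flags are the ones used in the cells of this notebook.

```python
def streaming_command(
    number,
    jar="/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.4.0.jar",
):
    """Build the argument list for the Hadoop Streaming job of one exercise."""
    prefix = f"{number:02d}"
    mapper = f"{prefix}_mapper.py"
    reducer = f"{prefix}_reducer.py"
    return [
        "hadoop", "jar", jar,
        "-files", f"{mapper},{reducer}",
        "-mapper", mapper,
        "-reducer", reducer,
        "-input", "/covid19/covid19-NY.csv",
        "-output", f"/covid19/mapreduce/{prefix}_salida",
    ]

command = streaming_command(1)
print(" ".join(command))
```

On a machine with Hadoop installed, the list could be passed to `subprocess.run(command, check=True)`. Hadoop refuses to write to an output directory that already exists, so a rerun requires removing it first.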

In [9]:
# Display the output of exercise 1
! hdfs dfs -cat /covid19/mapreduce/01_salida/part-00000

# Check the number of unique values for that column in the dataframe
num = dataframe['county'].nunique()
print(f"\nValues in the dataframe:\t{num}")

Total number of counties:	1856

Values in the dataframe:	1856


## Exercise 2 - Search for the day with the highest number of fatalities

This second exercise finds the day on which the greatest number of deaths was recorded. To do this we program two phases:
1. Map: emit the date and the number of deaths of each record.
2. Reduce: add the partial results grouped by day and keep the day with the largest total.
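The phases listed above can be sketched in plain Python on a few in-memory records, with `itertools.groupby` playing the role of the grouping that Hadoop performs between map and reduce. The records are made up for illustration and are not taken from the dataset.

```python
from itertools import groupby
from operator import itemgetter

# Made-up records in the file's layout: date,county,state,cases,deaths
records = [
    "2020-03-01,King,Washington,10,1",
    "2020-03-02,King,Washington,14,3",
    "2020-03-01,Cook,Illinois,5,0",
    "2020-03-02,Cook,Illinois,7,1",
]

# Map: emit one (date, deaths) pair per record
pairs = []
for line in records:
    fields = line.split(",")
    pairs.append((fields[0], int(fields[4])))

# Shuffle/sort: bring equal keys together
pairs.sort(key=itemgetter(0))

# Reduce: add the deaths of each date
totals = {
    date: sum(deaths for _, deaths in group)
    for date, group in groupby(pairs, key=itemgetter(0))
}

# Final step: pick the date with the largest total
best_day = max(totals, key=totals.get)
print(best_day, totals[best_day])  # 2020-03-02 4
```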


**MAP Process # 02**

The mapper processes the CSV input line by line, extracts the date and the number of deaths, replaces a missing deaths value with a default of 0, and outputs key-value pairs with the date as the key and the number of deaths as the value.


In [10]:
%%writefile 02_mapper.py
#!/usr/bin/env python3

import sys

# Read each line from standard input (STDIN)
for linea in sys.stdin:

  fila = linea.strip()
  # Split each line into columns
  campos = fila.split(',')

  # Keep the fields we want
  fecha = campos[0]
  muertes = campos[4]

  # Fields read from the CSV are strings, so a missing value arrives as ''
  if muertes == '':
      muertes = 0

  # Emit the fields as input for the reducer
  print(f"{fecha}\t{muertes}")

Writing 02_mapper.py


**REDUCE Process # 02**

The reducer processes the key-value pairs emitted by the mapper (date and number of deaths), accumulates the total deaths for each date, and then finds the day with the highest total. Finally, it prints that date and its total number of deaths.

In [11]:
%%writefile 02_reducer.py
#!/usr/bin/env python3
from collections import defaultdict
import sys

# Dictionary for the date and number of deaths
muertes_dia = defaultdict(int)

# Process each input line
for linea in sys.stdin:
    fecha, muertes = linea.strip().split('\t')
    muertes_dia[fecha] += int(muertes)

# Calculate the day with the highest number of deaths
max_dia, max_muertes = max(muertes_dia.items(), key=lambda x: x[1])

print(f"El dia con mayor numero de muertes fue el {max_dia}")
print(f"Hubo un total de {max_muertes} muertes")  

Writing 02_reducer.py


In [12]:
# Run map-reduce from the console
! cat /media/notebooks/covid19/covid19-NY.csv \
| python3 02_mapper.py | sort | python3 02_reducer.py > salida-02

! cat salida-02

El dia con mayor numero de muertes fue el 2022-05-13
Hubo un total de 998279 muertes


**Hadoop Streaming Execution # 02**

In [13]:
! hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.4.0.jar \
-files 02_mapper.py,02_reducer.py \
-mapper 02_mapper.py \
-reducer 02_reducer.py \
-input /covid19/covid19-NY.csv \
-output /covid19/mapreduce/02_salida

packageJobJar: [/tmp/hadoop-unjar5085335737554451389/] [] /tmp/streamjob6791996230282016498.jar tmpDir=null
2024-08-28 11:51:11,800 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:51:12,074 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:51:12,555 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1724833657149_0031
2024-08-28 11:51:13,480 INFO mapred.FileInputFormat: Total input files to process : 1
2024-08-28 11:51:13,771 INFO mapreduce.JobSubmitter: number of splits:2
2024-08-28 11:51:14,214 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1724833657149_0031
2024-08-28 11:51:14,214 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-08-28 11:51:14,683 INFO conf.Configuration: resource-types.xml not found
2024-08-28 11:51:14,684 INFO resource.ResourceUtils

In [14]:
# Display the output of exercise 2
! hdfs dfs -cat /covid19/mapreduce/02_salida/part-00000 

# Check the result with the dataframe
df_agrupado = dataframe[['date', 'deaths']].groupby('date').sum().reset_index()
max_deaths = df_agrupado[df_agrupado['deaths'] == df_agrupado['deaths'].max()]
print("\nSalida del dataframe:\n")
max_deaths

El dia con mayor numero de muertes fue el 2022-05-13	
Hubo un total de 998279 muertes	

Salida del dataframe:



| | date | deaths |
|---|---|---|
| 843 | 2022-05-13 | 998279 |


## Exercise 3 - State with the highest ratio of deaths to positive cases

In this exercise we compute, for each state, the ratio of deaths to positive cases, and find the state where this ratio is highest. We need the 'state' field and, in addition to the 'deaths' field used in the previous exercise, the 'cases' field.
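The quantity computed per state is the total of the 'deaths' column divided by the total of the 'cases' column, expressed as a percentage. As a quick check of the arithmetic, the sketch below applies it to the Puerto Rico totals that appear in the dataframe output of this exercise. `death_ratio` is a hypothetical helper name.

```python
def death_ratio(total_deaths, total_cases):
    """Return deaths divided by cases, or 0.0 when there are no cases."""
    return total_deaths / total_cases if total_cases else 0.0

# Totals for Puerto Rico as shown in the dataframe output of this exercise
percentage = round(death_ratio(1605636, 4488011) * 100, 2)
print(percentage)  # 35.78
```

The guard for zero cases mirrors the one in the reducer, so a state without cases yields 0 instead of raising a division error.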

**MAP Process # 03**

The mapper reads the input line by line from standard input (STDIN). It strips any leading or trailing whitespace, splits each line into columns by commas, and extracts three fields: `estado` (state), `casos` (cases), and `muertes` (deaths). It then outputs these fields in the format `estado\tcasos\tmuertes`, which serves as input for the **Reducer** in the MapReduce job.

In [15]:
%%writefile 03_mapper.py
#!/usr/bin/env python3

import sys

# STDIN
for linea in sys.stdin:
  fila = linea.strip()
  campos = fila.split(',')

  estado = campos[2]
  casos = campos[3]
  muertes = campos[4]

  # Write the fields to be the input of the reducer          
  print(f"{estado}\t{casos}\t{muertes}")

Writing 03_mapper.py


**REDUCE Process # 03**

The reducer reads the mapper output line by line, where each line contains a state, its cases, and its deaths. It uses a dictionary to track the total deaths and cases for each state: each line is split by tab (`\t`) and the counts for the corresponding state are updated. After processing all the input, the script divides the total deaths by the total cases for each state and selects the state with the highest ratio. It then prints that state and the corresponding percentage of deaths among the cases.

In [16]:
%%writefile 03_reducer.py
#!/usr/bin/env python3
import sys
from collections import defaultdict

# Dictionary mapping each state to [total deaths, total cases]
# Both totals start at 0
muertes_por_casos = defaultdict(lambda: [0, 0])

# For each line read from STDIN
for linea in sys.stdin:
    estado, casos, muertes = linea.strip().split('\t')
    muertes_por_casos[estado][0] += int(muertes)
    muertes_por_casos[estado][1] += int(casos)

# Find the state with the highest ratio of deaths to positive cases
max_estado, max_media = max(
    ((estado, (muertes / casos) if casos != 0 else 0)
     for estado, (muertes, casos) in muertes_por_casos.items()),
    key=lambda x: x[1]
)

porcentaje = round(max_media * 100, 2)

print(f"El estado con mayor mortandad por casos positivos fue {max_estado}")
print(f"Hubo un {porcentaje}% de muertes de los casos encontrados")

Writing 03_reducer.py


In [17]:
# Run our map-reduce and see the result from the console 
! cat /media/notebooks/covid19/covid19-NY.csv \
| python3 03_mapper.py | sort | python3 03_reducer.py > salida-03

! cat salida-03

El estado con mayor mortandad por casos positivos fue Puerto Rico
Hubo un 35.78% de muertes de los casos encontrados


**Hadoop Streaming Execution # 03**

In [18]:
! hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.4.0.jar \
-files 03_mapper.py,03_reducer.py \
-mapper 03_mapper.py \
-reducer 03_reducer.py \
-input /covid19/covid19-NY.csv \
-output /covid19/mapreduce/03_salida

packageJobJar: [/tmp/hadoop-unjar4306110333602406323/] [] /tmp/streamjob2949191051670331604.jar tmpDir=null
2024-08-28 11:52:47,379 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:52:48,143 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:52:49,513 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1724833657149_0032
2024-08-28 11:52:50,956 INFO mapred.FileInputFormat: Total input files to process : 1
2024-08-28 11:52:51,304 INFO mapreduce.JobSubmitter: number of splits:2
2024-08-28 11:52:52,038 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1724833657149_0032
2024-08-28 11:52:52,038 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-08-28 11:52:52,620 INFO conf.Configuration: resource-types.xml not found
2024-08-28 11:52:52,621 INFO resource.ResourceUtils

In [19]:
# See exercise 3 output
! hdfs dfs -cat /covid19/mapreduce/03_salida/part-00000 

# Check in the dataframe the state with the most deaths by case
df_agrupado = dataframe[['state', 'deaths', 'cases']].groupby('state').sum().reset_index()

# Create a new column with the percentage division
df_agrupado['muertes-casos'] = (( df_agrupado['deaths'] / df_agrupado['cases'] ) * 100).round(2)
max_deaths_cases = df_agrupado[df_agrupado['muertes-casos'] == df_agrupado['muertes-casos'].max()]
print("\nSalida del dataframe:\n")
max_deaths_cases

El estado con mayor mortandad por casos positivos fue Puerto Rico	
Hubo un 35.78% de muertes de los casos encontrados	

Salida del dataframe:



| | state | deaths | cases | muertes-casos |
|---|---|---|---|---|
| 42 | Puerto Rico | 1605636 | 4488011 | 35.78 |


## Exercise 4 - Grouped count. Number of counties analyzed by state

We can also compute counts per group using MapReduce. In this exercise we count, for each state, the unique counties that have been analyzed. We need the 'state' and 'county' fields.

**MAP Process # 04**

The mapper processes the input line by line, extracting the state and county from each record. It splits each line into columns, selects the state and county columns, and outputs a key-value pair where the key is the state and the value is the county.

In [20]:
%%writefile 04_mapper.py
#!/usr/bin/env python3

import sys

# STDIN
for linea in sys.stdin:
  fila = linea.strip()
  campos = fila.split(',')

  estado = campos[2]
  condado = campos[1]
    
  print(f"{estado}\t{condado}")

Writing 04_mapper.py


**REDUCE Process # 04**

The reducer reads each line containing a state and county pair and stores it in a dictionary, where the key is the state and the value is the list of counties seen for that state. After processing all the input, it converts each list to a set to keep only the unique counties and prints how many distinct counties are present in the data for each state.

In [21]:
%%writefile 04_reducer.py
#!/usr/bin/env python3
from collections import defaultdict
import sys

# Dictionary mapping each state to the list of its counties
estados_totales = defaultdict(list)

# For each line read the state and county
for linea in sys.stdin:
    estado, condado = linea.strip().split('\t')

    # Append the county to the list of its state
    estados_totales[estado].append(condado)

for estado, condados_totales in estados_totales.items():
    # With a set object we filter out the unique counties and count them
    num_condados = len(set(condados_totales))

    # Total number of counties analyzed by state
    print(f"{num_condados} condados - Estado de {estado}")   

Writing 04_reducer.py
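The reducer above keeps one list entry per input line before deduplicating. A possible variant, sketched here under the same input format, stores a set per state so that repeated counties are discarded as they arrive. The function name `count_counties_by_state` and the sample lines are hypothetical.

```python
from collections import defaultdict

def count_counties_by_state(lines):
    """Return {state: number of distinct counties} from 'state<TAB>county' lines."""
    counties = defaultdict(set)
    for line in lines:
        state, county = line.rstrip("\n").split("\t")
        counties[state].add(county)      # duplicates are ignored by the set
    return {state: len(names) for state, names in counties.items()}

# Made-up sample shaped like the mapper output
sample = [
    "Illinois\tCook\n",
    "Washington\tKing\n",
    "Washington\tKing\n",
    "Washington\tSnohomish\n",
]
counts = count_counties_by_state(sample)
print(counts)  # {'Illinois': 1, 'Washington': 2}
```

Memory use then grows with the number of distinct (state, county) pairs rather than with the number of records.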


In [22]:
# Run our map-reduce and see the result in the console 
! cat /media/notebooks/covid19/covid19-NY.csv \
| python3 04_mapper.py | sort | python3 04_reducer.py > salida-04

! cat salida-04 | head -n 10

67 condados - Estado de Alabama
28 condados - Estado de Alaska
1 condados - Estado de American Samoa
16 condados - Estado de Arizona
76 condados - Estado de Arkansas
59 condados - Estado de California
65 condados - Estado de Colorado
9 condados - Estado de Connecticut
4 condados - Estado de Delaware
1 condados - Estado de District of Columbia


**Hadoop Streaming Execution # 04**

In [23]:
! hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.4.0.jar \
-files 04_mapper.py,04_reducer.py \
-mapper 04_mapper.py \
-reducer 04_reducer.py \
-input /covid19/covid19-NY.csv \
-output /covid19/mapreduce/04_salida

packageJobJar: [/tmp/hadoop-unjar8960482822706783748/] [] /tmp/streamjob2748713371978554974.jar tmpDir=null
2024-08-28 11:54:41,883 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:54:42,279 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmanager/172.18.0.7:8032
2024-08-28 11:54:43,146 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1724833657149_0033
2024-08-28 11:54:45,211 INFO mapred.FileInputFormat: Total input files to process : 1
2024-08-28 11:54:45,791 INFO mapreduce.JobSubmitter: number of splits:2
2024-08-28 11:54:46,841 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1724833657149_0033
2024-08-28 11:54:46,842 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-08-28 11:54:47,338 INFO conf.Configuration: resource-types.xml not found
2024-08-28 11:54:47,339 INFO resource.ResourceUtils

In [24]:
# See exercise 4 output
! hdfs dfs -cat /covid19/mapreduce/04_salida/part-00000 | head -n 10

# Count the unique counties for each state
df_condados_estados = dataframe[['state', 'county']].groupby('state').nunique()
print("\nSalida del dataframe:\n")
df_condados_estados.sort_values(by='state').head(10)

67 condados - Estado de Alabama	
28 condados - Estado de Alaska	
1 condados - Estado de American Samoa	
16 condados - Estado de Arizona	
76 condados - Estado de Arkansas	
59 condados - Estado de California	
65 condados - Estado de Colorado	
9 condados - Estado de Connecticut	
4 condados - Estado de Delaware	
1 condados - Estado de District of Columbia	

Salida del dataframe:



| state | county |
|---|---|
| Alabama | 67 |
| Alaska | 28 |
| American Samoa | 1 |
| Arizona | 16 |
| Arkansas | 76 |
| California | 59 |
| Colorado | 65 |
| Connecticut | 9 |
| Delaware | 4 |
| District of Columbia | 1 |
