## Part 1: Data Upload

In [1]:
import requests
import json
import math
import time

#### Downloading the data

In [2]:
! wget "https://pages.cs.wisc.edu/~harter/cs639/data/hdma-wi-2021.csv"

--2023-03-20 19:16:19--  https://pages.cs.wisc.edu/~harter/cs639/data/hdma-wi-2021.csv
Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9
Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 174944099 (167M) [text/csv]
Saving to: ‘hdma-wi-2021.csv’


2023-03-20 19:16:23 (48.1 MB/s) - ‘hdma-wi-2021.csv’ saved [174944099/174944099]



#### Inserting the data into our HDFS cluster: single.csv with replication factor 1, double.csv with replication factor 2 (both with 1 MB blocks)

In [2]:
! hdfs dfs -D dfs.block.size=1048576 -D dfs.replication=1 -cp hdma-wi-2021.csv hdfs://main:9000/single.csv

In [3]:
! hdfs dfs -D dfs.block.size=1048576 -D dfs.replication=2 -cp hdma-wi-2021.csv hdfs://main:9000/double.csv

In [4]:
! hdfs dfs -du -h hdfs://main:9000/

166.8 M  333.7 M  hdfs://main:9000/double.csv
166.8 M  166.8 M  hdfs://main:9000/single.csv
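`hdfs dfs -du` prints two sizes per file: the logical file size and the disk space consumed by all replicas. This is why double.csv shows twice the space of single.csv. The check below assumes `-h` divides by 1024² and rounds to one decimal place. It is a sketch of the arithmetic only, and `du_columns` is a hypothetical helper.

```python
# Byte count reported by wget for hdma-wi-2021.csv
FILE_BYTES = 174_944_099
MiB = 1024 * 1024

def du_columns(file_bytes, replication):
    """Return (logical size, space consumed by all replicas) in MiB, rounded like `du -h`."""
    size = file_bytes / MiB
    consumed = size * replication
    return round(size, 1), round(consumed, 1)

print(du_columns(FILE_BYTES, 1))  # (166.8, 166.8)
print(du_columns(FILE_BYTES, 2))  # (166.8, 333.7)
```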


In [5]:
resp = requests.get("http://main:9870/webhdfs/v1/single.csv?op=OPEN&offset=100", allow_redirects=False)

In [6]:
resp.headers

{'Date': 'Mon, 20 Mar 2023 20:16:38 GMT, Mon, 20 Mar 2023 20:16:40 GMT', 'Cache-Control': 'no-cache', 'Expires': 'Mon, 20 Mar 2023 20:16:40 GMT', 'Pragma': 'no-cache', 'X-Content-Type-Options': 'nosniff', 'X-FRAME-OPTIONS': 'SAMEORIGIN', 'X-XSS-Protection': '1; mode=block', 'Location': 'http://a85252eaac89:9864/webhdfs/v1/single.csv?op=OPEN&namenoderpcaddress=main:9000&offset=100', 'Content-Type': 'application/octet-stream', 'Content-Length': '0'}

In [7]:
resp.headers['Location']

'http://a85252eaac89:9864/webhdfs/v1/single.csv?op=OPEN&namenoderpcaddress=main:9000&offset=100'
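With `allow_redirects=False`, the NameNode's reply to `op=OPEN` is a 307 redirect rather than file data. Its `Location` header names the DataNode that would serve the bytes at that offset. The standard-library `urllib.parse` module can split that URL more robustly than slicing at `'?'`. The following is a minimal sketch, and the helper name `datanode_of` is hypothetical.

```python
from urllib.parse import urlsplit, parse_qs

def datanode_of(location):
    """Return (host:port, query parameters) from a WebHDFS redirect Location URL."""
    parts = urlsplit(location)
    # parse_qs maps each query key to a list of its values
    return parts.netloc, parse_qs(parts.query)

loc = ('http://a85252eaac89:9864/webhdfs/v1/single.csv'
       '?op=OPEN&namenoderpcaddress=main:9000&offset=100')
host, params = datanode_of(loc)
print(host)              # a85252eaac89:9864
print(params['offset'])  # ['100']
```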

## Part 2: Block Locations

In [9]:
mb = 1048576
nodeDict = {}

# Loop over offsets in 1 MB (block-size) steps; the 174,944,099-byte file spans 167 blocks
for i in range(167):
    response = requests.get(f"http://main:9870/webhdfs/v1/single.csv?op=OPEN&offset={i*mb}", allow_redirects=False)
    # Keep only the DataNode URL, i.e. everything before the '?'
    location = response.headers['Location']
    node = location[:location.index('?')]
    # Count one more block for the DataNode that serves this offset
    nodeDict[node] = nodeDict.get(node, 0) + 1

nodeDict

{'http://a85252eaac89:9864/webhdfs/v1/single.csv': 79,
 'http://3034ad7193b6:9864/webhdfs/v1/single.csv': 88}
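The loop above hardcodes `range(167)`. The sketch below takes a different approach. It derives the block offsets from the file length (174944099 bytes, as `GETFILESTATUS` returns in Part 3) and tallies DataNodes with `collections.Counter`. The helper names are hypothetical, and the list of `Location` URLs is a stand-in for real responses, built to match the tallies observed above. In the notebook, each offset would instead be passed to `requests.get(..., allow_redirects=False)` as in the loop.

```python
import math
from collections import Counter
from urllib.parse import urlsplit

BLOCK_SIZE = 1048576  # the 1 MB dfs.block.size used at upload

def block_offsets(file_length, block_size=BLOCK_SIZE):
    """One offset at the start of each block, covering the whole file."""
    n_blocks = math.ceil(file_length / block_size)
    return [i * block_size for i in range(n_blocks)]

def count_blocks_per_node(locations):
    """Tally DataNode host:port values taken from redirect Location URLs."""
    return Counter(urlsplit(loc).netloc for loc in locations)

offsets = block_offsets(174944099)
print(len(offsets))  # 167

# Stand-in Locations matching the tallies observed above (79 + 88 = 167)
fake = (['http://a85252eaac89:9864/webhdfs/v1/single.csv?op=OPEN'] * 79
        + ['http://3034ad7193b6:9864/webhdfs/v1/single.csv?op=OPEN'] * 88)
print(count_blocks_per_node(fake))
```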

## Part 3: Reading the Data

In [10]:
status = requests.get("http://main:9870/webhdfs/v1/single.csv?op=GETFILESTATUS")
status = status.json()
status['FileStatus']['length']

174944099

In [24]:
import io

class hdfsFile(io.RawIOBase):
    def __init__(self, path):
        self.path = path
        self.offset = 0
        status = requests.get(f"http://main:9870/webhdfs/v1/{self.path}?op=GETFILESTATUS")
        status = status.json()
        self.length = int(status['FileStatus']['length'])

    def readable(self):
        return True

    def readinto(self, b):
        # At end of file, return 0 to signal EOF
        if self.offset >= self.length:
            return 0

        # Determine how much data to request from HDFS
        remaining_bytes = self.length - self.offset
        toRead = min(len(b), remaining_bytes)

        # Request the byte range (requests follows the redirect to the DataNode)
        res = requests.get(f"http://main:9870/webhdfs/v1/{self.path}?op=OPEN&offset={self.offset}&length={toRead}")

        if res.status_code == 200:
            data = res.content
            # Copy the bytes into the caller's buffer
            b[0:len(data)] = data
            # Advance by the number of bytes actually received
            self.offset += len(data)
            toRet = len(data)

        else:
            # Part 4: the read failed (e.g. the block's only DataNode is dead).
            # Emit a single newline in place of the missing data and skip ahead
            # to the next 1 MB (1048576-byte) block boundary.
            b[0:1] = b'\n'
            self.offset = (self.offset // 1048576 + 1) * 1048576
            toRet = 1

        # Return the number of bytes written into b
        return toRet
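The sketch below exercises the `readinto` logic without a cluster. It replaces WebHDFS with an in-memory byte string split into fixed-size "blocks", some of which are marked as lost. `FakeBlockFile`, the tiny block size and the missing-block set are all hypothetical test fixtures. There is also one deliberate difference from `hdfsFile`: as a precaution, each read is clamped to the end of the current block, so a single request never straddles a lost block.

```python
import io

class FakeBlockFile(io.RawIOBase):
    """In-memory stand-in for hdfsFile: serves bytes from `data`, failing on lost blocks."""

    def __init__(self, data, block_size, missing_blocks):
        self.data = data
        self.block_size = block_size
        self.missing = set(missing_blocks)  # indices of blocks with no live replica
        self.offset = 0

    def readable(self):
        return True

    def readinto(self, b):
        if self.offset >= len(self.data):
            return 0  # EOF
        block = self.offset // self.block_size
        if block in self.missing:
            # Same recovery as hdfsFile: emit one newline, jump to the next block boundary
            b[0:1] = b'\n'
            self.offset = (block + 1) * self.block_size
            return 1
        # Clamp the read to the end of the current block
        block_end = min((block + 1) * self.block_size, len(self.data))
        chunk = self.data[self.offset:min(block_end, self.offset + len(b))]
        b[0:len(chunk)] = chunk
        self.offset += len(chunk)
        return len(chunk)

# Six 4-byte lines stored in 8-byte blocks; block 1 (lines ccc and ddd) is lost
data = b'aaa\nbbb\nccc\nddd\neee\nfff\n'
lines = list(io.BufferedReader(FakeBlockFile(data, 8, {1}), 16))
print(lines)  # [b'aaa\n', b'bbb\n', b'\n', b'eee\n', b'fff\n']
```

The two lines in the lost block become a single empty line. The line-counting loops therefore still run to completion and simply count fewer matches.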

### Counting Single Family and Multifamily mentions

#### Trying a buffer size of 1,000,000 bytes

In [12]:
singleFam = 0
multFam = 0

t0 = time.time()
for line in io.BufferedReader(hdfsFile("single.csv"), 1000000):
    line = str(line, "utf-8")
    if "Single Family" in line:
        singleFam += 1
    if "Multifamily" in line:
        multFam += 1
        
t1 = time.time()
diff = t1 - t0

print("Counts from single.csv")
print("Single Family:", singleFam)
print("Multi Family:" , multFam)
print("Seconds:", diff)

Counts from single.csv
Single Family: 444874
Multi Family: 2493
Seconds: 15.973117589950562


#### Trying a buffer size of 100,000,000 bytes

In [13]:
singleFam = 0
multFam = 0

t0 = time.time()
for line in io.BufferedReader(hdfsFile("single.csv"), 100000000):
    line = str(line, "utf-8")
    if "Single Family" in line:
        singleFam += 1
    if "Multifamily" in line:
        multFam += 1
        
t1 = time.time()
diff = t1 - t0

print("Counts from single.csv")
print("Single Family:", singleFam)
print("Multi Family:" , multFam)
print("Seconds:", diff)

Counts from single.csv
Single Family: 444874
Multi Family: 2493
Seconds: 2.7151737213134766


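Increasing the buffer from 1,000,000 to 100,000,000 bytes cut the read time from about 16 s to about 2.7 s, with identical counts. Each buffer refill triggers one WebHDFS request, so a larger buffer means far fewer HTTP round trips.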
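Each `readinto` call issues one WebHDFS GET, and `io.BufferedReader` asks for roughly one buffer's worth per refill. The number of HTTP round trips should therefore scale with file size divided by buffer size. Below is a back-of-the-envelope estimate that ignores short reads. `approx_requests` is a hypothetical helper.

```python
import math

FILE_BYTES = 174_944_099  # length from GETFILESTATUS

def approx_requests(file_bytes, buffer_size):
    """Approximate number of readinto calls (one WebHDFS GET each) to read the whole file."""
    return math.ceil(file_bytes / buffer_size)

for size in (1_000_000, 100_000_000):
    print(f"buffer {size:>11,} bytes -> ~{approx_requests(FILE_BYTES, size)} requests")
```

The estimate is about 175 requests for the smaller buffer versus 2 for the larger one. This is consistent with per-request overhead dominating the 1,000,000-byte run.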

## Part 4: Disaster Strikes

#### The following cells were run after killing one of our workers (a DataNode)

In [16]:
! hdfs dfsadmin -fs hdfs://main:9000/ -report

Configured Capacity: 51642105856 (48.10 GB)
Present Capacity: 33098988668 (30.83 GB)
DFS Remaining: 32570003456 (30.33 GB)
DFS Used: 528985212 (504.48 MB)
DFS Used%: 1.60%
Replicated Blocks:
	Under replicated blocks: 0
	Blocks with corrupt replicas: 0
	Missing blocks: 0
	Missing blocks (with replication factor 1): 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0
Erasure Coded Block Groups: 
	Low redundancy block groups: 0
	Block groups with corrupt internal blocks: 0
	Missing block groups: 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (1):

Name: 172.18.0.4:9866 (project-3-cat-mongers-worker-1.cs544net)
Hostname: 3034ad7193b6
Decommission Status : Normal
Configured Capacity: 25821052928 (24.05 GB)
DFS Used: 269332796 (256.86 MB)
Non DFS Used: 9249955524 (8.61 GB)
DFS Remaining: 16284987392 (15.17 GB)
DFS Used%: 1.04%
DFS Remaining%: 63.07%

In [25]:
singleFam = 0
multFam = 0

t0 = time.time()
for line in io.BufferedReader(hdfsFile("single.csv"), 1048576):
    line = str(line, "utf-8")
    if "Single Family" in line:
        singleFam += 1
    if "Multifamily" in line:
        multFam += 1
        
t1 = time.time()
diff = t1 - t0

print("Counts from single.csv")
print("Single Family:", singleFam)
print("Multi Family:" , multFam)

Counts from single.csv
Single Family: 234461
Multi Family: 1220


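Because single.csv was stored with replication factor 1, the blocks that lived only on the dead DataNode are gone. Both counts are lower than when both nodes were running: 234,461 vs. 444,874 Single Family lines and 1,220 vs. 2,493 Multifamily lines.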
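The size of the loss lines up with Part 2. According to the `dfsadmin` report, the surviving DataNode is `3034ad7193b6`, which served 88 of the 167 blocks of single.csv. The comparison below assumes that Single Family rows are spread roughly evenly across blocks. It checks that block fraction against the fraction of Single Family lines that survived, using only numbers copied from the outputs above.

```python
# Numbers copied from the outputs above
blocks_total = 167
blocks_on_survivor = 88          # 3034ad7193b6 in Part 2
single_before, single_after = 444874, 234461

block_fraction = blocks_on_survivor / blocks_total
line_fraction = single_after / single_before

print(f"blocks kept: {block_fraction:.3f}")  # 0.527
print(f"lines kept:  {line_fraction:.3f}")   # 0.527
```

The Multifamily ratio (1220/2493, about 0.49) is noisier, plausibly because that count is much smaller.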

In [26]:
singleFam = 0
multFam = 0

t0 = time.time()
for line in io.BufferedReader(hdfsFile("double.csv"), 1048576):
    line = str(line, "utf-8")
    if "Single Family" in line:
        singleFam += 1
    if "Multifamily" in line:
        multFam += 1
        
t1 = time.time()
diff = t1 - t0

print("Counts from double.csv")
print("Single Family:", singleFam)
print("Multi Family:" , multFam)

Counts from double.csv
Single Family: 444874
Multi Family: 2493


double.csv, however, was stored with replication factor 2. Every block therefore still has a copy on the surviving DataNode, and the counts match the original run exactly.
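Here is a toy model of why replication matters: a block is lost only if every DataNode holding one of its replicas is dead. With two DataNodes and replication factor 2, each block of double.csv has a copy on both nodes, so losing either node loses nothing. The placement lists below are hypothetical and built from the Part 2 tallies. `lost_blocks` is an illustrative helper, not an HDFS API.

```python
def lost_blocks(placement, dead_nodes):
    """Count blocks whose replicas all live on dead nodes.

    placement: list with one set of DataNode names per block.
    """
    dead = set(dead_nodes)
    # A block survives if at least one replica is outside the dead set
    return sum(1 for replicas in placement if replicas <= dead)

A, B = "a85252eaac89", "3034ad7193b6"

# single.csv: one replica per block, split 79 / 88 as observed in Part 2
single = [{A}] * 79 + [{B}] * 88
# double.csv: replication 2 on a 2-node cluster puts a copy on each node
double = [{A, B}] * 167

print(lost_blocks(single, {A}))  # 79
print(lost_blocks(double, {A}))  # 0
```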