## Hadoop HDFS

In [1]:
import sys
import json
import requests


class HDFSClient:
    """Minimal WebHDFS client that talks to datanodes through host-published ports.

    Official docs:
        https://hadoop.apache.org/docs/r3.3.5/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
    Python library:
        https://github.com/mtth/hdfs

    In this project the namenode and datanodes reach each other by Docker
    container name. Those names do not resolve on the host, so the ``hdfs``
    library (which follows the namenode's redirect automatically) cannot be
    used. Instead, each data operation asks the namenode for the datanode
    location with ``noredirect=true``, rewrites that location to
    ``localhost:<published port>`` (see ``transform``), and then sends the
    request to the datanode directly.
    """

    def __init__(self, namenode='http://localhost:9870', user='root'):
        self.namenode = namenode
        # Datanode container name -> port published on the host.
        self.ports = {'datanode1': '9862', 'datanode2': '9863', 'datanode3': '9864'}
        self.session = requests.Session()
        # Sent as a query parameter on every request (namenode and datanode).
        self.session.params['user.name'] = user

    def _url(self, hdfs_path):
        """Return the namenode WebHDFS URL for *hdfs_path*.

        The path is percent-encoded so names containing characters such as
        ``' '``, ``'#'``, ``'?'`` or ``'&'`` are not misread as URL syntax.
        """
        from urllib.parse import quote
        path = quote(hdfs_path.lstrip('/'))
        return f'{self.namenode}/webhdfs/v1/{path}'

    def _exists(self, hdfs_path):
        """Return False only if the namenode answers GETFILESTATUS with HTTP 404."""
        res = self.session.get(self._url(hdfs_path), params={'op': 'GETFILESTATUS'})
        return res.status_code != 404

    def ls(self, hdfs_path):
        """Return the entry names (``pathSuffix``) of directory *hdfs_path*.

        Raises KeyError when the JSON reply has no ``FileStatuses`` key
        (e.g. the namenode returned an error payload instead of a listing).
        """
        res = self.session.get(self._url(hdfs_path), params={'op': 'LISTSTATUS'})
        return [status['pathSuffix'] for status in res.json()['FileStatuses']['FileStatus']]

    def mkdir(self, hdfs_path):
        """Create directory *hdfs_path*; return the JSON reply, e.g. ``{'boolean': True}``."""
        res = self.session.put(self._url(hdfs_path), params={'op': 'MKDIRS'})
        return res.json()

    def write(self, hdfs_path, data, overwrite=False, buffersize=128*1024, append=False):
        """Write *data* to *hdfs_path* and return the datanode's response.

        Uses CREATE (HTTP PUT) by default, or APPEND (HTTP POST) when *append*
        is true. A ``str`` payload is encoded as UTF-8 before sending. The
        returned response's status is not checked; call ``raise_for_status()``
        on it to detect datanode-side errors.

        Raises:
            ValueError: if both *overwrite* and *append* are true.
            KeyError: if the namenode reply carries no ``Location``.
        """
        if append and overwrite:
            raise ValueError('Cannot both overwrite and append.')
        send = self.session.post if append else self.session.put

        # Step 1: ask the namenode which datanode should receive the data.
        params = {
            'op': 'APPEND' if append else 'CREATE',
            'noredirect': 'true',
            'overwrite': str(overwrite).lower(),
            'buffersize': buffersize,
        }
        res = send(self._url(hdfs_path), params=params)

        # Step 2: rewrite the container-name address to one reachable from the host.
        location = self.transform(res.json()['Location'])

        # Step 3: send the payload to the datanode. Encode text explicitly so
        # non-Latin-1 strings do not depend on the HTTP stack's default codec.
        if isinstance(data, str):
            data = data.encode('utf-8')
        return send(location, data=data)

    def upload(self, hdfs_path, local_path, buffersize=128*1024):
        """Copy local file *local_path* to *hdfs_path*; return True on success.

        The write is issued with ``overwrite=False``. On any failure the method
        returns False. Cleanup of a partial file is attempted (best effort) only
        when *hdfs_path* did not exist before this call, so a failed upload
        never deletes a pre-existing HDFS file.
        """
        hdfs_path = hdfs_path.lstrip('/')
        try:
            with open(local_path, 'rb') as reader:
                data = reader.read()
        except OSError:
            # Nothing has been sent to HDFS yet, so there is nothing to clean up.
            return False

        # Assume the path exists until the namenode says otherwise, so that a
        # failure never removes a file this call did not create.
        existed = True
        try:
            existed = self._exists(hdfs_path)
            res = self.write(hdfs_path, data, buffersize=buffersize)
            # A datanode refusal (e.g. file already exists) arrives as an HTTP
            # error status, not as an exception.
            res.raise_for_status()
        except (requests.RequestException, KeyError, ValueError):
            if not existed:
                try:
                    self.delete(hdfs_path)
                except (requests.RequestException, ValueError):
                    pass  # best-effort cleanup only
            return False
        return True

    def read(self, hdfs_path, buffersize=128*1024):
        """Return the datanode's response for reading *hdfs_path* (body in ``.content``).

        The response status is not checked. Raises KeyError if the namenode
        reply carries no ``Location``.
        """
        # Step 1: ask the namenode which datanode serves the file.
        params = {'op': 'OPEN', 'noredirect': 'true', 'buffersize': buffersize}
        res = self.session.get(self._url(hdfs_path), params=params)

        # Step 2: rewrite the container-name address to one reachable from the host.
        location = self.transform(res.json()['Location'])

        # Step 3: fetch the data from the datanode.
        return self.session.get(location)

    def download(self, hdfs_path, local_path, buffersize=128*1024):
        """Copy *hdfs_path* to local file *local_path*; return True on success.

        The local file is opened (and truncated) only after HDFS has answered
        with a success status, so a failed read leaves an existing local file
        untouched and never writes an HTTP error body to disk.
        """
        try:
            res = self.read(hdfs_path, buffersize=buffersize)
            res.raise_for_status()
        except (requests.RequestException, KeyError, ValueError):
            return False
        try:
            with res, open(local_path, 'wb') as writer:
                for chunk in res.iter_content(chunk_size=buffersize):
                    writer.write(chunk)
        except OSError:
            return False
        return True

    def delete(self, hdfs_path, isdir=False):
        """Delete *hdfs_path* (recursively when *isdir*); return the JSON reply."""
        params = {'op': 'DELETE', 'recursive': str(isdir).lower()}
        res = self.session.delete(self._url(hdfs_path), params=params)
        return res.json()

    def transform(self, location):
        """Rewrite a datanode URL so it is reachable from the host.

        For example ``http://datanode2:9864/...`` becomes
        ``http://localhost:9863/...`` using ``self.ports``. Raises KeyError for
        a datanode name that is not in ``self.ports``.
        """
        parts = location.split('/')
        # parts[2] is the "host:port" authority of "scheme://host:port/...".
        datanode, _ = parts[2].split(':')
        parts[2] = f'localhost:{self.ports[datanode]}'
        return '/'.join(parts)

In [2]:
hdfs = HDFSClient(namenode='http://localhost:9870', user='root')  # the default endpoint and user, spelled out

### make directory

In [3]:
hdfs.mkdir(hdfs_path='/test')  # namenode replies with {'boolean': ...}

{'boolean': True}

### list directory

In [4]:
hdfs.ls(hdfs_path='/')  # entry names directly under the HDFS root

['hbase', 'models', 'test']

### write data

In [5]:
res = hdfs.write(
    hdfs_path='/test/test.txt',
    data='test text file!',
    overwrite=True,
    append=False,
)

### read data

In [6]:
res = hdfs.read(hdfs_path='/test/test.txt')
res.content  # raw bytes returned by the datanode

b'test text file!'

### upload file

In [7]:
import pickle

# Sample object mixing ints, floats, a complex number, a tuple of strings and None.
data = dict(
    a=[1, 2.0, 3, complex(4, 6)],
    b=('string', 'Unicode string'),
    c=None,
)

with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

In [8]:
hdfs.upload(hdfs_path='/test/data.pkl', local_path='data.pkl')  # True on success

True

In [9]:
res = hdfs.read(hdfs_path='/test/data.pkl')
# Only unpickle data you wrote yourself: pickle.loads can execute arbitrary code.
pickle.loads(res.content)

{'a': [1, 2.0, 3, (4+6j)], 'b': ('string', 'Unicode string'), 'c': None}

### download file

In [10]:
hdfs.download(hdfs_path='/test/data.pkl', local_path='downloaded.pkl')  # True on success

True

In [11]:
# Round-trip check: the downloaded copy should unpickle to the original object.
with open('downloaded.pkl', 'rb') as f:
    restored = pickle.load(f)
print(restored)

{'a': [1, 2.0, 3, (4+6j)], 'b': ('string', 'Unicode string'), 'c': None}


### delete data

In [12]:
hdfs.ls(hdfs_path='/test')  # contents before deletion

['data.pkl', 'test.txt']

In [13]:
hdfs.delete(hdfs_path='/test/data.pkl')  # single file, non-recursive

{'boolean': True}

In [14]:
hdfs.ls(hdfs_path='/test')  # data.pkl should be gone

['test.txt']

In [15]:
hdfs.delete(hdfs_path='/test', isdir=True)  # recursive delete of the whole directory

{'boolean': True}

In [16]:
hdfs.ls(hdfs_path='/')  # /test should no longer be listed

['hbase', 'models']

## HBase

In [None]:
!pip install happybase

In [50]:
import happybase

# Build the Thrift connection without auto-connecting, then open the transport explicitly.
connection = happybase.Connection(host='localhost', port=9090, autoconnect=False)
connection.open()

### create table

In [56]:
table_name = b'my_table'
# Create the table with one column family 'cf' unless it already exists.
existing_tables = connection.tables()
if table_name not in existing_tables:
    connection.create_table(table_name, {'cf': {}})

### put data

In [59]:
table = connection.table(table_name)

# Rows row-key1..row-key4, each with the same value in cf:col1 and cf:col2.
for i in range(1, 5):
    value = f'value{i}'.encode()
    table.put(f'row-key{i}', {'cf:col1': value, 'cf:col2': value})

### read data

In [60]:
table.row(row='row-key1')  # all columns of one row, keys and values as bytes

{b'cf:col1': b'value1', b'cf:col2': b'value1'}

### scan data

In [64]:
# row_stop is exclusive, so only row-key1 and row-key2 are returned.
# Distinct loop names avoid overwriting the pickled `data` dict from the earlier cell.
for row_key, columns in table.scan(row_start='row-key1', row_stop='row-key3', columns=['cf:col2']):
    print(row_key, columns)

b'row-key1' {b'cf:col2': b'value1'}
b'row-key2' {b'cf:col2': b'value2'}


### delete data

In [65]:
# Delete the entire row 'row-key1' (keys were written with a hyphen, not an underscore).
table.delete(b'row-key1')

### delete table

In [None]:
# disable first -> delete
connection.disable_table(table_name)
connection.delete_table(table_name)
connection.tables()