# Objective

The goal of this tutorial is to learn a bit more about processing data stored in HDFS. We will use Hive and Presto to understand the difference between two models of distributed computing.

# Hive

To begin, we will explore the format Hive uses to store files: how the data is stored, how it is queried, and how we can take advantage of partitioning to run operations more efficiently.

We will use a sample of Twitter data to run the analyses.

In [5]:
import pandas as pd
import subprocess
import sys

# print a sample of the data
tweets = pd.read_csv('~/Desktop/tweets.csv', nrows=10)
tweets.head()

Unnamed: 0,tweet_id,user,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone,sentiment
0,"5,70306133677761E+017",cairdin,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada),neutral
1,"5,70301130888122E+017",jnardino,0,@VirginAmerica plus you've added commercials t...,,2015-02-24 11:15:59 -0800,,Pacific Time (US & Canada),positive
2,"5,70301083672814E+017",yvonnalynn,0,@VirginAmerica I didn't today... Must mean I n...,,2015-02-24 11:15:48 -0800,Lets Play,Central Time (US & Canada),neutral
3,"5,70301031407624E+017",jnardino,0,@VirginAmerica it's really aggressive to blast...,,2015-02-24 11:15:36 -0800,,Pacific Time (US & Canada),negative
4,"5,70300817074463E+017",jnardino,0,@VirginAmerica and it's a really big bad thing...,,2015-02-24 11:14:45 -0800,,Pacific Time (US & Canada),negative


Store the tweets.csv file in HDFS:

```
hdfs dfs -mkdir /user/class/tweets_csv/
hdfs dfs -copyFromLocal ~/Desktop/tweets.csv /user/class/tweets_csv/
```

In [12]:
# this directory is ready to be interpreted as a Hive table
p = subprocess.Popen(['hdfs', 'dfs', '-ls', '/user/class/tweets_csv/'], stdout=subprocess.PIPE)
print(p.communicate()[0].decode('utf-8'))

Found 1 items
-rw-r--r--   1 class class    3031610 2017-06-07 13:57 /user/class/tweets_csv/tweets.csv



In [16]:
# create a table in Hive
table = '''CREATE EXTERNAL TABLE tweets_csv (
    id VARCHAR(50),
    username VARCHAR(50),
    retweet INT,
    text VARCHAR(200),
    coord VARCHAR(500),
    dt DATE,
    location VARCHAR(500),
    timezone VARCHAR(500),
    sentiment VARCHAR(50)
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/class/tweets_csv';
'''
p = subprocess.Popen(['hive', '-e', table], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = p.communicate()
print(out[0].decode('utf-8'))
print(out[1].decode('utf-8'))


ls: cannot access '/usr/lib/spark/lib/spark-assembly-*.jar': No such file or directory

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
OK
Time taken: 1.613 seconds



In [18]:
# create a partitioned ORC table
cmd = '''CREATE EXTERNAL TABLE tweets (
    id VARCHAR(50),
    username VARCHAR(50),
    retweet INT,
    text VARCHAR(200),
    coord VARCHAR(500),
    dt DATE,
    location VARCHAR(500),
    timezone VARCHAR(500),
    sentiment VARCHAR(50)
) PARTITIONED BY (day DATE)
STORED AS ORC
LOCATION 'hdfs:///user/class/tweets';
'''

p = subprocess.Popen(['hive', '-e', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = p.communicate()
print(out[0].decode('utf-8'))
print(out[1].decode('utf-8'))


ls: cannot access '/usr/lib/spark/lib/spark-assembly-*.jar': No such file or directory

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
OK
Time taken: 1.29 seconds
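To see why a columnar format such as ORC can help with aggregations, here is a toy model of the two layouts. It is only a sketch: the function names are hypothetical, the three rows are taken from the sample shown earlier, and real ORC files add stripes, indexes and compression on top of this basic idea.

```python
# Toy model only: it counts how many values a reader has to walk,
# it does not reproduce the actual ORC file layout.
rows = [
    {"username": "cairdin", "retweet": 0, "sentiment": "neutral"},
    {"username": "jnardino", "retweet": 0, "sentiment": "positive"},
    {"username": "jnardino", "retweet": 0, "sentiment": "negative"},
]


def to_columns(rows):
    """Pivot a list of row dicts into one list of values per column."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


def values_touched_row_layout(rows, wanted):
    """Row layout: every field of every row is walked, whatever `wanted` is."""
    return sum(len(row) for row in rows)


def values_touched_column_layout(columns, wanted):
    """Column layout: only the requested columns are walked."""
    return sum(len(columns[name]) for name in wanted)


columns = to_columns(rows)
print(values_touched_row_layout(rows, ["sentiment"]))        # 9
print(values_touched_column_layout(columns, ["sentiment"]))  # 3
```

With three rows of three fields each, the row layout walks nine values to aggregate one column, while the column layout walks three.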



In [23]:
# convert the CSV data to a columnar format
partitions = ['2015-02-16', '2015-02-17', '2015-02-18',
              '2015-02-19', '2015-02-20', '2015-02-21',
              '2015-02-22', '2015-02-23', '2015-02-24']

for part in partitions:
    print('Loading data: %s'%(part))
    cmd = '''INSERT OVERWRITE TABLE tweets PARTITION(day='%s')
    SELECT * FROM tweets_csv WHERE DATE_FORMAT(dt,'yyyy-MM-dd')='%s';'''%(part, part)
    p = subprocess.Popen(['hive', '-e', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out = p.communicate()
    print(out[0].decode('utf-8'))
    print(out[1].decode('utf-8'))

Loading data: 2015-02-16

ls: cannot access '/usr/lib/spark/lib/spark-assembly-*.jar': No such file or directory

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
Query ID = class_20170607144352_62824fbc-f7ca-433b-a1df-603d0b097c29
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1496845734730_0014, Tracking URL = http://class-VirtualBox:8088/proxy/application_1496845734730_0014/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1496845734730_0014
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2017-06-07 14:44:03,823 Stage-1 map = 0%,  reduce = 0%
2017-06-07 14:44:10,263 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.31 sec
MapReduce Total cumulative CPU time: 3 seconds 310 msec
Ended Job = job_1496845734730_0014
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out 


ls: cannot access '/usr/lib/spark/lib/spark-assembly-*.jar': No such file or directory

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
Query ID = class_20170607144631_af259d95-472e-4c4e-b82a-9f1a8b309874
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1496845734730_0020, Tracking URL = http://class-VirtualBox:8088/proxy/application_1496845734730_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1496845734730_0020
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2017-06-07 14:46:40,765 Stage-1 map = 0%,  reduce = 0%
2017-06-07 14:46:48,280 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.64 sec
MapReduce Total cumulative CPU time: 3 seconds 640 msec
Ended Job = job_1496845734730_0020
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Mo
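The loop above formats each partition value straight into the statement. As a minimal sketch, the same statement can be built by a small helper that first checks the value is a real calendar date. The name `build_partition_insert` is hypothetical; the table names and the statement text come from the cell above.

```python
from datetime import datetime


def build_partition_insert(day):
    """Return the INSERT statement used above for a single partition."""
    # Parse and re-format so the value is a real date in yyyy-MM-dd form;
    # strptime raises ValueError for something like '2015-02-30'.
    day = datetime.strptime(day, "%Y-%m-%d").strftime("%Y-%m-%d")
    return (
        "INSERT OVERWRITE TABLE tweets PARTITION(day='{0}')\n"
        "SELECT * FROM tweets_csv "
        "WHERE DATE_FORMAT(dt,'yyyy-MM-dd')='{0}';"
    ).format(day)


print(build_partition_insert("2015-02-16"))
```

The returned string can be passed to `hive -e` exactly as the loop does with `cmd`.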

In [24]:
# the new table contains the data partitioned by day
p = subprocess.Popen(['hdfs', 'dfs', '-ls', '/user/class/tweets/'], stdout=subprocess.PIPE)
print(p.communicate()[0].decode('utf-8'))
p = subprocess.Popen(['hdfs', 'dfs', '-ls', '/user/class/tweets/day=2015-02-16'], stdout=subprocess.PIPE)
print(p.communicate()[0].decode('utf-8'))

Found 9 items
drwxrwx---   - class class          0 2017-06-07 14:44 /user/class/tweets/day=2015-02-16
drwxrwx---   - class class          0 2017-06-07 14:44 /user/class/tweets/day=2015-02-17
drwxrwx---   - class class          0 2017-06-07 14:45 /user/class/tweets/day=2015-02-18
drwxrwx---   - class class          0 2017-06-07 14:45 /user/class/tweets/day=2015-02-19
drwxrwx---   - class class          0 2017-06-07 14:45 /user/class/tweets/day=2015-02-20
drwxrwx---   - class class          0 2017-06-07 14:46 /user/class/tweets/day=2015-02-21
drwxrwx---   - class class          0 2017-06-07 14:46 /user/class/tweets/day=2015-02-22
drwxrwx---   - class class          0 2017-06-07 14:47 /user/class/tweets/day=2015-02-23
drwxrwx---   - class class          0 2017-06-07 14:47 /user/class/tweets/day=2015-02-24

Found 1 items
-rwxrwx---   1 class class       1980 2017-06-07 14:44 /user/class/tweets/day=2015-02-16/000000_0
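The `day=...` directories listed above are what make partition filters cheap: a query restricted to one day only needs the files under the matching directory. The sketch below imitates that selection in plain Python. It is a toy illustration with hypothetical helper names; in Hive the pruning is done by the query planner using metastore metadata, not by code like this.

```python
# Directory names follow the listing above (day=2015-02-16 ... day=2015-02-24).
partition_dirs = [
    "/user/class/tweets/day=2015-02-%02d" % d for d in range(16, 25)
]


def partition_value(path, key="day"):
    """Extract the value from a `key=value` partition directory name."""
    name = path.rstrip("/").rsplit("/", 1)[-1]
    found_key, _, value = name.partition("=")
    if found_key != key:
        raise ValueError("not a %s partition: %s" % (key, path))
    return value


def prune(paths, wanted_days):
    """Keep only the partition directories whose day is in `wanted_days`."""
    return [p for p in paths if partition_value(p) in wanted_days]


selected = prune(partition_dirs, {"2015-02-16"})
print(selected)  # ['/user/class/tweets/day=2015-02-16']
print("%d of %d directories read" % (len(selected), len(partition_dirs)))
```

A filter on a single day leaves one of the nine directories to scan, which is the effect to look for when timing partition-filtered queries.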



# Switching between Presto and Hive

To run Presto, execute the following commands (each group in its own terminal).

```
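# terminal 1: start the Presto server
cd ~/presto-server-0.149/
./bin/launcher run

# terminal 2: stop the metastore service and run the metastore in the foreground
sudo service hive-metastore stop
hive --service metastore

# terminal 3: open the Presto CLI and select the Hive catalog
presto
> use hive.default;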
```
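The notebook cells above call `hive -e` through `subprocess`; a similar wrapper can be sketched for Presto. This assumes the `presto` CLI is on the `PATH` (as in the commands above) and that the server listens on `localhost:8080`, which is an assumption about this setup. The helper names are hypothetical, and it is worth confirming the flags with `presto --help` for your CLI version.

```python
import subprocess


def presto_command(query, server="localhost:8080",
                   catalog="hive", schema="default"):
    """Build the argument list for a non-interactive Presto CLI call."""
    # server/catalog/schema defaults are assumptions about the class VM.
    return [
        "presto",
        "--server", server,
        "--catalog", catalog,
        "--schema", schema,
        "--execute", query,
    ]


def run_presto(query):
    """Run one query through the Presto CLI and return (stdout, stderr)."""
    result = subprocess.run(
        presto_command(query),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return result.stdout.decode("utf-8"), result.stderr.decode("utf-8")


# Only builds the command; nothing is executed here.
print(presto_command("SELECT COUNT(*) FROM tweets"))
```

Calling `run_presto("SELECT COUNT(*) FROM tweets")` requires the Presto server and the metastore from the steps above to be running.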

To run Hive:

```
sudo service hive-metastore start  # with the other metastore stopped
hive
```

# Run the following experiments

1. Run queries that filter by partition and evaluate the difference in processing time.
2. Run aggregation queries and note the difference in processing time between ORC and CSV.
3. Run aggregation queries varying the number of columns and observe what happens to the processing time.
4. Try other storage formats in Hive.
5. Try to build a trending topics algorithm using Presto.
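For the timing experiments, a small helper that measures the wall-clock time of a command line may be handy. This is only a sketch: `time_command` is a hypothetical name, and the measurement includes CLI startup time, so it is best used to compare runs of the same tool rather than as an absolute query time.

```python
import subprocess
import sys
import time


def time_command(argv):
    """Run a command and return (elapsed_seconds, exit_code)."""
    start = time.perf_counter()
    proc = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    elapsed = time.perf_counter() - start
    return elapsed, proc.returncode


# Self-check with a command that exists wherever Python does.
elapsed, code = time_command([sys.executable, "-c", "print('ok')"])
print("exit code %d in %.2f s" % (code, elapsed))
```

In the class environment the same helper could be called as `time_command(['hive', '-e', "SELECT COUNT(*) FROM tweets WHERE day='2015-02-16';"])`, and then again with the equivalent query on `tweets_csv`, to compare the two tables.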