# Data Wrangling with HDFS, Hive and HBase

-----
Initialization

In [None]:
import os
import pandas as pd
pd.set_option("display.max_columns", 50)
import matplotlib.pyplot as plt
%matplotlib inline

username = os.environ['JUPYTERHUB_USER']
hiveaddr = os.environ['HIVE_SERVER_2']
print("Operating as: {0}".format(username))

In [None]:
from hdfs3 import HDFileSystem
hdfs = HDFileSystem()
hdfs.ls("/user/{0}".format(username))

-----
## HDFS / Hive Storage format

Hive lets you choose between many storage formats for your data. You can find a list in the [Hive Storage Formats](https://cwiki.apache.org/confluence/display/Hive/FileFormats) documentation ([references](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-StorageFormats)).

Among them, the most commonly used formats are _textfile_, _parquet_ and _Optimized Row Columnar (ORC)_.

The _textfile_ format should be used mostly for external files that are already in text format, or to make tables available as external files to machines or humans that cannot process other formats. Text files are mostly seen at the _edges_ of your big-data processing pipeline, where the data comes in or goes out, and where interoperability with external sources or data consumers is required.

In other situations, we will prefer the _parquet_ or _ORC_ format, which are optimized to store Hive data more efficiently. In particular, _parquet_ or _ORC_ should be your first choice for _Hive-managed_ tables, or temporary tables.

We will import data from our HDFS storage that contains data from the SBB.

#### Converting from _textfile_ to _ORC_

When receiving a file in a text format, it sometimes makes sense to create a copy using one of the more efficient storage formats, _parquet_ or _ORC_. Transforming the data into these formats has a (CPU) cost. However, it only needs to be paid once, when the tables are created, and it is largely paid back by the read performance boost we get from them. Furthermore, those formats are understood by many other utilities of the big data platform, such as Spark. We thus want to store this data in an _external_ table, in a location that is accessible to those utilities.

We illustrate the format conversion in 3 steps in the next exercise.

------
We must first create a connection to the Hive server.

In [None]:
from pyhive import hive

# create connection
conn = hive.connect(host=hiveaddr, 
                    port=10000,
                    username=username) 
# create cursor
cur = conn.cursor()

-----
Assume you have received the SBB data in CSV format and that this data is now stored in HDFS. In this exercise we use a subset of the SBB data (December 2020), which you can find under `/data/sbb/csv/istdaten/2020/12/` on HDFS.

**Step 1.** Create an [_external table_](https://cwiki.apache.org/confluence/display/Hive/Managed+vs.+External+Tables) stored in _textfile_ format, located on `/data/sbb/csv/istdaten/2020/12/`.

* Create a database using your username

* Make the database your default with `use`.

* Drop the table if it exists

* Create the table

In [None]:
query = """
    create database if not exists {0}
""".format(username)
cur.execute(query)

query = """
    use {0}
""".format(username)
cur.execute(query)

In [None]:
query = """
    drop table if exists {0}.sbb_csv_2020_12
""".format(username)
cur.execute(query)

In [None]:
query = """
    create external table {0}.sbb_csv_2020_12(
        BETRIEBSTAG string,
        FAHRT_BEZEICHNER string,
        BETREIBER_ID string,
        BETREIBER_ABK string,
        BETREIBER_NAME string,
        PRODUKT_ID string,
        LINIEN_ID string,
        LINIEN_TEXT string,
        UMLAUF_ID string,
        VERKEHRSMITTEL_TEXT string,
        ZUSATZFAHRT_TF string,
        FAELLT_AUS_TF string,
        BPUIC string,
        HALTESTELLEN_NAME string,
        ANKUNFTSZEIT string,
        AN_PROGNOSE string,
        AN_PROGNOSE_STATUS string,
        ABFAHRTSZEIT string,
        AB_PROGNOSE string,
        AB_PROGNOSE_STATUS string,
        DURCHFAHRT_TF string
    )
    row format delimited fields terminated by ';'
    stored as textfile
    location '/data/sbb/csv/istdaten/2020/12/'
""".format(username)
cur.execute(query)

Skipping the header line:

In [None]:
query ="""
alter table {0}.sbb_csv_2020_12 set tblproperties ("skip.header.line.count"="1")
""".format(username)
cur.execute(query)

Verify external table `sbb_csv_2020_12`

In [None]:
query = """
select * from {0}.sbb_csv_2020_12 limit 5
""".format(username)
pd.read_sql(query, conn)

----
**Step 2.** Create a new table in your HDFS folder, under `/user/{0}/hive/sbb/orc`, where `{0}` stands for your username.

Notes:
* We store this data in [ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-HiveQLSyntax) storage format
* We use `TBLPROPERTIES` to set the compression mode to `SNAPPY`
* The table is external. If we drop the table, the generated _ORC_ files will still be available to other big data applications.
* It is a new table, and it is empty. We will insert data into it.
* Hive will create the folder at the specified location on HDFS if it does not exist. Because we are all going to create this new table, and we do not want to overwrite each other's data, we locate this external table in our own HDFS home folders.
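The CSV and ORC tables share the same 21 string columns, so the two `create external table` statements differ only in their storage clauses. As a minimal sketch, the statement can be generated from a column list instead of being typed twice. The helper `build_create_table` and the placeholder user `someuser` are hypothetical names introduced for this illustration; they are not part of Hive or PyHive.

```python
# Column names copied from the SBB table definitions used in this notebook.
SBB_COLUMNS = [
    "BETRIEBSTAG", "FAHRT_BEZEICHNER", "BETREIBER_ID", "BETREIBER_ABK",
    "BETREIBER_NAME", "PRODUKT_ID", "LINIEN_ID", "LINIEN_TEXT", "UMLAUF_ID",
    "VERKEHRSMITTEL_TEXT", "ZUSATZFAHRT_TF", "FAELLT_AUS_TF", "BPUIC",
    "HALTESTELLEN_NAME", "ANKUNFTSZEIT", "AN_PROGNOSE", "AN_PROGNOSE_STATUS",
    "ABFAHRTSZEIT", "AB_PROGNOSE", "AB_PROGNOSE_STATUS", "DURCHFAHRT_TF",
]


def build_create_table(table, columns, stored_as, location,
                       row_format=None, tblproperties=None):
    """Return a `create external table` statement (hypothetical helper).

    Every column is declared as `string`, as in the notebook's tables.
    """
    column_lines = ",\n".join("        {0} string".format(c) for c in columns)
    parts = ["create external table {0}(".format(table), column_lines, "    )"]
    if row_format:
        parts.append("    " + row_format)
    parts.append("    stored as {0}".format(stored_as))
    parts.append("    location '{0}'".format(location))
    if tblproperties:
        props = ", ".join(
            '"{0}"="{1}"'.format(k, v) for k, v in tblproperties.items()
        )
        parts.append("    tblproperties ({0})".format(props))
    return "\n".join(parts)


# 'someuser' is a placeholder; in the notebook you would use `username`.
orc_ddl = build_create_table(
    table="someuser.sbb_orc_2020_12",
    columns=SBB_COLUMNS,
    stored_as="ORC",
    location="/user/someuser/hive/sbb/orc",
    tblproperties={"orc.compress": "SNAPPY"},
)
print(orc_ddl)
```

The resulting string can be passed to `cur.execute(...)` in the same way as the hand-written queries in this notebook.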

In [None]:
query = """
    drop table if exists {0}.sbb_orc_2020_12
""".format(username)
cur.execute(query)

In [None]:
query = """
    create external table {0}.sbb_orc_2020_12(
        BETRIEBSTAG string,
        FAHRT_BEZEICHNER string,
        BETREIBER_ID string,
        BETREIBER_ABK string,
        BETREIBER_NAME string,
        PRODUKT_ID string,
        LINIEN_ID string,
        LINIEN_TEXT string,
        UMLAUF_ID string,
        VERKEHRSMITTEL_TEXT string,
        ZUSATZFAHRT_TF string,
        FAELLT_AUS_TF string,
        BPUIC string,
        HALTESTELLEN_NAME string,
        ANKUNFTSZEIT string,
        AN_PROGNOSE string,
        AN_PROGNOSE_STATUS string,
        ABFAHRTSZEIT string,
        AB_PROGNOSE string,
        AB_PROGNOSE_STATUS string,
        DURCHFAHRT_TF string
    )
    row format delimited fields terminated by ';'
    STORED AS ORC
    location '/user/{0}/hive/sbb/orc'
    TBLPROPERTIES ("orc.compress"="SNAPPY")
""".format(username)
cur.execute(query)

-----
**Step 3.** This new table is currently empty. You can import data from another table using the [insert overwrite](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries) command. We import data from our `sbb_csv_2020_12` table.

In [None]:
query = """
insert overwrite table {0}.sbb_orc_2020_12 select * from {0}.sbb_csv_2020_12 
""".format(username)
cur.execute(query)

Verify that the content of the `sbb_orc_2020_12` table matches the content of the `sbb_csv_2020_12` table. Note that the row order can differ between the two tables. Since we did not impose a particular ordering (order by, sort by), the ordering varies depending on the underlying storage format, and can be arbitrary.

In [None]:
query = """
select * from {0}.sbb_orc_2020_12 limit 5
""".format(username)
pd.read_sql(query, conn)
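Because the row order is arbitrary, comparing the first five rows of each table is only a rough check. A slightly stronger sanity check is to compare row counts. The sketch below assumes only a DB-API style cursor (`execute` and `fetchone`), which is the interface the PyHive cursor `cur` follows; `count_rows` and `same_row_count` are hypothetical helper names, and an in-memory SQLite database stands in for Hive so that the example runs anywhere.

```python
import sqlite3


def count_rows(cursor, table):
    """Return the number of rows in `table` using a DB-API cursor."""
    cursor.execute("select count(*) from {0}".format(table))
    return cursor.fetchone()[0]


def same_row_count(cursor, left, right):
    """True when both tables hold the same number of rows."""
    return count_rows(cursor, left) == count_rows(cursor, right)


# Stand-in data: two small tables with the same rows in a different order.
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("create table csv_copy (haltestellen_name text)")
demo_cur.execute("create table orc_copy (haltestellen_name text)")
demo_cur.executemany("insert into csv_copy values (?)", [("Bern",), ("Lausanne",)])
demo_cur.executemany("insert into orc_copy values (?)", [("Lausanne",), ("Bern",)])

counts_match = same_row_count(demo_cur, "csv_copy", "orc_copy")
print(counts_match)
```

In the notebook, the equivalent call would be `same_row_count(cur, "{0}.sbb_csv_2020_12".format(username), "{0}.sbb_orc_2020_12".format(username))`. Equal counts do not prove that the contents are identical, but a mismatch reveals a problem immediately.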

Voilà. You have created a table stored in _ORC_ format. Because the table is external, dropping this table in Hive will not delete the ORC files, and you can reuse them in other Hive tables, or in your Spark applications, etc.

----
## Part 2 - Hive Serialization/Deserialization Format

In the next set of exercises we review the methods used to move external data in and out of Hive tables.



**Step 1.** Import one day of Twitter data into Hive.

Create an external table from HDFS dir `/data/twitter/json/2019/09/30` and call it **_username_.**`twitter_2019_09_30`. The table should have a single column named json of type string. Do not forget to use your database for the table instead of the Hive default.

A few hints:
1. The files have only one field per line
2. If you do not specify the row format, Hive uses its defaults: fields terminated by `\001` (Ctrl-A) and lines terminated by `\n`. As long as a line contains no Ctrl-A character, the whole line is read into the single column.

After the table `twitter_2019_09_30` is created, select its first row with a select command (limit 1). Use the output of the select query to identify the JSON fields where the language and the timestamp information of the tweet are stored. You can use http://jsonprettyprint.com/, or the `jq` command, to pretty-print the JSON string.


In [None]:
query = """
drop table if exists {0}.twitter_2019_09_30
""".format(username)
cur.execute(query)

In [None]:
query = """
create external table {0}.twitter_2019_09_30(json string)
  stored as textfile
  location '/data/twitter/json/2019/09/30'
""".format(username)
cur.execute(query)

In [None]:
query = """
select * from {0}.twitter_2019_09_30 limit 1
""".format(username)
cur.execute(query)
cur.fetchall()
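The JSON string can also be pretty-printed directly in Python with the standard `json` module. The sketch below uses a hypothetical, heavily shortened stand-in for a tweet; real rows contain many more fields. In the notebook, `raw_row` would instead be the first column of the first row returned by the query above.

```python
import json

# Hypothetical, shortened stand-in for one row of the table.
raw_row = (
    '{"created_at": "Mon Sep 30 00:00:00 +0000 2019", '
    '"lang": "en", "timestamp_ms": "1569801600000", "text": "hello"}'
)

tweet = json.loads(raw_row)

# Indented output makes the field names easy to spot.
print(json.dumps(tweet, indent=2, sort_keys=True))

# Listing the top-level keys is often enough to locate a field.
top_level_fields = sorted(tweet.keys())
print(top_level_fields)
```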

----
**Step 2.** Extract JSON fields from raw text format.

Hive parses the file as raw text format, ignoring its JSON structure.

In the next query we use the following [User Defined Functions](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) to extract and process the JSON fields from the text.

* get_json_object
* from_unixtime
* cast
* min
* max

Can you guess the meaning of this Hive command?

For further reading, you can also learn more about the subtle distinction between [**order by** and **sort by**](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy). It describes the side effects of the underlying _MapReduce_ technology on top of which Hive is built.

In [None]:
query = """
with q as (
    select
        get_json_object(json, '$.lang') as lang,
            from_unixtime(
                cast(
                    cast(
                        get_json_object(json, '$.timestamp_ms') as bigint
                    ) /1000 as bigint
                )
        ) as time_str
    from {0}.twitter_2019_09_30
)
select lang,count(*) as count,min(time_str) as first_ts_UTC,max(time_str) as last_ts_UTC
from q
group by lang
order by count desc
""".format(username)
pd.read_sql(query,conn)
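To check your reading of the query, here is a plain Python sketch of the same logic applied to a few made-up JSON lines: extract `lang` and `timestamp_ms`, convert milliseconds to whole seconds, format the result as a time string, then group by language and sort by count. The sample lines and the function name `summarize_by_language` are assumptions for illustration. The sketch formats times in UTC explicitly, whereas the time zone used by Hive's `from_unixtime` may depend on the Hive version and server settings.

```python
import json
from collections import defaultdict
from datetime import datetime, timezone

# Made-up sample lines with only the two fields the query reads.
sample_lines = [
    '{"lang": "en", "timestamp_ms": "1569801600000"}',
    '{"lang": "fr", "timestamp_ms": "1569801601500"}',
    '{"lang": "en", "timestamp_ms": "1569801662000"}',
]


def summarize_by_language(lines):
    """Return (lang, count, first_ts, last_ts) tuples, largest count first."""
    times_by_lang = defaultdict(list)
    for line in lines:
        tweet = json.loads(line)
        # cast(... / 1000 as bigint): keep whole seconds only.
        seconds = int(tweet["timestamp_ms"]) // 1000
        time_str = datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        times_by_lang[tweet["lang"]].append(time_str)
    rows = [
        (lang, len(times), min(times), max(times))
        for lang, times in times_by_lang.items()
    ]
    # order by count desc
    return sorted(rows, key=lambda row: row[1], reverse=True)


summary = summarize_by_language(sample_lines)
for row in summary:
    print(row)
```

Taking `min` and `max` of the formatted strings works because the `YYYY-MM-DD HH:MM:SS` layout sorts lexicographically in chronological order, which is also why the Hive query can apply `min` and `max` to `time_str`.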

----
**Step 3.** Use available Serialization/Deserialization (_SerDe_) libraries.

Using `get_json_object` in every `select` query can be cumbersome and error-prone. Hive provides the [_SerDe_ framework](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RowFormats&SerDe) to simplify the serialization and deserialization of data. _SerDe_ properties are specified when Hive tables are created.
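Conceptually, a JSON deserializer turns each line of text into a row that holds only the columns declared in the table, so queries no longer need to parse the text themselves. The following is a simplified sketch of that idea in Python, not Hive's actual implementation. The function name `deserialize` is hypothetical, and returning `None` for a line that cannot be parsed is an assumption of this sketch rather than documented SerDe behavior.

```python
import json


def deserialize(line, columns):
    """Map one JSON text line to a tuple of the declared columns.

    Fields that are absent from the record become None; undeclared
    fields are dropped. Unparseable lines yield None (sketch assumption).
    """
    try:
        record = json.loads(line)
    except ValueError:
        return None
    if not isinstance(record, dict):
        return None
    return tuple(record.get(column) for column in columns)


declared_columns = ["timestamp_ms", "lang"]
lines = [
    '{"timestamp_ms": "1569801600000", "lang": "en", "text": "hello"}',
    '{"lang": "fr"}',
    'not a json document',
]
rows = [deserialize(line, declared_columns) for line in lines]
for row in rows:
    print(row)
```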


In [None]:
query="""drop table if exists {0}.twitter_serde_2019_09_30""".format(username)
cur.execute(query)

In [None]:
query="""
create external table {0}.twitter_serde_2019_09_30(
        timestamp_ms string,
        lang string
    )
    row format serde 'org.apache.hadoop.hive.serde2.JsonSerDe'
    WITH SERDEPROPERTIES(
        "ignore.malformed.json"="true"
    )
    stored as textfile
    location '/data/twitter/json/2019/09/30'
""".format(username)
cur.execute(query)

In [None]:
query="""
    select * from {0}.twitter_serde_2019_09_30 limit 10
""".format(username)
pd.read_sql(query,conn)

-----
## Part 3 - Hive over HBase

Hive is a data warehouse built on top of Hadoop. Hive queries get translated into MapReduce batch jobs that run on a distributed cluster. Hive is thus better for bulk inserts or updates of millions of rows at a time. It is not designed for fast individual lookups, operations on individual rows, or for real-time data. Other databases, such as MongoDB, Cassandra, [Apache Accumulo](https://accumulo.apache.org/) or [HBase](https://hbase.apache.org/) are better suited for real-time data. However they lack the relational DBMS flavor of Hive and an SQL-like interface. They are not ideal for complex relational queries.

In the following exercises, we will illustrate how to get the best of both worlds by integrating Hive with HBase. With this configuration it is possible to ingest individual rows of data at high rates, such as sensor measurements, in HBase, and to run batch queries on them using Hive, possibly mixed with data stored in HDFS.

More details about what follows can be found in the [Hive/HBase integration](https://cwiki.apache.org/confluence/display/Hive/HBaseintegration) documentation.

-----
### HBase

HBase in a few bullet points:
* HBase is a _NoSQL_ (or non-relational) database.
  - You use HBase when you need random, real-time read/write access to Big Data.
  - HBase is schema-less, and is not a relational DBMS. We do not use it for relational queries.
* It is a _key-value_ store: rows in HBase are indexed by their row keys.
* It is a _wide column store_. It can handle billions of rows and millions of columns on clusters of commodity hardware.
  - Columns are organized into _column families_.
  - _Column families_ must be defined upfront. They apply to all the rows.
  - Columns and their names are not fixed. They are created on the fly when rows are created or updated.
  - It is a _sparse_ database. Empty columns take no space; they do not exist in HBase.
  - Column values are versioned.
* Tables have a _name_ and a _namespace_, and are uniquely identified by _namespace:name_.
* Main data model operations on HBase are:
  - `put`: add a new row, or update an existing row
  - `get`: get the values from a row
  - `scan`: iterate over a range of contiguous rows, optionally with a _Filter_.
  - `delete`: delete a row
* It is built on top of HDFS. This is counterintuitive, given HDFS's block-based nature. HBase manages it with periodic data compaction.


Conceptually, the structure of a HBase table looks like this:

| Row key       | timestamp | ColumnFamily1 | ColumnFamily2  |
| ------------- |:---------:|:---------------:|:----------------:|
| 01.12.2019/80:06____:17004:000 | 1583867978 | produkt_id=Zug,linenid=17004 | bpuic=8500090 |
| 01.12.2019/80:06____:17017:000 | 1583868321 | produkt_id=Bus,linenid=701   | bpuic=8301093 |

A _{row, column-family:column, version}_ tuple specifies a _cell_ in the table.
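The data model described above can be mimicked with a small in-memory class. This is a toy illustration only, not an HBase client: the class `ToyTable`, its method signatures, and the integer version counter are assumptions made for the sketch. It shows fixed column families, columns created on the fly, sparse rows, versioned cells, and scans in row-key order.

```python
import itertools


class ToyTable:
    """In-memory illustration of the HBase data model (not a real client)."""

    def __init__(self, families):
        self.families = set(families)   # column families are fixed upfront
        self.rows = {}                  # row key -> column -> [(version, value)]
        self._clock = itertools.count(1)  # stand-in for cell timestamps

    def put(self, row_key, data):
        """Add or update cells; column names are created on the fly."""
        cells = self.rows.setdefault(row_key, {})
        for column, value in data.items():
            family = column.split(":", 1)[0]
            if family not in self.families:
                raise KeyError("unknown column family: " + family)
            # Newest version first; older versions are kept.
            cells.setdefault(column, []).insert(0, (next(self._clock), value))

    def get(self, row_key):
        """Return the newest value of every column present in the row."""
        cells = self.rows.get(row_key, {})
        return {column: versions[0][1] for column, versions in cells.items()}

    def scan(self, start=None, stop=None):
        """Iterate over rows in row-key order, from start up to (not including) stop."""
        for key in sorted(self.rows):
            if (start is None or key >= start) and (stop is None or key < stop):
                yield key, self.get(key)

    def delete(self, row_key):
        self.rows.pop(row_key, None)


table = ToyTable(["cf1", "cf2"])
table.put("row-a", {"cf1:produkt_id": "Zug", "cf2:bpuic": "8500090"})
table.put("row-b", {"cf1:produkt_id": "Bus"})   # sparse: no cf2 column stored
table.put("row-a", {"cf1:produkt_id": "Bus"})   # adds a new version of the cell

print(table.get("row-a"))
print(list(table.scan(start="row-b")))
print(len(table.rows["row-a"]["cf1:produkt_id"]))  # number of stored versions
```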

References: [hbase.apache.org](https://hbase.apache.org/book.html)


----
#### Create a connection to HBase

We use [happybase](https://happybase.readthedocs.io/en/latest/) to connect remotely to the HBase server. The happybase API supports a very limited subset of all the commands possible with HBase. For more complex tasks we would use the `hbase` command line interface.

The python package is already installed. Otherwise, it can be installed with `pip install happybase`

In [None]:
import happybase
hbaseaddr = os.environ['HBASE_SERVER']
hbase_connection = happybase.Connection(hbaseaddr, transport='framed',protocol='compact')

----
#### Create an HBase table

We use your ID (from variable username) for the table namespace in HBase, as we did in Hive.

You may need to delete the table first if it exists. HBase tables must be disabled before they can be deleted or altered.

In [None]:
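try:
    hbase_connection.delete_table('{0}:sbb_hbase'.format(username),disable=True)
except Exception as e:
    # The table may not exist yet; report the error and continue.
    # (Python 3 exceptions have no `.message` attribute, so print the exception itself.)
    print(e)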

-----
Create a new HBase table, called `sbb_hbase` under your namespace.

Note: if we do not create the table, a default table will be created when we create the Hive table.

In [None]:
hbase_connection.create_table(
    '{0}:sbb_hbase'.format(username),
    {'cf1': dict(max_versions=10),
     'cf2': dict()
    }
)

-----
List all the tables in HBase

In [None]:
print(hbase_connection.tables())

-----
Inspect the table. The properties of the column families can be specified when the table is created, e.g. `{ 'cf1': dict(max_versions=10,block_cache_enabled=False) }`

In [None]:
hbase_connection.table('{0}:sbb_hbase'.format(username)).families()

-----
We can scan the HBase table to verify that it is empty

In [None]:
for r in hbase_connection.table('{0}:sbb_hbase'.format(username)).scan():
    print(r)

-----
Create an external Hive table on top of the HBase table. We first delete it if it exists.

The table will contain the following fields:
* `RowKey`
* `BETRIEBSTAG`
* `FAHRT_BEZEICHNER`
* `ABFAHRTSZEIT`
* `BPUIC`

RowKey is the key we will use to index the row. Other columns are populated from the sbb table.

Note:
* The table is stored using the `org.apache.hadoop.hive.hbase.HBaseStorageHandler` for HBase.
* TBLPROPERTIES `hbase.table.name` specifies the name of the HBase table that the external Hive table should point to. It is optional, and defaults to the same name as the Hive table.
* SERDEPROPERTIES `hbase.columns.mapping` defines the mapping between the Hive columns and the HBase columns. The definitions are listed in the Hive column order, that is, `RowKey` maps to `:key`, the key of the HBase table, `BETRIEBSTAG` maps to `cf1:betriebstag` in HBase (column family cf1, column betriebstag), and so on.
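Because the mapping is positional, it is easy to get the order wrong. The sketch below pairs the Hive column names with the entries of the mapping string used in this exercise and raises an error when the two lists have different lengths. The helper name `pair_columns` is hypothetical; it only manipulates strings and does not talk to Hive or HBase.

```python
hive_columns = ["RowKey", "BETRIEBSTAG", "FAHRT_BEZEICHNER", "ABFAHRTSZEIT", "BPUIC"]
columns_mapping = (
    ":key,cf1:betriebstag,cf1:fahrt_bezeichner,cf2:abfahrtszeit,cf2:bpuic"
)


def pair_columns(hive_columns, mapping):
    """Pair each Hive column with its HBase target, in declaration order."""
    targets = mapping.split(",")
    if len(targets) != len(hive_columns):
        raise ValueError(
            "{0} Hive columns but {1} mapping entries".format(
                len(hive_columns), len(targets)
            )
        )
    return dict(zip(hive_columns, targets))


pairs = pair_columns(hive_columns, columns_mapping)
for hive_name, hbase_name in pairs.items():
    print("{0:<18} -> {1}".format(hive_name, hbase_name))
```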


In [None]:
query = """
drop table if exists {0}.sbb_hive_on_hbase
""".format(username)
cur.execute(query)

In [None]:
query = """
CREATE EXTERNAL TABLE {0}.sbb_hive_on_hbase(
    RowKey string,
    BETRIEBSTAG string,
    FAHRT_BEZEICHNER string,
    ABFAHRTSZEIT string,
    BPUIC bigint
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
    "hbase.columns.mapping"=":key,cf1:betriebstag,cf1:fahrt_bezeichner,cf2:abfahrtszeit,cf2:bpuic"
)
TBLPROPERTIES(
    "hbase.table.name"="{0}:sbb_hbase",
    "hbase.mapred.output.outputtable"="{0}:sbb_hbase"
)
""".format(username)
cur.execute(query)

In [None]:
query = """
show tables from {0}
""".format(username)
pd.read_sql(query, conn)

-----
The external Hive table is backed by the HBase table, which is currently empty.

In [None]:
query = """
select * from {0}.sbb_hive_on_hbase limit 1
""".format(username)
pd.read_sql(query, conn)

-----
We can now populate the HBase table with SBB data. We do this with an `insert overwrite ... select`, as before, copying from `{0}.sbb_orc_2020_12` to `{0}.sbb_hive_on_hbase`. The query below copies only 20 rows (`limit 20`), but the command may still take a few minutes to complete.

In [None]:
query="""
insert overwrite table {0}.sbb_hive_on_hbase
    select
         concat(BETRIEBSTAG,":",FAHRT_BEZEICHNER) as RowKey,
         BETRIEBSTAG,
         FAHRT_BEZEICHNER,
         ABFAHRTSZEIT,
         BPUIC
    from {0}.sbb_orc_2020_12 limit 20
""".format(username)
cur.execute(query)

-----
A scan of the HBase table shows the rows inserted by Hive.

In [None]:
for r in hbase_connection.table('{0}:sbb_hbase'.format(username)).scan():
    print(r)
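With Python 3, happybase returns row keys, column names and values as `bytes`, which makes the printed output harder to read. A small sketch of a decoding step is shown below. The function `decode_scan` is a hypothetical helper, and the sample data is made up to have the same shape as the scan output, a sequence of `(row_key, {column: value})` pairs, so that the example runs without an HBase server.

```python
def decode_scan(rows, encoding="utf-8"):
    """Yield (row_key, data) pairs with bytes decoded to str."""
    for row_key, data in rows:
        yield (
            row_key.decode(encoding),
            {
                column.decode(encoding): value.decode(encoding)
                for column, value in data.items()
            },
        )


# Made-up sample shaped like the output of a table scan.
sample_scan = [
    (
        b"01.12.2020:80:06____:17004:000",
        {b"cf1:betriebstag": b"01.12.2020", b"cf2:bpuic": b"8500090"},
    ),
]

decoded = list(decode_scan(sample_scan))
for row_key, data in decoded:
    print(row_key, data)
```

In the notebook, the same helper could wrap the real scan: `decode_scan(hbase_connection.table('{0}:sbb_hbase'.format(username)).scan())`.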

------
Cleanup (optional)

In [None]:
query = """
drop table {0}.sbb_hive_on_hbase
""".format(username)
cur.execute(query)

In [None]:
query = """
drop table {0}.sbb_orc_2020_12
""".format(username)
cur.execute(query)

In [None]:
query = """
drop table {0}.sbb_csv_2020_12 
""".format(username)
cur.execute(query)

In [None]:
try:
    hbase_connection.delete_table('{0}:sbb_hbase'.format(username),disable=True)
except Exception as e:
    pass

#### Resources:
* Hive tutorial: [https://cwiki.apache.org/confluence/display/Hive/Tutorial](https://cwiki.apache.org/confluence/display/Hive/Tutorial)
* HBase integration: [https://cwiki.apache.org/confluence/display/Hive/HBaseintegration](https://cwiki.apache.org/confluence/display/Hive/HBaseintegration)