In [1]:
import ibis

In [2]:
# Setting default_limit to None turns off the implicit row limit ibis applies
# when an expression is executed, so queries return complete results.
ibis.options.sql.default_limit = None

# HDFS client: used directly for file operations and passed to the Impala
# connection below.
hdfs_conn = ibis.hdfs_connect(host='bottou03.sjc.cloudera.com')

# Impala connection that shares the HDFS client created above.
ibis_conn = ibis.impala.connect(
    host='bottou01.sjc.cloudera.com',
    port=21050,
    hdfs_client=hdfs_conn,
)

In [3]:
hdfs_data_path = '/user/juliet'
hdfs_conn.mkdir(hdfs_data_path)

In [4]:
hdfs_conn.ls('/user/juliet')

[u'.Trash', u'.sparkStaging', u'CPG_Demo', u'eggs', u'pageviews-gz']

In [50]:
import os
# System-independent way to join paths
local_data_path = os.path.join(os.getcwd(), "pageviews-gz")

In [52]:
filenames = os.listdir(local_data_path)
filenames[3][:-3]

'pageviews-20160101-040000'

We must put each file in its own directory: when reading the data with Impala, we need the information that is encoded in the file name.

In [73]:
hdfs_conn.mkdir('/user/juliet/pageviews-gz')
def mv_files(filename, hdfs_root='/user/juliet/pageviews-gz'):
    """Upload one local gzip file into its own directory on HDFS.

    A directory named after the file (without its ``.gz`` extension) is
    created under ``hdfs_root`` and the file is uploaded into it, so the
    directory name carries the date and hour encoded in the file name.

    Parameters
    ----------
    filename : str
        Base name of a file inside ``local_data_path``,
        e.g. ``'pageviews-20160101-040000.gz'``.
    hdfs_root : str, optional
        HDFS directory under which the per-file directory is created.

    Returns
    -------
    str
        HDFS path of the directory that now holds the file.
    """
    # Strip only a real '.gz' suffix; a blind filename[:-3] would mangle the
    # name of any stray file that does not end in '.gz'.
    if filename.endswith('.gz'):
        stem = filename[:-len('.gz')]
    else:
        stem = filename
    dir_name = '{}/{}'.format(hdfs_root.rstrip('/'), stem)
    hdfs_conn.mkdir(dir_name)
    # HDFS paths always use '/'; os.path.join is only used for the local side.
    filepathtarget = '/'.join([dir_name, filename])
    hdfs_conn.put(filepathtarget, os.path.join(local_data_path, filename))
    return dir_name
    
hdfs_gz_dirs = [mv_files(filename) for filename in filenames]

In [13]:
hdfs_gz_dirs = hdfs_conn.ls('/user/juliet/pageviews-gz/')

In [55]:
hdfs_conn.mv('/user/juliet/python_Leenshtein-0.12.0-py2.7-linux-x86_64.egg', '/user/juliet/eggs')

In [56]:
hdfs_conn.ls('/user/juliet')

[u'.Trash', u'.sparkStaging', u'CPG_Demo', u'eggs']

In [21]:
# Turn the bare entry names returned by ls() into absolute HDFS paths,
# one per uploaded pageviews directory.
hdfs_gz_dirs = [
    '/user/juliet/pageviews-gz/' + filename
    for filename in hdfs_conn.ls('/user/juliet/pageviews-gz')
]
hdfs_gz_dirs

[u'/user/juliet/pageviews-gz/pageviews-20160101-000000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-010000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-020000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-030000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-040000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-050000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-060000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-070000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-080000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-090000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-100000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-110000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-120000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-130000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-140000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-150000',
 u'/user/juliet/pageviews-gz/pageviews-20160101-160000',
 u'/user/juliet/pageviews-gz/pa

In [23]:
import pandas as pd

def extract_datetime(filename):
    """Split a pageviews file name into its date and hour components.

    The name must contain exactly two dashes, as in
    ``pageviews-20160102-030000.gz``. Only the first two characters of the
    last part are used for the hour, so a trailing extension is harmless.

    Returns a ``(year, month, day, hour)`` tuple of strings.
    """
    _prefix, ymd, hms = filename.split("-")
    return ymd[:4], ymd[4:6], ymd[-2:], hms[:2]

def to_pd_dt(filename):
    """Parse a name such as ``pageviews-20160102-030000`` into a pandas Timestamp.

    The ``pageviews-`` prefix and the trailing ``0000`` are literal parts of
    the format string, so only names for an exact hour are accepted.
    """
    name_format = 'pageviews-%Y%m%d-%H0000'
    return pd.to_datetime(filename, format=name_format)

In [82]:
# Quick check that both helpers do the right thing. Only a cell's last
# expression is displayed, so the first result is asserted rather than
# silently discarded.
assert extract_datetime('pageviews-20160102-030000.gz') == ('2016', '01', '02', '03')
to_pd_dt('pageviews-20160102-030000')

Timestamp('2016-01-02 03:00:00')

Each file actually has important information about itself in its title. We need to decompress each file (because gzip is not splittable and we could do better with other compression codecs), read each into a dataset, concatenate all the datasets, and write out the output in a better format (i.e., Parquet with LZO compression).

We could do this with Spark or Impala. Spark writes Parquet files without row groups (perhaps not strictly incorrect, but Impala does not handle this well). For best mutual compatibility we should write out our Parquet file with Impala, not Spark.

# Making a better file
## Create Impala Table

In [21]:
# force=True makes this cell safe to re-run: it does not fail when the
# database already exists.
ibis_conn.create_database('u_juliet', force=True)

In [17]:
# Start from a clean slate. force=True keeps the cell from failing when the
# table does not exist yet (e.g. on a first run).
ibis_conn.drop_table(table_name='wiki_pageviews', database='u_juliet', force=True)

# Column names and types used to read the raw pageviews files.
file_schema = ibis.schema([('project_name', 'string'),
                           ('page_name', 'string'),
                           ('monthly_total', 'int64'),
                           ('hourly_total', 'int64')])

In [7]:
pageviews_tbl = ibis_conn.table('wiki_pageviews', database='u_juliet')

In [28]:
ibis_conn.execute(pageviews_tbl.count())

4148173

In [15]:
u_juliet_db = ibis_conn.database('u_juliet')
u_juliet_db.tables

['wiki_pageviews']

In [27]:
def gz_2_data_insert(data_dir, db_expr, table_name='wiki_pageviews', database='u_juliet'):
    """Load one HDFS directory of pageviews data into an Impala table.

    The files in ``data_dir`` are read as a space-delimited table using
    ``file_schema``, a constant ``time`` column derived from the directory
    name is added, and the rows are inserted into ``table_name`` (the table
    is created from them if it is not listed in ``db_expr.tables``).

    Parameters
    ----------
    data_dir : str
        HDFS directory whose last path component is a name accepted by
        ``to_pd_dt`` (e.g. ``pageviews-20160101-040000``). A trailing ``/``
        is tolerated.
    db_expr : ibis database object
        Handle whose ``tables`` attribute lists the existing table names.
    table_name : str, optional
        Name of the destination table.
    database : str, optional
        Name of the database holding the destination table.
        NOTE(review): presumably the same database as ``db_expr`` - confirm.

    NOTE(review): the recorded run of this function in the notebook failed
    with ``IbisInputError: Cannot safely cast timestamp to string``. The
    existing table's column types presumably do not match the expression
    being inserted - confirm against the table schema.
    """
    tmp_table = ibis_conn.delimited_file(hdfs_dir=data_dir,
                                         schema=file_schema,
                                         delimiter=' ')
    # The timestamp lives in the directory name; rstrip keeps a trailing '/'
    # from turning the last path component into an empty string.
    pdtime = to_pd_dt(data_dir.rstrip('/').split('/')[-1])
    # create a column named time
    tmp_w_time = tmp_table.mutate(time=pdtime)
    if table_name in db_expr.tables:
        ibis_conn.insert(table_name, tmp_w_time, database=database)
    else:
        ibis_conn.create_table(table_name, obj=tmp_w_time, database=database)

gz_2_data_insert(hdfs_gz_dirs[4], u_juliet_db)

IbisInputError: Cannot safely cast timestamp to string

In [108]:
u_juliet_db.tables

['wiki_pageviews']

In [78]:
pageviews_tbl = ibis_conn.table('wiki_pageviews', database='u_juliet')

Unnamed: 0,project_name,page_name,monthly_total,hourly_total
0,aa,Main_Page,11,0
1,aa,MediaWiki:Sitenotice,1,0
2,aa,Special:Contributions/Pathoschild,1,0
3,aa,Special:Contributions/Shanel,1,0
4,aa,Special:ListFiles,1,0
5,aa,Special:ListUsers,1,0
6,aa,Special:ListUsers/sysop,1,0
7,aa,Special:Log/,2,0
8,aa,Special:Statistics,1,0
9,aa,User:Shanel,1,0
