### load data from database
pandas vs blaze

#### Task
* query the database ORM-style, without writing any SQL statements
* do calculations on the returned results
* monitor the memory usage

#### datasource
a single table in a remote database



##### 1. using blaze

In [1]:
from blaze import Data

In [2]:
DB_URL = "mysql://%s:%s@%s:%s/%s::secret_txn_tab" % ('secret')
orders = Data(DB_URL)
# note the warning messages below

Blaze does not understand a SQLAlchemy type.
Blaze provided the following error:
	No SQL-datashape match for type BLOB
Skipping.
Blaze does not understand a SQLAlchemy type.
Blaze provided the following error:
	No SQL-datashape match for type BLOB
Skipping.


When I used blaze in production, I got a `ValueError: Unsupported string encoding u'utf8mb4_unicode_ci` error.
The target database uses the utf8mb4 charset.

In [3]:
order_by_date = orders[(orders.channelid==70000) & (orders.ctime>=1437840000) & (orders.ctime<1437926400)& (orders.status==1)]

In [4]:
int(order_by_date.count())

# blaze's count() method differs from pandas' count()

256

In [5]:
int(order_by_date.amount.sum())/100000

# same usage as in pandas

212018

In [6]:
order_by_date.userid.distinct().count()

# pandas uses drop_duplicates() instead

##### 2. using pandas

In [7]:
from sqlalchemy import create_engine
import pandas as pd

In [8]:
DB_URL = "mysql://%s:%s@%s:%s/%s" % ('secret')
engine = create_engine(DB_URL)

In [9]:
orders = pd.read_sql('select * from secret_txn_tab',
                 con=engine)

# with pandas you still need to write the SQL statement
# you can list only the columns you need in the select statement

In [10]:
orders.head()

Unnamed: 0,txnid,userid,refund_txnid,checkoutid,type,amount,currency,channelid,status,channel_status,channel_txnid,ip,action_country,ctime,vtime,mtime,memo,extra_data
0,1000000,11183,0,95,0,89000000,THB,70000,0,0,,2224250195,SG,1432303352,0,1432303352,,{}
1,1000001,11184,0,100,0,59000000,THB,71000,0,200,,3669652968,SG,1432307027,0,1432309356,,"{""transfer_fields"": {""name"": ""Liu jing"", ""memo..."
2,1000002,11184,0,104,0,69000000,THB,70000,0,0,,3669652968,SG,1432307426,0,1432307426,,{}
3,1000003,11184,0,104,0,69000000,THB,70000,1,100,4323108605155000001366,3669652968,SG,1432310801,1432310864,1432310864,,"{""card_number"": ""426569xxxxxx3103"", ""auth_code..."
4,1000004,11174,0,111,0,59000000,THB,70000,0,0,,2088405459,TH,1432311210,0,1432311210,,{}


In [11]:
order_today = orders[(orders.channelid==70000) & (orders.ctime>=1437840000) & (orders.ctime<1437926400)& (orders.status==1)]

In [12]:
total_user = int(order_today.userid.drop_duplicates().count())
total_user

# len(order_today.userid.unique()) vs order_today.userid.drop_duplicates().count()
# Which one is faster? To be tested later.

209
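The open question in the cell above (which way of counting distinct users is faster) can be checked with `timeit`. This is only a sketch on synthetic data: the `userid` Series below is a made-up stand-in for `order_today.userid`, and the timings will vary with data size, dtype and pandas version. `nunique()` is added as a third candidate.

```python
import timeit

import numpy as np
import pandas as pd

# Synthetic stand-in for order_today.userid (hypothetical data, no NaN values)
rng = np.random.default_rng(0)
userid = pd.Series(rng.integers(0, 5000, size=100_000), name="userid")

# All three approaches agree as long as the column has no missing values;
# with NaN present, unique() keeps NaN while count() and nunique() ignore it.
n_unique = len(userid.unique())
n_dedup = int(userid.drop_duplicates().count())
n_nunique = userid.nunique()

candidates = [
    ("len(unique())", lambda: len(userid.unique())),
    ("drop_duplicates().count()", lambda: userid.drop_duplicates().count()),
    ("nunique()", lambda: userid.nunique()),
]
runs = 20
for label, stmt in candidates:
    seconds = timeit.timeit(stmt, number=runs)
    print(f"{label:<28} {seconds / runs * 1e3:.3f} ms per call")
```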

In [13]:
total_txn = int(order_today.txnid.count())
total_txn

256

In [14]:
total_amt = int(order_today.amount.sum()) / 500000
total_amt

42403

##### pandas vs blaze
1. For the API differences, see the [blaze website](http://blaze.pydata.org/en/latest/rosetta-pandas.html).
2. Objects can be easily converted between the two using odo or Data().
3. Blaze can simplify some common IO tasks that one would want to do with pandas and make them more readable. These examples make use of the odo library. In many cases, blaze will be able to handle datasets that can't fit into main memory, which is something that can't easily be done with pandas. (This time, though, I ran into the charset problems described above.)
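For the memory point in item 3, pandas does offer a partial workaround: `read_sql` accepts a `chunksize` argument and then yields DataFrames piece by piece, so simple aggregates can be accumulated without holding the whole table. The sketch below uses an in-memory SQLite table as a stand-in; the table name, columns and values are invented for illustration and are not the real `secret_txn_tab`.

```python
import sqlite3

import pandas as pd

# Hypothetical stand-in table; the real data lives in a remote MySQL database
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "userid": [1, 2, 2, 3, 3, 3],
    "amount": [10, 20, 30, 40, 50, 60],
}).to_sql("txn_tab", conn, index=False)

total_amount = 0
users = set()
# With chunksize set, read_sql returns an iterator of DataFrames
for chunk in pd.read_sql("select userid, amount from txn_tab", con=conn, chunksize=2):
    total_amount += int(chunk["amount"].sum())
    users.update(chunk["userid"].tolist())

conn.close()
print(total_amount, len(users))
```

This only helps for computations that can be combined chunk by chunk (sums, counts, sets of distinct keys); anything needing the full table at once still has to fit in memory.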


In [15]:
def df_size(df):
    """Return the size of a DataFrame in megabytes"""
    total = 0.0
    for col in df:
        total += df[col].nbytes
    return total/1048576

In [16]:
df_size(orders)

2.9620513916015625
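One caveat about `df_size`: for object (string) columns, `nbytes` counts only the pointers stored in the array, not the Python strings they point to, so the figure above is likely an underestimate for this table. A possible alternative is `DataFrame.memory_usage(deep=True)`. The sketch below uses a small made-up frame, and the helper name `df_size_deep` is my own.

```python
import pandas as pd

# Hypothetical frame with one numeric and one string (object) column
df = pd.DataFrame({
    "txnid": pd.Series(range(1000), dtype="int64"),
    "currency": pd.Series(["THB"] * 1000, dtype=object),
})

def df_size_deep(df):
    """Return the size of a DataFrame in megabytes, including string payloads and the index."""
    return df.memory_usage(index=True, deep=True).sum() / 1048576

# Same calculation as df_size(): sums nbytes per column
shallow_mb = sum(df[col].nbytes for col in df) / 1048576
deep_mb = df_size_deep(df)
print("nbytes-based: %.4f MB, deep: %.4f MB" % (shallow_mb, deep_mb))
```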

In [17]:
# how many records are in this df?
orders.count()

txnid             21569
userid            21569
refund_txnid      21569
checkoutid        21569
type              21569
amount            21569
currency          21569
channelid         21569
status            21569
channel_status    21569
channel_txnid     21569
ip                21569
action_country    21569
ctime             21569
vtime             21569
mtime             21569
memo              21569
extra_data        21569
dtype: int64

In [18]:
# switch to a larger dataset
DB_URL = "mysql://%s:%s@%s:%s/%s" % ('secret')
engine = create_engine(DB_URL)
records = pd.read_sql('select * from xx_realtime',
                 con=engine)

In [19]:
df_size(records)

16.8321533203125

In [20]:
records.count()

id          367704
type        367704
date        367704
tick        367704
location    367704
data        367704
dtype: int64

##### How to load a column as a string?

In [21]:
DB_URL = "mysql://%s:%s@%s:%s/%s::airpay_daily" % ('secret')
stats = Data(DB_URL)
data = stats[
            (stats.type==4) & (stats.date == '20150726') & (
            stats.location =='TH') & (stats.extra=='Total')]

In [22]:
data.head()

Unnamed: 0,id,type,date,location,extra,data
0,8624,4,2015-07-26,TH,Total,"{""txn_user"": 4147, ""txn_value"": 1895538.140000..."


In [23]:
data[0].data

# it seems the data value cannot be accessed like this

In [24]:
# use odo to convert the blaze object to a pandas DataFrame
from odo import odo

In [25]:
df = odo(data, pd.DataFrame)

In [26]:
df

Unnamed: 0,id,type,date,location,extra,data
0,8624,4,2015-07-26,TH,Total,"{""txn_user"": 4147, ""txn_value"": 1895538.140000..."


In [27]:
x = df.iloc[0]['data']
x

'{"txn_user": 4147, "txn_value": 1895538.1400000025, "txn_num": 5512}'

In [None]:
import json

json.loads(x)
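If every row holds a JSON string like this, the whole column can be parsed and spread into regular columns in one step. A minimal sketch, assuming a frame shaped like the odo result above (the frame is rebuilt by hand here, with only the `id` and `data` columns):

```python
import json

import pandas as pd

# Hand-built stand-in for the DataFrame returned by odo(data, pd.DataFrame)
df = pd.DataFrame({
    "id": [8624],
    "data": ['{"txn_user": 4147, "txn_value": 1895538.1400000025, "txn_num": 5512}'],
})

# Parse each JSON string into a dict, then build one column per key
parsed = pd.DataFrame(df["data"].map(json.loads).tolist(), index=df.index)
expanded = df.drop(columns="data").join(parsed)
print(expanded)
```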

#### Pandas read_csv()
In many cases this is actually all you need.

In [1]:
log_path = 'data/test.20150927'

In [2]:
import pandas as pd

In [6]:
log = pd.read_csv(log_path, sep='|', header=None, usecols=[3,])
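# The read_csv parameters used most often in daily work:
# sep: the delimiter between columns
# header: whether the first row holds the column names. For raw log data this is usually set to None; otherwise the first line of the file is taken as the column names by default and one row of data is lost
# usecols: select the needed columns at read time, which avoids cleanup afterwards and reduces unnecessary memory use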

CParserError: Error tokenizing data. C error: Expected 4 fields in line 1296, saw 5


In [7]:
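# As shown above, not every line of the raw log is well-formed, so some lines may fail to parse;
# in this example, some lines have more columns than the others, which causes the error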

`/var/home/ftp/im_log/80.175/info.log.20150927.23:2015-09-27 23:23:58.898|INFO|user_auth_game_sent_aux|uid=9278794,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=103.1.69.2`


`/var/home/ftp/im_log/80.176/info.log.20150927.00:2015-09-27 00:01:16.929|INFO|0x00000154|user_auth_game_sent_aux|uid=71097840,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=49.144.236.127`

In [8]:
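# The second line above is the malformed one
log = pd.read_csv(log_path, sep='|', header=None, usecols=[3,], error_bad_lines=False)
# New parameter:
# error_bad_lines=False  automatically skips malformed lines
# This touches on one pandas pitfall: the expected field count is taken from the first line, so if the very first log line happens to be a malformed one, the well-formed lines may be the ones treated as errors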

Skipping line 1296: expected 4 fields, saw 5
Skipping line 1297: expected 4 fields, saw 5
Skipping line 1298: expected 4 fields, saw 5
Skipping line 1299: expected 4 fields, saw 5
Skipping line 1300: expected 4 fields, saw 5
Skipping line 1301: expected 4 fields, saw 5
Skipping line 1302: expected 4 fields, saw 5
Skipping line 1303: expected 4 fields, saw 5
Skipping line 1304: expected 4 fields, saw 5
Skipping line 1305: expected 4 fields, saw 5
Skipping line 1306: expected 4 fields, saw 5
Skipping line 1307: expected 4 fields, saw 5
Skipping line 1308: expected 4 fields, saw 5
Skipping line 1309: expected 4 fields, saw 5
Skipping line 1310: expected 4 fields, saw 5
Skipping line 1311: expected 4 fields, saw 5
Skipping line 1312: expected 4 fields, saw 5
Skipping line 1313: expected 4 fields, saw 5
Skipping line 1314: expected 4 fields, saw 5
Skipping line 1315: expected 4 fields, saw 5
Skipping line 1316: expected 4 fields, saw 5
Skipping line 1317: expected 4 fields, saw 5
Skipping l

In [9]:
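# As shown above, with the extra parameter the raw log is read successfully,
# and a warning is printed for each malformed line
log = pd.read_csv(log_path, sep='|', header=None, usecols=[3,], error_bad_lines=False, warn_bad_lines=False)
# New parameter to silence these warnings:
# warn_bad_lines=False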
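Note for newer pandas: `error_bad_lines` and `warn_bad_lines` were deprecated in pandas 1.3 in favor of a single `on_bad_lines` parameter and are gone in pandas 2.0. A rough equivalent of the cell above, assuming pandas 1.3 or later and using a tiny in-memory stand-in for the log file (the three lines are shortened, invented samples); the column is selected after parsing here to keep the sketch simple:

```python
import io

import pandas as pd

# Shortened stand-in for the raw log: the third line has one extra field
raw_log = io.StringIO(
    "2015-09-27 23:23:58.898|INFO|user_auth_game_sent_aux|uid=1,client_type=3\n"
    "2015-09-27 23:23:59.001|INFO|user_auth_game_sent_aux|uid=2,client_type=3\n"
    "2015-09-27 00:01:16.929|INFO|0x00000154|user_auth_game_sent_aux|uid=3,client_type=3\n"
)

# on_bad_lines="skip" drops lines with too many fields without printing warnings;
# on_bad_lines="warn" would skip them and print a warning for each one
log = pd.read_csv(raw_log, sep="|", header=None, on_bad_lines="skip")
data = log[[3]]
print(data)
```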

In [10]:
log.columns = ['data']
# The key data sits in a single column, which now needs to be split into multiple columns

In [11]:
log.head()

Unnamed: 0,data
0,"uid=89331925,client_type=3,appid=100022,redire..."
1,"uid=140220082,client_type=3,appid=100022,redir..."
2,"uid=225503167,client_type=3,appid=100022,redir..."
3,"uid=91462520,client_type=3,appid=100022,redire..."
4,"uid=68544100,client_type=3,appid=100022,redire..."


In [12]:
# See Working with Text Data.ipynb
# expand=True returns the split parts as separate DataFrame columns
log = log.data.str.split(',', expand=True)

In [13]:
log.head()

Unnamed: 0,0,1,2,3,4,5
0,uid=89331925,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=180.191.151.126
1,uid=140220082,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=112.202.113.96
2,uid=225503167,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=223.205.76.44
3,uid=91462520,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=103.43.150.138
4,uid=68544100,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=115.66.211.118
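The split columns still carry the `key=` prefix and have numeric names. One possible next step is to parse each line into a dict so that the keys become column names. This is a sketch that assumes no value contains a comma; the helper `parse_pairs` is my own, and the two sample rows are copied from the output above.

```python
import pandas as pd

# Two rows copied from the log output above
log = pd.DataFrame({
    "data": [
        "uid=89331925,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=180.191.151.126",
        "uid=140220082,client_type=3,appid=100022,redirect_uri=gop100022://auth/,scope=,ip=112.202.113.96",
    ]
})

def parse_pairs(line):
    """Turn 'k1=v1,k2=v2' into {'k1': 'v1', 'k2': 'v2'}; empty values become ''."""
    return dict(pair.split("=", 1) for pair in line.split(","))

# One column per key, in the order the keys appear in the log line
fields = pd.DataFrame(log["data"].map(parse_pairs).tolist(), index=log.index)
fields["uid"] = fields["uid"].astype("int64")
print(fields.head())
```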


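#### UTF-8 handling
When the source data contains non-English text, it is best to specify the encoding when reading it;

otherwise all sorts of garbled-text problems show up in later processing.

###### First, a look at the read_csv encoding setting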

encoding : string, default **None**


Encoding to use for UTF when reading/writing (ex. ‘utf-8’).

In [16]:
chinese_file = 'data/Chinese.txt'

In [19]:
raw = pd.read_csv(chinese_file)

In [20]:
raw

Unnamed: 0,����,����.1
0,��ΰ��,60


In [29]:
# The output above is what you get when the encoding parameter is not set
raw = pd.read_csv(chinese_file, encoding='utf8')

In [30]:
raw

Unnamed: 0,姓名,分数
0,温伟坤,60


In [27]:
output = 'data/test_chinese_output'
raw.to_csv(output)

UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)

In [28]:
# As shown above, the frame cannot be written out directly;
# the encoding has to be specified
raw.to_csv(output, encoding='utf8')

In [31]:
raw.columns
# All computation and application logic works on Unicode;
# encode/decode only at the two ends of the pipeline: utf8 -> Unicode -> utf8

Index([u'姓名', u'分数'], dtype='object')
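The "encode only at the two ends" rule can be sketched as a small round trip: write with an explicit encoding, read back with the same one, and work with plain Unicode strings in between. The frame below is rebuilt by hand from the output above and written to a temporary directory rather than to `data/`. (The outputs in this notebook come from Python 2, hence the `u'...'` prefixes; the sketch is written for Python 3.)

```python
import os
import tempfile

import pandas as pd

# Hand-built copy of the frame shown above
raw = pd.DataFrame({"姓名": ["温伟坤"], "分数": [60]})

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "test_chinese_output.csv")
    raw.to_csv(path, index=False, encoding="utf-8")   # encode on the way out
    with open(path, "rb") as fh:
        first_bytes = fh.read(6)                      # raw bytes of the first header cell
    restored = pd.read_csv(path, encoding="utf-8")    # decode on the way in

print(first_bytes)
print(restored)
```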

In [36]:
raw.iloc[:,1]

0    60
Name: 分数, dtype: int64

In [37]:
raw.iloc[:,0]

0    温伟坤
Name: 姓名, dtype: object

In [38]:
type(raw.iloc[:,0])

pandas.core.series.Series

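##### Encoding issues when reading from a DB
This has to be combined with SQLAlchemy's encoding settings, because pandas relies on SQLAlchemy for database access.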

In [39]:
from sqlalchemy import create_engine

Append the following to the URL:


`?charset=utf8`

For reference:

In [None]:
def get_app_df():
    DB_URL = "mysql://%s:%s@%s:%s/%s?charset=utf8" % (APP_DB['user'], APP_DB['psw'], APP_DB['host'], APP_DB['port'], APP_DB['name'])
    engine = create_engine(DB_URL)
    df = pd.read_sql('select app_id, app_name from %s ' % 'app_tab', con=engine)
    return df
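Two details may be worth handling when building the URL. MySQL's `utf8` charset stores at most 3 bytes per character, so for a utf8mb4 database (like the one that tripped up blaze earlier) `?charset=utf8mb4` is probably the better choice, provided the driver accepts it. Credentials containing characters such as `@` or `/` also need URL-escaping. A stdlib-only sketch follows; the `APP_DB` values are made up and `build_db_url` is a hypothetical helper, not part of SQLAlchemy.

```python
from urllib.parse import quote_plus

# Hypothetical connection settings; none of these values come from the notebook
APP_DB = {"user": "report", "psw": "p@ss/word", "host": "127.0.0.1", "port": 3306, "name": "app_db"}

def build_db_url(cfg, charset="utf8mb4"):
    """Build a MySQL URL for create_engine() with escaped credentials and an explicit charset."""
    return "mysql://%s:%s@%s:%s/%s?charset=%s" % (
        quote_plus(cfg["user"]),
        quote_plus(cfg["psw"]),   # '@' and '/' would otherwise break URL parsing
        cfg["host"],
        cfg["port"],
        cfg["name"],
        charset,
    )

db_url = build_db_url(APP_DB)
print(db_url)
```

The resulting string can be passed to `create_engine()` in place of the hand-formatted `DB_URL` in `get_app_df()`.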