# In [38]:  (Jupyter cell prompt left over from the notebook export)
# coding: utf-8
# encoding：utf-8
import numpy as np
import pandas as pd
import uuid
import time 
import random
import logging

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d]- %(message)s"
logging.basicConfig(filename='TD2Triple_python.log', level=logging.INFO, format=LOG_FORMAT)
 
    
def getUid():
    """Return a new version-1 (time-based) UUID as 32 hex characters, no dashes."""
    return uuid.uuid1().hex

# Parallel accumulators for the generated triples: position k of each list
# holds the subject, predicate and object of the k-th triple.
sub, pre, obj = [], [], []

def tupleInsertBatch(uid, value, attr_dict, attr):
    """Append one triple per entry of ``value``, all with subject ``uid``.

    The k-th entry of ``value`` becomes the object of a triple whose
    predicate is ``attr_dict[attr[k]]``.

    Args:
        uid: subject shared by every appended triple.
        value: iterable of object values (a list or a pandas Series).
        attr_dict: mapping from attribute key to predicate.
        attr: sequence of attribute keys, matched to ``value`` by position;
            it may be longer than ``value``, extra keys are ignored.

    Raises:
        IndexError: if ``attr`` has fewer entries than ``value``.
        KeyError: if an attribute key is missing from ``attr_dict``.
    """
    # Iterate the values themselves instead of indexing ``value[i]``: on a
    # pandas Series an integer subscript is a label lookup, not a position.
    for position, item in enumerate(value):
        # Resolve the predicate first so a failed lookup cannot leave the
        # three lists with different lengths.
        predicate = attr_dict[attr[position]]
        sub.append(uid)
        pre.append(predicate)
        obj.append(item)
        
def tupleInsert(subject, predicate, objectx):
    """Append a single (subject, predicate, object) triple to the module-level lists."""
    for column, item in ((sub, subject), (pre, predicate), (obj, objectx)):
        column.append(item)

# Load the two input tables. Nothing below can run without them, so a read
# failure is logged and re-raised instead of letting the script continue
# and fail later on an undefined name.
try:
    td81_data = pd.read_csv('td81.csv', header=0)
    cust = pd.read_csv('customer.csv', header=0)
except IOError:
    logging.error('', exc_info=True)
    raise

# Normalise column types.
if td81_data['Cust_Id'].dtypes != int:
    td81_data['Cust_Id'] = td81_data['Cust_Id'].astype(int)
if td81_data['Rpt_Id'].dtypes != int:
    td81_data['Rpt_Id'] = td81_data['Rpt_Id'].astype(int)
if td81_data['End_Tm_Val'].dtypes != float:
    end_tm_val_raw = td81_data['End_Tm_Val']
    if end_tm_val_raw.dtypes == object:
        # Text amounts may carry thousands separators ("1,234.5"); the .str
        # accessor only exists on object columns, so strip them only there.
        end_tm_val_raw = end_tm_val_raw.str.replace(",", "")
    td81_data['End_Tm_Val'] = end_tm_val_raw.astype(float)

# customer.csv is expected to have exactly these five columns, in this order.
cust.columns = ["Cust_Id", "name", "idt", "risk_id", "rate"]
# Opening part of every project IRI; each use appends a local name and '>'.
prefix = '<http://www.cmbchina.com/bigdata/financial_risk#'
# rdf:type predicate.
pre_type = '<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>'

        

############校验
#创建异常类并初始化 
class DatapreproError(Exception):
    """Raised when the input data fails a pre-processing validation check.

    The message is forwarded to ``Exception`` and also stored on ``err``.
    """

    def __init__(self, err):
        super(DatapreproError, self).__init__(err)
        self.err = err
        
# raw data check
# Remove customers whose TD data is incomplete: every customer needs at
# least two rows (liabilities and assets), so a row whose Cust_Id appears
# only once is logged, recorded in Error_data_record.txt and dropped.
print(td81_data.shape[0])
check_cust = td81_data.duplicated('Cust_Id', keep=False)  # True: Cust_Id repeats
TM = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
incomplete_rows = check_cust[~check_cust].index.tolist()
k = len(incomplete_rows)  # number of rejected rows
with open('Error_data_record.txt', 'a') as f:
    for i in incomplete_rows:
        logging.error('Raw data row No. %d , "Cust_Id" info is incomplete' % i)
        c = str(td81_data.loc[i, 'Cust_Id'])
        r = str(td81_data.loc[i, 'Rpt_Id'])
        f.write(TM + '\t' + "cust_id: " + c + "\t" + "rep_id: " + r + "\n")
    # drop() removes the rows together with their index labels, so the
    # index is rebuilt afterwards.
    td81_data = td81_data.drop(incomplete_rows).reset_index(drop=True)
    # Summary line: rows kept and rows rejected.
    f.write(TM + "\t" + "Raw data available: " + str(td81_data.shape[0]) + "\t" + "Error: " + str(k) + "\n")
print(td81_data.shape[0])
    
    
####################（0）
#########pre-process
#TD数据处理
#表一
# Table 1: the "total liabilities" rows, ordered by (Cust_Id, Rpt_Id).
# The sort must happen before the index is rebuilt: the two tables are
# joined by row position below, so both need the same ordering.
td_1 = td81_data[td81_data.Row_Id_Nm.isin(['负债合计'])]
td_1 = td_1.sort_values(by=['Cust_Id', 'Rpt_Id']).reset_index(drop=True)
td_1['tot_lia'] = td_1['End_Tm_Val']  # total liabilities

# Table 2: the "liabilities and owners' equity" rows under either spelling.
# Cust_Id and Rpt_Id together form the unique key of a report.
td_2 = td81_data[td81_data.Row_Id_Nm.isin(['负债和所有者权益', '负债和所有者权益()'])]
td_2 = td_2.sort_values(by=['Cust_Id', 'Rpt_Id']).reset_index(drop=True)
td_2['tot_ass'] = td_2['End_Tm_Val']  # liabilities and owners' equity

# Build the combined table: td_1 plus the tot_ass column, matched by position.
td = pd.concat([td_1, td_2['tot_ass']], axis=1)

# Validate the combined table: row i of both tables must describe the same
# report. A difference that is non-zero (or NaN, when one table is shorter)
# marks a mismatched row.
ary_rpt = np.array(td_1['Rpt_Id'] - td_2['Rpt_Id']).T
ary = np.array(td_1['Cust_Id'] - td_2['Cust_Id']).T
mismatched_rows = [i for i in range(ary.shape[0])
                   if ary[i] != 0 or ary_rpt[i] != 0]
k = len(mismatched_rows)
if k > 10:
    # Too many mismatches to repair by dropping rows.
    logging.error('TD data issue,please reload TD data')
    raise DatapreproError('TD data issue,please reload TD data...')
for i in mismatched_rows:
    logging.warning('The data (tot_lia and tot_ass) splitting  from TD table failed at row %d' % i)
    logging.info('remove row %d' % i)
if mismatched_rows:
    td = td.drop(mismatched_rows)

logging.info('Table is ready for calculation')

# Rank every report with a priority built from digits and keep the
# highest-ranked report per customer. The first digit comes from Rpt_Clb_Cd,
# the second from Aud_Ind, the third from Rpt_Afl_Mdl.
# NOTE(review): a row whose Rpt_Clb_Cd matches none of the three values keeps
# the placeholder 'a' and makes astype(int) fail; a row whose Aud_Ind or
# Rpt_Afl_Mdl matches nothing gets a shorter number - confirm the inputs are
# restricted to the values listed here.
td['priority'] = 'a'
for label, digit in (("合并", '2'), ("汇总", '1'), ("本部", '0')):
    td.loc[td['Rpt_Clb_Cd'] == label, 'priority'] = digit
for label, digit in (("是", '1'), ("否", '0')):
    td.loc[td['Aud_Ind'] == label, 'priority'] = td['priority'] + digit
for label, digit in (("new", '1'), ("old", '0')):
    td.loc[td['Rpt_Afl_Mdl'] == label, 'priority'] = td['priority'] + digit
td['priority'] = td['priority'].astype(int)

# reset_index() without drop=True keeps the old row labels as an extra
# 'index' column.
td = td.loc[td.groupby('Cust_Id')['priority'].idxmax()].reset_index()
del td['priority']

# Check the selection: each Cust_Id must now appear once. Every surplus row
# is logged and dropped, keeping the last row of each customer.
rel = td.duplicated('Cust_Id', keep='last')
if rel.any():
    for i in td.index[rel.values].tolist():
        logging.error('idxmax func error. drop row No.%d' % i)
    td = td[~rel]

td = td.reset_index(drop=True)
logging.info('Max priority selecting check done')

# Liabilities ratio = total liabilities / (liabilities and owners' equity).
td['la_rate'] = (td['tot_lia'] / td['tot_ass']).astype(float)
logging.info('liabilities ratio calculating done')

# One uid column each for the report, the two amounts and the ratio.
uid_columns = ['uidRpt', 'uidLia', 'uidAss', 'uidLarate']
for uid_column in uid_columns:
    td[uid_column] = 'a'

# Locate the uid columns by name instead of assuming fixed positions, so the
# step does not depend on how many columns the input file has.
column = list(td.columns)
uid_positions = [column.index(uid_column) for uid_column in uid_columns]
for i in range(td.shape[0]):
    for j in uid_positions:
        code = getUid()
        td.iloc[i, j] = code
Fina = td


#######################（1）
#Industry 对象构建
# Build one Industry object per distinct value of cust['idt'].
# Each value is split at the first ':' into an industry code and a name.
try:
    idtcd = prefix + 'Idt_Cd>'
    idtnm = prefix + 'Idt_Nm>'
    idt_clea = cust['idt'].drop_duplicates().reset_index(drop=True)
    N = idt_clea.shape[0]

    # temp columns, by position: 0 = idt_cd, 1 = idt_nm, 2 = uidIdt.
    idt_parts = idt_clea.str.split(':', n=1)
    temp = pd.DataFrame({'idt_cd': idt_parts.str[0], 'idt_nm': idt_parts.str[1]},
                        columns=['idt_cd', 'idt_nm'])
    temp["uidIdt"] = 'a'
    for i in range(N):
        # Every industry needs its own identifier.
        code = getUid()
        uid = prefix + str(code) + '>'
        temp.iloc[i, 2] = code
        tupleInsert(uid, pre_type, prefix + 'IronSteel>')
        tupleInsert(uid, idtcd, '"' + temp.iloc[i, 0] + '"')
        tupleInsert(uid, idtnm, '"' + temp.iloc[i, 1] + '"')
    logging.info('industry creating done')
except Exception:
    logging.error('indursty object error. Detial', exc_info=True)
    raise

#########################（2）
#创建customer 对象
# Build one Customer object per row of cust, with its literal properties and
# a hasIdtTyp link to the matching Industry object.
try:
    # One identifier per customer; the new column lands at position 5.
    cust["uidCust"] = [getUid() for _ in range(cust.shape[0])]
    # Property predicates.
    Cust_Id = prefix + 'Cust_Core_Id>'
    risk_id = prefix + 'Cust_Rsk_Id>'
    idt = prefix + 'Cust_Idt_Typ>'
    name = prefix + 'Cust_Nm>'
    rate = prefix + 'Cust_Rat>'
    # Relation predicate.
    hasIdtTyp = prefix + 'hasIdtTyp>'
    attr_dict = {'Cust_Id': Cust_Id, 'risk_id': risk_id, 'idt': idt, 'name': name, 'rate': rate}
    attr = ['Cust_Id', 'name', 'risk_id', 'rate', 'idt']

    # Industry code -> industry uid, built once instead of scanning temp for
    # every customer. setdefault keeps the first row for a repeated code.
    idt_uid_by_code = {}
    for idt_code, idt_uid in zip(temp['idt_cd'], temp.iloc[:, 2]):
        idt_uid_by_code.setdefault(idt_code, idt_uid)

    for i in range(cust.shape[0]):
        uid = prefix + str(cust.iloc[i, 5]) + '>'
        tupleInsert(uid, pre_type, prefix + 'Customer>')
        # Properties: columns 0, 1, 3, 4 are Cust_Id, name, risk_id, rate,
        # matching the first four entries of attr.
        value = cust.iloc[i, [0, 1, 3, 4]].astype(str)
        tupleInsertBatch(uid, ('"' + value + '"').tolist(), attr_dict, attr)
        # Relation hasIdtTyp: temp stores the text before the first ':' as
        # the industry code, so derive the lookup key the same way rather
        # than assuming the code is six characters long.
        idt_code = cust.iloc[i, 2].split(':', 1)[0]
        tupleInsert(uid, hasIdtTyp, prefix + idt_uid_by_code[idt_code] + '>')
    logging.info('customer creating done')
except Exception:
    logging.error('curstomer object error. Detial', exc_info=True)
    raise
    
#############################（3）
#创建对象Finatial_report,
# Build one Balance_Sheet object per row of Fina, then link every customer
# to its report with hasFinRpt.
try:
    typ = prefix + 'Rpt_Typ_Cd>'
    aud = prefix + 'Rpt_Aud_Ind>'
    clb = prefix + 'Rpt_Clb_Cd>'
    rpt_id = prefix + 'Rpt_Id>'
    dt = prefix + 'Rpt_Dt>'
    prd = prefix + 'Rpt_Prd_Typ_Cd>'
    updt = prefix + 'Rpt_Upd_Dt>'

    # The first seven columns line up, by position, with the keys in attr;
    # the eighth column is the report uid.
    rpt_bal = Fina[['Rpt_Id', 'Rpt_Clb_Cd', 'Rpt_Prd_Typ_Cd', 'Rpt_Dt', 'Aud_Ind', 'Upd_Dt', 'Rpt_Afl_Mdl', 'uidRpt']]
    attr_dict = {'type': typ, 'aud': aud, 'clb': clb, 'dt': dt, 'prd': prd, 'updt': updt, 'rpt_id': rpt_id}
    attr = ['rpt_id', 'clb', 'prd', 'dt', 'aud', 'updt', 'type']
    for i in range(rpt_bal.shape[0]):
        uid = prefix + str(rpt_bal.iloc[i, 7]) + '>'
        tupleInsert(uid, pre_type, prefix + 'Balance_Sheet>')
        value = rpt_bal.iloc[i, 0:7].astype(str)
        # Hand over a plain list so the values are matched to attr purely
        # by position.
        tupleInsertBatch(uid, ('"' + value + '"').tolist(), attr_dict, attr)

    # Customer -> report links: merge joins on the shared Cust_Id column,
    # giving the columns Cust_Id, uidCust, uidRpt.
    df1 = cust[['Cust_Id', 'uidCust']]
    df2 = Fina[['Cust_Id', 'uidRpt']]
    rel = pd.merge(df1, df2)
    predicate = prefix + 'hasFinRpt>'
    for i in range(rel.shape[0]):
        subject = prefix + str(rel.iloc[i, 1]) + '>'
        objectx = prefix + str(rel.iloc[i, 2]) + '>'
        tupleInsert(subject, predicate, objectx)
    logging.info('financial report creating done')
except Exception:
    logging.error('report object error. Detial', exc_info=True)
    raise
    
##############################（4）
#创建对象Financial_Subject
# Build three Financial_Subject objects per report (total liabilities,
# liabilities and owners' equity, liabilities ratio) and attach each one to
# its report with hasFinSub.
try:
    end_val = prefix + 'Fin_Sub_End_Tm_Val>'
    sub_name = prefix + 'Fin_Sub_Nm>'
    hasSub = prefix + 'hasFinSub>'
    fin_sub = prefix + 'Financial_Subject>'
    # Column positions: 0 tot_lia, 1 tot_ass, 2 la_rate,
    # 3 uidRpt, 4 uidLia, 5 uidAss, 6 uidLarate.
    sub_bal = Fina[['tot_lia', 'tot_ass', 'la_rate', 'uidRpt', 'uidLia', 'uidAss', 'uidLarate']]
    # (uid column, subject name, value column) for each subject.
    subject_specs = [
        (4, '负债合计', 0),
        (5, '负债所有者权益', 1),
        (6, '资产负债率', 2),
    ]
    for i in range(sub_bal.shape[0]):
        rpt_uid = prefix + str(sub_bal.iloc[i, 3]) + '>'
        for uid_pos, label, value_pos in subject_specs:
            subject_uid = prefix + str(sub_bal.iloc[i, uid_pos]) + '>'
            tupleInsert(rpt_uid, hasSub, subject_uid)
            tupleInsert(subject_uid, pre_type, fin_sub)
            tupleInsert(subject_uid, sub_name, '"' + label + '"')
            amount = sub_bal.iloc[i, value_pos].astype(str)
            tupleInsert(subject_uid, end_val, '"' + amount + '"')
    logging.info('financial subject creating done')
except Exception:
    # logging has no "infor" function; calling it here would raise
    # AttributeError and hide the real failure.
    logging.error('report_subject error. Detial:', exc_info=True)
    raise
    
#################（4）
#输出文件
# Assemble the triples: one row per list in tuple_pre, transposed so that
# each output line reads "subject predicate object ." once the closing
# '.' column is added.
tuple_pre = pd.DataFrame([sub, pre, obj])
final = tuple_pre.transpose()
final['end'] = '.'
N_triple = len(tuple_pre.columns)
logging.info('Total %d Triples' % N_triple)
try:
    final.to_csv('TD2Triple.csv', index=False, sep=' ', quotechar="'", header=False, encoding='utf-8')
except IOError:
    logging.error('', exc_info=True)
    raise
# Reached only when the write succeeded; the handler above re-raises.
logging.info('output file... \nEnd!')

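# Notebook cell output (presumably the two row counts printed before and
# after the raw data check):
# 256
# 256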
