In [1]:
# -*- coding: utf-8 -*-
"""
Created on Mon Mar  9 10:17:11 2020

@author: lailai_tvbs
"""
import datetime
from apiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials as Credentials
import pandas as pd
import math

import time
import re
import sys
sys.path.insert(1, '/Users/USER/Desktop/GA_firebase/code/warehouse/')
from db_connect import connect_sql_gcp


In [2]:
#Control which date the data is filtered on
def datetime_control(datetotal, datediff):
    """Return the target batch date as an ISO ``YYYY-MM-DD`` string.

    The date is computed as ``today - datetotal days + datediff days``.

    Parameters
    ----------
    datetotal : number of days to step back from today.
    datediff : number of days to step forward again from that point.

    Returns
    -------
    str
        The resulting calendar date formatted as ``YYYY-MM-DD``.
    """
    window_start = datetime.date.today() - datetime.timedelta(days=datetotal)
    # Applied as a second, separate shift (not merged into one timedelta).
    target_day = window_start + datetime.timedelta(days=datediff)
    return target_day.isoformat()



In [3]:

#Load the Google API credentials and build the service client
def get_service(key_file='/Users/USER/Desktop/GA_firebase/code/warehouse/GA_Api_for_Intern_New.json', scopes=None):
    """Build an Analytics Reporting API v4 client from a service-account key.

    Parameters
    ----------
    key_file : path to the service-account JSON key file. Defaults to the
        location this notebook has always used, so existing calls with no
        arguments behave as before.
    scopes : list of OAuth scopes to request. Defaults to the read-only
        Analytics scope.

    Returns
    -------
    The object returned by ``build(serviceName='analyticsreporting', version='v4', ...)``.
    """
    if scopes is None:
        scopes = ['https://www.googleapis.com/auth/analytics.readonly']
    creds = Credentials.from_json_keyfile_name(key_file, scopes=scopes)
    return build(serviceName='analyticsreporting', version='v4', credentials=creds)



#Fetch GA data through the Google API service client
def get_report(service, view_id, date, dimensions, next_page, metrics=None):
    """Request one page of a single-day report via ``reports().batchGet``.

    Parameters
    ----------
    service : client object exposing ``reports().batchGet(body=...).execute()``.
    view_id : GA view id, sent as ``viewId``.
    date : day to query; used as both ``startDate`` and ``endDate``.
    dimensions : iterable of dimension names (e.g. ``"ga:source"``).
    next_page : page token for the request; converted with ``str()``.
    metrics : optional iterable of metric expressions. Defaults to
        ``["ga:pageviews"]``, which is what the function always requested.

    Returns
    -------
    Whatever ``execute()`` returns for the request.
    """
    if metrics is None:
        metrics = ["ga:pageviews"]
    report_request = {
        'viewId': view_id,
        'dateRanges': [{'startDate': date, 'endDate': date}],
        'metrics': [{'expression': m} for m in metrics],
        'dimensions': [{'name': d} for d in dimensions],
        'pageSize': "100000",
        'pageToken': str(next_page),
    }
    return service.reports().batchGet(
        body={'reportRequests': [report_request]}
    ).execute()
    


In [4]:
#Convert a GA response into a DataFrame
def to_dataframe(result, date):
    """Flatten the first report of a batchGet response into a DataFrame.

    Column names are the dimension names followed by the metric names, with
    every ``'ga:'`` substring removed, plus a trailing ``date`` column filled
    with ``date``.

    Parameters
    ----------
    result : response dict; only ``result['reports'][0]`` is read.
    date : value stored in the ``date`` column of every row.

    Returns
    -------
    pandas.DataFrame
        One row per report row. When the report has no ``rows`` entry the
        frame is empty but still carries the expected columns (previously
        this raised ``KeyError``).
    """
    report = result['reports'][0]
    column_header = report['columnHeader']
    # 'dimensions' is looked up defensively so a metrics-only report also works.
    dimensions = column_header.get('dimensions', [])
    metrics = [m['name'] for m in column_header['metricHeader']['metricHeaderEntries']]
    headers = [*dimensions, *metrics]
    # A report with no matching data may omit 'rows' (or 'data') entirely.
    data_rows = report.get('data', {}).get('rows', [])
    records = [
        [*row.get('dimensions', []), *row['metrics'][0]['values']]
        for row in data_rows
    ]
    frame = pd.DataFrame(data=records, columns=[w.replace('ga:', '') for w in headers])
    frame["date"] = date
    return frame

In [5]:
#Pull up to 100000 rows from GA per request and combine the pages in order
def combine_data(service, proj_name, proj_id, date, dimensions):
    """Download every page of one day's report and return it as one frame.

    Parameters
    ----------
    service : API client passed through to ``get_report``.
    proj_name : not used inside this function; kept for interface
        compatibility with existing callers.
    proj_id : GA view id passed to ``get_report``.
    date : day to query, passed to ``get_report`` and ``to_dataframe``.
    dimensions : dimension names passed to ``get_report``.

    Returns
    -------
    pandas.DataFrame
        All pages concatenated in request order (each page keeps its own
        index, as before). An empty ``DataFrame`` is returned when the
        report contains no rows.
    """
    page_size = 100000
    next_page = 0
    frames = []
    while True:
        result = get_report(service, proj_id, date, dimensions, next_page)
        report = result['reports'][0]
        # A report with no data may omit 'rows'; treat that as zero rows
        # instead of raising KeyError.
        row_num = len(report.get('data', {}).get('rows', []))
        if row_num:
            frames.append(to_dataframe(result, date))
        if row_num < page_size:
            break
        # Continue with the token supplied by the API. If a full page comes
        # back without one, it was the last page, so stop rather than issue
        # a request that would return nothing.
        next_page = report.get('nextPageToken')
        if not next_page:
            break

    if not frames:
        return pd.DataFrame()
    # One concat at the end replaces repeated DataFrame.append, which is
    # quadratic and no longer exists in pandas 2.x.
    return pd.concat(frames)



In [6]:
#Remove emoji (and other disallowed characters)
def remove_emoji(df_daily):
    """Clean every column except ``pageviews`` and ``date`` in place.

    Each value is converted to ``str`` and then:
      1. every run of the listed full-width / CJK punctuation characters is
         replaced by a single space;
      2. only characters matching the allow-list pattern (``_ : / ? . - ( )``,
         ASCII letters and digits, and U+4E00..U+9FA5) are kept.

    The frame is modified in place; nothing is returned.
    """
    punctuation_run = re.compile(u"[《！？｡，＂＃＄％＆＇（）＊＋－／：；＜＝＞＠［＼］＾＿｀｛｜｝～｟｠｢｣､、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘'‛“”„‟…‧﹏]+")
    allowed_char = re.compile(r"[\_\:\/\?\.\-\(\)a-zA-Z0-9\u4e00-\u9fa5]")

    def scrub(text):
        # Pass 1: punctuation runs -> one space. Pass 2: keep allow-listed chars only.
        spaced = punctuation_run.sub(" ", text)
        return ''.join(allowed_char.findall(spaced))

    for column in df_daily.columns.tolist():
        if column in ('pageviews', 'date'):
            continue
        df_daily[column] = df_daily[column].astype(str).apply(scrub)

In [7]:
#Write the processed data back to GCP
def insert_data(db_name, table_name, df_daily):
    """Replace the stored rows for the frame's date with the rows in ``df_daily``.

    ``delete_data`` is called first, then the frame is sent to ``to_mysql``
    in batches of at most 100000 rows.

    Parameters
    ----------
    db_name : database name passed to ``delete_data`` / ``to_mysql``.
    table_name : table name passed to ``delete_data`` / ``to_mysql``.
    df_daily : DataFrame to store; ``delete_data`` reads its ``date`` column.

    Returns
    -------
    None. An empty frame is a no-op: nothing is deleted or inserted, because
    there is no date to act on (previously this raised inside ``delete_data``).
    """
    if df_daily.empty:
        return
    delete_data(db_name, table_name, df_daily)
    # remove_emoji(df_daily) was disabled in the original and stays disabled.
    batch_row = 100000
    # One loop covers both the single-batch and the multi-batch case.
    for start in range(0, len(df_daily), batch_row):
        chunk = df_daily.iloc[start:start + batch_row]
        data = [tuple(row) for _, row in chunk.iterrows()]
        to_mysql(db_name, table_name, df_daily, data)


In [8]:
#Delete the existing rows in GCP
def delete_data(db_name, table_name, df_daily):
    """Delete the rows whose ``date`` equals the largest date in ``df_daily``.

    After committing, the remaining rows for that date are counted and
    ``"delete not complete!"`` is printed if any are left.

    Parameters
    ----------
    db_name : database name; also passed to ``connect_sql_gcp``.
    table_name : table to delete from.
    df_daily : DataFrame whose ``date`` column supplies the target date
        (``max(df_daily['date'])``).

    Notes
    -----
    The cursor and connection are now closed even when a statement fails,
    so a failed run no longer leaves the connection open.
    """
    target_date = max(df_daily['date'])
    qualified_table = "%s.%s" % (db_name, table_name)
    # Identifiers cannot be bound as parameters, so only the date value is;
    # '%%s' leaves a literal '%s' placeholder for the driver, the same
    # placeholder style to_mysql already relies on.
    delete = "DELETE FROM %s WHERE date = %%s" % qualified_table
    select = "SELECT count(*) FROM %s WHERE date = %%s" % qualified_table
    conn, cur = connect_sql_gcp(db_name)
    try:
        # NOTE(review): statement order is kept as-is. Per the MySQL manual,
        # START TRANSACTION releases locks taken by LOCK TABLES — confirm
        # whether the lock is doing what was intended.
        cur.execute("LOCK TABLES %s WRITE" % qualified_table)
        cur.execute("START TRANSACTION")
        cur.execute(delete, (target_date,))
        cur.execute("UNLOCK TABLES")
        conn.commit()
        cur.execute(select, (target_date,))
        cnt = cur.fetchone()
        if cnt[0] > 0:
            print("delete not complete!")
    finally:
        cur.close()
        conn.close()


In [13]:
#Write the rows back to GCP
def to_mysql(db_name, table_name, df_daily, data):
    """Bulk ``REPLACE INTO`` ``data`` into ``db_name.table_name``.

    Parameters
    ----------
    db_name : database name; also passed to ``connect_sql_gcp``.
    table_name : destination table.
    df_daily : DataFrame whose column names (in order) become the column
        list of the statement; its rows are not read here.
    data : sequence of row tuples handed to ``cursor.executemany``.

    Returns
    -------
    None. When ``data`` is empty the function returns without opening a
    connection. The cursor and connection are closed even if the insert or
    commit raises.
    """
    if not data:
        return
    columns = [str(col) for col in df_daily.columns.tolist()]
    placeholders = ",".join(["%s"] * len(columns))
    insert = "REPLACE INTO " + db_name + '.' + table_name + " (" + ",".join(columns) + ") VALUES (" + placeholders + ")"
    conn, cur = connect_sql_gcp(db_name)
    try:
        cur.executemany(insert, data)
        conn.commit()
    finally:
        cur.close()
        conn.close()


In [10]:
def case(d):
    """Run one fetch-and-store cycle for a single day and print the outcome.

    Reads the module-level names ``db_name``, ``service``, ``totaldate``,
    ``proj_name``, ``proj_id``, ``table_name`` and ``dimensions``, which must
    be set before calling.

    Parameters
    ----------
    d : forwarded to ``datetime_control(totaldate, d)`` as the forward offset.

    Any exception raised while fetching or inserting is caught and printed
    as a one-line error instead of propagating.
    """
    # The names above are only read, so no ``global`` declaration is needed.
    target_date = datetime_control(totaldate, d)
    try:
        daily_frame = combine_data(service, proj_name, proj_id, target_date, dimensions)
        insert_data(db_name, table_name, daily_frame)
        row_total = str(len(daily_frame))
        print("table : %s  ;  date : %s  ;  project : %s  ;  nrow : %s" % (table_name, target_date, proj_name, row_total))
    except Exception as err:
        print("table : %s  ;  date : %s  ;  project : %s  ;  error : %s " % (table_name, target_date, proj_name, str(err)))


In [11]:
# Driver cell: load source/medium pageviews for one day into MySQL.
# The names assigned here are module-level globals that case() reads.
if __name__ == '__main__':
    # Record the start time
    tStart = time.time()
    # Database to write into
    db_name = 'TVBS_intern'
    service = get_service()
    proj_name = "Supertaste(Web+EC_CrossDomain)" # alternative: "Woman"
    proj_id = "202617739" # alternative: "72760850"
    table_name = "GA_SourceMedium_Supertaste" # alternative: GA_Pageviews_Supertaste
    dimensions = ["ga:source", "ga:medium"] # alternative: [ga:pagePath]
    # With case(1) below, the queried date is today - 19 days + 1 day.
    totaldate = 19
    case(1)
    tEnd = time.time()
    # Print the elapsed wall-clock time in minutes
    print (str((tEnd - tStart)/60) + 'min')

table : GA_SourceMedium_Supertaste  ;  date : 2020-06-20  ;  project : Supertaste(Web+EC_CrossDomain)  ;  nrow : 388
0.07680338621139526min


In [12]:
# Driver cell: load per-page pageviews for one day into MySQL.
# The names assigned here are module-level globals that case() reads.
if __name__ == '__main__':
    # Record the start time
    tStart = time.time()
    # Database to write into
    db_name = 'XXXXX'
    service = get_service()
    # NOTE(review): the project is "TVBS" but the table is named
    # "GA_Pageviews_Supertaste" — confirm this pairing is intended.
    proj_name = "TVBS" 
    proj_id = "72595640" 
    table_name = "GA_Pageviews_Supertaste" 
    dimensions = ["ga:pagePath"] 
    # With case(1) below, the queried date is today - 20 days + 1 day.
    totaldate = 20
    case(1)
    tEnd = time.time()
    # Print the elapsed wall-clock time in minutes
    print (str((tEnd - tStart)/60) + 'min')
# i=4;l=999;d=3;db_name = 'WarehouseServer';table_name = view[i][2];project_id = view[i][1];project_name = view[i][0]

table : GA_Pageviews_Supertaste  ;  date : 2020-06-19  ;  project : TVBS  ;  nrow : 285542
49.78874289194743min
