# NEWS 파일(JSON)을 BigQuery에 저장

In [121]:
import os
import sys
import json
import requests
import pickle
import gzip
import pandas as pd
import pandas_gbq as gbq
from pathlib import Path
from google.oauth2 import service_account
from google.cloud import bigquery
from IPython.core.debugger import set_trace
from tqdm.auto import tqdm
tqdm.pandas()

## Configuration

In [122]:
# Google Cloud project id and the two BigQuery tables ("dataset.table") used below.
proj = 'global-news-crawl'
table_downloaded, table_trashed = 'news_dataset.downloaded', 'news_dataset.trashed'
# Service-account credentials are loaded from a key file in the working directory.
credentials = service_account.Credentials.from_service_account_file(
    'global-news-crawl-c48d7cd9aa81.json',
)

## BigQuery에서 기존 뉴스 ID 리스트 받기

In [129]:
%%time
qry_0 = 'SELECT id FROM `{}`'.format(proj + '.' + table_downloaded)
ids_downloaded = gbq.read_gbq(qry_0, project_id=proj, credentials=credentials)#, use_bqstorage_api=True)

Wall time: 15 s


In [130]:
%%time
qry_1 = 'SELECT id FROM `{}`'.format(proj + '.' + table_trashed)
ids_trashed = gbq.read_gbq(qry_1, project_id=proj, credentials=credentials)

Wall time: 4.71 s


In [131]:
# Turn the id columns into sets for membership tests and set algebra.
ids_downloaded_set = set(ids_downloaded['id'])
ids_trashed_set = set(ids_trashed['id'])

In [132]:
# Sanity checks: ids are unique inside each table and never shared between the two.
assert len(ids_downloaded_set) == len(ids_downloaded), 'duplicated in downloaded'
assert len(ids_trashed_set) == len(ids_trashed), 'duplicated in trashed'
assert ids_downloaded_set.isdisjoint(ids_trashed_set), 'ids overlapping'

In [133]:
# Every id that is already known, regardless of which table holds it.
newsids = ids_downloaded_set.union(ids_trashed_set)
len(newsids)

310547

In [134]:
# (number of downloaded ids, number of trashed ids)
tuple(map(len, (ids_downloaded_set, ids_trashed_set)))

(293203, 17344)

## 파일에서 뉴스정보 추출
나중에는 필요 없는 과정이다. 다운로드받는 즉시 BigQuery에 전송할 것이기 때문이다.

In [135]:
def extract_contents(newsids=None, where=None, n=1000):
    """Load not-yet-known article JSON files under ``where`` into a DataFrame.

    Every ``*.json`` file found recursively under ``where`` whose file stem
    is not contained in ``newsids`` is parsed. If the parsed object has an
    ``authors`` entry, it is joined into a single ``', '``-separated string.
    Files that cannot be read or parsed are reported on stdout and skipped.
    A running counter of loaded articles is printed on a single line.

    Args:
        newsids: Container of ids to skip. ``None`` means nothing is skipped.
        where: Root directory to scan (anything ``pathlib.Path`` accepts).
        n: Scanning stops once exactly this many articles have been loaded.

    Returns:
        A DataFrame with one row per loaded article. The file stem is in the
        ``id`` column; the remaining columns come from the JSON keys.
    """
    # The original default (None) crashed on the membership test below.
    known_ids = () if newsids is None else newsids
    records = {}
    loaded = 0

    for file in Path(where).glob('**/*.json'):
        news_id = file.stem
        if news_id in known_ids:
            continue

        try:
            # Feed raw bytes to json.loads so decoding follows the JSON rules
            # (UTF-8/16/32 detection) instead of the platform's locale encoding.
            js = json.loads(file.read_bytes())
            if not isinstance(js, dict):
                raise TypeError('top-level JSON value is not an object')
            if 'authors' in js:
                js['authors'] = ', '.join(js['authors'])
        except (OSError, ValueError, TypeError) as err:
            # Best effort: report the offending file and carry on. Unlike a bare
            # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
            print('{} ({}: {})'.format(file, type(err).__name__, err))
            continue

        records[news_id] = js
        loaded += 1
        print('\r{}'.format(loaded), end='')
        if loaded == n:
            break

    df = pd.DataFrame.from_dict(records, orient='index')
    df.index.name = 'id'
    return df.reset_index()

In [136]:
# Collect the not-yet-uploaded articles from each local folder (capped per folder).
df_downloaded = extract_contents(where='newsdata/downloaded', newsids=newsids, n=50000)
print('\n')  # the progress counter ends without a newline; separate the two runs
df_trashed = extract_contents(where='newsdata/trashed', newsids=newsids, n=30000)

353

328

## downloaded와 trashed 간에 겹치는 게 없는지 확인
기존의 뉴스 파일을 BigQuery에 전송하는 과정에서, 이 둘 간에 겹치는 사례가 종종 있었다.
(2019.10.18)

In [137]:
# Ids present in both freshly extracted frames; there must be none.
intersect = set(df_trashed['id']).intersection(df_downloaded['id'])
assert not intersect, 'ids overlapping'

## BigQuery에 업로드

In [138]:
# Append each extracted frame to its table, downloaded first, then trashed.
for frame, table in ((df_downloaded, table_downloaded), (df_trashed, table_trashed)):
    gbq.to_gbq(
        frame,
        table,
        project_id=proj,
        if_exists='append',
        chunksize=1000,
        credentials=credentials,
    )

1it [00:06,  6.75s/it]
1it [00:04,  4.22s/it]


In [141]:
# Scratch experiment: dict() only accepts 2-item (key, value) pairs, so feeding it
# 3-tuples raises the ValueError shown in the cell output.
dict([('a',1,5), ('b',2,5), ('c',3,7)])

ValueError: dictionary update sequence element #0 has length 3; 2 is required

# 아래 코드를 실험해 봐야 한다. `__init__`까지는 확인됨

In [39]:
# Settings read by Recorder: project id, BigQuery tables ("dataset.table"),
# service-account credentials and the local folders for each article kind.
proj = 'global-news-crawl'
table_downloaded, table_trashed = 'news_dataset.downloaded', 'news_dataset.trashed'
credentials = service_account.Credentials.from_service_account_file(
    'global-news-crawl-c48d7cd9aa81.json',
)

localpath_to_downloaded, localpath_to_trashed = 'newsdata/downloaded', 'newsdata/trashed'


class Recorder:
    """Keep track of stored news article ids and persist new articles.

    The backing store is either BigQuery (``storage='bigquery'``) or the
    local filesystem (``storage='local'``). The project id, table names,
    credentials and local paths are read from the module-level settings
    ``proj``, ``table_downloaded``, ``table_trashed``, ``credentials``,
    ``localpath_to_downloaded`` and ``localpath_to_trashed``.

    Attributes:
        storage: Name of the backing store.
        ids: Set of every article id known to the store (downloaded and
            trashed together). It is refreshed by ``update``.
    """

    class DuplicatesInSingleTable(Exception):
        """An id occurs more than once within downloaded or within trashed."""

    class DuplicatesBetweenTwoTables(Exception):
        """An id occurs in both downloaded and trashed."""

    _STORAGES = ('bigquery', 'local')

    def __init__(self, storage='local'):
        """Load the ids currently held by ``storage``.

        Raises:
            ValueError: ``storage`` is not ``'bigquery'`` or ``'local'``.
            Recorder.DuplicatesInSingleTable: an id is repeated in one kind.
            Recorder.DuplicatesBetweenTwoTables: an id is in both kinds.
        """
        self.storage = storage
        self.ids = self._get_ids(storage)

    def _query_ids_from_bigquery(self, tb):
        """Return the ``id`` column of BigQuery table ``tb`` (a "dataset.table" name)."""
        qry = 'SELECT id FROM `{}`'.format(proj + '.' + tb)
        # project_id is passed explicitly, as in the other read_gbq calls of this file.
        return gbq.read_gbq(qry, project_id=proj, credentials=credentials).id

    def _retrieve_ids_from_local(self, path):
        """Return the stems of all ``*.json`` files found recursively under ``path``."""
        return [p.stem for p in Path(path).glob('**/*.json')]

    # Backward-compatible alias for the original (misspelled) method name.
    _retreive_ids_from_local = _retrieve_ids_from_local

    def _get_ids(self, storage):
        """Collect all ids from ``storage`` and verify that they are unique.

        Returns:
            The union of the downloaded and trashed ids as a set.
        """
        if storage not in self._STORAGES:
            # Previously this fell through and failed with UnboundLocalError.
            raise ValueError(
                "unknown storage {!r}; expected one of {}".format(storage, self._STORAGES))

        print('checking ' + storage + ' storage... ', end='')

        if storage == 'bigquery':
            ids_downloaded = self._query_ids_from_bigquery(table_downloaded)
            ids_trashed = self._query_ids_from_bigquery(table_trashed)
        else:
            ids_downloaded = self._retrieve_ids_from_local(localpath_to_downloaded)
            ids_trashed = self._retrieve_ids_from_local(localpath_to_trashed)

        ids_downloaded_set = set(ids_downloaded)
        ids_trashed_set = set(ids_trashed)

        # Uniqueness within the downloaded articles.
        if len(ids_downloaded) != len(ids_downloaded_set):
            raise self.DuplicatesInSingleTable('duplicated in downloaded')

        # Uniqueness within the trashed articles.
        if len(ids_trashed) != len(ids_trashed_set):
            raise self.DuplicatesInSingleTable('duplicated in trashed')

        # Uniqueness between downloaded and trashed.
        if ids_downloaded_set & ids_trashed_set:
            raise self.DuplicatesBetweenTwoTables('duplicated between downloaded and trashed')

        ids = ids_downloaded_set | ids_trashed_set

        print('done')
        print('we have total {} articles ({} downloaded, {} trashed)'.format(len(ids), len(ids_downloaded_set), len(ids_trashed_set)))
        return ids

    def has(self, id):
        """Return True if ``id`` is already known to the store."""
        return id in self.ids

    def update(self, downloaded=None, trashed=None, chunksize=1000, subdir_len=3):
        """Persist new articles and remember their ids.

        Args:
            downloaded: Mapping ``{id: {...article fields...}}`` or ``None``.
            trashed: Mapping of the same shape or ``None``.
            chunksize: Rows per upload chunk (BigQuery storage only).
            subdir_len: Length of the id prefix used as sub-folder name for
                trashed articles (local storage only).

        Raises:
            ValueError: ``self.storage`` is not a supported storage name.
        """
        if self.storage not in self._STORAGES:
            raise ValueError(
                "unknown storage {!r}; expected one of {}".format(self.storage, self._STORAGES))

        for newstype, newsdict in (('downloaded', downloaded), ('trashed', trashed)):
            if not newsdict:
                continue

            if self.storage == 'bigquery':
                self._update_bigquery(newstype, newsdict, chunksize)
            else:
                self._update_local(newstype, newsdict, subdir_len)

            # Keep the in-memory index in sync so has() sees what was just written.
            self.ids.update(newsdict)

    def _update_local(self, newstype, newsdict, subdir_len):
        """Write every article of ``newsdict`` to ``<id>.json`` on disk.

        Downloaded articles go directly into ``localpath_to_downloaded``;
        trashed articles go into a sub-folder of ``localpath_to_trashed``
        named after the first ``subdir_len`` characters of the id.
        """
        if not newsdict:
            return

        if newstype == 'downloaded':
            base = Path(localpath_to_downloaded)
        elif newstype == 'trashed':
            base = Path(localpath_to_trashed)
        else:
            raise ValueError("unknown newstype {!r}".format(newstype))

        for news_id, article in newsdict.items():
            target_dir = base / news_id[:subdir_len] if newstype == 'trashed' else base
            # Create the folder for both kinds; the downloaded folder used to be
            # assumed to exist already.
            target_dir.mkdir(parents=True, exist_ok=True)

            with (target_dir / (news_id + '.json')).open('w', encoding='utf-8') as f:
                json.dump(article, f)

    def _update_bigquery(self, newstype, newsdict, chunksize):
        """Append the articles of ``newsdict`` to the table matching ``newstype``."""
        if not newsdict:
            # Nothing to upload; avoid sending an empty frame.
            return

        if newstype == 'downloaded':
            tb = table_downloaded
        elif newstype == 'trashed':
            tb = table_trashed
        else:
            raise ValueError("unknown newstype {!r}".format(newstype))

        df = pd.DataFrame.from_dict(newsdict, orient='index')
        df.index.name = 'id'
        gbq.to_gbq(df.reset_index(), tb, project_id=proj, if_exists='append', chunksize=chunksize, credentials=credentials)

In [31]:
# Build the id index from BigQuery; the constructor prints the summary shown below.
recorder = Recorder(storage='bigquery')

checking bigquery storage... done
we have total 446750 articles (275619 downloaded, 171131 trashed)


In [55]:
# Spot check: is this article id already known to the store?
recorder.has('51331d7eb195e41b4d9cd1d4b73c7c4ecb8526e0')

False

In [37]:
# Copy every article JSON found under newsdata/downloaded into
# newsdata/downloaded2/<first three characters of the file name>/.
for src in tqdm(Path('newsdata/downloaded').glob('**/*.json')):
    p_dest = Path('newsdata/downloaded2') / src.name[:3]
    p_dest.mkdir(parents=True, exist_ok=True)
    dest = p_dest / src.name
    # Copy raw bytes: a read_text()/write_text() round trip depends on the
    # platform's default encoding and newline translation, which can fail on
    # or silently alter non-ASCII content.
    dest.write_bytes(src.read_bytes())

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))


