In [1]:
from bs4 import BeautifulSoup
import requests as re
import pandas as pd
import pandas.io.sql as sqlio
import numpy as np
import os
import io
import csv
from datetime import datetime
import logging
logging.getLogger().setLevel(logging.INFO)

# https://cloud.google.com/sql/docs/postgres/connect-external-app#languages
import psycopg2
# Connection settings are read from the environment so that no secret is stored
# in the notebook. Non-secret settings fall back to the values used so far; the
# password has no fallback and a missing JPSTAT_DB_PASSWORD raises KeyError.
# NOTE(review): the password that used to be written here has been exposed in the
# notebook's history and should be rotated.
conn = psycopg2.connect(user=os.environ.get('JPSTAT_DB_USER', 'airflow'),
                        password=os.environ['JPSTAT_DB_PASSWORD'],
                        dbname=os.environ.get('JPSTAT_DB_NAME', 'jpstat'),
                        host=os.environ.get('JPSTAT_DB_HOST', '35.224.240.50'))

from google.cloud import storage
# Name of the Google Cloud Storage bucket whose blobs are listed by list_xls_blobs.
bucket_name = "i-agility-212104.appspot.com"
# Relative directory for downloaded reports. It is created locally and also used as
# the prefix of the 'path' column; os.path.join yields backslashes on Windows,
# which is why that column is normalised to '/' afterwards.
mdir = os.path.join('extracts', 'monthly_reports')

In [None]:
#def files_review_task():
    

#Retrieve all months currently available
# Scrapes the e-Stat listing page and builds top_df with one row per month link:
# 'year' (section header text), 'month' (link text without trailing '.'/newline)
# and 'url' (absolute link to that month's page).
main_table = []
url = "http://www.e-stat.go.jp/SG1/estat/OtherListE.do?bid=000001006005&cycode=1"
path='https://www.e-stat.go.jp'
r = re.get(url)  # `re` is this notebook's alias for requests, not the regex module
r.raise_for_status()  # stop on an HTTP error instead of parsing an error page
data = r.text
year='None'
soup = BeautifulSoup(data, "lxml")
table = soup.find('div', {'class': 'stat-cycle_sheet'})
if table is None:
    # Without this check the loop below fails with an opaque AttributeError.
    raise RuntimeError('No stat-cycle_sheet element found at ' + url)
for year_section in table.find_all('ul', {'class': 'stat-cycle_ul_other'}):
    header = year_section.find('li', {'class': 'stat-cycle_header'})
    year = header.find('span').get_text(' ', strip=True)
    months_section = year_section.find('li', {'class': 'stat-cycle_item'})
    for month_row in months_section.find_all('div'):
        month_a = month_row.find('a')
        if month_a is None:
            # A div without a link carries no month entry; nothing to record.
            continue
        month = month_a.get_text().rstrip('.\n')
        monthurl = path + month_a.get('href')

        main_table.append({
            'year': year,
            'month': month,
            'url': monthurl
        })

# Explicit columns keep the frame's layout stable even when nothing was scraped.
top_df = pd.DataFrame(main_table, columns=['year', 'month', 'url'])

if top_df.empty:
    logging.warning('DataFrame of months generated: 0 months available.')
else:
    # len() counts rows (months); DataFrame.size would count rows * columns.
    last_row = top_df.iloc[-1]
    first_row = top_df.iloc[0]
    logging.info('DataFrame of months generated: ' + str(len(top_df)) + ' months available, from '
        + last_row['year'] + ' ' + last_row['month'] + ' to '
        + first_row['year'] + ' ' + first_row['month'])



In [None]:
# Crawl every month page for its excel links and join the result onto the month
# index. retrieve_month_excels is defined in a later cell of this notebook, so
# that cell has to be executed before this one.
retrieve_month_excels_starttime = datetime.utcnow()
logging.info('Starting to retrieve urls of every excel sheet at ' + str(retrieve_month_excels_starttime) + ' UTC.')

# One frame per month page, stacked into a single frame afterwards.
monthly_frames = []
for murl in top_df['url']:
    monthly_frames.append(retrieve_month_excels(murl, path))
excels_df = pd.concat(monthly_frames)

# Left join keeps every month of top_df, including months without any excel rows.
excelref_df = pd.merge(top_df, excels_df, how='left', on='url')

elapsed = datetime.utcnow() - retrieve_month_excels_starttime
logging.info('Excel URLs retrieved in ' + str(elapsed) + '.')
logging.info(str(len(excelref_df)) + ' files to retrieve. Writing table to database...')

In [None]:
# Copy of the scraped table with an extra 'updated_on' column holding the date
# (YYYY-MM-DD) on which the crawl started.
crawl_date = retrieve_month_excels_starttime.strftime('%Y-%m-%d')
tempdf = excelref_df.assign(updated_on=crawl_date)
tempdf.head()

In [None]:
# Load the excel-url rows that are already stored in the database table.
sql = "SELECT year, month, url, excel_num, excel_description, excel_url FROM public.jpstat_excel_urls;"
curr_table_df = pd.read_sql_query(sql, conn)
curr_table_df.head()

In [None]:
# Rows whose excel_url occurs exactly once across the stored table and the fresh
# scrape: keep=False discards every member of a duplicated group, so a url present
# in both frames (or repeated within one of them) drops out entirely.
disjoint_df = pd.concat([curr_table_df, excelref_df]).drop_duplicates(keep=False, subset=['excel_url'])
# Inner merge on the columns the two frames share keeps only the surviving rows
# that match a row of the fresh scrape, i.e. rows not yet stored in the database.
new_files_df = pd.merge(disjoint_df, excelref_df, how='inner')
# Restrict and order the columns like the frame read from the database.
new_files_df = new_files_df[curr_table_df.columns]
new_files_df

In [None]:
#Convert DataFrame to stream and upload to PostgreSQL table on Google Cloud SQL
excelurl_textstream = io.StringIO()
upload_df = new_files_df.copy()
# Newlines and tabs would break the tab-separated COPY format. The cleaned column
# is assigned back explicitly: replace(..., inplace=True) on a column selection is
# not guaranteed to modify upload_df (it does not under pandas copy-on-write).
upload_df['excel_description'] = upload_df['excel_description'].replace(['\n', '\t'], '', regex=True)

upload_df.to_csv(excelurl_textstream, sep='\t', header=False, index=False, quoting=csv.QUOTE_NONE)
excelurl_textstream.seek(0)

cur = conn.cursor()
try:
    # null="" makes COPY load empty fields as NULL. Naming the columns ties each
    # field to its column instead of relying on the table's column order.
    cur.copy_from(excelurl_textstream, 'jpstat_excel_urls', null="",
                  columns=list(upload_df.columns))
    conn.commit()
except Exception:
    # Leave the connection usable for later cells instead of stuck in a failed
    # transaction, then surface the original error.
    conn.rollback()
    raise
finally:
    cur.close()

logging.info('Excel URL database table updated.')

In [None]:
def retrieve_month_excels(murl, path):
    """
    Returns a dataframe of details on excel sheets of data available for a given url
    for a particular month.

    Parameters
    ----------
    murl : str
        URL of the month page that lists the datasets.
    path : str
        Prefix prepended to each download link's href to form 'excel_url'.

    Returns
    -------
    pandas.DataFrame
        One row per listed dataset with the columns 'url' (always murl),
        'excel_num', 'excel_description' and 'excel_url'. 'excel_url' is ''
        when the dataset's second link is missing or not marked as EXCEL.
        The frame has these columns and no rows when the page lists nothing.

    Raises
    ------
    requests.HTTPError
        If the month page answers with an HTTP error status.
    """
    mcols = ['url', 'excel_num', 'excel_description', 'excel_url']
    mr = re.get(murl)  # `re` is this notebook's alias for requests
    mr.raise_for_status()
    msoup = BeautifulSoup(mr.text, "lxml")
    mtable = msoup.find('div', {'class': 'stat-dataset_list-body'})

    # Rows are collected in a list and the frame is built once at the end:
    # DataFrame.append no longer exists in pandas 2.x and copied the whole frame
    # on every call.
    records = []
    if mtable is None:
        logging.warning("No dataset list found at month-URL: " + murl + '.')
    else:
        for row in mtable.find_all('article', {'class': 'stat-dataset_list-item'}):
            excel_num = row.find('li', {'class': 'stat-dataset_list-detail-item stat-dataset_list-border-top'}).contents[0].replace('\n','')
            links = row.find_all('a')
            excel_description = row.find('a').contents[0]
            excel_url = ''
            # The second link of the row is the download link; only EXCEL
            # downloads get a url, everything else keeps ''.
            excel_a = links[1] if len(links) > 1 else None
            if excel_a is not None and excel_a.get('data-file_type') == 'EXCEL':
                excel_url = path + excel_a['href']
            records.append([murl, excel_num, excel_description, excel_url])

    mdf = pd.DataFrame(records, columns=mcols)
    logging.info("Retrieved excel URLs from month-URL: " + murl + '.')
    return mdf

In [3]:
cwd = os.getcwd()
logging.info('Now working in ' + cwd)

# exist_ok=True replaces the separate exists() check and its check-then-create gap.
os.makedirs(mdir, exist_ok=True)
logging.info('Saving files to ' + os.path.join(cwd, mdir))

sql = "SELECT year, month, url, excel_num, excel_description, excel_url FROM public.jpstat_excel_urls;"
curr_table_df = sqlio.read_sql_query(sql, conn)

# Building a dictionary of month names in a format more suitable for dirnames
# (e.g. 'Jan' -> '01Jan'). The number is looked up from the label's first three
# letters rather than from the position of the label in the query result: the
# SELECT has no ORDER BY, so row order cannot be relied on to start in January.
# An unrecognised label raises KeyError instead of being numbered wrongly.
month_numbers = {abbr: number for number, abbr in enumerate(
    ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
     'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'], start=1)}
months = curr_table_df.month.unique()
monthnum = [month_numbers[str(month).strip()[:3].title()] for month in months]
monthdirs = [str(num).zfill(2) + month for num,month in zip(monthnum, months)]
monthnamedict = dict(zip(months, monthdirs))

# List files already on google cloud storage, and compare them with curr_table_df.
# If files don't already exist on google cloud storage, download them and upload them to gcs
existing_files_list = list_xls_blobs(bucket_name)
print(len(existing_files_list))
print(str(existing_files_list[:4]))

INFO:root:Now working in C:\Users\friedemann.ang\Documents\repos\jpstat
INFO:root:Saving files to C:\Users\friedemann.ang\Documents\repos\jpstat\extracts\monthly_reports


30
['extracts/monthly_reports/2018/01Jan/1.xls', 'extracts/monthly_reports/2018/01Jan/2.xls', 'extracts/monthly_reports/2018/01Jan/3-1.xls', 'extracts/monthly_reports/2018/01Jan/3-2.xls']


In [2]:
def list_xls_blobs(bucket_name):
    """Return the names of the blobs in the bucket, leaving out names ending in '/'.

    Despite the function's name, no filtering on the '.xls' extension takes place:
    every blob whose name does not end in '/' is returned, in listing order.
    """
    bucket = storage.Client().get_bucket(bucket_name)
    return [
        blob.name
        for blob in bucket.list_blobs()
        if not str(blob.name).endswith('/')
    ]

In [4]:
# Preview the first rows of the table read from the database.
curr_table_df.head()

Unnamed: 0,year,month,url,excel_num,excel_description,excel_url
0,2018,Jan,https://www.e-stat.go.jp/en/stat-search/files?..,0,"Number of Intra-prefectural Migrants, In-migr....",https://www.e-stat.go.jp/en/stat-search/file-d...
1,2018,Jan,https://www.e-stat.go.jp/en/stat-search/files?...,1,"Number of Intra-prefectural Migrants, In-migra...",https://www.e-stat.go.jp/en/stat-search/file-d...
2,2018,Jan,https://www.e-stat.go.jp/en/stat-search/files?...,2,Number of Inter-prefectural Migrants by Sex an...,https://www.e-stat.go.jp/en/stat-search/file-d...
3,2018,Jan,https://www.e-stat.go.jp/en/stat-search/files?...,3-1,Number of In-migrants from Other Prefectures b...,https://www.e-stat.go.jp/en/stat-search/file-d...
4,2018,Jan,https://www.e-stat.go.jp/en/stat-search/files?...,3-2,Number of Out-migrants to Other Prefectures by...,https://www.e-stat.go.jp/en/stat-search/file-d...


In [5]:
# Object name of every file: <mdir>/<year>/<month dir>/<excel_num>.xls
month_dirs = curr_table_df['month'].map(monthnamedict)
raw_paths = mdir + '/' + curr_table_df['year'] + '/' + month_dirs + '/' + curr_table_df['excel_num'] + '.xls'
# mdir is built with os.path.join and may contain backslashes; the paths are
# compared against blob names, which use forward slashes.
curr_table_df['path'] = raw_paths.map(lambda p: p.replace('\\', '/'))

In [6]:
# Keep only the rows whose target path is not among the blobs already in the bucket.
print(len(curr_table_df))
already_uploaded = curr_table_df['path'].isin(existing_files_list)
download_table = curr_table_df.loc[~already_uploaded].copy()
print(len(download_table))

617
587
