# Insert disaster pageviews into MariaDB

## password, imports, mariadb_login

###### password

In [2]:
import getpass

mysql_user = 'ubuntu'
# getpass reads the password without echoing it, so it never appears in the
# notebook's saved cell output (input() echoes it in plain text).
mysql_pass = getpass.getpass(f'Enter the MySQL password for user {mysql_user}: ')

Enter the MySQL password for user ubuntu: ********


###### imports

In [32]:
import os, requests, gzip, pickle, io, logging, inspect, functools, string, re
from IPython.display import clear_output
import pandas as pd, datetime as dt
import mysql.connector as mysql, sqlalchemy

###### login to mariadb

In [4]:
def connect_mariadb(host='localhost', user=mysql_user, passwd=mysql_pass, dbname='jawiki'):
    """
    Connect to MariaDB through both mysql.connector and SQLAlchemy.

    Returns:
        cxn: mysql.connector connection
        cur: cursor on cxn
        engine: SQLAlchemy engine for the same database
        conn: SQLAlchemy connection opened from engine

    If the SQLAlchemy side fails, a hint is printed, the mysql.connector
    connection is closed, and the original exception is re-raised.
    """
    from urllib.parse import quote

    cxn = mysql.connect(host=host, user=user, passwd=passwd, database=dbname)
    cur = cxn.cursor()

    # Percent-encode the credentials: characters such as '@', ':' or '/' in a
    # password would otherwise corrupt the URL.
    connection_str = (
        'mysql+mysqlconnector://'
        + quote(user, safe='') + ':' + quote(passwd, safe='')
        + '@' + host + '/' + dbname
    )
    try:
        engine = sqlalchemy.create_engine(connection_str)
        conn = engine.connect()
    except Exception as e:
        print('Database connection error - check creds')
        print(e)
        cxn.close()
        # Re-raise: returning here would hit unbound engine/conn and mask the
        # real error behind an UnboundLocalError.
        raise
    return cxn, cur, engine, conn

cxn, cur, engine, conn = connect_mariadb()

## get page_titles and page_ids

##### unpickle list of disaster pageids (diz_pageids)

In [5]:
def failed_decode(a, encoding='utf-8'):
    """
    Return True if bytes-like `a` cannot be decoded with `encoding`.

    `encoding` defaults to UTF-8, so one-argument calls behave as before.
    """
    try:
        a.decode(encoding)
    except UnicodeDecodeError:
        return True
    return False

def bytearray_to_str(a:bytearray, encoding='utf-8') -> str:
    """
    Decode a bytearray to str, dropping trailing bytes until it decodes.

    Values that are not bytearrays are returned unchanged, so this can be
    applied element-wise to DataFrames whose cells are only partly bytearrays.
    Trailing bytes are removed one at a time, e.g. a multi-byte character
    truncated mid-sequence. If no non-empty prefix decodes, the result is ''.
    """
    if not isinstance(a, bytearray):
        return a
    # Check decodability with the same encoding used to decode. Checking UTF-8
    # while decoding another encoding strips valid data.
    while failed_decode(a, encoding):
        a = a[:-1]
    return a.decode(encoding)

In [6]:
# Load the pickled raw category-descendant tables. Judging by the next cells,
# this is a mapping of disaster category name -> DataFrame (TODO confirm).
with open('../data/processed/jawiki/disaster_descendants_raw.pickle', 'rb') as f:
    disaster_descendants_raw = pickle.load(f)

In [7]:
# Japanese disaster category name -> category page_id.
# '自然災害' (natural disasters, page_id 137069) is intentionally excluded.
disaster_cat_page_ids = {
    '火山災害': 2390743,   # volcanic disasters
    '熱帯低気圧': 626482,  # tropical cyclones
    '雪害': 2390774,       # snow damage
    '地震': 135264,        # earthquakes
    '津波': 765772,        # tsunami
}
# Japanese category name -> English label.
disasters_english = {
    '火山災害': 'VolcanicDisaster',
    '熱帯低気圧': 'TropicalCyclones',
    '雪害': 'SnowDamage',
    '地震': 'Earthquake',
    '津波': 'Tsunami',
}
# Category names in insertion order.
disasters = [*disaster_cat_page_ids]

In [8]:
# Clean each raw category-descendant table. For each table: drop duplicate
# ids, decode bytearray cells to str, keep rows with namespace == 0, and derive
# page_title from the last line of the `name` field.
disaster_descendants = {}
for disaster in disaster_descendants_raw:
    table = (disaster_descendants_raw[disaster]
             .drop_duplicates(subset='id')
             .applymap(bytearray_to_str))
    # .copy() gives an independent frame, so adding page_title below does not
    # write into a filtered view (pandas SettingWithCopyWarning).
    table = table.loc[table['namespace'] == 0].copy()
    # Index the column explicitly instead of using attribute access (.name),
    # which is fragile for column labels that look like attributes.
    table['page_title'] = table['name'].map(lambda x: str(x).split(sep='\n')[-1])
    disaster_descendants[disaster] = table
del disaster_descendants_raw

In [9]:
# Display the columns of one cleaned category table (notebook output below).
disaster_descendants['火山災害'].columns

Index(['id', 'name', 'type', 'namespace', 'page_title'], dtype='object')

In [10]:
# Flatten the `id` column of every category table into one list (category
# order, then row order). Ids present in several categories appear repeatedly.
diz_pageids = [pid for table in disaster_descendants.values() for pid in table['id']]

## function trees

#### get_pageviews_by_pageids_and_years

In [45]:
def get_pageviews_by_pageids_and_years(
        pids_of_interest: list[int], years: list[int], max_files=None
        ) -> pd.DataFrame:
    """
    Collect encoded hourly pageview counts for selected page_ids over whole years.

    INPUTS:
        pids_of_interest: page_ids whose pageviews are kept
        years: years whose local daily pageview files are scanned
            (file discovery via get_all_paths_in_year)
        max_files: optional cap on the number of files processed, for quick
            debugging runs; None (default) processes every file
    OUTPUTS:
        pd.DataFrame where:
            index: date as a 'yyyymmdd' string taken from each file name
            columns: '<page_id>_<access_method>' strings
            values: encoded hourly counts (see note below), NaN where that
                page/access method had no record on that date
    NOTE:
        hourly counts are encoded with letters as hours and numbers as counts;
        for example, 'C2' means 2 pageviews between 02:00 and 02:59
    """
    paths = []
    for year in years:
        paths.extend(get_all_paths_in_year(year))
    # The previous hard-coded paths[:100] silently truncated every run; the cap
    # is now opt-in.
    if max_files is not None:
        paths = paths[:max_files]
    pageid_filter = make_pageid_filter(pids_of_interest)
    diz_views = {}
    for path in paths:
        diz_views, yyyymmdd = get_hour_counts_for_pid_matches(path, diz_views, pageid_filter)
        clear_output(wait=True)
        print(f"started {min(years)},\ncompleted {yyyymmdd},\ncontinuing until end of {max(years)}")
    # diz_views is {date: {pid_access: counts}}; transpose so dates become rows.
    return pd.DataFrame(diz_views).T

##### get_all_paths_in_year

In [13]:
def get_all_paths_in_year(year:int) -> list[str]:
    """
    List every file under ../data/temp/jawiki_pageviews/<year>/, recursively.

    The path is relative to the current working directory. Each entry is built
    as '<dirpath>/<filename>' from os.walk, and the list is returned sorted.
    A missing year directory yields an empty list.
    """
    year_root = f'../data/temp/jawiki_pageviews/{year}/'
    return sorted(
        dirpath + '/' + name
        for dirpath, _subdirs, names in os.walk(year_root)
        for name in names
    )

##### make_pageid_filter 

In [57]:
def make_pageid_filter(pid_allowlist:list[int], con=conn) -> dict:
    """
    Build a page_id -> flag lookup covering every page in the `page` table.

    INPUTS:
        pid_allowlist: page_ids to flag with 1 (e.g. disaster pages)
        con: SQLAlchemy connection to query (defaults to the module-level conn)
    OUTPUT: dict
        keys: every distinct page_id in `page`, plus every allowlisted id
        values: 1 if the id is in pid_allowlist, otherwise 0
    """
    sql = "select distinct page_id from page;"
    # Use the `con` argument (not the global), and select the column by name:
    # .squeeze() would collapse a one-row result into a scalar without .to_list().
    all_pageids = pd.read_sql(sql, con)['page_id'].to_list()
    pageid_filter = dict.fromkeys(all_pageids, 0)
    pageid_filter.update(dict.fromkeys(pid_allowlist, 1))
    return pageid_filter

##### get_hour_counts_for_pid_matches

In [14]:
def get_hour_counts_for_pid_matches(path:str, diz_views:dict, pageid_filter:dict=None) -> tuple[dict, str]:
    """
    Add one daily pageviews file's hourly counts for flagged pages to diz_views.

    INPUTS:
        path: path of a daily pageviews text file (read as UTF-8). The date is
            the second '-'-separated field of its file name,
            e.g. '.../pageviews-20160101-user'
        diz_views: accumulated filtered pageviews data; updated in place
        pageid_filter: dict mapping page_ids to 1 (keep) or 0 (skip); ids that
            are missing or not mapped to 1 are skipped. Required.
    OUTPUTS:
        diz_views: the same dict, with
            diz_views[yyyymmdd]['<page_id>_<access_method>'] = hourly-count string
        yyyymmdd: date string parsed from the file name
    RAISES:
        ValueError: if pageid_filter is not supplied
    """
    if pageid_filter is None:
        # No meaningful default exists; the filter must be passed explicitly.
        raise ValueError('pageid_filter is required')
    yyyymmdd = path.split('/')[-1].split('-')[1]
    with open(path, encoding='utf-8') as f:
        day_views = diz_views.setdefault(yyyymmdd, {})
        for line in f:
            # fields used: [2] page_id, [3] access_method, [5] hourly counts
            fields = line.split()
            if len(fields) < 6:  # discard abnormal records missing fields
                continue
            pid_str = fields[2]
            try:  # skip pageviews without an integer page_id (e.g. redirects)
                pid = int(pid_str)
            except ValueError:
                continue
            if pageid_filter.get(pid) != 1:
                continue
            access_method, hour_counts = fields[3], fields[5]
            day_views[pid_str + '_' + access_method] = hour_counts
    return (diz_views, yyyymmdd)

##### ---------------------------------------------------

#### insert_pageviews_into_mariadb

In [None]:
def insert_pageviews_into_mariadb(
    years:list[int], pagetitles:list[str], project:str='ja', 
    logfilepath:str=logfilepath, con=conn):
    """
    Download each pageviews file for `years`, keep rows for `pagetitles`, and
    append them to the `pageviews` table, creating the table if needed.

    INPUTS:
        years: years whose URLs come from get_pageviews_urls_by_year
        pagetitles: titles passed to get_pageviews_subset_by_proj_and_pagetitles
        project: not used by this function body
        logfilepath: not used by this function body
        con: SQLAlchemy connection used for the existence check, table
            creation, and inserts
    NOTE(review): get_pageviews_urls_by_year, download_file,
    get_pageviews_subset_by_proj_and_pagetitles, log_forecast_of_completion and
    the default `logfilepath` are not defined in the visible notebook. Confirm
    they exist before running.
    """
    if not does_table_exist('pageviews', con=con):
        create_pageviews_table(con)
    # start counts
    process_start_time = dt.datetime.now()
    file_count = 0
    for year in years:
        urls = get_pageviews_urls_by_year(year)
        # urls = urls[:3] # truncate if debugging 
        for url in urls:
            temp_fpath = download_file(url, dirpath='../data/temp/')
            try:
                pageviews = get_pageviews_subset_by_proj_and_pagetitles(
                                temp_fpath, pagetitles=pagetitles)
                pageviews.to_sql(name='pageviews', con=con, if_exists='append', index=False)
            finally:
                # remove the download even when parsing or inserting fails
                os.remove(temp_fpath)
            file_count += 1
            log_forecast_of_completion(file_count, process_start_time, years)

###### does_table_exist

In [None]:
def does_table_exist(tablename:str, dbname:str='jawiki', con=conn) -> bool:
    """
    Return True if table `tablename` exists in schema `dbname`.

    Queries information_schema.tables. The names are sent as bound parameters
    rather than formatted into the SQL, so quotes in a name cannot break (or
    inject into) the query.
    """
    sql = sqlalchemy.text(
        """
        SELECT 1
            FROM information_schema.tables
        WHERE table_schema = :dbname
            AND table_name = :tablename
        LIMIT 1;
        """
    )
    found = pd.read_sql(sql, con, params={'dbname': dbname, 'tablename': tablename})
    return not found.empty

###### create_pageviews_table

In [None]:
def create_pageviews_table(con=conn):
    """
    Create the `pageviews` table on connection `con`.

    Columns: auto-increment `row_id` primary key, `page_id`, `utc_date`, and
    `utc_hourly_count` (encoded hourly counts as TEXT). The statement has no
    IF NOT EXISTS, so it fails if the table already exists.
    """
    
    sql = """
    CREATE TABLE pageviews (
        row_id BIGINT(20) AUTO_INCREMENT PRIMARY KEY
        ,page_id BIGINT(20)
        ,utc_date DATE
        ,utc_hourly_count TEXT
    )
    ;
    """
    # Execute on the `con` argument, not the global. text() is required because
    # SQLAlchemy 2.x no longer accepts plain SQL strings in execute().
    con.execute(sqlalchemy.text(sql))

##### tz_shift_hourly_views_utc2ja

In [16]:
def tz_shift_hourly_views_utc2ja(ser:pd.Series) -> pd.Series:
    """
    Re-bucket encoded hourly counts from UTC days/hours to Japan time (UTC+9).

    Each value is an hourly-count string such as 'A3C1P2': a letter for the
    hour (A=00 ... X=23) followed by that hour's count.
    - UTC hours 15-23 (P-X) belong to the next Japanese day. They move one row
      down and are relabelled A-I.
    - UTC hours 0-14 (A-O) stay on the same row and become J-X.

    ASSUMES: `ser` is ordered by date with one row per consecutive day. The
    first row therefore has no carried-in hours, and the last row's late hours
    are dropped. Missing values are treated as no views.
    """
    jst_offset = 9
    hours = string.ascii_uppercase[:24]          # 'A'..'X' = hours 00..23
    rollover = 24 - jst_offset                   # first UTC hour on the next JST day
    same_day, next_day = hours[:rollover], hours[rollover:]   # 'A'..'O', 'P'..'X'
    # hour h -> (h + 9) % 24. Rotating all 26 letters instead would send P and
    # Q to the non-hours Y and Z and split the day at R rather than P.
    utc_to_jst = str.maketrans(hours, hours[jst_offset:] + hours[:jst_offset])

    ser = ser.fillna('').astype(str)
    # late UTC hours, carried one row down to the following day
    carried = ser.str.extract(f'([{next_day}].*$)', expand=False).shift(1).fillna('')
    # leading run of early UTC hours, kept on the same day
    early = ser.str.extract(f'((?:[{same_day}][0-9]*)*)', expand=False).fillna('')
    # expand=False keeps a Series even for a one-row input (squeeze() did not)
    return (carried + early).str.translate(utc_to_jst).rename(ser.name)

##### get_daily_views_from_hourly_views

In [55]:
def get_daily_views_from_hourly_views(pd_obj):
    """
    Sum encoded hourly counts (e.g. 'A3C1' -> 4) into daily totals.

    INPUTS:
        pd_obj: pd.DataFrame or pd.Series of encoded hourly-count strings.
            Missing values (NaN/None) count as 0 views.
    OUTPUT:
        an object of the same type and shape holding integer totals
    RAISES:
        TypeError: if pd_obj is neither a DataFrame nor a Series
    """
    count_pat = re.compile(r"[A-Z]([0-9]+)")

    def daily_total(x):
        # a pivoted pageviews frame has NaN where a page had no views that day
        if pd.isna(x):
            return 0
        return sum(int(n) for n in count_pat.findall(x))

    if isinstance(pd_obj, pd.DataFrame):
        # column-wise Series.map avoids DataFrame.applymap (deprecated in pandas 2.1)
        return pd_obj.apply(lambda col: col.map(daily_total))
    if isinstance(pd_obj, pd.Series):
        return pd_obj.map(daily_total)
    raise TypeError(f'expected a pandas DataFrame or Series, got {type(pd_obj).__name__}')

## main program

In [None]:
# UTC-dated encoded hourly views for page_ids 1051 and 18508 during 2016.
df_utc = get_pageviews_by_pageids_and_years(pids_of_interest=[1051, 18508], years=[2016])

In [33]:
# Shift each column of the UTC table (df_utc, built above) to Japan-time days
# and hours. `df` was never defined.
df_ja = df_utc.apply(tz_shift_hourly_views_utc2ja)

True

In [None]:
# Write the Japan-time table to MariaDB as 'test1' (pandas default
# if_exists='fail': raises if the table already exists).
df_ja.to_sql(name='test1', con=conn)

In [17]:
# Page ids used for earthquake-related experiments.
earthquake_pids = [
    18508,
    159816,
    1051,
    2339185,
    1516544,
]

---

run main program

In [None]:
# Run the full ETL for 2016-2021.
# NOTE(review): neither etl_pageviews_by_years_and_projects nor all_titles is
# defined in the visible notebook. insert_pageviews_into_mariadb(years,
# pagetitles) may be the intended call; confirm before running.
etl_pageviews_by_years_and_projects(list(range(2016, 2022)), all_titles)

---

### probably won't need this concept

###### function populate_pageviews_utc (planned: to_sql of utc_date, page_id, hourly_count, access_method)

In [None]:
def populate_pageviews_utc(paths:list[str]):
    """
    Placeholder, not implemented: does nothing and returns None.

    Intended plan for each line of each file in `paths`:
      - keep only lines that start with "ja.wikipedia"
      - split the line; skip it if it is too short or has no integer page_id
      - encode access_method as an integer: 'mobile' 1, 'desktop' 2, 'app' 3
      - insert (utc_date, page_id, hourly_count, access_method) into SQL

    Source fields noted in the plan: wiki_code, article_title, page_id,
    daily_total, hourly_counts.
    """
