In [3]:
import datetime
import subprocess
import time

import pandas as pd
from wmfdata import hive

In [2]:
# Hive DDL for the daily content-pageview table, partitioned by year/month/day
# and stored as Parquet.
# The table name must be cchen.content_pageview: that is the table the daily
# INSERT OVERWRITE statement writes to and the HDFS directory the final chmod
# step opens up. (It previously read cchen.content_pv, so a fresh run never
# created the table that the rest of the notebook uses.)
create_table = """
CREATE EXTERNAL TABLE IF NOT EXISTS cchen.content_pageview (
`date` date,
`project` string,
`market` string,
`country` string,
`topic` string,
`main_topic` string,
`sub_topic` string,
`pageviews` bigint
)
PARTITIONED BY (
`year` int,
`month` int,
`day` int
)
STORED AS PARQUET
"""

In [3]:
# Submit the DDL to Hive. The statement uses IF NOT EXISTS, so it is written
# to be safe to re-run when the table is already there.
hive.run(create_table)

## Daily data aggregation

This step uses the new topic table, `isaacj.article_topics_outlinks_2021_07`.

In [13]:
# Hive query template that aggregates one day of pageviews by topic and
# overwrites the matching (year, month, day) partition of
# cchen.content_pageview. {YEAR}, {MONTH} and {DAY} are placeholders filled in
# with str.format before the query is run.
#
# Step 1 - CTE: sums view_count from wmf.pageview_hourly per
#   country / project / page for the requested day, attaching
#   countries.economic_region as `market` (LEFT JOIN on the ISO country code)
#   and the wiki's database_code (INNER JOIN on "<language_code>.<database_group>").
#   Rows are kept only when country is not 'Unknown', agent_type is not
#   'spider', database_group is 'wikipedia' and namespace_id is 0.
#   NOTE(review): the CTE is called monthly_views but its WHERE clause
#   restricts it to a single day.
#
# Step 2 - final SELECT: joins each page to its rows in
#   isaacj.article_topics_outlinks_2021_07 with score >= 0.5, maps the topic to
#   main_topic / sub_topic through cchen.topic_component (LEFT JOIN, so those
#   two columns may be NULL), and sums pageviews per
#   date / project / market / country / topic.
#   NOTE(review): if a page has several topic rows at score >= 0.5, its
#   pageviews are counted once per topic - presumably intended; confirm before
#   summing across topics.
query_daily_pv = """

WITH monthly_views AS (
    SELECT
        year, month, day,countries.economic_region AS market,
        country,
        database_code,project,page_id, page_title, 
        SUM(view_count) AS pageviews
    FROM wmf.pageview_hourly pv
    LEFT JOIN canonical_data.countries AS countries 
           ON pv.country_code = countries.iso_code
    INNER JOIN canonical_data.wikis w
           ON pv.project = CONCAT(w.language_code,'.',w.database_group)
    WHERE year = {YEAR} and month = {MONTH} and day = {DAY}
      AND country != 'Unknown'
      AND agent_type != 'spider'
      AND database_group = 'wikipedia'
      AND namespace_id =  0 
    GROUP BY year, month, day,economic_region,country,database_code,project, page_id, page_title
) 

INSERT OVERWRITE TABLE cchen.content_pageview
PARTITION(year = {YEAR} , month = {MONTH} , day = {DAY})

SELECT
    CONCAT(mv.year, '-', LPAD(mv.month, 2, '0'), '-',LPAD(mv.day, 2, '0')) AS `date`,
    project,
    market, 
    country,
    ato.topic,
    tc.main_topic, 
    tc.sub_topic,
    SUM(mv.pageviews) as pageviews
FROM monthly_views mv
INNER JOIN isaacj.article_topics_outlinks_2021_07 ato ON (
    mv.database_code =  ato.wiki_db
    AND mv.page_id = ato.pageid
    AND ato.score >= 0.5
)
LEFT JOIN cchen.topic_component tc ON ato.topic = tc.topic
GROUP BY 
    CONCAT(mv.year, '-', LPAD(mv.month, 2, '0'), '-',LPAD(mv.day, 2, '0')) ,
    project,market,
    country, 
    ato.topic,tc.main_topic, 
      tc.sub_topic """

In [14]:
# Reporting period = the calendar month before today.
# Stepping back one day from the first of the current month lands on the last
# day of the previous month; its day-of-month is therefore that month's length.
first_of_current_month = datetime.date.today().replace(day=1)
last_month = first_of_current_month - datetime.timedelta(days=1)

metrics_year, metrics_month, last_day = (
    last_month.year,
    last_month.month,
    last_month.day,
)


In [None]:
# Fill the table one day at a time: render the query template for each day of
# the reporting month and run it, overwriting that day's partition.
for metrics_day in range(1, last_day + 1):
    daily_sql = query_daily_pv.format(
        YEAR=metrics_year,
        MONTH=metrics_month,
        DAY=metrics_day,
    )

    print("Running day {}...".format(metrics_day))
    hive.run(daily_sql)


In [None]:
# Update database access: recursively grant read permission to "other" users
# on the table's HDFS directory so the refreshed partitions are readable.
# subprocess.run is used because `os` was never imported in this notebook
# (the previous os.system call raised NameError on a fresh kernel). Passing an
# argument list avoids shell parsing, and check=True raises CalledProcessError
# if the hdfs command exits non-zero instead of failing silently.
subprocess.run(
    [
        "hdfs", "dfs", "-chmod", "-R", "o+r",
        "/user/hive/warehouse/cchen.db/content_pageview",
    ],
    check=True,
)