# Exploratory API call and inserting to DuckDB

## test API and examine the returned JSON

In [5]:
import os    
import requests
import pandas as pd

url = "https://en.wikipedia.org/w/api.php"

params = {
    "action": "query",
    "format": "json",
    "list": "recentchanges",
    "rcstart": '2024-10-31T00:00:01Z',
    "rcend": '2024-10-31T00:00:00Z',
    "rclimit": 500, # maximum value is 500
    "rcprop": "title|timestamp|userid|user|comment|flags|sizes"
}

response = requests.get(url, params=params)
print(response)
edit_records = response.json()['query']['recentchanges']
print('\n ---------------------------- \n printing query/recentchanges only', '\n', edit_records)
tumbling_window_df = pd.DataFrame(edit_records)
tumbling_window_df.head()

<Response [200]>

 ---------------------------- 
 printing query/recentchanges only 
 [{'type': 'categorize', 'ns': 14, 'title': 'Category:Saturn Award-winning films', 'user': 'JJMC89 bot III', 'userid': 35936988, 'bot': '', 'oldlen': 0, 'newlen': 0, 'timestamp': '2024-10-31T00:00:01Z', 'comment': '[[:Category:WALL-E]] removed from category, [[Special:WhatLinksHere/Category:WALL-E|this page is included within other pages]]'}, {'type': 'categorize', 'ns': 14, 'title': 'Category:Saturn Award–winning films', 'user': 'JJMC89 bot III', 'userid': 35936988, 'bot': '', 'oldlen': 0, 'newlen': 0, 'timestamp': '2024-10-31T00:00:01Z', 'comment': '[[:Category:WALL-E]] added to category, [[Special:WhatLinksHere/Category:WALL-E|this page is included within other pages]]'}, {'type': 'edit', 'ns': 14, 'title': 'Category:WALL-E', 'user': 'JJMC89 bot III', 'userid': 35936988, 'bot': '', 'minor': '', 'oldlen': 192, 'newlen': 194, 'timestamp': '2024-10-31T00:00:01Z', 'comment': 'Moving [[:Category:Saturn A

Unnamed: 0,type,ns,title,user,userid,bot,oldlen,newlen,timestamp,comment,minor,anon
0,categorize,14,Category:Saturn Award-winning films,JJMC89 bot III,35936988,,0,0,2024-10-31T00:00:01Z,"[[:Category:WALL-E]] removed from category, [[...",,
1,categorize,14,Category:Saturn Award–winning films,JJMC89 bot III,35936988,,0,0,2024-10-31T00:00:01Z,"[[:Category:WALL-E]] added to category, [[Spec...",,
2,edit,14,Category:WALL-E,JJMC89 bot III,35936988,,192,194,2024-10-31T00:00:01Z,Moving [[:Category:Saturn Award-winning films]...,,
3,categorize,14,Category:Use mdy dates from October 2024,Lepricavark,28779459,,0,0,2024-10-31T00:00:00Z,"[[:Obe Blanc]] added to category, [[Special:Wh...",,
4,log,6,File:BhoomiThayiyaChochchalaMagafilmposter.jpg,Explicit,4842600,,0,0,2024-10-31T00:00:00Z,[[WP:CSD#F5|F5]]: Unused non-free media file,,


## initiate logging procedure

In [None]:
import os
import logging
import sys
from contextlib import closing

log_path = '../logs/py_exec.log' 

# Delete log file - inherited code
if os.path.exists(log_path):
    try:
        # standard closing statement - inherited code
        for handler in logging.root.handlers[:]:
            with closing(handler):
                logging.root.removeHandler(handler)
        os.remove(log_path)
    except Exception as e:
        # for notebook replace with print
        # standard code for stdout - for batch, inherited code
        sys.stdout.write(f"Error deleting log file: {str(e)}\n")
        sys.stdout.flush()
if os.path.exists(log_path):
    try:
        # standard closing statement - inherited code
        for handler in logging.root.handlers[:]:
            with closing(handler):
                logging.root.removeHandler(handler)
        os.remove(log_path)
    except Exception as e:
        sys.stdout.write(f"Error deleting log file: {str(e)}\n")
        sys.stdout.flush()

# initiate logging process
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('..\logs\py_exec.log'),  # logging to file
        logging.StreamHandler()  # logging to console - for Jupyer interactive session
    ]
)

logger = logging.getLogger(__name__)

In [1]:
import os    
import requests
import duckdb
import pandas as pd
import datetime
import time
# import logging   # will be implemented - time constraint
from typing import Dict, List
# check api call 
url = "https://en.wikipedia.org/w/api.php"

"""
Fetch recent changes from Wikipedia for a specific date.

Args:
    start_date: Date in format "YYYY-MM-DD"
    limit: Maximum number of changes to retrieve
    
Returns:
    List of dictionaries containing change information
"""

input_date = '2024-10-31'     # given as is
limit = 500    # maximum allowed value is 500
n = 30  # tumbling window - in second

params = {
    "action": "query",
    "format": "json",
    "list": "recentchanges",
    "rcstart": '',
    "rcend": '',
    "rclimit": 500, # maximum value is 500
    "rcprop": "title|timestamp|userid|user|comment|flags|sizes"
}

# create a data-less df to append data
df = (pd.DataFrame(columns=['type', 'ns', 'title', 'user', 'userid', 'bot', 
                          'oldlen', 'newlen', 'timestamp', 'comment', 
                          'minor', 'anon', 'new'])
        .astype({
                    'type': 'string',      # Instead of object
                    'title': 'string',     # Instead of object
                    'user': 'string',      # Instead of object
                    'comment': 'string',   # Instead of object
                    'ns': 'int64',
                    'userid': 'int64',
                    'bot': 'bool',
                    'oldlen': 'int64',
                    'newlen': 'int64',
                    'timestamp': 'datetime64[ns]',
                    'minor': 'bool',
                    'anon': 'bool',
                    'new': 'bool'
                })
)


target_date = datetime.datetime.strptime(input_date, "%Y-%m-%d").date()

# timestamp of dates
start_time = datetime.datetime.combine(target_date, datetime.time(0, 0, 0))  # e.g. 2024-10-31 00:00:00
end_time = start_time + datetime.timedelta(days=1) - datetime.timedelta(seconds=1) # e.g. 2024-10-31 23:59:59

# initial timestamp
current_window_start_time = start_time

# iteration condition
while current_window_start_time < end_time:

    rcend_str = current_window_start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    logger.info(f"current_window_start_time: {rcend_str}")
    
    current_window_start_time += datetime.timedelta(seconds=n)
    current_window_end_time = min(current_window_start_time - datetime.timedelta(seconds=1), end_time)  # end date must be before next day
    rcstart_str = current_window_end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    logger.info(f"current_window_end_time: {rcstart_str}")
    
    # !! API parameter is named in retrospective manner
    params['rcend'] = rcend_str     # start of tumbling window 
    params['rcstart'] = rcstart_str   # end of tumbling window

    logger.info(params)    
    response = requests.get(url, params=params)

    if response.status_code == 200:
        edit_records = response.json()['query']['recentchanges']
        tumbling_window_df = pd.DataFrame(edit_records)
        
        logger.info("tumbling window df-size is", len(tumbling_window_df))
        
        # check if dataset does not exceed 500 rows
        if len(tumbling_window_df) == 500:
            raise SystemExit('error reached max records')
        
        else:
            df = pd.concat([df, pd.DataFrame(edit_records)], ignore_index=True)

            logger.info("current total size is", len(df))

    else:
        logger.info(response.text)
        raise SystemExit('error termination')

current_window_start_time: 2024-10-31T00:00:00Z
current_window_end_time: 2024-10-31T00:00:29Z
tumbling window df-size is 411
current total size is 411
current_window_start_time: 2024-10-31T00:00:30Z
current_window_end_time: 2024-10-31T00:00:59Z
tumbling window df-size is 232
current total size is 643
current_window_start_time: 2024-10-31T00:01:00Z
current_window_end_time: 2024-10-31T00:01:29Z
tumbling window df-size is 176
current total size is 819
current_window_start_time: 2024-10-31T00:01:30Z
current_window_end_time: 2024-10-31T00:01:59Z
tumbling window df-size is 265
current total size is 1084
current_window_start_time: 2024-10-31T00:02:00Z
current_window_end_time: 2024-10-31T00:02:29Z
tumbling window df-size is 160
current total size is 1244
current_window_start_time: 2024-10-31T00:02:30Z
current_window_end_time: 2024-10-31T00:02:59Z
tumbling window df-size is 190
current total size is 1434
current_window_start_time: 2024-10-31T00:03:00Z
current_window_end_time: 2024-10-31T00:03:2

## create backup parquet locally

In [None]:
try:
   # Save DataFrame to parquet
   df.to_parquet()
   logger.info(f'{os.path.getsize(file_path) / (1024 * 1024)}')  # Convert to MB
   
except Exception as e:
   logger.info(f"Error saving file: {e}")

# DuckDb

## open connection to DuckDB at data\ directory

In [14]:
import os
import duckdb

# Construct the path to the data directory
db_path = os.path.join("..", "data", "wikipedia.db") 
# db_path = "wikipedia.db"

conn = duckdb.connect(db_path)

In [14]:
conn.execute("SELECT COUNT(*) FROM iv.wiki_edits").df().head()

Unnamed: 0,count_star()
0,356300


In [17]:
conn.execute("""
            SELECT timestamp, count(*) FROM iv.wiki_edits
            group by timestamp
             """
             ).df().head()

Unnamed: 0,timestamp,count_star()
0,2024-10-31 00:00:27,14
1,2024-10-31 00:00:16,10
2,2024-10-31 00:00:57,3
3,2024-10-31 00:00:48,15
4,2024-10-31 00:00:30,5


In [8]:
# check total count of records
conn.execute("SELECT COUNT(*) FROM iv.wiki_edits").df().head()

Unnamed: 0,count_star()
0,356300


## steps to create schema + insert records

In [81]:
import os
import duckdb
# Construct the path to the storage directory
db_path = os.path.join("..", "data", "wikipedia.db") 
logger.info(db_path)

# create database wikipedia
conn = duckdb.connect(db_path)

..\data\wikipedia.db


In [27]:
# create schema
conn.execute("CREATE SCHEMA IF NOT EXISTS iv")

<duckdb.duckdb.DuckDBPyConnection at 0x24265c36a70>

In [29]:
create_table_sql = """
        CREATE TABLE IF NOT EXISTS iv.wiki_edits (
            type VARCHAR,
            title VARCHAR,
            user VARCHAR,
            userid BIGINT,
            timestamp TIMESTAMP,
            comment VARCHAR
        )
        """
conn.execute(create_table_sql)

<duckdb.duckdb.DuckDBPyConnection at 0x2424d8deaf0>

## check if table/views are created by dbt

In [86]:
conn.execute("""
    SELECT *
    FROM information_schema.tables
    WHERE table_schema = 'iv' 
""").df().head(500)

Unnamed: 0,table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action,TABLE_COMMENT
0,wikipedia,iv,ranked_edits,BASE TABLE,,,,,,YES,NO,,
1,wikipedia,iv,wiki_edits,BASE TABLE,,,,,,YES,NO,,
2,wikipedia,iv,api_edits_base,VIEW,,,,,,NO,NO,,
3,wikipedia,iv,timeseries_base,VIEW,,,,,,NO,NO,,


## check the result of ranked_edits

In [89]:
conn.execute("""
    select * from iv.ranked_edits
    """).df().head()

Unnamed: 0,sliding_window_start,sliding_window_end,changes_count,unique_users
0,2024-10-31 22:45:00,2024-10-31 23:15:00,13300,980
1,2024-10-31 22:30:00,2024-10-31 23:00:00,13188,940
2,2024-10-31 22:15:00,2024-10-31 22:45:00,12341,1033
3,2024-10-31 23:00:00,2024-10-31 23:30:00,11980,1001
4,2024-10-31 21:30:00,2024-10-31 22:00:00,11911,1057


## close connection to DuckDB

In [12]:
conn.close()

# dbt logic construction

In [71]:
# check total number of records 
len(
    conn.execute("""
    select type, title, user, userid, timestamp, comment
    from iv.wiki_edits
    where timestamp is not null
    """).df()
)

356371

In [72]:
conn.execute("""
    select type, count(*) as "rec_count"
    from iv.wiki_edits
    group by type
    order by "rec_count" desc
""").df().head(200)

Unnamed: 0,type,rec_count
0,edit,196670
1,categorize,145422
2,log,8326
3,new,5953


## sliding window - generated time-series table

In [15]:
conn.execute("""
                select
                    sliding_window_start,
                    --add 30 minutes of length
                    sliding_window_start + interval '30 minutes' as sliding_window_end
                from (
                    -- generate starting time-series, sliding 15 minutes
                    select
                    CAST('2024-10-31' AS TIMESTAMP)
                        -- interval 'x minutes', handled similarly to timedelta in python
                        + (INTERVAL '15 minutes' * timeseries) AS sliding_window_start
                
                    FROM range(96) AS t(timeseries)
                    -- Generate 96 values (0 to 95): 24 hours * 4 (15 minutes)
                    -- t() is table alias in DuckDB
                    )
             """).df().tail()

Unnamed: 0,sliding_window_start,sliding_window_end
91,2024-10-31 22:45:00,2024-10-31 23:15:00
92,2024-10-31 23:00:00,2024-10-31 23:30:00
93,2024-10-31 23:15:00,2024-10-31 23:45:00
94,2024-10-31 23:30:00,2024-11-01 00:00:00
95,2024-10-31 23:45:00,2024-11-01 00:15:00


## Integrating the sliding window of 30 min, freq 15 min   

In [18]:
conn.execute("""with timeseries_base as(
                    select
                        sliding_window_start,
                        --add 30 minutes of length
                        sliding_window_start + interval '30 minutes' as sliding_window_end
                    from (
                        -- generate starting time-series, sliding 15 minutes
                        select
                        CAST('2024-10-31' AS TIMESTAMP)
                            -- interval 'x minutes', handled similarly to timedelta in python
                            + (INTERVAL '15 minutes' * timeseries) AS sliding_window_start
                    
                        FROM range(96) AS t(timeseries)
                        -- Generate 96 values (0 to 95): 24 hours * 4 (15 minutes)
                        -- t() is table alias in DuckDB
                        )
                )
                select
                ts.sliding_window_start,
                ts.sliding_window_end,
                count(*) as changes_count,
                count(distinct we.user) as unique_users
                from timeseries_base as ts  --timeseries
                left join iv.wiki_edits as we  --wiki edit
                    on we.timestamp >= ts.sliding_window_start
                    and we.timestamp < ts.sliding_window_end
                group by 1, 2
                order by changes_count desc
             """).df().head()

Unnamed: 0,sliding_window_start,sliding_window_end,changes_count,unique_users
0,2024-10-31 22:45:00,2024-10-31 23:15:00,13298,980
1,2024-10-31 22:30:00,2024-10-31 23:00:00,13185,940
2,2024-10-31 22:15:00,2024-10-31 22:45:00,12340,1033
3,2024-10-31 23:00:00,2024-10-31 23:30:00,11980,1001
4,2024-10-31 21:30:00,2024-10-31 22:00:00,11911,1057


In [36]:
conn.execute("""
select * from iv.wiki_edits
""").df().head()

## Answer to the Interview Question

In [6]:
## open connection to DuckDB at data\ directory
import os
import duckdb

# Construct the path to the data directory
db_path = os.path.join("..", "data", "wikipedia.db")
conn = duckdb.connect(db_path)
# check total count of records

conn.execute("SELECT COUNT(*) FROM iv.wiki_edits").df().head()



Unnamed: 0,count_star()
0,356166


In [7]:
conn.execute("""
    SELECT *
    FROM information_schema.tables
    WHERE table_schema = 'iv' 
""").df().head(500)

Unnamed: 0,table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action,TABLE_COMMENT
0,wikipedia,iv,ranked_edits,BASE TABLE,,,,,,YES,NO,,
1,wikipedia,iv,wiki_edits,BASE TABLE,,,,,,YES,NO,,
2,wikipedia,iv,api_edits_base,VIEW,,,,,,NO,NO,,
3,wikipedia,iv,timeseries_base,VIEW,,,,,,NO,NO,,


In [11]:
conn.execute(f"""
                select * from iv.ranked_edits
                order by changes_count desc
            """).df().head()

Unnamed: 0,sliding_window_start,sliding_window_end,changes_count,unique_users
0,2024-10-31 22:45:00,2024-10-31 23:15:00,13298,980
1,2024-10-31 22:30:00,2024-10-31 23:00:00,13185,940
2,2024-10-31 22:15:00,2024-10-31 22:45:00,12340,1033
3,2024-10-31 23:00:00,2024-10-31 23:30:00,11980,1001
4,2024-10-31 21:30:00,2024-10-31 22:00:00,11911,1057
