## Import libraries

In [1]:
import os
import re
import json
import yaml
import time
from datetime import datetime, timedelta, timezone
import warnings
from tenacity import retry, stop_after_attempt, wait_exponential
import requests
from dotenv import load_dotenv
from bs4 import BeautifulSoup
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from tqdm.notebook import tqdm
from typing import Optional, Literal, Callable, Union, Any, List, Tuple

In [2]:
# Load environment variables from a local `.env` file into the process
# environment; `call_pancake_api` reads the `PANCAKE_API` access token via os.getenv.
load_dotenv()

True

## Pancake API functions

### Global variable

In [3]:
# Size of the thread pools used for concurrent Pancake API calls.
# The work is network-bound, so the pool size is not derived from os.cpu_count();
# it can be overridden with the PANCAKE_NUM_WORKERS environment variable (minimum 1).
NUM_WORKERS = max(int(os.getenv('PANCAKE_NUM_WORKERS', '6')), 1)

### Utils

In [4]:
def load_yaml(path: str) -> dict:
    """Parse the UTF-8 YAML file at ``path`` with ``yaml.safe_load`` and return the result."""
    with open(path, encoding='utf-8') as yaml_file:
        return yaml.safe_load(yaml_file)


def load_json(path: str) -> dict:
    """Read the UTF-8 encoded JSON file at ``path`` and return the decoded object."""
    with open(path, encoding='utf-8') as json_file:
        data = json.load(json_file)
    return data


def save_json(path: str, content: Union[dict, list], indent: int = 4) -> None:
    """Write ``content`` to ``path`` as UTF-8 JSON.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) and the
    output is pretty-printed with ``indent`` spaces per level.
    """
    dump_options = {'indent': indent, 'ensure_ascii': False}
    with open(path, mode='w', encoding='utf-8') as json_file:
        json.dump(content, json_file, **dump_options)


In [5]:
def string_to_unix_second(s: str, format: str = "%Y-%m-%dT%H:%M:%S",
                          tz: Optional[timezone] = None) -> int:
    """Convert a timestamp string such as ``'2024-01-02T03:04:05.678'`` to Unix seconds.

    Everything from the first ``'.'`` on (fractional seconds) is dropped before
    parsing, so the result is truncated to whole seconds. Consequently
    ``format`` must not itself contain a ``'.'``.

    Args:
        s: Timestamp string whose part before the first ``'.'`` matches ``format``.
        format: ``datetime.strptime`` format of that part.
        tz: Time zone the string is expressed in. When ``None`` (the default) the
            parsed naive datetime is interpreted in the machine's LOCAL time zone
            by ``datetime.timestamp()``; pass e.g. ``timezone.utc`` when the
            strings are UTC so the result does not depend on the host's zone.

    Returns:
        Seconds since the Unix epoch.

    Raises:
        ValueError: if ``s`` does not match ``format``.
        TypeError: if ``s`` is not a string.
    """
    # Non-string input is passed through unchanged so strptime raises its usual TypeError.
    integral_part = s.split('.', 1)[0] if isinstance(s, str) else s
    parsed = datetime.strptime(integral_part, format)
    if tz is not None:
        parsed = parsed.replace(tzinfo=tz)
    return int(parsed.timestamp())

In [6]:
def get_current_time_utc_plus_7():
    """Return the current wall-clock time in UTC+7 as a naive ``datetime``.

    Microseconds are zeroed and the tzinfo is stripped, so the result holds
    UTC+7 clock values without any time-zone marker.
    """
    now_in_utc_plus_7 = datetime.now(timezone(timedelta(hours=7)))
    return now_in_utc_plus_7.replace(tzinfo=None, microsecond=0)

In [7]:
def get_day_before(num_days: int, return_type: Literal['date', 'timestamp'] = 'timestamp'):
    """Return local midnight ``num_days`` days before today.

    Uses the machine's local time (``datetime.now()`` without a time zone).

    Returns:
        The Unix timestamp (int) of that midnight when ``return_type`` is
        ``'timestamp'``; otherwise the naive ``datetime`` itself.
    """
    today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    target = today_midnight - timedelta(days=num_days)
    return int(target.timestamp()) if return_type == 'timestamp' else target

In [8]:
def split_time_stamp(since: int, until: int, time_delta: int = 30*24*60*60) -> List[Tuple[int, int]]:
    """Split ``[since, until)`` into consecutive windows of at most ``time_delta`` seconds.

    Each window is a ``(start, end)`` tuple; one window's end is the next
    window's start and the last window is clipped to ``until``. Returns an
    empty list when ``since >= until``. The default window length is 30 days.
    """
    windows = []
    for start in range(since, until, time_delta):
        end = start + time_delta
        windows.append((start, end if end < until else until))
    return windows

### Getting page data

In [9]:
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=60)
)
def call_pancake_api(
    url: str,
    parameters: Optional[dict] = None,
    call_type: Literal['get', 'post'] = 'get',
    add_access_token: bool = True,
    return_type: Literal['raw', 'string', 'dictionary'] = 'string',
    **kwargs: Any
) -> Union[bytes, str, dict]:
    """Call a Pancake API endpoint and return the response body.

    The call is attempted up to 3 times with exponential back-off (tenacity).
    Since the decorator does not set ``reraise``, a call that fails on every
    attempt (HTTP error, invalid argument, undecodable JSON) surfaces as
    ``tenacity.RetryError``.

    Args:
        url: Endpoint URL.
        parameters: Extra query-string parameters; merged after (and so
            overriding) the ``access_token`` entry.
        call_type: HTTP method, ``'get'`` or ``'post'``.
        add_access_token: Add ``access_token`` taken from the ``PANCAKE_API``
            environment variable.
        return_type: ``'raw'`` -> ``bytes``, ``'string'`` -> ``str``,
            ``'dictionary'`` -> JSON-decoded body.
        **kwargs: Forwarded to ``requests.get`` / ``requests.post``. A
            ``timeout`` of 30 seconds is used unless one is passed here.
    """
    # Query-string parameters: account access token first, caller's params on top.
    request_params = {}
    if add_access_token:
        request_params['access_token'] = os.getenv('PANCAKE_API')
    if parameters is not None:
        request_params.update(parameters)

    # Without a timeout `requests` can block forever on a stalled connection,
    # which would also freeze the thread pools calling this function.
    # Caller-supplied kwargs (including `timeout`) override these defaults.
    request_args = {
        'url': url,
        'params': request_params,
        'timeout': 30,
    }
    request_args.update(kwargs)

    if call_type == 'get':
        response = requests.get(**request_args)
    elif call_type == 'post':
        response = requests.post(**request_args)
    else:
        raise ValueError(f"Unsupported call_type: {call_type}")

    response.raise_for_status()

    if return_type == 'raw':
        return response.content
    elif return_type == 'string':
        return response.text
    elif return_type == 'dictionary':
        return json.loads(response.content)
    else:
        raise ValueError(f"Unsupported return_type: {return_type}")

In [10]:
def get_page(return_type: Literal['id', 'standard', 'full'] = 'standard') -> Union[dict, list]:
    """Fetch the account's pages from the Pancake API.

    Args:
        return_type:
            ``'full'``     -> ``{page_id: raw page dict}`` for the activated pages;
            ``'standard'`` -> ``{page_id: {'name': ..., 'page_access_token': ...}}``,
                              the token read from the page's ``settings``
                              (``None`` when absent);
            anything else  -> ``categorized['activated_page_id']`` as returned by the API.

    Raises:
        Exception: when the response's ``success`` flag is falsy.
    """
    response = call_pancake_api(url='https://pages.fm/api/v1/pages',
                                return_type='dictionary')
    if not response['success']:
        raise Exception('Failed to get pages data from Pancake API.')

    categorized = response['categorized']
    activated = categorized['activated'] if return_type in ('full', 'standard') else None

    if return_type == 'full':
        return {page['id']: page for page in activated}
    if return_type == 'standard':
        return {
            page['id']: {
                'name': page['name'],
                'page_access_token': page['settings'].get('page_access_token', None),
            }
            for page in activated
        }
    return categorized['activated_page_id']

In [11]:
def get_page_access_token(page_info: dict) -> tuple[dict[str, str], list[str]]:
    """Generate page access tokens concurrently for every page in ``page_info``.

    Args:
        page_info: ``{page_id: {'name': ..., ...}}``. Only the keys are used for
            requests; a page's ``'name'`` is read only to print warnings.

    Returns:
        ``(page_access_tokens, not_accessible_page_ids)``: a ``{page_id: token}``
        dict for pages whose token was generated, and the ids of pages for which
        the API reported failure or the request raised.
    """
    def request_page_access_token(page_id: str) -> dict:
        # POST the token-generation endpoint for a single page.
        return call_pancake_api(
            url=f'https://pages.fm/api/v1/pages/{page_id}/generate_page_access_token',
            call_type='post',
            return_type='dictionary',
        )

    page_access_tokens = {}
    not_accessible_page_ids = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        future_to_page_id = {
            executor.submit(request_page_access_token, page_id): page_id
            for page_id in page_info
        }

        for future in tqdm(concurrent.futures.as_completed(future_to_page_id),
                           total=len(future_to_page_id),
                           desc='Generate page access tokens'):
            page_id = future_to_page_id[future]

            try:
                response = future.result()
            except Exception as exc:
                # One failing page must not abort token generation for the others.
                page_name = page_info[page_id]['name']
                print(f'UserWarning: Cannot generate access token for page "{page_name}": {exc}')
                not_accessible_page_ids.append(page_id)
                continue

            if not response['success']:
                page_name = page_info[page_id]['name']
                print(f'UserWarning: Cannot generate access token for page "{page_name}"')
                not_accessible_page_ids.append(page_id)
                continue

            page_access_tokens[page_id] = response['page_access_token']

    return page_access_tokens, not_accessible_page_ids

### Get conversations

In [12]:
def get_page_conversations(page_id: str,
                           page_access_token: str,
                           since: int,
                           until: int,
                           order_by: Literal['insert', 'update'] = 'update',
                           filter: Optional[List[str]] = None):
    """Fetch all conversations of a page between ``since`` and ``until``.

    The range is split into windows of at most 30 days (``split_time_stamp``)
    and each window is paged through (``page_number`` = 1, 2, ...) until the
    API returns an empty ``conversations`` list or reports failure (a warning
    is printed and the remaining pages of that window are skipped).

    Args:
        page_id: Pancake page id.
        page_access_token: Token for the page's public API.
        since: Lower bound (Unix seconds) sent as the ``since`` query param.
        until: Upper bound (Unix seconds) sent as the ``until`` query param.
        order_by: ``'update'`` -> ``updated_at``, anything else -> ``inserted_at``.
        filter: Conversation types to keep; compared against the lower-cased
            ``type`` field. Defaults to ``['inbox', 'comment', 'rating']``.

    Returns:
        List of raw conversation dicts whose type is in ``filter``, in the order received.
    """
    # `None` sentinel instead of a list default shared between calls.
    if filter is None:
        filter = ['inbox', 'comment', 'rating']
    order_field = 'updated_at' if order_by == 'update' else 'inserted_at'

    result = []
    for window_since, window_until in split_time_stamp(since, until):
        page_number = 1
        while True:
            response = call_pancake_api(
                url=f'https://pages.fm/api/public_api/v1/pages/{page_id}/conversations',
                parameters={'page_access_token': page_access_token,
                            'since': window_since,
                            'until': window_until,
                            'page_id': page_id,
                            'page_number': page_number,
                            'order_by': order_field},
                return_type='dictionary',
                add_access_token=False
            )

            if not response['success']:
                # `.get`: a failure response is not guaranteed to carry a message.
                print(f"UserWarning: Failed to get conversations for page {page_id}. Respone's message: {response.get('message')}")
                break

            batch = response['conversations']
            if not batch:
                break

            result.extend(c for c in batch if c['type'].lower() in filter)
            page_number += 1

    return result

### Get messages

In [13]:
def get_sender(sender_dict: dict, is_sender_patterns: Optional[list] = None) -> Literal['customer', 'admin']:
    """Classify a message sender as ``'admin'`` or ``'customer'``.

    The sender is an admin when ``sender_dict`` has an ``'admin_id'`` key, or
    when its ``'name'`` matches (``re.search``) the built-in pattern
    ``'Trường Bào Ngư'`` or any regex in ``is_sender_patterns``. Everyone else
    is a customer.
    """
    if 'admin_id' in sender_dict:
        return 'admin'

    extra_patterns = is_sender_patterns if is_sender_patterns else []
    sender_name = sender_dict['name']
    is_admin = any(re.search(pattern, sender_name)
                   for pattern in ['Trường Bào Ngư', *extra_patterns])
    return 'admin' if is_admin else 'customer'

In [14]:
def filter_message(message_response: dict, attrs: Optional[list] = None) -> Optional[dict]:
    """Reduce a raw Pancake message to the fields used downstream.

    Returns ``None`` when ``'original_message'`` is empty or falsy. Otherwise
    returns a dict with ``'message'`` (the original text), ``'inserted_at'``,
    ``'from'`` (``'admin'`` / ``'customer'`` as decided by ``get_sender``), plus
    one entry per key in ``attrs`` (``None`` when the message lacks that key).
    """
    text = message_response['original_message']
    if not text:
        return None

    cleaned = {
        'message': text,
        'inserted_at': message_response['inserted_at'],
        'from': get_sender(message_response['from']),
    }
    for key in attrs or []:
        cleaned[key] = message_response.get(key, None)
    return cleaned

In [15]:
def get_messages(page_id: str,
                 page_access_token: str,
                 conversation_id: str,
                 customer_id: str,
                 since: int,
                 until: int):
    """Fetch the messages of one conversation inserted between ``since`` and ``until``.

    The endpoint is paged with ``current_count`` (number of messages already
    received). Each new batch is prepended to the collected list, and the
    ``inserted_at`` of the batch's first message decides whether to keep
    paging. This assumes each batch is ordered oldest-first and that later
    batches are older (NOTE(review): confirm against the Pancake API).

    Paging stops when a batch is empty, when the API reports failure (a
    warning is printed), or once that first message is at or before ``since``.

    Returns:
        Messages passed through ``filter_message`` (messages with empty text
        dropped) whose ``inserted_at`` lies in ``[since, until]``.
    """
    message_cnt = 0
    oldest_timestamp = None
    messages = []
    while oldest_timestamp is None or oldest_timestamp > since:
        response = call_pancake_api(
            url=f'https://pages.fm/api/public_api/v1/pages/{page_id}/conversations/{conversation_id}/messages',
            parameters={
                'current_count': message_cnt,
                'page_access_token': page_access_token,
                'customer_id': customer_id,
                'conversation_id': conversation_id,
                'page_id': page_id
            },
            return_type='dictionary',
            add_access_token=False
        )

        # Do not read `messages` from a response the API itself flagged as failed.
        if not response['success']:
            print(f"UserWarning: Failed to get messages for conversation {conversation_id} of page {page_id}. Response's message: {response.get('message')}")
            break

        batch = response['messages']
        if not batch:
            break

        messages = batch + messages
        oldest_timestamp = string_to_unix_second(batch[0]['inserted_at'])
        message_cnt += len(batch)

    in_range = [m for m in messages
                if since <= string_to_unix_second(m['inserted_at']) <= until]
    filtered = (filter_message(m) for m in in_range)
    return [m for m in filtered if m]

## Getting data

First, we need to check the following conditions:

- Has the data schema been initialized?
    - Check whether the `./data/data_scheme.json` file exists.
    - Also verify that the file is valid.

- Are there any new pages?
    - Call the Pancake API and compare the returned `page_id`s with those already stored in the schema.

- Are there any new conversations on each page?
    - Call the Pancake API and compare the returned `conversation_id`s with those already stored in the schema.

- Are there any new messages in each conversation?
    - Call the Pancake API and compare the `updated_at` value in the **Conversation API response** with the `updated_at` value stored in the schema (`last_updated`).

In [16]:
def filter_pages(name: str, patterns: List):
    """Return True if ``name`` matches (``re.search``) at least one regex in ``patterns``."""
    return any(re.search(pattern, name) for pattern in patterns)

In [17]:
def update_page(schema_path: str, default_last_check: int = 30, page_filter_patterns: Optional[List[str]] = None):
    """Load the page schema from ``schema_path`` and sync it with the Pancake page list.

    Steps:
      1. Load the stored schema (``{}`` if ``schema_path`` does not exist).
      2. Fetch pages with ``get_page()``. If ``page_filter_patterns`` is given,
         keep only pages whose name matches one of the regexes.
      3. For pages already in the schema, overwrite the stored
         ``page_access_token`` when the API returns a different, non-None one.
      4. Generate tokens with ``get_page_access_token`` only for pages that have
         no token in the API response AND none stored in the schema.
      5. Add every new page that has a token, with ``last_check`` set to local
         midnight ``default_last_check`` days ago and empty ``conversations``.
         New pages without a token are skipped.

    Returns:
        The updated schema dict. Nothing is written to disk here.
    """
    page_schema = load_json(schema_path) if os.path.exists(schema_path) else {}

    pages = get_page()
    if page_filter_patterns:
        pages = {k: v for k, v in pages.items() if filter_pages(v['name'], page_filter_patterns)}

    # Refresh tokens of already-known pages in case they were renewed.
    for page_id, page in pages.items():
        token = page['page_access_token']
        if (token is not None
                and page_id in page_schema
                and page_schema[page_id]['page_access_token'] != token):
            page_schema[page_id]['page_access_token'] = token

    # Generate tokens only where neither the API nor the stored schema has one;
    # generating a token for a page that already holds a stored one is a wasted call.
    pages_without_token = {
        page_id: page for page_id, page in pages.items()
        if page['page_access_token'] is None
        and not page_schema.get(page_id, {}).get('page_access_token')
    }
    if pages_without_token:
        generated_tokens, _ = get_page_access_token(pages_without_token)
        for page_id, token in generated_tokens.items():
            pages[page_id]['page_access_token'] = token
            if page_id in page_schema:
                page_schema[page_id]['page_access_token'] = token

    # Register new pages.
    for page_id in set(pages) - set(page_schema):
        page = pages[page_id]
        if not page['page_access_token']:
            continue
        page_schema[page_id] = {
            'name': page['name'],
            'page_access_token': page['page_access_token'],
            'last_check': get_day_before(default_last_check),
            'conversations': {},
        }

    return page_schema

In [18]:
def update_conversation(page_schema: dict, new_check: int, new_conversation_lookback_days: int = 30):
    """Find conversations with new activity and record them in ``page_schema`` (in place).

    For every page, inbox conversations updated between the page's
    ``last_check`` and ``new_check`` are fetched concurrently. A page whose
    fetch raises is logged and treated as having none. Then:

    * unseen conversations are stored under ``page_schema[page_id]['conversations']``
      and scheduled for a message fetch starting at local midnight
      ``new_conversation_lookback_days`` days ago;
    * known conversations whose ``updated_at`` changed get ``last_updated``
      refreshed and are scheduled for a message fetch starting at the
      PREVIOUS ``last_updated``.

    ``last_check`` itself is not modified here.

    Returns:
        List of ``(page_id, page_access_token, conversation_id, customer_id, since)``
        tuples, where ``since`` is a Unix timestamp.
    """
    # 1. Fetch recently updated inbox conversations of every page concurrently.
    conversations = {}
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        future_to_conversations = {
            executor.submit(
                get_page_conversations,
                k,  # page_id
                v['page_access_token'],  # page_access_token
                v['last_check'],  # since
                new_check,  # until
                'update',  # order_by
                ['inbox']  # filter
            ): k
            for k, v in page_schema.items()
        }

        for future in tqdm(concurrent.futures.as_completed(future_to_conversations), total=len(future_to_conversations), desc='Request conversations'):
            page_id = future_to_conversations[future]
            data = []
            try:
                data = future.result()
            except Exception as exc:
                print(f'Error occurred while fetching conversations for page {page_id}: {exc}')

            conversations[page_id] = data

    # 2. Decide which conversations need their messages fetched.
    new_conversation_since = get_day_before(new_conversation_lookback_days)
    update_conversations = []
    for page_id, conversation_list in conversations.items():
        page_access_token = page_schema[page_id]['page_access_token']
        last_conversations = page_schema[page_id]['conversations']

        for con in conversation_list:
            con_id = con['id']
            if con_id not in last_conversations:
                print(f'{con_id} not in {page_id}')
                last_conversations[con_id] = {
                    'customer_id': con['customer_id'],
                    'last_updated': con['updated_at'],
                }
                update_conversations.append(
                    (page_id, page_access_token, con_id, con['customer_id'], new_conversation_since)
                )

            elif con['updated_at'] != last_conversations[con_id]['last_updated']:
                previous_update = last_conversations[con_id]['last_updated']
                print(f"Page {page_id} has new message")
                print(con['updated_at'], previous_update)
                last_conversations[con_id]['last_updated'] = con['updated_at']

                # Start from the previous update, not the new one: starting at the
                # new `updated_at` would skip every message that arrived in between.
                # A message stamped in the same second as the previous update may be
                # fetched again.
                update_conversations.append(
                    (page_id, page_access_token, con_id, con['customer_id'],
                     string_to_unix_second(previous_update))
                )

    return update_conversations

In [19]:
def update_messages(conversations: List[Tuple[str, str, str, str, int]], new_check: int):
    """Fetch new messages for every scheduled conversation concurrently.

    Args:
        conversations: ``(page_id, page_access_token, conversation_id,
            customer_id, since)`` tuples; each is passed to ``get_messages``
            together with ``new_check``.
        new_check: Upper bound (Unix seconds) passed as ``until``.

    Returns:
        A flat list of message dicts in request-completion order. Each dict is
        tagged with ``'page_id'`` and ``'conversation_id'`` (unless it already
        has those keys) so messages can be regrouped by conversation afterwards.
        Conversations whose fetch raises are logged and skipped.
    """
    messages = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        future_to_conversation = {
            executor.submit(get_messages, *conversation[:5], new_check): (conversation[0], conversation[2])
            for conversation in conversations
        }
        for future in tqdm(concurrent.futures.as_completed(future_to_conversation), total=len(future_to_conversation), desc='Request messages'):
            page_id, con_id = future_to_conversation[future]
            try:
                conversation_messages = future.result()
            except Exception as exc:
                print(f'Error occurred while fetching messages for conversation {con_id}: {exc}')
                continue

            for message in conversation_messages or []:
                message.setdefault('page_id', page_id)
                message.setdefault('conversation_id', con_id)
                messages.append(message)

    return messages

In [20]:
def pancake_etl(schema_path: str, default_last_check: int = 30, filter_patterns: Optional[List] = None):
    """Run one incremental crawl and return the newly fetched messages.

    1. Sync the page schema stored at ``schema_path`` (``update_page``).
    2. Detect conversations with new activity up to now (``update_conversation``),
       which records them in the in-memory schema.
    3. Fetch their messages (``update_messages``).
    4. Save the schema back to ``schema_path``.

    The schema is written only after step 3 returns. If the message fetch is
    interrupted (exception or KeyboardInterrupt), the previous schema stays on
    disk and the same conversations are picked up again on the next run.
    NOTE(review): conversations whose individual fetch failed inside
    ``update_messages`` are still marked as seen in the saved schema.
    """
    # 1. Update pages
    page_schema = update_page(schema_path, default_last_check, filter_patterns)

    # 2. Update conversations (mutates page_schema in memory)
    new_check = int(time.time())
    conversations = update_conversation(page_schema, new_check)

    # 3. Get new messages
    messages = update_messages(conversations, new_check)

    # 4. Persist the schema only once the messages have been fetched
    save_json(schema_path, page_schema)

    return messages

In [21]:
# Load the run configuration and execute one incremental crawl.
config = load_yaml('../config.yml')
messages = pancake_etl(
    schema_path=config['schema'],
    default_last_check=config['default-last-check'],
    filter_patterns=config['filter-page-keywords'],
)

Generate page access tokens:   0%|          | 0/1 [00:00<?, ?it/s]



Request conversations:   0%|          | 0/114 [00:00<?, ?it/s]

380451151813143_8215561791884413 not in 380451151813143
440355119151770_8104229263033365 not in 440355119151770
108275989023513_8849549828408081 not in 108275989023513
108275989023513_7824477541014404 not in 108275989023513
108275989023513_7831112700327554 not in 108275989023513
108275989023513_8962750997072448 not in 108275989023513
273379939201019_7806126152826249 not in 273379939201019
273379939201019_8250864194990802 not in 273379939201019
273379939201019_8361853610597060 not in 273379939201019
273379939201019_27670185525913897 not in 273379939201019
273379939201019_8740339945996499 not in 273379939201019
273379939201019_8442320622471745 not in 273379939201019
273379939201019_9471784332838136 not in 273379939201019
273379939201019_8296898117067300 not in 273379939201019
273379939201019_8791623030888678 not in 273379939201019
273379939201019_6590966827693916 not in 273379939201019
273379939201019_26848894548090465 not in 273379939201019
273379939201019_9264524930231102 not in 273379

Request messages:   0%|          | 0/3035 [00:00<?, ?it/s]