In [None]:
# ## for jupyter debugging
# %cd ../modules
# import sys
# sys.path.append('..')

# import asyncio
# if asyncio.get_event_loop().is_running(): # Only patch if needed (i.e. running in Notebook, Spyder, etc)
#     import nest_asyncio
#     nest_asyncio.apply()

# Path + Credentials loading

In [None]:
from dotenv import load_dotenv
import os
from pathlib import Path

### loading credentials
# When run as a script, switch into this file's directory so the relative
# paths below resolve against it. Inside a notebook `__file__` is undefined
# (NameError) and the current working directory is kept instead.
# Catch only the expected failures so KeyboardInterrupt/SystemExit still propagate.
try:
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
except (NameError, OSError):
    pass

CRED_PATH = "../.credentials"  ## .credentials folder is in service directory
ENV_PATH = ".."  ## .env file is in service directory
LOG_PATH = "../.logs"
env_path = Path(ENV_PATH) / ".env"
load_dotenv(dotenv_path=env_path)

# Main packages

In [None]:
from typing import Any, Callable, List, Optional, Tuple, Union

import json
import logging
import numpy as np
import pandas as pd
from pydantic import AnyUrl
import pymongo
import re
import time

# project modules
from modules.utils import create_logger

In [None]:
# Let wide DataFrames display up to 200 columns before truncating.
pd.options.display.max_columns = 200

# Logging toolbox 🔊

In [None]:
# Build the module logger with the project helper, passing LOG_PATH, this file's
# path and a logger named after the module. In a notebook `__file__` is undefined
# (NameError). On any ordinary failure, fall back to create_logger()'s defaults.
# `except Exception` (rather than a bare except) lets KeyboardInterrupt/SystemExit through.
try:
    logger = create_logger(Path(LOG_PATH), __file__, logging.getLogger(__name__))
except Exception:
    logger = create_logger()

# Helper functions

In [None]:
def load_credentials(
    cred_file: str = os.getenv("PG_CRED_FILE", "pg_credentials.json"),
    cred_path: str = CRED_PATH,
) -> dict:
    """
    Return: dct_credentials; Python dictionary parsed from the credential JSON file
    (e.g. credentials to access a Postgres DB)

    Parameters
    --------
    cred_file: credential JSON file name. Defaults to the `PG_CRED_FILE` environment
        variable, or "pg_credentials.json". The env var is read once, when this
        function is defined.
    cred_path: folder containing `cred_file`; relative paths resolve against the
        current working directory

    Raises
    --------
    FileNotFoundError: if `cred_path / cred_file` does not exist
    json.JSONDecodeError: if the file content is not valid JSON

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> dct_credentials = database_utils.load_credentials('pg_credentials.json.sample', '../../.credentials')
    >>> print(dct_credentials)
    {
        "< a name you understand >: str": {
            "host": "< 192.168.xx.xx >: url",
            "user": "< a username >: str",
            "password": "< password >: str",
            "database": "< database name >: str"
        },
        "< anothername >: str": {
            "host": "< 192.168.xx.xx >: url",
            "database": "< database name >: str"
        }
    }
    ```
    """
    ## inhouse DB
    # JSON text is UTF-8 (RFC 8259). Decode explicitly so the platform's locale
    # encoding (e.g. cp1252 on Windows) never mangles non-ASCII passwords.
    with open(Path(cred_path) / cred_file, encoding="utf-8") as cred_json:
        dct_credentials = json.load(cred_json)
    return dct_credentials

In [None]:
def sql_query_str_formatter(base_q_str, *args):
    """
    Return formatted query string

    Fills each `{}` placeholder in `base_q_str`, in order, with `args`
    (via `str.format`). When no `args` are given, every `{}` is removed and runs
    of whitespace are collapsed to a single space.

    WARNING: values are interpolated verbatim, with no quoting or escaping. Only pass
    trusted identifiers such as table or column names, never user input. Use the DB
    driver's parameter binding for values.

    Parameters
    --------
    base_q_str: Base query string containing `{}` placeholders
    *args: values substituted, in order, into the `{}` placeholders of `base_q_str`

    Raises
    --------
    ValueError: if the number of `args` differs from the number of `{}` placeholders

    Example
    --------
    >>> from modules import database_utils
    >>> base_q_str = 'SELECT products.sku as {} FROM {}'
    >>> formatted_SQL_q_str = database_utils.sql_query_str_formatter(base_q_str, 'sku', 'products')
    >>> print(formatted_SQL_q_str)
    SELECT products.sku as sku FROM products
    """
    if not args:
        # No arguments: drop the empty placeholders and tidy the leftover whitespace.
        base_q_str = re.sub(r'\s+', ' ', base_q_str.replace('{}', ''))

    n_placeholders = base_q_str.count('{}')
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if len(args) != n_placeholders:
        raise ValueError(
            f'Base query string needs {n_placeholders} arguments, '
            f'but {len(args)} were given'
        )

    formatted_SQL_q_str = base_q_str.format(*args)
    return formatted_SQL_q_str

# Mongo utils

In [None]:
def mongo_connect(
    host: str = os.getenv("MONGO_HOST", "social_chat_mongo"),
    port: int = int(os.getenv("MONGO_PORT", 27017)),
    username: Optional[str] = os.getenv("MONGO_USERNAME"),
    password: Optional[str] = os.getenv("MONGO_PASSWORD"),
    logger: Optional[logging.Logger] = logger,
    ) -> pymongo.mongo_client.MongoClient:
    """
    Return: MongoClient object for further operations

    *** Expected to be a helper function for other MongoDB related functions

    Defaults for host/port/username/password come from the MONGO_HOST, MONGO_PORT,
    MONGO_USERNAME and MONGO_PASSWORD environment variables. They are read once,
    when this function is defined. Credentials are put into the URI only when BOTH
    username and password are non-empty.

    Parameters
    --------
    host: MongoDB IP address **Please strip out 'https:// or mongodb://'**
    port: MongoDB port
    username: MongoDB Username (raw value; it is percent-escaped here)
    password: MongoDB Password (raw value; it is percent-escaped here)
    logger: logging handler; if given, a construction failure is logged with traceback

    Raises
    --------
    pymongo.errors.ConnectionFailure: if constructing the MongoClient raises.
        Per the pymongo docs, the constructor does not block on connecting, so an
        unreachable server usually surfaces on the first operation instead.

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> conn = database_utils.mongo_connect('<some IP>', 27017, '<some username>', '<some password>')
    ```
    """
    # pymongo requires user/password in a URI to be escaped per RFC 3986.
    # Otherwise characters like '@', ':', '/' or '+' in a password break or alter the URI.
    from urllib.parse import quote_plus

    if username and password:
        mongo_uri = f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}"
    else:
        mongo_uri = f"mongodb://{host}:{port}"

    try:
        conn = pymongo.MongoClient(mongo_uri)
        return conn
    except Exception as e:
        if logger:
            logger.exception(f"{e}! Unable to connect")
        # Chain the original error so its traceback is preserved.
        raise pymongo.errors.ConnectionFailure(f"{e}! Unable to connect") from e

In [None]:
def mongo_getList_databasenames(
    conn: pymongo.mongo_client.MongoClient,
    logger: Optional[logging.Logger] = logger,
) -> list:
    """
    List the database names visible through `conn`.

    Parameters
    --------
    conn: MongoClient object for further operations
    logger: logging handler (accepted for a uniform signature; not used here)

    Returns
    --------
    list of database names, as given by `conn.list_database_names()`

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> lst_db_names = database_utils.mongo_getList_databasenames(database_utils.mongo_connect())
    ```
    """
    return conn.list_database_names()

In [None]:
def mongo_getList_collectionnames(
    conn: pymongo.mongo_client.MongoClient,
    db_name: str,
    logger: Optional[logging.Logger] = logger,
) -> list:
    """
    List the collection names inside database `db_name`.

    Parameters
    --------
    conn: MongoClient object for further operations
    db_name: database whose collections are listed
    logger: logging handler (accepted for a uniform signature; not used here)

    Returns
    --------
    list of collection names, as given by `list_collection_names()`

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> lst_col_names = database_utils.mongo_getList_collectionnames(database_utils.mongo_connect(), '<some db name>')
    ```
    """
    database = conn[db_name]
    return database.list_collection_names()

In [None]:
def mongo_read(
    conn: pymongo.mongo_client.MongoClient,
    db_name: str,
    collection_name: str,
    query: Tuple[dict, dict] = ({}, {}),
    no_id: bool = True,
    sort: Optional[dict] = None,
    limit: Optional[int] = None,
    logger: Optional[logging.Logger] = logger,
) -> pd.core.frame.DataFrame:
    """
    Read from Mongo and Store into DataFrame
    :return: df_read; query result, one row per document (empty DataFrame if nothing matches)

    ref:
    1. https://docs.mongodb.com/manual/reference/method/db.collection.find/

    Parameters
    --------
    conn: MongoClient object for further operations
    db_name: selected database
    collection_name: selected collection in that database
    query: ({query}, {projection}); see ref.1. Only the first two items are used.
    no_id: if True, ignore `_id` columns. **can be overwritten if {'_id':1} is in projection; see `query`
    sort: {field: direction}. Only entries whose direction is 1 (ascending) or
        -1 (descending) are applied; other entries are silently ignored.
        Ignored entirely when not a dict.
    limit: maximum number of documents. Applied only when it is a positive integer
        (Python int or numpy integer, but not bool); otherwise no limit.
    logger: logging handler (not used here)

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> df_read = database_utils.mongo_read(database_utils.mongo_connect(),
    ...                                     '<some db name>',
    ...                                     '<some collection name>',
    ...                                     ({<some query>}, {<some projection>}),
    ...                                     sort={'<some field>': -1},
    ...                                     limit=10,
    ...                                    )
    ```
    """
    # Column selecting: keys from the caller's projection win over the `_id` default
    projection = {"_id": 0, **query[1]} if no_id else query[1]

    # Make a query to the specific DB and Collection
    cursor = conn[db_name][collection_name].find(query[0], projection)

    if isinstance(sort, dict):
        sort_spec = [(key, direction) for key, direction in sort.items() if direction in (1, -1)]
        if sort_spec:
            cursor = cursor.sort(sort_spec)

    # bool is an int subclass; exclude it so `limit=True` is not read as 1.
    # pymongo accepts only Python ints, so numpy integers are converted.
    if isinstance(limit, (int, np.integer)) and not isinstance(limit, bool) and limit > 0:
        cursor = cursor.limit(int(limit))

    # Expand the cursor and construct the DataFrame
    df_read = pd.DataFrame(list(cursor))

    return df_read

In [None]:
def mongo_drop(
    conn: pymongo.mongo_client.MongoClient,
    db_name: str,
    collection_name: str,
    logger: Optional[logging.Logger] = None,
) -> None:
    """
    Drop Mongo collection `collection_name` from database `db_name`.

    Parameters
    --------
    conn: MongoClient object for further operations
    db_name: database holding the collection
    collection_name: collection to drop
    logger: logging handler (not used here)

    Returns
    --------
    None

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> database_utils.mongo_drop(database_utils.mongo_connect(), '<some db name>', '<some collection name>')
    ```
    """
    conn[db_name][collection_name].drop()

In [None]:
def mongo_insertmany(
    conn: pymongo.mongo_client.MongoClient,
    db_name: str,
    collection_name: str,
    df: pd.core.frame.DataFrame,
    logger: Optional[logging.Logger] = logger,
):
    """
    Insert data to MongoDB from DataFrame

    Each row of `df` becomes one document (`df.to_dict("records")`).

    Parameters
    --------
    conn: MongoClient object for further operations
    db_name: target database
    collection_name: target collection
    df: rows to insert
    logger: logging handler; receives the "no operation" notice when given,
        otherwise the notice is printed

    Returns
    --------
    the pymongo InsertManyResult, or None when `df` has no rows (nothing is sent)

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> import pandas as pd
    >>> df = pd.read_csv('<some .csv file>')
    >>> database_utils.mongo_insertmany(database_utils.mongo_connect(),
    ...                                 '<some db name>',
    ...                                 '<some collection name>',
    ...                                 df,
    ...                                )
    ```
    """
    # pymongo rejects an empty document list, so skip the call for an empty frame.
    if df.shape[0] == 0:
        msg = "No Operation: df.shape[0]==0"
        if logger:
            logger.info(msg)
        else:
            print(msg)
        return None
    return conn[db_name][collection_name].insert_many(df.to_dict("records"))

In [None]:
def mongo_recreate_collection(
    conn: pymongo.mongo_client.MongoClient,
    df: pd.core.frame.DataFrame,
    db_name: str,
    collection_name: str,
    logger: Optional[logging.Logger] = logger,
):
    """
    Recreate a MongoDB collection from a DataFrame

    ** mongo_drop() then mongo_insertmany()

    NOTE: the drop and the insert are two separate operations, not one atomic step.
    If the insert fails, the collection has already been dropped.

    Parameters
    --------
    conn: MongoClient object for further operations
    df: rows that become the new collection content; must not be empty
    db_name: target database
    collection_name: collection to drop and refill
    logger: logging handler, forwarded to mongo_drop() and mongo_insertmany()

    Returns
    --------
    the return value of mongo_insertmany()

    Raises
    --------
    ValueError: if `df` has no rows; the collection is left untouched

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> import pandas as pd
    >>> df = pd.read_csv('<some .csv file>')
    >>> database_utils.mongo_recreate_collection(database_utils.mongo_connect(),
    ...                                          df,
    ...                                          '<some db name>',
    ...                                          '<some collection name>'
    ...                                         )
    ```
    """
    # Refuse empty input before dropping, so an empty frame never wipes the collection.
    if df.shape[0] == 0:
        raise ValueError("Empty DataFrame is not allowed in this operation.")
    mongo_drop(conn, db_name, collection_name, logger)
    return mongo_insertmany(conn, db_name, collection_name, df, logger)

In [None]:
def mongo_update_one(
    conn: pymongo.mongo_client.MongoClient,
    db_name: str,
    collection_name: str,
    filter_: dict,
    update: dict,
    upsert: bool = True,
    logger: Optional[logging.Logger] = logger,
) -> pymongo.results.UpdateResult:
    """
    Update the first document matching `filter_` in `db_name.collection_name`.

    Parameters
    --------
    conn: MongoClient object for further operations
    db_name: selected database
    collection_name: selected collection in that database
    filter_: query selecting the document to update
    update: update document (e.g. {'$set': {...}})
    upsert: if True (the default here), insert a new document when nothing matches
    logger: logging handler (not used here)

    Returns
    --------
    pymongo UpdateResult of the operation

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> import pandas as pd
    >>> df = pd.read_csv('<some .csv file>')
    >>> col_name = '<some column name>'
    >>> conn = database_utils.mongo_connect()
    >>> for dct in df.to_dict('records'):
    ...     database_utils.mongo_update_one(conn,
    ...                                     '<some db name>',
    ...                                     '<some collection name>',
    ...                                     {col_name: dct[col_name]},  ## filter
    ...                                     {'$set': dct},              ## update
    ...                                     upsert=True,                ## either update or insert
    ...                                    )
    ```
    """
    col = conn[db_name][collection_name]
    # Pass `upsert` by keyword so it cannot be bound to a different positional parameter.
    result = col.update_one(filter_, update, upsert=upsert)
    return result
    

In [None]:
def mongo_deletemany(
    conn: pymongo.mongo_client.MongoClient,
    db_name: str,
    collection_name: str,
    query: dict,
    logger: Optional[logging.Logger] = logger,
) -> pymongo.results.DeleteResult:
    """
    Delete every document matching `query` and return pymongo's DeleteResult.

    WARNING: an empty filter `{}` matches, and therefore deletes, every document
    in the collection.

    ref:
    1. https://www.w3schools.com/python/python_mongodb_delete.asp

    Parameters
    --------
    conn: MongoClient object for further operations
    db_name: selected database
    collection_name: selected collection in that database
    query: filter document selecting what to delete; see ref.1
    logger: logging handler (not used here)

    Example
    --------
    ```python
    >>> from modules import database_utils
    >>> delete_result = database_utils.mongo_deletemany(database_utils.mongo_connect(),
    ...                                                 '<some db name>',
    ...                                                 '<some collection name>',
    ...                                                 {<some keys>: <some values>}
    ...                                                )
    ```
    """
    collection = conn[db_name][collection_name]
    return collection.delete_many(query)