In [1]:
import os
import random
import yaml

import re
import pandas as pd
import dropbox
from twilio.rest import Client
from twilio.base.version import TwilioRestException

In [2]:
from typing import (
    Dict,
    Literal,
    List,
    Optional,
    TypeVar,
    Union
)

In [3]:
PhoneNumber = TypeVar('PhoneNumber', bound=str)
file = TypeVar('file', bound=str)
exit_status = TypeVar('exit_status', bound=int)

In [4]:
class InvalidPhoneNumber(Exception):
    """Exception for invalid phone numbers."""
    pass

In [5]:
def read_file_to_str(infile: file) -> str:
    """Reads the contents of a file and returns a string.
    """
    infile: file = os.path.abspath(infile)
    with open(infile,'r') as f:
        t = f.readlines()
        return ' '.join(t).strip('\n')

In [6]:
def _format_phone_number(num: str) -> PhoneNumber:
    """Helper function that formats a string to resemble a phone number 
    for use with Twilio's ``python`` API.
    """
    num: PhoneNumber = re.sub(r'[^\w]','',num) # Remove special characters and white space
    
    if not num.startswith("1"):
        num: PhoneNumber = f"1{num}" # Check if phone number starts with US country code
    
    if not num.startswith("+"):
        num: PhoneNumber = f"+{num}" # Check if '+' is prepended to phone number
    
    if len(num) != 12:
        raise RuntimeError(f"Preprocessed phone number is not 12 characters long: {num}")
    
    return num

In [7]:
def send_sms(to: PhoneNumber,
             account_sid: str,
             service_sid: str,
             auth_token: str,
             body: Optional[str] = None,
             media: Optional[str] = None,
             outgoing: Optional[PhoneNumber] = None
            ) -> exit_status:
    """Utilizes the ``Twilio`` API to send SMS and/or MMS text messages to a known cell phone number.
    Messages sent successfully will have an exit status of ``0`` and ``1`` otherwise.

    NOTE: Account and service SIDs can be obtained from Twilio, alongside the authorization token.

    Usage example:
        >>> send_sms(to="+12346789012",
        ...          body="Hello world!")

    Arguments:
        to: The intended recipient of the text message. **NOTE**: The input phone number **MUST** be 11 digits and be prefixed with '+'.
        account_sid: ``Twilio`` API account service ID.
        service_sid: ``Twilio`` API messaging service ID.
        auth_token: ``Twilio`` API authentication token.
        body: The body of the text message (i.e. the message).
        media: HTTP link to multimedia (image, audio, video, ect).
        outgoing: ``Twilio`` account ``PhoneNumber`` to be used in place of ``service_sid``. **NOTE**: The input phone number **MUST** be 11 digits and be prefixed with '+'.

    Returns:
        exit_status: 0 if successful, 1 otherwise.

    Raises:
        RuntimeError: Arises **IF** the: ``auth_token`` is not specified, OR ``media`` and ``body`` are not specified 
            OR ``service_sid`` or ``outgoing`` are not specified.
    """
    # Check input arguments
    if (service_sid is None) and (outgoing is None):
        raise RuntimeError(f"Neither the 'service_sid' nor the 'outgoing' phone number were provided.")

    if auth_token is None:
        raise RuntimeError("No auth token provided.")

    if (media is None) and (body is None):
        raise RuntimeError("No text body OR media was specified as an argument.")

    # Init client
    client: Client = Client(account_sid, auth_token)

    # Construct SMS/MMS
    try:
        message: Client.messages = client.messages \
                    .create(
                         body=body,
                         media_url=media,
                         messaging_service_sid=service_sid,
                         from_=outgoing,
                         to=to
                     )
        exit_status: exit_status = 0
    except TwilioRestException:
        exit_status: exit_status = 1

    return exit_status

In [8]:
def get_dropbox_img_tmp_links(access_token: str,
                              media_files: Optional[List[file]] = None
                             ) -> List:
    """Creates/retrieves temporary HTTP links for files in a dropbox app folder. 
    If no media files are specified, then all of the files in the dropbox folder will have
    temporary links returned.
    
    NOTE: 
        Dropbox ``access_token`` can be found here: https://www.dropbox.com/developers/apps/info/uxjgz81z83cd6pa#settings
    """
    dbx: dropbox.Dropbox = dropbox.Dropbox(access_token)
    
    if (media_files is None):
        media_files: List[str] = []
    
    if len(media_files) == 0:
        for entry in dbx.files_list_folder('').entries:
            media_files.append(entry.path_display)
    
    media_files.sort()
    
    tmp_lnks: List[str] = []
        
    for media_file in media_files:
        tmp_lnk: str = dbx.files_get_temporary_link(media_file)
        tmp_lnks.append(tmp_lnk.link)
    
    return tmp_lnks

In [9]:
def send_mass_msg(csv_file: file,
                  account_sid: str,
                  service_sid: str,
                  auth_token: str,
                  message: str,
                  dry_run: Optional[bool] = False,
                  outgoing: Optional[PhoneNumber] = None,
                  dbx_access_token: Optional[str] = None,
                  media_files: Optional[List[file]] = None,
                  verbose: Optional[bool] = None
                 ) -> Dict[str,List[Literal[exit_status]]]:
    """Description
    NOTE: 
        Dropbox ``access_token`` can be found here: https://www.dropbox.com/developers/apps/info/uxjgz81z83cd6pa#settings
    """
    csv_file: file = os.path.abspath(csv_file)
    df: pd.DataFrame = pd.read_csv(csv_file)
    msg_status: Dict[str,Dict[str,Union[List[exit_status],exit_status]]] = {}
    
    if os.path.isfile(account_sid):
        account_sid: str = read_file_to_str(infile='account.sid')
    
    if os.path.isfile(service_sid):
        service_sid: str = read_file_to_str(infile='service.sid')
    
    if os.path.isfile(auth_token):
        auth_token: str = read_file_to_str(infile='auth.token')
    
    if list(df.columns)[0] != 'contact':
        raise RuntimeError(f"First column should be 'contact', and not {list(df.columns)[0]}.")
    elif list(df.columns)[1] != 'name':
        raise RuntimeError(f"Second column should be 'name', and not {list(df.columns)[1]}.")
    elif list(df.columns)[2] != 'number':
        raise RuntimeError(f"Third column should be 'number', and not {list(df.columns)[2]}.")
    
    # Check for multimedia
    if (dbx_access_token is not None) and (not dry_run):
        img_lnks: List[str] = get_dropbox_img_tmp_links(access_token=dbx_access_token, media_files=media_files)
    else:
        img_lnks: List[str] = []
    
    for _,r in df.iterrows():
        contact: str = r['contact']
        name: str = r['name']
        phone_number: PhoneNumber = r['number']
        
        try:
            phone_number: PhoneNumber = _format_phone_number(num=phone_number)
        except InvalidPhoneNumber:
            print(f"{contact} has an invalid phone number: {phone_number}")
            continue
        
        if dry_run:
            txt_status: exit_status = 1
            img_statuses: List[exit_status] = [1]
        
        if verbose:
            print(f"\nName: {name}")
            print(f"Phone number: {phone_number}")
            print(f"message: {message}")
        
        if not dry_run:
            # Send MMS (if required)
            img_statuses: List[exit_status] = []
            for img_lnk in img_lnks:
                img_status: exit_status = send_sms(to=phone_number,
                                                   account_sid=account_sid,
                                                   service_sid=service_sid,
                                                   auth_token=auth_token,
                                                   media=img_lnk,
                                                   outgoing=outgoing)
                img_statuses.append(img_status)

            # Send SMS
            txt_status: exit_status = send_sms(to=phone_number,
                                               account_sid=account_sid,
                                               service_sid=service_sid,
                                               auth_token=auth_token,
                                               body=message,
                                               outgoing=outgoing)
        
        # Update status dictionary
        tmp_dict: Dict[str,Dict[Union[List[exit_status],exit_status]]] = {
            contact: {
                "SMS status": txt_status,
                "MMS statues": img_statuses
            }
        }
        
        msg_status.update(tmp_dict)
    return msg_status

In [10]:
def read_config(config: file) -> Dict[str,str]:
    """Reads the contents of a YAML (configuration) file to
    a dictionary.

    Arguemnts:
        config: Filename (and path) to configuration file.
    
    Returns:
        Dictionary of strings matched valued pairs.
    """
    config: str = os.path.abspath(
        os.path.realpath(config)
        )
    
    with open(config,"r") as file:
        # cfg: Dict[str,str] = yaml.safe_load(file) 
        return yaml.safe_load(file)

In [11]:
# cfg = 'config.people.yml'
cfg = 'config.test.yml'

In [12]:
read_config(config=cfg)

{'Adebayo': {'phone': 6203916062}, 'Billie': {'phone': 6203916062}}

In [13]:
def peer_match(config: file) -> Dict[str,str]:
    """Performs random peer-to-peer (person-to-person) matching.
    Peer exclusion is allowed if required with the ``exclude`` key
    if required.

    Arguments:
        config: Filename (and path) to configuration file.

    Returns:
        Dictionary with string matched pairs of peers.
    """
    # Read people config file
    ppl_dict: Dict[str,str] = read_config(config=cfg)

    # Set run condition
    run_condition: bool = True
    while run_condition:
        new_dict: Dict = {}
        assigned: List = []
        for p in ppl_dict.keys():
            recips = []
            if 'exclude' in ppl_dict.get(p):
                exclude: str = ppl_dict.get(p).get('exclude',None)

                # NOTE: Split comma separated names, flatten into one list
                exclude: List[str] = exclude.split(" , ")
                exclude: List[List[str]] = [name.split(", ") for name in exclude]; exclude: List[str] = [item for sublist in exclude for item in sublist]
                exclude: List[List[str]] = [name.split(" ,") for name in exclude]; exclude: List[str] = [item for sublist in exclude for item in sublist]
                exclude: List[List[str]] = [name.split(",") for name in exclude]; exclude: List[str] = [item for sublist in exclude for item in sublist]
                # exclude = list(exclude.split(","))

                recips: List[str] = list(set(ppl_dict.keys()) - set(exclude) - set(assigned))
            else:
                recips: List[str] = list(set(ppl_dict.keys()) - set(assigned))

            try:
                recips.remove(p)
            except ValueError:
                pass

            try:
                rand: int = random.randint(0, (len(recips))-1)
                random_recip: str = recips[rand]
                assigned.append(random_recip)
                tmp_dict: Dict[str,str] = {p: random_recip}
                new_dict.update(tmp_dict)
                del tmp_dict
                run_condition: bool = False
            except ValueError:
                run_condition: bool = True
                pass
    return new_dict

In [14]:
peer_match(cfg)

{'Adebayo': 'Billie', 'Billie': 'Adebayo'}

In [39]:
s = "Hanna(h) V, Hannah L, Billie, Adebayo"

In [41]:
s.split(", ")

['Hannah V', 'Hannah L', 'Billie', 'Adebayo']

In [78]:
s1 = "Hannah V,Hannah L,Billie,Adebayo, Betsy, Wayne, Big Man"

In [79]:
s1 = s1.split(" , ")
s1 = [ name.split(" ,") for name in s1 ]; s1 = [item for sublist in s1 for item in sublist]
s1 = [ name.split(", ") for name in s1 ]; s1 = [item for sublist in s1 for item in sublist]
s1 = [ name.split(",") for name in s1 ]; s1 = [item for sublist in s1 for item in sublist]
# s1 = s1.split(",")
s1

['Hannah V', 'Hannah L', 'Billie', 'Adebayo', 'Betsy', 'Wayne', 'Big Man']

In [None]:
s1 = s1.split(" , ")
s1 = [ name.split(" ,") for name in s1 ]; s1 = [item for sublist in s1 for item in sublist]
s1 = [ name.split(", ") for name in s1 ]; s1 = [item for sublist in s1 for item in sublist]
s1 = [ name.split(",") for name in s1 ]; s1 = [item for sublist in s1 for item in sublist]

In [17]:
def gift_exchange(config: str,
                  account_sid: str,
                  service_sid: str,
                  auth_token: str,
                  dry_run: bool = False,
                  budget: float = 15.00,
                  year: int = 2021,
                 ) -> Dict[str,exit_status]:
    """Randomized gift exchange function. Matches a random person
    to another person at random. The matched person is then texted,
    with information about the gift to be given and who the recipient is.
    
    Arguments:
        config: Configuration file that contains names, and phone numbers (and exclusion list(s)).
        account_sid: ``Twilio`` API account service ID.
        service_sid: ``Twilio`` API messaging service ID.
        auth_token: ``Twilio`` API authentication token.
        dry_run: If true, names and recipients are printed. If false, then text messages are sent.
        budget: Dollar amount for the budget.
        year: Year of the gift exchange.
    
    Returns:
        None
    """
    # Read in credentials
    if os.path.isfile(account_sid):
        account_sid: str = read_file_to_str(infile='account.sid')
    
    if os.path.isfile(service_sid):
        service_sid: str = read_file_to_str(infile='service.sid')
    
    if os.path.isfile(auth_token):
        auth_token: str = read_file_to_str(infile='auth.token')
    
    # Format budget
    budget: str = "${:,.2f}".format(float(budget))
    
    # Read information into dictionary
    people: Dict[str,Dict[str,str]] = read_config(config=config)
    person_match: Dict[str,str] = peer_match(config=config)

    runtime_status: Dict = {}

    # Iterate through peer-matched dictionaries and send
    #   text messages
    for person in people.keys():
        phone_num: PhoneNumber = _format_phone_number(num=str(people.get(person).get('phone')))
        peer: str = person_match.get(person)

        message: str = f"""Hi {person},
Welcome to the {int(year)} family gift exchange.
This year, you will be gifting to {peer}.
The recommended spending limit this year is {budget}.

Gifts will be exchanged at family dinner Thursday, Dec. 2, 2021.
        """

        if dry_run:
            status: exit_status = 1
            tmp_dict: Dict[str,str] = {person: status}
            runtime_status.update(tmp_dict)
            print(f"{person} gifts to {peer}.")
        else:
            status: exit_status = send_sms(to=phone_num,
                                           account_sid=account_sid,
                                           service_sid=service_sid,
                                           auth_token=auth_token,
                                           body=message)
            
            tmp_dict: Dict[str,str] = {person: status}
            runtime_status.update(tmp_dict)
    return runtime_status

In [20]:
gift_exchange(config=cfg,
            account_sid='account.sid',
            service_sid='service.sid',
            auth_token='auth.token',
            budget=10.00,
            year=2021,
            dry_run=False)

{'Adebayo': 0, 'Billie': 0}

In [38]:
people: Dict[str,Dict[str,str]] = read_config(config=cfg)
people

{'Adebayo': {'phone': 6203916062, 'exclude': 'Adebayo'},
 'Billie': {'phone': 6203916062}}

In [39]:
person_match: Dict[str,str] = peer_match(config=cfg)
person_match

{'Adebayo': 'Billie', 'Billie': 'Adebayo'}

In [40]:
for person in people.keys():
    print(person)

Adebayo
Billie


In [36]:
person_match.get('Adebayo')

'Billie'

In [48]:
for key,val in people.items():
    print(people.get(key).get('phone'))

6203916062
6203916062


In [57]:
f = "15.00304"
print(f"{round(float(f),2)}")

15.0


In [61]:
"${:,.2f}".format(float(89))

'$89.00'