# Prueba Práctica: Generación de Alertas basadas en Comportamiento de Usuario

The objective of this project is to develop a Python program that monitors a customer orders database and generates an automatic alert when a specific customer records three or more orders in a "returned" or "canceled" state in the same month. The program should be robust, easily integrable with existing systems, and demonstrate proper exception handling.

To achieve this I have used a combination of tools: 

1. **FastAPI** web framework to build a local server and its API endpoints.
2. **MongoDB Atlas** as a cloud database service, in combination with the **PyMongo** driver to interact with the database.

In [1]:
from dotenv import dotenv_values
# Read the key/value settings from the local ".env" file; later cells look up "ATLAS_URI" and "DB_NAME" in this mapping
config = dotenv_values(".env")

To connect the client application to the MongoDB Atlas database, I use the Uniform Resource Identifier (URI), which includes the hostname of the MongoDB Atlas cluster, the authentication credentials, and the database name.

The "Routes.py" file contains the API endpoints and the functions that are called when each endpoint is accessed.

In [2]:
import random
from bson import ObjectId
import pandas as pd
from pymongo.database import Database


# Load the simulated orders dataset (provided by Rocketfy) from a CSV file located in the working directory
data = pd.read_csv('db_envios_challenge.csv')

# Name pools used to generate fake names for clients of the simulated database that do not already have a name
maleFirstNames = ["Henry","Simeon","Payton","Cortez","Dwayne","Messiah","Austin","Raiden","Marvin","Emerson","Michael","Kamron","Frank","Ivan","Camden","Corbin","Roman","Skylar","Jase","Aron"]
femaleFirstNames = ["Livia","Brooklynn","Baylee","Khloe","Autumn","Thalia","Azul","Mylee","Nia","Emely","Justine","Itzel","Kyla","Aliza","Jaylyn","Laylah","Marisa","Donna","Sandra","Michaela"]
lastNames = ["Terrell","Melendez","Petersen","Ibarra","Silva","Reeves","Robinson","Choi","Larson","Kim","Hines","Shelton","Kennedy","Nguyen","Walker","Hampton","Lynch","Goodwin","Cole","Stevenson","Castro","Osborne","Underwood","Leach","Flynn","Sloan","Burch","Tran","Bowers","Chan"]

# Regular expressions used to build the MongoDB aggregation pipelines and their filters

# Company clients: "text_number" values in the "order_vendor_dbname" column, e.g. "bigcolors_1381618804"
# (capture 1 = text part, capture 2 = number part). The pattern is not anchored, so it matches anywhere inside the value.
patternCompanyClients = r"(\w+)_(\d+)"
# Individual clients: values made of exactly 24 alphanumeric characters in "order_vendor_dbname", e.g. "6465208fbc1391265257ed5d".
# NOTE(review): this accepts any letter, not only hex digits, while generateClientInfo passes the value to ObjectId() - confirm the data is always hex.
patternClients = r"^[a-zA-Z0-9]{24}$"
# Valid dates: 'yyyy-mm-dd' values in the "shipping_date" column, e.g. "2023-06-01" (capture 1 = year, capture 2 = month; the day is not captured)
patternValidDates = r"^((?:19|20)\d\d)-((?:0[1-9]|1[0-2]))-(?:0[1-9]|[12][0-9]|3[01])$"

def generateClientInfo(isMale : bool, identificationNumber : str):
    """Build a simulated "Client" document for an ObjectId-like vendor value.

    Used for the records whose "order_vendor_dbname" column holds a
    24-character alphanumeric value.

    Args:
        isMale: Pick the first name from maleFirstNames when truthy,
            otherwise from femaleFirstNames.
        identificationNumber: The vendor value; stored as-is under
            "identificationNumber" and converted to an ObjectId for "_id".

    Returns:
        A dict with the keys "name", "email", "_id" and
        "identificationNumber".
    """
    firstNamePool = maleFirstNames if isMale else femaleFirstNames
    firstName = random.choice(firstNamePool)
    surname = random.choice(lastNames)

    # Random 1..100 suffix appended to the generated e-mail address
    emailSuffix = random.randint(1, 100)

    return {
        "name": f"{firstName} {surname}",
        "email": f"{firstName}{surname}_{emailSuffix}@gmail.com",
        "_id": ObjectId(identificationNumber),
        "identificationNumber": identificationNumber,
    }

def generateCompanyClientInfo(companyClientName : str, identificationNumber : str):
    """Build a "Company Client" document for a "text_number" vendor value.

    Used for the records whose "order_vendor_dbname" column holds values
    such as "bigcolors_1381618804".

    Args:
        companyClientName: Text part of the vendor value; used as the name.
        identificationNumber: Numeric part of the vendor value.

    Returns:
        A dict with the keys "name", "email" and "identificationNumber",
        where "identificationNumber" is the two parts joined back with "_".
    """
    # Random 1..100 suffix appended to the generated e-mail address
    emailSuffix = str(random.randint(1, 100))

    return {
        "name": companyClientName,
        "email": "".join([companyClientName, "_", emailSuffix, "@gmail.com"]),
        "identificationNumber": "_".join([companyClientName, identificationNumber]),
    }

def initData(database: Database):
    """Seed the database with the CSV orders and one client per distinct vendor.

    Steps:
      1. Insert every row of the module-level ``data`` frame into "Orders".
      2. Aggregate "Orders" to find the distinct "order_vendor_dbname"
         values that look like company clients ("text_number") or like
         individual clients (24 alphanumeric characters).
      3. Generate one client document per distinct value and insert them
         all into "Clients".

    The function is not idempotent: every call inserts the CSV rows again.
    NOTE(review): a second call presumably also fails on "Clients" because
    the individual clients reuse the same "_id" values - confirm before
    re-running against a populated database.

    Args:
        database: Database handle providing the "Orders" and "Clients"
            collections.

    Returns:
        A message stating how many client documents were generated.
    """
    # insert_many rejects an empty document list, so only insert when the
    # CSV actually produced rows.
    orderRecords = data.to_dict('records')
    if orderRecords:
        database["Orders"].insert_many(orderRecords)

    # Build the aggregation pipelines
    pipelineCompanyClients = [
        {
            # Keep the documents whose vendor value contains "name_number"
            '$match': {
                'order_vendor_dbname': {'$regex': patternCompanyClients}
            }
        },
        {
            # One output document per distinct vendor value
            '$group': {
                '_id': '$order_vendor_dbname',
                'count': {'$sum': 1}
            }
        },
        {
            # Store the full $regexFind result in the field patternLike
            '$project': {
                '_id': 0,
                'patternLike': {
                    '$regexFind': {
                        'input': '$_id',
                        'regex': patternCompanyClients
                    }
                }
            }
        },
        {
            # Split the two captures into their own fields
            '$project': {
                '_id': 0,
                'companyClientName': {'$arrayElemAt': ['$patternLike.captures', 0]},
                'companyIdentificationNumber': {'$arrayElemAt': ['$patternLike.captures', 1]}
            }
        },
    ]

    pipelineClients = [
        {
            # Keep the documents whose vendor value is exactly 24
            # alphanumeric characters, i.e. ObjectId-like strings
            '$match': {
                'order_vendor_dbname': {'$regex': patternClients}
            }
        },
        {
            # One output document per distinct vendor value
            '$group': {
                '_id': '$order_vendor_dbname',
                'count': {'$sum': 1}
            }
        },
        {
            # Expose the vendor value as identificationNumber
            '$project': {
                '_id': 0,
                'identificationNumber': '$_id'
            }
        }
    ]

    print("printing results")

    # Company clients first, then individual clients (gender picked at random)
    clientsList = [
        generateCompanyClientInfo(document["companyClientName"], document["companyIdentificationNumber"])
        for document in database["Orders"].aggregate(pipelineCompanyClients)
    ]
    clientsList.extend(
        generateClientInfo(random.choice([True, False]), document["identificationNumber"])
        for document in database["Orders"].aggregate(pipelineClients)
    )

    clientsCounter = len(clientsList)
    print(clientsCounter)

    # Same guard as above: skip the insert when no vendor value matched
    # either pattern instead of letting insert_many raise.
    if clientsList:
        database['Clients'].insert_many(clientsList)

    return f"Sucessfully generated {clientsCounter} clientes"

In [3]:
from emailservice import sendEmail

def sendAlarm(database: Database):
    """Email an alert for clients with 3+ cancelled/returned orders in a month.

    The aggregation over "Orders":
      1. keeps orders with a valid 'yyyy-mm-dd' "shipping_date", a
         "shipping_status" of exactly "cancelled" or "returned", and a
         vendor value matching one of the two client patterns;
      2. extracts year and month from the date;
      3. groups by (vendor, year, month) and keeps groups with count >= 3;
      4. keeps at most 5 groups (there is no $sort before the $limit, so
         the pipeline does not define which 5);
      5. joins each group with "Clients" on "identificationNumber".

    One e-mail is sent per resulting group through sendEmail. Groups whose
    vendor has no matching "Clients" document are reported and skipped
    rather than aborting the whole run.

    This function keeps no record of alerts already sent, so every call
    sends alerts again for the groups it selects.
    NOTE(review): the status filter uses the spelling "cancelled" - confirm
    it matches the values stored in the dataset.

    Args:
        database: Database handle providing the "Orders" and "Clients"
            collections.

    Returns:
        A message stating how many e-mails were sent.
    """
    pipelineAlarms = [
        {
            '$match': {
                'shipping_date': {'$regex': patternValidDates},
                'shipping_status': {"$in": ["cancelled", "returned"]},
                '$or': [{'order_vendor_dbname': {'$regex': patternCompanyClients}}, {"order_vendor_dbname": {"$regex": patternClients}}]
            }
        },
        {
            # Run the date regex again to obtain its captures (year, month)
            '$addFields': {
                'regexResult': {'$regexFind': {
                        'input': '$shipping_date',
                        'regex': patternValidDates
                    }
                }
            }
        },
        {
            '$project': {
                '_id': 0,
                'shipping_id': '$shipping_id',
                'shipping_year': {'$arrayElemAt': ['$regexResult.captures', 0]},
                'shipping_month': {'$arrayElemAt': ['$regexResult.captures', 1]},
                'order_vendor_dbname': '$order_vendor_dbname'
            }
        },
        {
            # One group per vendor and calendar month
            '$group': {
                '_id': {
                    'order_vendor_dbname': '$order_vendor_dbname',
                    'shipping_year': '$shipping_year',
                    'shipping_month': '$shipping_month'
                },
                'shipping_ids': {'$push': '$shipping_id'},
                'count': {'$sum': 1}
            }
        },
        {
            '$match': {
                "count": {"$gte": 3}
            }
        },
        {
            '$limit': 5
        },
        {
            '$lookup': {
                "from": 'Clients',
                "localField": "_id.order_vendor_dbname",
                "foreignField": "identificationNumber",
                "as": "client"
            }
        },
        {
            '$project': {
                '_id': 0,
                # Kept only so that skipped groups can be identified below
                'order_vendor_dbname': '$_id.order_vendor_dbname',
                'clientName': {'$arrayElemAt': ['$client.name', 0]},
                'shipping_year': '$_id.shipping_year',
                'shipping_month': '$_id.shipping_month',
                'shipping_Id': '$shipping_ids',
                'clientEmail': {'$arrayElemAt': ['$client.email', 0]}
            }
        }
    ]

    alarmsCounter = 0

    for document in database["Orders"].aggregate(pipelineAlarms):
        # When $lookup matched no client, $arrayElemAt on the empty array
        # yields no value and the field is absent from the document, so a
        # plain document['clientEmail'] would raise KeyError.
        clientEmail = document.get('clientEmail')
        if clientEmail is None:
            print(f"No client found for {document.get('order_vendor_dbname')}; alert skipped")
            continue

        result = sendEmail(clientEmail, document.get('clientName'), document['shipping_year'], document['shipping_month'], document['shipping_Id'])
        print(result)
        alarmsCounter += 1
    print(alarmsCounter)

    return f"{alarmsCounter} emails have been sent"

In [4]:
from pymongo import MongoClient

# Create the client from the connection string stored under "ATLAS_URI" in the .env settings
mongodb_client =  MongoClient(config["ATLAS_URI"], uuidRepresentation="standard")
# Select the database whose name is stored under "DB_NAME"
database = mongodb_client[config["DB_NAME"]]
# NOTE(review): MongoClient presumably connects lazily, so this message alone does not prove the server is reachable - confirm (e.g. with a ping) if that matters
print("Connected to the MongoDB database!")

Connected to the MongoDB database!


In [5]:
#initData(database)

In [6]:
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job():
    """Scheduled task: run the alert check, then log when it finished."""
    sendAlarm(database)

    finishedAt = datetime.datetime.now()
    print("Job executed at:", finishedAt)

# Create a scheduler instance
scheduler = BlockingScheduler()

# Add a job that runs my_job once every minute (interval trigger, minutes=1)
scheduler.add_job(my_job, 'interval', minutes=1)

try:
    print("Press Ctrl+C to exit.")
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    # Shut down the scheduler gracefully on Ctrl+C or system exit
    scheduler.shutdown()


Press Ctrl+C to exit.
Message sent
Message sent
Message sent
Message sent
Message sent
5
Job executed at: 2023-11-13 13:47:25.082814
