Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added functionality to mark users as churned #4

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ TESTING=TRUE
DEBUG=FALSE
FLASK_ENV=development
SECRET_KEY=
SLACK_TOKEN=
SLACK_CHANNEL=
GOOGLE_APPLICATION_CREDENTIALS=
DATASET_LOCATION=
DATASET_ID=
USER_CREATION_ALERT_THRESHOLD=
DB_USER=
DB_PASSWORD=
DB_NAME=
DB_HOST=
DB_PORT=
RAPID_PRO_AUTHORIZATION_TOKEN=
CHURNED_USER_GROUP_NAME=
RAPID_PRO_API_URL=
31 changes: 31 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#[ISSUE_ID]
<!--- If there is an open issue, please link to the issue here by replacing [ISSUE_ID]-->
<!-- Make sure the PR is against the `develop` branch -->

### Please complete the following steps and check these boxes before filing your PR:


### Types of changes
<!--- What types of changes does your code introduce? -->
- [ ] Bug fix (a change which fixes an issue)
- [ ] New feature (a change which adds functionality)


### Short description of what this resolves:
<!--- Describe your changes in detail -->
<!--- Why these change required? What problem does it solve? -->


### Checklist:
<!--- Mark the checkboxes accordingly. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're here to help! -->
- [ ] I have performed a self-review of my own code.
- [ ] The code follows the style guidelines of this project.
- [ ] The code changes are passing the CI checks
- [ ] I have documented my code wherever required.
- [ ] The changes requires a change to the documentation.
- [ ] I have updated the documentation based on the my changes.
<!--- TODO: need to uncomment these two checklist once we start with test cases.
- [ ] I have added tests to cover my changes (if not applicable, please state why)
- [ ] All new and existing tests are passing.
-->
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ jobs:
project_id: ${{ secrets.GCP_PROJECT_ID }}
entry_point: trigger
max_instances: 0 # Remove limitation
env_vars: ENV=production,TESTING=${{ secrets.TESTING }},DEBUG=${{ secrets.DEBUG }},DATASET_ID=${{ secrets.DATASET_ID }},SLACK_CHANNEL=${{ secrets.SLACK_CHANNEL }},SLACK_TOKEN=${{ secrets.SLACK_TOKEN }},DATASET_LOCATION=${{ secrets.DATASET_LOCATION }},SECRET_KEY=${{ secrets.SECRET_KEY }},GOOGLE_APPLICATION_CREDENTIALS=${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
env_vars: ENV=production,TESTING=${{ secrets.TESTING }},DEBUG=${{ secrets.DEBUG }},DATASET_ID=${{ secrets.DATASET_ID }},DATASET_LOCATION=${{ secrets.DATASET_LOCATION }},SECRET_KEY=${{ secrets.SECRET_KEY }},GOOGLE_APPLICATION_CREDENTIALS=${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }},DB_USER=${{ secrets.DB_USER }},DB_PASSWORD=${{ secrets.DB_PASSWORD }},DB_NAME=${{ secrets.DB_NAME }},DB_PORT=${{ secrets.DB_PORT }},CONNECTION_NAME=${{ secrets.CONNECTION_NAME }},RAPID_PRO_AUTHORIZATION_TOKEN=${{ secrets.RAPID_PRO_AUTHORIZATION_TOKEN }},CHURNED_USER_GROUP_NAME=${{ secrets.CHURNED_USER_GROUP_NAME }},RAPID_PRO_API_URL=${{ secrets.RAPID_PRO_API_URL }}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ An API to connect with Rapid Pro to update fields or groups by triggering it.
3. Activate the virtual environment
```sh
source ./venv/bin/activate (For Bash)
.\venv\Scripts\activate (For Powershell/CMD)
./venv/Scripts/activate (For Powershell/CMD)
```
4. Install the dependencies:
```sh
Expand Down
6 changes: 6 additions & 0 deletions api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config.from_object("config")
db = SQLAlchemy(app)
9 changes: 9 additions & 0 deletions api/mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import

from api import db
from datetime import datetime


class TimestampMixin(object):
created_on = db.Column(db.DateTime, default=datetime.now)
updated_on = db.Column(db.DateTime, onupdate=datetime.now, default=datetime.now)
4 changes: 4 additions & 0 deletions api/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .churned_users import *
from .user_group import *
from .user_program import *
from .user import *
25 changes: 25 additions & 0 deletions api/models/churned_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from api import db, app
from flask_sqlalchemy import BaseQuery
from api.mixins import TimestampMixin


class ChurnedUsersQuery(BaseQuery):
def get_by_user_id(self, user_id):
return (
self.filter(ChurnedUsers.user_id == user_id)
.order_by(ChurnedUsers.id.desc())
.first()
)


class ChurnedUsers(TimestampMixin, db.Model):
query_class = ChurnedUsersQuery
__tablename__ = "churned_users"

id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
user_program_id = db.Column(db.Integer, db.ForeignKey("user_program.id"))
user_phone = db.Column(db.String(20), nullable=False)
previous_status = db.Column(db.String(50), unique=False)
start_date = db.Column(db.Date)
end_date = db.Column(db.Date, nullable=True)
31 changes: 31 additions & 0 deletions api/models/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from api.mixins import TimestampMixin
from api import db
from flask_sqlalchemy import BaseQuery


class UserQuery(BaseQuery):
def get_by_phone(self, phone):
return self.filter(User.phone.contains(phone[-10:])).first()

def get_by_id(self, id):
return self.filter(User.id == id).first()


class User(TimestampMixin, db.Model):
query_class = UserQuery
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), unique=True, nullable=False)
phone = db.Column(db.String(50), nullable=False)
address_line_1 = db.Column(db.String(500))
address_line_2 = db.Column(db.String(500))
postal_code = db.Column(db.String(50))
partner_id = db.Column(db.Integer, db.ForeignKey("partner.id"))
city = db.Column(db.String(100))
district = db.Column(db.String(100))
state = db.Column(db.String(100))
country = db.Column(db.String(50))

@classmethod
def get_by_user_id(self, user_id):
return User.query.get_by_id(user_id)
25 changes: 25 additions & 0 deletions api/models/user_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from api import db
from flask_sqlalchemy import BaseQuery
from api.mixins import TimestampMixin


class UserGroupQuery(BaseQuery):
def get_by_user_id(self, user_id):
return self.filter(UserGroup.user_id == user_id).all()


class UserGroup(TimestampMixin, db.Model):
query_class = UserGroupQuery
__tablename__ = "user_group"

class UserGroupStatus(object):
ACTIVE = "active"
INACTIVE = "inactive"

id = db.Column(db.Integer, primary_key=True)
user_phone = db.Column(db.String(50), nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
registration_id = db.Column(db.Integer, db.ForeignKey("registration.id"))
group_name = db.Column(db.String(255), nullable=False)
group_uuid = db.Column(db.String(255))
status = db.Column(db.String(100))
32 changes: 32 additions & 0 deletions api/models/user_program.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from api import db, app
from flask_sqlalchemy import BaseQuery
from api.mixins import TimestampMixin


class UserProgramQuery(BaseQuery):
def get_latest_active_user_program(self, user_id):
return (
self.filter(
UserProgram.user_id == user_id,
UserProgram.status == UserProgram.Status.IN_PROGRESS,
)
.order_by(UserProgram.id.desc())
.first()
)


class UserProgram(TimestampMixin, db.Model):
query_class = UserProgramQuery
__tablename__ = "user_program"

id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
program_id = db.Column(db.Integer, db.ForeignKey("program.id"))
preferred_time_slot = db.Column(db.String(50))
status = db.Column(db.String(50))
start_date = db.Column(db.Date)
end_date = db.Column(db.Date)

@classmethod
def get_by_user_id(self, user_id):
return UserProgram.query.get_latest_active_user_program(user_id)
2 changes: 2 additions & 0 deletions api/services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .rapid_pro_services import *
from .rp_ivr_system_services import *
Empty file.
34 changes: 34 additions & 0 deletions api/services/rapid_pro_services/update_user_group_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from api import app
import requests
from api.models.user_group import UserGroup


class UpdateUserGroupService:
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
def fetch_existing_groups_of_user(self, user_id):
groups = []
user_group = UserGroup.query.get_by_user_id(user_id)
for record in user_group:
groups.append(record.group_name)
return groups

def add_group(self, data, new_group):
for record in data:
user_id = record.user_id
phone = record.user_phone
group = self.fetch_existing_groups_of_user(user_id)
group.append(new_group)
token = app.config["RAPID_PRO_AUTHORIZATION_TOKEN"]
api_url = app.config["RAPID_PRO_API_URL"]
headers = {
"Authorization": "Token " + token,
"Content-Type": "application/json",
}
post_data = {"groups": group}

update_group = requests.post(
api_url + phone,
headers=headers,
json=post_data,
)
if update_group.status_code == 200:
return True
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
Empty file.
100 changes: 100 additions & 0 deletions api/services/rp_ivr_system_services/churn_users_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from api import app, db
from google.cloud import bigquery
from datetime import datetime
from api.models.user_program import UserProgram
from api.models.churned_users import ChurnedUsers
from api.services.rapid_pro_services.update_user_group_service import (
UpdateUserGroupService,
)
from sqlalchemy import update


class ChurnUsersService(object):
def __init__(self):
self.bigquery_client = bigquery.Client()
self.dataset_id = app.config["DATASET_ID"]

def fetch_users_to_mark_churned(self):
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
query = self.query_to_fetch_users_to_mark_churn()
data = self.bigquery_client.query(query)
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
data_list = list(data)
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
user_added = self.add_churned_user(data_list)
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
churned_group = app.config["CHURNED_USER_GROUP_NAME"]
if user_added:
rp_updated = UpdateUserGroupService().add_group(
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
data_list, new_group=churned_group
)
return True

def query_to_fetch_users_to_mark_churn(self):
query = f"""
With user_details as (
SELECT
right(r.user_phone, 10) as user_phone,
r.user_id,
up.status,
up.id as user_program_id
from
`{self.bigquery_client.project}.{self.dataset_id}.registration` as r
left join `{self.bigquery_client.project}.{self.dataset_id}.user_program` as up on r.user_id = up.user_id
and r.data_source = up.data_source
where
r.data_source = 'rp_ivr'
and DATE(up.start_date) < DATE_ADD(CURRENT_DATETIME(), INTERVAL -3 MONTH)
AND up.status = 'in-progress'
),
valid_campaigns AS (
SELECT
DISTINCT(right(cle.from_number, 10)) as user_phone
FROM
`{self.bigquery_client.project}.{self.dataset_id}.call_log_event` as cle
JOIN
user_details ud
ON
ud.user_phone = right(cle.from_number, 10)
AND DATE(cle.pick_time) >= DATE_ADD(CURRENT_DATETIME(), INTERVAL -3 MONTH)
AND CAST(cle.duration AS INT64) >= 20 )
SELECT
ud.user_program_id,
ud.user_id,
ud.status,
ud.user_phone
FROM
user_details ud
LEFT JOIN
valid_campaigns vc
ON
ud.user_phone = vc.user_phone
WHERE
vc.user_phone IS NULL
"""
return query

def mark_users_as_churned(self, user_ids):
update_query = (
update(UserProgram)
.where(UserProgram.user_id.in_(user_ids))
.values(status="churned")
)
db.session.execute(update_query)
db.session.commit()
return True

def add_churned_user(self, data):
churned_users = []
user_ids = []
for record in data:
user_ids.append(record.user_id)
churned_user = ChurnedUsers(
user_id=record.user_id,
user_program_id=record.user_program_id,
user_phone=record.user_phone,
previous_status=record.status,
start_date=datetime.now().date(),
vivek-kumar-pandey marked this conversation as resolved.
Show resolved Hide resolved
end_date=None,
)
churned_users.append(churned_user)
db.session.bulk_save_objects(churned_users)
db.session.commit()
status_updated = self.mark_users_as_churned(user_ids)
return status_updated
47 changes: 47 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Flask configuration."""
import os

FLASK_ENV = os.environ.get("FLASK_ENV", "development")


if FLASK_ENV == "development":
from os import environ, path
from dotenv import load_dotenv

basedir = path.abspath(path.dirname(__file__))
load_dotenv(path.join(basedir, ".env"))

TESTING = os.environ.get("TESTING")
DEBUG = os.environ.get("DEBUG")
GOOGLE_APPLICATION_CREDENTIALS = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
DATASET_LOCATION = os.environ.get("DATASET_LOCATION")
DATASET_ID = os.environ.get("DATASET_ID")


# Database configuration
POSTGRES = {
"user": os.environ.get("DB_USER"),
"password": os.environ.get("DB_PASSWORD"),
"host": os.environ.get("DB_HOST"),
"port": os.environ.get("DB_PORT"),
"database": os.environ.get("DB_NAME"),
"connection_name": os.environ.get("CONNECTION_NAME"),
}

SQLALCHEMY_DATABASE_URI = (
"postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(database)s" % POSTGRES
)

# For socket based connection
if FLASK_ENV == "staging":
SQLALCHEMY_DATABASE_URI = (
"postgresql://%(user)s:%(password)s@/%(database)s?host=%(connection_name)s/"
% POSTGRES
)

SQLALCHEMY_TRACK_MODIFICATIONS = True
WTF_CSRF_ENABLED = True
SECRET_KEY = os.environ.get("SECRET_KEY")
RAPID_PRO_AUTHORIZATION_TOKEN = os.environ.get("RAPID_PRO_AUTHORIZATION_TOKEN")
CHURNED_USER_GROUP_NAME = os.environ.get("CHURNED_USER_GROUP_NAME")
RAPID_PRO_API_URL = os.environ.get("RAPID_PRO_API_URL")