In [None]:
import re
import time
import random
from datetime import datetime, timedelta

import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings("ignore")
from data import campaign, engagement

# Helper Functions


In [None]:
def to_lowercase(df):
	"""Return a copy of ``df`` whose column labels are lower-cased.

	The input frame is left untouched; every label must support ``.lower()``.
	"""
	lowered = {label: label.lower() for label in df.columns.values}
	return df.copy().rename(columns=lowered)

def to_snakecase(df):
	"""Return a copy of ``df`` with snake_case column labels.

	Each label is stripped of surrounding whitespace, has runs of the
	punctuation characters ``,.;@#?!&$`` (plus any spaces directly after them)
	removed, and has its remaining spaces replaced by underscores.

	Unlike the previous implementation this does not rename the caller's
	frame in place, matching the copy semantics of ``to_lowercase``.
	"""
	snakecase = {
		label: re.sub(r"[,.;@#?!&$]+\ *", "", label.strip()).replace(" ", "_")
		for label in df
	}
	# rename() without inplace=True builds a new frame, so the argument keeps
	# its original column labels.
	return df.rename(columns=snakecase)
	
def get_users(path="data/users.csv", date_format = "%Y-%m"):
	"""Load the users CSV and normalise it for the ``users`` table.

	Column labels are lower-cased and snake-cased, ``user`` is turned into a
	string left-padded with zeros to width 4 and renamed ``customer_id``, and
	``birth_year``/``birth_month`` are combined into a single datetime column
	named ``birth_year_month`` (parsed with ``date_format``).
	"""
	frame = to_snakecase(to_lowercase(pd.read_csv(path)))
	frame['user'] = frame['user'].astype(str).str.pad(width=4, side='left', fillchar='0')

	# Build "<year>-<month>" text first, then parse it in one step.
	year_month = frame['birth_year'].astype(str) + '-' + frame['birth_month'].astype(str)
	frame['birth_year'] = pd.to_datetime(year_month, format=date_format)

	return (
		frame
		.drop(columns=['birth_month'])
		.rename(columns={'user': 'customer_id', 'birth_year': 'birth_year_month'})
	)


def get_creditcards(path="data/credit_cards.csv", date_format = "%m/%Y"):
	"""Load the credit-card CSV and parse its date columns.

	Parameters
	----------
	path : str
		Location of the CSV file. Previously this argument was ignored and a
		hard-coded ``../../../data/credit_cards.csv`` was read instead.
	date_format : str
		``strftime`` format of the ``expires`` and ``acct_open_date`` columns.

	Returns
	-------
	pandas.DataFrame
		Frame with lower snake_case columns, ``user`` as a zero-padded
		4-character string, and ``expires``, ``acct_open_date`` and
		``year_pin_last_changed`` converted to datetimes.
	"""
	credit_cards = to_snakecase(to_lowercase(pd.read_csv(path)))
	credit_cards['user'] = credit_cards['user'].astype(str).str.pad(width=4, side='left', fillchar='0')
	credit_cards['expires'] = pd.to_datetime(credit_cards['expires'], format=date_format)
	# Parse the account-open column from its own values; it used to be
	# overwritten with a copy of ``expires``.
	credit_cards['acct_open_date'] = pd.to_datetime(credit_cards['acct_open_date'], format=date_format)
	credit_cards['year_pin_last_changed'] = pd.to_datetime(credit_cards['year_pin_last_changed'], format="%Y")
	return credit_cards

def get_transactions(path="data/transactions.csv", date_format = "%m/%Y"):
	"""Load the transactions CSV and shape it for the ``transactions`` table.

	Steps performed:
	  * normalise column labels, add a 1-based ``identifier`` column;
	  * zero-pad ``user`` to 4 characters (later renamed ``customer_id``);
	  * assemble ``date`` from ``year``/``month``/``day`` and the hour/minute
	    parts of ``time``;
	  * look up each row's ``card_number`` from ``get_creditcards()`` using
	    the ``(user, card_index)`` pair;
	  * drop the helper columns.

	``date_format`` is accepted for signature compatibility and is not used.

	Rows whose card cannot be found get a missing ``card_number`` instead of
	shifting every later card number onto the wrong transaction.
	"""
	transactions = to_snakecase(to_lowercase(pd.read_csv(path)))
	transactions.insert(0, 'identifier', transactions.index + 1)
	transactions['user'] = transactions['user'].astype(str).str.pad(width=4, side='left', fillchar='0')
	transactions = transactions.rename(columns={'card':'card_index'})
	hour_min = transactions['time'].str.split(":", expand=True).rename(columns={0:'hour', 1:'minute'})
	transactions = pd.concat([transactions, hour_min], axis=1)

	date_cols = ['year', 'month', 'day', 'hour', 'minute']
	transactions['date'] = pd.to_datetime(transactions[date_cols])

	key_cols = ['user', 'card_index']
	cc_no = get_creditcards()[key_cols + ['card_number']].copy()
	# Stringify before merging so unmatched rows stay missing rather than
	# forcing the whole column through a float representation.
	cc_no['card_number'] = cc_no['card_number'].astype(str).str.pad(width=4, side='right', fillchar='0')
	# One card per key guarantees the left join returns exactly one row per
	# transaction, in the original order.
	cc_no = cc_no.drop_duplicates(subset=key_cols)
	matched = transactions[key_cols].merge(cc_no, how='left', on=key_cols)
	# Assign positionally: the merge result has a fresh index that must not be
	# aligned against the transactions index.
	transactions.insert(1, 'card_number', matched['card_number'].to_numpy())
	transactions = transactions.drop(columns= ['card_index', 'time'] + date_cols)
	transactions = transactions.rename(columns={'user':'customer_id'})
	return transactions

def get_churn(users,
        start_date = datetime(2023, 1, 1),
        end_date = datetime(2024, 10, 31),
        churn_rate = 0.2,
        seed = None
    ):
    """Generate synthetic churn records for a random subset of customers.

    Parameters
    ----------
    users : pandas.DataFrame
        Must contain a ``customer_id`` column.
    start_date, end_date : datetime
        Inclusive bounds for the randomly drawn churn date (whole days).
    churn_rate : float
        Probability that any given customer is marked as churned. The
        default of 0.2 matches the value the code has always used (the old
        inline comment incorrectly said 10%).
    seed : int or None
        When given, a private ``random.Random(seed)`` is used so the output
        is reproducible; when ``None`` the global ``random`` module state is
        used, as before.

    Returns
    -------
    pandas.DataFrame
        Columns ``customer_id`` and ``churn_date``, one row per churned
        customer, in the order the customers appear in ``users``.
    """
    rng = random.Random(seed) if seed is not None else random
    span_days = (end_date - start_date).days

    churned_ids = []
    churn_dates = []
    for customer_id in users["customer_id"].tolist():
        # One draw decides churn; a second draw (only for churners) picks the day.
        if rng.random() < churn_rate:
            churned_ids.append(customer_id)
            churn_dates.append(start_date + timedelta(days=rng.randint(0, span_days)))

    return pd.DataFrame({"customer_id": churned_ids, "churn_date": churn_dates})

def get_Products(path="data/recodataset.csv", 
				column_mapping = {
					"fecha_dato": "report_date",
					"ncodpers": "customer_id",
					"ind_empleado": "employee_index",
					"pais_residencia": "country_residence",
					"sexo": "gender",
					"age": "age",
					"fecha_alta": "contract_start_date",
					"ind_nuevo": "new_customer_index",
					"antiguedad": "seniority_months",
					"indrel": "primary_customer_status",
					"ult_fec_cli_1t": "last_primary_customer_date",
					"indrel_1mes": "customer_type_start_month",
					"tiprel_1mes": "customer_relation_type",
					"indresi": "residence_index",
					"indext": "foreigner_index",
					"conyuemp": "spouse_employee_index",
					"canal_entrada": "join_channel",
					"indfall": "deceased_index",
					"tipodom": "address_type",
					"cod_prov": "province_code",
					"nomprov": "province_name",
					"ind_actividad_cliente": "activity_index",
					"renta": "gross_income",
					"segmento": "customer_segment",
					"ind_ahor_fin_ult1": "saving_account",
					"ind_aval_fin_ult1": "guarantee",
					"ind_cco_fin_ult1": "current_account",
					"ind_cder_fin_ult1": "derivada_account",
					"ind_cno_fin_ult1": "payroll_account",
					"ind_ctju_fin_ult1": "junior_account",
					"ind_ctma_fin_ult1": "more_particular_account",
					"ind_ctop_fin_ult1": "particular_account",
					"ind_ctpp_fin_ult1": "particular_plus_account",
					"ind_deco_fin_ult1": "short_term_deposits",
					"ind_deme_fin_ult1": "medium_term_deposits",
					"ind_dela_fin_ult1": "long_term_deposits",
					"ind_ecue_fin_ult1": "e_account",
					"ind_fond_fin_ult1": "funds",
					"ind_hip_fin_ult1": "mortgage",
					"ind_plan_fin_ult1": "pensions",
					"ind_pres_fin_ult1": "loans",
					"ind_reca_fin_ult1": "taxes",
					"ind_tjcr_fin_ult1": "credit_card",
					"ind_valo_fin_ult1": "securities",
					"ind_viv_fin_ult1": "home_account",
					"ind_nomina_ult1": "payroll",
					"ind_nom_pens_ult1": "pensions_payments",
					"ind_recibo_ult1": "direct_debit"
				}
	):
	"""Load the product dataset and translate it to English column names.

	Parameters
	----------
	path : str
		CSV file to read.
	column_mapping : dict
		Source-column -> target-column renames applied right after reading.
		The mapping is only read, never modified.

	Returns
	-------
	pandas.DataFrame
		Frame with renamed, lower snake_case columns; ``customer_id`` as a
		string zero-filled to at least 4 characters; ``report_date``,
		``contract_start_date`` and ``last_primary_customer_date`` parsed as
		datetimes; and, in every remaining object-dtype column, string cells
		stripped of surrounding whitespace with the exact token ``"NA"``
		replaced by ``None``.
	"""
	santender = pd.read_csv(path)
	santender = santender.rename(columns=column_mapping)
	santender['customer_id'] = santender['customer_id'].apply(lambda x: str(x).zfill(4))
	santender = to_snakecase(to_lowercase(santender))
	santender['report_date'] = pd.to_datetime(santender['report_date'])
	santender['contract_start_date'] = pd.to_datetime(santender['contract_start_date'])
	santender['last_primary_customer_date'] = pd.to_datetime(santender['last_primary_customer_date'])
	str_cols = santender.select_dtypes(include='object').columns

	def _clean(value):
		# Non-string cells (numbers, NaN) pass through unchanged; the old
		# row-wise ``.str.strip()`` silently turned them into NaN.
		if not isinstance(value, str):
			return value
		value = value.strip()
		# Only a cell that is exactly "NA" is a missing marker. The previous
		# unanchored regex also wiped values that merely contain "NA",
		# e.g. "BARCELONA".
		return None if value == "NA" else value

	# Column-by-column mapping avoids the slow axis=1 apply and does not
	# depend on how DataFrame.replace treats value=None across pandas versions.
	for col in str_cols:
		santender[col] = santender[col].map(_clean)
	return santender

# Load every source dataset into memory; each loader is called with its default path.
users = get_users()
transactions = get_transactions()
# Churn records are generated randomly from the loaded users rather than read from a file.
churn = get_churn(users)
santender = get_Products()

# Connect to Database

In [None]:
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, Mapped, mapped_column
from sqlalchemy import create_engine, MetaData, Column, Integer, String, Double, DateTime, ForeignKey
from sqlalchemy.dialects.mysql import LONGTEXT


def create_db(user="root", password="msql1234", server="localhost", database="transact"):
    """Create the SQLAlchemy engine, session factory and declarative base.

    Parameters
    ----------
    user, password : str
        MySQL credentials. They are percent-encoded before being placed in
        the URL so characters such as ``@``, ``:`` or ``/`` in a password no
        longer corrupt the connection string.
        NOTE(review): the defaults embed a real-looking password in the
        notebook; prefer passing credentials read from the environment.
    server : str
        Host part of the URL (used verbatim).
    database : str
        Schema name (used verbatim).

    Returns
    -------
    tuple
        ``(engine, SessionLocal, Base)``.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import quote_plus

    SQLALCHEMY_DATABASE_URL = "mysql+pymysql://{}:{}@{}/{}".format(
        quote_plus(user), quote_plus(password), server, database
    )
    engine = create_engine(SQLALCHEMY_DATABASE_URL)

    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    Base = declarative_base()

    return engine, SessionLocal, Base

# Build the engine, session factory and declarative base using create_db's default settings.
engine, SessionLocal, Base = create_db()


# Schemas

In [None]:
class Users(Base):
	"""ORM model for the ``users`` table, keyed by ``customer_id``.

	``customer_id`` is the target of the foreign keys declared on the
	``transactions``, ``engagement`` and ``churn`` models.
	"""
	__tablename__ = 'users'
	# String key (max 10 chars); get_users() produces zero-padded string ids.
	customer_id = Column(String(10), primary_key=True, nullable=False)
	person = Column(String(32))
	current_age = Column(Integer)
	retirement_age = Column(Integer)
	# Datetime built by get_users() from the birth year and birth month columns.
	birth_year_month = Column(DateTime)
	gender = Column(String(32))
	address = Column(String(64))
	apartment = Column(Integer)
	city = Column(String(32))
	state = Column(String(32))
	zipcode = Column(String(32))
	latitude = Column(Double)
	longitude = Column(Double)
	per_capita_income = Column(Double)
	yearly_income = Column(Double)
	total_debt = Column(Double)
	fico_score = Column(Double)
	num_credit_cards = Column(Integer)

class Transactions(Base):
	"""ORM model for the ``transactions`` table, keyed by ``identifier``.

	Each row references a ``users`` row through ``customer_id``; deleting the
	user cascades to its transactions.
	"""
	__tablename__ = "transactions"
	# Declared autoincrement; get_transactions() also supplies explicit 1-based values.
	identifier = Column(Integer, primary_key=True, autoincrement=True)
	customer_id = Column(String(10), ForeignKey("users.customer_id", ondelete="CASCADE"), nullable=False)
	# Required: a row without a card number is rejected by the database.
	card_number = Column(String(16), nullable=False)
	date = Column(DateTime)
	amount = Column(Double)
	use_chip = Column(String(32))
	merchant_name = Column(String(32))
	merchant_city = Column(String(32))
	merchant_state = Column(String(32))
	zip = Column(String(16))
	mcc = Column(Integer)
	# MySQL LONGTEXT, i.e. no practical length limit on this text column.
	errors = Column(LONGTEXT)
	# Stored as short text (max 3 chars), not as a boolean.
	is_fraud = Column(String(3))

class Campaigns(Base):
	"""ORM model for the ``campaign`` table, keyed by ``campaign_id``.

	Note the table name is singular (``campaign``) while the class is plural;
	the ``engagement`` model's foreign key refers to ``campaign.campaign_id``.
	"""
	__tablename__ = "campaign"
	campaign_id = Column(String(32), primary_key=True, nullable=False)
	campaign_name = Column(String(32))
	start_date = Column(DateTime)
	end_date = Column(DateTime)
	target_segment = Column(String(32))
	budget = Column(Double)
	channel = Column(String(32))
	goal = Column(String(32))
	displays = Column(Integer)

class Engagement(Base): # Independent
	"""ORM model for the ``engagement`` table, keyed by ``engagement_id``.

	Every row must reference both a campaign and a user; deleting either
	parent cascades to the engagement rows.
	"""
	__tablename__ = "engagement"
	engagement_id = Column(String(10), primary_key=True)
	campaign_id = Column(String(32), ForeignKey("campaign.campaign_id", ondelete="CASCADE"), nullable=False)
	customer_id = Column(String(10), ForeignKey("users.customer_id", ondelete="CASCADE"), nullable=False)
	# Alternative without the foreign-key constraint, kept for reference:
	# customer_id = Column(String(10), nullable=False)
	engagement_date = Column(DateTime)
	# Free-text action label; the clicks/leads query later in this notebook
	# counts the values 'clicked', 'credentials' and 'converted'.
	action_type = Column(String(32))
	device_type = Column(String(32))
	feedback_score = Column(Integer)
	conversion_value  = Column(Double)
	

class Churn(Base): # Independent
	"""ORM model for the ``churn`` table: at most one churn date per user.

	``customer_id`` is both the primary key and a cascading foreign key to
	``users.customer_id``.
	"""
	__tablename__ = "churn"
	customer_id = Column(String(10), ForeignKey("users.customer_id", ondelete="CASCADE"), primary_key=True)
	churn_date = Column(DateTime)


class Product(Base):
	"""ORM model for the ``santender`` table (product-holding records).

	Column names match the English names produced by ``get_Products``'s
	default ``column_mapping``. This table declares no foreign key to
	``users``.

	NOTE(review): ``customer_id`` is the only primary-key column. If the
	source file contains several ``report_date`` rows for one customer the
	insert will violate this key -- confirm against the data.
	"""
	__tablename__ = 'santender'
	report_date = Column(DateTime)
	# Wider (32) than the String(10) customer_id used by the other tables.
	customer_id = Column(String(32), primary_key=True)
	employee_index =  Column(String(16))
	country_residence = Column(String(32))
	gender =  Column(String(16))
	age = Column(Integer)
	contract_start_date = Column(DateTime)
	new_customer_index = Column(Integer)
	seniority_months = Column(Integer)
	primary_customer_status = Column(Integer)
	last_primary_customer_date = Column(DateTime)
	customer_type_start_month = Column(Integer)
	customer_relation_type = Column(String(16))
	residence_index = Column(String(16))
	foreigner_index = Column(String(16))
	spouse_employee_index = Column(String(16))
	join_channel = Column(String(16))
	deceased_index = Column(String(16))
	address_type = Column(Integer)
	province_code = Column(Integer)
	province_name = Column(String(32))
	activity_index = Column(Integer)
	gross_income = Column(Double)
	customer_segment = Column(String(32))
	# The remaining integer columns are the per-product fields of the dataset.
	saving_account = Column(Integer)
	guarantee = Column(Integer) 
	current_account = Column(Integer)
	derivada_account = Column(Integer)
	payroll_account = Column(Integer)
	junior_account = Column(Integer)
	more_particular_account = Column(Integer)
	particular_account = Column(Integer)
	particular_plus_account = Column(Integer)
	short_term_deposits = Column(Integer)
	medium_term_deposits = Column(Integer)
	long_term_deposits = Column(Integer)
	e_account = Column(Integer)
	funds = Column(Integer)
	mortgage = Column(Integer)
	pensions = Column(Integer)
	loans = Column(Integer)
	taxes = Column(Integer)
	credit_card = Column(Integer)
	securities = Column(Integer)
	home_account = Column(Integer)
	payroll = Column(Integer)
	pensions_payments = Column(Integer)
	direct_debit = Column(Integer)


# Rebuild the schema from scratch. WARNING: drop_all removes every table registered
# on Base (and the rows in them) before create_all issues the CREATE TABLE statements.
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Insert into Database

In [None]:
# Table name -> DataFrame. Order matters: ``users`` and ``campaign`` are loaded
# before the tables whose foreign keys point at them.
dct = {'users': users, 'transactions':transactions,
	'churn':churn, 'campaign': campaign,
	'engagement': engagement,
	'santender': santender}
for k,v in dct.items():
	try:
		# engine.begin() opens one transaction per table: it commits when the
		# block finishes and rolls back if to_sql raises. The insert now runs
		# on that same connection, so commit/rollback actually apply to it.
		with engine.begin() as db:
			v.to_sql(k, con=db, if_exists='append', index=False)
		print("{} Ok".format(k))
	except Exception as exc:
		# Keep loading the remaining tables, but show why this one failed
		# instead of hiding the error behind a bare except.
		print("{} Failed".format(k))
		print("  {}: {}".format(type(exc).__name__, exc))

# Example: How to Query the Data

In [None]:
import pandas as pd
import numpy as np

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


def create_db(user="root", password="msql1234", server="localhost", database="transact"):
    """Create the SQLAlchemy engine, session factory and declarative base.

    This repeats the definition from the "Connect to Database" section so the
    query examples can run on their own.

    Parameters
    ----------
    user, password : str
        MySQL credentials. They are percent-encoded before being placed in
        the URL so characters such as ``@``, ``:`` or ``/`` in a password no
        longer corrupt the connection string.
        NOTE(review): the defaults embed a real-looking password in the
        notebook; prefer passing credentials read from the environment.
    server : str
        Host part of the URL (used verbatim).
    database : str
        Schema name (used verbatim).

    Returns
    -------
    tuple
        ``(engine, SessionLocal, Base)``.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import quote_plus

    SQLALCHEMY_DATABASE_URL = "mysql+pymysql://{}:{}@{}/{}".format(
        quote_plus(user), quote_plus(password), server, database
    )
    engine = create_engine(SQLALCHEMY_DATABASE_URL)

    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    Base = declarative_base()

    return engine, SessionLocal, Base

# Rebinds engine, SessionLocal and Base; the new Base has no models registered on it.
engine, SessionLocal, Base = create_db()

def get_data(query_string):
	"""Execute ``query_string`` on the module-level ``engine`` and return a DataFrame.

	Parameters
	----------
	query_string
		An executable accepted by ``Connection.execute`` (the notebook passes
		``sqlalchemy.text(...)`` objects).

	Returns
	-------
	pandas.DataFrame
		One row per result row, with columns named after the result set's
		keys -- also when the query returns no rows.
	"""
	with engine.connect() as db:
		result = db.execute(query_string)
		# Read the column names before draining the cursor so an empty result
		# still yields a frame with labelled columns.
		column_names = list(result.keys())
		rows = [tuple(row) for row in result.fetchall()]
	# The ``with`` block has already closed the connection; no explicit close needed.
	return pd.DataFrame(rows, columns=column_names)

## Get Raw Churn Data

In [None]:
# Every user together with its churn row, if one exists: the LEFT JOIN keeps users
# that have no churn record (their churn columns come back as NULL).
# SELECT * returns customer_id from both tables, so the result set carries it twice.
query_string = sqlalchemy.text(
	"""
	SELECT * 
	FROM users u
	LEFT JOIN churn c
	ON u.customer_id = c.customer_id;
	"""
)
fetched = get_data(query_string)
fetched

## Get Raw Transaction/RFM Data

In [None]:
# All user columns plus the amount and date of each of that user's transactions.
# The comma join with the WHERE equality is an inner join: users with no
# transactions do not appear in the result.
query_string = sqlalchemy.text(
	"""
	SELECT u.*, t.amount, t.date
	FROM users u, transactions t
	WHERE u.customer_id = t.customer_id;
	"""
)
fetched = get_data(query_string)
fetched

## Get Raw Engagement Data

In [None]:
# One row per engagement event, carrying all columns of the campaign it belongs to.
# Inner join on campaign_id: campaigns without any engagement are not returned.
query_string = sqlalchemy.text(
	"""
	SELECT c.*, e.customer_id, e.engagement_date, 
	e.action_type, e.device_type, e.feedback_score,
	e.conversion_value
	FROM campaign c, engagement e
	WHERE c.campaign_id = e.campaign_id;
	"""
)
fetched = get_data(query_string)
fetched

## Get Raw Clicks/Leads Data

In [None]:
# Per-campaign funnel counts: engagement rows are joined to their campaign and
# counted by action_type ('clicked' -> clicks, 'credentials' -> leads,
# 'converted' -> orders).
# mark_spent uses MAX(budget) rather than SUM(budget): budget is a campaign-level
# value repeated on every joined engagement row, so summing it multiplied the
# spend by the number of engagements.
query_string = sqlalchemy.text(
	"""
	SELECT t1.campaign_id,
	MAX(t1.budget) AS mark_spent,
	t1.start_date AS c_date,
	t1.channel as category,
	t1.displays,
	SUM(CASE WHEN t1.action_type = 'clicked' THEN 1 ELSE 0 END) AS clicks,
	SUM(CASE WHEN t1.action_type = 'credentials' THEN 1 ELSE 0 END) AS leads, 
	SUM(CASE WHEN t1.action_type = 'converted' THEN 1 ELSE 0 END) AS orders
	FROM 
	(SELECT c.campaign_id,
	c.campaign_name, 
	c.start_date, c.end_date,
	c.target_segment, 
	c.budget,
	c.channel,
	c.displays,
	e.customer_id, 
	e.engagement_date, 
	e.action_type, e.device_type, e.feedback_score,
	e.conversion_value
	FROM campaign c, engagement e
	WHERE c.campaign_id = e.campaign_id) AS t1
	GROUP BY t1.campaign_id, t1.channel
	ORDER BY t1.campaign_id, t1.start_date, t1.channel;
	"""
)
fetched = get_data(query_string)
fetched

## Get Santander Dataset (Eng)

In [None]:
# Full contents of the ``santender`` table (the product dataset with English column names).
# The last line displays the entire fetched frame.
query_string = sqlalchemy.text(
	"""
	SELECT *
	FROM santender;
	"""
)
fetched = get_data(query_string)
fetched