In [81]:
import importlib
from pathlib import Path

import numpy as np
import pandas as pd

import config
import tests
from modules import module

# Re-execute the project-local helper modules so that edits made on disk since
# the kernel started take effect without a restart.
#
# The names `module`, `tests` and `config` are bound by the import statements
# at the top of this cell, so looking them up cannot raise NameError. The only
# NameError a guard here could catch is one raised *inside* a module while it
# is being re-executed; swallowing that would silently leave the stale version
# loaded, so errors are allowed to propagate instead.
for _mod in (module, tests, config):
    importlib.reload(_mod)

# Load DF

In [82]:
# Read the raw export whose location is configured in `config.path_raw_file`.
path = Path(config.path_raw_file)
df = pd.read_csv(path)

# Labels applied to the raw columns *by position*: the i-th column of the CSV
# receives the i-th name below, whatever its header in the file is.
target_cols = [
    "External ID",
    "Display Name",
    "Date of Birth",
    "Email",
    "Phone",
    "Contract Type",
    "From",
    "To",
    "Fully Paid Date",
    "Membership/Display Name",
    "Membership Status",
    "Partner/Branch/Display Name",
    "Partner/Date of Birth",
    "Partner/Age",
    "Partner/City",
    "Partner/Gender",
    "Partner/Household income/month",
    "Partner/Industry/Display Name",
    "Partner/Job Position",
    "Partner/Occupation",
    "Partner/Street",
    "Partner/Street2",
    "Followers (Partners)/Hobby",
    "Followers (Partners)/Interest",
    "Partner/Hobby",
    "Partner/Interest",
]
current_cols = list(df.columns)

# A positional relabel is only meaningful when the column counts agree.
# zip() would otherwise truncate silently, leaving NaN labels for surplus
# columns or dropping expected labels without any warning.
if len(current_cols) != len(target_cols):
    raise ValueError(
        f"{path} has {len(current_cols)} columns but {len(target_cols)} were "
        "expected; the export layout changed, so renaming columns by position "
        "would mislabel the data."
    )

# Raw header -> new label, kept so the relabelling can be inspected.
map_cols = dict(zip(current_cols, target_cols))
df.columns = target_cols

# Clean DF

In [83]:
# One `active_<mon>_2023` column per calendar month. Each callable hands the
# frame to `module.is_active` together with the start/end column names and the
# first day of that month; the date string is bound through a default argument
# so every lambda keeps its own value.
active_2023_flags = {
    f"active_{month}_2023": (
        lambda frame, as_of=f"1 {month} 2023": module.is_active(
            frame, "start_date", "end_date", as_of
        )
    )
    for month in (
        "jan", "feb", "mar", "apr", "may", "jun",
        "jul", "aug", "sep", "oct", "nov", "dec",
    )
}

df_clean = (
    df
    # keep only rows that have both a "From" and a "To" value
    .dropna(subset=["From", "To"])
    # keep rows whose "From" year is 2020 or later
    .loc[lambda frame: pd.to_datetime(frame["From"]).dt.year >= 2020]
    # keep rows that have a membership display name
    .loc[lambda frame: frame["Membership/Display Name"].notna()]
    # exclude cancelled, free and non members
    .loc[
        lambda frame: ~frame["Membership Status"].isin(
            ["Cancelled Member", "Free Member", "Non Member"]
        )
    ]
    # normalise the column names, then apply the explicit rename mapping
    .rename(columns=module.clean_col_name)
    .rename(columns=module.to_rename)
    # remove the columns listed in module.to_drop
    .drop(columns=module.to_drop)
    # NOTE: the keyword order below matters; later entries read columns
    # produced by earlier ones (e.g. `id` uses the cleaned `dob`).
    .assign(
        # title-case the city, then strip surrounding whitespace
        city=lambda frame: frame["city"].str.title().str.strip(),
        # date of birth: take `dob` when present, otherwise fall back to `dob2`
        dob=lambda frame: np.where(
            frame["dob"].notna(),
            pd.to_datetime(frame["dob"]),
            pd.to_datetime(frame["dob2"]),
        ),
        # parse the contract dates
        start_date=lambda frame: pd.to_datetime(frame["start_date"]),
        end_date=lambda frame: pd.to_datetime(frame["end_date"]),
        fp_date=lambda frame: pd.to_datetime(frame["fp_date"]),
        # cast income to the dtype defined in module.income_cat
        income=lambda frame: frame["income"].astype(module.income_cat),
        # job and age are derived by helpers that receive the whole frame
        job=lambda frame: module.clean_job(frame),
        age=lambda frame: module.get_age(frame),
        # person identifier: name and date of birth joined by a space
        id=lambda frame: frame["name"] + " " + frame["dob"].astype(str),
        # membership code and duration are both derived from `product`
        membership_code=lambda frame: module.get_membership_code(frame["product"]),
        membership_duration=lambda frame: module.get_membership_duration(
            frame["product"]
        ),
    )
    # attach the membership mapping columns by membership_code
    .merge(
        right=pd.read_excel(Path("input/membership_mapping.xlsx")),
        on="membership_code",
        how="left",
    )
    # force is_cpt to True whenever the product name mentions corporate,
    # otherwise keep the value that came from the mapping
    .assign(
        is_cpt=lambda frame: np.where(
            frame["product"].str.lower().str.contains("cpt|corporate|corp", regex=True),
            True,
            frame["is_cpt"],
        )
    )
    # derive center and area
    .assign(
        center=lambda frame: module.clean_center(frame),
        area=lambda frame: module.clean_area(frame),
    )
    # order by id, and within each id by end_date descending
    .sort_values(["id", "end_date"], ascending=[True, False])
    # True for every row of an id except the last one in the order above
    .assign(is_next_contract=lambda frame: frame["id"].duplicated(keep="last"))
    # helper columns that are no longer needed
    .drop(columns=["dob2", "job1", "job2", "name"])
    # keep rows that have a membership code
    .loc[lambda frame: frame["membership_code"].notna()]
    # exclude staff
    .loc[lambda frame: frame["contract_type"] != "Employee"]
    .loc[lambda frame: frame["core_product"] != "Staff"]
    # arrange the columns alphabetically
    .sort_index(axis=1)
    # ! append the monthly active flags (after the sort, so they stay last)
    .assign(**active_2023_flags)
)

# Tests and Assertions

In [84]:
# The center-mapping check is run against the raw (renamed) frame.
tests.test_all_centers_mapped(df)

# The remaining checks are run, in this order, against the cleaned frame.
for check in (
    tests.test_all_centers_are_filled,
    tests.test_all_areas_are_filled,
    tests.test_all_memberships_are_filled,
    tests.test_all_membership_mapped,
):
    check(df_clean)

# Save DF

In [85]:
import os

# Write the cleaned frame to a parquet file named after the raw input file.
file = df_clean
path = f'output/member_data_{Path(config.path_raw_file).stem}.parquet'

# to_parquet does not create missing directories, so make sure the target
# folder exists (e.g. on a fresh checkout) before writing.
os.makedirs(os.path.dirname(path), exist_ok=True)

# NOTE: an existing file is never overwritten. If the cleaning logic or the
# raw data changed, delete the old parquet file first, otherwise the stale
# version stays on disk.
if not os.path.exists(path):
    file.to_parquet(path)
    print('File saved.')
else:
    print('File already exist.')

File already exist.


# Experiment

In [86]:
# df.loc[df['Membership/Display Name'].str.lower().str.contains('ilt', na= False)].sort_values('From')

In [87]:
# df['Membership/Display Name'].unique()

In [88]:
# df_clean['is_cpt'].value_counts()

In [89]:
# df["Partner/Branch/Display Name"].unique()