In [1]:
#
# source = inspect.getsource(train)
# parsed = ast.parse(source)

# for node in ast.walk(parsed):
#     if isinstance(node,ast.Call):
#         if isinstance(node.func, ast.Attribute):
#             if (node.func.value.id == 'layer'):
#                     if(node.func.attr == 'get_dataset'):
#                         print(ast.dump(node))
#                         print(node.args[0].value)


# ast.dump(parsed)

import pandas as pd
import inspect
import os
import sys
import inspect
import ast


class Layer:
    entities = []
    entity_context = None

    def __init__(self, project_name, environment):
        self.project_name = project_name
        self.environment = environment

    def setup(self):
        if os.path.exists(self.environment):
            file1 = open(self.environment, 'r')
            for lib in file1.readlines():
                print(f"Layer Infra: Installing {lib.strip()}...")
        else:
            print(f"Environment file not found: {self.environment}")

    def log_parameter(self, metric, value):
        print(f"\t{Layer.entity_context} > Parameter > {metric}:{value}")

    def log_metric(self, metric, value):
        print(f"\t{Layer.entity_context} > Metric >{metric}:{value}")

    def log(self, message):
        print(f"\t{Layer.entity_context} > {message}")

    def run(self, entities):
        self.entities = []
        for entity in entities:
            if entity._type == "dataset":
                self.entities.append(Dataset(entity))
            elif entity._type == "model":
                self.entities.append(Model(entity))

        print(f"--- Layer Infra: Running Project: {self.project_name} ---")

        self.setup()

        for entity in self.entities:
            entity.run()
        print(f"\n--- Layer Infra: Run Complete! ---")

    def get_dataset(self, name):
        for entity in self.entities:
            if entity.name == name:
                return entity
        raise Exception(f"Entity '{name}' not found!")


class Model:
    """Runnable wrapper around a function tagged as a model."""

    # Value returned by the most recent ``run``; ``None`` until then.
    result = None

    def __init__(self, func):
        """Remember ``func`` and its ``_name``; a falsy ``func`` is ignored.

        When ``func`` is falsy neither ``name`` nor ``func`` is set on the
        instance.
        """
        if not func:
            return
        # Right-hand side is evaluated first, so ``func._name`` is read
        # before either attribute is assigned.
        self.name, self.func = func._name, func

    def run(self):
        """Call the wrapped function and keep its return value in ``result``."""
        outcome = self.func()
        self.result = outcome


class Dataset:
    """Runnable wrapper around a function tagged as a dataset."""

    # Value returned by the most recent ``run``; ``None`` until then.
    result = None

    def __init__(self, func):
        """Remember ``func`` and its ``_name``; a falsy ``func`` is ignored.

        When ``func`` is falsy neither ``name`` nor ``func`` is set on the
        instance.
        """
        if not func:
            return
        # Right-hand side is evaluated first, so ``func._name`` is read
        # before either attribute is assigned.
        self.name, self.func = func._name, func

    def run(self):
        """Call the wrapped function and keep its return value in ``result``."""
        outcome = self.func()
        self.result = outcome

    def to_pandas(self):
        """Return whatever the last ``run`` produced (no conversion is done)."""
        return self.result


def dataset(name):
    """Decorator factory that tags a function as a named dataset builder.

    The returned wrapper carries ``_type == "dataset"`` and ``_name == name``,
    which is what ``Layer.run`` inspects. When called, the wrapper records the
    dataset name in ``Layer.entity_context``, prints a "Building ..." banner,
    and returns the wrapped function's result.

    Args:
        name: Name under which the dataset is registered and logged.

    Returns:
        A decorator that wraps the builder function.
    """
    import functools  # local import keeps this block self-contained

    def inner(func):
        # The original function is tagged as well; the wrapper reads
        # ``func._name`` at call time.
        func._type = "dataset"
        func._name = name

        # ``wraps`` keeps the builder's __name__/__doc__ and exposes it as
        # ``__wrapped__`` so introspection still sees the real function.
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            Layer.entity_context = func._name
            print(f'\nBuilding {Layer.entity_context}...')
            # Forward the arguments instead of silently discarding them.
            res = func(*args, **kwargs)
            # TODO save returning entity to catalog
            return res
        wrapped._type = "dataset"
        wrapped._name = name

        return wrapped

    return inner


def model(name):
    """Decorator factory that tags a function as a named model trainer.

    The returned wrapper carries ``_type == "model"`` and ``_name == name``,
    which is what ``Layer.run`` inspects. When called, the wrapper records the
    model name in ``Layer.entity_context``, prints a "Training ..." banner,
    and returns the wrapped function's result.

    Args:
        name: Name under which the model is registered and logged.

    Returns:
        A decorator that wraps the training function.
    """
    import functools  # local import keeps this block self-contained

    def inner(func):
        # ``wraps`` keeps the trainer's __name__/__doc__ and exposes it as
        # ``__wrapped__`` so introspection still sees the real function.
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            Layer.entity_context = name
            print(f'\nTraining {Layer.entity_context}...')
            # Forward the arguments instead of silently discarding them.
            res = func(*args, **kwargs)
            # TODO save returning entity to catalog
            return res
        wrapped._type = "model"
        wrapped._name = name

        return wrapped

    return inner

In [23]:
# Import packages
import pandas as pd
import numpy as np
from typing import Any
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score, roc_auc_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

In [24]:
@dataset('orders_dataset')
def read_orders_table() -> Any:
    """Load the orders CSV, parse its timestamp columns and log basic facts.

    Returns:
        The orders DataFrame with the five timestamp columns converted by
        ``pd.to_datetime``.
    """
    # NOTE(review): absolute, machine-specific path; consider a configurable
    # data directory so the notebook runs on other machines.
    orders_df = pd.read_csv("/Users/burakozen/Jupyter_Projects/data/olist_orders_dataset.csv")

    # Converting object types to datetime for further analysis
    timestamp_columns = [
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
        'order_estimated_delivery_date',
    ]
    for column in timestamp_columns:
        orders_df[column] = pd.to_datetime(orders_df[column])

    # Important Data Info
    # The message is about order_ids, so check order_id (the previous check
    # counted distinct customer_id values instead).
    layer.log(f"All order_ids are unique: {orders_df.order_id.nunique()==len(orders_df)}")
    layer.log(f"Max order purchase date: {orders_df.order_purchase_timestamp.max()}")
    layer.log(f"Min order purchase date: {orders_df.order_purchase_timestamp.min()}")
    layer.log(f"Different order statuses: {orders_df.order_status.unique()}")

    return orders_df

In [25]:
@dataset('payments_dataset')
def read_payments_table() -> Any:
    """Load the order-payments CSV and log a few summary figures.

    Returns:
        The payments DataFrame exactly as read from disk.
    """
    # NOTE(review): absolute, machine-specific path.
    payments_df = pd.read_csv("/Users/burakozen/Jupyter_Projects/data/olist_order_payments_dataset.csv")

    # Summary figures are computed first, then logged in a fixed order.
    row_count = len(payments_df)
    distinct_orders = payments_df.order_id.nunique()
    payment_types = payments_df.payment_type.unique()
    mean_payment = payments_df.payment_value.mean()

    layer.log(f"Total number of rows: {row_count}")
    layer.log(f"Total number of unique orders: {distinct_orders}")
    layer.log(f"Payment Types: {payment_types}")
    layer.log(f"Payment Value Mean: {mean_payment}")

    return payments_df

In [26]:
@dataset('customers_dataset')
def read_customers_table() -> Any:
    """Load the customers CSV and log distinct customer/city/state counts.

    Returns:
        The customers DataFrame exactly as read from disk.
    """
    # NOTE(review): absolute, machine-specific path.
    customers_df = pd.read_csv("/Users/burakozen/Jupyter_Projects/data/olist_customers_dataset.csv")

    # Distinct counts are computed first, then logged in a fixed order.
    customer_count = customers_df.customer_unique_id.nunique()
    city_count = customers_df.customer_city.nunique()
    state_count = customers_df.customer_state.nunique()

    layer.log(f"Total number of customers: {customer_count}")
    layer.log(f"Total number of different cities: {city_count}")
    layer.log(f"Total number of different states: {state_count}")

    return customers_df

In [None]:
@dataset('orders_general_features')
def extract_orders_general_features() -> Any:
    """Derive per-order waiting-time features from the orders dataset.

    Rows with a missing timestamp, or with timestamps in an impossible order
    (approved before purchase, handed to carrier before approval), are
    dropped before the features are computed.

    Returns:
        DataFrame with columns ``order_id``, ``order_status``,
        ``total_waiting`` and ``days_between_estimate_actual_delivery``.
    """
    # ORDER TABLE BASED FEATURE EXTRACTION - ORDER FEATURES

    # Get dataset from Layer and convert it into a pandas dataframe
    orders_df = layer.get_dataset("orders_dataset").to_pandas()

    # Drop rows that have NA values in any of the timestamp columns
    timestamp_columns = [
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
        'order_estimated_delivery_date',
    ]
    orders_df = orders_df.dropna(subset=timestamp_columns)

    # Data Sanity Check: discard rows whose timestamps are out of order
    approved_before_purchase = orders_df['order_approved_at'] < orders_df['order_purchase_timestamp']
    shipped_before_approval = orders_df['order_delivered_carrier_date'] < orders_df['order_approved_at']
    orders_df = orders_df.loc[~(approved_before_purchase | shipped_before_approval), :]

    # Computing new features: total_waiting & days_between_estimate_actual_delivery.
    # ``assign`` builds a new frame, so nothing is written onto a filtered
    # slice (which is what triggered SettingWithCopyWarning before).
    payment_approvement_waiting = (orders_df.order_approved_at - orders_df.order_purchase_timestamp).dt.days
    delivered_carrier_waiting = (orders_df.order_delivered_carrier_date - orders_df.order_approved_at).dt.days
    orders_df = orders_df.assign(
        payment_approvement_waiting=payment_approvement_waiting,
        delivered_carrier_waiting=delivered_carrier_waiting,
        total_waiting=payment_approvement_waiting + delivered_carrier_waiting,
        days_between_estimate_actual_delivery=(
            orders_df.order_estimated_delivery_date - orders_df.order_delivered_customer_date
        ).dt.days,
    )

    # Select features to be returned
    orders_general_features = orders_df[['order_id', 'order_status', 'total_waiting', 'days_between_estimate_actual_delivery']]

    # Important Data Info
    layer.log(f"All total_waiting values non-negative: {(orders_general_features.total_waiting>=0).all()}")
    layer.log(f"Max of days between estimate and actual delivery: {orders_general_features.days_between_estimate_actual_delivery.max()}")
    layer.log(f"Min of days between estimate and actual delivery: {orders_general_features.days_between_estimate_actual_delivery.min()}")

    return orders_general_features


In [None]:
@dataset('orders_payments_features')
def extract_orders_payments_features() -> Any:
    """Aggregate the payments dataset to one feature row per order.

    Returns:
        DataFrame with columns ``order_id``, ``use_voucher`` (1 if any payment
        of the order is a voucher), ``total_payment`` (sum of payment values)
        and ``payment_type`` (the ``max`` of the order's payment types).
    """
    # PAYMENT TABLE BASED FEATURE EXTRACTION - ORDER FEATURES

    # Fetch dataset from Layer and convert it into a pandas dataframe
    payments_df = layer.get_dataset("payments_dataset").to_pandas()

    # Keep only rows where both payment_type and payment_value are present
    has_type_and_value = payments_df['payment_type'].notna() & payments_df['payment_value'].notna()
    payments_df = payments_df[has_type_and_value]

    # Computing new features: use_voucher & total_payment & payment_type
    voucher_flag = np.where(payments_df['payment_type'] == 'voucher', 1, 0)
    payments_per_order = payments_df.assign(is_voucher=voucher_flag).groupby(['order_id'], as_index=False)
    payments_df_agg = payments_per_order.agg(
        use_voucher=("is_voucher", "max"),
        total_payment=("payment_value", "sum"),
        payment_type=("payment_type", "max"),
    )

    # Select columns to be returned
    feature_columns = ['order_id', 'use_voucher', 'total_payment', 'payment_type']
    orders_payments_features = payments_df_agg[feature_columns]

    # Important Data Info
    all_non_negative = (orders_payments_features.total_payment >= 0).all()
    layer.log(f"All total_payment values non-negative: {all_non_negative}")

    return orders_payments_features

In [29]:
@dataset('customers_features')
def extract_customer_features() -> Any:
    """Build one feature row per unique customer, based on their first order.

    Returns:
        DataFrame with columns ``customer_unique_id``, ``customer_city``,
        ``customer_state``, ``first_order_id``, ``days_since_first_order``
        and ``multiple_order`` (1 if the customer has more than one order).
    """
    # CUSTOMER TABLE BASED FEATURE EXTRACTION - CUSTOMER FEATURES

    # Fetch datasets from Layer and convert them into a pandas dataframe
    orders_df = layer.get_dataset("orders_dataset").to_pandas()
    customers_df = layer.get_dataset("customers_dataset").to_pandas()

    # Merge the two dataframes
    orders_customers_merged = orders_df.merge(customers_df, on='customer_id', how='left')

    # Compute a new feature: multiple_order
    orders_customers_merged["total_orders"] = orders_customers_merged.groupby('customer_unique_id')['customer_id'].transform('count')
    orders_customers_merged['multiple_order'] = np.where(orders_customers_merged['total_orders'] > 1, 1, 0)

    # Compute a new feature: days_since_order (relative to the latest purchase in the data)
    dataset_max_date = orders_customers_merged.order_purchase_timestamp.max()
    orders_customers_merged['days_since_order'] = (dataset_max_date - orders_customers_merged.order_purchase_timestamp).dt.days

    # Keep only the first order of each customer
    orders_customers_merged["order_rank"] = orders_customers_merged.groupby('customer_unique_id')['order_purchase_timestamp'].rank(method='first')
    orders_customers_merged = orders_customers_merged[orders_customers_merged["order_rank"] == 1.0].drop(columns=['order_rank'])

    # Select columns to be returned and rename them accordingly
    orders_customers_merged = orders_customers_merged.rename(columns={"order_id": "first_order_id", "days_since_order": "days_since_first_order"})
    customer_features = orders_customers_merged[['customer_unique_id', 'customer_city', 'customer_state', 'first_order_id', 'days_since_first_order', 'multiple_order']]

    # Important Data Info
    # Count each class explicitly: indexing value_counts() by label raises
    # KeyError when one of the two classes is absent.
    multiple_order_customers = int((customer_features.multiple_order == 1).sum())
    single_order_customers = int((customer_features.multiple_order == 0).sum())
    if single_order_customers:
        multiple_to_single_ratio = round(multiple_order_customers / single_order_customers, 2)
    else:
        # No single-order customers: the ratio is undefined.
        multiple_to_single_ratio = float('nan')
    layer.log(f"Ratio of multiple-orders customers over single-order customers: {multiple_to_single_ratio}")

    return customer_features


In [30]:
@dataset('training_data')
def generate_training_data() -> Any:
    """Join customer and first-order features and attach the ``churned`` label.

    A customer with more than one order is labelled ``churned = 0``. A
    single-order customer is labelled ``churned = 1`` and is kept only if
    their first order is more than 365 days old. Rows with any NA value are
    dropped.

    Returns:
        The training DataFrame (features plus ``churned``; ``multiple_order``
        is removed).
    """
    # TRAINING DATA GENERATION

    # Fetch datasets from Layer and convert them into a pandas dataframe
    orders_general_features = layer.get_dataset("orders_general_features").to_pandas()
    orders_payments_features = layer.get_dataset("orders_payments_features").to_pandas()
    customer_features = layer.get_dataset("customers_features").to_pandas()

    # Merge dataframes and drop irrelevant columns
    order_features = orders_general_features.merge(orders_payments_features, on='order_id', how='left')
    training_data_raw = customer_features.merge(
        order_features, left_on='first_order_id', right_on='order_id', how='left'
    ).drop(columns=['order_id', 'order_status'])

    # Rename columns
    training_data_raw = training_data_raw.rename(columns={
        "total_waiting": "first_order_total_waiting",
        "days_between_estimate_actual_delivery": "first_order_days_between_estimate_actual_delivery",
        "use_voucher": "first_order_use_voucher",
        "total_payment": "first_order_total_payment",
        "payment_type": "first_order_payment_type",
    })

    # Reduce customer_city and customer_state to 6 categories each
    # (five listed values + 'other') before one-hot encoding.
    top5_cities = ["sao paulo", "rio de janeiro", "belo horizonte", "brasilia", "curitiba"]
    top5_states = ["SP", "RJ", "MG", "RS", "PR"]
    training_data_raw['customer_city'] = training_data_raw['customer_city'].where(
        training_data_raw['customer_city'].isin(top5_cities), 'other')
    training_data_raw['customer_state'] = training_data_raw['customer_state'].where(
        training_data_raw['customer_state'].isin(top5_states), 'other')

    # Label column 'churned': multiple_order == 1 -> 0, otherwise 1.
    training_data_raw['churned'] = np.where(training_data_raw.multiple_order == 1, 0, 1)
    # Churned customers are kept only if it has been more than 365 days since the first order.
    not_churned = training_data_raw.churned == 0
    churned_long_ago = (training_data_raw.churned == 1) & (training_data_raw.days_since_first_order > 365)
    training_data_raw = training_data_raw.loc[not_churned | churned_long_ago]

    # Select columns to be returned and drop NA rows
    training_data = training_data_raw.drop(columns=['multiple_order']).dropna()

    # Important Data Info
    record_count = len(training_data)
    # Count churned rows directly: indexing value_counts() by label raises
    # KeyError when no churned rows remain, and an empty frame cannot be divided.
    churned_count = int((training_data.churned == 1).sum())
    churn_ratio = round(churned_count / record_count, 2) if record_count else float('nan')
    layer.log(f"Number of training data records: {record_count}")
    layer.log(f"Churn user ratio after first order: {churn_ratio}")

    return training_data


In [31]:
@model('churn_model')
def train_churn_model() -> Any:
    """Train and evaluate a gradient-boosting churn classifier.

    The training data is split into train/test sets, categorical columns are
    one-hot encoded, and a ``GradientBoostingClassifier`` is fitted inside a
    scikit-learn ``Pipeline``. Hyperparameters and the test-set average
    precision and ROC AUC are logged through ``layer``.

    Returns:
        The fitted ``Pipeline`` (encoder + classifier).
    """
    # Fetch dataset from Layer and convert it into pandas dataframe
    training_data = layer.get_dataset("training_data").to_pandas()

    # Parameters for data split
    test_size_fraction = 0.33
    random_seed = 42

    # Data Split: identifiers and the label are excluded from the features
    features = training_data.drop(columns=['customer_unique_id', 'first_order_id', 'churned'])
    X_train, X_test, Y_train, Y_test = train_test_split(features,
                                                        training_data.churned,
                                                        test_size=test_size_fraction,
                                                        random_state=random_seed)

    # Define a One-Hot Encoder Transformer; other columns pass through unchanged
    categorical_cols = ['customer_city', 'customer_state', 'first_order_payment_type']
    transformer = ColumnTransformer(
        transformers=[('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)],
        remainder='passthrough')

    # Model: Gradient Boosting Classifier hyperparameters, defined once so the
    # logged values and the values passed to the estimator cannot drift apart.
    hyperparameters = {
        'learning_rate': 0.01,
        'max_depth': 6,
        'max_features': 'sqrt',
        'min_samples_leaf': 10,
        'n_estimators': 100,
        'subsample': 0.8,
        'random_state': 42,
    }

    # Labels carry no trailing ':' because ``log_parameter`` already inserts
    # one between name and value (the old labels rendered as "Name::value").
    layer.log_parameter("Test Size Fraction", test_size_fraction)
    layer.log_parameter("Random Seed", random_seed)
    parameter_labels = (
        ("Learning Rate", 'learning_rate'),
        ("Max Depth", 'max_depth'),
        ("Max Number of Candidate Features", 'max_features'),
        ("Min Number of Samples in a Leaf", 'min_samples_leaf'),
        ("Number of Estimators", 'n_estimators'),
        ("Subsample Ratio", 'subsample'),
        ("Model's Random State", 'random_state'),
    )
    for label, key in parameter_labels:
        layer.log_parameter(label, hyperparameters[key])

    # Named ``classifier`` so the local does not shadow the ``model`` decorator.
    classifier = GradientBoostingClassifier(**hyperparameters)

    # Pipeline Fit
    pipeline = Pipeline(steps=[('t', transformer), ('m', classifier)])
    pipeline.fit(X_train, Y_train)

    # Model Evaluation
    # 1. Predict probabilities of target 1:Churned
    probs = pipeline.predict_proba(X_test)[:, 1]
    # 2. Calculate average precision and area under the receiver operating characteristic curve (ROC AUC)
    avg_precision = average_precision_score(Y_test, probs, pos_label=1)
    auc = roc_auc_score(Y_test, probs)

    # Important Model Metrics (labels without trailing ':', see above)
    layer.log_metric("Average Precision Value", avg_precision)
    layer.log_metric("Area under ROC", auc)

    return pipeline

In [32]:
# ++ Create the Layer runner for this project; 'requirements.txt' is the
# ++ environment file whose entries are listed during setup.
layer = Layer(environment='requirements.txt', project_name="churn_project")

# ++ Run the whole project on Layer Infra. Entities execute in the order
# ++ listed, so each dataset appears after the datasets it reads.
layer.run(entities=[
    read_orders_table,
    read_payments_table,
    read_customers_table,
    extract_orders_general_features,
    extract_orders_payments_features,
    extract_customer_features,
    generate_training_data,
    train_churn_model,
])




--- Layer Infra: Running Project: churn_project ---
Layer Infra: Installing scikit-learn>=0.18...

Building orders_dataset...
	orders_dataset > All order_ids are unique: True
	orders_dataset > Max order purchase date: 2018-10-17 17:30:18
	orders_dataset > Min order purchase date: 2016-09-04 21:15:19
	orders_dataset > Different order statuses: ['delivered' 'invoiced' 'shipped' 'processing' 'unavailable' 'canceled'
 'created' 'approved']

Building payments_dataset...
	payments_dataset > Total number of rows: 103886
	payments_dataset > Total number of unique orders: 99440
	payments_dataset > Payment Types: ['credit_card' 'boleto' 'voucher' 'debit_card' 'not_defined']
	payments_dataset > Payment Value Mean: 154.10038041699553

Building customers_dataset...
	customers_dataset > Total number of customers: 96096
	customers_dataset > Total number of different cities: 4119
	customers_dataset > Total number of different states: 27

Building orders_general_features...
	orders_general_features > A