## Import reina and other necessary libraries. Initialize a spark session.

In [1]:
from reina.MetaLearners import SLearner
from reina.MetaLearners import TLearner
from reina.MetaLearners import XLearner
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql import SparkSession


# Initialize spark session
spark = SparkSession \
            .builder \
            .appName('Meta-Learner-Spark') \
            .getOrCreate()

## Read toy data. Replace .load() with the test_data.csv location -- this location could be a local one (no cluster) or it could be on a distributed storage system (e.g., HDFS)

*Note: Code below assumes data generated by our script (for specifics, please refer to our toy data generation in the README). You could also modify the code accordingly to use your own data.*

In [33]:
df = spark.read \
          .format("csv") \
          .option('header', 'true') \
          .option("inferSchema" , "true")\
          .load("../../telco-churn.csv")  # replace with the location of test_data.csv

# Drop columns not needed
df = df.drop("customerID")
df = df.drop("gender")
df = df.drop("partner")
df = df.drop("PaymentMethod")

# Print out dataframe schema
print(df.schema)
print(df.show(5))

StructType(List(StructField(SeniorCitizen,IntegerType,true),StructField(Dependents,StringType,true),StructField(tenure,IntegerType,true),StructField(PhoneService,StringType,true),StructField(MultipleLines,StringType,true),StructField(InternetService,StringType,true),StructField(OnlineSecurity,StringType,true),StructField(OnlineBackup,StringType,true),StructField(DeviceProtection,StringType,true),StructField(TechSupport,StringType,true),StructField(StreamingTV,StringType,true),StructField(StreamingMovies,StringType,true),StructField(Contract,StringType,true),StructField(PaperlessBilling,StringType,true),StructField(MonthlyCharges,DoubleType,true),StructField(TotalCharges,StringType,true),StructField(Churn,StringType,true)))
+-------------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------+------------+-----+
|SeniorCitizen|Dependents|tenure|Pho

In [34]:
# Fix some data types...
df = df.withColumn("TotalCharges", df.TotalCharges.cast("float"))

## Minimum pre-processing

Transform categorical data using string index and dummy variables

In [35]:
from pyspark.ml.feature import StringIndexer

categorical_feats = ["Dependents", "PhoneService", "MultipleLines", "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies", "Contract", "PaperlessBilling", "Churn"]

for feature in categorical_feats:
    indexer = StringIndexer(inputCol=feature, outputCol=feature+"_index")
    df = indexer.fit(df).transform(df)
    df = df.drop(feature)

In [36]:
from pyspark.ml.feature import OneHotEncoder

one_hot_features = ["MultipleLines_index", "InternetService_index", "Contract_index"]


for feature in one_hot_features:
    encoder = OneHotEncoder(inputCol=feature, outputCol=feature+"_ohe")
    df = encoder.fit(df).transform(df)
    df.drop(feature)

## S-leaner

In [47]:
print((df.count(), len(df.columns)))

(7043, 20)


In [49]:
from pyspark.sql.functions import col


df = df.dropna()
print((df.count(), len(df.columns)))

(7032, 20)


In [51]:
# Set up necessary parameters
treatments = ['PhoneService_index']
outcome = 'Churn_index'

# Arbitrary estimator. Can replace with other ML algo.
estimator = RandomForestRegressor(featuresCol="features", labelCol=outcome)

# Fit S-learner
spark_slearner = SLearner()
spark_slearner.fit(data=df, treatments=treatments, outcome=outcome, estimator=estimator)

# Get heterogeneous treatment effects (cate for individual samples and ate for averaged treatment effect)
cate, ate = spark_slearner.effects(X=df, treatment=treatments[0])
print(cate)
print(ate)

NameError: name 'df1' is not defined

## T-leaner

In [None]:
# Set up necessary parameters
treatments = ['treatment']
outcome = 'outcome'

# Arbitrary estimators. Can replace with other ML algo.
estimator_1 = RandomForestRegressor()
estimator_0 = RandomForestRegressor()

# Fit T-learner
spark_tlearner = TLearner()
spark_tlearner.fit(data=df, treatments=treatments, outcome=outcome,
                   estimator_0=estimator_0, estimator_1=estimator_1)

# Get heterogeneous treatment effects (cate for individual samples and ate for averaged treatment effect)
cate, ate = spark_tlearner.effects()
print(cate)
print(ate)

## X-leaner

In [None]:
# Set up necessary parameters
treatments = ['treatment']
outcome = 'outcome'

# Arbitrary estimators. Can replace with other ML algo.
estimator_11 = RandomForestRegressor()
estimator_10 = RandomForestRegressor()
estimator_21 = RandomForestRegressor()
estimator_20 = RandomForestRegressor()
propensity_estimator = RandomForestClassifier()

# Fit X-learner
spark_xlearner = XLearner()
spark_xlearner.fit(data=df, treatments=treatments, outcome=outcome, 
                       estimator_10=estimator_10, estimator_11=estimator_11, 
                       estimator_20=estimator_20, estimator_21=estimator_21,
                       propensity_estimator=propensity_estimator)

# Get heterogeneous treatment effects (cate for individual samples and ate for averaged treatment effect)
cate, ate = spark_xlearner.effects()
print(cate)
print(ate)