In [2]:
import sys
sys.path.append("..")

In [3]:
from rpy2.robjects.packages import importr, data
import rpy2.robjects as ro
import pandas as pd
import rpy2.robjects as ro
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
from rpy2.robjects.conversion import localconverter
from rpy2.rinterface_lib.embedded import RRuntimeError

In [4]:
geo_lift = importr('GeoLift')

In [6]:
geo_lift_pre_test_r_df = data(geo_lift).fetch('GeoLift_PreTest')["GeoLift_PreTest"]
with localconverter(ro.default_converter + pandas2ri.converter):
  geo_lift_pre_test = ro.conversion.rpy2py(geo_lift_pre_test_r_df)

In [9]:
import pandas as pd
from typing import List, Optional
from pandera import DataFrameSchema, Column
import pandera as pa
from pandera.dtypes import Timestamp
from pandera.typing import DataFrame, Series

class GeoLiftDataFrameSchema(pa.SchemaModel):
    """GeoLift data frame schema. 
    
    This is the schema of a data frame after processed by `geo_data_read`.
    """
    
    location: Series[str] = pa.Field()
    time: Series[int] = pa.Field()
    Y: Series[float] = pa.Field()
    date_unix: Optional[Series[float]] = pa.Field()
    
    class Config:
        strict = False

def geo_data_read(data: pd.DataFrame,
                  date_id: str = "date",
                  location_id: str = "location",
                  Y_id: str = "units",
                  format: str = "mm/dd/yyy",
                  X: List[str] = [],
                  summary: bool = False,
                  keep_unix_time: bool = False) -> DataFrame[GeoLiftDataFrameSchema]:
    with localconverter(ro.default_converter + pandas2ri.converter):
        df = geo_lift.GeoDataRead(data, 
                                date_id = date_id,
                                location_id = location_id,
                                Y_id = Y_id,
                                format = format,
                                X = X,
                                summary = summary,
                                keep_unix_time = keep_unix_time)
    return df

geo_test_data_pre_test =  geo_data_read(geo_lift_pre_test,
                                        date_id = "date",
                                        location_id = "location",
                                        Y_id = "Y",
                                        X = [], #empty list as we have no covariates
                                        format = "yyyy-mm-dd",
                                        summary = True)
                                


R[write to console]: ##################################
#####       Summary       #####
##################################

* Raw Number of Locations: 40
* Time Periods: 90
* Final Number of Locations (Complete): 40



In [10]:
geo_test_data_pre_test.head()

Unnamed: 0,location,time,Y
1,atlanta,1,3384
2,atlanta,2,3904
3,atlanta,3,5734
4,atlanta,4,4311
5,atlanta,5,3686


**TODO:** Cannot figure out how to get this ggplot object to work in python.

In [169]:
import math, datetime
import rpy2.robjects.lib.ggplot2 as ggplot2
from rpy2.robjects.packages import importr
with localconverter(ro.default_converter + pandas2ri.converter):
    r_from_pd_df = ro.conversion.py2rpy(geo_test_data_pre_test)

from rpy2.robjects.conversion import Converter
import rpy2.rinterface as ri
from rpy2 import robjects

# with localconverter(ro.default_converter + pandas2ri.converter):
def r_df_to_pandas(df):
    with localconverter(ro.default_converter + pandas2ri.converter):
        return ro.conversion.rpy2py(df)

from typing import Union
from rpy2.robjects.vectors import BoolVector, IntVector, FloatVector, StrVector



utils = importr('utils')
base = importr('base')

def vector_to_scalar(x: Union[BoolVector, IntVector, FloatVector, StrVector]) -> Optional[Union[bool, int, float, str]]:
    if not isinstance(x, (BoolVector, IntVector, FloatVector, StrVector)):
        raise ValueError("Must be an R object of classes: BoolVector, IntVector, FloatVector, StrVector")
    if len(x) == 0:
        return None
    if len(x) != 1:
        raise ValueError
    if isinstance(x, BoolVector):
        return bool(x[0])
    if isinstance(x, IntVector):
        return int(x[0])
    if isinstance(x, FloatVector):
        return float(x[0])
    if isinstance(x, StrVector):
        return str(x[0])
    

from rpy2.robjects import ListVector
import pandas as pd
import numpy as np
from typing import Dict, Any, Union

from rpy2.robjects.conversion import NameClassMap

from functools import cache


class AugSynth(ListVector):
    
    @property
    def data(self) -> Dict[str, Union[np.array, Dict[str, np.array]]]:
        res = {}
        for k in ("X", "trt", "y", "time"):
            res[k] = np.asarray(self.rx2("data").rx2(k))
        res["synth_data"] = {}
        for k in ("Z0", "Z1", "Y0plot", "Y1plot", "X0", "X1"):
            res["synth_data"][k] = np.asarray(self.rx2("data").rx2("synth_data").rx2(k))
        return res

    @property
    @cache
    def weights(self) -> np.array:
        return np.asarray(self.rx2("weights"))

    @property
    @cache
    def l2_imbalance(self) -> float:
        return vector_to_scalar(self.rx2("l2_imbalance"))

    @property
    @cache
    def scaled_l2_imbalance(self) -> float:
        return vector_to_scalar(self.rx2("scaled_l2_imbalance"))

    @property
    @cache
    def mhat(self) -> np.array:
        return np.asarray(self.rx2("mhat"))

    @property
    @cache
    def lambda_(self):
        return self.rx2("lambda")
    
    @property
    @cache
    def ridge_mhat(self) -> np.array:
        return np.asarray(self.rx2("ridge_mhat"))
    
    @property
    @cache
    def synw(self) -> np.array:
        return np.asarray(self.rx2("synw"))  

    @property
    @cache
    def lambdas(self):
        return self.rx2("lambdas")

    @property
    @cache
    def lambdas_errors(self):
        return self.rx2("lambdas_errors")
    
    @property
    @cache
    def lambdas_errors_se(self):
        return self.rx2("lambdas_errors_se")
    
    @property
    @cache
    def progfunc(self) -> str:
        return str(vector_to_scalar(self.rx2("progfunc")))
    
    @property
    @cache
    def scm(self) -> bool:
        return bool(vector_to_scalar(self.rx2("scm")))
    
    @property
    @cache
    def fixedeff(self) -> bool:
        return bool(vector_to_scalar(self.rx2("fixedeff")))
    
    @property
    @cache
    def extra_args(self):
        return self.rx2("extra_args")
    
    # @property
    # def call(self) -> str:
    #     # don't know how to easily convert this to string
    #     return self.rx2("call"))
    
    @property
    @cache
    def t_int(self) -> int:
        return int(vector_to_scalar(self.rx2("t_int")))



class AugSynthSummary(ListVector):
    
    @property
    @cache
    def att(self) -> float:
        return r_df_to_pandas(self.rx2("att"))
    
    @property
    @cache
    def average_att(self) -> float:
        return r_df_to_pandas(self.rx2("average_att"))
    
    @property
    @cache
    def alpha(self) -> float:
        return float(vector_to_scalar(self.rx2("alpha")))
    
    @property
    @cache
    def t_int(self) -> int:
        return int(vector_to_scalar(self.rx2("t_int")))
    
    # @property 
    # def call(self):
    #     return self.rx2("call")
    
    @property 
    @cache
    def l2_imbalance(self) -> float:
        return float(vector_to_scalar(self.rx2("l2_imbalance")))
    
    @property 
    @cache
    def bias_est(self) -> bool:
        return bool(vector_to_scalar(self.rx2("bias_est")))
    
    @property
    @cache
    def inf_type(self) -> str:
        return str(vector_to_scalar(self.rx2("inf_type")))
 
augsynth_converter = Converter("augsynth conversions")
augsynth_converter.rpy2py_nc_map[ri.ListSexpVector] = NameClassMap(
    ri.ListSexpVector, {'augsynth': AugSynth, 'summary.augsynth': AugSynthSummary})

        
class GeoLiftMarketSelection(robjects.ListVector):
    
    _constructor = ["GeoLiftMarketSelection"]
    _rprint = ["print.GeoLiftMarketSelection"]
    
    @property
    @cache
    def BestMarkets(self) -> pd.DataFrame:
        with localconverter(ro.default_converter + pandas2ri.converter):
            return ro.conversion.py2rpy(self.rx2("BestMarkets"))

    @property
    @cache
    def PowerCurve(self) -> pd.DataFrame:
        with localconverter(ro.default_converter + pandas2ri.converter):
            return ro.conversion.py2rpy(self.rx2("PowerCurve"))
        
    @property
    @cache
    def parameters(self) -> pd.DataFrame:
        converters = {
            'data': r_df_to_pandas,
            'model': vector_to_scalar,
            'cpic': vector_to_scalar,
            'side_of_test': vector_to_scalar,
            'fixed_effects': vector_to_scalar
        }
        params = market_selection.rx2('parameters')
        new_params = {}
        for name, func in converters.items():
            val = params.rx2(name)
            new_params[name] = func(val)
        return new_params


class GeoLift(robjects.ListVector):

    _constructor = ["GeoLift"]
    _rprint = ["print.GeoLift"]
    
    @property
    @cache
    def results(self) -> AugSynth:
        with localconverter(ro.default_converter + augsynth_converter):
            return ro.conversion.rpy2py(self.rx2("results"))

    @property
    @cache
    def inference(self) -> pd.DataFrame:
        return r_df_to_pandas(self.rx2("inference"))
    
    @property
    @cache
    def inference(self) -> pd.DataFrame:
        return r_df_to_pandas(self.rx2("data"))
    
    @property
    @cache
    def y_obs(self) -> np.array:
        return np.asarray(self.rx2("y_obs"))
    
    @property
    @cache
    def y_hat(self) -> np.array:
        return np.asarray(self.rx2("y_hat"))

    @property
    @cache
    def ATT(self) -> np.array:
        return np.asarray(self.rx2("ATT"))

    @property
    @cache
    def ATT_se(self):
        return self.rx2("ATT_se")

    @property
    @cache
    def TreatmentStart(self) -> int:
        return vector_to_scalar(self.rx2("TreatmentStart"))

    @property
    @cache
    def TreatmentEnd(self) -> int:
        return vector_to_scalar(self.rx2("TreatmentEnd"))
    
    @property
    @cache
    def test_id(self):
        return r_df_to_pandas(self.rx2("test_id"))

    @property
    @cache
    def incremental(self) -> float:
        return vector_to_scalar(self.rx2("incremental"))
    
    @property
    @cache
    def Y_id(self) -> str:
        return vector_to_scalar(self.rx2("Y_id"))

    @property
    @cache
    def summary(self) -> AugSynthSummary:
        with localconverter(ro.default_converter + augsynth_converter):
            return ro.conversion.rpy2py(self.rx2("summary"))
            
    @property
    @cache
    def ConfidenceIntervals(self) -> bool:
        return bool(vector_to_scalar(self.rx2("ConfidenceInterval")))
    
    @property
    @cache
    def lower_bound(self) -> float:
        return float(vector_to_scalar(self.rx2("lower_bound")))
    
    @property
    @cache
    def upper_bound(self) -> float:
        return float(vector_to_scalar(self.rx2("lower_bound")))
    
    @property
    @cache
    def df_weights(self) -> pd.DataFrame:
        return r_df_to_pandas(self.rx2("df_weights"))

    @property
    @cache
    def stat_test(self) -> str:
        return str(vector_to_scalar(self.rx2("lower_bound")))


geo_lift_converter = Converter('GeoLiftConverter')
geo_lift_converter.rpy2py_nc_map[ri.ListSexpVector] = NameClassMap(
    ri.ListSexpVector, {'GeoLiftMarketSelection': GeoLiftMarketSelection, 'GeoLift': GeoLift})
geo_lift_converter += augsynth_converter

In [121]:
market_selection = geo_lift.GeoLiftMarketSelection(data = r_from_pd_df,
                                          treatment_periods = base.c(10,15),
                                          N = base.c(2,3,4,5),
                                          Y_id = "Y",
                                          location_id = "location",
                                          time_id = "time",
                                          effect_size = base.seq(-0.25, 0.25, 0.05),
                                          lookback_window = 1, 
                                          include_markets = base.c("chicago"),
                                          exclude_markets = base.c("honolulu"),
                                          cpic = 7.50,
                                          budget = 100000,
                                          alpha = 0.1,
                                          Correlations = True,
                                          fixed_effects = True,
                                          side_of_test = "one_sided")
with localconverter(geo_lift_converter):
    market_selection_2 = ro.conversion.rpy2py(market_selection)

R[write to console]: Setting up cluster.

R[write to console]: Importing functions into cluster.

R[write to console]: 
Deterministic setup with 2 locations in treatment.

R[write to console]: 
Deterministic setup with 3 locations in treatment.

R[write to console]: 
Deterministic setup with 4 locations in treatment.

R[write to console]: 
Deterministic setup with 5 locations in treatment.



  ID                                     location duration EffectSize Power
1  1      atlanta, chicago, las vegas, saint paul       15      -0.05     1
2  2                            chicago, portland       15      -0.05     1
3  3      atlanta, chicago, las vegas, saint paul       10      -0.05     1
4  4       chicago, cincinnati, houston, portland       15      -0.05     1
5  5            chicago, houston, nashville, reno       15      -0.05     1
6  6 atlanta, chicago, cleveland, las vegas, reno       10      -0.05     1
  AvgScaledL2Imbalance Investment    AvgATT Average_MDE ProportionTotal_Y
1            0.5558341   85305.00 -189.8766 -0.05007765        0.08767192
2            0.1738778   32281.87 -140.4179 -0.04898682        0.03306537
3            0.5978398   57760.87 -182.5825 -0.04753797        0.08767192
4            0.1971864   74118.37 -170.0523 -0.05153888        0.07576405
5            0.3321341   75556.12 -161.7403 -0.04825348        0.07816073
6            0.4536741  

In [152]:
import rpy2.rinterface_lib.callbacks
from typing import Callable, Optional, Tuple

class RCallbackContext:
    """Context manager for setting R callbacks.
    
    See https://rpy2.github.io/doc/v3.5.x/html/callbacks.html#console-i-o
    
    Attributes:
        consoleread: Function to use for :py:func:`rpy2.rinterface_lib.callbacks.consoleread`
        consolewrite_print:
        consolewrite_warnerror:
        showmessage: 
        consoleflush:
        yesnocancel:
        showfiles:
        processevents:
        busy: 
        cleanup:
    """
    
    _callbacks = [
        'consoleread',
        'consolewrite_print',
        'consolewrite_warnerror',
        'showmessage',
        'consoleflush',
        'yesnocancel',
        'showfiles',
        'processevents',
        'busy',
        'cleanup'
    ]
    
    def __init__(self, 
                 consoleread: Optional[Callable[[str], str]] = None,
                 consolewrite_print: Optional[Callable[[str], None]] = None,
                 consolewrite_warnerror: Optional[Callable[[str], None]] = None,
                 showmessage: Optional[Callable[[str], None]] = None,
                 consoleflush: Optional[Callable[[], None]] = None,
                 yesnocancel: Optional[Callable[[str], None]] = None,
                 showfiles: Optional[Callable[[Tuple[str, ...], Tuple[str, ...], Optional[str], Optional[str]], None]] = None,
                 processevents: Optional[Callable[[], None]] = None,
                 busy: Optional[Callable[[int], None]] = None,
                 cleanup: Optional[Callable[..., Optional[int]]] = None
                 ):
        self._callback_cache = {}
        # TODO: check validity of these
        self.consoleread = consoleread
        self.consolewrite_print = consolewrite_print
        self.consolewrite_warnerror = consolewrite_warnerror
        self.showmessage = showmessage
        self.consoleflush = consoleflush
        self.yesnocancel = yesnocancel
        self.showfiles = showfiles
        self.processevents = processevents
        self.busy = busy
        self.cleanup = cleanup
        
    def __enter__(self):
        # Cache current values of callbacks
        for cb in self._callbacks:
            self._callback_cache[cb] = getattr(rpy2.rinterface_lib.callbacks, cb)
        # Set any new callbacks
        for cb in self._callbacks:
            setattr(rpy2.rinterface_lib.callbacks, cb, getattr(self, cb))
        return self
    
    def __exit__(self, *excdetails) -> None:
        # Cache current values of callbacks
        for cb in self._callbacks:
            setattr(rpy2.rinterface_lib.callbacks, cb, self._callback_cache[cb])
    
def _console_donothing(s: str) -> None:
    pass

def suppress_output(print = True, warnerror = False) -> RCallbackContext:
    """Create a context to suppress R output.
    
    Args:
        print: Suppress standard output
        warnerror: Suppress warnings and errors
    Returns:    
        An object of `RCallbackContext` to use with a ``with`` statement to 
        suppress R output.
    
    """
    callbacks = {}
    if print:
        callbacks['consolewrite_print'] = _console_donothing
    if warnerror:
        callbacks['consolewrite_warnerror'] = _console_donothing
    return RCallbackContext(**callbacks)

In [122]:
from rpy2.robjects.packages import data as r_package_data
from rpy2.robjects import StrVector
geo_lift = importr("GeoLift")
augsynth = importr("augsynth")
methods = importr("methods")
GeoLift_Test = r_package_data(geo_lift).fetch('GeoLift_Test')["GeoLift_Test"]

geo_test_data_test = geo_lift.GeoDataRead(data = GeoLift_Test,
                                    date_id = "date",
                                    location_id = "location",
                                    Y_id = "Y",
                                    format = "yyyy-mm-dd",
                                    summary = True)

geo_test = geo_lift.GeoLift(Y_id = "Y",
                   data = geo_test_data_test,
                   locations = StrVector(["chicago", "portland"]),
                   treatment_start_time = 91,
                   treatment_end_time = 105)

R[write to console]: ##################################
#####       Summary       #####
##################################

* Raw Number of Locations: 40
* Time Periods: 105
* Final Number of Locations (Complete): 40



In [170]:
with localconverter(ro.default_converter + geo_lift_converter):
    geo_test_2 = ro.conversion.rpy2py(geo_test)

AttributeError: can't set attribute 'results'