## Mock script for when lsst.dirac is down

In [1]:
import numpy as np

In [3]:
# Mock Dust_values class
class Dust_values:
    def __init__(self):
        self.Ax1 = {'u': 0.1, 'g': 0.08, 'r': 0.06, 'i': 0.05, 'z': 0.03, 'y': 0.02}

# Mock light curve interpolation class
class MockLightCurves:
    def __init__(self):
        pass
    
    def interp(self, t, filtername, lc_indx=0):
        """Return a mock light curve value."""
        return -15.0 + 0.1 * t  # Simple linear interpolation for testing.

# Mock UserPointsSlicer class
class UserPointsSlicer:
    def __init__(self, ra, dec, latLonDeg=True, badval=0):
        self.slicePoints = {}

# Mock uniformSphere function
def uniformSphere(n, seed=42):
    """Generate mock RA/Dec points uniformly distributed."""
    np.random.seed(seed)
    ra = np.random.uniform(0, 360, n)
    dec = np.random.uniform(-90, 90, n)
    return ra, dec

# Mock signal-to-noise ratio function
def m52snr(mags, m5):
    """Mock signal-to-noise ratio."""
    return 10 ** ((m5 - mags) / 2.5)

# Mock data slice
def create_mock_data_slice(n_points=100):
    """Generate a mock data slice for testing."""
    return {
        'observationStartMJD': np.linspace(0, 100, n_points),
        'fiveSigmaDepth': np.random.uniform(20, 24, n_points),
        'filter': np.random.choice(['u', 'g', 'r', 'i', 'z', 'y'], n_points),
        'night': np.random.randint(1, 10, n_points)
    }

# Define FastTransientMetric class
class FastTransientMetric:
    def __init__(self, metricName='FastTransientMetric', mjdCol='observationStartMJD',
                 m5Col='fiveSigmaDepth', filterCol='filter', nightCol='night',
                 ptsNeeded=2, riseRates=None, fadeRates=None, peakMagRange=None,
                 durationAtPeak=None, mjd0=59853.5, outputLc=False):
        """
        Metric to test the detection of fast-evolving transients.
        """
        self.mjdCol = mjdCol
        self.m5Col = m5Col
        self.filterCol = filterCol
        self.nightCol = nightCol
        self.ptsNeeded = ptsNeeded
        self.outputLc = outputLc

        self.riseRates = riseRates
        self.fadeRates = fadeRates
        self.peakMagRange = peakMagRange
        self.durationAtPeak = durationAtPeak

        self.lightcurves = MockLightCurves()
        self.mjd0 = mjd0

        dust_properties = Dust_values()
        self.Ax1 = dust_properties.Ax1

    def _multi_detect(self, around_peak):
        """Simple detection criteria: detect at least a certain number of times."""
        if np.size(around_peak) < self.ptsNeeded:
            return 0
        return 1

    def _check_rise_fade_rate(self, around_peak, mags, t, filters):
        """
        Check if the rise and fade rates match the expected thresholds for the transient.
        """
        for f in set(filters):
            times_f = t[around_peak][np.where(filters == f)[0]]
            mags_f = mags[around_peak][np.where(filters == f)[0]]

            if len(mags_f) < 2:
                continue

            dt = np.max(times_f) - np.min(times_f)
            dm = np.max(mags_f) - np.min(mags_f)
            rate = dm / dt

            if (self.riseRates[f][0] <= rate <= self.riseRates[f][1]) or (self.fadeRates[f][0] <= rate <= self.fadeRates[f][1]):
                return 1
        return 0

    def run(self, dataSlice, slicePoint=None):
        result = {}
        t = dataSlice[self.mjdCol] - self.mjd0 - slicePoint['peak_time']
        mags = np.zeros(t.size, dtype=float)

        for filtername in np.unique(dataSlice[self.filterCol]):
            infilt = np.where(dataSlice[self.filterCol] == filtername)
            mags[infilt] = self.lightcurves.interp(t[infilt], filtername)
            A_x = self.Ax1[filtername] * slicePoint['ebv']
            mags[infilt] += A_x

            distmod = 5 * np.log10(slicePoint['distance'] * 1e6) - 5.0
            mags[infilt] += distmod

        around_peak = np.where((t > 0) & (t < 30) & (mags < dataSlice[self.m5Col]))[0]
        filters = dataSlice[self.filterCol][around_peak]

        result['multi_detect'] = self._multi_detect(around_peak)
        result['rate_check'] = self._check_rise_fade_rate(around_peak, mags, t, filters)

        return result

In [5]:
# Test the Metric
if __name__ == "__main__":
    # Define parameter ranges
    riseRates = {'u': (0.25, 4.75), 'g': (0.25, 4.75), 'r': (0.5, 4.75), 
                 'i': (0.5, 4.75), 'z': (0.5, 4.5), 'y': (0.25, 4.75)}

    fadeRates = {'u': (0.5, 1.8), 'g': (0.5, 1.6), 'r': (0.4, 1.2), 
                 'i': (0.3, 0.9), 'z': (0.2, 0.8), 'y': (0.2, 0.7)}

    peakMagRange = {'u': (-15.5, -13.5), 'g': (-15.5, -14), 'r': (-15.5, -13), 
                    'i': (-16, -13), 'z': (-16, -13.5), 'y': (-16.5, -13.5)}

    durationAtPeak = {'u': 0.1, 'g': 0.3, 'r': 0.7, 'i': 1.0, 'z': 1.5, 'y': 2.0}

    # Generate mock data
    mock_data_slice = create_mock_data_slice()
    slicer = UserPointsSlicer([0], [0])
    slicer.slicePoints['peak_time'] = [50]
    slicer.slicePoints['distance'] = [200]
    slicer.slicePoints['ebv'] = [0.1]

    # Instantiate and run the metric
    metric = FastTransientMetric(riseRates=riseRates, fadeRates=fadeRates, 
                                  peakMagRange=peakMagRange, durationAtPeak=durationAtPeak)
    results = metric.run(mock_data_slice, slicePoint=slicer.slicePoints)
    print("Results:", results)

TypeError: can't multiply sequence by non-int of type 'float'