In [None]:

data_message = self.carbon_calc_request_pb_proxy.CarbonCalcRequest()

data_message.ParseFromString(msg.value())
if is_invalid_message(data_message):
    self.logger.error("Invalid message received")
    return

start_time = datetime.datetime.fromtimestamp(data_message.start_time.seconds)
end_time = datetime.datetime.fromtimestamp(data_message.end_time.seconds)
with Span(self.logger):
    self.logger.info("Start process carbon calculation for %s between %s and %s", data_message.state,
                     start_time, end_time)
    country = db_client.get_country(data_message.country)
    state = db_client.get_state(data_message.state, country)
    fuel_mix_data = db_client.get_fuel_mix_data(state, start_time, end_time)
    emission_factors = db_client.get_emission_factors(state)
    if not emission_factors:
        self.logger.info("No emission factors found for %s between %s and %s, quit calculation",
                         data_message.state, start_time, end_time)
        return
    fuel_mix_df = pd.DataFrame([(f.timestamp, f.fuel_type_uid, f.value) for f in fuel_mix_data])
    emission_factors_df = pd.DataFrame([(e.fuel_type_uid, e.emission_factor) for e in emission_factors])
    emission_factors_df = emission_factors_df.rename(columns={0: "type", 1: "value"})
    fuel_mix_df = fuel_mix_df.rename(columns={0: "time", 1: "type", 2: "value"})
    fuel_mix_df = pd.pivot_table(fuel_mix_df, values='value', index=['time'], columns=['type'])
    emission_factors_df = pd.pivot_table(emission_factors_df, values='value', columns=['type'])
    fuel_mix_df["carbon_emission"] = 0
    fuel_mix_df["total_production"] = 0
    for column in emission_factors_df.columns:
        if column in fuel_mix_df.columns:
            fuel_mix_df["carbon_emission"] += fuel_mix_df[column].values * emission_factors_df[column].values
            fuel_mix_df["total_production"] += fuel_mix_df[column].values
    fuel_mix_df["carbon_intensity_without_import"] = fuel_mix_df["carbon_emission"] / fuel_mix_df["total_production"]
    db_client.save_carbon_result(state, fuel_mix_df)
    self.logger.info(
        "Saved carbon intensity result for %s between %s and %s", data_message.state,
        start_time, end_time
    )
    self.send_message(data_message.country, data_message.state, start_time, end_time, fuel_mix_df)
    self.producer.flush()

