<a href="https://colab.research.google.com/github/Joewahome/mypackage/blob/master/Spresso_Automation_Script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from constants import (
    post_to_slack,
    file_to_slack,
    dev_redshift,
    wallet_redshift,
    redshift,
    insert_to_postgres,
    bulk_insert,
    delete_to_postgres,
    get_all_files,
    slack_texttable,
)
from common.aws.adapters import SecretsManagerAdapter
from datetime import datetime, timedelta, date
import pandas as pd
from googleapiclient.discovery import build
from google.oauth2.service_account import Credentials
from google.auth.transport.requests import Request
import gspread
import numpy as np
import logging
import io
import pytz

# NOTE(review): despite its name this is the Asia/Dubai zone, not an "Eastern"
# one. In the visible code it is only used to timestamp the Slack start message.
eastern = pytz.timezone("Asia/Dubai")


def get_location(file_name):
    """Return the first known two-digit code contained in *file_name*.

    The candidate codes are 66-70 followed by 80-94, tried in that order with
    a plain substring test, so the earliest code in that sequence wins when
    several of them occur in the name. ``"other"`` is returned when none of
    them is present.
    """
    # Order matters: it mirrors the precedence callers already rely on.
    ordered_codes = [str(number) for number in (*range(66, 71), *range(80, 95))]
    return next((code for code in ordered_codes if code in file_name), "other")


# Import-time setup: read the Google service-account info through
# SecretsManagerAdapter and build the Drive and Sheets clients that the
# handler below reuses.
json_creds = SecretsManagerAdapter("da/recon/google/service-account-creds").value
creds = Credentials.from_service_account_info(
    json_creds,
    scopes=[
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/drive",
        "https://www.googleapis.com/auth/spreadsheets",
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/drive.file",
    ],
)
drive_service = build("drive", "v3", credentials=creds)
gsheet_service = gspread.authorize(creds)
# Refresh the credentials up front when they are not yet valid.
if not creds.valid:
    creds.refresh(Request())

# Root logger, set to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Channel passed to the post_to_slack calls in this file.
slack_channel = "nigeria_reports"
# NOTE(review): na, tolerance and neg_tolerance are not referenced in the
# visible part of the file — confirm they are still used further down.
na = ""
tolerance = 1
neg_tolerance = -1


def generate_reconciliation_handler(event, context):
    date_str = event.get("date", None)
    seven_days_ago = date.today() - timedelta(days=7)
    if date_str:
        try:
            yesterday = pd.to_datetime(date_str, format="%Y-%m-%d").date()
            if yesterday < seven_days_ago:
                return {
                    "statusCode": 400,
                    "body": "Invalid date input. Date should be within the last 7 days.",
                }
        except ValueError:
            return {
                "statusCode": 400,
                "body": f"Invalid date format: {date_str}. Expected format is yyyy-mm-dd.",
            }
    else:
        yesterday = date.today() - timedelta(days=1)
    yesterday_date = pd.to_datetime(yesterday, format="%Y-%m-%d")
    smart_assignment = yesterday_date + pd.Timedelta(days=1)
    yesterday_date = yesterday_date.strftime("%Y-%m-%d")
    smart_assignment_date = smart_assignment.strftime("%Y-%m-%d")

    logger.info("Generating new spresso recon")
    logger.info("dropping recon records")
    delete_to_postgres("nigeria.reconciliation_spresso", {"day": yesterday_date})
    logger.info("dropping payout records")
    delete_to_postgres("nigeria.payout", {"reconciliation_date": yesterday_date})

    details = f"S-Presso Daily recon {yesterday} script starts: {datetime.now(eastern).strftime('%Y-%m-%d %H:%M:%S')}"
    post_to_slack(details=details, channel=slack_channel)

    logger.info(f"Fetching active drivers (smart assignment)")
    active_drivers = dev_redshift(
        f"""select drn,pick_drop_date
    from (select
        row_number() over (partition by drn
    order by
        effective_end_date desc) as rnk,
        case when cast((effective_start_date AT TIME ZONE 'UTC' AT TIME ZONE 'Africa/Lagos') as date) = cast('{yesterday_date}' as date) or
        cast((effective_end_date AT TIME ZONE 'UTC' AT TIME ZONE 'Africa/Lagos') as DATE) = cast('{yesterday_date}' as date) then 1 else 0 end as pick_drop_date,
        *
    from
        (
        select
            c.registration_no as plate_number,
            b.drn,
            b.name,
            cast((effective_start_date AT TIME ZONE 'UTC' AT TIME ZONE 'Africa/Lagos') as date) as effective_start_date,
            coalesce(cast((effective_end_date AT TIME ZONE 'UTC' AT TIME ZONE 'Africa/Lagos') as DATE),date'2050-01-01') as effective_end_date,
            c.make,
            c.model,
            c.year,
            c.color,
            mc.name as city,
            c.vin,
            a.description
        from
            moovebackend.driver_drivervehicleassignment a,
            moovebackend.driver_driver b,
            moovebackend.driver_vehicle c,
            moovebackend.markets_city mc
        where
            a.driver_id = b.id
            and a.vehicle_id = c.id
            and c.city_id = mc.id
        order by
            mc.name,
            effective_end_date desc,
            effective_start_date desc) vehicleAssign
    where
        city = 'Lagos'
        and cast((effective_start_date AT TIME ZONE 'UTC' AT TIME ZONE 'Africa/Lagos') as date) <= '{yesterday_date}'
        and cast((effective_end_date AT TIME ZONE 'UTC' AT TIME ZONE 'Africa/Lagos') as DATE) >= '{yesterday_date}') as a
        where rnk = 1
    """
    )

    dd = redshift(
        f"""select drn,driver_name as uber_name,driver_uuid as uber_id,product,vehicle_type from (
    select *,row_number() over (partition by drn order by date asc,product_start_date desc ) as rnk
    from vehicle.data_dump where date >= '{smart_assignment_date}' and city = 'Lagos' and driver_uuid is not null and driver_uuid != '' and LENGTH(driver_uuid) = 36) as a
    where rnk = 1"""
    )

    spresso_recon = pd.merge(active_drivers, dd, how="left", on="drn")
    spresso_recon["day"] = pd.to_datetime(yesterday)
    spresso_recon["week"] = spresso_recon["day"] - pd.to_timedelta(
        spresso_recon["day"].dt.dayofweek, unit="d"
    )
    spresso_recon = spresso_recon[
        spresso_recon["product"] == "UberGo DTO 48 [S-Presso]"
    ]

    try:
        yesterday_uber = datetime.strptime(yesterday, "%Y-%m-%d")
    except Exception as e:
        yesterday_uber = yesterday
    yesterday_string = yesterday_uber.strftime("%b %d")
    product = f"(name contains '{yesterday_string}(66)' or name contains '{yesterday_string}(67)' or name contains '{yesterday_string}(68)' or name contains '{yesterday_string}(69)' or name contains '{yesterday_string}(70)' or name contains '{yesterday_string}(80)'or name contains '{yesterday_string}(81)'or name contains '{yesterday_string}(82)'or name contains '{yesterday_string}(83)'or name contains '{yesterday_string}(84)'or name contains '{yesterday_string}(85)' or name contains '{yesterday_string}(86)'or name contains '{yesterday_string}(87)'or name contains '{yesterday_string}(88)'or name contains '{yesterday_string}(89)'or name contains '{yesterday_string}(90)'or name contains '{yesterday_string}(91)'or name contains '{yesterday_string}(92)'or name contains '{yesterday_string}(93)'or name contains '{yesterday_string}(94)')"
    product_list = [
        66,
        67,
        68,
        69,
        70,
        80,
        81,
        82,
        83,
        84,
        85,
        86,
        87,
        88,
        89,
        90,
        91,
        92,
        93,
        94,
    ]
    weekFolderID = "1BcYdBsK5UQYD6YUD5eOo5-8w_LP3D86l"
    uberGo48SP = "(title contains '(66)' or title contains '(67)' or title contains '(68)' or title contains '(69)' or title contains '(70)' or title contains '(80)'or title contains '(81)'or title contains '(82)'or title contains '(83)'or title contains '(84)'or title contains '(85)' or title contains '(86)'or title contains '(87)'or title contains '(88)'or title contains '(89)'or title contains '(90)'or title contains '(91)'or title contains '(92)'or title contains '(93)'or title contains '(94)')"
    query = (
        "'"
        + weekFolderID
        + "' in parents and  mimeType = 'application/vnd.google-apps.folder'"
    )
    weekFolder = (
        drive_service.files()
        .list(q=query, fields="nextPageToken, files(id, name)")
        .execute()
    )
    weekFolder = weekFolder.get("files", [])

    logger.info(f"Fetching payments uber files")
    file_list = []

    def combine_payments_files(country):
        """Download the day's "Payments" CSVs and total them per driver and file.

        Every folder in ``weekFolder`` named "Payments" is searched with the
        enclosing ``product`` filter. A file is kept when the integer inside
        the first "(...)" of its name is in ``product_list``; its name is
        appended to the enclosing ``file_list`` and a Slack alert is posted
        when it has fewer than three rows.

        Args:
            country: Not used by this function.

        Returns:
            DataFrame with one row per ("Driver UUID", "name") and the columns
            "Net Earnings", "Cash Collected", "CashTrips", "Uber Balance",
            "Uber Fee" and "Uber Incentives".

        Raises:
            ValueError: If a file name has no integer between its first
                parentheses, or if no file was collected (``pd.concat`` of an
                empty list).
        """
        # Header variants found in the exports, mapped to the names used below.
        column_aliases = {
            "Paid to you : Your earnings": "Net Earnings",
            "Paid to you : Trip balance : Payouts : Cash collected": "Cash Collected",
            "Paid to you": "Uber Balance",
            "Paid to you : Trip balance : Payouts : Cash Collected": "Cash Collected",
            "Paid to you : Your earnings : Fare": "Net Fares",
            "Paid to you:Your earnings:Service fee": "Uber Fee",
            "Paid to you:Your earnings:Service Fee": "Uber Fee",
        }

        frames = []
        for folder in weekFolder:
            if folder["name"] != "Payments":
                continue
            drive_query = f"mimeType='text/csv' and trashed=false and {product} and parents in '{folder['id']}' "
            # NOTE: nextPageToken is requested but never followed, so only the
            # first page of results is read.
            listing = (
                drive_service.files()
                .list(q=drive_query, fields="nextPageToken, files(id, name)")
                .execute()
            )
            for entry in listing.get("files", []):
                file_name = entry["name"]
                # The product code is whatever sits inside the first "(...)".
                code_start = file_name.find("(") + 1
                code_end = file_name.find(")", code_start)
                if int(file_name[code_start:code_end]) not in product_list:
                    continue

                file_list.append(file_name)
                raw_bytes = drive_service.files().get_media(fileId=entry["id"]).execute()
                frame = pd.read_csv(io.StringIO(raw_bytes.decode("utf-8")), delimiter=",")
                frame = frame.rename(columns=column_aliases)
                frame["name"] = file_name
                frames.append(frame)

                print(f"{file_name}: {len(frame)} rows")
                if len(frame) < 3:
                    post_to_slack(
                        details=f"Payment file alert - one file had no data {file_name}",
                        channel=slack_channel,
                        title="Uber files",
                    )

        combined = pd.concat(frames)

        # A row's net earnings count as incentive when its description mentions
        # one of these keywords (case-insensitive); otherwise the incentive is 0.
        incentive_keywords = (
            "Diamond Challenge",
            "Quest",
            "Guarantee Earnings",
            "guarantee",
        )
        net_earnings = combined["Net Earnings"]
        is_incentive = combined["Description"].str.contains(
            "|".join(incentive_keywords), case=False, na=False
        )
        combined["Uber Incentives"] = net_earnings.where(is_incentive, 0)

        # Drop the rows carrying the all-zero placeholder UUID.
        combined = combined[
            combined["Driver UUID"] != "00000000-0000-0000-0000-000000000000"
        ]

        aggregation = {
            "Net Earnings": "sum",
            # Second aggregate counts the rows with a negative cash amount.
            "Cash Collected": ["sum", lambda x: (x < 0).sum()],
            "Uber Balance": "sum",
            "Uber Fee": "sum",
            "Uber Incentives": "sum",
        }
        totals = combined.groupby(["Driver UUID", "name"]).agg(aggregation).reset_index()
        # Flatten the two-level header; order follows the aggregation spec above.
        totals.columns = [
            "Driver UUID",
            "name",
            "Net Earnings",
            "Cash Collected",
            "CashTrips",
            "Uber Balance",
            "Uber Fee",
            "Uber Incentives",
        ]
        return totals

    try:
        payment_file = combine_payments_files(uberGo48SP)
        payment_file_col = {
            "Driver UUID": "uber_id",
            "Net Earnings": "net_earnings",
            "Cash Collected": "cash_collected",
            "Uber Balance": "uber_balance",
            "Uber Incentives": "uber_incentive",
        }
        payment_file = payment_file.rename(columns=payment_file_col)
        id_with_file = payment_file[["uber_id", "name"]]
        payment_file = payment_file.drop(columns=["CashTrips", "Uber Fee", "name"])
    except Exception as e:
        post_to_slack(
            details=f"SPresso Payments files failed, reason: {str(e)}",
            channel=slack_channel,
            title="Performance files",
        )

    not_list = id_with_file[~id_with_file["uber_id"].isin(spresso_recon["uber_id"])]
    if len(not_list) > 0:
        logger.info(f"Adding missing uber id found in payments file")
        not_list["uber_id"] = not_list["uber_id"].str.strip()
        uber_id_list = "','".join(not_list["uber_id"])
        missing_drn = redshift(
            f"""select cast('{yesterday}' as date) AS day,
       drn,driver_name as uber_name,driver_uuid as uber_id,product,vehicle_type,0 as pick_drop_date
       from (select ROW_NUMBER() OVER (PARTITION BY drn ORDER BY product_start_date desc, date asc) as rnk,*
       from vehicle.data_dump dd where date >= '{smart_assignment_date}' and product = 'UberGo DTO 48 [S-Presso]'
       and LENGTH(driver_uuid) = 36
       and driver_uuid in ('{uber_id_list}')) as a where rnk = 1 """
        )
        spresso_recon = pd.concat([spresso_recon, missing_drn])
        spresso_recon["day"] = pd.to_datetime(spresso_recon["day"])
        spresso_recon["week"] = spresso_recon["day"] - pd.to_timedelta(
            spresso_recon["day"].dt.dayofweek, unit="d"
        )
        no_record = not_list[~not_list["uber_id"].isin(missing_drn["uber_id"])]

    logger.info("Fetching Payments")
    wallet_upi = wallet_redshift(
        f"""select transaction_date - interval '1' day as day,
    driver_number as drn,
    concat('#',payment_unique_reference) as reference_number,
    amount as bank_statement,
    coalesce(metadata->>'remittance_type','Unknown')  AS remittance_type
     from public.aggregated_transactions
    where country_code = 'NG'
    and transaction_date = '{smart_assignment_date}'
    """
    )

    logger.info("Fetching Missing Payments")
    missing_upi = redshift(
        f"""select transaction_date - interval '1' day as day,
    drn,
    concat('#',reference_number) as reference_number,
    amount as bank_statement,
    'missing' as remittance_type
    from nigeria.missing_payments
    where transaction_date = '{smart_assignment_date}'
    """
    )
    current_upi = pd.concat([wallet_upi, missing_upi])

    logger.info("Fetching Security to remittance payments")
    sec_to_rem_list = gsheet_service.open_by_url(
        "https://docs.google.com/spreadsheets/d/1nnpCEB-FLbx7roSqFPbQzlv3Gt2sJjVR8dq23AfqYOM"
    )
    sec_to_rem_list = sec_to_rem_list.worksheet("Sec-Dep to Remittance")
    attempts = 0
    while attempts < 10:
        try:
            sec_to_rem_list = pd.DataFrame(sec_to_rem_list.get_all_records())
            break
        except Exception as e:
            attempts += 1
            if attempts == 10:
                post_to_slack(
                    details=f"Nigeria Sec-Dep read sheet failed, reason: {str(e)}",
                    channel=slack_channel,
                )
                raise Exception("Operation failed after 10 attempts.")
    sec_to_rem_list = current_upi[
        current_upi["reference_number"].isin(sec_to_rem_list["script_reference"])
    ]

    logger.info("Fetching Remittance to Security payments")
    rem_to_sec_list = gsheet_service.open_by_url(
        "https://docs.google.com/spreadsheets/d/1nnpCEB-FLbx7roSqFPbQzlv3Gt2sJjVR8dq23AfqYOM"
    )
    rem_to_sec_list = rem_to_sec_list.worksheet("Remittance to Sec-Dep")
    attempts = 0
    while attempts < 10:
        try:
            rem_to_sec_list = pd.DataFrame(rem_to_sec_list.get_all_records())
            break
        except Exception as e:
            attempts += 1
            if attempts == 10:
                post_to_slack(
                    details=f"Nigeria Sec-Dep read sheet failed, reason: {str(e)}",
                    channel=slack_channel,
                )
                raise Exception("Operation failed after 10 attempts.")
    rem_to_sec_list = current_upi[
        current_upi["reference_number"].isin(rem_to_sec_list["script_reference"])
    ]

    logger.info("Fetching Final Payments List")
    current_upi = current_upi[current_upi["remittance_type"] != "SECURITY_DEPOSIT"]
    current_upi = pd.concat([current_upi, sec_to_rem_list])
    current_upi = current_upi[
        ~current_upi["reference_number"].isin(rem_to_sec_list["reference_number"])
    ]
    current_upi = current_upi.drop_duplicates(subset="reference_number", keep="first")
    current_upi = (
        current_upi.groupby(["day", "drn"])["bank_statement"].sum().reset_index()
    )

    logger.info("Adding unassigned drivers make a payment")
    if len(current_upi) > 0:
        dd = redshift(
            f"""select drn,driver_name as uber_name,driver_uuid as uber_id,product,vehicle_type from
      (select row_number() over (partition by drn order by date asc,product_start_date desc ) as rnk,*
      from vehicle.data_dump where date >= '{smart_assignment_date}'
      and LENGTH(driver_uuid) = 36 and product = 'UberGo DTO 48 [S-Presso]') dd
      where rnk = 1 """
        )
        merge_upi = pd.merge(current_upi, dd, how="left", on="drn")
        spresso_upi = merge_upi[~merge_upi["uber_id"].isna()]
        if len(spresso_upi) > 0:
            cols = ["drn", "bank_statement"]
            active_spresso_upi = spresso_upi[cols]
            spresso_recon = pd.merge(
                spresso_recon, active_spresso_upi, how="left", on="drn"
            )
            spresso_recon["bank_statement"].fillna(0, inplace=True)
        else:
            pass

        inactive_drn = spresso_upi[~spresso_upi["drn"].isin(spresso_recon["drn"])]
        if len(inactive_drn) > 0:
            cols = ["day", "drn", "bank_statement", "uber_name", "uber_id"]
            trnxs_require_input = inactive_drn[cols]
            # storing inactive drivers in gsheet for ops team
            sh = gsheet_service.open_by_key(
                "1Z28QYhmQv2aAFiwBHq34v4HksqJE2TXaRqRIUwG8JWk"
            ).worksheet("payments_inactive_drn")
            all_values = sh.get_all_values()
            last_row = len([i for i in all_values if i != []])
            trnxs_require_input["day"] = trnxs_require_input["day"].apply(
                lambda x: x.strftime("%Y-%m-%d") if not pd.isna(x) else ""
            )
            trnxs_require_input = trnxs_require_input.replace({np.nan: ""})
            sh.append_rows(trnxs_require_input.values.tolist())
            post_to_slack(
                details=f"Payment uploaded to tab payments_inactive_drn",
                channel=slack_channel,
            )
            # setting fixed pick_drop_date to 2 (for inactive drivers - payment)
            inactive_drn["pick_drop_date"] = 2
            inactive_drn["week"] = inactive_drn["day"] - pd.to_timedelta(
                inactive_drn["day"].dt.dayofweek, unit="d"
            )
            cols = [
                "day",
                "drn",
                "uber_name",
                "uber_id",
                "product",
                "vehicle_type",
                "pick_drop_date",
                "week",
                "bank_statement",
            ]
            inactive_drn = inactive_drn[cols]
            spresso_recon = pd.concat([spresso_recon, inactive_drn])
        else:
            pass

    spresso_recon = pd.merge(spresso_recon, payment_file, how="left", on="uber_id")

    logger.info(f"Fetching performance uber files")
    file_list = []

    def combine_performance_files(country):
        """Download the day's "Performance" CSVs and total them per driver.

        Every folder in ``weekFolder`` named "Performance" is searched with
        the enclosing ``product`` filter. A file is kept when the integer
        inside the first "(...)" of its name is in ``product_list``; its name
        is appended to the enclosing ``file_list`` and a Slack alert is posted
        when it has fewer than three rows.

        Args:
            country: Not used by this function.

        Returns:
            DataFrame with one row per ("Driver UUID", "Name") holding the
            summed "PerfTrips completed", "Time online (days : hours: minutes)"
            (converted to hours) and "Time on trip (days : hours : minutes)".

        Raises:
            ValueError: If a file name has no integer between its first
                parentheses, or if no file was collected (``pd.concat`` of an
                empty list).
        """
        listframe = []
        for folder in weekFolder:
            if folder["name"] == "Performance":
                query = f"mimeType='text/csv' and trashed=false and {product} and parents in '{folder['id']}' "
                # NOTE: nextPageToken is requested but never followed, so only
                # the first page of results is read.
                results = (
                    drive_service.files()
                    .list(q=query, fields="nextPageToken, files(id, name)")
                    .execute()
                )
                files = results.get("files", [])
                for file in files:
                    # The product code is whatever sits inside the first "(...)".
                    mk1 = file["name"].find("(") + 1
                    mk2 = file["name"].find(")", mk1)
                    subString = file["name"][mk1:mk2]
                    if int(subString) in product_list:
                        file_list.append(file["name"])
                        request = drive_service.files().get_media(fileId=file["id"])
                        file_content = request.execute()
                        df = pd.read_csv(
                            io.StringIO(file_content.decode("utf-8")), delimiter=","
                        )
                        # Normalise header variants between exports.
                        df = df.rename(
                            {
                                "Time on trip (days: hours : minutes)": "Time on trip (days : hours : minutes)",
                                "Driver last name": "Driver surname",
                            },
                            axis=1,
                        )
                        listframe.append(df)
                        row_count = len(df)
                        print(f"{file['name']}: {row_count} rows")
                        if len(df) < 3:
                            post_to_slack(
                                details=f"Performance file alert - one file had no data {file['name']}",
                                channel=slack_channel,
                                title="Uber files",
                            )

        dayperffile = pd.concat(listframe)

        # "days:hours:minutes" text -> seconds -> hours rounded to 2 decimals.
        dayperffile["Time online (days : hours: minutes)"] = (
            dayperffile["Time online (days : hours: minutes)"]
            .str.split(":")
            .apply(lambda x: int(x[0]) * 86400 + int(x[1]) * 3600 + int(x[2]) * 60)
        )
        dayperffile["Time online (days : hours: minutes)"] = round(
            pd.to_timedelta(
                dayperffile["Time online (days : hours: minutes)"], unit="s"
            ).dt.total_seconds()
            / 3600,
            2,
        )

        columnslist = [
            "Trips completed",
            "Time online (days : hours: minutes)",
            "Time on trip (days : hours : minutes)",
        ]
        # Coerce every metric column before aggregating. Previously the
        # rename/groupby/return lived inside this loop, so the function
        # returned after converting only "Trips completed" and summed the
        # remaining text column as strings.
        # NOTE(review): "Time on trip" is not parsed like "Time online", so
        # colon-separated text becomes NaN here. The visible caller renames
        # that column to "col_to_drop" and drops it.
        for a in columnslist:
            dayperffile[a] = pd.to_numeric(dayperffile[a], errors="coerce")

        dayperffile = dayperffile.rename(
            {"Trips completed": "PerfTrips completed"}, axis=1
        )
        dayperffile["Name"] = (
            dayperffile["Driver first name"] + " " + dayperffile["Driver surname"]
        )
        dayperffile = (
            dayperffile.groupby(["Driver UUID", "Name"])
            .agg(
                {
                    "PerfTrips completed": "sum",
                    "Time online (days : hours: minutes)": "sum",
                    "Time on trip (days : hours : minutes)": "sum",
                }
            )
            .reset_index()
        )
        return dayperffile

    # perf_df = combine_performance_files(ghana_files_pattern)
    try:
        perf_df = combine_performance_files(uberGo48SP)
        perf_df_col = {
            "Driver UUID": "uber_id",
            "Name": "driver_name_perf",
            "PerfTrips completed": "trips",
            "Time online (days : hours: minutes)": "supply_hours",
            "Time on trip (days : hours : minutes)": "col_to_drop",
        }
        perf_df = perf_df.rename(columns=perf_df_col)
    except Exception as e:
        post_to_slack(
            details=f"S-Presso Performance files failed, reason: {str(e)}",
            channel=slack_channel,
            title="Performance files",
        )

    spresso_recon = pd.merge(spresso_recon, perf_df, how="left", on="uber_id")

    logger.info(f"Fetching quality uber files")
    file_list = []

    def combine_driverquality_files(country):
        """Download the day's "Driver Quality" CSVs and average the cancellation rate.

        Every folder in ``weekFolder`` named "Driver Quality" is searched with
        the enclosing ``product`` filter. A file is kept when the integer
        inside the first "(...)" of its name is in ``product_list``; its name
        is appended to the enclosing ``file_list`` and a Slack alert is posted
        when it has fewer than three rows.

        Args:
            country: Not used by this function.

        Returns:
            DataFrame with columns "Driver UUID" and "cancellation_rate" (the
            mean per driver).

        Raises:
            ValueError: If a file name has no integer between its first
                parentheses, or if no file was collected (``pd.concat`` of an
                empty list).
        """
        frames = []
        for folder in weekFolder:
            if folder["name"] != "Driver Quality":
                continue
            drive_query = f"mimeType='text/csv' and trashed=false and {product} and parents in '{folder['id']}' "
            # NOTE: nextPageToken is requested but never followed, so only the
            # first page of results is read.
            listing = (
                drive_service.files()
                .list(q=drive_query, fields="nextPageToken, files(id, name)")
                .execute()
            )
            for entry in listing.get("files", []):
                file_name = entry["name"]
                # The product code is whatever sits inside the first "(...)".
                code_start = file_name.find("(") + 1
                code_end = file_name.find(")", code_start)
                if int(file_name[code_start:code_end]) not in product_list:
                    continue

                file_list.append(file_name)
                raw_bytes = drive_service.files().get_media(fileId=entry["id"]).execute()
                frame = pd.read_csv(io.StringIO(raw_bytes.decode("utf-8")), delimiter=",")
                frame = frame.rename(columns={"Cancellation rate": "cancellation_rate"})
                frames.append(frame)

                print(f"{file_name}: {len(frame)} rows")
                if len(frame) < 3:
                    post_to_slack(
                        details=f"Driver Quality file alert - one file had no data {file_name}",
                        channel=slack_channel,
                        title="Uber files",
                    )

        combined = pd.concat(frames)

        # Keep the first space-separated token of each rate and parse it as a
        # number; anything unparseable becomes NaN.
        rate_tokens = (
            combined["cancellation_rate"].astype(str).str.split(" ", n=0, expand=True)
        )
        combined["cancellation_rate"] = pd.to_numeric(
            rate_tokens[0].fillna(0), errors="coerce"
        )

        per_driver = combined.groupby(["Driver UUID"]).agg({"cancellation_rate": "mean"})
        return per_driver.reset_index()

    try:
        quality_file = combine_driverquality_files(uberGo48SP)
        quality_file_col = {"Driver UUID": "uber_id"}
        quality_file = quality_file.rename(columns=quality_file_col)
    except Exception as e:
        post_to_slack(
            details=f"SPresso Driver Quality files failed, reason: {str(e)}",
            channel=slack_channel,
            title="Performance files",
        )

    spresso_recon = pd.merge(spresso_recon, quality_file, how="left", on="uber_id")
    spresso_recon.fillna(0, inplace=True)

    logger.info("Fetching days off")
    recon_days_off = redshift(
        f"""select drn,reconciliation_date as day,count(drn) as current_day_off from nigeria.days_off
    where reconciliation_date = '{yesterday}' and product = 's-presso'
    group by 1,2"""
    )
    recon_days_off["day"] = pd.to_datetime(recon_days_off["day"])
    spresso_recon = pd.merge(
        spresso_recon, recon_days_off, how="left", on=["drn", "day"]
    )
    spresso_recon["current_day_off"].fillna(0, inplace=True)
    spresso_recon["assigned_day"] = 1
    # changing day off if drivers completed more than 5 trips
    spresso_recon.loc[spresso_recon["trips"] > 5, "current_day_off"] = 0
    spresso_recon["effective_days"] = spresso_recon["assigned_day"] - (
        spresso_recon["current_day_off"] + spresso_recon["pick_drop_date"]
    )
    spresso_recon["effective_days"] = np.maximum(spresso_recon["effective_days"], 0)

    dayoff_unassigned = recon_days_off[
        ~recon_days_off["drn"].isin(spresso_recon["drn"])
    ]
    if len(dayoff_unassigned) > 0:
        col = ["drn"]
        dayoff_unassigned_list = dayoff_unassigned[col]
        dayoff_unassigned_list
        table_string = slack_texttable(dayoff_unassigned_list, ["t"])
        post_to_slack(
            details=table_string,
            channel=slack_channel,
            title="day off for unassigned DRN!",
        )

    high_supply_hour = perf_df[perf_df["supply_hours"] >= 24]
    if len(high_supply_hour) > 0:
        col = ["uber_id"]
        high_supply_hour_list = high_supply_hour[col]
        table_string = slack_texttable(high_supply_hour_list, ["t"])
        post_to_slack(
            details=table_string,
            channel=slack_channel,
            title="Uber id exceed 24 hours in uber files!",
        )

    spresso_recon = spresso_recon.drop(columns=["driver_name_perf", "col_to_drop"])
    spresso_recon["trips"].replace("", 0, inplace=True)
    spresso_recon["trips"].fillna(0, inplace=True)
    spresso_recon["supply_hours"].replace("", 0, inplace=True)
    spresso_recon["supply_hours"].fillna(0, inplace=True)

    id_have_trip_off = spresso_recon[
        (spresso_recon["current_day_off"] == 1) & (spresso_recon["trips"] > 0)
    ]

    if len(id_have_trip_off) > 0:
        col = ["drn"]
        id_have_trip_off_list = id_have_trip_off[col]
        table_string = slack_texttable(id_have_trip_off_list, ["t"])
        post_to_slack(
            details=table_string,
            channel=slack_channel,
            title="DRN with completed trips but flagged as off day!",
        )

    spresso_recon["cash_collected"] = np.where(
        spresso_recon["cash_collected"] < 0,
        np.abs(spresso_recon["cash_collected"]),
        spresso_recon["cash_collected"],
    )
    spresso_recon["va_amount"] = 0
    payouts = redshift(
        f"""select reconciliation_date as day,drn,payable as payouts from nigeria.payout where reconciliation_date = '{yesterday}' and status = 'Approved' and product = 's-presso' """
    )
    payouts["day"] = pd.to_datetime(payouts["day"])
    spresso_recon = pd.merge(
        spresso_recon,
        payouts,
        how="left",
        left_on=["day", "drn"],
        right_on=["day", "drn"],
    )
    spresso_recon["payouts"].replace("", 0, inplace=True)
    spresso_recon["payouts"].fillna(0, inplace=True)

    recon_product = redshift(
        """select product_name as product,daily_remittance as daily_remittance_charge from nigeria.product """
    )
    spresso_recon = pd.merge(spresso_recon, recon_product, how="left", on="product")
    yesterday_varchar = yesterday_uber.strftime("%A")
    if yesterday_varchar == "Sunday":
        spresso_recon["daily_remittance"] = 0
    else:
        spresso_recon["daily_remittance"] = np.where(
            spresso_recon["effective_days"] == 0,
            0,
            spresso_recon["effective_days"] * spresso_recon["daily_remittance_charge"],
        )

    logger.info("Fetching active Incentive programs")
    incentive_check = redshift(
        f"""select *,
    CASE WHEN cast('{yesterday_date}' as date) >= start_date and cast('{yesterday_date}' as date) <= end_date then 1 else 0 end as validation
    from nigeria.incentive_programs
    where product = 'UberGo DTO 48 [S-Presso]' """
    )

    # Filter the dataframe for care_incentive
    care_incentive_df = incentive_check[incentive_check["program"] == "care_incentive"]
    # Initialize care_incentive to 0
    spresso_recon["care_incentive"] = 0
    # If validation is set for any entry
    if care_incentive_df["validation"].any() == 1:
        # If yesterday was Sunday, set care_incentive to 0
        if yesterday_varchar == "Sunday":
            pass  # care_incentive remains 0
        else:
            applied = False  # Flag to indicate if any incentive has been applied
            # Iterate over each row in care_incentive_df and apply the condition
            for _, row in care_incentive_df.iterrows():
                condition = (
                    (spresso_recon["supply_hours"] >= row["supply_hours"])
                    & (spresso_recon["trips"] >= row["trips"])
                    & (spresso_recon["net_earnings"] >= row["net_earnings"])
                )
                # If the condition applies to any row and no previous incentive has been applied
                if condition.any() and not applied:
                    spresso_recon.loc[condition, "care_incentive"] = row["incentives"]
                    applied = True  # Set the flag to True
                    break  # Exit the loop as the first incentive has been applied

    fuel_incentive_df = incentive_check[incentive_check["program"] == "fuel_incentive"]
    spresso_recon["fuel_incentive"] = 0
    if fuel_incentive_df["validation"].any() == 1:
        if yesterday_varchar == "Sunday":
            pass
        else:
            applied = False
            for _, row in fuel_incentive_df.iterrows():
                condition = (
                    (spresso_recon["supply_hours"] >= row["supply_hours"])
                    & (spresso_recon["trips"] >= row["trips"])
                    & (spresso_recon["net_earnings"] >= row["net_earnings"])
                )
                if condition.any() and not applied:
                    spresso_recon.loc[condition, "fuel_incentive"] = row["incentives"]
                    applied = True
                    break

    spresso_recon["incentives"] = (
        spresso_recon["care_incentive"] + spresso_recon["fuel_incentive"]
    )

    logger.info("Fetching extra charges")
    prev_ec = redshift(
        f"""with ec as (select drn,sum(total_amount) as total_extra_charges, sum(daily_deduction) as total_daily_charges from nigeria.extra_charge group by 1),
    recon as (SELECT drn,sum(extra_charges) as total_deduction_recon
    FROM nigeria.reconciliation_spresso where day < '{yesterday}' group by 1),
    data as (select ec.drn,ec.total_extra_charges,
    coalesce(r.total_deduction_recon,0) as total_deduction_recon,
    total_daily_charges,
    coalesce((total_extra_charges-total_deduction_recon),0) as deduction_different
    from ec
    left join recon r on ec.drn = r.drn)
    select drn, case when deduction_different <= 0 then 0
    when deduction_different > 0 and deduction_different > total_daily_charges then total_daily_charges
    when deduction_different > 0 and deduction_different <= total_daily_charges then deduction_different
    else 0 end as extra_charges from data  """
    )

    spresso_recon = pd.merge(spresso_recon, prev_ec, how="left", on="drn")
    spresso_recon["extra_charges"].replace("", 0, inplace=True)
    spresso_recon["extra_charges"].fillna(0, inplace=True)
    spresso_recon.loc[spresso_recon["trips"] == 0, "extra_charges"] = 0

    logger.info("Fetching prev outstanding")
    prev_outstanding = redshift(
        f"""select drn,sum(outstanding) as prev_outstanding
    from nigeria.reconciliation_spresso
    where day < '{yesterday}'
    group by 1"""
    )
    spresso_recon = pd.merge(spresso_recon, prev_outstanding, how="left", on="drn")
    spresso_recon["prev_outstanding"].replace("", 0, inplace=True)
    spresso_recon["prev_outstanding"].fillna(0, inplace=True)

    logger.info("Fetching Adjustments")
    recon_adjustments = redshift(
        """with adjs as (select * from nigeria.adjustments where product = 'UberGo DTO 48 [S-Presso]'),
    amount as (select reconciliation_date,drn,sum(adjustment) as adjustment_amount from adjs group by 1,2),
    adj_desc as (select * from
    (select reconciliation_date,drn,adjustment_description,adjustment_type,row_number() over (partition by reconciliation_date,drn order by reconciliation_date asc) as rnk from adjs) as a
    where rnk = 1)
    select a.reconciliation_date as day,a.drn,a.adjustment_amount,ad.adjustment_description ,ad.adjustment_type from amount a
    left join adj_desc ad on a.reconciliation_date = ad.reconciliation_date and a.drn = ad.drn"""
    )
    recon_adjustments["day"] = pd.to_datetime(recon_adjustments["day"])

    spresso_recon = pd.merge(
        spresso_recon,
        recon_adjustments,
        how="left",
        left_on=["day", "drn"],
        right_on=["day", "drn"],
    )
    spresso_recon["adjustment_amount"].fillna(0, inplace=True)
    spresso_recon["adjustment_description"].fillna("", inplace=True)
    spresso_recon["adjustment_type"].fillna("", inplace=True)

    logger.info("Calculating rest headers")
    spresso_recon["moove_balance"] = (
        spresso_recon["uber_balance"]
        + spresso_recon["va_amount"]
        + spresso_recon["bank_statement"]
        + spresso_recon["incentives"]
    ) - spresso_recon["payouts"]
    spresso_recon["outstanding"] = (
        spresso_recon["daily_remittance"]
        + spresso_recon["extra_charges"]
        + spresso_recon["adjustment_amount"]
    ) - spresso_recon["moove_balance"]
    spresso_recon["amount_due"] = (
        spresso_recon["daily_remittance"]
        + spresso_recon["extra_charges"]
        + spresso_recon["prev_outstanding"]
        + spresso_recon["adjustment_amount"]
    )
    spresso_recon["cum_outstanding"] = (
        spresso_recon["amount_due"] - spresso_recon["moove_balance"]
    )
    spresso_recon["payables"] = abs(
        spresso_recon["cum_outstanding"].apply(lambda x: np.where(x < -500, x, 0))
    )
    spresso_recon["partner_payment"] = 0
    spresso_recon["month"] = spresso_recon["day"].dt.strftime("%m")
    spresso_recon["year"] = spresso_recon["day"].dt.strftime("%Y")
    current_time = datetime.now()
    spresso_recon["updated_at"] = current_time
    spresso_recon["current_day_off"] = spresso_recon["current_day_off"].astype(int)
    spresso_recon["effective_days"] = spresso_recon["effective_days"].astype(int)
    spresso_recon["trips"] = spresso_recon["trips"].astype(int)

    logger.info("Ordering headers")
    recon_cols = [
        "week",
        "day",
        "drn",
        "uber_name",
        "uber_id",
        "product",
        "vehicle_type",
        "assigned_day",
        "current_day_off",
        "pick_drop_date",
        "effective_days",
        "trips",
        "supply_hours",
        "cancellation_rate",
        "net_earnings",
        "cash_collected",
        "uber_balance",
        "va_amount",
        "bank_statement",
        "incentives",
        "payouts",
        "moove_balance",
        "daily_remittance",
        "extra_charges",
        "prev_outstanding",
        "amount_due",
        "outstanding",
        "cum_outstanding",
        "payables",
        "partner_payment",
        "uber_incentive",
        "adjustment_amount",
        "fuel_incentive",
        "care_incentive",
        "month",
        "year",
        "updated_at",
        "adjustment_description",
        "adjustment_type",
    ]
    spresso_recon = spresso_recon[recon_cols]

    logger.info("Inserting payable records")
    payable_list = spresso_recon[spresso_recon["payables"] > 0]
    payable_list = payable_list[["day", "drn", "payables", "product"]]
    payable_list["status"] = "Rejected"
    payable_list["product"] = "s-presso"
    col = {"day": "reconciliation_date", "payables": "payable"}
    payable_list = payable_list.rename(columns=col)
    bulk_insert("payout", payable_list, "nigeria")

    logger.info("Insert payable to sheet for ops team")
    payout_to_sheet = redshift(
        """select * from nigeria.payout where product = 's-presso' order by reconciliation_date asc """
    )
    payout_to_sheet["reconciliation_date"] = pd.to_datetime(
        payout_to_sheet["reconciliation_date"], errors="coerce"
    )
    payout_to_sheet["reconciliation_date"] = payout_to_sheet[
        "reconciliation_date"
    ].dt.strftime("%d/%m/%Y")
    moove = []
    sh = gsheet_service.open_by_key("1Z28QYhmQv2aAFiwBHq34v4HksqJE2TXaRqRIUwG8JWk")
    moove = sh.worksheet("payout")
    num_rows = len(moove.get_all_values())
    moove.update(
        "A2:ZZ", [["" for i in range(moove.col_count)] for j in range(num_rows - 1)]
    )
    new_data = payout_to_sheet.values.tolist()
    moove.update("A2", new_data)

    logger.info("Inserting Recon records into db")
    bulk_insert("reconciliation_spresso", spresso_recon, "nigeria")

    logger.info("Sending recon records on slack channel")
    yesterday_datetime = datetime.strptime(yesterday_date, "%Y-%m-%d")
    weekdate_string = yesterday_datetime.strftime("%b %d")
    data_to_slack = redshift(
        f"""select * from nigeria.reconciliation_spresso where day >= now() - interval '7' day order by day desc"""
    )
    file_to_slack(
        data_to_slack,
        slack_channel,
        "Daily S-Presso Recon {}.xlsx".format(weekdate_string),
    )

    output_data = {}
    if "date" in event:
        output_data["date"] = event["date"]

    return output_data
