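# Parameters
- `connection_string` : read from the `PG_CONNECTION_STRING` environment variable -> PostgreSQL URL connection string (e.g. 'postgresql://postgres:postgres@localhost:5432/local')
- `delay` :                24                                                   -> the analysed window ends `delay` hours before now
- `frame`:                 24                                                   -> the analysed window starts `delay + frame` hours before now
- `update_carpool_status`: read from the `UPDATE_CARPOOL_STATUS` environment variable ('true' to enable) -> whether carpool statuses should be updated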

In [None]:
import datetime
import os
from datetime import timedelta
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert


# Input params checks
# Case-insensitive so 'true', 'True' and 'TRUE' all enable the status update;
# an unset variable leaves the update disabled.
update_carpool_status = os.environ.get('UPDATE_CARPOOL_STATUS', 'false').strip().lower() == 'true'
connection_string = os.environ['PG_CONNECTION_STRING']
delay = 24  # hours: the analysed window ends `delay` hours before now
frame = 24  # hours: width of the analysed window

# Same window as the SQL filter: [now - delay - frame, now - delay).
# NOTE: this uses the local clock for display only; the query itself uses the
# database's NOW().
now = datetime.now()
start_date_frame = now - timedelta(hours=int(delay) + int(frame))
end_date_frame = now - timedelta(hours=int(delay))
print(f"processing carpools between {start_date_frame} and {end_date_frame} ")

In [None]:
engine = create_engine(connection_string, connect_args={'sslmode': 'require'})

# One row per (carpool, participant): the first SELECT gives the driver's point
# of view, the second the passenger's. UNION ALL matches columns by POSITION,
# so both SELECTs must list their columns in exactly the same order.
# Window: [NOW() - delay - frame, NOW() - delay), evaluated with the DB clock.
# delay/frame are coerced to int so only numbers are interpolated into the SQL.
# Operator 11 is excluded (reason not visible in this notebook).
query = f"""(
  SELECT
    CC._ID,
    CASE
      WHEN DRIVER_PHONE IS NOT NULL THEN SUBSTR(DRIVER_PHONE, 1, 10)
      ELSE CC.DRIVER_PHONE_TRUNC
    END AS PHONE_TRUNC,
    CC.DRIVER_IDENTITY_KEY AS IDENTITY_KEY,
    CC.DRIVER_OPERATOR_USER_ID AS OPERATOR_USER_ID,
    CC.OPERATOR_TRIP_ID,
    CC.START_DATETIME,
    EXTRACT(
      EPOCH
      FROM
        (CC.END_DATETIME - CC.START_DATETIME)
    )::INT AS DURATION,
    CC.END_DATETIME,
    CC.OPERATOR_ID,
    TRUE AS IS_DRIVER,
    CASE
      WHEN PASSENGER_PHONE IS NOT NULL THEN SUBSTR(PASSENGER_PHONE, 1, 10)
      ELSE CC.PASSENGER_PHONE_TRUNC
    END AS OTHER_PHONE_TRUNC,
    CC.PASSENGER_IDENTITY_KEY AS OTHER_IDENTITY_KEY,
    CC.PASSENGER_OPERATOR_USER_ID AS OTHER_OPERATOR_USER_ID
  FROM
    CARPOOL_V2.CARPOOLS CC
  WHERE
    CC.START_DATETIME >=  NOW() - '{int(delay)} hours'::interval - '{int(frame)} hours'::interval
    AND CC.START_DATETIME < NOW() - '{int(delay)} hours'::interval
    AND CC.OPERATOR_ID != 11
)
UNION ALL
(
  SELECT
    CC._ID,
    CASE
      WHEN PASSENGER_PHONE IS NOT NULL THEN SUBSTR(PASSENGER_PHONE, 1, 10)
      ELSE CC.PASSENGER_PHONE_TRUNC
    END AS PHONE_TRUNC,
    CC.PASSENGER_IDENTITY_KEY AS IDENTITY_KEY,
    CC.PASSENGER_OPERATOR_USER_ID AS OPERATOR_USER_ID,
    CC.OPERATOR_TRIP_ID,
    CC.START_DATETIME,
    EXTRACT(
      EPOCH
      FROM
        (CC.END_DATETIME - CC.START_DATETIME)
    )::INT AS DURATION,
    CC.END_DATETIME,
    CC.OPERATOR_ID,
    FALSE::BOOLEAN AS IS_DRIVER,
    CASE
      WHEN DRIVER_PHONE IS NOT NULL THEN SUBSTR(DRIVER_PHONE, 1, 10)
      ELSE CC.DRIVER_PHONE_TRUNC
    END AS OTHER_PHONE_TRUNC,
    CC.DRIVER_IDENTITY_KEY AS OTHER_IDENTITY_KEY,
    CC.DRIVER_OPERATOR_USER_ID AS OTHER_OPERATOR_USER_ID
  FROM
    CARPOOL_V2.CARPOOLS CC
  WHERE
    CC.START_DATETIME >=  NOW() - '{int(delay)} hours'::interval - '{int(frame)} hours'::interval
    AND CC.START_DATETIME < NOW() - '{int(delay)} hours'::interval
    AND CC.OPERATOR_ID != 11
)
"""

with engine.connect() as conn:
    df_carpool = pd.read_sql_query(text(query), conn)

In [None]:
# Derive the calendar day of each carpool; it is the per-day grouping key of
# the aggregation that follows.
# NOTE(review): if start_datetime comes back timezone-aware, this is the day in
# that timezone (not necessarily local time) — confirm the intended boundary.
df_carpool = df_carpool.assign(date=df_carpool['start_datetime'].dt.date)

In [None]:
# Per identity and per day: number of distinct trips, number of distinct
# operators, and the list of carpool ids involved.
df_carpool_agg = (
    df_carpool
    .groupby(['identity_key', 'date'])
    .agg(
        unique_operator_trip_id=pd.NamedAgg(column='operator_trip_id', aggfunc='nunique'),
        unique_operator_id=pd.NamedAgg(column='operator_id', aggfunc='nunique'),
        carpool_id_list=pd.NamedAgg(column='_id', aggfunc=list),
    )
    .reset_index()
)

In [None]:
# Keep (identity, day) groups that involve more than one operator AND more
# than four distinct trips.
more_than_four_trip_mask = df_carpool_agg['unique_operator_trip_id'] > 4
multi_op_mask = df_carpool_agg['unique_operator_id'] > 1

df_carpool_target = df_carpool_agg.loc[more_than_four_trip_mask & multi_op_mask]

In [None]:
# Turn each (identity, day) group back into one row per carpool id.
df_carpool_target = (
    df_carpool_target
    .explode('carpool_id_list')
    .rename(columns={'carpool_id_list': '_id'})
)

In [None]:
# Bring back the full carpool details for the targeted (carpool, identity)
# pairs, then order chronologically within each identity.
df_carpool_target_merged = df_carpool_target.merge(
    df_carpool, how='left', on=['_id', 'identity_key']
)
df_carpool_target_merged = df_carpool_target_merged.sort_values(
    by=['identity_key', 'start_datetime']
)

In [None]:
def get_every_trip_after_4_trip_id_occ(df, skip=4):
    """Return the trips of one identity that come after its first ``skip`` trips.

    Rows are collapsed to one row per ``operator_trip_id`` (keeping the
    earliest carpool of that trip) and ordered chronologically. Everything after
    the first ``skip`` distinct trips is returned.

    Args:
        df: carpool rows of a single identity, with at least the
            ``operator_trip_id``, ``start_datetime`` and ``_id`` columns.
        skip: number of leading distinct trips left out of the result
            (defaults to 4).

    Returns:
        DataFrame with columns ``operator_trip_id``, ``start_datetime`` and
        ``_id``: one row per trip beyond the first ``skip``, ordered by
        ``start_datetime``. Empty when the identity has ``skip`` trips or fewer.
    """
    # Sort here (stably) so 'first' really is the earliest carpool of each trip,
    # rather than relying on the caller having pre-sorted the rows.
    ordered = df.sort_values(by='start_datetime', kind='stable')
    trips = (
        ordered.groupby('operator_trip_id')
        .agg(
            start_datetime=('start_datetime', 'first'),
            _id=('_id', 'first'),
        )
        .reset_index()
        # Stable sort keeps ties on start_datetime deterministic.
        .sort_values(by='start_datetime', kind='stable')
    )
    return trips.iloc[skip:]

# For each identity, keep only the trips beyond its first four distinct ones.
df_final_result = (
    df_carpool_target_merged
    .groupby(['identity_key'])
    .apply(get_every_trip_after_4_trip_id_occ)
)

Mise à jour des carpools retenus : `fraud_status` passe à `failed` dans la table `carpool_v2.status`

In [None]:

# Mark the selected carpools as failed in carpool_v2.status (i.e. fraud detected).
if update_carpool_status:

    metadata = sa.MetaData(schema='carpool_v2')
    # Only the status table is needed; reflecting the whole schema is wasted work.
    metadata.reflect(bind=engine, only=['status'])

    table = metadata.tables['carpool_v2.status']

    # A carpool can be selected through both its driver and its passenger
    # identity; deduplicate to keep the IN list minimal.
    carpool_ids = df_final_result['_id'].drop_duplicates().to_list()
    where_clause = table.c.carpool_id.in_(carpool_ids)

    update_stmt = sa.update(table).where(where_clause).values(fraud_status='failed')

    # engine.begin() commits on success and rolls back if the statement raises.
    with engine.begin() as conn:
        result = conn.execute(update_stmt)
        print(f"{result.rowcount} carpools status updated to fraud_status=failed")

Ajout des labels dans la table `fraudcheck.labels`.
C'est cette table qui est utilisée pour renvoyer aux opérateurs l'information sur le type de fraude.

In [None]:
# One label row per flagged carpool. A carpool can be selected through both its
# driver and its passenger identity, so duplicate ids are dropped here rather
# than being sent twice in the same INSERT.
df_labels = (
    df_final_result[['_id']]
    .rename(columns={'_id': 'carpool_id'})
    .drop_duplicates()
    .reset_index(drop=True)
    .assign(label='interoperator_too_many_trips_by_day')
)

In [None]:
def insert_or_do_nothing_on_conflict(table, conn, keys, data_iter):
    """pandas ``to_sql`` insertion method issuing INSERT ... ON CONFLICT DO NOTHING.

    Rows whose (carpool_id, label) pair already exists are skipped instead of
    failing the whole insert. This presumably relies on a unique index or
    constraint on (carpool_id, label) in the target table — confirm in the schema.

    Args:
        table: pandas SQLTable wrapper; ``table.table`` is its SQLAlchemy Table.
        conn: SQLAlchemy connection supplied by ``to_sql``.
        keys: column names, in the same order as the values of each row.
        data_iter: iterable of row tuples for the current chunk.

    Returns:
        Number of rows actually inserted, which ``to_sql`` reports back.
    """
    # Name each value explicitly instead of relying on positional tuples.
    rows = [dict(zip(keys, row)) for row in data_iter]
    if not rows:
        return 0
    insert_stmt = insert(table.table).values(rows)
    on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['carpool_id', 'label'])
    result = conn.execute(on_duplicate_key_stmt)
    return result.rowcount


inserted_labels = df_labels.to_sql(
    name="labels",
    schema="fraudcheck",
    con=engine,
    if_exists="append",
    index=False,
    method=insert_or_do_nothing_on_conflict
)
print(f"{inserted_labels} labels inserted into fraudcheck.labels")