In [1]:
!pip install isodate




[notice] A new release of pip available: 22.3.1 -> 23.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import papermill as pm
import dateutil.parser
import dateutil
import pandas as pd
import numpy as np
import pymongo
from config import params  # Import parameters from the config file

# MongoDB connection settings, all read from the shared `params` config mapping.
(
    mongo_host,
    mongo_port,
    db_name,
    collection_name,
    last_timestamp_collection_name,
    cohort_all_collection,
) = (
    params[setting]
    for setting in (
        'mongo_host',
        'mongo_port',
        'db_name',
        'collection_name',
        'last_timestamp_collection_name',
        'cohort_all_collection',
    )
)

# Module-level client/database handle used by the helper functions that follow.
client = pymongo.MongoClient("mongodb://{}:{}".format(mongo_host, mongo_port))
db = client[db_name]

#class processing_data:
#############Sub Functions ################

def update_last_processed_timestamp_to_db(timestamp):
    """Persist ``timestamp`` as the CDC high-water mark.

    Sets ``last_processed_timestamp`` on the document matched by an empty
    filter in the ``last_timestamp_collection_name`` collection, creating that
    document if none exists (``upsert=True``), and prints the pymongo result.

    Args:
        timestamp: Value to store. Within this notebook it is either ``0``
            (reset) or the max ``Timestamp`` of the records just loaded.

    Returns:
        None.
    """
    print('timestamp:', timestamp, "collection:", last_timestamp_collection_name)
    update_result = db[last_timestamp_collection_name].update_one(
        {}, {"$set": {"last_processed_timestamp": timestamp}}, upsert=True
    )
    print('update output', update_result)

def process_since_last_timestamp(last_timestamp):
    """Load CDC records newer than ``last_timestamp`` and advance the high-water mark.

    Args:
        last_timestamp: Previous high-water mark; only documents whose
            ``Timestamp`` is strictly greater (``$gt``) are fetched.

    Returns:
        pandas.DataFrame of the matching documents (empty if none matched).
        ``Timestamp`` is left as stored; it is not converted to datetime here.
    """
    query = {'Timestamp': {'$gt': last_timestamp}}
    cursor = db[collection_name].find(query)
    try:
        documents = list(cursor)
    finally:
        # Release the server-side cursor even if iteration fails part-way.
        cursor.close()

    cdc_data = pd.DataFrame(documents)

    if cdc_data.empty:
        print('No records processed...')
    else:
        # NOTE(review): the mark is stored with the raw type of `Timestamp`;
        # confirm it matches the type used by the `$gt` query on the next run.
        max_timestamp = cdc_data['Timestamp'].max()
        update_last_processed_timestamp_to_db(max_timestamp)

    return cdc_data

def process_all_records():
    """Load every CDC record and store the newest ``Timestamp`` as the high-water mark.

    Returns:
        pandas.DataFrame of all documents in ``collection_name``. When records
        exist, ``Timestamp`` is converted with ``pd.to_datetime``. When the
        collection is empty, an empty DataFrame is returned and the mark is
        left untouched.
    """
    cursor = db[collection_name].find({})
    try:
        documents = list(cursor)
    finally:
        cursor.close()

    cdc_data = pd.DataFrame(documents)

    if cdc_data.empty:
        # An empty frame has no 'Timestamp' column, so the datetime conversion
        # must only run once records are known to exist.
        print('No records processed...')
    else:
        cdc_data['Timestamp'] = pd.to_datetime(cdc_data['Timestamp'])
        max_timestamp = cdc_data['Timestamp'].max()
        print('Timestamp of last processed data:', max_timestamp)
        update_last_processed_timestamp_to_db(max_timestamp)

    return cdc_data

def get_last_processed_timestamp(reset=True):
    """Load CDC records, either all of them or only those since the stored mark.

    Despite its name, this returns the loaded records, not the timestamp.

    Args:
        reset: When True (the default, matching the notebook's historical
            behaviour), the stored high-water mark is first overwritten with
            ``0``, which forces a full reload on every call. Pass False to
            load only records newer than the stored mark.

    Returns:
        pandas.DataFrame from ``process_all_records`` (no usable mark) or
        ``process_since_last_timestamp`` (mark present).

    Side effects:
        Closes the module-level ``client`` before returning, including when an
        exception is raised.
    """
    try:
        if reset:
            # NOTE(review): resetting on every call disables incremental
            # loading entirely; confirm this is still intended.
            update_last_processed_timestamp_to_db(0)

        marker_doc = db[last_timestamp_collection_name].find_one({})
        # find_one returns None when no mark has ever been written
        # (possible when reset=False on a fresh database).
        last_processed_timestamp = (
            None if marker_doc is None else marker_doc.get('last_processed_timestamp')
        )

        print('last_processed_timestamp:', last_processed_timestamp)

        if (
            last_processed_timestamp is None
            or pd.isnull(last_processed_timestamp)
            or last_processed_timestamp == 0
        ):
            print('Processing all records')
            cdc_data = process_all_records()
        else:
            print('Processing cdc records:')
            print(last_processed_timestamp)
            cdc_data = process_since_last_timestamp(last_processed_timestamp)
    finally:
        client.close()

    return cdc_data

# Load the CDC records (full reload or incremental, as decided by
# get_last_processed_timestamp) into `fcdc_data`.
fcdc_data = get_last_processed_timestamp()

# Echo the loaded frame as plain text.
print(fcdc_data)

In [None]:
# Rich (HTML) display of the loaded CDC records.
fcdc_data

In [None]:
import pandas as pd
import random

# Helpers that bucket event hours into time-of-day labels ('1'-'4') and build per-email weekday/weekend activity profiles.
def assign_numeric_label(hour):
    """Map an hour of the day to a 6-hour bucket label.

    '1' covers [0, 6), '2' covers [6, 12) and '3' covers [12, 18). Anything
    else falls through to '4', including hours >= 18, negative values and NaN.
    """
    buckets = (
        (0, 6, '1'),
        (6, 12, '2'),
        (12, 18, '3'),
    )
    for start, stop, label in buckets:
        if start <= hour < stop:
            return label
    return '4'

def find_max_values(df, label_cols=('1', '2', '3', '4')):
    """Find, per row, the highest count across the time-of-day label columns.

    Args:
        df: Frame whose columns include some of ``label_cols``. It is not
            modified; the augmented result is a copy.
        label_cols: Names of the bucket columns to compare. Any that are
            missing from ``df`` are added to the result with value 0.

    Returns:
        Tuple ``(result, multiple_max, random_column)``:
        - ``result``: copy of ``df`` plus ``recommended_hr`` (the row-wise
          maximum over ``label_cols``) and ``hr_id`` (the first label column
          holding that maximum).
        - ``multiple_max``: index of rows where more than one label column
          ties for the maximum.
        - ``random_column``: one label drawn with ``random.choice``.
          NOTE(review): this draw is independent of the data and of the tied
          rows, so it does not break any tie.
    """
    label_cols = list(label_cols)
    result = df.copy()

    for col in label_cols:
        if col not in result.columns:
            result[col] = 0

    counts = result[label_cols]
    result['recommended_hr'] = counts.max(axis=1)
    # idxmax picks the first column on ties; `multiple_max` flags those rows.
    result['hr_id'] = counts.idxmax(axis=1)
    max_counts = counts.eq(result['recommended_hr'], axis=0).sum(axis=1)
    multiple_max = result[max_counts > 1].index
    random_column = random.choice(label_cols)
    return result, multiple_max, random_column

def process_emaildata(vgroup):
    """Build weekday and weekend activity profiles for one group of email events.

    The input frame is not modified; all work happens on a private copy.

    Args:
        vgroup: Frame with at least ``Emailid``, ``Event``, ``Holiday`` and
            ``Timestamp`` columns (in this notebook, one ``Emailid`` group).

    Returns:
        Tuple ``(weekday_final, weekend_final, weekday_multiple_max,
        weekend_multiple_max, random_col_weekday, random_col_weekend)``.
        Each ``*_final`` frame is indexed by ``Emailid`` and holds one count
        column per observed ``Event`` value, ``Holiday`` value and
        time-of-day bucket, plus the ``find_max_values`` outputs
        (``recommended_hr`` renamed to ``w_rec_hr`` / ``we_rec_hr``, and
        ``hr_id``). The remaining items are passed through from
        ``find_max_values``.
    """
    # Work on a copy so the caller's (group) frame is never mutated.
    vgroup = vgroup.copy()
    vgroup['Timestamp'] = pd.to_datetime(vgroup['Timestamp'])
    vgroup['DayOfWeek'] = vgroup['Timestamp'].dt.dayofweek
    # Replace each timestamp with its time-of-day bucket label ('1'..'4').
    vgroup['Timestamp'] = vgroup['Timestamp'].dt.hour.apply(assign_numeric_label)

    # pandas dayofweek: Monday=0 ... Sunday=6, so 5 and 6 are the weekend.
    weekdays = vgroup[vgroup['DayOfWeek'] < 5]
    weekends = vgroup[vgroup['DayOfWeek'] >= 5]

    vgroupings = ['Event', 'Holiday', 'Timestamp']

    def _count_profile(frame):
        # One count column per distinct value of each grouping column, per Emailid.
        # NOTE(review): an Event/Holiday value equal to '1'..'4' would collide
        # with the bucket-label columns; confirm that cannot happen.
        return pd.concat(
            [frame.groupby(['Emailid', col]).size().unstack(fill_value=0) for col in vgroupings],
            axis=1,
        )

    weekday_final, weekday_multiple_max, random_col_weekday = find_max_values(_count_profile(weekdays))
    weekend_final, weekend_multiple_max, random_col_weekend = find_max_values(_count_profile(weekends))

    weekday_final = weekday_final.rename(columns={'recommended_hr': 'w_rec_hr'})
    weekend_final = weekend_final.rename(columns={'recommended_hr': 'we_rec_hr'})

    return weekday_final, weekend_final, weekday_multiple_max, weekend_multiple_max, random_col_weekday, random_col_weekend

In [None]:
# Build per-email weekday/weekend profiles, then give each email a cohort on
# whichever side (weekday or weekend) has the higher recommended score.
grouped = fcdc_data.groupby('Emailid')
weekday_email_data = []
weekend_email_data = []
weekday_multiple_max_values = []
weekend_multiple_max_values = []
weekday_random_cols = []
weekend_random_cols = []

for vname, vgroup in grouped:
    (weekday_processed_data, weekend_processed_data,
     weekday_multiple_max, weekend_multiple_max,
     random_col_weekday, random_col_weekend) = process_emaildata(vgroup)

    weekday_email_data.append(weekday_processed_data)
    weekend_email_data.append(weekend_processed_data)
    weekday_multiple_max_values.append(weekday_multiple_max)
    weekend_multiple_max_values.append(weekend_multiple_max)
    weekday_random_cols.append(random_col_weekday)
    weekend_random_cols.append(random_col_weekend)

weekday_result = pd.concat(weekday_email_data)
weekend_result = pd.concat(weekend_email_data)

weekday_result.fillna(0, inplace=True)
weekend_result.fillna(0, inplace=True)

# Emails seen only on weekdays (or only on weekends) are missing from the other
# frame, so the side-by-side concat introduces NaNs. They must be filled *after*
# the concat; otherwise every comparison against NaN below is False and such
# emails get cohort id 0 on both sides.
combined_result = pd.concat([weekday_result, weekend_result], axis=1).fillna(0)
# NOTE(review): both halves carry the '1'..'4' and 'hr_id' columns, so
# combined_result has duplicate column names; confirm this is acceptable downstream.

# NOTE(review): the cohort ids hold the winning score (w_rec_hr / we_rec_hr),
# not the bucket label (hr_id); confirm intent.
combined_result['weekday_cohort_id'] = np.where(
    combined_result['w_rec_hr'] >= combined_result['we_rec_hr'], combined_result['w_rec_hr'], 0)

combined_result['weekend_cohort_id'] = np.where(
    combined_result['we_rec_hr'] > combined_result['w_rec_hr'], combined_result['we_rec_hr'], 0)

# Ties: the `>=` above always hands a tie to the weekday side. Break ties
# randomly instead, so exactly one side receives the score.
equal_scores = combined_result['w_rec_hr'] == combined_result['we_rec_hr']
indices = equal_scores[equal_scores].index

for index in indices:
    value = combined_result.loc[index, 'w_rec_hr']
    random_column = np.random.choice(['weekday_cohort_id', 'weekend_cohort_id'])
    combined_result.loc[index, 'weekday_cohort_id'] = value if random_column == 'weekday_cohort_id' else 0
    combined_result.loc[index, 'weekend_cohort_id'] = value if random_column == 'weekend_cohort_id' else 0
combined_result.reset_index(inplace=True)
print("Combined Data with 'weekday_cohort_id' and 'weekend_cohort_id' columns:")
print(combined_result)

In [None]:
# Rich (HTML) display of the per-email cohort table.
combined_result

In [None]:
# Persist the cohort table to MongoDB. Document keys must be strings, hence the
# str casts on the column labels and on each record's keys.
combined_result.columns = combined_result.columns.astype(str)

mongo_uri = f"mongodb://{mongo_host}:{mongo_port}"

# NOTE(review): to_dict keeps only one value per duplicated column name.
data_to_insert = combined_result.to_dict(orient="records")
data_to_insert = [{str(key): value for key, value in doc.items()} for doc in data_to_insert]

client = pymongo.MongoClient(mongo_uri)
try:
    db = client[db_name]
    collection = db[cohort_all_collection]
    if data_to_insert:
        collection.insert_many(data_to_insert)
        print("Data has been successfully saved to the MongoDB database.")
    else:
        # insert_many rejects an empty document list, so skip the write.
        print("No cohort records to save.")
finally:
    # Close the connection even if the insert fails.
    client.close()


In [None]:
# combined_result.to_csv('cohort_all_outpute_testing1.csv')