## Ingest raw Select Activity Reports from CIVI
The main reason for ingesting this dataset is that it contains the Source Email field (i.e. the identity of the originator of the CIVI record), which allows QA of entries in CIVI and distinguishes system-initiated entries from Membership Committee entries.
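To illustrate the QA use, here is a minimal sketch that tags each row by origin. It assumes the renamed column `Source_Email_act` (created later in this notebook) and a hypothetical set of system sender addresses. `SYSTEM_SOURCE_EMAILS` and `tag_entry_origin` are illustrative names, not part of CIVI.

```python
import numpy as np
import pandas as pd

# Hypothetical placeholder: addresses CIVI uses when it creates records automatically.
SYSTEM_SOURCE_EMAILS = {"system@example.org"}


def tag_entry_origin(report, system_emails=SYSTEM_SOURCE_EMAILS):
    """Add an 'entry_origin' column: 'system', 'committee', or 'unknown' (no source email)."""
    source = report["Source_Email_act"].str.lower()
    system = {e.lower() for e in system_emails}
    origin = np.select(
        [source.isna(), source.isin(system)],  # first matching condition wins
        ["unknown", "system"],
        default="committee",
    )
    return report.assign(entry_origin=origin)
```

A quick `tag_entry_origin(activityReport)["entry_origin"].value_counts()` would then summarise how many entries each origin produced.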

The goal is to store a single table containing all raw CIVI records that serve as source data for the mem_type and mem_status tables.

### Workflow proposal
Like consolidated_mem_type, one *consolidated* child table will persist. Upon every CIVI ingest/update, the raw table for the new date range will augment the *consolidated* table: records in the *consolidated* table with a **start_dt** on or after the earliest **start_dt** in the *update* table will be removed, and the *update* table will then be appended to the *consolidated* table. A SQL stored procedure will do some of this work. Ideally, the stored procedure accepts the relevant table names as parameters.

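A minimal sketch of the two statements such a procedure might run, assuming the *update* table has the same columns in the same order as the *consolidated* table. Because table and column names cannot be bound like values, they are validated against a conservative pattern and backtick-quoted instead. `quote_identifier`, `build_refresh_statements`, and the demo tables are hypothetical; SQLite (standard library) stands in for MySQL only so the sketch runs anywhere, since it also accepts backtick quoting.

```python
import re
import sqlite3
from typing import List

# Conservative pattern for table/column names passed in as variables.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def quote_identifier(name: str) -> str:
    """Reject anything that is not a plain identifier, then backtick-quote it."""
    if not _IDENTIFIER.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return f"`{name}`"


def build_refresh_statements(consolidated: str, update: str, date_col: str) -> List[str]:
    """Delete consolidated rows dated on or after the earliest update row,
    then append every update row (column order must match)."""
    c, u, d = (quote_identifier(n) for n in (consolidated, update, date_col))
    return [
        f"DELETE FROM {c} WHERE {d} >= (SELECT MIN({d}) FROM {u})",
        f"INSERT INTO {c} SELECT * FROM {u}",
    ]


def run_demo():
    """Apply the statements to two small in-memory tables (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE consolidated (email TEXT, activity_dt TEXT)")
    conn.execute("CREATE TABLE update_batch (email TEXT, activity_dt TEXT)")
    conn.executemany(
        "INSERT INTO consolidated VALUES (?, ?)",
        [("a@x.org", "2024-08-01 10:00:00"), ("b@x.org", "2024-09-20 09:00:00")],
    )
    conn.executemany(
        "INSERT INTO update_batch VALUES (?, ?)",
        [("b@x.org", "2024-09-15 12:00:00"), ("c@x.org", "2024-10-01 08:30:00")],
    )
    for stmt in build_refresh_statements("consolidated", "update_batch", "activity_dt"):
        conn.execute(stmt)
    rows = conn.execute("SELECT * FROM consolidated ORDER BY activity_dt").fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(run_demo())
```

Inside a MySQL procedure the same statements would typically be assembled with `CONCAT` and run through `PREPARE`/`EXECUTE`, since identifiers cannot be passed as bound values there either.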
**Key requirement:** this dataset must be joinable to the mem_type and mem_status tables via member email and timestamp.
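One way to do that join is pandas `merge_asof`, which matches each activity to the most recent mem_type row for the same member at or before the activity timestamp. A minimal sketch: the mem_type column names (`email`, `start_dt`, `mem_type`) and all the data below are hypothetical.

```python
import pandas as pd

# Hypothetical activity rows, keyed by the normalized member email.
activity = pd.DataFrame({
    "email_grouping": ["a@x.org", "b@x.org", "a@x.org"],
    "Activity_Date_DT_act": pd.to_datetime(["2024-03-02 10:00", "2024-05-01 09:00", "2024-07-15 12:00"]),
})
# Hypothetical mem_type rows: each row holds from start_dt until the member's next row.
mem_type = pd.DataFrame({
    "email": ["a@x.org", "a@x.org", "b@x.org"],
    "start_dt": pd.to_datetime(["2024-01-01", "2024-06-01", "2024-04-01"]),
    "mem_type": ["trial", "full", "full"],
})


def attach_mem_type(activity, mem_type):
    """For each activity, take the latest mem_type row for the same email with start_dt <= activity time."""
    left = activity.sort_values("Activity_Date_DT_act")  # merge_asof requires sorted keys
    right = mem_type.sort_values("start_dt")
    return pd.merge_asof(
        left, right,
        left_on="Activity_Date_DT_act", right_on="start_dt",
        left_by="email_grouping", right_by="email",
        direction="backward",
    )


print(attach_mem_type(activity, mem_type))
```

Both timestamp columns must share the same dtype for `merge_asof` to accept them, which is another reason to reconcile dtypes after reading tables back from MySQL.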

In [1]:
import os
import pandas as pd
import numpy as np
import re
import datetime
import itertools
import json
import sqlalchemy
from container_credentials import return_credentials

In [2]:
#os.chdir('/home/candela/Documents/greeneHill/membershipReportsCIVI')
os.chdir('/home/mofongo/Documents/ghfc/membershipReportsCIVI/membershipReportingLogicSampleReports')
#/home/candela/Documents/greeneHill/membershipReportsCIVI/membershipReportingLogicSampleReports

In [76]:
activityReport = pd.read_csv('./selectActivityReport_20241013.csv')
activityReport_legacy = pd.read_csv('./selectActivityReport_10072024.csv')

In [77]:
#replace spaces with underscores and tag every column with an _act suffix
activityReport.columns = [c.replace(' ', '_') + '_act' for c in activityReport.columns]
activityReport_legacy.columns = [c.replace(' ', '_') + '_act' for c in activityReport_legacy.columns]

In [78]:
#email_grouping: use the target email, falling back to the source email when the target is missing
activityReport = activityReport.assign(email_grouping = activityReport['Target_Email_act'].fillna(activityReport['Source_Email_act']))

activityReport_legacy = activityReport_legacy.assign(email_grouping = activityReport_legacy['Target_Email_act'].fillna(activityReport_legacy['Source_Email_act']))

In [79]:
activityReport = activityReport.assign(Activity_Date_DT_act = pd.to_datetime(activityReport['Activity_Date_act'], format = '%Y-%m-%d %H:%M'))
activityReport_legacy = activityReport_legacy.assign(Activity_Date_DT_act = pd.to_datetime(activityReport_legacy['Activity_Date_act'], format = '%Y-%m-%d %H:%M'))

In [80]:
#drop irrelevant columns; errors='ignore' skips any listed column that is not present
activityReport = activityReport.drop(columns=['Assignee_Name_act', 'Activity_Details_act'], errors='ignore')
activityReport_legacy = activityReport_legacy.drop(columns=['Assignee_Name_act', 'Activity_Details_act'], errors='ignore')


Recreate part of the stored procedure's logic here.
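The same logic can also be sketched entirely in pandas, which is handy for checking the procedure's output on a sample. A minimal sketch: `refresh_consolidated` is a hypothetical helper, and it assumes both frames already share column names and dtypes.

```python
import pandas as pd


def refresh_consolidated(historical, update, date_col="Activity_Date_DT_act"):
    """Keep historical rows strictly before the update's earliest timestamp,
    append the update, and drop exact duplicate rows."""
    if update.empty:
        return historical.copy()  # nothing to revise
    cutoff = update[date_col].min()
    kept = historical[historical[date_col] < cutoff]
    combined = pd.concat([kept, update], ignore_index=True)
    return combined.drop_duplicates(ignore_index=True)
```

Comparing against the full timestamp (rather than the date alone) keeps historical rows from earlier on the cutoff day, which the new report does not cover.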

In [81]:
#earliest activity timestamp in the new report, formatted as text for the SQL query
min_date = activityReport['Activity_Date_DT_act'].min()
min_date_text = min_date.strftime("%Y-%m-%d %H:%M:%S")

In [82]:
# DEFINE THE DATABASE CREDENTIALS
cred_dict = return_credentials()

user = cred_dict['user']
password = cred_dict['pass']
host = cred_dict['host']
port = cred_dict['port']
database = cred_dict['database']

def get_connection():
    # URL.create escapes special characters (e.g. '@' or '/') in the password,
    # which a hand-formatted URL string would not
    url = sqlalchemy.engine.URL.create(
        drivername="mysql+pymysql",
        username=user,
        password=password,
        host=host,
        port=int(port),
        database=database,
    )
    return sqlalchemy.create_engine(url)

if __name__ == '__main__':
    try:
        # GET THE CONNECTION OBJECT (ENGINE) FOR THE DATABASE
        engine = get_connection()
        print(f"Connection to the {host} for user {user} created successfully.")
    except Exception as ex:
        print("Connection could not be made due to the following error: \n", ex)

Connection to the 172.17.0.2 for user root created successfully.


In [85]:
#retrieve the pre-existing table
#NOTE this may not be sustainable the larger the transaction list becomes
#sample functioning python code: type_new = conn.execute(text("SELECT COUNT(*) FROM consolidated_mem_type_temp2"))
#example of a functioning query: SELECT * FROM consolidated_mem_status WHERE start_dt > date('2024-08-15 02:54:00') limit 10;
#WHERE start_dt > date({min_date_text})
#conn.execute(sqlalchemy.text(f"RENAME TABLE {replacement_table} to {legacy_table}"))
from sqlalchemy.sql import select
from sqlalchemy.sql import text

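#bind the cutoff as a parameter and compare to the full timestamp: DATE() truncates to midnight,
#which would drop historical rows from earlier on the cutoff day that the new report does not contain
with engine.connect() as conn:
    historical = conn.execute(
        text('SELECT * FROM consolidated_rawActivityReport WHERE Activity_Date_DT_act < :min_date'),
        {'min_date': min_date_text},
    )
    historical_df = pd.DataFrame(historical.fetchall(), columns=historical.keys())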

In [86]:
#just reconciling the dtypes of ea table
pd.concat([historical_df.dtypes, activityReport.dtypes], axis=1, keys=['historical_df', 'activityReport'])

| column | historical_df | activityReport |
|---|---|---|
| Target_Name_act | object | object |
| Source_Email_act | object | object |
| Target_Email_act | object | object |
| Activity_Type_act | object | object |
| Subject_act | object | object |
| Activity_Date_act | object | object |
| Activity_Status_act | object | object |
| email_grouping | object | object |
| Activity_Date_DT_act | datetime64[ns] | datetime64[ns] |


Concatenate the two tables, then remove duplicates.
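`drop_duplicates()` only removes rows that match in every column, so a record revised in CIVI (for example, a changed status) survives as two rows. A minimal sketch of a QA check that counts both cases; `duplicate_report` is a hypothetical helper and the choice of key columns is an assumption.

```python
import pandas as pd


def duplicate_report(df, key_cols):
    """exact_duplicates: rows identical in every column (removed by drop_duplicates()).
    key_conflicts: remaining rows that share key_cols but differ elsewhere."""
    exact = int(df.duplicated().sum())
    deduped = df.drop_duplicates()
    conflicts = int(deduped.duplicated(subset=key_cols, keep=False).sum())
    return {"exact_duplicates": exact, "key_conflicts": conflicts}
```

For example, `duplicate_report(consolidated, ['email_grouping', 'Activity_Date_DT_act', 'Activity_Type_act'])` would flag activities that appear with conflicting details after the merge.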

In [87]:
consolidated = pd.concat([historical_df,activityReport])

In [88]:
consolidated_nd = consolidated.drop_duplicates()

In [89]:
consolidated_nd.shape

(22044, 9)

Be sure that the datatypes from both tables are equal
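Values of different types (for example a `Timestamp` and the same date as a string) never compare equal, so a dtype drift between the MySQL round-trip and the fresh CSV would let duplicates slip through. A minimal sketch of a check; `dtype_mismatches` is a hypothetical helper.

```python
import pandas as pd


def dtype_mismatches(left, right):
    """Return {column: (left_dtype, right_dtype)} for columns whose dtypes differ
    or that exist in only one frame (the missing side is None)."""
    out = {}
    for col in left.columns.union(right.columns, sort=False):
        l = str(left[col].dtype) if col in left.columns else None
        r = str(right[col].dtype) if col in right.columns else None
        if l != r:
            out[col] = (l, r)
    return out
```

An empty result from `dtype_mismatches(historical_df, activityReport)` is the condition to confirm before concatenating and writing.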

In [90]:
try:
    frame = consolidated_nd.to_sql('consolidated_rawActivityReport', con=engine, if_exists='replace', index=False)
    #drop latest_trial as field is redundant
    #frame = activityReport_legacy.to_sql('consolidated_rawActivityReport', con=engine, if_exists='replace', index=False)
except ValueError as vx:
    print(vx)
except Exception as ex:   
    print(ex)
else:
    print("Table created successfully")
finally:
    engine.dispose()

Table created successfully


Call the procedure to merge the pre-existing and update tables.
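A minimal sketch of building that call. Table names passed as procedure *arguments* are ordinary string values and can be bound; only the procedure name must be spliced into the SQL, so it is validated first. `build_call` and the procedure name `merge_raw_activity` are hypothetical.

```python
import re

_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_call(procedure, *args):
    """Return (sql, params) for CALL procedure(:p0, :p1, ...) with one bind parameter per argument."""
    if not _IDENT.match(procedure):
        raise ValueError(f"unsafe procedure name: {procedure!r}")
    params = {f"p{i}": value for i, value in enumerate(args)}
    placeholders = ", ".join(f":{name}" for name in params)
    return f"CALL {procedure}({placeholders})", params


sql, params = build_call("merge_raw_activity", "rawActivityReport_update", "consolidated_rawActivityReport")
print(sql, params)
```

With SQLAlchemy the pair could then be executed as `with engine.begin() as conn: conn.execute(text(sql), params)`, where `engine.begin()` commits the transaction when the block exits without an error.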