In [85]:
import json
import os
import datetime
import getpass
import pyodbc
import pandas as pd
import pandasql as ps
import numpy as np
from sqlalchemy.engine import URL
from sqlalchemy import create_engine


In [105]:
class ConfigNotFoundError(Exception):
    """Custom exception for configuration not found errors."""
    pass

class ConnectionError(Exception):
    """Custom exception for connection errors."""

class SSMSConnnectionManager:
    def __init__(self, config, password):
        self.config = config
        self.password = password
        self.driver = None 
        self.server = None
        self.port = None
        self.database = None
        self.username = None
        self.engine = None
        self.pyodbc_connect = None
        self.connecting_string = None

    def set_connection_config(self):
        if self.config['Environment']:
            env = self.config['Environment']
        else: 
            raise Exception('Environment configuration not found')

        for key, value in self.config.items():
            try:
                if 'SSMSConnectionManager' in key:
                    server_name = env+'Server'
                    self.driver = value['Driver']
                    self.server = value[server_name]
                    self.port = value['Port']
                    self.database = value['Database']
                    self.username = value['Username']
            except Exception as e:
                print(f"SSMSConnectionManager configuration failure. The following configuration has not been set correctly: {e}")

    def connect(self):
        try:   
            self.connecting_string = f"Driver={self.driver};Server={self.server},{self.port};Database={self.database};UID={self.username};PWD={self.password};&autocommit=true"
            connection_url = URL.create("mssql+pyodbc", query={"odbc_connect":self.connecting_string})
            engine = create_engine(connection_url, use_setinputsizes=False, echo=False)
            self.pyodbc_connect = pyodbc.connect(self.connecting_string)
            self.engine = engine.connect()
            print("Connected to SSMS successfully.")
            return(self.pyodbc_connect)
        except KeyError as e:
            raise ConfigNotFoundError(f"Missing required configuration key: {e}")
        except pyodbc.Error as e:
                raise ConnectionError(f"Error connecting to SSMS: {e}")

    def query_table(self, query):
        if not self.engine:
            print("No active connection.")
            return None
        
        try:
            df = pd.read_sql(query, con=self.engine)
            print("Query executed successfully.")
            return df
        except Exception as e:
            print(f"Error executing query: {e}")
            return None


    def insert_into_sql(self, df, tablename, columns):
        conn = self.pyodbc_connect
        cursor = self.pyodbc_connect.cursor()

        # Create the SQL insert statement dynamically
        placeholders = ', '.join(['?'] * len(columns))
        columns_str = ', '.join(columns)
        sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"

        # Prepare the data for insertion
        data = df[columns].values.tolist()

        if not self.pyodbc_connect:
            print("No active connection.")
            return None

        try:
            # Execute the insert statement
            cursor.executemany(sql, data)

            # Commit the transaction
            self.pyodbc_connect.commit()

            # Close the connection
            #cursor.close()
            #self.pyodbc_connect.close()
            print("Data imported successfully.")
        except (Exception, pyodbc.DatabaseError) as error:
            # Rollback the transaction on failure
            self.pyodbc_connect.rollback()
            print(f"Error importing data into {tablename}: {error}")
        finally:
            cursor.close()
            self.pyodbc_connect.close()

    def update_table_with_dataframe(self, df, table_name, key_columns, update_columns):
        """
        Update the table with the dataframe values.
        :param conn: Database connection object
        :param df: pandas DataFrame with the data to update
        :param table_name: Name of the table to update
        :param key_columns: List of column names to use as keys for the update
        :param update_columns: List of column names to update
        """
        conn = self.pyodbc_connect
        cursor = conn.cursor()
        
        try:
            for index, row in df.iterrows():
                set_clause = ', '.join([f"{col} = ?" for col in update_columns])
                where_clause = ' AND '.join([f"{col} = ?" for col in key_columns])
                sql_query = f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}"
                parameters = [row[col] for col in update_columns] + [row[col] for col in key_columns]
                cursor.execute(sql_query, parameters)
            conn.commit()
        except (Exception, pyodbc.DatabaseError) as error:
            print(f"Error: {error}")
            conn.rollback()
        finally:
            cursor.close()
            self.pyodbc_connect.close()

    def close(self):
        if self.engine:
            self.engine.close()
            print("Connection to SSMS closed.")
        else:
            print("No active connection to close.")

    def run_connection_manager(self):
        try:
            self.set_connection_config()
            self.connect()
            return(self.pyodbc_connect)
        except (ConfigNotFoundError, ConnectionError) as e:
            print(f"Error: {e}")

In [106]:
class carrierConfig:
    def __init__(self, config_file_path):
       self.config_file_path = config_file_path
       self.config = self.read_config()

    def read_config(self):
        try:
            with open(self.config_file_path, 'r') as file:
                config = json.load(file)
            return(config)
        except Exception as e:
            print(f"Error reading config file: {e}")
            return(None)

    def get_carrier_config(self, config=None):
        if config is None:
            config = self.config

        config_list = [(obj['carrierId'], obj['carrierName'], obj["carrierFileType"], f"{config['FileDetails']['mainPath']+obj['carrierName']}\\", obj["delimiter"]) for obj in config.get('CarrierDetials')]

        carrier_config_worklist = []

        def create_carrier_filename_worklist (worklist):
            for details in worklist:
                carrier_id = details[0]
                carrier_file_type = details[2]
                if os.path.isdir(details[3]):
                    for fileName in os.listdir(details[3]):
                        if carrier_file_type in fileName:
                            carrier_config_worklist.append((details[0],details[1],details[3]+fileName,details[4]))
                        else:
                            continue
                            #print(f'No file listed in current directory: {details[3]}')
                else:
                    print(f"The directory '{details[3]}' does not exist.")

        create_carrier_filename_worklist(config_list)
        return carrier_config_worklist 

In [107]:
class CarrierLoader:
    def __init__(self, carrier_worklist):
        self.carrier_worklist = carrier_worklist
        self.standard_headers = ['Carrier Name','Brand Name','Plan Name','State','Document Type','Language','Effective Date','Document URL']
        self.carrierWorklist = None
        self.read_method = None
        self.df = None
        self.log_df = None

    def load_file(self):
        dataframe = pd.DataFrame()
        files_with_issues = []
        try:
            # loop through files and check if columns in file matches standard_headers and load to dataframe if matched. If not create a dataframe output errors to logging. 
            for carrierWorklist in self.carrier_worklist:
                file_path = carrierWorklist[2]
                delimiter_len = len(carrierWorklist[3])
                # Read the Excel file
                if file_path.split('.')[-1] in ['xlsx','xls'] and delimiter_len == 0:
                    df = pd.read_excel(file_path, dtype={'Document URL': str})
                    
                    # Get the headers of the DataFrame
                    file_headers = df.columns.tolist()
                    
                    # Check if the file headers match the standard headers
                    if all(column in file_headers for column in self.standard_headers):
                        print(f"Headers in {file_path} match the standard headers.")
                        # Add new column in df
                        df['CarrierId'] = carrierWorklist[0]
                        df['EffectiveEndDate'] = ''
                        df['LoadDate'] = pd.Timestamp.now()
                        df['fileName'] =  carrierWorklist[2] #carrierWorklist[2].split('\\')[-1]
                        df['FileLoadIndicator'] = 'Y'
                        df.rename(columns={'Carrier Name':'CarrierName', 'Brand Name':'BrandName', 'Document Type':'DocumentType', 'Effective Date':'EffectiveBeginDate', 'Document URL':'DocumentUrl', 'Plan Name':'PlanName', 'StateCd':'State'},inplace=True)
                        df = df[['CarrierId','CarrierName','BrandName','PlanName','State','DocumentType','Language','EffectiveBeginDate','EffectiveEndDate','DocumentUrl','LoadDate','fileName','FileLoadIndicator']]
                        dataframe = pd.concat([dataframe, df], ignore_index=True)
                    else:
                        print(f"Headers in {file_path} do NOT match the standard headers.")
                        files_with_issues.append((f"{file_path}", "Headers do Not match standard template."))
                # Read the Csv file without delimiter
                elif file_path.split('.')[-1] in ['csv','txt','text'] and delimiter_len == 0:
                    df = pd.read_csv(file_path)
                    
                    # Get the headers of the DataFrame
                    file_headers = df.columns.tolist()
                    
                    # Check if the file headers match the standard headers
                    if all(column in file_headers for column in self.standard_headers):
                        print(f"Headers in {file_path} match the standard headers.")
                        # Add new column in df
                        df['CarrierId'] = carrierWorklist[0]
                        df['EffectiveEndDate'] = ''
                        df['LoadDate'] = pd.Timestamp.now()
                        df['fileName'] =  carrierWorklist[2] #carrierWorklist[2].split('\\')[-1]
                        df['FileLoadIndicator'] = 'Y'
                        df.rename(columns={'Carrier Name':'CarrierName', 'Brand Name':'BrandName', 'Document Type':'DocumentType', 'Effective Date':'EffectiveBeginDate', 'Document URL':'DocumentUrl', 'Plan Name':'PlanName', 'StateCd':'State'},inplace=True)
                        df = df[['CarrierId','CarrierName','BrandName','PlanName','State','DocumentType','Language','EffectiveBeginDate','EffectiveEndDate','DocumentUrl','LoadDate','fileName','FileLoadIndicator']]
                        dataframe = pd.concat([dataframe, df], ignore_index=True)
                        #dataframe.append(df)
                    else:
                        print(f"Headers in {file_path} do NOT match the standard headers.")
                        files_with_issues.append((f"{file_path}", "Headers do Not match standard template."))
                # Read the Csv file with delimiter
                elif file_path.split('.')[-1] in ['csv','txt','text'] and delimiter_len > 0:
                    df = pd.read_csv(file_path, delimiter= carrierWorklist[3])
                    
                    # Get the headers of the DataFrame
                    file_headers = df.columns.tolist()
                    
                    # Check if the file headers match the standard headers
                    if all(column in file_headers for column in self.standard_headers):
                        print(f"Headers in {file_path} match the standard headers.")
                        # Add new column in df
                        df['CarrierId'] = carrierWorklist[0]
                        df['EffectiveEndDate'] = ''
                        df['LoadDate'] = pd.Timestamp.now()
                        df['fileName'] =  carrierWorklist[2] #carrierWorklist[2].split('\\')[-1]
                        df['FileLoadIndicator'] = 'Y'
                        df.rename(columns={'Carrier Name':'CarrierName', 'Brand Name':'BrandName', 'Document Type':'DocumentType', 'Effective Date':'EffectiveBeginDate', 'Document URL':'DocumentUrl', 'Plan Name':'PlanName', 'StateCd':'State'},inplace=True)
                        df = df[['CarrierId','CarrierName','BrandName','PlanName','State','DocumentType','Language','EffectiveBeginDate','EffectiveEndDate','DocumentUrl','LoadDate','fileName','FileLoadIndicator']]
                        dataframe = pd.concat([dataframe, df], ignore_index=True)

                    else:
                        print(f"Headers in {file_path} do NOT match the standard headers.")
                        failed_desc_list = [{'SourceFile': f"{file_path}", 'ErrorDescription':"Headers don't match standard template."}]
                        files_with_issues.append((f"{file_path}", "Headers do Not match standard template."))
        except Exception as e:
            print(f"Error loading file '{carrierWorklist[2]}': {e}")     
            
        self.df = dataframe.drop_duplicates()
        self.log_df = pd.DataFrame(files_with_issues, columns=['SourceFile','ErrorDescription'])
        return(self.df, self.log_df)
    

In [108]:
class validations:
    def __init__(self,main_df,validation_restuls_df,column_dups,columns_to_check):
        self.main_df = main_df
        self.validation_restuls_df = validation_restuls_df
        self.error_logging_dup_df = None
        self.error_logging_column_df = None
        self.column_dups = column_dups
        self.columns_to_check = columns_to_check

    def validate_no_duplicates(self):
        # Check for duplicates based on self.column_dups
        duplicates = self.main_df.duplicated(subset=self.column_dups, keep=False)
        # Display rows that are duplicates based on self.column_dups
        results = self.main_df[duplicates]
        duplicate_df = pd.DataFrame()

        if len(results['fileName'].unique().tolist()) == 0:
            print("Duplicate Validations: Passed")
        else:
            for fileName in results['fileName'].unique().tolist():
                error_logging_dup_df = pd.DataFrame({'fileName':[f'{fileName}'], 'ErrorDescription':["Please check source file for duplicates."]})
                self.error_logging_dup_df =  pd.concat([duplicate_df, error_logging_dup_df], ignore_index=True)
            return (duplicate_df)

    def validate_no_blanks_or_nulls_in_column(self):
        # Identify rows where 'ProductStateId' is null or blank, if so log errors becasue data was not found to be mapped on. 
        column_df = pd.DataFrame()
        for column in self.columns_to_check:
            results = self.main_df[self.main_df[column].isnull() | (self.main_df[column] == '')]

            if len(results['fileName'].unique().tolist()) != 0:
                for fileName in results['fileName'].unique().tolist():
                    column_error_df = pd.DataFrame({'fileName':[f'{fileName}'], 'ErrorDescription':[f"Column not populated '{column}'"]})
                    self.error_logging_column_df =  pd.concat([column_df, column_error_df], ignore_index=True)
                    #print(f"Please check source file '{fileName}', value not populated for the following column '{column}'.")
            else:
                print(f"Check column '{column}' for blanks or null values: Passed")
        return (column_df)
    
    def combine_values_dataframe(self):
        # consolidates all values from data that match on as specific column & create new column 'FileLoadIndicator' = 'N' to for files not to load
        #if self.error_logging_dup_df is not None or self.error_logging_column_df is not None:
        if  self.error_logging_dup_df is not None and self.error_logging_column_df is not None:
            self.validation_restuls_df = pd.concat([self.error_logging_dup_df, self.error_logging_column_df], ignore_index=True)
            self.validation_restuls_df = self.validation_restuls_df.groupby('fileName')['ErrorDescription'].agg(lambda x: '; '.join(x)).reset_index()
            self.validation_restuls_df['FileLoadIndicator'] = 'N'
            print("Valdations completed: Failed, please check logs")
        elif self.error_logging_dup_df is not None and self.error_logging_column_df is None:
            self.validation_restuls_df = self.error_logging_dup_df
            self.validation_restuls_df = self.validation_restuls_df.groupby('fileName')['ErrorDescription'].agg(lambda x: '; '.join(x)).reset_index()
            self.validation_restuls_df['FileLoadIndicator'] = 'N'
            print("Valdations completed: Failed, please check logs")
        elif self.error_logging_dup_df is None and self.error_logging_column_df is not None:
            self.validation_restuls_df = self.error_logging_column_df
            self.validation_restuls_df = self.validation_restuls_df.groupby('fileName')['ErrorDescription'].agg(lambda x: '; '.join(x)).reset_index()
            self.validation_restuls_df['FileLoadIndicator'] = 'N'
            print("Valdations completed: Failed, please check logs")
        else:
            print("Validations completed.....")
            
        return(self.validation_restuls_df)

    def run_all_validations(self):
        self.validate_no_duplicates()
        self.validate_no_blanks_or_nulls_in_column()
        self.combine_values_dataframe()
        return(self.validation_restuls_df)
        
        

In [125]:
# input password for SSMS\PostgreSQL connection
server_password = getpass.getpass("Enter password to connect to SSMS and PostgreSQL: ")

# Paths to configuration files
config_file_path = 'AncillaryDocumentConfig.json' 

# Read configuration
carrier_config = carrierConfig(config_file_path)
# Prepare Carrier Config List for Data Ingest
carrier_config_list = carrier_config.get_carrier_config()  

# Load carrier data to dataframe 
load_carrier_config = CarrierLoader(carrier_config_list)
main_df, validation_restuls_df = load_carrier_config.load_file()

# Get distinct CarrierId list
carrier_id = tuple(sorted([str(id[0]) for id in carrier_config_list]))

# Connect to SSMS
ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
ssms_connection.run_connection_manager()

# Set query join dbo.AncillaryProduct & dbo.AncillaryProductState to get list of ProductStateID based on values set in parameter "carrier_id"
if len(carrier_id) == 1:
    AncillaryProductState_Q = f"""
		SELECT 
			p.ProductId,
			p.CarrierId,
			p.BrandId,
			p.ProductName,
			ps.ProductStateId,
			ps.StateCd
		FROM dbo.AncillaryProduct p 
		JOIN dbo.AncillaryProductState ps on p.ProductId = ps.ProductId
		WHERE p.CarrierId = {carrier_id[0]}
		"""
else:
	AncillaryProductState_Q = f"""
		SELECT 
			p.ProductId,
			p.CarrierId,
			p.BrandId,
			p.ProductName,
			ps.ProductStateId,
			ps.StateCd
		FROM dbo.AncillaryProduct p 
		JOIN dbo.AncillaryProductState ps on p.ProductId = ps.ProductId
		WHERE p.CarrierId in {carrier_id}
		"""
	
# Run Query AncillaryProductState_Q and set to Dataframe 
ancillary_product_state_df = ssms_connection.query_table(AncillaryProductState_Q)
# Close to connection to ssms
ssms_connection.close()
# Rename columns in ancillary_product_state_df
ancillary_product_state_df.rename(columns={'ProductName':'PlanName', 'StateCd':'State'},inplace=True)

# Pandas left join main_df and ancillary_product_state_df on CarrierId, State and Plan Name 
ancillary_document_df = pd.merge(main_df, ancillary_product_state_df, on=['CarrierId','State','PlanName'], how='left')

# Update document type in ancillary_document_df if language == Spanish beginning prefix of documentType = 'SP_' && language == English beginning prefix of documentType = 'EN_'
def update_language_docs(row):
    if row['Language'] == 'Spanish':
        return 'SP_' + row['DocumentType']
    else:
        return row['DocumentType'] 
    
ancillary_document_df['DocumentTypeNew'] = ancillary_document_df.apply(update_language_docs, axis=1)

# Set product_state_id tuple fromm ancillary_document_df
product_state_id = tuple(sorted([str(id) for id in ancillary_document_df['ProductStateId'].unique().tolist()]))

# Load dbo.AncillaryDocuments to ssms_document_df
# Connect to SSMS
ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
ssms_connection.run_connection_manager()

# Query Ancillary Document table where ProductStateId in {product_state_id} and Load dbo.AncillaryDocuments to ssms_document_df
AncillaryDocument_Q = f"""
SELECT *
FROM dbo.AncillaryDocument 
WHERE EffectiveEndDate is null and ProductStateId in {product_state_id}
"""
# Run Query AncillaryProductState_Q and set to Dataframe 
ssms_document_df = ssms_connection.query_table(AncillaryDocument_Q)
# Close to connection to ssms
ssms_connection.close()

# Set Parameters and Run validations rules and check for data load issues for all files loaded into dataframe
check_columns_for_dup = ['ProductStateId','CarrierId', 'CarrierName', 'BrandName','PlanName','State','DocumentType','Language','EffectiveBeginDate','fileName']
columns_to_check_if_null = ['CarrierId','PlanName','State','DocumentType','Language','EffectiveBeginDate','DocumentUrl','ProductStateId']
set_validations = validations(ancillary_document_df,validation_restuls_df,check_columns_for_dup,columns_to_check_if_null)
validation_restuls_df = set_validations.run_all_validations()

# Update FileLoadIndicator = 'N' in ancillary_document_df for files with issues from validation_restuls_df. 
if validation_restuls_df.size > 0:
	# merge results with ancillary_document_df
	ancillary_document_df = pd.merge(ancillary_document_df, validation_restuls_df, on='fileName', how='left',suffixes=('', '_new'))

	# Update the 'FileLoadIndicator' column based on the 'FileLoadIndicator_new' column and the 'fileName' match
	ancillary_document_df['FileLoadIndicator'] = np.where(ancillary_document_df['fileName'].isin(validation_restuls_df['fileName']), ancillary_document_df['FileLoadIndicator_new'], ancillary_document_df['FileLoadIndicator'])

	# Drop the 'FileLoadIndicator_new' & 'ErrorDescription' column.
	ancillary_document_df.drop(columns=['FileLoadIndicator_new','ErrorDescription'], inplace=True)
else:
    print('All validations passed....')

# Set DocumentType = DocumentTypeNew in ancillary_document_df
ancillary_document_df['DocumentType'] = ancillary_document_df['DocumentTypeNew']

# Select specfic columns where FileLoadIndicator = 'Y' in ancillary_document_df and Select specific columns to prepaare to load into ssms.
ancillary_document_df = ancillary_document_df[ancillary_document_df['FileLoadIndicator'] == 'Y']
ancillary_document_df = ancillary_document_df[['ProductStateId','DocumentType','EffectiveBeginDate','DocumentUrl','LoadDate']]

# Update FileLoadIndicator = 'N' in ancillary_document_df for files with issues from validation_restuls_df. 
ssms_document_df = pd.merge(ssms_document_df, ancillary_document_df, on=['ProductStateId','DocumentType'], how='inner',suffixes=('', '_new'))


# Apply EffectiveEndDate and UpdateDate to previous records with new documents.
if ssms_document_df is not None:
	# Set new end dates ('EffectiveEndDate') for previous Documents 1 day prior to new documents loaded and apply udpates to dbo.AncillaryDopcument based on ProductStateId
	ssms_document_df['EffectiveEndDate'] = pd.to_datetime(ssms_document_df['EffectiveBeginDate_new'] - pd.Timedelta(days=1)).dt.date
	ssms_document_df['UpdateDate'] = pd.Timestamp.now()

	# Convert columns to appropriate types
	ssms_document_df['ProductStateId'] = ssms_document_df['ProductStateId'].astype(int)
	ssms_document_df['EffectiveEndDate'] = pd.to_datetime(ssms_document_df['EffectiveEndDate'])
	ssms_document_df['UpdateDate'] = pd.to_datetime(ssms_document_df['UpdateDate'])
      
	# Set Parameters to update dbo.AncillaryDocument in ssms and postgresql
	table_name = "dbo.AncillaryDocument"
	key_columns = ['ProductStateId']  # Column(s) used as keys for the update
	update_columns = ['EffectiveEndDate', 'UpdateDate']  # Columns to update

	# Connect to SSMS
	ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
	ssms_connection.run_connection_manager()

	# udpate EffectiveEndDate and UpdateDate in dbo.AncillaryDocument
	ssms_connection.update_table_with_dataframe(ssms_document_df, table_name, key_columns, update_columns)

	# Close to connection to ssms
	ssms_connection.close()
else:
	print('Empty')

# Connects to SSMS and Insert new data into SSMS dbo.AncillaryDocument where FileLoadIndicator = 'Y' from ancillary_document_df. On complete closes connection to SSMS
table_name = 'dbo.AncillaryDocument'
columns_to_insert = ['ProductStateId', 'DocumentType', 'EffectiveBeginDate', 'DocumentUrl','LoadDate']

ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
ssms_connection.run_connection_manager()
ssms_connection.insert_into_sql(ancillary_document_df,table_name,columns_to_insert)

# Update QA with changes applied to caliload dbo.AncillaryDocument Table to PostgreSQL 

# Migrate to QA


Headers in D:\Data\Ancillary\HI\Documents\Cigna\Cigna HIP Flex Documents 2.xlsx match the standard headers.
Connected to SSMS successfully.
Query executed successfully.
Connection to SSMS closed.
Connected to SSMS successfully.
Query executed successfully.
Connection to SSMS closed.
Duplicate Validations: Passed
Check column 'CarrierId' for blanks or null values: Passed
Check column 'PlanName' for blanks or null values: Passed
Check column 'State' for blanks or null values: Passed
Check column 'DocumentType' for blanks or null values: Passed
Check column 'Language' for blanks or null values: Passed
Check column 'EffectiveBeginDate' for blanks or null values: Passed
Check column 'DocumentUrl' for blanks or null values: Passed
Check column 'ProductStateId' for blanks or null values: Passed
Validations completed.....
All validations passed....
Connected to SSMS successfully.
Connection to SSMS closed.
Connected to SSMS successfully.
Data imported successfully.


In [126]:
# Load dbo.AncillaryDocuments to ssms_document_df
# Connect to SSMS
ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
ssms_connection.run_connection_manager()

# Query Ancillary Document table where ProductStateId in {product_state_id} and Load dbo.AncillaryDocuments to ssms_document_df
AncillaryDocument_Q = f"""
SELECT *
FROM dbo.AncillaryDocument 
WHERE EffectiveEndDate is null and ProductStateId in {product_state_id}
"""
# Run Query AncillaryProductState_Q and set to Dataframe 
ssms_document_df = ssms_connection.query_table(AncillaryDocument_Q)
# Close to connection to ssms
ssms_connection.close()

Connected to SSMS successfully.
Query executed successfully.
Connection to SSMS closed.


In [127]:
# Update FileLoadIndicator = 'N' in ancillary_document_df for files with issues from validation_restuls_df. 
ssms_document_df = pd.merge(ssms_document_df, ancillary_document_df, on=['ProductStateId','DocumentType'], how='inner',suffixes=('', '_new'))

# Set new end dates ('EffectiveEndDate') for previous Documents 1 day prior to new documents loaded and apply udpates to dbo.AncillaryDopcument based on ProductStateId
ssms_document_df['EffectiveEndDate'] = pd.to_datetime(ssms_document_df['EffectiveBeginDate_new'] - pd.Timedelta(days=1)).dt.date
ssms_document_df['UpdateDate'] = pd.Timestamp.now()

# Convert columns to appropriate types
ssms_document_df['ProductStateId'] = ssms_document_df['ProductStateId'].astype(int)
ssms_document_df['EffectiveEndDate'] = pd.to_datetime(ssms_document_df['EffectiveEndDate'])
ssms_document_df['UpdateDate'] = pd.to_datetime(ssms_document_df['UpdateDate'])


In [128]:
display(ssms_document_df)

Unnamed: 0,AncillaryDocumentId,ProductStateId,DocumentType,EffectiveBeginDate,EffectiveEndDate,DocumentUrl,UpdateDate,LoadDate,EffectiveBeginDate_new,DocumentUrl_new,LoadDate_new
0,50,569,Brochure,2024-07-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:23:17.327404,2024-06-10 22:17:37.460,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:17:37.461492
1,51,570,Brochure,2024-07-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:23:17.327404,2024-06-10 22:17:37.460,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:17:37.461492
2,52,571,Brochure,2024-06-01,2024-05-31,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:23:17.327404,2024-06-10 22:17:37.460,2024-06-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:17:37.461492
3,53,580,Brochure,2024-06-15,2024-06-14,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:23:17.327404,2024-06-10 22:17:37.460,2024-06-15,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:17:37.461492
4,54,585,Brochure,2024-08-01,2024-07-31,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:23:17.327404,2024-06-10 22:17:37.460,2024-08-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-10 22:17:37.461492


In [129]:
import pandas as pd
from sqlalchemy import create_engine, text

def connect_to_postgres(config):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        params = config

        # connect to the PostgreSQL server
        conn = psycopg2.connect(**params)
        return conn
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        return None

def update_postgres_table(df, table_name, connection_string, key_column):
    """
    Update records in a PostgreSQL table based on a DataFrame.

    :param df: pandas DataFrame containing the data to update
    :param table_name: name of the target table in PostgreSQL
    :param connection_string: connection string to connect to the PostgreSQL database
    :param key_column: the column name used as the unique key to identify rows
    """
    # Create an engine instance
    engine = create_engine(connection_string)
    
    # Connect to the database
    with engine.connect() as connection:
        for index, row in df.iterrows():
            # Prepare the update query
            set_clause = ', '.join([f"{col} = :{col}" for col in df.columns if col != key_column])
            update_query = f"""
            UPDATE {table_name}
            SET {set_clause}
            WHERE {key_column} = :{key_column}
            """
            
            # Execute the update query with the row data
            connection.execute(text(update_query), **row.to_dict())



In [None]:
# Set Parameters to update dbo.AncillaryDocument in ssms and postgresql
table_name = "dbo.AncillaryDocument"
key_columns = ['ProductStateId']  # Column(s) used as keys for the update
update_columns = ['EffectiveEndDate', 'UpdateDate']  # Columns to update

In [None]:
# dev: pg-presentation-dev.sunfirematrix.com
# qa: pg-presentation-qa-rw.sunfirematrix.com
# Define your PostgreSQL connection parameters in a dictionary
config = {
    "dbname": "planpresentation",
    "user":"sunfiresa",
    "password":"Ij30dIcmyam",
    "host":"pg-presentation-dev.sunfirematrix.com",
    "port":"5432"
}

# Connect to PostgreSQL
conn = connect_to_postgres(config)

if conn is not None:
    # Perform bulk insert
    bulk_insert_to_table(conn, ssms_document_df, table_name)

    # Close the connection
    conn.close()

In [None]:
# dev: pg-presentation-dev.sunfirematrix.com
# qa: pg-presentation-qa-rw.sunfirematrix.com
# Define your PostgreSQL connection parameters in a dictionary
config = {
    "dbname": "planpresentation",
    "user":"sunfiresa",
    "password":"Ij30dIcmyam",
    "host":"pg-presentation-dev.sunfirematrix.com",
    "port":"5432"
}

# Connect to PostgreSQL
conn = connect_to_postgres(config)

if conn is not None:
    # Update the PostgreSQL table
    update_postgres_table(ssms_document_df, table_name, conn, key_columns)


In [120]:
# Load dbo.AncillaryDocuments to ssms_document_df
# Connect to SSMS
ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
ssms_connection.run_connection_manager()

# Query Ancillary Document table where ProductStateId in {product_state_id} and Load dbo.AncillaryDocuments to ssms_document_df
AncillaryDocument_Q = f"""
SELECT *
FROM dbo.AncillaryDocument 
WHERE CAST(LoadDate as DATE) = CAST(GETDATE() as DATE) 
"""
# Run Query AncillaryProductState_Q and set to Dataframe 
ssms_current_document_df = ssms_connection.query_table(AncillaryDocument_Q)
# Close to connection to ssms
ssms_connection.close()

Connected to SSMS successfully.
Query executed successfully.
Connection to SSMS closed.


In [122]:
display(ssms_current_document_df.head())

Unnamed: 0,AncillaryDocumentId,ProductStateId,DocumentType,EffectiveBeginDate,EffectiveEndDate,DocumentUrl,UpdateDate,LoadDate
0,1,569,Brochure,2024-05-01,,https://cignaforbrokers.com/gasbagent/cache/fo...,,2024-06-10 21:41:55.997
1,2,570,Brochure,2024-05-01,,https://cignaforbrokers.com/gasbagent/cache/fo...,,2024-06-10 21:41:55.997
2,3,571,Brochure,2024-05-01,,https://cignaforbrokers.com/gasbagent/cache/fo...,,2024-06-10 21:41:55.997
3,4,580,Brochure,2024-05-01,,https://cignaforbrokers.com/gasbagent/cache/fo...,,2024-06-10 21:41:55.997
4,5,585,Brochure,2024-05-01,,https://cignaforbrokers.com/gasbagent/cache/fo...,,2024-06-10 21:41:55.997


In [58]:
def check_data_if_previously_loaded (df, columns_to_check):
    for i in columns_to_check:
        if 'Date' in i[0] and 'Date' in i[1] and matched is None:
            check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
            matched = check_match
        elif 'Date' in i[0] and 'Date' in i[1] and matched is not None:
            check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
            matched = matched & check_match
        elif 'Date' not in i[0] and 'Date' not in i[1] and matched is None:
            check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
            matched = check_match
        elif 'Date' not in i[0] and 'Date' not in i[1] and matched is not None:
            check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
            matched = matched & check_match

IndentationError: expected an indented block after 'for' statement on line 2 (4110948826.py, line 3)

In [527]:
columns_to_check_if_previously_loaded = (('EffectiveBeginDate','EffectiveBeginDate_new'), ('DocumentUrl','DocumentUrl_new'))

In [534]:
matched = None

In [535]:
if matched is None:
    print('emtpy')

emtpy


In [540]:
for i in columns_to_check_if_previously_loaded:
    if 'Date' in i[0] and 'Date' in i[1] and matched is None:
        check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
        matched = check_match
    elif 'Date' in i[0] and 'Date' in i[1] and matched is not None:
        check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
        matched = matched & check_match
    elif 'Date' not in i[0] and 'Date' not in i[1] and matched is None:
        check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
        matched = check_match
    elif 'Date' not in i[0] and 'Date' not in i[1] and matched is not None:
        check_match = pd.to_datetime(ssms_document_df[col_pair1[0]]).dt.date == pd.to_datetime(ssms_document_df[col_pair1[1]]).dt.date
        matched = matched & check_match

In [547]:
if matched.any():
    print("Rows where both pairs of columns match:")
    print(ssms_document_df[matched])
else:
    print("No rows found where both pairs of columns match.")

Rows where both pairs of columns match:
    AncillaryDocumentId  ProductStateId DocumentType EffectiveBeginDate  \
48                   49             618  EN_Brochure         2024-05-01   

   EffectiveEndDate                                        DocumentUrl  \
48       2024-04-30  https://cignaforbrokers.com/gasbagent/cache/fo...   

                   UpdateDate                LoadDate EffectiveBeginDate_new  \
48 2024-06-03 21:49:00.324315 2024-06-03 16:56:40.803             2024-05-01   

                                      DocumentUrl_new  
48  https://cignaforbrokers.com/gasbagent/cache/fo...  


In [553]:
# Example of performing an action when values match
ssms_document_df['Match_Ind'] = matched.apply(lambda x: 'Matched' if x else 'Not Matched')
 

In [554]:
display(ssms_document_df.head())

Unnamed: 0,AncillaryDocumentId,ProductStateId,DocumentType,EffectiveBeginDate,EffectiveEndDate,DocumentUrl,UpdateDate,LoadDate,EffectiveBeginDate_new,DocumentUrl_new,Match_Action,Match_Ind
0,1,569,EN_Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-03 21:49:00.324315,2024-06-03 16:56:40.717,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,Not Matched,Not Matched
1,2,570,EN_Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-03 21:49:00.324315,2024-06-03 16:56:40.717,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,Not Matched,Not Matched
2,3,571,EN_Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-03 21:49:00.324315,2024-06-03 16:56:40.720,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,Not Matched,Not Matched
3,4,580,EN_Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-03 21:49:00.324315,2024-06-03 16:56:40.720,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,Not Matched,Not Matched
4,5,585,EN_Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-03 21:49:00.324315,2024-06-03 16:56:40.723,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,Not Matched,Not Matched


In [555]:
results = ssms_document_df[ssms_document_df['Match_Ind'] == 'Matched']

In [557]:
ancillary_document_id =  results['AncillaryDocumentId'].tolist()

In [560]:
print(f"Data Previously Loaded, Please check AncillaryDocumentId: {ancillary_document_id}")

Data Previously Loaded, Please check AncillaryDocumentId: [49]


In [60]:
display(ssms_document_df)

Unnamed: 0,AncillaryDocumentId,ProductStateId,DocumentType,EffectiveBeginDate,EffectiveEndDate,DocumentUrl,UpdateDate,LoadDate,EffectiveBeginDate_new,DocumentUrl_new,LoadDate_new
0,89,569,Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 22:26:19.086468,2024-06-04 21:20:25.890,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 21:41:11.299148
1,90,570,Brochure,2024-05-01,2024-06-30,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 22:26:19.086468,2024-06-04 21:20:25.890,2024-07-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 21:41:11.299148
2,91,571,Brochure,2024-05-01,2024-05-31,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 22:26:19.086468,2024-06-04 21:20:25.890,2024-06-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 21:41:11.299148
3,92,580,Brochure,2024-05-01,2024-06-14,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 22:26:19.086468,2024-06-04 21:20:25.890,2024-06-15,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 21:41:11.299148
4,93,585,Brochure,2024-05-01,2024-07-31,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 22:26:19.086468,2024-06-04 21:20:25.890,2024-08-01,https://cignaforbrokers.com/gasbagent/cache/fo...,2024-06-04 21:41:11.299148


In [64]:
key_columns = ['ProductStateId']  # Column(s) used as keys for the update
update_columns = ['EffectiveEndDate', 'UpdateDate']  # Columns to update

for index, row in ssms_document_df.iterrows():
    set_clause = ', '.join([f"{col} = ?" for col in update_columns])
    where_clause = ' AND '.join([f"{col} = ?" for col in key_columns])
    sql_query = f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}"
    parameters = [row[col] for col in update_columns] + [row[col] for col in key_columns]
    print(sql_query, parameters)

UPDATE dbo.AncillaryDocument SET EffectiveEndDate = ?, UpdateDate = ? WHERE ProductStateId = ? [datetime.date(2024, 6, 30), Timestamp('2024-06-04 22:26:19.086468'), 569]
UPDATE dbo.AncillaryDocument SET EffectiveEndDate = ?, UpdateDate = ? WHERE ProductStateId = ? [datetime.date(2024, 6, 30), Timestamp('2024-06-04 22:26:19.086468'), 570]
UPDATE dbo.AncillaryDocument SET EffectiveEndDate = ?, UpdateDate = ? WHERE ProductStateId = ? [datetime.date(2024, 5, 31), Timestamp('2024-06-04 22:26:19.086468'), 571]
UPDATE dbo.AncillaryDocument SET EffectiveEndDate = ?, UpdateDate = ? WHERE ProductStateId = ? [datetime.date(2024, 6, 14), Timestamp('2024-06-04 22:26:19.086468'), 580]
UPDATE dbo.AncillaryDocument SET EffectiveEndDate = ?, UpdateDate = ? WHERE ProductStateId = ? [datetime.date(2024, 7, 31), Timestamp('2024-06-04 22:26:19.086468'), 585]


In [None]:
update_columns =

In [388]:
for col in update_columns.values():
    print(row[col])

2024-05-31 00:00:00
2024-06-03 00:00:00


In [59]:
for _, row in ssms_document_df.iterrows():
    print((row[join_column],)) 

NameError: name 'join_column' is not defined

In [389]:
# Create a list of tuples for the values to be updated
update_values = [tuple(row[col] for col in update_columns.values()) + (row[join_column],) for _, row in ssms_document_df.iterrows()]
    

In [390]:
print(update_values)

[(Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 1), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 2), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 3), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 4), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 5), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 6), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 7), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 8), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 9), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 10), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 11), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 12), (Timestamp('2024-06-30 00:00:00'), Timestamp('2024-06-03 00:00:00'), 13), (Timestamp('2024-06-30 00:00:00'), Timestamp('

In [392]:
ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
conn = ssms_connection.run_connection_manager()

Connected to SSMS successfully.


In [393]:
with conn.cursor() as cursor:
    cursor.executemany(sql_update_query, update_values)
conn.commit()

In [100]:
import psycopg2
from sqlalchemy import create_engine

db_name="planpresentation",
db_user="sunfiresa",
db_password="Ij30dIcmyam",
db_host="pg-presentation-qa.sunfirematrix.com",
db_port="5432"

# Establish a connection to the PostgreSQL database
conn = psycopg2.connect(
    dbname="planpresentation",
    user="sunfiresa",
    password="Ij30dIcmyam",
    host="pg-presentation-qa-rw.sunfirematrix.com",
    port="5432"
)


# Create a connection string
conn_str = f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'

# Create SQLAlchemy engine
engine = create_engine(conn_str)

# Create a cursor object
cur = conn.cursor()

# Execute a query
cur.execute("SELECT * FROM dbo.ancillaryproductstate")

# Fetch the results
results = cur.fetchall()

# Print the results
for row in results:
    print(row)

# Close the cursor and the connection
cur.close()
conn.close()


(629, 13, 'AL', None)
(630, 13, 'AR', None)
(631, 13, 'AZ', None)
(632, 13, 'CO', None)
(633, 13, 'CT', None)
(634, 13, 'DE', None)
(635, 13, 'FL', None)
(636, 13, 'GA', None)
(637, 13, 'IA', None)
(638, 13, 'ID', None)
(639, 13, 'IL', None)
(640, 13, 'IN', None)
(641, 13, 'KS', None)
(642, 13, 'KY', None)
(643, 13, 'LA', None)
(644, 13, 'MA', None)
(645, 13, 'MD', None)
(646, 13, 'MI', None)
(647, 13, 'MN', None)
(648, 13, 'MO', None)
(649, 13, 'MS', None)
(650, 13, 'MT', None)
(651, 13, 'NC', None)
(652, 13, 'ND', None)
(653, 13, 'NE', None)
(654, 13, 'NH', None)
(656, 13, 'NM', None)
(657, 13, 'NV', None)
(658, 13, 'OH', None)
(659, 13, 'OK', None)
(660, 13, 'OR', None)
(661, 13, 'PA', None)
(662, 13, 'RI', None)
(663, 13, 'SC', None)
(664, 13, 'SD', None)
(665, 13, 'TN', None)
(666, 13, 'TX', None)
(667, 13, 'UT', None)
(668, 13, 'VA', None)
(669, 13, 'VT', None)
(302, 6, 'AK', None)
(303, 6, 'AL', None)
(304, 6, 'AR', None)
(305, 6, 'AZ', None)
(306, 6, 'CO', None)
(307, 6, 'DC', 

In [103]:
# Connects to SSMS and Insert new data into SSMS dbo.AncillaryDocument where FileLoadIndicator = 'Y' from ancillary_document_df. On complete closes connection to SSMS
table_name = 'dbo.ancillarydocument'
columns_to_insert = ['ProductStateId', 'DocumentType', 'EffectiveBeginDate', 'DocumentUrl','LoadDate']

#ssms_connection = SSMSConnnectionManager(carrier_config.read_config(),server_password)
#ssms_connection.run_connection_manager()
#ssms_connection.insert_into_sql(ancillary_document_df,table_name,columns_to_insert)

In [123]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

def connect_to_postgres(config):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        params = config

        # connect to the PostgreSQL server
        conn = psycopg2.connect(**params)
        return conn
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        return None

def bulk_insert_to_table(conn, df, table_name):
    """ Bulk insert dataframe into table """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))

    # SQL query to execute
    query = f"INSERT INTO {table_name}({cols}) VALUES %s"

    cursor = conn.cursor()
    try:
        execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("Bulk insert complete")
    cursor.close()

In [124]:
# dev: pg-presentation-dev.sunfirematrix.com
# qa: pg-presentation-qa-rw.sunfirematrix.com
# Define your PostgreSQL connection parameters in a dictionary
config = {
    "dbname": "planpresentation",
    "user":"sunfiresa",
    "password":"Ij30dIcmyam",
    "host":"pg-presentation-dev.sunfirematrix.com",
    "port":"5432"
}

# Connect to PostgreSQL
conn = connect_to_postgres(config)

if conn is not None:
    # Perform bulk insert
    bulk_insert_to_table(conn, ssms_current_document_df, table_name)

    # Close the connection
    conn.close()

Bulk insert complete
