In [2]:
import re
import pandas as pd
import pyodbc
from OTAs.file_management.file_path_manager import FilePathManager
import re

USERNAME = 'azureadmin'
PASSWORD = 'brudnyHarry!66'
# from Viator_AllLinks import main as main_alllinks
# from Viator_GetOperator import main as main_getoperator

In [3]:
import logging

class CustomLogger:
    def __init__(self, log_file='app.log'):
        # Create a custom logger
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)

        # Create handlers for writing to log file and console
        file_handler = logging.FileHandler(log_file)
        console_handler = logging.StreamHandler()

        # Set level for handlers
        file_handler.setLevel(logging.DEBUG)
        console_handler.setLevel(logging.INFO)

        # Create formatters and add it to handlers
        log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(log_format)
        console_handler.setFormatter(log_format)

        # Add handlers to the logger
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    # Define methods for info, done (treated as info), and error logging
    def logger_info(self, message):
        self.logger.info(message)

    def logger_done(self, message):
        self.logger.info(message)  # Treating 'done' as an info log

    def logger_err(self, message):
        self.logger.error(message)

# Initialize logger instance
logger = CustomLogger(log_file='upsert_process.log')



In [4]:
def clean_add_uid(df):
    df = df[df['Link'].str.strip().str.len() > 1]
    df = df.sort_values(by=['uid', 'Date input'], ascending=[True, False])
    # Drop duplicates to keep only the latest URL for each UID
    df = df.drop_duplicates(subset='uid')
    # Reset index for clarity
    df = df.reset_index(drop=True)
    return df


In [5]:
# Function to clean out non-UTF characters
def clean_text(text):
    try:
        # Remove non-ASCII characters or replace with a safe placeholder
        cleaned_text = re.sub(r'[^\x00-\x7F]+', '', text)
        return cleaned_text
    except TypeError:
        return text  # If it's not a string, return the original value



# Now the dataframe should be free of non-ASCII characters, and can be uploaded to SQL.
# upsert_df_to_sql_db(df_main, 'your_table_name')


In [6]:
def upsert_df_to_sql_db(path_df_main, database_name):


    # Log start of process
    logger.logger_info(f"Starting upsert process for file {path_df_main} for database {database_name}")

    df_main = pd.read_excel(path_df_main, engine='openpyxl')
    logger.logger_info(f"Loaded {len(df_main)} rows from {path_df_main}")

    # Apply this function to all string columns in the dataframe to clean non-ASCII characters
    for column in df_main.select_dtypes(include=['object']).columns:
        df_main[column] = df_main[column].apply(lambda x: clean_text(x) if isinstance(x, str) else x)
    logger.logger_info(f"Cleaned text data in dataframe.")

    # Fill missing values and filter data
    df_main['Reviews'] = df_main['Reviews'].fillna(0)
    df_main['Operator'] = df_main['Operator'].fillna('Error')
    df_main['Tytul'] = df_main['Tytul'].fillna('Error')
    df_main['Reviews'] = df_main['Reviews'].astype(str)
    df_main['Operator'] = df_main['Operator'].astype(str)
    df_main['Date input'] = df_main['Date input'].astype(str)
    df_main['Date update'] = df_main['Date update'].astype(str)
    df_main = df_main[df_main['City'].str.len() >= 3]
    logger.logger_info(f"Processed missing values and filtered cities.")

    # Determine table name based on file
    if 'GYG' in path_df_main:
        table_name = 'Operators_GYG'
        df_main = clean_add_uid(df_main)
    elif 'Musement' in path_df_main:
        table_name = 'Operators_Musement'
    elif 'Headout' in path_df_main:
        table_name = 'Operators_Headout'
    else:
        table_name = 'Operators_Viator'
    df_main = clean_add_uid(df_main)
    df_main = df_main.drop_duplicates(subset=['uid'])
    logger.logger_info(f"Using table {table_name} for upsert operation.")

    # Database connection settings
    server = 'sqlserver-myotas.database.windows.net'
    database = database_name
    driver = '{ODBC Driver 18 for SQL Server}'

    try:
        cnxn = pyodbc.connect(f'DRIVER={driver};SERVER=tcp:{server};PORT=1433;DATABASE={database};UID={USERNAME};PWD={PASSWORD}')
        logger.logger_info(f"Successfully connected to database {database}.")
    except Exception as e:
        logger.logger_err(f"Failed to connect to database: {str(e)}")
        return "Couldn't connect to database"

    cursor = cnxn.cursor()
    cursor.fast_executemany = True

    # Create table if it doesn't exist
    create_table_query = f"""
        IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{table_name}' AND xtype='U')
        CREATE TABLE [dbo].[{table_name}] (
            [Tytul]       NVARCHAR (MAX) NULL,
            [Link]        NVARCHAR (MAX) NULL,
            [City]        NVARCHAR (255) NULL,
            [Operator]    NVARCHAR (MAX) NULL,
            [Reviews]     NVARCHAR (255) NULL,
            [Date input]  NVARCHAR (255) NULL,
            [Date update] NVARCHAR (255) NULL,
            [uid]         NVARCHAR (255) NOT NULL PRIMARY KEY
        );
    """
    logger.logger_info(f"Ensuring table {table_name} exists.")

    try:
        # cursor.execute(create_table_query)
        # cnxn.commit()
        logger.logger_done(f"Table {table_name} checked/created successfully.")
    except pyodbc.Error as e:
        logger.logger_err(f"Error creating table: {str(e)}")
        return "Table creation failed"

    # Upsert query
    merge_query = f"""
        MERGE [dbo].[{table_name}] AS target
        USING (VALUES (?, ?, ?, ?, ?, ?, ?, ?)) AS source ([Tytul], [Link], [City], [Operator], [Date input], [Date update], [uid], [Reviews])
        ON target.[uid] = source.[uid]
        WHEN MATCHED THEN
            UPDATE SET
                target.[Tytul] = source.[Tytul],
                target.[Link] = source.[Link],
                target.[City] = source.[City],
                target.[Operator] = source.[Operator],
                target.[Date input] = source.[Date input],
                target.[Date update] = source.[Date update],
                target.[Reviews] = source.[Reviews]
        WHEN NOT MATCHED THEN
            INSERT ([Tytul], [Link], [City], [Operator], [Date input], [Date update], [uid], [Reviews])
            VALUES (source.[Tytul], source.[Link], source.[City], source.[Operator], source.[Date input], source.[Date update], source.[uid], source.[Reviews]);
    """
    data_list = [tuple(row) for row in df_main.values]
    logger.logger_info(f"Preparing to upsert {len(data_list)} rows.")

    try:
        cursor.executemany(merge_query, data_list)
        cnxn.commit()
        logger.logger_done(f"Successfully upserted {len(data_list)} rows.")
    except pyodbc.Error as e:
        logger.logger_err(f"Data upsert failed: {str(e)}")

    cnxn.close()
    logger.logger_done(f"Database connection closed.")
    return f'Successfully upserted: {len(data_list)} rows to {table_name} table'


In [7]:
sites = ["GYG", "Musement", "Viator"]
for site in sites:
    file_manager = FilePathManager(site, 'N/A')
    file_path_xlsx_operator = file_manager.get_file_paths()['file_path_xlsx_operator']
    upsert_df_to_sql_db(file_path_xlsx_operator, 'OTAs')
    if site == "GYG":
        upsert_df_to_sql_db(file_path_xlsx_operator, 'db_ota_future_price')


2024-11-20 09:45:48,152 - INFO - Starting upsert process for file G:\.shortcut-targets-by-id\1ER8hilqZ2TuX2C34R3SMAtd1Xbk94LE2\MyOTAs\Pliki firmowe\Operators_GYG.xlsx for database OTAs
2024-11-20 09:45:54,064 - INFO - Loaded 64235 rows from G:\.shortcut-targets-by-id\1ER8hilqZ2TuX2C34R3SMAtd1Xbk94LE2\MyOTAs\Pliki firmowe\Operators_GYG.xlsx
2024-11-20 09:45:54,512 - INFO - Cleaned text data in dataframe.
2024-11-20 09:45:54,627 - INFO - Processed missing values and filtered cities.
2024-11-20 09:45:54,817 - INFO - Using table Operators_GYG for upsert operation.
2024-11-20 09:46:25,086 - ERROR - Failed to connect to database: ('08001', '[08001] [Microsoft][ODBC Driver 18 for SQL Server]TCP Provider: Timeout error [258].  (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 18 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 18 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 18 for SQL Server]Unable to complete login pr

In [7]:
### DEBUG IN CASE OF ERROR

In [8]:
# def upsert_df_to_sql_db(path_df_main, database_name, batch_size=1000):
#     import pandas as pd
#     import pyodbc

#     # Log start of process
#     logger.logger_info(f"Starting upsert process for file {path_df_main}")

#     df_main = pd.read_excel(path_df_main, engine='openpyxl')
#     logger.logger_info(f"Loaded {len(df_main)} rows from {path_df_main}")

#     # Apply this function to all string columns in the dataframe to clean non-ASCII characters
#     for column in df_main.select_dtypes(include=['object']).columns:
#         df_main[column] = df_main[column].apply(lambda x: clean_text(x) if isinstance(x, str) else x)
#     logger.logger_info(f"Cleaned text data in dataframe.")

#     # Fill missing values and filter data
#     df_main['Reviews'] = df_main['Reviews'].fillna(0)
#     df_main['Operator'] = df_main['Operator'].fillna('Error')
#     df_main['Tytul'] = df_main['Tytul'].fillna('Error')
#     df_main['Reviews'] = df_main['Reviews'].astype(str)
#     df_main['Operator'] = df_main['Operator'].astype(str)
#     df_main['Date input'] = df_main['Date input'].astype(str)
#     df_main['Date update'] = df_main['Date update'].astype(str)
#     df_main = df_main[df_main['City'].str.len() >= 3]
#     logger.logger_info(f"Processed missing values and filtered cities.")

#     # Determine table name based on file
#     if 'GYG' in path_df_main:
#         table_name = 'Operators_GYG'
#         df_main = clean_add_uid(df_main)
#     elif 'Musement' in path_df_main:
#         table_name = 'Operators_Musement'
#     elif 'Headout' in path_df_main:
#         table_name = 'Operators_Headout'
#     else:
#         table_name = 'Operators_Viator'
#     df_main = clean_add_uid(df_main)
#     df_main = df_main.drop_duplicates(subset=['uid'])
#     logger.logger_info(f"Using table {table_name} for upsert operation.")

#     # Database connection settings
#     server = 'sqlserver-myotas.database.windows.net'
#     database = database_name
#     driver = '{ODBC Driver 18 for SQL Server}'

#     try:
#         cnxn = pyodbc.connect(f'DRIVER={driver};SERVER=tcp:{server};PORT=1433;DATABASE={database};UID={USERNAME};PWD={PASSWORD}')
#         logger.logger_info(f"Successfully connected to database {database}.")
#     except Exception as e:
#         logger.logger_err(f"Failed to connect to database: {str(e)}")
#         return "Couldn't connect to database"

#     cursor = cnxn.cursor()
#     cursor.fast_executemany = True

#     # Create table if it doesn't exist
#     create_table_query = f"""
#         IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{table_name}' AND xtype='U')
#         CREATE TABLE [dbo].[{table_name}] (
#             [Tytul]       NVARCHAR (MAX) NULL,
#             [Link]        NVARCHAR (MAX) NULL,
#             [City]        NVARCHAR (255) NULL,
#             [Operator]    NVARCHAR (MAX) NULL,
#             [Reviews]     NVARCHAR (255) NULL,
#             [Date input]  NVARCHAR (255) NULL,
#             [Date update] NVARCHAR (255) NULL,
#             [uid]         NVARCHAR (255) NOT NULL PRIMARY KEY
#         );
#     """
#     logger.logger_info(f"Ensuring table {table_name} exists.")
#     try:
#         cursor.execute(create_table_query)
#         cnxn.commit()
#         logger.logger_done(f"Table {table_name} checked/created successfully.")
#     except pyodbc.Error as e:
#         logger.logger_err(f"Error creating table: {str(e)}")
#         return "Table creation failed"

#     # Upsert query
#     merge_query = f"""
#         MERGE [dbo].[{table_name}] AS target
#         USING (VALUES (?, ?, ?, ?, ?, ?, ?, ?)) AS source ([Tytul], [Link], [City], [Operator], [Date input], [Date update], [uid], [Reviews])
#         ON target.[uid] = source.[uid]
#         WHEN MATCHED THEN
#             UPDATE SET
#                 target.[Tytul] = source.[Tytul],
#                 target.[Link] = source.[Link],
#                 target.[City] = source.[City],
#                 target.[Operator] = source.[Operator],
#                 target.[Date input] = source.[Date input],
#                 target.[Date update] = source.[Date update],
#                 target.[Reviews] = source.[Reviews]
#         WHEN NOT MATCHED THEN
#             INSERT ([Tytul], [Link], [City], [Operator], [Date input], [Date update], [uid], [Reviews])
#             VALUES (source.[Tytul], source.[Link], source.[City], source.[Operator], source.[Date input], source.[Date update], source.[uid], source.[Reviews]);
#     """
    
#     data_list = [tuple(row) for row in df_main.values]
#     logger.logger_info(f"Preparing to upsert {len(data_list)} rows in batches of {batch_size}.")

#     # Split data into batches
#     for i in range(0, len(data_list), batch_size):
#         batch = data_list[i:i + batch_size]
#         logger.logger_info(f"Processing batch {i // batch_size + 1} of {len(data_list) // batch_size + 1}")
        
#         try:
#             cursor.executemany(merge_query, batch)
#             cnxn.commit()
#             logger.logger_done(f"Successfully upserted batch {i // batch_size + 1}")
#         except pyodbc.Error as e:
#             logger.logger_err(f"Batch {i // batch_size + 1} failed with error: {str(e)}")
#             # Further process the failing batch row-by-row
#             for idx, row in enumerate(batch):
#                 try:
#                     cursor.execute(merge_query, row)
#                     cnxn.commit()
#                     logger.logger_done(f"Successfully upserted row {i + idx + 1} from batch {i // batch_size + 1}")
#                 except pyodbc.Error as e:
#                     logger.logger_err(f"Row {i + idx + 1} failed: {row} with error: {str(e)}")

#     cnxn.close()
#     logger.logger_done(f"Database connection closed.")
#     return f'Successfully upserted rows to {table_name} table'


In [9]:
# sites = ["Viator"]
# for site in sites:
#     file_manager = common_functions.FilePathManager(site, 'N/A')
#     file_path_xlsx_operator = file_manager.get_file_paths()['file_path_xlsx_operator']
#     upsert_df_to_sql_db(file_path_xlsx_operator, 'OTAs')
#     if site == "GYG":
#         upsert_df_to_sql_db(file_path_xlsx_operator, 'db_ota_future_price')
