In [1]:
from sshtunnel import SSHTunnelForwarder
import pandas as pd
import pymysql
from datetime import datetime, timedelta
import traceback
from dateutil import relativedelta
import json as json 


# Connection settings for server "a". These are passed to query_data() as the
# SSH hop and MySQL endpoint that the futures settlement rows are read from.
# NOTE(review): every value below is the bare name `X`, which is not defined in
# the visible source — presumably a redaction placeholder. Confirm the real
# values (and their types, e.g. integer ports) before running.
# NOTE(review): if real secrets are inlined here, prefer loading them from
# environment variables or a secrets store instead of committing them.
a_ssh_host = X
a_ssh_user = X
a_ssh_port = X
a_ssh_private_key = X
a_sql_hostname = X
a_sql_username = X
a_sql_password = X
a_sql_database = X
a_sql_port = X

# Connection settings for server "b". These are used to open the SSH tunnel and
# MySQL connection that the `futures_prices` rows are inserted through.
# NOTE(review): same `X` placeholder caveat as for the "a" settings.
b_ssh_host = X
b_ssh_user = X
b_ssh_port = X
b_ssh_private_key = X
b_sql_hostname = X
b_sql_username = X
b_sql_password = X
b_sql_database = X
b_sql_port = X

def query_data(ssh_host, ssh_user, ssh_port, ssh_private_key, sql_hostname, sql_username, sql_password, sql_database, sql_port, query):
    """Run a SQL query on a MySQL server reached through an SSH tunnel.

    An SSH tunnel is opened to ``(ssh_host, ssh_port)`` and forwarded to
    ``(sql_hostname, sql_port)``; the query is then executed over a PyMySQL
    connection made to the local end of that tunnel.

    Args:
        ssh_host: Address of the SSH server used as the hop.
        ssh_user: SSH username.
        ssh_port: SSH port.
        ssh_private_key: Private key handed to ``SSHTunnelForwarder`` as ``ssh_pkey``.
        sql_hostname: MySQL host as seen from the SSH server.
        sql_username: MySQL username.
        sql_password: MySQL password.
        sql_database: Database (schema) to connect to.
        sql_port: MySQL port as seen from the SSH server.
        query: SQL text passed unchanged to ``pandas.read_sql_query``.

    Returns:
        pandas.DataFrame holding the query result.

    Raises:
        Whatever ``SSHTunnelForwarder``, ``pymysql.connect`` or
        ``pandas.read_sql_query`` raise; nothing is caught here. The
        connection and the tunnel are released before the error propagates.
    """
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_username=ssh_user,
            ssh_pkey=ssh_private_key,
            remote_bind_address=(sql_hostname, sql_port)) as tunnel:
        conn = pymysql.connect(
            # The tunnel listens locally, so the client must connect to the
            # loopback address on the tunnel's local port (the previous value
            # was a non-resolvable placeholder).
            host='127.0.0.1',
            user=sql_username,
            passwd=sql_password,
            db=sql_database,
            port=tunnel.local_bind_port
        )
        try:
            data = pd.read_sql_query(query, conn)
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
    return data


In [2]:
import logging
from pymysql import IntegrityError, OperationalError
from sshtunnel import SSHTunnelForwarder
import pymysql
import datetime


# Configure logging
logger = logging.getLogger(__name__)
# INFO is the floor for every handler below: the logger drops anything less
# severe before a handler sees it, so DEBUG records never reach the file
# unless this level is lowered.
logger.setLevel(logging.INFO)

# Notebook cells get re-run. logging.getLogger() returns the same logger object
# each time, so without this reset every run would stack another pair of
# handlers and each record would be written two, three, ... times.
# Note: this removes *all* handlers currently attached to this logger.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# File handler: accepts DEBUG and above (effectively INFO and above, see the
# logger level) and writes timestamped records to debug.log
file_handler = logging.FileHandler('debug.log')
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)

# Console handler to log only errors or higher
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.ERROR)
console_formatter = logging.Formatter('%(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# Maximum number of rows handed to a single chunk by chunker();
# tune this to your workload.
BATCH_SIZE = 1_000

def chunker(seq, size):
    """Lazily yield consecutive slices of ``seq`` holding at most ``size`` items.

    ``seq`` must support ``len()`` and slicing. The last slice may be shorter
    than ``size``; an empty ``seq`` yields nothing.
    """
    offsets = range(0, len(seq), size)
    yield from (seq[start:start + size] for start in offsets)

# Copy the latest daily futures settlements from server "a" into the
# `futures_prices` table on server "b".
#
# Flow: open an SSH tunnel + MySQL connection to "b", read the source rows from
# "a" via query_data() (which opens its own tunnel), keep one row per
# (financial_product_id, settlement_date), then INSERT IGNORE them into "b" in
# batches of BATCH_SIZE, committing after each batch.
try:
    with SSHTunnelForwarder(
            (b_ssh_host, b_ssh_port),
            ssh_username=b_ssh_user,
            ssh_pkey=b_ssh_private_key,
            remote_bind_address=(b_sql_hostname, b_sql_port)) as tunnel:

        logger.info("SSH Tunnel established successfully.")

        # Pre-bind so the `finally` below can tell "connect failed" apart from
        # "connected". Without this a failed connect raised NameError in the
        # cleanup (or closed a stale connection left over from an earlier run).
        b_conn = None
        try:
            b_conn = pymysql.connect(
                host='127.0.0.1',
                user=b_sql_username,
                passwd=b_sql_password,
                db=b_sql_database,
                port=tunnel.local_bind_port
            )
            logger.info("Database connection established successfully.")
            b_cursor = b_conn.cursor()

            try:
                query_1 = '''SELECT f.settlement, settlement_time, fc.maturity, JSON_UNQUOTE(JSON_EXTRACT(ds.properties, '$.financial_product_id')) AS financial_product_id,
                            JSON_UNQUOTE(JSON_EXTRACT(ds.properties, '$.exchange_id')) AS exchange_id
                                    FROM futures AS f
                                    JOIN data_series_new AS ds ON f.data_series_id = ds.id
                                    JOIN futures_contracts AS fc ON fc.id = JSON_UNQUOTE(JSON_EXTRACT(ds.properties, '$.futures_contract_id'))
                                    WHERE JSON_UNQUOTE(JSON_EXTRACT(ds.properties, '$.financial_product_id')) IN (77,89,3,76,53,55,69)
                                    AND fc.maturity >= DATE(CONCAT(YEAR(CURDATE()), '-', LPAD(MONTH(CURDATE()), 2, '0'), '-01'))
                                    AND settlement IS NOT NULL
                                    AND fc.maturity <= DATE(CONCAT(YEAR(DATE_ADD(CURDATE(), INTERVAL 2 MONTH)), '-', LPAD(MONTH(DATE_ADD(CURDATE(), INTERVAL 2 MONTH)), 2, '0'), '-01'))
                            '''

                logger.info(f"Executing query: {query_1}")
                results = query_data(a_ssh_host, a_ssh_user, a_ssh_port, a_ssh_private_key,
                                     a_sql_hostname, a_sql_username, a_sql_password, a_sql_database, a_sql_port, query_1)
                logger.info(f"Query executed successfully, retrieved {len(results)} rows.")

                inserting_query = '''INSERT IGNORE INTO futures_prices
                                     (exchange_id, maturity_date, financial_product_id,currency,unit,settlement, settlement_date, source_id)
                                     VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'''

                # Derive the calendar date, then order so the most recent
                # settlement_time comes first within each (product, date) group.
                results['settlement_time'] = pd.to_datetime(results['settlement_time'])
                results['settlement_date'] = results['settlement_time'].dt.date
                results = results.sort_values(by=['financial_product_id', 'settlement_date', 'settlement_time'], ascending=[True, True, False])

                # Keeps the first (= latest settlement_time) row per product and date.
                # NOTE(review): the key ignores `maturity` and `exchange_id`, so when
                # several contracts of one product settle on the same day only one
                # of them survives — confirm this is intended.
                results_modified = results.drop_duplicates(subset=['financial_product_id', 'settlement_date'])

                # Keys are strings because financial_product_id is produced by
                # JSON_UNQUOTE in the query above.
                # NOTE(review): product 69 is selected by the query but has no entry
                # here, so its rows are written with empty currency and unit.
                currency_unit_mapping = {
                        '77': ('EUR', 'mt'),
                        '89': ('USD', 'mt'),
                        '3': ('USD', 'barrel'),
                        '76': ('MYR', 'mt'),
                        '53': ('USD cents', 'bushel'),
                        '55': ('USD cents', 'lb')
                    }

                source_id = 1

                # One tuple per row, in the column order of `inserting_query`.
                values = []
                for _, item in results_modified.iterrows():
                    # Unknown products fall back to empty currency/unit.
                    currency, unit = currency_unit_mapping.get(item['financial_product_id'], ('', ''))
                    values.append(
                        (
                            item['exchange_id'],
                            item['maturity'],
                            item['financial_product_id'],
                            currency,
                            unit,
                            item['settlement'],
                            item['settlement_date'],
                            source_id
                        )
                    )

                # Insert in batches
                # Ceiling division: the old `len // size + 1` reported one batch
                # too many whenever the row count was an exact multiple of
                # BATCH_SIZE (including zero rows).
                total_batches = (len(values) + BATCH_SIZE - 1) // BATCH_SIZE
                for i, chunk in enumerate(chunker(values, BATCH_SIZE)):
                    logger.info(f"Inserting batch {i + 1} of {total_batches}")
                    b_cursor.executemany(inserting_query, chunk)
                    # Commit per batch: a later failure only rolls back the
                    # batch in flight, earlier batches stay committed.
                    b_conn.commit()
                    logger.info(f"Batch {i + 1} committed successfully.")

            except IntegrityError as ie:
                logger.error(f"Integrity error occurred: {ie}")
                b_conn.rollback()
                logger.info("Transaction rolled back due to IntegrityError.")

            except Exception as e:
                logger.error(f"An unexpected error occurred during query execution: {e}")
                b_conn.rollback()
                logger.info("Transaction rolled back due to an unexpected error.")

            finally:
                b_cursor.close()
                logger.info("Cursor closed.")

        except OperationalError as oe:
            logger.error(f"Operational error occurred: {oe}")

        finally:
            # Only close what was actually opened.
            if b_conn is not None:
                b_conn.close()
                logger.info("Database connection closed.")

except Exception as e:
    # Catch-all for anything not handled above (tunnel setup, connection
    # errors other than OperationalError, cleanup failures).
    logger.critical(f"Critical error in establishing SSH Tunnel: {e}")
