In [1]:
# Creating a Spark Session Block

import inspect
import logging
import os
import time
from functools import wraps
from typing import Any, Callable

import findspark
import mysql.connector
import pyspark
from dotenv import load_dotenv, find_dotenv
from mysql.connector import Error
from pyspark.sql import SparkSession


# Initialise findspark with the location of the local Spark distribution.
SPARK_DIST_DIR = '/home/su/spark-3.5.1-bin-hadoop3'
findspark.init(SPARK_DIST_DIR)


# Build (or reuse) the Spark session. The MySQL connector jar is put on the
# driver classpath; the JDBC read of the log table relies on it.
MYSQL_CONNECTOR_JAR = "/home/su/spark_mirror/mysql-connector-j-8.3.0.jar"
session_builder = SparkSession.builder.appName('pyspark_example')
session_builder = session_builder.config("spark.driver.extraClassPath", MYSQL_CONNECTOR_JAR)
spark = session_builder.getOrCreate()


24/04/01 13:58:24 WARN Utils: Your hostname, NB99-186 resolves to a loopback address: 127.0.1.1; using 192.168.56.1 instead (on interface eth3)
24/04/01 13:58:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/01 13:58:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Creating decorators block

def execution_time(func: Callable) -> Callable:
    """
    Decorator function to measure the execution time of a given function.

    After the wrapped function returns, an INFO record with the elapsed time
    (in seconds) is written to the module-level ``logger``. The ``table_name``
    and ``delta_name`` values attached to the record are looked up among the
    call's arguments by parameter name; when the wrapped function has no such
    parameter, ``None`` is attached instead. Nothing is logged when the wrapped
    function raises: the exception simply propagates.

    :param func: Callable - The function to be measured for execution time.
    :return Callable: The wrapped function with execution time logging.
    """
    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature.
        signature = None

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # perf_counter is monotonic, so the measured delta is not distorted by
        # system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time

        # Resolve the log context by parameter name instead of by position, so
        # that keyword calls and functions with other signatures do not raise
        # IndexError after the wrapped call has already completed.
        call_args = {}
        if signature is not None:
            try:
                call_args = signature.bind_partial(*args, **kwargs).arguments
            except TypeError:
                pass
        table_name = call_args.get('table_name')
        delta_name = call_args.get('delta_name')

        logger.info(
            f'{delta_name} delta from table {table_name} was executed for {elapsed}.',
            extra={
                'table_name': None if table_name is None else f'{table_name}',
                'delta_name': None if delta_name is None else f'{delta_name}',
            },
        )
        return result
    return wrapper


def error_log(func: Callable) -> Callable:
    """
    Decorator function for handling errors and logging exceptions.

    Any ``Exception`` raised by the wrapped function is written to the
    module-level ``logger`` at ERROR level and then suppressed, in which case
    the wrapper returns ``None``. The ``table_name`` and ``delta_name`` values
    attached to the record are looked up among the call's arguments by
    parameter name; ``None`` is attached for a name the wrapped function does
    not have.

    :param func: Callable - The function to be wrapped for error handling.
    :return Callable: The wrapped function with error logging capabilities.
    """
    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature.
        signature = None

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Resolve the log context by parameter name. Indexing args by
            # position raised IndexError for functions with fewer arguments,
            # which replaced the real error, and picked up unrelated arguments
            # for functions with a different parameter order.
            call_args = {}
            if signature is not None:
                try:
                    call_args = signature.bind_partial(*args, **kwargs).arguments
                except TypeError:
                    pass
            table_name = call_args.get('table_name')
            delta_name = call_args.get('delta_name')

            logger.error(
                f'Error in function {func.__name__}: {e}',
                extra={
                    'table_name': None if table_name is None else f'{table_name}',
                    'delta_name': None if delta_name is None else f'{delta_name}',
                },
            )
            # The error is deliberately swallowed; callers receive None.
            return None
    return wrapper


In [3]:
# Logging setup block

# Load the .env file located by find_dotenv(), then collect the MySQL
# connection parameters from environment variables (None when a variable is unset).
load_dotenv(find_dotenv())
_DB_ENV_VARS = {
    'host': "DB_HOST",
    'database': "DB_NAME",
    'user': "DB_USER",
    'password': "DB_PASSWORD",
}
db_config = {option: os.getenv(env_var) for option, env_var in _DB_ENV_VARS.items()}


class MirrorLogger(logging.Handler):
    """
    Logging handler that stores every log record as a row of the MySQL table
    ``mirror_log``.

    A new connection is opened from the module-level ``db_config`` for each
    record. ``table_name`` and ``delta_name`` are read from the record, where
    they are expected to have been supplied through ``extra=`` on the logging
    call; for records without them ``None`` is passed to the insert instead.
    """
    def __init__(self):
        super().__init__()

    def emit(self, record):
        """
        Insert one log record into ``mirror_log``.

        MySQL errors are reported with ``print`` and otherwise ignored, so a
        logging failure never interrupts the code that emitted the record.

        :param record: logging.LogRecord - The record to store.
        :return: None
        """
        connection = None
        cursor = None
        try:
            level_value = record.levelname  # Getting the level of logging.
            message_value = self.format(record)  # Getting a log message.
            time_value = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(record.created))  # Getting the time of the action.
            filename_value = record.filename  # Getting the name of the file.
            # These attributes exist only when the caller passed extra=...;
            # fall back to None rather than raising AttributeError.
            table_name_value = getattr(record, 'table_name', None)
            delta_name_value = getattr(record, 'delta_name', None)

            connection = mysql.connector.connect(**db_config)
            cursor = connection.cursor()
            cursor.execute('INSERT INTO mirror_log (time, level, message, filename, table_name, delta_name) VALUES (%s, %s, %s, %s, %s, %s)',
                           (time_value, level_value, message_value, filename_value, table_name_value, delta_name_value))
            connection.commit()
        except Error as e:
            print(f'Error while writing a log record to MySQL: {e}')
        finally:
            # Always release the cursor and the connection, including when the
            # insert or the commit failed.
            try:
                if cursor is not None:
                    cursor.close()
                if connection is not None and connection.is_connected():
                    connection.close()
            except Error as e:
                print(f'Error while closing the MySQL connection: {e}')


# Setting up the level of logging.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s | %(filename)s'
)

# Creating a log.
logger = logging.getLogger(__name__)

# Creating a log handler.
# logging.getLogger returns the same logger object every time this cell is run,
# so handlers attached by an earlier run are removed first; otherwise each
# record would be inserted into MySQL once per run of the cell. The comparison
# is by class name because re-running the notebook also redefines the class.
for attached_handler in list(logger.handlers):
    if type(attached_handler).__name__ == 'MirrorLogger':
        logger.removeHandler(attached_handler)
db_handler = MirrorLogger()
logger.addHandler(db_handler)


In [4]:
# The block for creating auxiliary functions

def create_logdb() -> None:
    """
    Function to create a database and a table for logging in MySQL.

    Connects with the host, user and password from ``db_config``, creates the
    ``logs`` database and its ``mirror_log`` table when they do not exist yet.
    Failures are reported with ``print`` and otherwise ignored; the cursor and
    the connection are released in every case.

    :return: None
    """
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(
                host=db_config.get('host'),
                user=db_config.get('user'),
                password=db_config.get('password')
        )
        cursor = connection.cursor()
        cursor.execute('CREATE DATABASE IF NOT EXISTS logs')
        cursor.execute('USE logs')

        cursor.execute('CREATE TABLE IF NOT EXISTS mirror_log ('
                       'time DATETIME, '
                       'level VARCHAR(50), '
                       'message VARCHAR(300), '
                       'filename VARCHAR(100), '
                       'table_name VARCHAR(100), '
                       'delta_name INT)'
                       )
    except Exception as e:
        # Include the cause; the bare message gave no hint of what went wrong.
        print(f'Unable to create a database for logs: {e}')
    finally:
        # Release whatever was opened, also when one of the statements failed.
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print(f'Unable to close the MySQL resource: {e}')
    


@error_log
def get_log() -> "pyspark.sql.DataFrame":
    """
    Function to retrieve log data from a MySQL database.

    Reads the ``mirror_log`` table of the ``logs`` database over JDBC. The
    host, user and password are taken from ``db_config`` (i.e. from the
    environment) instead of being written into the notebook; the host falls
    back to ``localhost`` when it is not configured.

    :return pyspark.sql.DataFrame: A Spark DataFrame over the log table.
    """
    host = db_config.get('host') or 'localhost'
    reader = (
        spark.read.format("jdbc")
        .option("url", f"jdbc:mysql://{host}:3306/logs")
        .option("dbtable", "mirror_log")
    )
    # Only pass credentials that are actually configured.
    for option_name in ('user', 'password'):
        option_value = db_config.get(option_name)
        if option_value is not None:
            reader = reader.option(option_name, option_value)

    return reader.load()


@error_log
def get_mirror(db_path: str, mirror_name: str = 'mirr_md_account_d') -> "pyspark.sql.DataFrame":
    """
    Function to retrieve mirror data from a CSV file.

    The mirror is read from ``{db_path}/{mirror_name}`` as ';'-delimited CSV
    with a header row and an inferred schema.

    :param db_path: str - The path of the table for which delta is being created.
    :param mirror_name: str - The name of the mirror directory under ``db_path``.
    :return pyspark.sql.DataFrame: A Spark DataFrame with the mirror data.
    """
    mirror = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .load(f"{db_path}/{mirror_name}")
    )

    return mirror


def _quote_identifier(name: str) -> str:
    """Wrap a column name in backticks so that Spark SQL accepts any header text."""
    return '`' + name.replace('`', '``') + '`'


def _build_merge_select(mirror_fields: list[str], delta_fields: list[str]) -> str:
    """Build the SELECT list that prefers delta values over mirror values, matching columns by name."""
    # Columns are matched case-insensitively, which is how Spark SQL resolves
    # column names with its default settings.
    mirror_by_key = {name.lower(): name for name in mirror_fields}
    expressions = []
    for delta_field in delta_fields:
        delta_column = _quote_identifier(delta_field)
        mirror_field = mirror_by_key.pop(delta_field.lower(), None)
        if mirror_field is None:
            # Column exists only in the delta.
            expressions.append(f"d.{delta_column} as {delta_column}")
        else:
            expressions.append(
                f"coalesce(d.{delta_column}, m.{_quote_identifier(mirror_field)}) as {delta_column}"
            )
    # Columns that exist only in the mirror are carried over unchanged.
    for mirror_field in mirror_by_key.values():
        mirror_column = _quote_identifier(mirror_field)
        expressions.append(f"m.{mirror_column} as {mirror_column}")
    return ', '.join(expressions)


@execution_time
@error_log
def get_delta(db_path: str, table_name: str, delta_name: int, keys: list[str]) -> None:
    """
    The main function for creating a delta between mirror and delta data.

    Loads the current mirror and the delta file ``{db_path}/{table_name}/{delta_name}``,
    full-joins them on ``keys``, takes for every column the delta value when it
    is not null and the mirror value otherwise, and overwrites the mirror
    directory ``{db_path}/mirr_md_account_d`` with the result.

    :param db_path: str - The path of the table for which delta is being created.
    :param table_name: str - The name of the table for which delta is being created.
    :param delta_name: int - The specific delta file to process.
    :param keys: list[str] - The list of keys used for joining mirror and delta data.
    :return: None
    :raises ValueError: If ``keys`` is empty.
    """
    if not keys:
        # An empty key list would produce a JOIN with an empty ON clause.
        raise ValueError('At least one join key is required to merge a delta.')

    logger.info(
            f'Starting loading the {delta_name} delta.',
            extra={'table_name': f'{table_name}', 'delta_name': f'{delta_name}'}
        )
    # Loading mirror and delta dataframes
    df_mirror = get_mirror(db_path)
    df_delta = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .load(f"{db_path}/{table_name}/{delta_name}")
    )

    # Creating temporary views for mirror and delta dataframes
    df_mirror.createOrReplaceTempView("mirror")
    df_delta.createOrReplaceTempView("delta")

    # Creating field strings for coalescing data from mirror and delta.
    # Columns are paired by name: pairing them by position mixes up columns
    # when the two files order them differently and silently drops columns
    # when one file has more of them.
    selected_fields = _build_merge_select(df_mirror.schema.names, df_delta.schema.names)

    # Creating key conditions for joining mirror and delta data
    key_conditions = ' AND '.join(
        f"m.{_quote_identifier(key)} = d.{_quote_identifier(key)}" for key in keys
    )

    # Executing SQL query to select fields and join mirror and delta data
    mirror = spark.sql(f"""
    SELECT
        {selected_fields}
    FROM 
    mirror m FULL JOIN delta d ON {key_conditions}
    """)

    # Spark evaluates lazily, so at this point the result still depends on the
    # files in the mirror directory. Materialise it before that same directory
    # is overwritten below, so the write no longer needs its own input files.
    mirror = mirror.localCheckpoint()

    # Show the first 5 rows of the resulting dataframe
    mirror.show(5)

    # Writing a result.
    path = f'{db_path}/mirr_md_account_d'
    mirror.write.csv(path, mode='overwrite', header='True', sep=';')
    logger.info(
            f'Finishing loading the {delta_name} delta.',
            extra={'table_name': f'{table_name}', 'delta_name': f'{delta_name}'}
        )


In [5]:
# The main function block

@error_log
def merge_delta(db_path: str, table_name: str, keys: list[str]) -> None:
    """
    The function manages the merging of delta files for a specified table.

    Delta files are the numerically named entries of ``{db_path}/{table_name}/``.
    When the log table is empty, the mirror is first created from delta 1000.
    Afterwards consecutive deltas are merged one by one, starting from the
    smallest delta number that is not yet recorded in the log, until a number
    is missing from the directory or already present in the log.

    :param db_path: str - The path of the table for which delta is being created.
    :param table_name: str - The name of the table to merge delta files for.
    :param keys: list[str] - The list of keys used for joining mirror and delta data.
    :return: None
    """
    # Number of the delta that seeds the mirror.
    first_delta = 1000

    # Creating database for logs.
    create_logdb()
    # Retrieving log data and create a temporary view
    log_check = get_log()
    log_check.createOrReplaceTempView("log")

    # Setting up directory path and list files. Entries whose names are not
    # plain numbers (hidden files, checkpoint folders, ...) are skipped instead
    # of crashing the int() conversion.
    directory = f"{db_path}/{table_name}/"
    files = {int(name) for name in os.listdir(directory) if name.isdecimal()}

    # Determining delta files to process. The log is queried once: the rows
    # written during this run only concern deltas that were already handled.
    logged_rows = spark.sql("select delta_name from log").collect()
    processed = {row[0] for row in logged_rows}
    pending = files - processed
    # Sets are unordered, so pick the smallest pending delta explicitly.
    i = min(pending) if pending else first_delta

    # Checking if log table is empty and create mirror if needed
    if not logged_rows:
        logger.info(
            'Srarting loading the mirror.',
            extra={'table_name': f'{table_name}', 'delta_name': f'{i}'}
        )
        df = (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .load(f"{db_path}/{table_name}/{first_delta}")
        )

        # Writing mirror data and log the action
        path = f'{db_path}/mirr_md_account_d'
        df.write.csv(path, mode='overwrite', header='True', sep=';')
        logger.info(
            'Finishing loading the mirror.',
            extra={'table_name': f'{table_name}', 'delta_name': f'{i}'}
        )
        logger.info(
            'Mirror was created.',
            extra={'table_name': f'{table_name}', 'delta_name': f'{i}'}
        )
        df.show(5)
        i += 1

    # Processing delta files one by one
    while i in files and i not in processed:
        get_delta(db_path, table_name, i, keys)
        logger.info(
            f'{i} delta was merged.',
            extra={'table_name': f'{table_name}', 'delta_name': f'{i}'}
        )
        i += 1

# Run the merge for the 'data_deltas' table stored under '/home/su/spark_mirror',
# joining mirror and delta rows on the 'account_rk' column.
merge_delta('/home/su/spark_mirror', 'data_deltas', ['account_rk'])


                                                                                