In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
import pymysql

In [11]:
# Shared PyMySQL connection to the local `main_database` schema; the helper
# functions in later cells read this module-level name directly.
# NOTE(review): credentials are hard-coded here -- move them to environment
# variables or a secrets store before sharing this notebook.
connection = pymysql.connect(
    host='localhost', user='root', password='root',
    database='main_database',
)

In [12]:
# Spark session for the field-mapping job, with the MySQL Connector/J jar put
# on spark.jars. The jar path is a raw string: in a normal string literal
# "\s", "\j" and "\m" are invalid escape sequences, which Python only
# tolerates with a warning.
spark = (
    SparkSession.builder
    .appName("field_mapping")
    .config("spark.jars", r"C:\spark-3.5.1-bin-hadoop3\jars\mysql-connector-j-8.4.0.jar")
    .getOrCreate()
)

In [13]:
# JDBC endpoint and connection properties handed to spark.read.jdbc.
# NOTE(review): credentials are hard-coded -- move them out of the notebook
# before sharing it.
url = "jdbc:mysql://localhost:3306/main_database"
properties = {
    "user": "root",
    "password": "root",
    # Connector/J 8.x driver class; "com.mysql.jdbc.Driver" is the deprecated
    # legacy name.
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [14]:
def sql_table_updater(index,interval_by):
    """Stamp one ETL control row and slide its extraction window forward.

    Issues three UPDATEs against `main_database`.cf_etl_table for the row
    whose id is ``index + 1`` and commits them as one transaction:

    * ``execution_date`` is set to ``current_timestamp``;
    * ``start_date_time`` is advanced by ``interval_by`` days;
    * ``end_date_time`` is advanced by ``interval_by`` days.

    All three statements are restricted to the same row id. Previously the
    two date updates had no WHERE clause and shifted every row in the table.

    Args:
        index: zero-based position; the targeted row id is ``index + 1``.
        interval_by: number of days added to both window bounds.

    Returns:
        None. Errors are printed rather than raised. When a statement or the
        commit fails the transaction is rolled back, so a partial update can
        never be committed by a later call on the shared connection.
    """
    if interval_by is None:
        # A NULL interval would make date_add() return NULL and wipe the dates.
        print("Error updating start date: interval_by is None")
        return

    row_id = index + 1
    table = '`main_database`.cf_etl_table'
    # (label used in the error message, statement, driver-side parameters)
    steps = (
        ('updating execution date',
         f'update {table} set execution_date = current_timestamp where id = %s',
         (row_id,)),
        ('updating start date',
         f'update {table} set start_date_time = date_add(start_date_time, interval %s day) where id = %s',
         (interval_by, row_id)),
        ('updating end date',
         f'update {table} set end_date_time = date_add(end_date_time, interval %s day) where id = %s',
         (interval_by, row_id)),
    )

    try:
        with connection.cursor() as cursor:
            for label, statement, params in steps:
                try:
                    cursor.execute(statement, params)
                except Exception as e:
                    print(f"Error {label}: {e}")
                    # Discard whatever earlier statements already changed.
                    connection.rollback()
                    return

            try:
                connection.commit()
            except Exception as e:
                print(f"Error committing transaction: {e}")
                connection.rollback()
    except Exception as e:
        print(f"Error in SQL updater: {e}")

In [15]:
def mapping(url,table_name,properties):
    """Run every ETL definition stored in the control table ``table_name``.

    The control table is read over JDBC and collected to the driver. For each
    row the stored procedure ``main_database.executor_mappings`` is called
    with the row's id, schema name, table name, incremental flag, incremental
    field and start/end timestamps. Its result set is written as Parquet to
    ``location + hdfs_file_name``, partitioned by the row's ``partition_by``
    column.

    Rows with a truthy ``is_incremental`` are appended and then passed to
    ``sql_table_updater`` so the control row is stamped and its window moved
    by ``interval_days``. All other rows overwrite the target path.

    Args:
        url: JDBC URL of the database holding the control table.
        table_name: name of the control table to read.
        properties: JDBC connection properties (user, password, driver).

    Returns:
        None.
    """
    etl_rows = spark.read.jdbc(url=url, table=table_name, properties=properties).collect()
    for row in etl_rows:
        is_inc = row['is_incremental']
        partition_by = row['partition_by']
        interval_by = row['interval_days']
        row_id = row['id']
        hdfs_path = f"{row['location']}{row['hdfs_file_name']}"
        proc_args = [
            row_id, row['Schema_names'], row['Table_names'], is_inc,
            row['inc_field'], row['start_date_time'], row['end_date_time'],
        ]

        # Close the cursor as soon as the result set is fetched; one cursor
        # per row was previously opened and never released.
        with connection.cursor() as cursor:
            cursor.callproc('main_database.executor_mappings', proc_args)
            result = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]

        # NOTE(review): with only column names as schema, createDataFrame has
        # to infer types from the data, so an empty result set is expected to
        # fail here -- confirm how empty windows should be handled.
        extracted = spark.createDataFrame(result, schema=column_names)
        write_mode = 'append' if is_inc else 'overwrite'
        extracted.write.mode(write_mode).parquet(hdfs_path, partitionBy=[partition_by])

        if is_inc:
            # sql_table_updater targets id == index + 1, so pass the row's real
            # id minus one instead of its position in the collected list.
            sql_table_updater(row_id - 1, interval_by)

In [16]:
# Process every definition in the `cf_etl_table` control table.
mapping(url=url, table_name='cf_etl_table', properties=properties)

In [17]:
# Load the Parquet dataset stored at this HDFS location for inspection.
data = (
    spark.read
    .parquet('hdfs://localhost:19000/airflow/sample_data')
)

In [18]:
# Preview the loaded rows (show()'s defaults spelled out: 20 rows, truncated cells).
data.show(n=20, truncate=True)

+--------------+-------------------+----------+--------+----------+
|Transaction_id|   Transaction_date|Account_id|Products|     Dates|
+--------------+-------------------+----------+--------+----------+
|             1|2023-06-23 10:30:00|    ACC001|ProductA|2023-06-23|
|             2|2023-06-24 11:00:00|    ACC002|ProductB|2023-06-24|
|             3|2023-06-25 09:45:00|    ACC003|ProductC|2023-06-25|
+--------------+-------------------+----------+--------+----------+

