In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pymysql

# Start (or attach to) the Spark session named "cf_etl"; later cells use it
# through the module-level name `spark`.
spark = (
    SparkSession.builder
    .appName("cf_etl")
    .getOrCreate()
)

### MySQL and Spark Connection

In [4]:
import os
from dotenv import load_dotenv

# Let python-dotenv populate the process environment before the settings
# below are read.
load_dotenv()

# MySQL connection settings; each one is None when its variable is not set.
DB_USERNAME = os.environ.get('DB_USERNAME')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_HOST = os.environ.get('DB_HOST')
DB_PORT = os.environ.get('DB_PORT')

def table_df(schema_name, table_name):
    """Load a MySQL table into a Spark DataFrame over JDBC.

    Args:
        schema_name: MySQL schema (database) placed in the JDBC URL.
        table_name: Value handed to Spark's JDBC reader as ``table``.

    Returns:
        The DataFrame produced by ``spark.read.jdbc``.
    """
    jdbc_url = f"jdbc:mysql://{DB_HOST}:{DB_PORT}/{schema_name}"
    connection_properties = dict(
        user=DB_USERNAME,
        password=DB_PASSWORD,
        driver="com.mysql.cj.jdbc.Driver",
    )
    return spark.read.jdbc(
        url=jdbc_url,
        table=table_name,
        properties=connection_properties,
    )

### Update Config Table

In [3]:
def update_date_on_config_table(schema_name, table_name, index, interval_period):
    """Mark one config row as executed and slide its load window forward.

    For the row whose ``table_id`` equals ``index + 1`` this sets
    ``execution_date`` to the current timestamp and moves both
    ``start_date_time`` and ``end_date_time`` forward by ``interval_period``
    days. Other rows are left untouched.

    Args:
        schema_name: Schema that holds the config table; also used as the
            default database of the connection.
        table_name: Name of the config table.
        index: Zero-based position of the row; the row updated is the one
            with ``table_id = index + 1``.
        interval_period: Number of days added to both window bounds.

    Raises:
        Exception: Any error raised by PyMySQL is re-raised after the
            transaction has been rolled back.
    """
    pymysql_connection = pymysql.connect(
        host=DB_HOST,
        # Use the same port as the JDBC reader in table_df; PyMySQL needs an
        # int and falls back to MySQL's standard port when none is configured.
        port=int(DB_PORT) if DB_PORT else 3306,
        user=DB_USERNAME,
        password=DB_PASSWORD,
        database=schema_name,
    )

    # Identifiers cannot be bound as query parameters, so only the table
    # reference is interpolated; the values are passed as parameters.
    update_query = (
        f"update `{schema_name}`.{table_name} "
        "set execution_date = (current_timestamp), "
        "start_date_time = date_add(start_date_time, interval %s day), "
        "end_date_time = date_add(end_date_time, interval %s day) "
        "where table_id = %s"
    )

    try:
        with pymysql_connection.cursor() as cursor:
            # The WHERE clause keeps the window shift on the selected row;
            # without it every row in the config table would be shifted by
            # this row's interval.
            cursor.execute(update_query, (interval_period, interval_period, index + 1))
        pymysql_connection.commit()
    except Exception:
        pymysql_connection.rollback()
        raise
    finally:
        # Always release the connection, even when the update fails.
        pymysql_connection.close()

In [5]:
# Load the ETL config table so it can be inspected in the next cell.
df = table_df(schema_name='config_db', table_name='cf_etl_table')

In [6]:
# Print the config rows; long cell values (e.g. the HDFS location) are
# truncated in the rendered output.
df.show()

+--------+--------------+-------------------+--------------------+-------------------+--------------+----------------+-------------------+-------------------+--------------+---------------+---------------+
|table_id|   schema_name|         table_name|hdfs_upload_location|     hdfs_file_name|is_incremental|       inc_field|    start_date_time|      end_date_time|execution_date|interval_period|   partition_by|
+--------+--------------+-------------------+--------------------+-------------------+--------------+----------------+-------------------+-------------------+--------------+---------------+---------------+
|       1|transaction_db|        transaction|hdfs://localhost:...|        transaction|          true|transaction_date|2023-06-25 00:00:00|2023-06-25 23:59:59|    2024-06-25|              1|trans_date_only|
|       2|transaction_db|transaction_non_inc|hdfs://localhost:...|transaction_non_inc|         false|            NULL|               NULL|               NULL|          NULL|   

### Retrieve Field-Mapped Table

In [6]:
def field_mapped_df(cf_db, schema_name, table_name, table_id):
    """Call ``sp_field_mapping`` and return its result set as a Spark DataFrame.

    Args:
        cf_db: Database that owns the ``sp_field_mapping`` procedure; also
            used as the default database of the connection.
        schema_name: First argument passed to the procedure.
        table_name: Second argument passed to the procedure.
        table_id: Third argument passed to the procedure.

    Returns:
        A Spark DataFrame built from the fetched rows, with column names
        taken from the cursor description.
    """
    con = pymysql.connect(
        host=DB_HOST,
        # Use the same port as the JDBC reader in table_df; PyMySQL needs an
        # int and falls back to MySQL's standard port when none is configured.
        port=int(DB_PORT) if DB_PORT else 3306,
        user=DB_USERNAME,
        password=DB_PASSWORD,
        database=cf_db,
    )

    try:
        with con.cursor() as cursor:
            cursor.callproc(f'{cf_db}.sp_field_mapping', [schema_name, table_name, table_id])
            result = cursor.fetchall()
            # The first entry of each description tuple is the column name.
            fields = [desc[0] for desc in cursor.description]
        con.commit()
    finally:
        # Release the connection whether or not the procedure call succeeded.
        con.close()

    # Rows and column names are already in memory, so the DataFrame can be
    # built after the connection has been closed.
    return spark.createDataFrame(result, fields)

### Uploading File to HDFS

In [7]:
def upload():
    """Write every table listed in ``config_db.cf_etl_table`` to HDFS as parquet.

    For each config row the field-mapped table is fetched through
    ``field_mapped_df``. Incremental rows are filtered to the configured
    date window, appended to the target path (partitioned by the row's
    ``partition_by`` value), and the row's window is then advanced in the
    config table. Non-incremental rows overwrite the target path with the
    full table.
    """
    # Collect once: a separate count() would read the config table a second
    # time over JDBC.
    config_rows = table_df('config_db', 'cf_etl_table').collect()

    for row in config_rows:
        table_id = row['table_id']
        hdfs_path = f"{row['hdfs_upload_location']}{row['hdfs_file_name']}"

        field_mapped_table = field_mapped_df('config_db', row['schema_name'], row['table_name'], table_id)

        if row['is_incremental']:
            start_date, end_date = row['start_date_time'], row['end_date_time']
            date_col = row['inc_field']

            field_mapped_table.createOrReplaceTempView("incremental_table")

            result = spark.sql(f"SELECT * FROM incremental_table WHERE {date_col} BETWEEN '{start_date}' AND '{end_date}'")
            result.write.mode('append').parquet(hdfs_path, partitionBy=row['partition_by'])

            # update_date_on_config_table updates the row with
            # table_id = index + 1, so derive the index from this row's own
            # table_id. The loop position is only correct when rows come back
            # ordered with ids 1..N.
            update_date_on_config_table('config_db', 'cf_etl_table', table_id - 1, row['interval_period'])
        else:
            field_mapped_table.write.mode("overwrite").parquet(hdfs_path)

In [8]:
# Run the export for every row of the config table. This writes parquet to
# HDFS and, for incremental rows, updates the config table.
upload()

In [5]:
# def get_column_value(df, index, col):
#     """
#     This function retrieves a value from a specified column in a DataFrame at a given index.
    
#     """
#     col_values = df.select(col).collect()
#     value = col_values[index][col]
#     return value

# def upload():

#     df = table_df('config_db','cf_etl_table')

#     for i in range(df.count()):
        
#         is_incremental = get_column_value(df, i, 'is_incremental')
#         schema = get_column_value(df, i, 'schema_name')
#         table = get_column_value(df, i, 'table_name')
#         location = get_column_value(df, i, 'hdfs_upload_location')
#         hdfs_file = get_column_value(df, i, 'hdfs_file_name')
#         hdfs_path = f"{location}{hdfs_file}"

#         if is_incremental:
#             start_date = get_column_value(df, i, 'start_date_time')
#             end_date = get_column_value(df, i, 'end_date_time')
#             date_col = get_column_value(df, i, 'inc_field')      
 
#             query = f"(SELECT * FROM {schema}.{table} WHERE {date_col} BETWEEN '{start_date}' AND '{end_date}') AS sql_query"
#             result = table_df(schema, query)
#             result.write.mode('append').parquet(hdfs_path)
            
#             update_date_on_config_table('config_db', 'cf_etl_table', i)

#         elif not is_incremental:
#             result = table_df(schema, table)
#             result.write.mode("overwrite").parquet(hdfs_path)            


### Reading Parquet File

In [9]:
# Read the parquet dataset at the transaction HDFS path back into a DataFrame.
new_df = (
    spark.read
    .parquet('hdfs://localhost:19000//mydir/transaction')
)

In [10]:
# Print the rows that were read back from the parquet dataset.
new_df.show()

+--------------+-------------------+----------+------------+---------------+
|transaction_id|   transaction_date|account_id|product_name|trans_date_only|
+--------------+-------------------+----------+------------+---------------+
|             1|2023-06-23 10:30:00|    ACC001|    ProductA|     2023-06-23|
+--------------+-------------------+----------+------------+---------------+



### See if Hadoop File Exists

In [1]:
from pyspark.sql import SparkSession
# Create (or reuse) a Spark session whose default filesystem is requested to
# be the local HDFS namenode.
spark = (
    SparkSession.builder
    .appName('Incremental Load')
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:19000")
    .getOrCreate()
)

hadoop_conf = spark._jsc.hadoopConfiguration()

hdfs_path = "hdfs://localhost:19000//mydir/transaction"
path = spark._jvm.org.apache.hadoop.fs.Path(hdfs_path)

# Resolve the filesystem from the path's own URI instead of the configured
# default. getOrCreate() can return a session that already exists, and then
# the defaultFS option above is not guaranteed to be in the Hadoop
# configuration; FileSystem.get(conf) would return a non-HDFS filesystem and
# exists() would reject the hdfs:// path.
fs = path.getFileSystem(hadoop_conf)

if fs.exists(path):
    print('file exists')
else:
    print('file doesn\'t exist')

file exists
