In [7]:
import os
import logging
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, row_number, date_format
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
from pyspark.sql.window import Window
from pyspark import SparkConf
import os
import uuid


# define the schema for incoming data
SCHEMA = StructType(
[
    StructField('id', StringType(), True), 
    StructField('name', StringType(), True),
    StructField('age', StringType(), True), 
]
)

def add_fields(df):
    # Add batch_id and current timestamp columns
    df = df.withColumn("timestamp", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("batch_uuid", lit(uuid.uuid4().hex))
    return df
    
def start_spark_history_server(log_dir,event_dir):
    """
    Launches a Spark History Server and configures it to read logs from the specified directory.
    """
    # Set Spark configuration
    conf = SparkConf().setAppName("SparkHistoryServer").set("spark.eventLog.enabled", "true") \
                      .set("spark.eventLog.dir", event_dir).set("spark.history.fs.logDirectory", log_dir)

    # Create a SparkSession
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # Start Spark History Server
    os.system(f"nohup spark-submit --class org.apache.spark.deploy.history.HistoryServer \
            $SPARK_HOME/jars/spark-*.jar > /dev/null 2>&1 &")

    return spark
    
class IngestionJob:
    def __init__(self, spark, log_file):
        self.spark = spark

        # Initialize logger
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # Set up file handler
        file_handler = logging.FileHandler(log_file)

        # Set log message format
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        file_handler.setFormatter(formatter)

        # Add file handler to logger
        self.logger.addHandler(file_handler)
        
    def read_csv(self, SCHEMA, file_path):

    # read each csv file into a dataframe. We will consider all files are without header
        df = self.spark\
        .read.format("csv")\
        .option("delimiter", "|")\
        .option("header", "false")\
        .option("encoding", "ISO-8859-1")\
        .schema(SCHEMA)\
        .load(file_path)

    # now if any files happens to have a header then we can just remove that header line
        row1 = [i for i in df.head(1)[0].asDict().values()] # get first row
        schema_list = [(x.name) for x in SCHEMA.fields] # get schema as list
        
        if row1 == schema_list: # if first row is the schema then remove that row
            row1 = df.limit(1)
            df = df.subtract(row1)   
        
        print(file_path)
        self.logger.info(f"Read CSV file with {df.count()} rows from {file_path}")

        return df
        

    

    def ingest_data(self, df, output_path):
        
        # Write to Delta Lake with append mode and partition by batch_id and timestamp
        output_table = f"{output_path}"
        df.write.format("delta").mode("append").save(output_table)
        self.logger.info(f"Added batch_id and timestamp columns to DataFrame")
        return df
        
    def ingest_csv_to_deltalake(self, file_path, output_path):
    # define the schema for incoming data
        SCHEMA = StructType(
        [
            StructField('id', StringType(), True), 
            StructField('name', StringType(), True),
            StructField('age', StringType(), True), 
        ]
        )
    # read each csv file into a dataframe. We will consider all files are without header
        df = self.spark\
        .read.format("csv")\
        .option("delimiter", "|")\
        .option("header", "false")\
        .option("encoding", "ISO-8859-1")\
        .schema(SCHEMA)\
        .load(file_path)
    
    # now if any files happens to have a header then we can just remove that header line
        row1 = [i for i in df.head(1)[0].asDict().values()] # get first row
        schema_list = [(x.name) for x in SCHEMA.fields] # get schema as list
        
        if row1 == schema_list: # if first row is the schema then remove that row
            row1 = df.limit(1)
            df = df.subtract(row1)
            
        print(file_path)
        self.logger.info(f"Read CSV file with {df.count()} rows from {file_path}")

        # Add batch_id and current timestamp columns
        df = df.withColumn("timestamp", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
        df = df.withColumn("batch_uuid", lit(uuid.uuid4().hex))
        self.logger.info(f"Added batch_id and timestamp columns to DataFrame")
        
        

        # Write to Delta Lake with append mode and partition by batch_id and timestamp
        output_table = f"{output_path}"
        df.write.format("delta").mode("append").save(output_table)
        self.logger.info(f"Wrote {df.count()} rows to Delta Lake at {output_table}")

if __name__ == "__main__":
    # Initialize SparkSession
    spark = SparkSession.builder.appName("IngestionJob").getOrCreate()

    # Parse arguments
    # parser = argparse.ArgumentParser(description='Ingest CSV files into Delta Lake')
    # parser.add_argument("--data_path", help="Path to csv files", required=True)
    # parser.add_argument('--output_path', type=str, default='delta', help='Output path for Delta Lake table')
    # parser.add_argument('--log_file', type=str, default='ingestion.log', help='Log file path')
    # parser.add_argument('--event_dir', type=str, default='/events', help='Event directory')
    # args = parser.parse_args()
    data_path = '/data'
    output_path = '/output/delta'
    log_file = '/logs/ingestion.log'
    event_dir = '/data/events'

    # create empty dataframe
    # Create an empty RDD
    emp_RDD = spark.sparkContext.emptyRDD()
     
    # Create empty schema
    columns = StructType([])
     
    # Create an empty RDD with empty schema
    final_df = spark.createDataFrame(data = emp_RDD,
                                 schema = SCHEMA)
    # Initialize IngestionJob
    job = IngestionJob(spark, log_file)

    files = os.listdir(data_path)
    print(files)
    file_paths = [file for file in files if file.endswith('.csv')]
    # Process each CSV file
    for file_path in file_paths:
        df = job.read_csv(SCHEMA, data_path + "/" + file_path)
        final_df = final_df.union(df)
    
    
    final_df = add_fields(final_df)
    print(final_df.printSchema())
    # job.ingest_csv_to_deltalake(args.data_path + "/" + file_path, args.output_path)
    # job.ingest_data(final_df, output_path)
    print(final_df.limit(1))
    # Stop SparkSession
    # spark.stop()
    

['file1.csv']
/data/file1.csv
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- timestamp: string (nullable = false)
 |-- batch_uuid: string (nullable = false)

None
DataFrame[id: string, name: string, age: string, timestamp: string, batch_uuid: string]


In [8]:
final_df.show()

+---+-------+---+-------------------+--------------------+
| id|   name|age|          timestamp|          batch_uuid|
+---+-------+---+-------------------+--------------------+
|  1|  Alice| 25|2023-05-11 11:17:23|f8c56d8a8b6e4e1d9...|
|  2|    Bob| 30|2023-05-11 11:17:23|f8c56d8a8b6e4e1d9...|
|  3|Charlie| 35|2023-05-11 11:17:23|f8c56d8a8b6e4e1d9...|
+---+-------+---+-------------------+--------------------+



In [15]:
def get_uuid():
    return '8377060215b0443b87041c799c413636'
def get_cur_ts():
    return '2023-05-11 11:17:236'
print(get_uuid())
print(get_cur_ts())

8377060215b0443b87041c799c413636
2023-05-11 11:17:236


In [21]:
import unittest

from unittest import mock
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import lit, current_timestamp, row_number, date_format
from datetime import datetime
from pyspark.sql import SparkSession
from freezegun import freeze_time
import uuid
import os
# import nbimporter

def add_fields(df):
    # Add batch_id and current timestamp columns
    df = df.withColumn("timestamp", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("batch_uuid", lit(uuid.uuid4().hex))
    return df


class SparkETLTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.spark = (SparkSession
                     .builder
                     .master("local[*]")
                     .appName("Unit-tests")
                     .getOrCreate())

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
        
    def get_uuid():
        return uuid.uuid4().hex

    @mock.patch("add_fields.uuid.uuid4")
    @freeze_time("2023-01-01")
    def test_etl(self, mock_uuid):
        mock_uuid.return_value  = '8377060215b0443b87041c799c413636'
        input_schema = StructType(
        [
            StructField('id', StringType(), True), 
            StructField('name', StringType(), True),
            StructField('age', StringType(), True), 
        ]
        )

        #1. Prepare an input data frame that mimics our source data.
        input_data = [(1, "Steve", 35)]
        input_df = self.spark.createDataFrame(data=input_data, schema=input_schema)

        #2. Prepare an expected data frame which is the output that we expect.    
        expected_schema = StructType([
            StructField('id', StringType(), True), 
            StructField('name', StringType(), True),
            StructField('age', StringType(), True), 
            StructField('timestamp', StringType(), True), 
            StructField('batch_uuid', StringType(), True), 
                ])
        expected_data = [(1, "Steve", 35, '2023-05-11 11:17:236', '8377060215b0443b87041c799c413636') ]
        expected_df = self.spark.createDataFrame(data=expected_data, schema=expected_schema)
        
        #3. Apply our transformation to the input data frame
        output_df = add_fields(input_df)               
        
        #4. Assert the output of the transformation to the expected data frame.
        self.assertEqual(sorted(expected_df.collect()), sorted(output_df.collect()))

if __name__ == '__main__':
    main = SparkETLTestCase()

    # This executes the unit test/(itself)
    import sys
    suite = unittest.TestLoader().loadTestsFromTestCase(SparkETLTestCase)
    unittest.TextTestRunner(verbosity=4,stream=sys.stderr).run(suite)      

test_etl (__main__.SparkETLTestCase) ... ERROR

ERROR: test_etl (__main__.SparkETLTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/unittest/mock.py", line 1376, in patched
    with self.decoration_helper(patched,
  File "/opt/conda/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/opt/conda/lib/python3.10/unittest/mock.py", line 1358, in decoration_helper
    arg = exit_stack.enter_context(patching)
  File "/opt/conda/lib/python3.10/contextlib.py", line 492, in enter_context
    result = _cm_type.__enter__(cm)
  File "/opt/conda/lib/python3.10/unittest/mock.py", line 1431, in __enter__
    self.target = self.getter()
  File "/opt/conda/lib/python3.10/unittest/mock.py", line 1618, in <lambda>
    getter = lambda: _importer(target)
  File "/opt/conda/lib/python3.10/unittest/mock.py", line 1257, in _importer
    thing = __import__(import_path)
ModuleN

In [10]:
from home.joyan.code.prog import add_fields

ModuleNotFoundError: No module named 'home'

In [20]:
import datetime as dt
from unittest import mock

with mock.patch.object(dt.date, "today", return_value=dt.date(2020, 2, 29)):
    print(dt.date.today())

TypeError: cannot set 'today' attribute of immutable type 'datetime.date'