In [0]:
%pip install -U nutter chispa 

In [0]:
%load_ext autoreload

In [0]:
%autoreload 2

In [0]:
%run /Repos/potturi.tulasiram@diggibyte.com/cue-box/cue_box/utility/functions

In [0]:
from runtime.nutterfixture import NutterFixture, tag
from chispa.dataframe_comparer import *

In [0]:
# Schema used when reading the users CSV: a long `user_id` followed by four
# string columns, every field nullable.
my_schema = StructType(
    [StructField('user_id', LongType(), True)]
    + [
        StructField(text_column, StringType(), True)
        for text_column in ('name', 'email', 'password', 'phone')
    ]
)

In [0]:
# Load the users CSV (header row, comma-delimited) with the explicit schema
# instead of letting Spark infer column types.
users = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('delimiter', ',')
    .schema(my_schema)
    .load('/mnt/meta/data/unit_testing/users.csv')
)

In [0]:
class TestFixtureArbitraryFiles(NutterFixture):
  """Nutter test fixture for the DataFrame utility functions.

  Each ``assertion_*`` method builds an expected DataFrame, calls one helper
  (``get_custom_schema``, ``replace_characters``, ``change_date_format``,
  ``rename_columns``, ``typecast_columns``,
  ``convert_epoch_columns_to_datetime`` or ``get_table_data``) and compares
  the outcome with chispa's ``assert_df_equality(df_actual, df_expected)``.

  The helpers are not defined in this notebook; presumably they come from the
  utility notebook loaded with ``%run`` -- confirm.

  NOTE(review): several tests compare against ``limit(n)`` of a read that has
  no explicit ordering, so they depend on the row order Spark returns.
  """

  def __init__(self):
    NutterFixture.__init__(self)

  @staticmethod
  def _date_fixture_frames():
    """Build the (input, expected) DataFrames shared by the date tests.

    Both frames have three nullable string columns. ``date_column_1`` holds
    yyyy-MM-dd text in the input frame and the same dates as dd-MM-yyyy text
    in the expected frame; ``date_column_2`` is identical in both.

    Returns:
      tuple: ``(df_input, df_expected)``.
    """
    date_schema = StructType([
      StructField('id', StringType(), True),
      StructField('date_column_1', StringType(), True),
      StructField('date_column_2', StringType(), True)])

    # ids are written as strings so the rows match the declared StringType
    # instead of relying on ints being stringified during DataFrame creation.
    input_rows = [
      ('1', '2022-01-01', '2022-02-15'),
      ('2', '2022-03-30', '2022-04-10'),
      ('3', '2022-05-20', '2022-06-05')]
    expected_rows = [
      ('1', '01-01-2022', '2022-02-15'),
      ('2', '30-03-2022', '2022-04-10'),
      ('3', '20-05-2022', '2022-06-05')]

    df_input = spark.createDataFrame(input_rows, date_schema)
    df_expected = spark.createDataFrame(data=expected_rows, schema=date_schema)
    return df_input, df_expected

  # Get custom Schema test case
  def assertion_get_custom_schema(self):
    """Schema returned for 'users' must equal the module-level ``my_schema``.

    The schemas are compared by building an empty DataFrame from each.
    """
    df_expected = spark.createDataFrame(data=[], schema=my_schema)
    actual_schema = get_custom_schema('users')
    df_actual = spark.createDataFrame(data=[], schema=actual_schema)
    assert_df_equality(df_actual, df_expected)

  # Column value replace function test case
  # NOTE: the misspelled method name is kept on purpose; Nutter derives the
  # reported test-case name from it.
  def assertion_replace_charactrs(self):
    """First four users must come back with '@' stripped from ``password``."""
    replacements = {"password": ('@', "")}

    expected_rows = [
      (9666488188, 'Miss Beth Buchanan', 'Miss_Beth_Buchanan590@example.com', 'R_sCLwLzbW6c&I', '469-349-07'),
      (6986658652, 'John Olson', 'John_Olson257@example.com', 'KX)FBXzP7Ue', '606.814.97'),
      (5870824764, 'Gary Malone', 'Gary_Malone659@example.com', '$7_HlF6g', '001-902-76'),
      (3463026079, 'Kristin Sanders', 'Kristin_Sanders125@example.com', 'k#5nOh!g', '+1-818-794')]
    df_expected = spark.createDataFrame(data=expected_rows, schema=my_schema)

    df_actual = replace_characters(users, replacements).limit(4)
    assert_df_equality(df_actual, df_expected)

  # Date format Change test case
  def assertion_change_date_format(self):
    """``date_column_1`` must be rewritten as dd-MM-yyyy text; others untouched."""
    df_input, df_expected = self._date_fixture_frames()
    df_actual = change_date_format(df_input, ['date_column_1'], 'dd-MM-yyyy')
    assert_df_equality(df_actual, df_expected)

  # Column rename Function test cases
  def assertion_rename_columns(self):
    """``user_id`` -> ``id`` and ``name`` -> ``names``; data must be unchanged."""
    renamed_schema = StructType([
      StructField('id', LongType(), True),
      StructField('names', StringType(), True),
      StructField('email', StringType(), True),
      StructField('password', StringType(), True),
      StructField('phone', StringType(), True)])
    expected_rows = [
      (9666488188, 'Miss Beth Buchanan', 'Miss_Beth_Buchanan590@example.com', 'R_sCLwLzbW6@c&I', '469-349-07'),
      (6986658652, 'John Olson', 'John_Olson257@example.com', '@KX)FBXzP7Ue', '606.814.97'),
      (5870824764, 'Gary Malone', 'Gary_Malone659@example.com', '$7_HlF6g', '001-902-76'),
      (3463026079, 'Kristin Sanders', 'Kristin_Sanders125@example.com', 'k#5nOh!g', '+1-818-794'),
      (7991496591, 'Martha Jacobs', 'Martha_Jacobs491@example.com', '3rgJcyE#*6M', '2462808562'),
      (5822830658, 'Jessica Schmidt', 'Jessica_Schmidt236@example.com', '_Za1L^P5_D54K&j', '257-432-61'),
      (5020007556, 'Monica Lopez', 'Monica_Lopez113@example.com', 'K7V!gqGj)', '001-574-57')]
    df_expected = spark.createDataFrame(data=expected_rows, schema=renamed_schema)

    df_renamed = rename_columns(users, column_mapping={'user_id': 'id', 'name': 'names'})
    df_actual = df_renamed.limit(7)
    assert_df_equality(df_actual, df_expected)

  # Data type casting test cases
  def assertion_cast_type_columns(self):
    """Casting ``date_column_1`` to DateType must yield the expected dates."""
    df_input, df_expected = self._date_fixture_frames()
    # The expected frame stores dd-MM-yyyy text; parse it into real dates so
    # it can be compared with the typecast result.
    df_expected = df_expected.withColumn(
      'date_column_1', to_date(df_expected['date_column_1'], 'dd-MM-yyyy'))

    column_datatypes = {'date_column_1': DateType()}
    df_actual = typecast_columns(df_input, column_datatypes)
    assert_df_equality(df_actual, df_expected)

  # Epoch Date conversion test cases
  def assertion_convert_epoch_columns(self):
    """Epoch values must convert to the expected timestamp strings.

    NOTE(review): read as epoch seconds these inputs fall in April 2021, yet
    the expected strings are in February 1975 -- confirm the scaling applied
    by ``convert_epoch_columns_to_datetime``. The string form of a timestamp
    presumably also depends on the session time zone -- verify.
    """
    expected_schema = StructType([
      StructField("id", LongType(), True),
      StructField("epoch_timestamp", StringType(), True)])
    expected_rows = [
      (1, '1975-02-16 23:36:02'),
      (2, '1975-02-16 23:36:12'),
      (3, '1975-02-16 23:36:22')]
    df_expected = spark.createDataFrame(data=expected_rows, schema=expected_schema)

    epoch_rows = [
      (1, 1618257621),
      (2, 1618257721),
      (3, 1618257821)]
    # Column names only: the column types of the input frame are inferred.
    df_epoch = spark.createDataFrame(epoch_rows, schema=["id", "epoch_timestamp"])

    df_actual = convert_epoch_columns_to_datetime(df_epoch, ['epoch_timestamp'])
    # Compare as strings so the expected rows can be written as plain text.
    df_actual = df_actual.withColumn(
      'epoch_timestamp', col('epoch_timestamp').cast(StringType()))
    assert_df_equality(df_actual, df_expected)

  # Get table data test case
  def assertion_get_table_data(self):
    """First two rows of ``cue_box.users`` must match the hard-coded rows.

    NOTE(review): the expected rows embed a fixed ``load_date``, so this test
    presumably breaks whenever the table is reloaded -- confirm.
    """
    table_schema = StructType([
      StructField('user_id', LongType(), True),
      StructField('email1', StringType(), True),
      StructField('password', StringType(), True),
      StructField('phone', StringType(), True),
      StructField('first_name', StringType(), True),
      StructField('last_name', StringType(), True),
      StructField('load_date', StringType(), True)])
    expected_rows = [
      (275905542, '732c11b3b3cacfa817e2bc64f605832d', '&E3nBxQDE_o&P5o', '(236)691-3', 'Brianna', 'Holmes', '2023-12-29'),
      (9059841656, 'be06d7c848dee6a36effc7cedb5201e0', '(OX1AZu9K)', '(637)769-0', 'Cameron', 'Nelson', '2023-12-29')]
    df_expected = spark.createDataFrame(data=expected_rows, schema=table_schema)

    df_actual = get_table_data('cue_box', 'users', filter_condition=None).limit(2)
    assert_df_equality(df_actual, df_expected)

  
# Run every test case in the fixture and print the report in the cell output.
result = TestFixtureArbitraryFiles().execute_tests()
print(result.to_string())

# A defined currentRunId is taken to mean the notebook is running as a job;
# only then are the results handed back through dbutils.
is_job = (
  dbutils.notebook.entry_point.getDbutils()
  .notebook()
  .getContext()
  .currentRunId()
  .isDefined()
)
if is_job:
  result.exit(dbutils)