# PySpark Integration Tests with Chispa
## Introduction
Integration testing is a vital part of software development because it helps ensure that software modules remain reliable when they work together. Data engineers therefore need to know how to write effective integration tests.

**So, What are Integration Tests?**

Integration tests are the second level of software testing, after unit tests. Components that have already been tested on their own are combined and tested together to ensure they work as intended. Integration testing aims to find problems in how components interact, validate the flow of data between them, and confirm that the software behaves correctly once its parts are combined.

**And Why are Integration Tests Important?**

Integration tests verify that the individual components or modules of a system work correctly together. Components that pass their own tests can still fail when combined. Problems may come from hardware compatibility, from unexpected system behaviour, or from third-party APIs and tools. Integration tests confirm that the data these components produce and accept is correct.
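The idea can be illustrated without Spark. The plain-Python sketch below uses two hypothetical components invented for this example, `parse_line` and `count_by_country`. Each could pass its own unit test. Only a test that runs them together checks that the records produced by the first are exactly what the second expects.

```python
from collections import Counter


def parse_line(line: str) -> dict:
    """Hypothetical component 1: turn a comma-separated line into a record."""
    name, age, city, country, department = line.split(",")
    return {
        "name": name.strip(),
        "age": int(age),  # int() tolerates surrounding whitespace
        "city": city.strip(),
        "country": country.strip(),
        "department": department.strip(),
    }


def count_by_country(records: list[dict]) -> dict:
    """Hypothetical component 2: count records per country."""
    return dict(Counter(r["country"] for r in records))


def run_pipeline(lines: list[str]) -> dict:
    """Wire both components together the way production code would."""
    return count_by_country([parse_line(line) for line in lines])


def test_pipeline_counts_countries():
    # Integration test: exercise both components end to end
    lines = [
        "John, 28, New York, USA, Sales",
        "Alice, 32, Los Angeles, USA, Marketing",
        "Sara, 29, Toronto, Canada, Finance",
    ]
    assert run_pipeline(lines) == {"USA": 2, "Canada": 1}


test_pipeline_counts_countries()
```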
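The examples below use chispa, a library of PySpark test helpers for comparing DataFrames and columns: https://github.com/MrPowers/chispa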

```python
from platform import python_version

print(python_version())

from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# In a notebook, the value of the last expression is displayed
SparkConf().getAll()
```

Output (Spark's startup log messages omitted):

```text
3.10.12

[('spark.eventLog.enabled', 'true'),
 ('spark.eventLog.dir', 'hdfs:///var/log/spark'),
 ('spark.master', 'yarn'),
 ('spark.app.submitTime', '1733819731054'),
 ('spark.ui.proxyBase', '/proxy/application_1733817654557_0001'),
 ('spark.submit.pyFiles', ''),
 ('spark.yarn.isPython', 'true'),
 ('spark.submit.deployMode', 'client'),
 ('spark.yarn.jars', 'hdfs://master/spark/jars/*.jar'),
 ('spark.history.fs.logDirectory', 'hdfs:///var/log/spark'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]
```

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col
from pyspark.sql.types import StructType, StructField, StringType
from chispa.dataframe_comparer import assert_df_equality

# getOrCreate() returns the session started in the previous cell if it is still active
spark = SparkSession.builder.appName("chispa_test").getOrCreate()

table_values = [("John", 28, "New York", "USA", "Sales"),
                ("Alice", 32, "Los Angeles", "USA", "Marketing"),
                ("Bob", 24, "Chicago", "USA", "Engineering"),
                ("Sara", 29, "Toronto", "Canada", "Finance"),
                ("David", 30, "London", "UK", "HR"),
                ("Emily", 27, "Sydney", "Australia", "Sales"),
                ("Daniel", 35, "Paris", "France", "Marketing"),
                ("Ella", 26, "Berlin", "Germany", "Engineering"),
                ("Grace", 31, "Madrid", "Spain", "Finance"),
                ("William", 33, "Rome", "Italy", "HR"),
                ("Olivia", 25, "Tokyo", "Japan", "Sales"),
                ("Liam", 29, "Beijing", "China", "Marketing"),
                ("Sophia", 34, "Mumbai", "India", "Engineering"),
                ("Aiden", 28, "Cape Town", "South Africa", "Finance"),
                ("Mia", 31, "Buenos Aires", "Argentina", "HR")]

schema_sample = ["Name", "Age", "City", "Country", "Department"]

def create_table(spark, schema, table_data):
    """
    Creates a Spark DataFrame from given schema and data.

    Args:
        spark: SparkSession object.
        schema: List of column names.
        table_data: List of tuples, where each tuple represents a row of data.

    Returns:
        Spark DataFrame.
    """
    df = spark.createDataFrame(table_data, schema)
    df.show()
    return df

def get_concatenated_column(spark_table):
    """
    Concatenates all columns of the given Spark DataFrame into a single column.

    Args:
        spark_table: Spark DataFrame.

    Returns:
        Spark DataFrame with a single column containing the concatenated values.
    """
    column_names = spark_table.columns
    # concat_ws joins the values with "_"; non-string columns such as Age are cast to string
    concatenated_column = spark_table.select(
        concat_ws("_", *[col(c) for c in column_names]).alias("Concatenated_Column")
    )
    concatenated_column.show(truncate=False)
    return concatenated_column

def test_get_concatenated_column(spark, schema, data):
    """Checks get_concatenated_column against expected values built in plain Python."""
    table = create_table(spark, schema, data)
    actual_df = get_concatenated_column(table)

    # Spot-check the first row: concat_ws keeps the space in "New York"
    actual_first = actual_df.collect()[0][0]
    expected_first = "John_28_New York_USA_Sales"
    assert actual_first == expected_first, f"Expected: {expected_first}, Actual: {actual_first}"

    # Compare the whole DataFrame with chispa. createDataFrame needs a StructType
    # (not a bare list of StructFields), and the concat_ws column can be
    # non-nullable, so nullability is ignored in the comparison.
    expected_schema = StructType([StructField("Concatenated_Column", StringType(), True)])
    expected_rows = [("_".join(str(v) for v in row),) for row in data]
    expected_df = spark.createDataFrame(expected_rows, expected_schema)
    assert_df_equality(actual_df, expected_df, ignore_nullable=True)
    print("all's good")

# Show the input table and its concatenated column
table = create_table(spark, schema_sample, table_values)
get_concatenated_column(table)

# Run the test before stopping the session it depends on
test_get_concatenated_column(spark, schema_sample, table_values)
spark.stop()
```

If this cell fails with `ModuleNotFoundError: No module named 'chispa'`, install the library first with `pip install chispa`.
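Expected values are easy to get wrong when written by hand. Spark's `concat_ws` joins the values exactly as they are, so the space in "New York" is kept, and it skips null values. The helper `expected_concat` below is a hypothetical pure-Python sketch that mirrors this behaviour for string and integer inputs only. Spark's string conversion of floats, dates, or timestamps can differ from Python's `str()`, so the helper should not be relied on for those types.

```python
def expected_concat(sep, *values):
    """Pure-Python stand-in for concat_ws on str/int values (hypothetical helper).

    None (Spark's null) is skipped, as concat_ws does; everything else is
    converted with str(), which matches Spark only for str and int values.
    """
    return sep.join(str(v) for v in values if v is not None)


row = ("John", 28, "New York", "USA", "Sales")
print(expected_concat("_", *row))                    # John_28_New York_USA_Sales
print(expected_concat("_", "Bob", None, "Chicago"))  # Bob_Chicago
```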

## Unit Tests for PySpark Applications Using unittest and pytest Libraries

TL;DR: Software testing, and unit testing in particular, is a crucial step in modern data engineering. pytest and unittest are both good tools for writing unit tests for PySpark applications. In this article, I provide code examples using both libraries and discuss the advantages and disadvantages of each. Which one to choose depends on your needs and previous experience.


**1\. Introduction**

**2\. What is Testing in Software Development?**

**2.1 What are the Test Types?**

**2.2 Understanding Unit Testing and Its Importance**

**2.3 What are the Purposes of the unittest and pytest Libraries?**

**2.4 How Can I Execute These Tests?**

**2.5 Summary and Comparison**

**3\. Conclusion**
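As a small preview of the two styles, here is the same check written for each library. It targets a hypothetical pure-Python helper, `normalise_city`, so it runs without Spark. In a real PySpark suite, the SparkSession would typically be created once, either in `setUpClass` (unittest) or in a session-scoped fixture (pytest).

```python
import unittest


def normalise_city(city: str) -> str:
    """Hypothetical function under test: trim and title-case a city name."""
    return city.strip().title()


# unittest style: tests are methods on a TestCase subclass
class TestNormaliseCity(unittest.TestCase):
    def test_strips_and_title_cases(self):
        self.assertEqual(normalise_city("  new york "), "New York")


# pytest style: a plain function with a bare assert;
# pytest collects functions whose names start with "test_"
def test_strips_and_title_cases():
    assert normalise_city("  new york ") == "New York"
```

The first style runs with `python -m unittest`, the second with `pytest`. pytest can also collect and run `unittest.TestCase` classes, which makes it easier to move an existing suite from one library to the other.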





