In [0]:
URL = 'dbfs:/FileStore/shared_uploads/antony.prince/tdd.txt'

In [0]:
input_df = spark.read.text(URL)


In [0]:
display(input_df)

value
"Test-driven development (TDD) is a software development process relying on software requirements being converted to test cases before software is fully developed, and tracking all software development by repeatedly testing the software against all test cases. This is as opposed to software being developed first and test cases created later."
"Software engineer Kent Beck, who is credited with having developed or ""rediscovered""[1] the technique, stated in 2003 that TDD encourages simple designs and inspires confidence.[2]"
"Test-driven development is related to the test-first programming concepts of extreme programming, begun in 1999,[3] but more recently has created more general interest in its own right.[4]"
Programmers also apply the concept to improving and debugging legacy code developed with older techniques.[5]
1. Add a test
"The adding of a new feature begins by writing a test that passes iff the feature's specifications are met. The developer can discover these specifications by asking about use cases and user stories. A key benefit of test-driven development is that it makes the developer focus on requirements before writing code. This is in contrast with the usual practice, where unit tests are only written after code."
2. Run all tests. The new test should fail for expected reasons
This shows that new code is actually needed for the desired feature. It validates that the test harness is working correctly. It rules out the possibility that the new test is flawed and will always pass.
3. Write the simplest code that passes the new test
"Inelegant or hard code is acceptable, as long as it passes the test. The code will be honed anyway in Step 5. No code should be added beyond the tested functionality."


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row

In [0]:
def split_words_from_data(input_df):
    return input_df.select(F.explode(F.split(input_df.value, "\\W+")).alias("word"))

words = split_words_from_data(input_df)


In [0]:
def test_split_words_from_data():
    #Arrange
    schema = StructType([
        StructField("value", StringType() ,False)
    ])
    expected_schema = StructType([
        StructField("word", StringType() ,False)
    ])
    data = [
        Row("Jack and Jill"),
        Row("went up the hill"),
    ]
    expected_data = [
        Row("Jack"), Row("and"), Row("Jill"), 
        Row("went"), Row("up"), Row("the"), Row("hill")
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    
    #Act
    actual_df = split_words_from_data(input_df)
    
    #Assert
    assert actual_df.schema == expected_df.schema
    assert actual_df.collect() == expected_df.collect()
    print("Test Passed !")

test_split_words_from_data()

Test Passed !


In [0]:
display(words)

word
Test
driven
development
TDD
is
a
software
development
process
relying


In [0]:
words.count()

Out[16]: 4985

In [0]:
def remove_empty_strings(words):
    return words.filter(F.trim(words.word) != "")

words_without_empty_string = remove_empty_strings(words)

In [0]:
def test_remove_empty_strings():
    #Arrange
    schema = StructType([
        StructField("word", StringType() ,False)
    ])
    data = [
        Row("Jack"),
        Row(""),
        Row("      "),
        Row("Jill")
    ]
    expected_data = [
        Row("Jack"), 
        Row("Jill"), 
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, schema)
    
    #Act
    actual_df = remove_empty_strings(input_df)
    
    #Assert
    assert actual_df.schema == expected_df.schema
    assert actual_df.collect() == expected_df.collect()
    print("Test Passed !")
    
test_remove_empty_strings()

Test Passed !


In [0]:
def convert_to_lower_case(words):
    return words.select(F.lower(words.word).alias("word"))

lower_case_words = convert_to_lower_case(words)

In [0]:
def test_convert_to_lower_case():
    #Arrange
    schema = StructType([
        StructField("word", StringType() ,False)
    ])
    data = [
        Row("Jack"),
        Row("aND"),
        Row("Jill")
    ]
    expected_data = [
        Row("jack"), 
        Row("and"),
        Row("jill") 
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, schema)
    
    #Act
    actual_df = convert_to_lower_case(input_df)

    #Assert
    assert actual_df.schema == expected_df.schema
    assert actual_df.collect() == expected_df.collect()
    print("Test Passed !")
    
test_convert_to_lower_case()

Test Passed !


In [0]:
display(lower_case_words)

word
test
driven
development
tdd
is
a
software
development
process
relying


In [0]:
from pyspark.sql.types import LongType


In [0]:
def word_count(lower_case_words):
    return lower_case_words.groupBy("word").count()
    
word_counts = word_count(lower_case_words)


In [0]:

def test_word_count():
    #Arrange
    schema = StructType([
        StructField("word", StringType() ,False)
    ])
    expected_schema = StructType([
        StructField("word", StringType() ,False),
        StructField("count", LongType() ,False)
    ])
    data = [
        Row("jack"),
        Row("and"),
        Row("jac"),
        Row("jack")
    ]
    expected_data = [
        Row("jack",2), 
        Row("and",1),
        Row("jac",1) 
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    
    #Act
    actual_df = word_count(input_df)

    #Assert
    assert actual_df.schema == expected_df.schema
    assert actual_df.collect() == expected_df.collect()
    print("Test Passed !")
    
test_word_count()

Test Passed !


In [0]:
word_counts_sorted = word_counts.sort(F.col("count").desc())


In [0]:
display(word_counts_sorted)

word,count
the,279
,148
test,134
a,131
and,129
to,122
of,118
tests,90
is,87
that,72


Output can only be rendered in Databricks

In [0]:
word_counts_sorted.show(word_counts_sorted.count())


