In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()
spark = SparkSession. \
    builder. \
    appName("TDD - XXX"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [2]:
input_df = spark.read \
.format("text") \
.load("/user/itv005880/project/tdd.txt")

In [3]:
input_df.show()

+--------------------+
|               value|
+--------------------+
|Test-driven devel...|
|                    |
|Software engineer...|
|                    |
|Test-driven devel...|
|                    |
|Programmers also ...|
|       1. Add a test|
|The adding of a n...|
|2. Run all tests....|
|This shows that n...|
|3. Write the simp...|
|Inelegant or hard...|
|4. All tests shou...|
|If any fail, the ...|
|5. Refactor as ne...|
|Code is refactore...|
|Examples of refac...|
|moving code to wh...|
|removing duplicat...|
+--------------------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row

In [5]:
# "\W+". This splits the source at word boundaries removing spaces and punctuation.
# explode: Returns a new row for each element in the given array or map
def split_words_from_data(input_df):
    return input_df.select(explode(split(input_df.value, "\W+")).alias("word"))

words = split_words_from_data(input_df)

In [6]:
print(words)

+------------+
|        word|
+------------+
|        Test|
|      driven|
| development|
|         TDD|
|          is|
|           a|
|    software|
| development|
|     process|
|     relying|
|          on|
|    software|
|requirements|
|       being|
|   converted|
|          to|
|        test|
|       cases|
|      before|
|    software|
+------------+
only showing top 20 rows



In [7]:
def test_split_words_from_data():
    #Arrange
    schema = StructType([
        StructField("value", StringType())
    ])
    expected_schema = StructType([
        StructField("word", StringType())
    ])
    data = [
        Row("TDD is a software"),
        Row("development process"),
    ]
    expected_data = [
        Row("TDD"), Row("is"), Row("a"), Row("software"), 
        Row("development"), Row("process")
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    
    #Act
    actual_df = split_words_from_data(input_df)
    
    #Assert
    try:
        assert actual_df.schema == expected_df.schema, "Schema not matched"
        assert actual_df.collect() == expected_df.collect(), "Data not matched"
        print("All Test Cases Passed !")
    except AssertionError as msg:
        print(msg)    

test_split_words_from_data()

All Test Cases Passed !


In [8]:
def convert_to_lower_case(words):
    return words.select(lower(words.word).alias("word"))

lower_case_words = convert_to_lower_case(words)

In [9]:
def test_convert_to_lower_case():
    #Arrange
    schema = StructType([
        StructField("word", StringType())
    ])
    data = [
        Row("Jack"),
        Row("aND"),
        Row("Jill")
    ]
    expected_data = [
        Row("jack"), 
        Row("and"),
        Row("jill")
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, schema)
    
    #Act
    actual_df = convert_to_lower_case(input_df)
    
    #Assert
    try:
        assert actual_df.schema == expected_df.schema, "Schema not matched"
        assert actual_df.collect() == expected_df.collect(), "Data not matched"
        print("All Test Cases Passed !")
    except AssertionError as msg:
        print(msg)    

test_convert_to_lower_case()

All Test Cases Passed !


In [10]:
print(lower_case_words)

+------------+
|        word|
+------------+
|        test|
|      driven|
| development|
|         tdd|
|          is|
|           a|
|    software|
| development|
|     process|
|     relying|
|          on|
|    software|
|requirements|
|       being|
|   converted|
|          to|
|        test|
|       cases|
|      before|
|    software|
+------------+
only showing top 20 rows



In [11]:
from pyspark.sql.types import LongType

In [12]:
def word_count(lower_case_words):
    return lower_case_words.groupBy("word").count()
    
word_counts = word_count(lower_case_words)

In [17]:
def test_word_count():
    #Arrange
    schema = StructType([
        StructField("word", StringType())
    ])
    expected_schema = StructType([
        StructField("word", StringType()),
        StructField("count", LongType())
    ])
    data = [
        Row("jack"),
        Row("and"),
        Row("jac"),
        Row("jack")
    ]
    expected_data = [
        Row("jack",2), 
        Row("and",1),
        Row("jac",1) 
    ]
    input_df = spark.createDataFrame(data, schema)
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    
    #Act
    actual_df = word_count(input_df)
    print(actual_df.schema)

    #Assert
    try:
        assert actual_df.schema == expected_df.schema, "Schema not matched"
        assert actual_df.collect() == expected_df.collect(), "Data not matched"
        print("All Test Cases Passed !")
    except AssertionError as msg:
        print(msg)
    
test_word_count()

StructType(List(StructField(word,StringType,true),StructField(count,LongType,false)))
Schema not matched


In [14]:
word_counts_sorted = word_counts.sort(col("count").desc())

In [15]:
print(word_counts_sorted)

+-----+-----+
| word|count|
+-----+-----+
|  the|  279|
|     |  148|
| test|  134|
|    a|  131|
|  and|  129|
|   to|  122|
|   of|  118|
|tests|   90|
|   is|   87|
| that|   72|
| code|   68|
|   in|   68|
|   be|   60|
|  for|   54|
|  tdd|   43|
|  are|   39|
|   or|   38|
| this|   36|
|   it|   36|
| unit|   34|
+-----+-----+
only showing top 20 rows



In [20]:
spark.stop()