In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


In [3]:
def countWords(files, master="spark://spark-master:7077"):
    """Count word occurrences across one or more text files using Spark.

    Each non-empty input line is split into tokens with the separator
    pattern below, empty tokens are dropped, and the remaining words are
    counted. Counting is case-sensitive ("The" and "the" are distinct).

    Args:
        files: A path or list of paths, passed straight to
            ``spark.read.text``.
        master: Spark master URL given to ``SparkSession.builder.master``.
            Defaults to the cluster URL this notebook was written against.
            NOTE: if a SparkSession is already active, ``getOrCreate()``
            reuses it, and the master setting may not take effect.

    Returns:
        A list of ``Row(word=..., count=...)`` collected to the driver,
        sorted by ``count`` in descending order.
    """
    # SparkSession is the entry point to the DataFrame API.
    spark = SparkSession.builder.master(master).getOrCreate()

    # spark.read.text yields one row per line, in a string column 'value'.
    lines = spark.read.text(files)
    # Skip blank lines up front so they do not produce empty tokens.
    lines = lines.filter(lines.value != '')

    # Separator: a run of whitespace, or ',', '.', ':' followed by
    # whitespace, or '?"', with any surrounding whitespace absorbed.
    # NOTE: punctuation at the very end of a line (not followed by
    # whitespace) stays attached to the preceding word.
    # Raw string: \s, \. and \? are regex escapes. In a plain literal they
    # are invalid Python escapes (DeprecationWarning / SyntaxWarning on 3.12+).
    separator = r'\s*(?:\s+|[,\.:]\s|\?")\s*'
    # limit=-1 applies the pattern as many times as possible.
    tokens = lines.select(split(lines.value, separator, -1).alias('words'))

    # Turn each array of words into one row per word.
    words = tokens.select(explode(tokens.words).alias('word'))
    # Leading/trailing separators produce empty strings; drop them.
    words = words.filter(words.word != '')

    # Count per word, most frequent first.
    counts = words.groupBy(words.word).count()
    return counts.sort('count', ascending=False).collect()
    
# Count words across the five sample inputs. As the cell's last expression,
# the returned list of Rows is what the notebook displays below.
countWords([
    '/data/dag-analysis-inputs/1.txt',
    '/data/dag-analysis-inputs/2.txt',
    '/data/dag-analysis-inputs/3.txt',
    '/data/dag-analysis-inputs/4.txt',
    '/data/dag-analysis-inputs/5.txt',
])

[Row(word='the', count=142212),
 Row(word='of', count=131206),
 Row(word='a', count=96882),
 Row(word='and', count=76768),
 Row(word='to', count=74134),
 Row(word='Spark', count=64270),
 Row(word='in', count=57432),
 Row(word='is', count=53362),
 Row(word='on', count=52984),
 Row(word='that', count=52394),
 Row(word='as', count=43528),
 Row(word='data', count=41622),
 Row(word='can', count=38842),
 Row(word='RDD', count=37936),
 Row(word='be', count=37170),
 Row(word='an', count=34588),
 Row(word='RDDs', count=27728),
 Row(word='for', count=26962),
 Row(word='programming', count=24848),
 Row(word='operations', count=24848),
 Row(word='or', count=24696),
 Row(word='such', count=21718),
 Row(word='are', count=20822),
 Row(word='by', count=20640),
 Row(word='API', count=20238),
 Row(word='Scala', count=18824),
 Row(word='interface', count=18364),
 Row(word='program', count=17988),
 Row(word='Java', count=17974),
 Row(word='distributed', count=17372),
 Row(word='new', count=15146),
 Row(wo