For help, look here:
https://spark.apache.org/docs/latest/rdd-programming-guide.html

In [0]:
# Check out the pre-loaded datasets: list the root folder first, then the
# airlines folder inside it.
for dataset_dir in ('dbfs:/databricks-datasets',
                    'dbfs:/databricks-datasets/airlines'):
  display(dbutils.fs.ls(dataset_dir))

## 1. Word Count

In [0]:
# Create the RDD from the Spark README text file
# (sc is the SparkContext made available by the notebook).
readme_path = "dbfs:/databricks-datasets/SPARK_README.md"
rdd = sc.textFile(readme_path)

In [0]:
# Read 20 lines: take(20) returns the first 20 elements of the RDD as a list
rdd.take(20)

In [0]:
# Example: lambda functions
# flatMap applies the lambda to every line and flattens the per-line lists
# into one RDD of words. split() with no argument splits on any run of
# whitespace, so blank lines and repeated spaces do not produce empty-string
# "words" (split(" ") would).
words = rdd.flatMap(lambda line: line.split())

words.collect()

In [0]:
# Nicer print: one word per output line instead of a single long list
all_words = words.collect()
for word in all_words:
  print(word)

In [0]:
# Print 20 words: take(20) returns the first 20 elements of the RDD as a list
words.take(20)

In [0]:
# Add 1 to each word (= initialize a counter): every word becomes (word, 1)
# split() with no argument splits on any whitespace run, so no empty-string
# tokens end up in the result.
words = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: (word, 1))
)

for pair in words.collect():
  print(pair)

In [0]:
# Take the previous pipeline and
# 1. count the occurrences of each word
# split() (no argument) avoids counting the empty string as a word, which
# split(" ") does for blank lines and repeated spaces.
words = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: (word, 1))
       # reduceByKey sums the 1s of identical words: (word, total)
       .reduceByKey(lambda left, right: left + right)
)

for pair in words.collect():
  print(pair)

In [0]:
# 2. change all capital letters to lower case
# Lower-casing before counting makes "Spark" and "spark" the same key.
# split() (no argument) avoids counting the empty string as a word.
words = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: word.lower())
       .map(lambda word: (word, 1))
       .reduceByKey(lambda left, right: left + right)
)

for pair in words.collect():
  print(pair)

In [0]:
# 3. eliminate stopwords
stop_words = ['for', 'the', 'between', 'and', 'a', 'an', 'is', 'are', 'only', 'of', 'in', 'or']

# The stop-word filter runs after lower-casing so that "The" is dropped too.
# split() (no argument) avoids empty-string tokens, which are not in
# stop_words and would otherwise be counted as a word.
words = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: word.lower())
       .filter(lambda word: word not in stop_words)
       .map(lambda word: (word, 1))
       .reduceByKey(lambda left, right: left + right)
)

for pair in words.collect():
  print(pair)

In [0]:
# 4. sort in alphabetical order
# The key of each (word, count) pair is the word, so sortByKey() orders the
# result by word. split() (no argument) avoids an empty-string "word", which
# would otherwise sort to the very top.
words = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: word.lower())
       .filter(lambda word: word not in stop_words)
       .map(lambda word: (word, 1))
       .reduceByKey(lambda left, right: left + right)
       .sortByKey()
)

for pair in words.collect():
  print(pair)

In [0]:
# 5. sort from most to least frequent word
# split() (no argument) avoids an empty-string "word"; with split(" ") every
# blank line and repeated space is counted under '' and can crowd out the
# real words at the top of the ranking.
words = (
    rdd.flatMap(lambda line: line.split())
       .map(lambda word: word.lower())
       .filter(lambda word: word not in stop_words)
       .map(lambda word: (word, 1))
       .reduceByKey(lambda left, right: left + right)
       # Swap to (count, word) so that the count becomes the sort key
       .map(lambda pair: (pair[1], pair[0]))
       .sortByKey(ascending=False)
)

for pair in words.collect():
  print(pair)

In [0]:
# 6.** remove punctuation
# Every character in PUNCTUATION is replaced by a space before splitting, so
# that e.g. "spark," and "spark" are counted as the same word.
PUNCTUATION = ",.!?:;[]()-/=+#"
# Build the translation table once instead of once per line. Sizing the
# replacement string with len() keeps both maketrans arguments the same
# length if PUNCTUATION is ever edited (maketrans raises otherwise).
punct_table = str.maketrans(PUNCTUATION, " " * len(PUNCTUATION))

same_words = (
    rdd.map(lambda line: line.translate(punct_table))
       .flatMap(lambda line: line.split())
       .map(lambda word: word.lower())
       .filter(lambda word: word not in stop_words)
       .map(lambda word: (word, 1))
       .reduceByKey(lambda left, right: left + right)
       .sortByKey()
)

for pair in same_words.collect():
    print(pair)

## 2. What does it do?

In [0]:
# Create an RDD of tuples (name, age)
people = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
dataRDD = sc.parallelize(people)

# Try to understand what this code does (line by line)
agesRDD = dataRDD \
  .map(lambda person: (person[0], (person[1], 1))) \
  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
  .map(lambda entry: (entry[0], entry[1][0] / entry[1][1]))

In [0]:
people = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
dataRDD = sc.parallelize(people)

# Keep the name as the key and pair the age with a counter of 1:
# (name, age) -> (name, (age, 1))
agesRDD = dataRDD.map(lambda person: (person[0], (person[1], 1)))

agesRDD.collect()

In [0]:
# Merge the entries that share a name: add up the ages and add up the
# counters, giving (name, (sum_of_ages, number_of_entries))
agesRDD = dataRDD \
  .map(lambda person: (person[0], (person[1], 1))) \
  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

agesRDD.collect()

In [0]:
# Keep the name and divide the summed age by the counter, which gives the
# average age per name: (name, sum_of_ages / number_of_entries)
agesRDD = dataRDD \
  .map(lambda person: (person[0], (person[1], 1))) \
  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
  .map(lambda entry: (entry[0], entry[1][0] / entry[1][1]))

agesRDD.collect()