In [11]:
# map vs. flatMap

# map transformation applies a function to each row in a DataFrame/Dataset and returns the new transformed Dataset.
# 1 => 1
# flatMap transformation flattens the DataFrame/Dataset after applying the function on every element and returns a new transformed Dataset. The returned Dataset will return more rows than the current DataFrame. It is also referred to as a one-to-many transformation function
# 1 => Many
# One of the use cases of flatMap() is to flatten column which contains arrays, list, or any nested collection

# map과 flatMap의 차이는 1대1 매핑과 1대다 매핑의 차이라는 것
# flatMap의 경우에 데이터 리스트가 들어있는경우 분산적으로 매핑 작업이 이루어져 더 많은 결과를 뽑아낼 수 있음

import pyspark

sc = pyspark.SparkContext.getOrCreate();
rdd = sc.parallelize([("name", "joe,sarah,tom"), ("car", "hyundai")])
result = rdd.map(lambda x: x[1].split(","))
print(result.collect())
# [['joe', 'sarah', 'tom'], ['hyundai']]

rdd = sc.parallelize([("name", "joe,sarah,tom"), ("car", "hyundai")])
result = rdd.flatMap(lambda x: x[1].split(","))
print(result.collect())
# ['joe', 'sarah', 'tom', 'hyundai']

# 해당 예제에서는 Map의 경우에는 각각의 리스트 내부에서의 작업만 이루어 졌지만 flatMap은 작업이 이루어짐과 동시에 하나의 리스트로 만들어 버리는것을 볼 수 있음
# 데이터의 간극을 평평하게 만들어줌


test_file = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
lines = sc.textFile(test_file)
words = lines.flatMap(lambda x: x.split())
# word_count = words.countByValue()
# print(word_count)
# for word, count in word_count.items():
    # print(f"{word}: {count}")
    
    
# How about sort by key?
# 키와 밸류 = 1로 매핑해서 키에 해당하는 밸류값들의 합으로 생성
word_count = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

# 밸류-키 를 변경한뒤 밸류값으로 소팅을 진행
sorted_word_count = word_count.map(lambda x: (x[1], x[0])).sortByKey()
for word, count in sorted_word_count.collect():
    print(f"{word}: {count}")

[['joe', 'sarah', 'tom'], ['hyundai']]
['joe', 'sarah', 'tom', 'hyundai']
defaultdict(<class 'int'>, {'Lorem': 12, 'ipsum': 57, 'dolor': 45, 'sit': 147, 'amet,': 1, 'consectetur': 44, 'adipiscing': 57, 'elit,': 1, 'sed': 194, 'do': 1, 'eiusmod': 1, 'tempor': 32, 'incididunt': 1, 'ut': 140, 'labore': 1, 'et': 117, 'dolore': 1, 'magna': 44, 'aliqua.': 1, 'Et': 18, 'odio': 54, 'pellentesque': 93, 'diam': 95, 'volutpat': 58, 'commodo': 45, 'egestas': 93, 'egestas.': 15, 'Ligula': 1, 'ullamcorper': 47, 'malesuada': 43, 'proin': 33, 'libero': 38, 'nunc': 113, 'consequat': 44, 'interdum': 32, 'varius': 40, 'sit.': 21, 'Varius': 6, 'quam': 65, 'quisque': 27, 'id': 127, 'vel': 79, 'elementum': 85, 'pulvinar': 53, 'etiam.': 7, 'At': 14, 'auctor': 31, 'urna': 59, 'cursus.': 12, 'Convallis': 7, 'tellus': 97, 'velit': 53, 'laoreet': 30, 'id.': 18, 'Nam': 5, 'justo': 20, 'amet': 149, 'Metus': 4, 'aliquam': 84, 'eleifend': 23, 'mi': 65, 'in': 164, 'nulla': 91, 'posuere': 33, 'sollicitudin': 31, 'aliq