In [33]:
import time
import re
from pyspark.sql import SparkSession, functions as sf

file_path = 'data/sample-2mb-text-file.txt'
spark = SparkSession.builder.appName("WordCountExample").getOrCreate()


# Word count

## Standard Python

In [34]:
start = time.time()

with open(file_path) as file:
    text = file.read()

words = re.split("\s+", text)
result = []
for word in set(words):
    result.append((word, words.count(word)))

end = time.time()

print("Word count :", result)
print("Elasped time :", end - start)

Word count : [('elit', 1538), ('parturient.', 36), ('Fames', 107), ('neque', 2070), ('Sodales', 100), ('ac', 2573), ('nibh', 2025), ('nisi', 1183), ('at', 3206), ('nam', 562), ('Lobortis', 146), ('Odio', 331), ('risus', 2238), ('At', 624), ('dis.', 39), ('diam.', 463), ('parturient', 206), ('Nulla', 418), ('lacinia', 418), ('dictum.', 193), ('Congue', 148), ('Dictum', 176), ('Scelerisque', 326), ('interdum', 1024), ('Porta', 114), ('Semper', 195), ('Cras', 245), ('morbi', 2293), ('Id', 695), ('aenean.', 189), ('Ridiculus', 36), ('Hac', 77), ('dictumst.', 48), ('semper', 1193), ('magna', 1225), ('lorem.', 221), ('molestie.', 134), ('magna.', 215), ('Posuere', 192), ('etiam', 942), ('convallis', 1065), ('enim', 2978), ('nascetur', 208), ('eiusmod', 5), ('Natoque', 44), ('tempor.', 182), ('Bibendum', 223), ('Fringilla', 144), ('Lorem', 281), ('egestas', 2783), ('mi', 1644), ('in.', 868), ('senectus', 629), ('Sapien', 146), ('dis', 213), ('massa', 2377), ('sem', 998), ('vel.', 367), ('libe

## Spark with RDD

In [36]:
start = time.time()

text = spark.sparkContext.textFile(file_path)
word_counts_rdd = text.flatMap(lambda line: line.split("\s+")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
result = word_counts_rdd.collect()

end = time.time()
print("Word count :", result)
print("Elasped time :", end - start)


[Stage 3:>                                                          (0 + 2) / 2]

Word count : [('Lorem', 281), ('ipsum', 1430), ('consectetur', 1312), ('sed', 5650), ('do', 5), ('labore', 5), ('magna', 1225), ('Velit', 263), ('scelerisque', 1762), ('in', 4748), ('dictum', 1002), ('erat.', 118), ('Sit', 867), ('amet', 4594), ('justo', 768), ('diam', 2638), ('vulputate.', 270), ('lectus', 1890), ('nibh', 2025), ('condimentum', 826), ('venenatis', 1024), ('sociis', 200), ('natoque', 210), ('Habitant', 124), ('et.', 643), ('Interdum', 179), ('faucibus', 2180), ('eget', 3700), ('nullam.', 141), ('Aliquam', 436), ('purus', 2007), ('augue', 1226), ('interdum', 1024), ('sodales', 655), ('etiam', 942), ('sit.', 765), ('Quam', 301), ('sagittis', 1460), ('odio', 1815), ('mauris.', 411), ('nisi', 1183), ('lacus', 1483), ('Iaculis', 142), ('at', 3206), ('erat', 773), ('pellentesque', 2694), ('commodo', 1336), ('dui.', 260), ('nulla', 2412), ('porttitor', 962), ('quis', 2588), ('lobortis', 840), ('fermentum', 1205), ('Turpis', 373), ('integer', 1140), ('', 2847), ('vitae', 3361)

                                                                                

## Using Spark with DataFrames

In [35]:
start = time.time()

text = spark.read.text(file_path)
wordCounts = text.select(sf.explode(sf.split(text.value, "\s+")).alias("word")).groupBy("word").count()
result = wordCounts.collect()

end = time.time()
print("Word count :", result)
print("Elasped time :", end - start)




Word count : [Row(word='Sit', count=867), Row(word='Elit', count=308), Row(word='vehicula.', count=39), Row(word='eros.', count=78), Row(word='nam.', count=104), Row(word='dui.', count=260), Row(word='porttitor', count=962), Row(word='eleifend.', count=108), Row(word='consectetur.', count=256), Row(word='senectus.', count=106), Row(word='odio', count=1815), Row(word='Turpis', count=373), Row(word='Sed', count=986), Row(word='volutpat', count=1923), Row(word='integer.', count=203), Row(word='interdum', count=1024), Row(word='pretium', count=1347), Row(word='Eu', count=531), Row(word='semper.', count=239), Row(word='nisl.', count=295), Row(word='non.', count=433), Row(word='sem.', count=181), Row(word='ipsum.', count=233), Row(word='habitant.', count=104), Row(word='vel.', count=367), Row(word='volutpat.', count=342), Row(word='Sem', count=199), Row(word='Quisque', count=152), Row(word='Cursus', count=314), Row(word='tempor.', count=182), Row(word='sagittis', count=1460), Row(word='felis

On constate que le calcul est plus efficace en utilisant des DataFrames plutôt que les RDD.
Cela est dû au fait que les DataFrames exploitent le Catalyst Optimizer

In [38]:
spark.stop()

## TODO : hadoop
```bash
cd notebook/hadoop  
javac Words.java
javac WordsJob.java
hrestart
h -put ../data/sample-2mb-text-file.txt /
java WordsJob /sample-2mb-text-file.txt resultat
```