In [2]:
# `sc` is the SparkContext; it is not created anywhere in this notebook, so it is
# presumably pre-defined by the PySpark kernel/shell — confirm for your environment.
sc

### Read data

In [3]:
# Input log file analysed in this notebook; change this single constant to point at another file.
LOG_PATH = '/user/testuser/spark.log'

# textFile yields an RDD with one element per line of the file.
logs_rdd = sc.textFile(LOG_PATH)

logs_rdd.take(5)

['17/06/09 20:10:40 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]',
 '17/06/09 20:10:40 INFO spark.SecurityManager: Changing view acls to: yarn,curi',
 '17/06/09 20:10:40 INFO spark.SecurityManager: Changing modify acls to: yarn,curi',
 '17/06/09 20:10:40 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, curi); users with modify permissions: Set(yarn, curi)',
 '17/06/09 20:10:41 INFO spark.SecurityManager: Changing view acls to: yarn,curi']

### Count the number of lines

In [4]:
# Number of lines in the log (one RDD element per line).
logs_rdd.count()

2000

### Map

In [27]:
# Keep only the first whitespace-separated token of each line: the date, e.g. '17/06/09'.
logs_rdd.map(lambda line: line.split()[0]).take(3)

['17/06/09', '17/06/09', '17/06/09']

In [30]:
# map: every line becomes ONE element — the list of its tokens.
logs_rdd.map(lambda line: line.split()).take(2)

[['17/06/09',
  '20:10:40',
  'INFO',
  'executor.CoarseGrainedExecutorBackend:',
  'Registered',
  'signal',
  'handlers',
  'for',
  '[TERM,',
  'HUP,',
  'INT]'],
 ['17/06/09',
  '20:10:40',
  'INFO',
  'spark.SecurityManager:',
  'Changing',
  'view',
  'acls',
  'to:',
  'yarn,curi']]

In [28]:
# flatMap: the token lists are flattened, so every token becomes its own element.
logs_rdd.flatMap(lambda line: line.split()).take(10)

['17/06/09',
 '20:10:40',
 'INFO',
 'executor.CoarseGrainedExecutorBackend:',
 'Registered',
 'signal',
 'handlers',
 'for',
 '[TERM,',
 'HUP,']

### Create (key, value) pairs using tuples

In [31]:
def parse_source_info(line):
    """Split a log line into a (source, message) pair.

    Lines look like '<date> <time> <level> <source>: <message>', e.g.
    '17/06/09 20:10:40 INFO spark.SecurityManager: Changing view acls to: yarn,curi'.
    The 4th whitespace-separated field is the source; everything after it is the message.
    Raises IndexError for lines with fewer than five fields.
    """
    # Tokenise once with maxsplit=4 so the key and the message come from the same split
    # (separate split() / split(' ', 4) calls disagree when fields are separated by
    # more than one space, and tokenising twice doubles the work per line).
    fields = line.split(None, 4)
    return fields[3], fields[4]


source_info_rdd = logs_rdd.map(parse_source_info)
source_info_rdd.take(3)

[('executor.CoarseGrainedExecutorBackend:',
  'Registered signal handlers for [TERM, HUP, INT]'),
 ('spark.SecurityManager:', 'Changing view acls to: yarn,curi'),
 ('spark.SecurityManager:', 'Changing modify acls to: yarn,curi')]

### Get only the keys or only the values of (key, value) pairs

In [32]:
# keys() keeps only the first element (the source) of each pair.
source_info_rdd.keys().take(3)

['executor.CoarseGrainedExecutorBackend:',
 'spark.SecurityManager:',
 'spark.SecurityManager:']

In [33]:
# values() keeps only the second element (the message) of each pair.
source_info_rdd.values().take(3)

['Registered signal handlers for [TERM, HUP, INT]',
 'Changing view acls to: yarn,curi',
 'Changing modify acls to: yarn,curi']

### mapValues --> apply a function to the value of each (key, value) pair only, returning (key, mapped_value)

In [35]:
# Replace each message by its word count; the key (source) is left unchanged.
source_len_rdd = source_info_rdd.mapValues(
    lambda message: len(message.split())
)
source_len_rdd.take(3)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5)]

### flatMapValues --> like mapValues, but the function returns several values, each paired with the original key: (key, mapped_value_part1), (key, mapped_value_part2), ...

In [38]:
# Emit one (source, word) pair for every word of each message.
source_word_rdd = source_info_rdd.flatMapValues(
    lambda message: message.split()
)
source_word_rdd.take(10)

[('executor.CoarseGrainedExecutorBackend:', 'Registered'),
 ('executor.CoarseGrainedExecutorBackend:', 'signal'),
 ('executor.CoarseGrainedExecutorBackend:', 'handlers'),
 ('executor.CoarseGrainedExecutorBackend:', 'for'),
 ('executor.CoarseGrainedExecutorBackend:', '[TERM,'),
 ('executor.CoarseGrainedExecutorBackend:', 'HUP,'),
 ('executor.CoarseGrainedExecutorBackend:', 'INT]'),
 ('spark.SecurityManager:', 'Changing'),
 ('spark.SecurityManager:', 'view'),
 ('spark.SecurityManager:', 'acls')]

### Filter & Distinct

In [46]:
# Keep the time field (2nd token) of each line and restrict it to a window.
# Times are zero-padded 'HH:MM:SS' strings, so plain string comparison orders
# them chronologically; the date field is ignored.
time_range = (
    logs_rdd
    .map(lambda line: line.split()[1])
    .filter(lambda hms: '20:10:41' <= hms <= '20:10:50')
)
time_range.take(20)

['20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:41',
 '20:10:42',
 '20:10:42',
 '20:10:42',
 '20:10:42',
 '20:10:42',
 '20:10:42',
 '20:10:42',
 '20:10:45',
 '20:10:45',
 '20:10:45',
 '20:10:45']

In [47]:
# Unique timestamps inside the window.
time_range.distinct().take(20)

['20:10:41', '20:10:42', '20:10:45', '20:10:46', '20:10:47', '20:10:48']

### Reduce (aggregate all elements of an RDD; used here on a plain RDD of values, not (key, value) pairs)

In [48]:
# Peek at the per-line word counts without their keys.
source_len_rdd.values().take(5)

[7, 5, 5, 18, 5]

In [49]:
# Total number of message words across the whole log.
source_len_rdd.values().reduce(lambda total, n: total + n)

17511

### reduceByKey --> aggregate the values that share the same key, returning (key, aggregated_value)

In [50]:
# Peek at the (source, word_count) pairs before aggregating them by key.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [51]:
# Total message words per source.
source_total_word_rdd = source_len_rdd.reduceByKey(
    lambda total, n: total + n
)
source_total_word_rdd.take(10)

[('spark.SecurityManager:', 56),
 ('slf4j.Slf4jLogger:', 2),
 ('storage.DiskBlockManager:', 5),
 ('executor.Executor:', 6647),
 ('broadcast.TorrentBroadcast:', 444),
 ('python.PythonRunner:', 4875),
 ('output.FileOutputCommitter:', 420),
 ('mapred.SparkHadoopMapRedUtil:', 60),
 ('executor.CoarseGrainedExecutorBackend:', 1235),
 ('Remoting:', 8)]

### groupBy (group elements by a key function; used here on a plain RDD of values, not (key, value) pairs)

In [52]:
# Peek at the per-line word counts (no keys) that will be grouped below.
source_len_rdd.values().take(5)

[7, 5, 5, 18, 5]

In [65]:
# Histogram of message lengths: group the word counts by their own value, then
# take the size of each group. The grouped values are the same ResultIterable
# objects groupByKey yields, which support len() directly, so no list() copy is needed.
(source_len_rdd.values()
    .groupBy(lambda word_count: word_count)
    .mapValues(len)
    .take(10))

[(18, 2),
 (2, 33),
 (6, 82),
 (4, 566),
 (8, 305),
 (14, 449),
 (7, 101),
 (5, 42),
 (3, 45),
 (13, 375)]

### groupByKey (applies only to RDDs of (key, value) tuples)

In [66]:
# Peek at the (source, word_count) pairs before grouping them by key.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [73]:
# groupByKey on its own: each grouped value is a ResultIterable object (see the
# repr below), not a list of numbers. No identity mapValues is needed to show this.
source_len_rdd.groupByKey().take(3)

[('spark.SecurityManager:',
  <pyspark.resultiterable.ResultIterable at 0x7f668811a400>),
 ('slf4j.Slf4jLogger:',
  <pyspark.resultiterable.ResultIterable at 0x7f668811ab70>),
 ('storage.DiskBlockManager:',
  <pyspark.resultiterable.ResultIterable at 0x7f668811a630>)]

In [74]:
# Materialise each group into a list so the grouped word counts become visible.
source_len_rdd.groupByKey().mapValues(list).take(3)

[('spark.SecurityManager:', [5, 5, 18, 5, 5, 18]),
 ('slf4j.Slf4jLogger:', [2]),
 ('storage.DiskBlockManager:', [5])]

In [76]:
# Number of log lines per source = size of each key's group.
source_total_line_rdd = source_len_rdd.groupByKey().mapValues(len)
source_total_line_rdd.take(3)

[('spark.SecurityManager:', 6),
 ('slf4j.Slf4jLogger:', 1),
 ('storage.DiskBlockManager:', 1)]

### Join --> join two RDDs by key, returning (key, (value_from_rdd1, value_from_rdd2))

In [77]:
# Per source: (number of lines, total message words), matched by key.
source_total_line_rdd.join(source_total_word_rdd).take(10)

[('spark.SecurityManager:', (6, 56)),
 ('slf4j.Slf4jLogger:', (1, 2)),
 ('storage.DiskBlockManager:', (1, 5)),
 ('executor.Executor:', (606, 6647)),
 ('broadcast.TorrentBroadcast:', (74, 444)),
 ('python.PythonRunner:', (375, 4875)),
 ('output.FileOutputCommitter:', (60, 420)),
 ('mapred.SparkHadoopMapRedUtil:', (30, 60)),
 ('executor.CoarseGrainedExecutorBackend:', (308, 1235)),
 ('Remoting:', (2, 8))]

### countByKey --> count the number of elements for each key

In [78]:
# Peek at the (source, word_count) pairs before counting them by key.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [80]:
# Number of log lines per source; the result is a local Python defaultdict, not an RDD.
source_len_rdd.countByKey()

defaultdict(int,
            {'executor.CoarseGrainedExecutorBackend:': 308,
             'spark.SecurityManager:': 6,
             'slf4j.Slf4jLogger:': 1,
             'Remoting:': 2,
             'util.Utils:': 2,
             'storage.DiskBlockManager:': 1,
             'storage.MemoryStore:': 150,
             'executor.Executor:': 606,
             'netty.NettyBlockTransferService:': 1,
             'storage.BlockManagerMaster:': 2,
             'broadcast.TorrentBroadcast:': 74,
             'spark.CacheManager:': 75,
             'rdd.HadoopRDD:': 45,
             'Configuration.deprecation:': 5,
             'python.PythonRunner:': 375,
             'storage.BlockManager:': 257,
             'output.FileOutputCommitter:': 60,
             'mapred.SparkHadoopMapRedUtil:': 30})

In [88]:
# Values may be arbitrary objects (here lists, including an empty one):
# countByKey counts records per key and ignores the value entirely.
sample_data = sc.parallelize(
    [('a', [1, 2]), ('b', [1, 0, 1, 20]), ('c', [1]), ('d', [])]
)

sample_data.countByKey()

defaultdict(int, {'a': 1, 'b': 1, 'c': 1, 'd': 1})

In [90]:
# The record for 'd' is a 3-tuple, not a pair: countByKey only uses the first
# element of each record as the key, so it is still counted once under 'd'.
sample_data = sc.parallelize([
    ('a', 5),
    ('b', 1),
    ('c', 2),
    ('d', 7, 9)
])

sample_data.countByKey()

defaultdict(int, {'a': 1, 'b': 1, 'c': 1, 'd': 1})

In [91]:
# Two records share the key 'a', so 'a' is counted twice; the 3-tuple under 'd'
# is again counted once by its first element.
sample_data = sc.parallelize([
    ('a', 5),
    ('a', 1),
    ('c', 2),
    ('d', 7, 9)
])

sample_data.countByKey()

defaultdict(int, {'a': 2, 'c': 1, 'd': 1})

### countByValue --> count occurrences of each distinct element; for a pair RDD the whole (key, value) tuple is treated as one element

In [92]:
# Peek at the (source, word_count) pairs before counting whole tuples.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [93]:
# Counts each distinct (source, word_count) tuple as a whole, not just the key.
source_len_rdd.countByValue()

defaultdict(int,
            {('executor.CoarseGrainedExecutorBackend:', 7): 1,
             ('spark.SecurityManager:', 5): 4,
             ('spark.SecurityManager:', 18): 2,
             ('slf4j.Slf4jLogger:', 2): 1,
             ('Remoting:', 2): 1,
             ('Remoting:', 6): 1,
             ('util.Utils:', 7): 2,
             ('storage.DiskBlockManager:', 5): 1,
             ('storage.MemoryStore:', 6): 1,
             ('executor.CoarseGrainedExecutorBackend:', 4): 307,
             ('executor.Executor:', 7): 1,
             ('netty.NettyBlockTransferService:', 4): 1,
             ('storage.BlockManagerMaster:', 4): 1,
             ('storage.BlockManagerMaster:', 2): 1,
             ('executor.Executor:', 8): 305,
             ('broadcast.TorrentBroadcast:', 5): 37,
             ('storage.MemoryStore:', 14): 149,
             ('broadcast.TorrentBroadcast:', 7): 37,
             ('spark.CacheManager:', 6): 75,
             ('rdd.HadoopRDD:', 3): 45,
             ('Configuration.d

### sortBy

In [95]:
# Peek at the (source, word_count) pairs before sorting.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [106]:
# ASC — order records by their value (the word count); each record is a (source, count) pair.
source_len_rdd.sortBy(lambda pair: pair[1]).take(5)

[('slf4j.Slf4jLogger:', 2),
 ('Remoting:', 2),
 ('storage.BlockManagerMaster:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2)]

In [100]:
# DESC — use ascending=False rather than negating the key, which only works for numeric keys.
source_len_rdd.sortBy(lambda pair: pair[1], ascending=False).take(5)

[('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 18),
 ('storage.MemoryStore:', 14),
 ('storage.MemoryStore:', 14),
 ('storage.MemoryStore:', 14)]

In [97]:
# Order records by their key (the source name) — equivalent in effect to sortByKey.
source_len_rdd.sortBy(lambda pair: pair[0]).take(5)

[('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6)]

### sortByKey

In [98]:
# Peek at the (source, word_count) pairs before sorting by key.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [108]:
# ASC — ascending order is sortByKey's default, spelled out here for symmetry with DESC.
source_len_rdd.sortByKey(ascending=True).take(5)

[('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6),
 ('Configuration.deprecation:', 6)]

In [107]:
# DESC — pass `ascending` by name instead of a bare positional boolean.
source_len_rdd.sortByKey(ascending=False).take(5)

[('util.Utils:', 7),
 ('util.Utils:', 7),
 ('storage.MemoryStore:', 6),
 ('storage.MemoryStore:', 14),
 ('storage.MemoryStore:', 14)]

### Partition

In [109]:
# Current number of partitions of source_len_rdd.
source_len_rdd.getNumPartitions()

2

In [110]:
# Shrink to a single partition. When decreasing the partition count, coalesce()
# merges existing partitions without the full shuffle that repartition() performs.
# NOTE: this rebinds source_len_rdd, so every later cell works on the 1-partition RDD.
source_len_rdd = source_len_rdd.coalesce(1)

In [111]:
# Verify the RDD now has a single partition.
source_len_rdd.getNumPartitions()

1

### Max, Min, Mean

In [113]:
# Peek at the (source, word_count) pairs before computing summary statistics.
source_len_rdd.take(5)

[('executor.CoarseGrainedExecutorBackend:', 7),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 5),
 ('spark.SecurityManager:', 18),
 ('spark.SecurityManager:', 5)]

In [123]:
# Largest per-line message word count.
source_len_rdd.values().max()

18

In [115]:
# Smallest per-line message word count.
source_len_rdd.values().min()

2

In [117]:
# Mean message word count per line.
source_len_rdd.values().mean()

8.755500000000003

In [132]:
# All records whose word count equals the global minimum.
min_len = source_len_rdd.values().min()
(source_len_rdd
    .filter(lambda pair: pair[1] == min_len)
    .take(10))

[('slf4j.Slf4jLogger:', 2),
 ('Remoting:', 2),
 ('storage.BlockManagerMaster:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2),
 ('mapred.SparkHadoopMapRedUtil:', 2)]