In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, explode
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F 


In [2]:
spark = SparkSession.builder.appName('orrily').getOrCreate()

In [3]:
spark

# SEction 1 Fundamental
# chapter 1. itro to spark and pyspark

In [4]:
tuples = [('A',7),('A',8),('A',-4),
        ('B',3),('B',9),("B",-1),
        ("C",1),("C",5)]

In [6]:
rdd = spark.sparkContext.parallelize(tuples)

In [7]:
# drop negatives values
positive = rdd.filter(lambda x: x[1]>0)
positive.collect()

[('A', 7), ('A', 8), ('B', 3), ('B', 9), ('C', 1), ('C', 5)]

In [10]:
# find suma and average per key using groupBykey

sum_and_avg = positive.groupByKey().mapValues(lambda v: (sum(v),float(sum(v)/len(v))))
sum_and_avg

PythonRDD[15] at RDD at PythonRDD.scala:53

In [11]:
# using reduceByKey
# 1. create (sum,count) per key
sum_count = positive.mapValues(lambda x: (x,1))
sum_count_agg = sum_count.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

In [12]:
sum_avg = sum_count_agg.mapValues(lambda v: (v[0],float(v[0]/v[1])))

In [19]:
# create a Dataframe using Saprksession

In [5]:
df = spark.read.csv('data/emp.csv',header=True,inferSchema=True)

In [6]:
df.show()

+-----+----+-----+
| dept|name|hours|
+-----+----+-----+
|Sales|Barb|   40|
|Sales| Dan|   20|
|   IT|Alex|   22|
|   IT|Jane|   24|
|   HR|Alex|   20|
|   HR|Mary|   30|
+-----+----+-----+



In [7]:
averages = df.groupBy('dept')\
    .agg(avg('hours').alias('average'),sum('hours').alias("Total"))

TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [12]:
averages.show()

+-----+-------+-----+
| dept|average|Total|
+-----+-------+-----+
|Sales|   30.0|   60|
|   HR|   25.0|   50|
|   IT|   23.0|   46|
+-----+-------+-----+



In [8]:
spark

# chapter 2 Transformation and action

In [9]:
# Gnome
input_path = 'data/sample.fasta'
records_rdd = spark.read.text(input_path)\
    .rdd.map(lambda x: x[0])

records_rdd.collect()

['>seq1',
 'cGTAaccaataaaaaaacaagcttaacctaattc',
 '>seq2',
 'agcttagTTTGGatctggccgggg',
 '>seq3',
 'gcggatttactcCCCCCAAAAANNaggggagagcccagataaatggagtctgtgcgtccaca',
 'gaattcgcacca',
 'AATAAAACCTCACCCAT',
 'agagcccagaatttactcCCC',
 '>seq4',
 'gcggatttactcaggggagagcccagGGataaatggagtctgtgcgtccaca',
 'gaattcgcacca']

In [10]:
sc = spark.sparkContext
records_rdd = sc.textFile(input_path)

In [11]:
records_rdd.collect()

['>seq1',
 'cGTAaccaataaaaaaacaagcttaacctaattc',
 '>seq2',
 'agcttagTTTGGatctggccgggg',
 '>seq3',
 'gcggatttactcCCCCCAAAAANNaggggagagcccagataaatggagtctgtgcgtccaca',
 'gaattcgcacca',
 'AATAAAACCTCACCCAT',
 'agagcccagaatttactcCCC',
 '>seq4',
 'gcggatttactcaggggagagcccagGGataaatggagtctgtgcgtccaca',
 'gaattcgcacca']

In [12]:
# 2. Define mapper function
def process_FASTA_record(fasta_record):
    key_value_list = []
    if (fasta_record.startswith(">")):
        # z counts the number of FASTA sequences 
        key_value_list.append(('z',1))
    else:
        chars = fasta_record.lower()
        for c in chars:
            key_value_list.append((c,1))
    
    print(key_value_list)
    return key_value_list

In [13]:
pairs_rdd = records_rdd.flatMap(lambda rec: process_FASTA_record(rec))

In [14]:
pairs_rdd.getNumPartitions()

2

In [15]:
# reduce by key
frequency_rdd = pairs_rdd.reduceByKey(lambda x,y:x+y)

In [16]:
frequency_rdd.collect()

[('c', 61), ('g', 53), ('z', 4), ('t', 45), ('a', 73), ('n', 2)]

In [17]:
# group by key
grouped_rdd = pairs_rdd.groupByKey()
frequency_df = grouped_rdd.mapValues(lambda val:sum(val))

In [18]:
frequency_df

PythonRDD[31] at RDD at PythonRDD.scala:53

In [58]:
# hashmap solution
# parameter : fasta_record: string, a single FSTA record
# output: a list of (dna_letter, frequency)

from collections import defaultdict


def process_FASTA_as_hashmap(fasta_record):
    if (fasta_record.startswith(">")):
        return [('z',1)]
    hashmap = defaultdict(int)
    chars = fasta_record.lower()
    for c in chars:
        hashmap[c]+=1
    print('hashmap=',hashmap)
    key_value_list = [(k,v) for k,v in hashmap.items()]
    print("key_value_lsit=",key_value_list)
    return key_value_list

def test(fast_rec):
    if fast_rec.startswith(">"):
        return [('z',1)]
    else:
        return list(fast_rec.lower())


In [25]:
records_rdd.collect()

['>seq1',
 'cGTAaccaataaaaaaacaagcttaacctaattc',
 '>seq2',
 'agcttagTTTGGatctggccgggg',
 '>seq3',
 'gcggatttactcCCCCCAAAAANNaggggagagcccagataaatggagtctgtgcgtccaca',
 'gaattcgcacca',
 'AATAAAACCTCACCCAT',
 'agagcccagaatttactcCCC',
 '>seq4',
 'gcggatttactcaggggagagcccagGGataaatggagtctgtgcgtccaca',
 'gaattcgcacca']

In [57]:
pairs_rdd = records_rdd.flatMap(process_FASTA_as_hashmap)
frequency_rdd = pairs_rdd.reduceByKey(lambda x,y:x+y)
frequency_rdd.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 56, avinash, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\worker.py", line 595, in process
  File "C:\spark\spark3\python\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark3\python\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark3\python\pyspark\rdd.py", line 425, in func
    return f(iterator)
  File "C:\spark\spark3\python\pyspark\rdd.py", line 1946, in combineLocally
    merger.mergeValues(iterator)
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-56-7b38a6767932>", line 16, in process_FASTA_as_hashmap
AttributeError: 'collections.defaultdict' object has no attribute 'iteritems'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\worker.py", line 595, in process
  File "C:\spark\spark3\python\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark3\python\pyspark\rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark3\python\pyspark\rdd.py", line 425, in func
    return f(iterator)
  File "C:\spark\spark3\python\pyspark\rdd.py", line 1946, in combineLocally
    merger.mergeValues(iterator)
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "C:\spark\spark3\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-56-7b38a6767932>", line 16, in process_FASTA_as_hashmap
AttributeError: 'collections.defaultdict' object has no attribute 'iteritems'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more


In [36]:
# in hashmp format
frequency_rdd.collectAsMap()

{'c': 61, 'g': 53, 'z': 4, 't': 45, 'a': 73, 'n': 2}

In [37]:
# solution 3




TypeError: collectWithJobGroup() missing 2 required positional arguments: 'groupId' and 'description'

In [38]:
records = spark.sparkContext.textFile('data/sample.fasta')


In [39]:
# partitions  
# use sc.parallelize to create rdd from list,
numbers = [1,2,3,4,5,6,7,8,9,10]
num_partition = 3
rdd = sc.parallelize(numbers,num_partition)

In [40]:
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [41]:
rdd.getNumPartitions()

3

In [46]:
def scan(iterator):
    print(list(iterator))
    
(rdd.foreachPartition(scan))

In [47]:
def adder(iterator):
    yield sum(iterator)

rdd.mapPartitions(adder).collect()

[6, 15, 34]

In [55]:
# Solution 3 create hashmap per partition rather than record
from collections import defaultdict
def process_FASTA_partition(iterator):
    hashmap = defaultdict(int)
    for fasta_record in iterator:
        if fasta_record.startswith(">"):
            hashmap['Z']+=1
        else:
            chars = fasta_record.lower()
            for c in chars:
                hashmap[c]+=1 
    print('hashmap=',hashmap)
    key_value_list = [(k,v) for k,v in hashmap.items()]
    print("key_value_list",key_value_list)
    return key_value_list


records = spark.sparkContext.textFile('data/sample.fasta')
pairs_rdd = records.mapPartitions(process_FASTA_partition)
frequency_rdd = pairs_rdd.reduceByKey(lambda a,b:a+b)
frequency_rdd.collect()

[('c', 61), ('g', 53), ('Z', 4), ('t', 45), ('a', 73), ('n', 2)]

In [60]:
spark

# Chapter 3 Mapper Transformation

In [61]:
# map
def mapper_func(x):
    if (x>0):
        return x+5
    else:
        return 0

# spark: sparksession
data = [1,-1,-2,3,4]
rdd = spark.sparkContext.parallelize(data)
rdd.collect()

[1, -1, -2, 3, 4]

In [62]:
rdd2 = rdd.map(mapper_func) # or rdd2 = rdd.map(lambda x: mapper_func(x)) 

rdd2.collect()  # rdd2 = rdd.map(lambda x : x+5 if x> 0 else 0)

[6, 0, 0, 8, 9]

In [63]:
rdd3 = rdd.map(lambda x : x*2 if x> 0 else 0)
rdd3.collect()

[2, 0, 0, 6, 8]

In [64]:
rdd4 = rdd.map(lambda x: (x,mapper_func(x)))
rdd4.collect()

[(1, 6), (-1, 0), (-2, 0), (3, 8), (4, 9)]

In [65]:
# define a simple mapper function
pairs = [('a',2),('b',-1),('d',2),('e',3)]
rdd = spark.sparkContext.parallelize(pairs)
rdd.collect()

[('a', 2), ('b', -1), ('d', 2), ('e', 3)]

In [66]:
rdd2 = rdd.map(lambda x: (x[0],x[1],x[1]+100))
rdd2.collect()

[('a', 2, 102), ('b', -1, 99), ('d', 2, 102), ('e', 3, 103)]

In [67]:
def create_key_value(string):
    tokens = string.split(",")
    return (tokens[0],(tokens[1],tokens[2]))

strings = ['a,10,11','b,8,19','c,20,21','c,2,8']
rdd = spark.sparkContext.parallelize(strings)
rdd.collect()

['a,10,11', 'b,8,19', 'c,20,21', 'c,2,8']

In [68]:
rdd2 = rdd.map(create_key_value)

In [69]:
rdd2.collect()

[('a', ('10', '11')),
 ('b', ('8', '19')),
 ('c', ('20', '21')),
 ('c', ('2', '8'))]

['1,Alex,30,124',
 '2,Bert,32,234',
 '3,Curt,28,312',
 '4,Don,32,180',
 '5,Mary,30,100',
 '6,Jane,28,212',
 '7,Joe,28,128',
 '8,Al,40,600']

In [74]:
def parse_record(record):
    tokens = record.split(",")
    age = int(tokens[2])
    number_of_friends = int(tokens[3])
    return (age,number_of_friends)


user_path = 'data/student.csv'
users = spark.sparkContext.textFile(user_path)
users.collect()


['1,Alex,30,124',
 '2,Bert,32,234',
 '3,Curt,28,312',
 '4,Don,32,180',
 '5,Mary,30,100',
 '6,Jane,28,212',
 '7,Joe,28,128',
 '8,Al,40,600']

In [75]:
pairs = users.map(parse_record)
pairs.collect()

[(30, 124),
 (32, 234),
 (28, 312),
 (32, 180),
 (30, 100),
 (28, 212),
 (28, 128),
 (40, 600)]

In [76]:
# average friends per year age  # mapvaleus (k,v)-> only applicatbel on values
total_by_age = pairs.mapValues(lambda x: (x,1))\
    .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

In [79]:
total_by_age.collect()

[(30, (224, 2)), (32, (414, 2)), (28, (652, 3)), (40, (600, 1))]

In [81]:
average_by_age = total_by_age.mapValues(lambda x: x[0]//x[1])
average_by_age.collect()

[(30, 112), (32, 207), (28, 217), (40, 600)]

In [83]:
# DataFrame Mapper does not have map() function

tuples3 = [ ('alex', 440, 'PHD'), ('jane', 420, 'PHD'),
('bob', 280, 'MS'), ('betty', 200, 'MS'),
('ted', 180, 'BS'), ('mary', 100, 'BS') ]

df = spark.createDataFrame(tuples3,['name','amount','education'])
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- amount: long (nullable = true)
 |-- education: string (nullable = true)



In [84]:
df.show()

+-----+------+---------+
| name|amount|education|
+-----+------+---------+
| alex|   440|      PHD|
| jane|   420|      PHD|
|  bob|   280|       MS|
|betty|   200|       MS|
|  ted|   180|       BS|
| mary|   100|       BS|
+-----+------+---------+



In [85]:
# to show new column with 10% bounus
df2 = df.rdd.map(lambda x:(x['name'],x['amount'],x['education'],x['amount']/10))\
    .toDF(['name','amount','education','bonus'])

df2.show()

In [87]:
# easy way to add new column withColumn
df4 = df.withColumn('bonus',df.amount/10)
df4.show()

+-----+------+---------+-----+
| name|amount|education|bonus|
+-----+------+---------+-----+
| alex|   440|      PHD| 44.0|
| jane|   420|      PHD| 42.0|
|  bob|   280|       MS| 28.0|
|betty|   200|       MS| 20.0|
|  ted|   180|       BS| 18.0|
| mary|   100|       BS| 10.0|
+-----+------+---------+-----+



In [92]:
# bonus amount for PHD student 30% and MS 20%
def compute_bonus(amount,education):
    if education=='PHD': return int(amount*0.30)
    if education=='MS': return int(amount*0.20)
    return int(amount*0.10)

# register your python function as UDF 
from pyspark.sql.functions import udf
compute_bonus_udf = udf(lambda amount,education: compute_bonus(amount,education),IntegerType())

In [93]:
df5 = df.withColumn('bonus',compute_bonus_udf(df.amount,df.education))
df5.show()

+-----+------+---------+-----+
| name|amount|education|bonus|
+-----+------+---------+-----+
| alex|   440|      PHD|  132|
| jane|   420|      PHD|  126|
|  bob|   280|       MS|   56|
|betty|   200|       MS|   40|
|  ted|   180|       BS|   18|
| mary|   100|       BS|   10|
+-----+------+---------+-----+



In [94]:
# Dataframe does not have flatmap but have explode fucniton
# explode
some_data = [
('alex', ['Java','Scala', 'Python']),
('jane', ['Cobol','Snobol']),
('bob', ['C++',]),
('ted', []),
('max', [])
]

In [95]:
df = spark.createDataFrame(
data=some_data, schema = ['name', 'known_languages'])

In [97]:
df.show(truncate=False)

+----+---------------------+
|name|known_languages      |
+----+---------------------+
|alex|[Java, Scala, Python]|
|jane|[Cobol, Snobol]      |
|bob |[C++]                |
|ted |[]                   |
|max |[]                   |
+----+---------------------+



In [100]:
exploded_column = df.select(df.name,explode(df.known_languages).alias('language'))
exploded_column.show(truncate=False)

+----+--------+
|name|language|
+----+--------+
|alex|Java    |
|alex|Scala   |
|alex|Python  |
|jane|Cobol   |
|jane|Snobol  |
|bob |C++     |
+----+--------+



In [101]:
some_data = [
... ('alex', ['Java','Scala', 'Python'], ['MS', 'PHD']),
... ('jane', ['Cobol','Snobol'], ['BS', 'MS']),
... ('bob', ['C++'], ['BS', 'MS', 'PHD']),
... ('ted', [], ['BS', 'MS']),
... ('max', ['FORTRAN'], []),
... ('dan', [], [])
... ]

In [107]:
# exploding mutliple column (cannot explode 2 column at time)
df = spark.createDataFrame(some_data,['name','languges','degree'])

In [109]:
df.show(truncate=0)

+----+---------------------+-------------+
|name|languges             |degree       |
+----+---------------------+-------------+
|alex|[Java, Scala, Python]|[MS, PHD]    |
|jane|[Cobol, Snobol]      |[BS, MS]     |
|bob |[C++]                |[BS, MS, PHD]|
|ted |[]                   |[BS, MS]     |
|max |[FORTRAN]            |[]           |
|dan |[]                   |[]           |
+----+---------------------+-------------+



In [113]:
explode_1 = df.select(df.name,explode(df.languges).alias('language'),df.degree)

In [114]:
explode_1.show(truncate=0)

+----+--------+-------------+
|name|language|degree       |
+----+--------+-------------+
|alex|Java    |[MS, PHD]    |
|alex|Scala   |[MS, PHD]    |
|alex|Python  |[MS, PHD]    |
|jane|Cobol   |[BS, MS]     |
|jane|Snobol  |[BS, MS]     |
|bob |C++     |[BS, MS, PHD]|
|max |FORTRAN |[]           |
+----+--------+-------------+



In [117]:
explode_2 = explode_1.select(explode_1.name,explode_1.language,explode(explode_1.degree).alias('degree'))
explode_2.show()

+----+--------+------+
|name|language|degree|
+----+--------+------+
|alex|    Java|    MS|
|alex|    Java|   PHD|
|alex|   Scala|    MS|
|alex|   Scala|   PHD|
|alex|  Python|    MS|
|alex|  Python|   PHD|
|jane|   Cobol|    BS|
|jane|   Cobol|    MS|
|jane|  Snobol|    BS|
|jane|  Snobol|    MS|
| bob|     C++|    BS|
| bob|     C++|    MS|
| bob|     C++|   PHD|
+----+--------+------+



# chapter4 Reduction in spark
 
- reduceByKey()
- combineByKey()
- groupByKey()
- aggregateByKey()

In [4]:
data =[
('alex', 2), ('alex', 4), ('alex', 8),
('jane', 3), ('jane', 7),
('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
('clint', 9)
]


key_value_pairs = spark.sparkContext.parallelize(data)
key_value_pairs.collect()

[('alex', 2),
 ('alex', 4),
 ('alex', 8),
 ('jane', 3),
 ('jane', 7),
 ('rafa', 1),
 ('rafa', 3),
 ('rafa', 5),
 ('rafa', 6),
 ('clint', 9)]

In [121]:
## reduceByKey (soruce and target rdd data types are same)
sum_per_key = key_value_pairs.reduceByKey(lambda x,y:x+y)
sum_per_key.collect()

[('clint', 9), ('alex', 14), ('jane', 10), ('rafa', 15)]

In [122]:
# using add function
from operator import add 
sum_per_key1 = key_value_pairs.reduceByKey(add)
sum_per_key1.collect()

[('clint', 9), ('alex', 14), ('jane', 10), ('rafa', 15)]

In [123]:
## groupByKey
sum_per_key2 = key_value_pairs.groupByKey().mapValues(lambda x: sum(x))
sum_per_key2.collect()

[('clint', 9), ('alex', 14), ('jane', 10), ('rafa', 15)]

In [124]:
## aggregate by key
sum_per_key3 = key_value_pairs.aggregateByKey(0,(lambda C,V:C+V),(lambda C1,C2:C1+C2))
sum_per_key3.collect()

[('clint', 9), ('alex', 14), ('jane', 10), ('rafa', 15)]

In [5]:
## combineByKey
sum_per_key4 = key_value_pairs.combineByKey(
    (lambda x:x),
    (lambda C,x:C+x),
    (lambda C1,C2:C1+C2)
)
sum_per_key4.collect()

[('clint', 9), ('alex', 14), ('jane', 10), ('rafa', 15)]

In [6]:
# movielens 
# requirement -> find the average rating for each user 

## 1st solution reduceByKey



In [4]:
# define a function that accepts a csv record 
# and return a pair of (userId,Rating)
# parameter : rating_record (as csv)
# rating_record = "userid,movieid,raiting,timestamp"



def create_pair(rating_record):
    record = rating_record.split(',')
    return (record[0],float(record[2]))


rating_path = 'data/movielens/ratings.csv'
rating_rdd = spark.sparkContext.textFile(rating_path)
rating = rating_rdd.filter(lambda x: x!='userId,movieId,rating,timestamp')
rating.collect()

['1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815',
 '1,50,5.0,964982931',
 '1,70,3.0,964982400',
 '1,101,5.0,964980868',
 '1,110,4.0,964982176',
 '1,151,5.0,964984041',
 '1,157,5.0,964984100',
 '1,163,5.0,964983650',
 '1,216,5.0,964981208',
 '1,223,3.0,964980985',
 '1,231,5.0,964981179',
 '1,235,4.0,964980908',
 '1,260,5.0,964981680',
 '1,296,3.0,964982967',
 '1,316,3.0,964982310',
 '1,333,5.0,964981179',
 '1,349,4.0,964982563',
 '1,356,4.0,964980962',
 '1,362,5.0,964982588',
 '1,367,4.0,964981710',
 '1,423,3.0,964982363',
 '1,441,4.0,964980868',
 '1,457,5.0,964981909',
 '1,480,4.0,964982346',
 '1,500,3.0,964981208',
 '1,527,5.0,964984002',
 '1,543,4.0,964981179',
 '1,552,4.0,964982653',
 '1,553,5.0,964984153',
 '1,590,4.0,964982546',
 '1,592,4.0,964982271',
 '1,593,4.0,964983793',
 '1,596,5.0,964982838',
 '1,608,5.0,964982931',
 '1,648,3.0,964982563',
 '1,661,5.0,964982838',
 '1,673,3.0,964981775',
 '1,733,4.0,964982400',
 '1,736,3.0,964982653',
 

In [5]:
# load user difined python function
ratings = rating.map(create_pair)
ratings.count()

100836

In [13]:
# aggregateByKey 
# c = (c[0],c[1]) = (sum of rating,count of rating)
# zero value -> c= (0.0,1)
# seq func : (C,V) -> C
# comb_func: (C,C) -> C 
sum_count = ratings.aggregateByKey((0.0,0),(lambda c,v:(c[0]+v,c[1]+1)),
                (lambda x,y: (x[0]+y[0],x[1]+y[1])))

sum_count.collect()

[('1', (1013.0, 232)),
 ('4', (768.0, 216)),
 ('8', (168.0, 47)),
 ('9', (150.0, 46)),
 ('10', (459.0, 140)),
 ('12', (140.5, 32)),
 ('14', (163.0, 48)),
 ('16', (365.0, 98)),
 ('17', (442.0, 105)),
 ('19', (1833.0, 703)),
 ('20', (869.0, 242)),
 ('21', (1444.5, 443)),
 ('22', (306.0, 119)),
 ('24', (401.5, 110)),
 ('26', (68.0, 21)),
 ('29', (335.5, 81)),
 ('33', (591.0, 156)),
 ('34', (294.0, 86)),
 ('40', (388.0, 103)),
 ('44', (161.0, 48)),
 ('45', (1546.5, 399)),
 ('48', (133.0, 33)),
 ('50', (862.0, 310)),
 ('53', (100.0, 20)),
 ('54', (100.0, 33)),
 ('56', (175.0, 46)),
 ('57', (1615.0, 476)),
 ('60', (82.0, 22)),
 ('63', (984.0, 271)),
 ('64', (1948.5, 517)),
 ('66', (1387.0, 345)),
 ('68', (4074.5, 1260)),
 ('69', (201.0, 46)),
 ('70', (268.0, 62)),
 ('73', (779.5, 210)),
 ('74', (756.0, 177)),
 ('77', (116.0, 29)),
 ('82', (767.5, 227)),
 ('83', (390.5, 118)),
 ('84', (1081.0, 293)),
 ('86', (275.0, 70)),
 ('88', (226.0, 56)),
 ('91', (1952.5, 575)),
 ('93', (416.0, 97)),
 ('

In [19]:
average_rating = sum_count.mapValues(lambda x: round((x[0]/x[1]),2))

In [20]:
average_rating.collect()

[('1', 4.37),
 ('4', 3.56),
 ('8', 3.57),
 ('9', 3.26),
 ('10', 3.28),
 ('12', 4.39),
 ('14', 3.4),
 ('16', 3.72),
 ('17', 4.21),
 ('19', 2.61),
 ('20', 3.59),
 ('21', 3.26),
 ('22', 2.57),
 ('24', 3.65),
 ('26', 3.24),
 ('29', 4.14),
 ('33', 3.79),
 ('34', 3.42),
 ('40', 3.77),
 ('44', 3.35),
 ('45', 3.88),
 ('48', 4.03),
 ('50', 2.78),
 ('53', 5.0),
 ('54', 3.03),
 ('56', 3.8),
 ('57', 3.39),
 ('60', 3.73),
 ('63', 3.63),
 ('64', 3.77),
 ('66', 4.02),
 ('68', 3.23),
 ('69', 4.37),
 ('70', 4.32),
 ('73', 3.71),
 ('74', 4.27),
 ('77', 4.0),
 ('82', 3.38),
 ('83', 3.31),
 ('84', 3.69),
 ('86', 3.93),
 ('88', 4.04),
 ('91', 3.4),
 ('93', 4.29),
 ('96', 3.88),
 ('98', 3.91),
 ('100', 3.95),
 ('102', 3.36),
 ('106', 4.44),
 ('107', 3.91),
 ('108', 3.99),
 ('110', 3.73),
 ('111', 3.34),
 ('112', 3.59),
 ('113', 3.65),
 ('115', 3.77),
 ('116', 3.44),
 ('119', 4.18),
 ('121', 3.29),
 ('122', 4.55),
 ('128', 4.36),
 ('130', 3.54),
 ('132', 3.04),
 ('134', 3.57),
 ('138', 3.52),
 ('139', 2.14),

In [38]:
# groupby key
rating_grouped = ratings.groupByKey()
rating_grouped.count()

610

In [40]:
# average ratign
average_rating = rating_grouped.mapValues(lambda x: sum(x)/len(x))
average_rating.take(2)

[('1', 4.366379310344827), ('4', 3.5555555555555554)]

In [42]:
# for reduce by key datatype should be same
def create_pair(rating_record):
    record = rating_record.split(",")
    return (record[0],(float(record[2]),1))

ratings = rating.map(create_pair)
ratings.take(2)

[('1', (4.0, 1)), ('1', (4.0, 1))]

In [43]:
# reduce by key
sum_count = ratings.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
sum_count.count()

610

In [45]:
sum_count.take(3)

[('1', (1013.0, 232)), ('4', (768.0, 216)), ('8', (168.0, 47))]

In [44]:
average_rating = sum_count.mapValues(lambda x: x[0]/x[1])
average_rating.take(3)

[('1', 4.366379310344827), ('4', 3.5555555555555554), ('8', 3.574468085106383)]

In [6]:
# combine by key
ratings.take(3)

[('1', 4.0), ('1', 4.0), ('1', 4.0)]

In [7]:
sum_count = ratings.combineByKey((lambda x: (x,1)),
                    (lambda x,y:(x[0]+y,x[1]+1)),
                    (lambda x,y:(x[0]+y[0],x[1]+y[1])))

sum_count.count()

610

In [8]:
sum_count.take(3)

[('1', (1013.0, 232)), ('4', (768.0, 216)), ('8', (168.0, 47))]

In [9]:
average_rating = sum_count.mapValues(lambda x: x[0]/x[1])
average_rating.take(3)

[('1', 4.366379310344827), ('4', 3.5555555555555554), ('8', 3.574468085106383)]

In [10]:
spark.stop()