In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("my second app")
sc = SparkContext(conf = conf)


In [2]:
rdd1 = sc.textFile("myTextFile.txt")
print("rdd1.collect() =", rdd1.collect())
pairs = rdd1.map(lambda x: (x.split(" ")[0], x))
print("pairs.collect() =", pairs.collect())
print("123"[2])

rdd1.collect() = ['key1 word1', 'key2 word2', '']
pairs.collect() = [('key1', 'key1 word1'), ('key2', 'key2 word2'), ('', '')]
3


In [3]:
exampleRdd = sc.parallelize([[1, 2], [3, 4], [3, 6], [3, 7]])
print("exampleRdd.collect() =", exampleRdd.collect())
examplePairs = exampleRdd.map(lambda x: (x[0], x[1]))
print("examplePairs.collect() =", examplePairs.collect())

exampleRdd.collect() = [[1, 2], [3, 4], [3, 6], [3, 7]]
examplePairs.collect() = [(1, 2), (3, 4), (3, 6), (3, 7)]


In [4]:
newRdd = examplePairs.reduceByKey(lambda x, y: x + y)
print("newRdd.collect() =", newRdd.collect())

newRdd.collect() = [(1, 2), (3, 17)]


In [5]:
newRdd2 = examplePairs.groupByKey()
print("newRdd2.collect() =", newRdd2.collect())
groupedRdd = newRdd2.map(lambda x: (x[0], list(x[1])))
print("groupedRdd.collect() =", groupedRdd.collect())

newRdd2.collect() = [(1, <pyspark.resultiterable.ResultIterable object at 0x0000018424991B00>), (3, <pyspark.resultiterable.ResultIterable object at 0x0000018424991AC8>)]
groupedRdd.collect() = [(1, [2]), (3, [4, 6, 7])]


In [6]:
newRdd3 = examplePairs.mapValues(lambda x: x + 1)
print("newRdd3.collect() =", newRdd3.collect())

newRdd3.collect() = [(1, 3), (3, 5), (3, 7), (3, 8)]


In [7]:
newRdd4 = examplePairs.map(lambda x: (x[0], x[1] + 1))
print("newRdd4.collect() =", newRdd4.collect())

newRdd4.collect() = [(1, 3), (3, 5), (3, 7), (3, 8)]


In [8]:
print("examplePairs.collect():", examplePairs.collect())
newRdd5 = examplePairs.flatMap(lambda x: (x[0], x[1] + 1))
print("newRdd5.collect() =", newRdd5.collect())

examplePairs.collect(): [(1, 2), (3, 4), (3, 6), (3, 7)]
newRdd5.collect() = [1, 3, 3, 5, 3, 7, 3, 8]


In [9]:
print("examplePairs.collect():", examplePairs.collect())
newRdd6 = examplePairs.keys()
print("newRdd6.collect() =", newRdd6.collect())

examplePairs.collect(): [(1, 2), (3, 4), (3, 6), (3, 7)]
newRdd6.collect() = [1, 3, 3, 3]


In [10]:
# now I want to suppress duplicates
listWithDuplicates = [1, 3, 3, 3]
mySet = set(listWithDuplicates)
print("mySet =", mySet)
myList = list(mySet)
print("myList =", myList)


mySet = {1, 3}
myList = [1, 3]


In [11]:
print("examplePairs.collect():", examplePairs.collect())
newRdd7 = examplePairs.values()
print("newRdd7.collect() =", newRdd7.collect())

examplePairs.collect(): [(1, 2), (3, 4), (3, 6), (3, 7)]
newRdd7.collect() = [2, 4, 6, 7]


In [12]:
unsortedKeyValueList = [[1,1], [3,3], [2,2] ]
unsortedRdd = sc.parallelize(unsortedKeyValueList)
print("unsortedRdd.collect() =", unsortedRdd.collect())
unsortedPairsRdd = unsortedRdd.map(lambda x: (x[0], x[1]))
print("unsortedPairsRdd.collect() =", unsortedPairsRdd.collect())
sortedPairsRdd = unsortedPairsRdd.sortByKey()
print("sortedPairsRdd.collect() =", sortedPairsRdd.collect())


unsortedRdd.collect() = [[1, 1], [3, 3], [2, 2]]
unsortedPairsRdd.collect() = [(1, 1), (3, 3), (2, 2)]
sortedPairsRdd.collect() = [(1, 1), (2, 2), (3, 3)]


In [13]:
list1 = [[1,1], [3,3], [2,2]]
list2 = [[4,4], [2,22]]
rdd1 = sc.parallelize(list1)
rdd2 = sc.parallelize(list2)
pairsRdd1 = rdd1.map(lambda x: (x[0], x[1]))
pairsRdd2 = rdd2.map(lambda x: (x[0], x[1]))
subtractedRdd = pairsRdd1.subtractByKey(pairsRdd2)
print("subtractedRdd.collect() =", subtractedRdd.collect())



subtractedRdd.collect() = [(1, 1), (3, 3)]


In [14]:
joinedRdd = pairsRdd1.join(pairsRdd2)
print("joinedRdd.collect() =", joinedRdd.collect())


joinedRdd.collect() = [(2, (2, 22))]


In [15]:
rightOuterJoinRdd = pairsRdd1.rightOuterJoin(pairsRdd2)
print("rightOuterJoinRdd.collect() =", rightOuterJoinRdd.collect())
leftOuterJoinRdd = pairsRdd1.leftOuterJoin(pairsRdd2)
print("leftOuterJoinRdd.collect() =", leftOuterJoinRdd.collect())


rightOuterJoinRdd.collect() = [(2, (2, 22)), (4, (None, 4))]
leftOuterJoinRdd.collect() = [(2, (2, 22)), (1, (1, None)), (3, (3, None))]


In [16]:
print("pairsRdd1.collect() =", pairsRdd1.collect())
print("pairsRdd2.collect() =", pairsRdd2.collect())
cogroupRdd = pairsRdd1.cogroup(pairsRdd2)
print("cogroupRdd.collect() =", cogroupRdd.collect())

complicated_construction = [(x, tuple(map(list, y))) for x, y in list(cogroupRdd.collect())]
print(complicated_construction)

pairsRdd1.collect() = [(1, 1), (3, 3), (2, 2)]
pairsRdd2.collect() = [(4, 4), (2, 22)]
cogroupRdd.collect() = [(2, (<pyspark.resultiterable.ResultIterable object at 0x00000184249AF9E8>, <pyspark.resultiterable.ResultIterable object at 0x00000184249AF198>)), (4, (<pyspark.resultiterable.ResultIterable object at 0x00000184249AF2B0>, <pyspark.resultiterable.ResultIterable object at 0x00000184249AF8D0>)), (1, (<pyspark.resultiterable.ResultIterable object at 0x00000184249AF208>, <pyspark.resultiterable.ResultIterable object at 0x00000184249AF780>)), (3, (<pyspark.resultiterable.ResultIterable object at 0x00000184249AF668>, <pyspark.resultiterable.ResultIterable object at 0x00000184249AF0B8>))]
[(2, ([2], [22])), (4, ([], [4])), (1, ([1], [])), (3, ([3], []))]


In [17]:
# [(x, tuple(map(list, y))) for x, y in list(cogroupRdd.collect())]
myPairList = [(1, [[11, 12], [13, 14]]), (2, [[22, 23], [24, 25]]), (3, [[33, 34], [35, 36]])]
newList = []
for pair in myPairList:
    print ("pair = ", pair)
    key, value = pair[0], pair[1]
    print ("key = ", key, ", value =", value)
    newList.append((key, value))
print("newList =", newList)
simpleList = [1, 2, 3]
print("simpleList =", simpleList)
simpleTuple = tuple(simpleList)
print("simpleTuple =", simpleTuple)
listAgain = list(simpleTuple)
print("listAgain =", listAgain)
(coordinate1, coordinate2, coordinate3) = simpleTuple
print("coordinate1 =", coordinate1, "coordinate2 =", coordinate2, "coordinate3 =", coordinate3)
simpleSimpleTuple = (1, 2, 3)
print("type(simpleSimpleTuple) =", type(simpleSimpleTuple))

print("simpleSimpleTuple =", simpleSimpleTuple)
# map function
# instead of list function use map, to make a list out of tuple
myMapList = map(list, simpleSimpleTuple)
print("myMapList =", myMapList)
# the above needs adjustment
simpleList = [1, 2, 3]
listPlus1 = list(map(lambda x: x+1, simpleList))
print("listPlus1 =", listPlus1)
# may be in a new implementation they changed it, and they return the object map
onePlusTwo = list(map(lambda x, y, z: x+y+z, [1, 2, 3], [11, 22], [111, 222, 333, 444]))
print("listSum =", onePlusTwo)
myGeneratorList = [x**2 for x in [1, 2]]
print("myGeneratorList =", myGeneratorList)


pair =  (1, [[11, 12], [13, 14]])
key =  1 , value = [[11, 12], [13, 14]]
pair =  (2, [[22, 23], [24, 25]])
key =  2 , value = [[22, 23], [24, 25]]
pair =  (3, [[33, 34], [35, 36]])
key =  3 , value = [[33, 34], [35, 36]]
newList = [(1, [[11, 12], [13, 14]]), (2, [[22, 23], [24, 25]]), (3, [[33, 34], [35, 36]])]
simpleList = [1, 2, 3]
simpleTuple = (1, 2, 3)
listAgain = [1, 2, 3]
coordinate1 = 1 coordinate2 = 2 coordinate3 = 3
type(simpleSimpleTuple) = <class 'tuple'>
simpleSimpleTuple = (1, 2, 3)
myMapList = <map object at 0x00000184249ABF60>
listPlus1 = [2, 3, 4]
listSum = [123, 246]
myGeneratorList = [1, 4]


In [18]:
numStringPairsList = [[1,"1"], [3,"3"], [2,"123456789012345678901"]]
numStringRdd = sc.parallelize(numStringPairsList)
numStringPairsRdd = numStringRdd.map(lambda x: (x[0], x[1]))
result = numStringPairsRdd.filter(lambda keyValue: len(keyValue[1]) > 20)
print("result.collect() =", result.collect())


result.collect() = [(2, '123456789012345678901')]


In [19]:
stringNumPairsList = [["panda", 0], ["pink", 3], ["pirate", 3], ["panda", 1], ["pink", 4]]
stringNumRdd = sc.parallelize(stringNumPairsList)
stringNumPairsRdd = stringNumRdd.map(lambda x: (x[0], x[1]))
keyValue1Rdd = stringNumPairsRdd.mapValues(lambda x: (x, 1))
print("keyValue1Rdd.collect() =", keyValue1Rdd.collect())
reducedRdd = keyValue1Rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
print("reducedRdd.collect() =", reducedRdd.collect())
averagedRdd = reducedRdd.mapValues(lambda x: (x[0] / x[1]))
print("averagedRdd.collect() =", averagedRdd.collect())


keyValue1Rdd.collect() = [('panda', (0, 1)), ('pink', (3, 1)), ('pirate', (3, 1)), ('panda', (1, 1)), ('pink', (4, 1))]
reducedRdd.collect() = [('panda', (1, 2)), ('pink', (7, 2)), ('pirate', (3, 1))]
averagedRdd.collect() = [('panda', 0.5), ('pink', 3.5), ('pirate', 3.0)]


In [20]:
numList = [1, 2, 3]
numRdd = sc.parallelize(numList)
sumCount = numRdd.aggregate((0, 0),
                            (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                             (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
print("sumCount.collect() =", sumCount)
newVar = sumCount[0] / float(sumCount[1])
print("newVar =", newVar)

sumCount.collect() = (6, 3)
newVar = 2.0
