In [129]:
!pip install pyspark
!pip install py4j



In [130]:
from google.colab import drive

import sys, os
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder\
        .master("local")\
        .appName('A')\
        .getOrCreate()

In [131]:
# 2.3.1. Умножение матрицы на вектор с применением MapReduce
def mult_matrix_vector(mat_rdd, vect_rdd):
    qty = mat_rdd.keys().distinct().count()
    print("qty", qty)
    rdd1 = vect_rdd.flatMap(lambda x: [((j, x[0]), (x[1])) for j in range(qty)])
    print("rdd1", rdd1.take(10))
    rdd2 = mat_rdd + rdd1
    print("rdd2", rdd2.take(10))
    result = rdd2.reduceByKey(lambda x, y: x * y).map(lambda x: (x[0][0], x[1])).reduceByKey(lambda x, y: x + y)
    print("result", result.take(10))
    return result

In [132]:
mat_rdd = spark.read.csv('matrix.csv', header=True).rdd
mat_rdd = mat_rdd.map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))
print("mat_rdd", mat_rdd.take(20))
vect_rdd = spark.read.csv('vector.csv', header=True).rdd
vect_rdd = vect_rdd.map(lambda x: (int(x[0]), float(x[1])))
print("vect_rdd", vect_rdd.take(20))

mat_rdd [((0, 0), 6.0), ((0, 1), 4.0), ((0, 2), 3.0), ((0, 3), 3.0), ((0, 4), 1.0), ((0, 5), 3.0), ((0, 6), 5.0), ((0, 7), 3.0), ((0, 8), 6.0), ((0, 9), 9.0), ((1, 0), 5.0), ((1, 1), 1.0), ((1, 2), 9.0), ((1, 3), 4.0), ((1, 4), 5.0), ((1, 5), 9.0), ((1, 6), 10.0), ((1, 7), 8.0), ((1, 8), 7.0), ((1, 9), 3.0)]
vect_rdd [(0, 10.0), (1, 10.0), (2, 10.0), (3, 10.0), (4, 10.0), (5, 10.0), (6, 10.0), (7, 10.0), (8, 10.0), (9, 10.0)]


In [133]:
mult_matrix_vector(mat_rdd, vect_rdd)

qty 100
rdd1 [((0, 0), 10.0), ((1, 0), 10.0), ((2, 0), 10.0), ((3, 0), 10.0), ((4, 0), 10.0), ((5, 0), 10.0), ((6, 0), 10.0), ((7, 0), 10.0), ((8, 0), 10.0), ((9, 0), 10.0)]
rdd2 [((0, 0), 6.0), ((0, 1), 4.0), ((0, 2), 3.0), ((0, 3), 3.0), ((0, 4), 1.0), ((0, 5), 3.0), ((0, 6), 5.0), ((0, 7), 3.0), ((0, 8), 6.0), ((0, 9), 9.0)]
result [(0, 430.0), (2, 660.0), (4, 700.0), (6, 670.0), (8, 750.0), (10, 100.0), (12, 100.0), (14, 100.0), (16, 100.0), (18, 100.0)]


PythonRDD[1060] at RDD at PythonRDD.scala:53

In [134]:
# 2.3.4. Вычисление выборки с помощью MapReduce
def selection(sel_rdd, condition):
    rdd1 = sel_rdd.map(lambda x: (int(x[0]), x[1]))
    print("rdd1", rdd1.take(10))
    rdd2 = rdd1.flatMap(lambda x: [(x[1], x[1])] if condition(x[1]) else [])
    print("rdd2",  rdd2.take(10))
    result = rdd2.reduceByKey(lambda x, y: x)
    print("result", result.take(10))
    return result

In [135]:
sel_rdd = spark.read.csv("abc.csv", header=True).rdd
print("sel_rdd", sel_rdd.take(10))

sel_rdd [Row(value='1', c1='X'), Row(value='2', c1='A'), Row(value='3', c1='Q'), Row(value='4', c1='E'), Row(value='5', c1='Y'), Row(value='6', c1='K'), Row(value='7', c1='R'), Row(value='8', c1='A'), Row(value='9', c1='B')]


In [136]:
selection(sel_rdd, lambda x: x == 'X' or x == 'Y')

rdd1 [(1, 'X'), (2, 'A'), (3, 'Q'), (4, 'E'), (5, 'Y'), (6, 'K'), (7, 'R'), (8, 'A'), (9, 'B')]
rdd2 [('X', 'X'), ('Y', 'Y')]
result [('X', 'X'), ('Y', 'Y')]


PythonRDD[1084] at RDD at PythonRDD.scala:53

In [137]:
# 2.3.5. Вычисление проекции с помощью MapReduce
def projection(projection_rdd, index):
    rdd1 = projection_rdd.map(lambda x: (tuple([x[i] for i in index]), tuple([x[i] for i in index])))
    print("rdd1", rdd1.take(10))
    result = rdd1.reduceByKey(lambda x, y: x)
    print("result", result.take(10))
    return result

In [138]:
projection_rdd = spark.read.csv("proj.csv", header=True).rdd
print("projection_rdd =", projection_rdd.take(10))

projection_rdd = [Row(id='0', value='21', class1='A', class2='A'), Row(id='1', value='22', class1='W', class2='T'), Row(id='2', value='23', class1='Q', class2='E'), Row(id='3', value='24', class1='E', class2='R'), Row(id='4', value='25', class1='R', class2='A'), Row(id='5', value='26', class1='B', class2='B'), Row(id='6', value='77', class1='C', class2='C'), Row(id='7', value='88', class1='A', class2='C'), Row(id='8', value='99', class1='B', class2='B')]


In [139]:
projection(projection_rdd, [2, 2])

rdd1 [(('A', 'A'), ('A', 'A')), (('W', 'W'), ('W', 'W')), (('Q', 'Q'), ('Q', 'Q')), (('E', 'E'), ('E', 'E')), (('R', 'R'), ('R', 'R')), (('B', 'B'), ('B', 'B')), (('C', 'C'), ('C', 'C')), (('A', 'A'), ('A', 'A')), (('B', 'B'), ('B', 'B'))]
result [(('A', 'A'), ('A', 'A')), (('W', 'W'), ('W', 'W')), (('Q', 'Q'), ('Q', 'Q')), (('E', 'E'), ('E', 'E')), (('R', 'R'), ('R', 'R')), (('B', 'B'), ('B', 'B')), (('C', 'C'), ('C', 'C'))]


PythonRDD[1107] at RDD at PythonRDD.scala:53

In [140]:
first_rdd = spark.read.csv("1st.csv", header=True).rdd
print("first_rdd", first_rdd.take(10))
second_rdd = spark.read.csv("2nd.csv", header=True).rdd
print("second_rdd", second_rdd.take(10))

first_rdd [Row(i='0', value='10'), Row(i='1', value='10'), Row(i='2', value='20'), Row(i='3', value='30'), Row(i='4', value='40')]
second_rdd [Row(i='0', value='30'), Row(i='1', value='40'), Row(i='2', value='50'), Row(i='3', value='60'), Row(i='4', value='90')]


In [141]:
# 2.3.6. Вычисление объединения с помощью MapReduce
def union(first_rdd, second_rdd):
    rdd1 = first_rdd + second_rdd
    print("rdd1", rdd1.take(10))
    rdd2 = rdd1.map(lambda x: (x[1], x[1]))
    print("rdd2", rdd2.take(10))
    result = rdd2.reduceByKey(lambda x, y: x)
    print("result", result.take(10))
    return result

In [142]:
union(first_rdd, second_rdd)

rdd1 [Row(i='0', value='10'), Row(i='1', value='10'), Row(i='2', value='20'), Row(i='3', value='30'), Row(i='4', value='40'), Row(i='0', value='30'), Row(i='1', value='40'), Row(i='2', value='50'), Row(i='3', value='60'), Row(i='4', value='90')]
rdd2 [('10', '10'), ('10', '10'), ('20', '20'), ('30', '30'), ('40', '40'), ('30', '30'), ('40', '40'), ('50', '50'), ('60', '60'), ('90', '90')]
result [('10', '10'), ('20', '20'), ('40', '40'), ('50', '50'), ('60', '60'), ('30', '30'), ('90', '90')]


PythonRDD[1151] at RDD at PythonRDD.scala:53

In [143]:
# 2.3.6. Вычисление пересечения с помощью MapReduce
def intersection(first_rdd, second_rdd):
    first_rdd = first_rdd.map(lambda x: (x[1], '1'))
    second_rdd = second_rdd.map(lambda x: (x[1], '2'))
    r = first_rdd + second_rdd
    result = r.groupByKey().flatMap(lambda x: [(x[0], set(tuple(x[1])))])
    result = result.flatMap(lambda x: [x[0]] if len(x[1]) > 1 else [])
    print("result", result.take(10))
    return result

In [144]:
intersection(first_rdd, second_rdd)

result ['40', '30']


PythonRDD[1161] at RDD at PythonRDD.scala:53

In [145]:
# 2.3.6. Вычисление разности с помощью MapReduce
def difference(first_rdd, second_rdd):
    a_rdd2 = first_rdd.map(lambda x: (x[1], 0))
    print("a_rdd2", a_rdd2.take(10))
    b_rdd2 = second_rdd.map(lambda x: (x[1], 1))
    print("b_rdd2", b_rdd2.take(10))
    rdd1 = a_rdd2 + b_rdd2
    print("rdd1", rdd1.take(10))
    group = rdd1.groupByKey()
    print("group", group.take(10))
    result = group.flatMap(lambda x: [(x[0], x[0])] if sum(x[1]) == 0 else [])
    #print("result", result.take(10))
    return result

In [146]:
difference(first_rdd, second_rdd)

a_rdd2 [('10', 0), ('10', 0), ('20', 0), ('30', 0), ('40', 0)]
b_rdd2 [('30', 1), ('40', 1), ('50', 1), ('60', 1), ('90', 1)]
rdd1 [('10', 0), ('10', 0), ('20', 0), ('30', 0), ('40', 0), ('30', 1), ('40', 1), ('50', 1), ('60', 1), ('90', 1)]
group [('10', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31a10>), ('20', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31610>), ('40', <pyspark.resultiterable.ResultIterable object at 0x7f4963f312d0>), ('50', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31710>), ('60', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31c50>), ('30', <pyspark.resultiterable.ResultIterable object at 0x7f4963f29b10>), ('90', <pyspark.resultiterable.ResultIterable object at 0x7f4963f29310>)]


PythonRDD[1175] at RDD at PythonRDD.scala:53

In [147]:
join1_rdd = spark.read.csv('join1.csv', header=True).rdd
join1_rdd = join1_rdd.map(lambda x: (int(x[0]), (x[1], x[2])))
print("join1_rdd", join1_rdd.take(10))
join2_rdd = spark.read.csv('join2.csv', header=True).rdd
join2_rdd = join2_rdd.map(lambda x: (int(x[0]), (x[1], x[2])))
print("join2_rdd", join2_rdd.take(10))

join1_rdd [(0, ('1', 'A')), (1, ('2', 'C')), (2, ('3', 'B')), (3, ('4', 'Q')), (4, ('5', 'W')), (5, ('6', 'S')), (6, ('7', 'A')), (7, ('8', 'B')), (8, ('9', 'D'))]
join2_rdd [(0, ('A', 'QW')), (1, ('B', 'WE')), (2, ('Q', 'ER')), (3, ('E', 'RT')), (4, ('R', 'TY')), (5, ('T', 'YU')), (6, ('N', 'UI')), (7, ('G', 'IO')), (8, ('L', 'OP'))]


In [89]:
# 2.3.7. Вычисление естественного соединения с помощью MapReduce
import itertools
def join(a_rdd, b_rdd):
    def combine(l):
        a = []
        b = []
        for item in l:
            if item[0] == 0:
                a += [item[1]]
            else:
                b += [item[1]]
        return list(itertools.product(a, b))
    a_rdd2 = a_rdd.map(lambda x: (x[1][1], (0, x[1][0])))
    print("a_rdd2", a_rdd2.take(10))
    b_rdd2 = b_rdd.map(lambda x: (x[1][0], (1, x[1][1])))
    print("b_rdd2", b_rdd2.take(10))
    rdd1 = a_rdd2 + b_rdd2
    print("rdd1", rdd1.take(10))
    group = rdd1.groupByKey()
    print("group", group.take(10))
    result = group.flatMap(lambda x: [(x[0], item) for item in combine(list(x[1]))])
    print("result", result.take(10))
    return result

In [148]:
join(join1_rdd, join2_rdd)

a_rdd2 [('A', (0, '1')), ('C', (0, '2')), ('B', (0, '3')), ('Q', (0, '4')), ('W', (0, '5')), ('S', (0, '6')), ('A', (0, '7')), ('B', (0, '8')), ('D', (0, '9'))]
b_rdd2 [('A', (1, 'QW')), ('B', (1, 'WE')), ('Q', (1, 'ER')), ('E', (1, 'RT')), ('R', (1, 'TY')), ('T', (1, 'YU')), ('N', (1, 'UI')), ('G', (1, 'IO')), ('L', (1, 'OP'))]
rdd1 [('A', (0, '1')), ('C', (0, '2')), ('B', (0, '3')), ('Q', (0, '4')), ('W', (0, '5')), ('S', (0, '6')), ('A', (0, '7')), ('B', (0, '8')), ('D', (0, '9')), ('A', (1, 'QW'))]
group [('C', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31e50>), ('W', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31210>), ('S', <pyspark.resultiterable.ResultIterable object at 0x7f4963f7a550>), ('R', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31ed0>), ('N', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31810>), ('L', <pyspark.resultiterable.ResultIterable object at 0x7f4963f31050>), ('A', <pyspark.resultiterable.ResultIterable ob

PythonRDD[1223] at RDD at PythonRDD.scala:53

In [149]:
agr_rdd = spark.read.csv("agr.csv", header=True).rdd
print("agr_rdd", agr_rdd.take(10))

agr_rdd [Row(id='0', class='A', value='1'), Row(id='1', class='B', value='2'), Row(id='2', class='C', value='3'), Row(id='3', class='A', value='1'), Row(id='4', class='B', value='2'), Row(id='5', class='C', value='3'), Row(id='6', class='A', value='1')]


In [150]:
# 2.3.8. Вычисление группировки и агрегирования с помощью MapReduce
def aggregate(agg_rdd, aggregator):
    rdd1 = agg_rdd.map(lambda x: (x[1], float(x[2])))
    print("rdd1", rdd1.take(10))
    result  = rdd1.reduceByKey(lambda x, y: aggregator(x, y))
    print("result", result .take(10))
    return result

In [151]:
aggregate(agr_rdd, lambda x, y: x + y)

rdd1 [('A', 1.0), ('B', 2.0), ('C', 3.0), ('A', 1.0), ('B', 2.0), ('C', 3.0), ('A', 1.0)]
result [('A', 3.0), ('B', 4.0), ('C', 6.0)]


PythonRDD[1246] at RDD at PythonRDD.scala:53