In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=56b8cef2c5a216b5365bc2c29974820afa1de291e0ff7da043d12c1ccb733c91
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
%cd drive/MyDrive/ASPD

/content/drive/MyDrive/ASPD


**Union**

In [5]:
def MapUnion(r):
  t = r.split()
  if len(t) != 2:
    return []
  return [(t[1], t[1])]

In [6]:
def ReducerUnion(r):
  keys, values = r[0], list(r[1])
  return values[0]

In [7]:
sc = SparkContext()

In [8]:
lines = sc.textFile('relations_small.txt')
lines.flatMap(MapUnion).groupByKey().map(ReducerUnion).collect()

['1', '4', '2', '3', '5', '6']

**INTERSECT**

In [9]:
def MapIntersection(r):
  t = r.split()
  if len(t) != 2:
    return []
  return [(t[1], t[1])]

In [10]:
def ReduceIntersection(r):
  key, values = r[0], list(r[1])
  if values == [key, key]:
    return [key]
  return []

In [11]:
lines = sc.textFile('relations_small.txt')
lines.flatMap(MapIntersection).groupByKey().flatMap(ReduceIntersection).collect()

['2', '3']

**EXCEPT**

In [12]:
def MapExcept(r):
  t = r.split()
  if len(t) != 2:
    return []
  return [(t[1], t[0])]

In [13]:
def ReducerExcept(r):
    key, values = r[0], list(r[1])
    if len(values) == 1:
        return (key)
    return []

In [14]:
lines = sc.textFile('relations_small.txt')
lines.flatMap(MapExcept).groupByKey().flatMap(ReducerExcept).collect()

['1', '4', '5', '6']

**INNER JOIN**

In [15]:
def MapInnerJoin(r):
  t = r.split('\t')
  if len(t) < 4:
    return []
  r = t[0]
  if r == 'Orders':
    a = [t[1], t[3]]
    b = t[2]
  if r == "Customers":
    b = t[1]
    a = t[2:]
  return [(b, (r, a))]

In [16]:
def ReduceInnerJoin(r):
  key, values = r[0], list(r[1])
  customers = []
  orders = []
  for value in values:
    if value[0] == 'Orders':
      orders.append(value[1])
    if value[0] == 'Customers':
      customers.append(value[1])
  return [o + [key] + c for o in orders for c in customers]

In [17]:
lines = sc.textFile('join_data.txt')
lines.flatMap(MapInnerJoin).groupByKey().flatMap(ReduceInnerJoin).collect()

[['10308',
  '2016-09-18',
  '1',
  'Alfreds Futterkiste ',
  'Maria Anders ',
  'Germany'],
 ['10309',
  '2016-09-19',
  '2',
  'Ana Trujillo Emparedados y helados ',
  'Ana Trujillo ',
  'Mexico'],
 ['10310',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10311',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10312',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico']]

**SELECT**

In [18]:
def MapSelect(r, condition):
  t = r.split(',')
  if (len(t) != 13) or (t[0] == 'id'):
      return []
  if condition(t[4]):
        return [t]
  return []

In [19]:
def ReduceSelect(r):
  return [r]

In [20]:
def condition(r):
  return r == "Female"

In [21]:
lines = sc.textFile('person.csv')
lines.flatMap(lambda r: MapSelect(r, condition)).flatMap(ReduceSelect).collect()

[['2',
  'Emlynne',
  'Gyurkovics',
  'egyurkovics1@answers.com',
  'Female',
  '18.159.250.171',
  'visa',
  'Franc',
  'Assistant Media Planner',
  'Yamia',
  'Guinea-Bissau',
  '2XL',
  '$54875.67'],
 ['3',
  'Giralda',
  'McKie',
  'gmckie2@aboutads.info',
  'Female',
  '203.158.164.220',
  'mastercard',
  'Peso',
  'Professor',
  'Npath',
  'Philippines',
  '3XL',
  '$215185.74'],
 ['4',
  'Jemima',
  'Bolsover',
  'jbolsover3@linkedin.com',
  'Female',
  '154.170.244.122',
  'visa-electron',
  'Sol',
  'Design Engineer',
  'Photojam',
  'Peru',
  '2XL',
  '$162409.22'],
 ['5',
  'Elayne',
  'Kose',
  'ekose4@webmd.com',
  'Female',
  '78.193.224.172',
  'jcb',
  'Rupiah',
  'Chemical Engineer',
  'Tanoodle',
  'Indonesia',
  'M',
  '$387727.59'],
 ['8',
  'Emmye',
  'Bushnell',
  'ebushnell7@imdb.com',
  'Female',
  '98.164.241.90',
  'visa-electron',
  'Rupiah',
  'Librarian',
  'Talane',
  'Indonesia',
  '2XL',
  '$314931.78'],
 ['9',
  'Cindelyn',
  'Wealleans',
  'cwealleans8

**PROJECTION**

In [22]:
def MapProjection(r, columns):
    t = r.split(',')
    if (len(t) != 13) or (t[0] == 'id'):
      return []
    return [r, [t[k] for k in columns]]

In [23]:
def ReduceProjection(r):
  return [r]

In [24]:
columns = [0, 1]

In [25]:
lines = sc.textFile('person.csv')
lines.flatMap(lambda r: MapProjection(r, columns)).flatMap(ReduceProjection).collect()

['1,Freddy,Jost,fjost0@marketwatch.com,Male,141.252.171.163,jcb,Rial,Design Engineer,Dynabox,Iran,L,$999471.82',
 ['1', 'Freddy'],
 '2,Emlynne,Gyurkovics,egyurkovics1@answers.com,Female,18.159.250.171,visa,Franc,Assistant Media Planner,Yamia,Guinea-Bissau,2XL,$54875.67',
 ['2', 'Emlynne'],
 '3,Giralda,McKie,gmckie2@aboutads.info,Female,203.158.164.220,mastercard,Peso,Professor,Npath,Philippines,3XL,$215185.74',
 ['3', 'Giralda'],
 '4,Jemima,Bolsover,jbolsover3@linkedin.com,Female,154.170.244.122,visa-electron,Sol,Design Engineer,Photojam,Peru,2XL,$162409.22',
 ['4', 'Jemima'],
 '5,Elayne,Kose,ekose4@webmd.com,Female,78.193.224.172,jcb,Rupiah,Chemical Engineer,Tanoodle,Indonesia,M,$387727.59',
 ['5', 'Elayne'],
 '6,Nev,Spritt,nspritt5@posterous.com,Male,226.158.41.171,jcb,Rupiah,Geologist IV,Chatterbridge,Indonesia,L,$96470.70',
 ['6', 'Nev'],
 '7,Emmerich,Story,estory6@1und1.de,Male,238.11.55.197,china-unionpay,Koruna,Media Manager II,Skibox,Czech Republic,L,$176811.22',
 ['7', 'Emmeri

**AGGREGATON**

In [26]:
def MapAggregation(r):
  t = r.split(',')
  if (len(t) != 13) or (t[0] == 'id'):
    return []
  a = t[8]
  b = t[-1]
  return [(a, float(b[1:]))]

In [27]:
def ReduceAggregation(r):
  key, values = r[0], list(r[1])
  return [(key, sum(values))]

In [28]:
lines = sc.textFile('person.csv')
lines.flatMap(MapAggregation).groupByKey().flatMap(ReduceAggregation).collect()

[('Professor', 1877206.2800000003),
 ('Chemical Engineer', 6089887.490000001),
 ('Geologist IV', 478996.45999999996),
 ('Librarian', 5563675.890000001),
 ('Speech Pathologist', 3923419.64),
 ('Senior Quality Engineer', 3466124.1699999995),
 ('Structural Engineer', 5555297.13),
 ('Staff Scientist', 3435566.750000001),
 ('Web Developer II', 2534010.0300000003),
 ('Physical Therapy Assistant', 4262946.720000001),
 ('Payment Adjustment Coordinator', 6317398.769999999),
 ('Assistant Manager', 8059390.07),
 ('Safety Technician IV', 486140.06999999995),
 ('Sales Associate', 2059877.3499999999),
 ('Programmer Analyst IV', 371068.24),
 ('Junior Executive', 5946395.1899999995),
 ('Cost Accountant', 2296997.76),
 ('Office Assistant III', 1846546.6800000002),
 ('Marketing Assistant', 6555210.6899999995),
 ('Computer Systems Analyst IV', 850368.63),
 ('Tax Accountant', 5504577.03),
 ('Marketing Manager', 6208292.450000001),
 ('Information Systems Manager', 3931341.5299999993),
 ('General Manager', 

**LEFT OUTER JOIN**

In [29]:
def MapLeftOuterJoin(r):
    t = r.split('\t')
    if len(t) < 4:
        return []
    r = t[0]
    if r == 'Orders':
        a = [t[1], t[3]]
        b = t[2]
    if r == "Customers":
        b = t[1]
        a = t[2:]
    return [(b, (r, a))]

In [30]:
def ReduceLeftOuterJoin(r):
    key, values = r[0], list(r[1])
    customers = []
    orders = []
    for value in values:
        if value[0] == 'Orders':
            orders.append(value[1])
        if value[0] == 'Customers':
            customers.append(value[1])
    if len(orders) == 0:
        orders.append(['', ''])
    return [o + [key] + c for o in orders for c in customers]

In [31]:
lines = sc.textFile('join_data.txt')
lines.flatMap(MapLeftOuterJoin).groupByKey().flatMap(ReduceLeftOuterJoin).collect()

[['10308',
  '2016-09-18',
  '1',
  'Alfreds Futterkiste ',
  'Maria Anders ',
  'Germany'],
 ['', '', '4', 'NO ORDERS', 'NO ORDERS', 'NO ORDERS'],
 ['10309',
  '2016-09-19',
  '2',
  'Ana Trujillo Emparedados y helados ',
  'Ana Trujillo ',
  'Mexico'],
 ['10310',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10311',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10312',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico']]

**RIGHT OUTER JOIN**

In [32]:
def MapRightOuterJoin(r):
    t = r.split('\t')
    if len(t) < 4:
        return []
    r = t[0]
    if r == 'Orders':
        a = [t[1], t[3]]
        b = t[2]
    if r == "Customers":
        b = t[1]
        a = t[2:]
    return [(b, (r, a))]

In [33]:
def ReduceRightOuterJoin(r):
    key, values = r[0], list(r[1])
    customers = []
    orders = []
    for value in values:
        if value[0] == 'Orders':
            orders.append(value[1])
        if value[0] == 'Customers':
            customers.append(value[1])
    if len(customers) == 0:
        customers.append(['', '', ''])
    return [o + [key] + c for o in orders for c in customers]

In [34]:
lines = sc.textFile('join_data.txt')
lines.flatMap(MapRightOuterJoin).groupByKey().flatMap(ReduceRightOuterJoin).collect()

[['10308',
  '2016-09-18',
  '1',
  'Alfreds Futterkiste ',
  'Maria Anders ',
  'Germany'],
 ['10309',
  '2016-09-19',
  '2',
  'Ana Trujillo Emparedados y helados ',
  'Ana Trujillo ',
  'Mexico'],
 ['10310',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10311',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10312',
  '2016-09-20',
  '3',
  'Antonio Moreno Taquería ',
  'Antonio Moreno ',
  'Mexico'],
 ['10313', '2016-09-20', '5', '', '']]