# Dados de Entrada
* Selecione "Adicionar ao Drive"

* Dados adicionados na aula anterior:
  * https://tinyurl.com/bigdata-gut-pt
  * https://tinyurl.com/bigdata-amz

* Dados novos:
  * https://tinyurl.com/bd-phash

  




## Acesso ao Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Setup

## Instalação de pacotes

In [2]:
!apt-get update  > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install findspark pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d9c240faac8f95445fbe3cdd71b29c5c22e29b1290eae52da44d4f4c5625a4eb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: findspark, pyspark
Successfully installed findspark-2.0.1 pyspark-3.5.1


## Preparação do ambiente

In [3]:
%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
%env SPARK_HOME=/content/spark-3.5.1-bin-hadoop3

env: PYTHONHASHSEED=1234
env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
env: SPARK_HOME=/content/spark-3.5.1-bin-hadoop3


In [4]:
import findspark
findspark.init("/content/spark-3.5.1-bin-hadoop3")

In [5]:
from pyspark.sql import SparkSession

appName = 'Big Data'
master = 'local[*]'

spark = SparkSession.builder     \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Revisão

In [6]:
import re

In [7]:
input_data = spark.sparkContext.textFile('/content/drive/My Drive/gut-pt/small/*')


In [8]:
input_data.take(10)

['The Project Gutenberg EBook of Noites de insomnia, offerecidas a quem não',
 'póde dormir. Nº6 (de 12), by Camilo Castelo Branco',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '',
 'Title: Noites de insomnia, offerecidas a quem não póde dormir. Nº6 (de 12)']

In [9]:
# Word count: lowercase each line, replace every character that is not a letter
# or a space with a space, split into words and sum one count per occurrence.
# The accented range is 'à-ö' plus 'ø-ÿ' so that 'ú', 'û' and 'ü' are kept as
# letters (a range ending at 'ù' would split words such as "último"); '÷' is
# deliberately left out because it is not a letter.
wc = input_data.map(lambda line: re.sub('[^a-zà-öø-ÿ ]', ' ', line.lower()))  \
    .flatMap(lambda line: line.split())  \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda acc, v: acc + v)

In [10]:
wc.take(10)

[('project', 437),
 ('ebook', 55),
 ('de', 7377),
 ('quem', 222),
 ('n', 278),
 ('camilo', 3),
 ('branco', 23),
 ('restrictions', 10),
 ('may', 75),
 ('org', 61)]

## Cálculo de Média

In [11]:
input_data = spark.sparkContext.textFile('/content/drive/My Drive/amz/small.csv')


In [12]:
input_data.take(10)

['0020232233,A1IDMI31WEANAF,2.0,1474502400',
 '0020232233,A4BCEVVZ4Y3V3,1.0,1474156800',
 '0020232233,A2EZ9PY1IHHBX0,3.0,1473638400',
 '0020232233,A139PXTTC2LGHZ,5.0,1488412800',
 '0020232233,A3IB33V29XIL8O,1.0,1486512000',
 '0020232233,A1J86V48S4KRJE,5.0,1485475200',
 '0020232233,A14J12PRBLGHF4,5.0,1483315200',
 '0020232233,A2UKOWP9ICU416,5.0,1481932800',
 '0020232233,A2ONKKDETRWT79,4.0,1481760000',
 '0020232233,AK9GN9KZZNTEP,3.0,1481241600']

In [13]:
def process_line(line) :
  """Parse one CSV review line into a ``(product, (rating, 1))`` pair.

  The line must contain exactly four comma-separated fields; the first is
  used as the key and the third is converted to ``float``. The constant ``1``
  is a counter so that sums and counts can be reduced together.
  """
  cod, _user, rating, _timestamp = line.split(',')
  return (cod, (float(rating), 1))

In [14]:
evals = input_data.map(process_line)

In [15]:
evals.take(10)

[('0020232233', (2.0, 1)),
 ('0020232233', (1.0, 1)),
 ('0020232233', (3.0, 1)),
 ('0020232233', (5.0, 1)),
 ('0020232233', (1.0, 1)),
 ('0020232233', (5.0, 1)),
 ('0020232233', (5.0, 1)),
 ('0020232233', (5.0, 1)),
 ('0020232233', (4.0, 1)),
 ('0020232233', (3.0, 1))]

In [16]:
def acc_evals(acc, v) :
  """Add two ``(sum, count)`` pairs position by position."""
  return (acc[0] + v[0], acc[1] + v[1])

In [17]:
totals = evals.reduceByKey(acc_evals)

In [18]:
totals.take(10)

[('0020232233', (49.0, 13)),
 ('038536539X', (9.0, 3)),
 ('0486402029', (18.0, 6)),
 ('0486427706', (144.0, 30)),
 ('0486448789', (346.0, 87)),
 ('0545561647', (767.0, 193)),
 ('0641869665', (136.0, 32)),
 ('0735331146', (83.0, 17)),
 ('0769658237', (19.0, 4)),
 ('0769663192', (25.0, 6))]

In [19]:
def calc_avg(v) :
  """Return ``v[0] / v[1]`` (sum divided by count) rounded to one decimal place."""
  total = v[0]
  count = v[1]
  return round(total / count, 1)

In [20]:
totals.mapValues(calc_avg).take(10)

[('0020232233', 3.8),
 ('038536539X', 3.0),
 ('0486402029', 3.0),
 ('0486427706', 4.8),
 ('0486448789', 4.0),
 ('0545561647', 4.0),
 ('0641869665', 4.2),
 ('0735331146', 4.9),
 ('0769658237', 4.8),
 ('0769663192', 4.2)]

In [21]:
avg = input_data.map(lambda line: line.split(',')) \
        .map(lambda line: (line[0], (float(line[2]), 1))) \
        .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
        .mapValues(lambda r: round(r[0]/r[1],1))

In [22]:
avg.take(10)

[('0020232233', 3.8),
 ('038536539X', 3.0),
 ('0486402029', 3.0),
 ('0486427706', 4.8),
 ('0486448789', 4.0),
 ('0545561647', 4.0),
 ('0641869665', 4.2),
 ('0735331146', 4.9),
 ('0769658237', 4.8),
 ('0769663192', 4.2)]

## Ordenação por chave e valor

In [23]:
# Ordenar RDD por chave (primeiro elemento de cada linha)
# Ordenar antes do MAP é um algoritmo problemático, porque exige ver todos os dados
#     Equivale a usar collect
sorted_prod = avg.sortBy(lambda line: line[0])


In [24]:
sorted_prod.take(5)

[('0020232233', 3.8),
 ('0152014764', 5.0),
 ('038536539X', 3.0),
 ('0486277577', 4.8),
 ('0486402029', 3.0)]

In [25]:
# Ordenar os itens do RDD pelo segundo elemento de cada item (line[1])

sorted_avg = avg.sortBy(lambda line: line[1])


In [26]:
sorted_avg.take(10)

[('B00004R8U6', 1.0),
 ('B00004SCMY', 1.0),
 ('B00005BZ8M', 1.0),
 ('B000096QQX', 1.0),
 ('B0000A92NS', 1.0),
 ('B0001K2K66', 1.0),
 ('B0001OM16Q', 1.0),
 ('B000212VGS', 1.0),
 ('B00026RFJ6', 1.0),
 ('B0002DF64A', 1.0)]

In [27]:
sorted_rev = avg.sortBy(lambda line: line[1], ascending=False)

In [28]:
sorted_rev.take(10)

[('0880924691', 5.0),
 ('0980209269', 5.0),
 ('0996063234', 5.0),
 ('1574893920', 5.0),
 ('1589945123', 5.0),
 ('1592920527', 5.0),
 ('1616617403', 5.0),
 ('1616617837', 5.0),
 ('1616619848', 5.0),
 ('1616619147', 5.0)]

In [29]:
avg.takeOrdered(10, key=lambda line: -line[1])

[('0880924691', 5.0),
 ('0980209269', 5.0),
 ('0996063234', 5.0),
 ('1574893920', 5.0),
 ('1589945123', 5.0),
 ('1592920527', 5.0),
 ('1616617403', 5.0),
 ('1616617837', 5.0),
 ('1616619848', 5.0),
 ('1616619147', 5.0)]

In [30]:
avg.takeOrdered(10, key=lambda line: line[1])

[('B00004R8U6', 1.0),
 ('B00004SCMY', 1.0),
 ('B00005BZ8M', 1.0),
 ('B000096QQX', 1.0),
 ('B0000A92NS', 1.0),
 ('B0001K2K66', 1.0),
 ('B0001OM16Q', 1.0),
 ('B000212VGS', 1.0),
 ('B00026RFJ6', 1.0),
 ('B0002DF64A', 1.0)]

# Separação por arquivos

In [None]:
input_dir = 'file:/content/drive/My Drive/gut-pt/small/'

In [None]:
input_files = spark.sparkContext.wholeTextFiles(input_dir+"*")


In [None]:
input_files.take(2)

In [None]:
# Cada item do RDD é uma tupla

import re
def process_file(f) :
  """Yield ``((filename, word), 1)`` for every word of one ``(path, content)`` record.

  The ``input_dir`` prefix is stripped from the path, everything from the
  "*End of ... Project Gutenberg" marker to the end of the text is discarded,
  and the remaining text is lowercased and split into words made only of
  letters.
  """
  # input_dir is a literal prefix, so remove it with str.replace rather than
  # interpreting it as a regular expression.
  filename = f[0].replace(input_dir, '')
  # Raw string: '\*' must reach the regex engine as an escaped asterisk.
  # With DOTALL the trailing '.*' consumes the rest of the file.
  text = re.sub(r'\*End of .*Project Gutenberg.*', '', f[1], flags=re.IGNORECASE|re.DOTALL)
  # 'à-ö' plus 'ø-ÿ' keeps every accented Latin-1 letter, including 'ú', 'û'
  # and 'ü'; anything else becomes a word separator.
  text = re.sub('[^a-zà-öø-ÿ ]', ' ', text.lower())
  for w in text.split() :
    yield ((filename, w), 1)

In [None]:
# flatmap pq um arquivo texto tem muitas palavras
word_pairs = input_files.flatMap(process_file)

In [None]:
word_pairs.take(10)

In [None]:
# Como na célula anterior as palavras estão organizadas por arquivo, o reduce resulta na quantidade
# de determinada palavra dentro de determinado arquivo
wc = word_pairs.reduceByKey(lambda acc, v: acc + v)

In [None]:
wc.take(10)

In [None]:
sorted_wc = wc.sortBy(lambda item: item[1],ascending=False)

In [None]:
sorted_wc.take(10)

In [None]:
def contagem_total(item) :
  """Turn a ``((filename, word), count)`` record into ``(word, count)``."""
  chave, contagem = item[0], item[1]
  # chave is (filename, word); only the word is kept.
  return (chave[1], contagem)

In [None]:
contagem_sem_arquivos = sorted_wc.map(contagem_total)

In [None]:
contagem_sem_arquivos.take(10)

In [None]:
total_geral = contagem_sem_arquivos.reduceByKey(lambda acc, v: acc+v)

In [None]:
total_geral.take(10)

In [None]:
total_wc = wc.map(lambda item: (item[0][1], item[1])) \
            .reduceByKey(lambda acc, v: acc + v) \
            .sortBy(lambda item: item[1],ascending=False)

In [None]:
total_wc.take(10)

In [None]:
total_wc.count()

In [None]:
!rm -rf total_wc

In [None]:
total_wc.saveAsTextFile("total_wc")

In [None]:
# O resultado terá múltiplas partições, cada arquivo é uma partição do RDD
!ls total_wc

In [None]:
!head -n 100 total_wc/part-00000

In [None]:
!head -n 5 total_wc/*

# Cálculo de Média por agrupamento (muito ineficiente)

In [None]:
def process_line(line) :
  """Parse one CSV review line into a ``(product, rating)`` pair.

  The line must contain exactly four comma-separated fields; the first is
  the key and the third is converted to ``float``.
  """
  cod, _user, rating, _timestamp = line.split(',')
  return (cod, float(rating))



In [None]:
input_data = spark.sparkContext.textFile('/content/drive/My Drive/amz/small.csv')


In [None]:
reviews = input_data.map(process_line)

In [None]:
reviews.take(20)

In [None]:
# Esta operação exige muita troca de dados (comunicação) entre as partições, resultando em desempenho ruim

grouped = reviews.groupByKey()

In [None]:
grouped.take(5)

In [None]:
grouped.mapValues(list).take(1)

In [None]:
def calc_avg(values) :
  """Return the arithmetic mean of ``values`` rounded to one decimal place.

  ``values`` must support both ``sum()`` and ``len()``.
  """
  total = sum(values)
  count = len(values)
  return round(total / count, 1)

In [None]:
avg = grouped.mapValues(calc_avg)

In [None]:
avg.take(5)

# Cálculo de Média por agregação (mais eficiente)

In [None]:
def process_line(line) :
  """Split a four-field CSV review line and return ``(product, rating)``.

  The rating (third field) is converted to ``float``; the user and timestamp
  fields are ignored.
  """
  fields = line.split(',')
  cod, _, rating, _ = fields
  return (cod, float(rating))


In [None]:
reviews = input_data.map(process_line)

In [None]:
reviews.take(10)

In [None]:
# acc tem formato (soma, contagem)

def aggElement(acc, review) :
  """Fold one rating into a ``(sum, count)`` accumulator."""
  running_sum, running_count = acc[0], acc[1]
  return (running_sum + review, running_count + 1)

def aggPartials(acc1, acc2) :
  """Combine two ``(sum, count)`` accumulators into one."""
  merged_sum = acc1[0] + acc2[0]
  merged_count = acc1[1] + acc2[1]
  return (merged_sum, merged_count)


In [None]:
sums = reviews.aggregateByKey((0,0), aggElement, aggPartials)

In [None]:
sums.take(5)

In [None]:
avgs = sums.mapValues(lambda v: round(v[0]/v[1],2))

In [None]:
avgs.take(5)

# Join

In [None]:
# Primeira fonte de dados: totalização das avaliações de produtos

input_data_reviews = spark.sparkContext.textFile('/content/drive/My Drive/amz/small.csv')

total_evals = input_data_reviews.map(lambda line: line.split(',')) \
        .map(lambda line: (line[0], (float(line[2]), 1))) \
        .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))



In [None]:
input_data_reviews.count()

In [None]:
total_evals.take(10)

In [None]:
input_data_reviews.count()/total_evals.count()

In [None]:
# Segunda fonte de dados: metadados de produtos

input_data_meta = spark.sparkContext.textFile('/content/drive/My Drive/amz/meta_small.json')


In [None]:
input_data_meta.take(1)

In [None]:
import json

def get_asin_and_brand(line) :
  """Yield ``(asin, brand)`` for one JSON metadata line.

  Written as a generator so it can be passed to ``flatMap``: a line whose
  JSON value is not an object, or that lacks the ``asin`` or ``brand`` key,
  yields nothing. Malformed JSON still raises the error from ``json.loads``.
  """
  record = json.loads(line)  # parse the line into a Python object
  # Explicit key checks: only the "field is missing" case is skipped, other
  # errors are not silenced.
  if isinstance(record, dict) and 'asin' in record and 'brand' in record :
    yield (record['asin'], record['brand'])


In [None]:
metadata = input_data_meta.flatMap(get_asin_and_brand)

In [None]:
metadata.take(10)

In [None]:
metadata.count()

In [None]:
metadata.keys().distinct().count()

In [None]:
def removeDuplicates(acc, v) :
  """Two-argument reducer that always returns its first argument and ignores the second."""
  return acc

In [None]:
metadata_unique = metadata.reduceByKey(lambda acc, v: v)

In [None]:
metadata_unique.take(10)

In [None]:
metadata_unique.count()

In [None]:
metadata.keys().distinct().count()

In [None]:
total_evals.take(10)

In [None]:
total_evals.count()

In [None]:
metadata_unique.take(10)

In [None]:
joined = total_evals.join(metadata_unique)

In [None]:
joined.take(10)

In [None]:
total_evals.count()

In [None]:
joined.count()

In [None]:
def brand_totals(item) :
  """Re-key a joined ``(asin, ((sum, count), brand))`` record as ``(brand, (sum, count))``."""
  totals, brand = item[1][0], item[1][1]
  return (brand, (totals[0], totals[1]))

In [None]:
reviews_per_brand = joined.map(brand_totals)

In [None]:
reviews_per_brand.take(20)

In [None]:
total_reviews_per_brand = reviews_per_brand.reduceByKey(acc_evals)

In [None]:
total_reviews_per_brand.take(10)

In [None]:
total_filtered = total_reviews_per_brand.filter(lambda item: item[1][1] > 100)

In [None]:
total_filtered.take(10)

In [None]:
total_filtered.count()

In [None]:
avg_review_per_brand = total_filtered.mapValues(lambda v: round(v[0]/v[1],2))

In [None]:
avg_review_per_brand.take(10)

In [None]:
sorted_avg_review_per_brand = avg_review_per_brand.sortBy(lambda line: -line[1])

In [None]:
sorted_avg_review_per_brand.take(50)

# Criação de índices

In [None]:
input_data = spark.sparkContext.textFile('/content/drive/My Drive/amz/small.json')


In [None]:
!head '/content/drive/My Drive/amz/small.json'

In [None]:
import json

# Common English words that are excluded from the index.
stopwords = {"i", "me", "my", "myself", "we", "our", "ours", "ourselves",
    "you", "your", "yours", "yourself", "yourselves", "he", "him", "his",
    "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they",
    "them", "their", "theirs", "themselves", "what", "which", "who", "whom",
    "this", "that", "these", "those", "am", "is", "are", "was", "were", "be",
    "been", "being", "have", "has", "had", "having", "do", "does", "did",
    "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as",
    "until", "while", "of", "at", "by", "for", "with", "about", "against",
    "between", "into", "through", "during", "before", "after", "above",
    "below", "to", "from", "up", "down", "in", "out", "on", "off", "over",
    "under", "again", "further", "then", "once", "here", "there", "when",
    "where", "why", "how", "all", "any", "both", "each", "few", "more", "most",
    "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so",
    "than", "too", "very", "s", "t", "can", "will", "just", "don", "should", "now"}

def word_to_product(line):
  """Yield ``(word, asin)`` pairs for the indexable words of one JSON review line.

  The review text is lowercased and every character outside ``a-z`` becomes a
  separator. Words of three characters or fewer and words in ``stopwords``
  are dropped. A line whose JSON value is not an object, has no string
  ``reviewText`` or has no ``asin`` yields nothing. Malformed JSON still
  raises the error from ``json.loads``.
  """
  record = json.loads(line)
  if not isinstance(record, dict) :
    return
  text = record.get('reviewText')
  # Explicit checks: only reviews without usable text or product id are
  # skipped, other errors are not silenced.
  if not isinstance(text, str) or 'asin' not in record :
    return
  product = record['asin']
  text = re.sub('[^a-z ]', ' ', text.lower())
  for word in text.split() :
    if word not in stopwords and len(word) > 3:
      yield word, product

In [None]:
associations = input_data.flatMap(word_to_product)

In [None]:
associations.take(100)

In [None]:
# Muito ineficiente
idx = associations.groupByKey()

In [None]:
# Aqui ainda não é interpretável
idx.take(10)

In [None]:
# Aqui, para interpretar, cria-se um conjunto
idx_set = idx.mapValues(set)

In [None]:
idx_set.take(10)

In [None]:
idx_set = idx_set.sortByKey()

In [None]:
# Encontra os valores associados à chave
p = idx_set.lookup('television')

In [None]:
p

## Índice mais eficiente com combinação

In [None]:
associations.take(10)

In [None]:
def to_set(item) :
  """Create a new set whose only element is ``item``."""
  combiner = set()
  combiner.add(item)
  return combiner

def append(s, item) :
  """Insert ``item`` into the set ``s`` in place and return that same set."""
  s.update((item,))
  return s

def extend(s1, s2) :
  """Merge the set ``s2`` into ``s1`` in place and return ``s1``.

  ``set.union`` returns a new set and leaves both operands untouched, so the
  merge uses ``update``; otherwise the elements of ``s2`` would be lost.
  """
  s1.update(s2)
  return s1


In [None]:
idx_efficient = associations.combineByKey(to_set, append, extend)

In [None]:
idx_efficient.take(1)

# Arquivos Binários

## Exemplos com PHash

In [None]:
!ln -s /content/drive/My\ Drive/phash-input/ .

In [None]:
!pip install imagehash

In [None]:
from PIL import Image

file_prefix = '/content/phash-input/'
first_file = file_prefix+'blur/england.bmp'
second_file = file_prefix+'misc/england.bmp'
third_file = file_prefix+'compr/england.jpg'

first = Image.open(first_file)
second =  Image.open(second_file)
third =  Image.open(third_file)

In [None]:
from IPython.display import display
display(first)

In [None]:
display(second)

In [None]:
display(third)

In [None]:
import imagehash
from numpy import array

def hash_and_display(file) :
  """Compute the perceptual hash of an image, display it enlarged, return it as a string.

  ``file`` is passed straight to ``imagehash.phash``; the calls in this
  notebook pass images already opened with ``Image.open``.
  """
  ih = imagehash.phash(file) # "phash" is pronounced "pee-hash"
  # Render the hash array as an image, enlarged to 300x300 so it is visible.
  display(Image.fromarray(ih.hash).resize((300,300)))
  return str(ih)

# Display the hash image and print the hash string for each of the three images.
for variant in (first, second, third) :
  h = hash_and_display(variant)
  print("\n"+h+"\n\n\n")




## Leitura das imagens no Spark

In [None]:
images = spark.sparkContext.binaryFiles("./phash-input/*/*bmp")

In [None]:
# a resposta é um byte binário não interpretável pelo ser humano
images.take(2)

In [None]:
some_input_images = images.take(5)

In [None]:
image_example_data = some_input_images[2][1]
image_example_filename = some_input_images[2][0]

In [None]:
print(image_example_filename)

In [None]:
from PIL import Image
from io import BytesIO # essa lib dá a possibilidade de carregar como uma área de memória como um arquivo em disco

file_content = BytesIO(image_example_data)

# Mesmo que tenha open, não tem releitura, não está carregando do disco, não tem muito overhead também 
img = Image.open(file_content)

In [None]:
# Aqui já printa a imagem, não os bytes
from IPython.display import display
display(img)

## Cálculo do PHash

In [None]:
from PIL import Image
import imagehash

def hash_file(file_data):
  """Return the perceptual hash, as a string, of an image given as raw bytes."""
  # Wrap the bytes in an in-memory buffer so PIL can read them like a file.
  img = Image.open(BytesIO(file_data))
  return str(imagehash.phash(img))

## Função Map

In [None]:
# Observação: hash é usado como chave para permitir agrupamento

hashes_with_filenames = images.map(lambda line: (hash_file(line[1]), line[0]))

In [None]:
# Cada hash aponta para um arquivo diferente; o hash, nesse caso, é a chave, e o valor (da tupla) é o nome do arquivo
# Depois pode-se agrupar arquivos por hash com groupByKey
hashes_with_filenames.take(10)

## Agrupamento

In [None]:
similar_images_iter = hashes_with_filenames.groupByKey()

In [None]:
similar_images = similar_images_iter.mapValues(set)

In [None]:
similar_images.collect()

## Distância / Similaridade

In [None]:
# Produto cartesiano do RDD consigo mesmo

combinations = hashes_with_filenames.cartesian(hashes_with_filenames)


In [None]:
combinations.take(2)[1]

In [None]:
def distance(pair) :
  """Yield ``(file1, file2, d)`` for a pair of ``(hash, filename)`` records.

  Nothing is yielded when both records carry the same file name. ``d`` is the
  result of subtracting the two hashes rebuilt with ``imagehash.hex_to_hash``.
  """
  f1, f2 = pair[0][1], pair[1][1]
  if f1 == f2 :
    # Same file on both sides of the pair: nothing to compare.
    return
  d = imagehash.hex_to_hash(pair[0][0]) - imagehash.hex_to_hash(pair[1][0])
  yield (f1, f2, d)

In [None]:
distances = combinations.flatMap(distance)

In [None]:
distances.take(3)

In [None]:
sorted_dist = distances.sortBy(lambda item: (item[0], item[2]))




In [None]:
sorted_dist.take(3)