In [None]:
# Let Jupyter locate and load the Spark installation inside the notebook.
# findspark.init() has to run before the pyspark import further down.
import findspark
findspark.init('/usr/local/spark')

# Give the executors more memory so they do not fail for lack of resources.
# The variable is set before the SparkContext is created below.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 1G pyspark-shell'

# From here on this is Spark code that would normally be run with a command similar to:
# spark-submit --executor-memory 1G nome_do_script.py
from pyspark import SparkConf, SparkContext

# The line below is commented out because it is how Spark is run on a local installation using all cores
#conf = SparkConf().setMaster("local[*]").setAppName("NomeDoApp")

# Default configuration: no master or app name is set explicitly here.
conf = SparkConf()
sc = SparkContext(conf=conf)

# countByValue

In [None]:
import collections

# Count how many times each rating value appears in the u.data ratings file.
ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")

# The rating is the third whitespace-separated field; it stays a string here.
rating_values = ratings.map(lambda line: line.split()[2])
result = rating_values.countByValue()

# Sort by rating key so the report comes out in a stable order.
sortedResults = collections.OrderedDict(sorted(result.items()))
for rating, number_of_ratings in sortedResults.items():
    message = "{} usuários colocaram a nota {}"
    print(message.format(number_of_ratings, rating))

# Key Value Pair

In [None]:
def parse_ratings_as_key(line):
    """Turn one raw ratings line into a ``(rating, 1)`` pair.

    The rating is read from the third whitespace-separated field and
    converted to ``int``. The constant ``1`` lets the pairs be summed per
    rating with ``reduceByKey``.
    """
    return (int(line.split()[2]), 1)

# Load the raw ratings and turn every line into a (rating, 1) pair.
ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings_count = ratings.map(parse_ratings_as_key)

# Adding up the 1s per key yields how many times each rating was given.
ratings_sum = ratings_count.reduceByKey(lambda left, right: left + right)

results = ratings_sum.collect()
for rating, number_of_ratings in results:
    message = "{} usuários colocaram a nota {}"
    print(message.format(number_of_ratings, rating))

# mapValues

In [None]:
def parse_user_and_rating(line):
    """Turn one raw ratings line into a ``(user_id, rating)`` pair of ints.

    The user id is the first whitespace-separated field and the rating is
    the third one.
    """
    columns = line.split()
    return (int(columns[0]), int(columns[2]))

ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings_by_user = ratings.map(parse_user_and_rating)

# Carry a (sum, count) pair per user so the mean can be derived after one reduce.
totals_by_user = (
    ratings_by_user
    .mapValues(lambda rating: (rating, 1))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)

# float() forces floating-point division even where / on two ints would truncate.
ratings_avg = totals_by_user.mapValues(lambda total: total[0] / float(total[1]))

results = ratings_avg.collect()
for user, avg in results:
    message = "A média de notas do usuário {} é {}"
    print(message.format(user, avg))

# filter

In [None]:
def parse_movie_and_rating(line):
    """Turn one raw ratings line into a ``(movie_id, rating)`` pair.

    Both values are returned as the raw strings found in the second and
    third whitespace-separated fields; no numeric conversion happens here.
    """
    columns = line.split()
    return (columns[1], columns[2])

ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings_by_movie = ratings.map(parse_movie_and_rating)

# Movie ids are still strings at this point, so the filter compares against "50".
star_wars_ratings = ratings_by_movie.filter(lambda pair: pair[0] == "50")

# The ratings are strings as well, so min() compares them as text.
min_start_wars_rating = star_wars_ratings.reduceByKey(lambda a, b: min(a, b))
results = min_start_wars_rating.collect()

for movie, rating in results:
    message = "A pior nota do filme {} foi {}"
    print(message.format(movie, rating))

# field type

In [None]:
def parse_movie_and_rating(line):
    """Turn one raw ratings line into a ``(movie_id, rating)`` pair of ints.

    The movie id is the second whitespace-separated field and the rating is
    the third one; both are converted with ``int``.
    """
    columns = line.split()
    return (int(columns[1]), int(columns[2]))

ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
# Peek at the raw lines before parsing...
print(ratings.take(5))

ratings_by_movie = ratings.map(parse_movie_and_rating)
# ...and at the parsed (movie_id, rating) integer pairs afterwards.
print(ratings_by_movie.take(5))

# With integer ids the filter compares against the number 50.
star_wars_ratings = ratings_by_movie.filter(lambda pair: pair[0] == 50)
min_start_wars_rating = star_wars_ratings.reduceByKey(lambda a, b: min(a, b))
results = min_start_wars_rating.collect()

for movie, rating in results:
    message = "A pior nota do filme {} foi {}"
    print(message.format(movie, rating))

# Melhorando formatação do resultado

In [None]:
def parse_movie_and_rating(line):
    """Parse a ratings line and return ``(movie_id, rating)`` as integers.

    Fields are split on whitespace; index 1 holds the movie id and index 2
    holds the rating.
    """
    parts = line.split()
    movie_id, rating = int(parts[1]), int(parts[2])
    return (movie_id, rating)

# u.item is pipe-separated: the first column is the movie id, the second its title.
movies = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.item").map(lambda line: line.split("|"))
movies_dict = movies.map(lambda cols: (int(cols[0]), cols[1])).collectAsMap()

ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings_by_movie = ratings.map(parse_movie_and_rating)
star_wars_ratings = ratings_by_movie.filter(lambda pair: pair[0] == 50)
min_start_wars_rating = star_wars_ratings.reduceByKey(lambda a, b: min(a, b))
results = min_start_wars_rating.collect()
print(results)

# Replace the numeric id with the title collected on the driver above.
for movie, rating in results:
    message = "A pior nota do filme {} foi {}"
    print(message.format(movies_dict[movie], rating))

# Entendendo melhor o resultado

In [None]:
def parse_movie_and_rating(line):
    """Return the ``(movie_id, rating)`` integer pair encoded in a ratings line.

    The line is split on whitespace; the second field is the movie id and
    the third field is the rating.
    """
    tokens = line.split()
    return (int(tokens[1]), int(tokens[2]))

# u.item is pipe-separated: the first column is the movie id, the second its title.
movies = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.item").map(lambda line: line.split("|"))
movies_dict = movies.map(lambda cols: (int(cols[0]), cols[1])).collectAsMap()

ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings_by_movie = ratings.map(parse_movie_and_rating)

# The id filtered here is 1536; try other movie ids to compare the outcome.
star_wars_ratings = ratings_by_movie.filter(lambda pair: pair[0] == 1536)
min_start_wars_rating = star_wars_ratings.reduceByKey(lambda a, b: min(a, b))
results = min_start_wars_rating.collect()

# Number of ratings per movie id, to put the minimum rating in context.
ratings_count = star_wars_ratings.countByKey()

for movie, rating in results:
    message = "A pior nota do filme {} foi {} com {} notas"
    print(message.format(movies_dict[movie], rating, ratings_count[movie]))

# flatMap

In [None]:
# with open("text.txt", "w") as text_file:
#     text_file.write("Um texto simples\n")
#     text_file.write("Um texto de exemplo\n")
#     text_file.write("Exemplo de texto")

# text = sc.textFile("file:///home/dataeng6/text.txt") # Troque hadoop pelo seu login do JupyterHub
# rdd = text.map(lambda x: x)
# print(rdd.take(10))
# rdd = text.map(lambda x: x.split())
# print(rdd.take(10))
# rdd = text.flatMap(lambda x: x.split()) # Experimente usar x.lower().split() para ver o que muda no resultado
# print(rdd.take(10))

# word_count = rdd.countByValue()
# for word, count in word_count.items():
#     print("{}: {}".format(word, count))

# with open("text.txt", "w") as text_file:
#     text_file.write("Um texto simples\n")
#     text_file.write("Um texto de exemplo\n")
#     text_file.write("Exemplo de texto")

# Build the sample text in memory, one sentence per element.
text = sc.parallelize([
    "Um texto simples",
    "Um texto de exemplo",
    "Exemplo de texto",
])

# Alternative: read the sentences from a local file instead
# (replace dataeng6 with your JupyterHub login):
# text = sc.textFile("file:///home/dataeng6/text.txt")
# rdd = text.map(lambda x: x)
# print(rdd.take(10))

# map keeps one output element per input line, so each line becomes a list of words.
rdd = text.map(lambda sentence: sentence.split())
print(rdd.take(10))

# flatMap flattens those lists into a single RDD of words.
# Try sentence.lower().split() to see how the result changes.
rdd = text.flatMap(lambda sentence: sentence.split())
print(rdd.take(10))

# word_count = rdd.countByValue()
# print('word_count: ', word_count)
# for word, count in word_count.items():
#     print("{}: {}".format(word, count))

# sortByKey

In [None]:
# with open("text.txt", "w") as text_file:
#     text_file.write("Um texto simples\n")
#     text_file.write("Um texto de exemplo\n")
#     text_file.write("Exemplo de texto")

# text = sc.textFile("file:///home/hadoop/text.txt") # Troque hadoop pelo seu login do JupyterHub
# rdd = text.flatMap(lambda x: x.lower().split())
# rdd_counted = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# rdd_sorted = rdd_counted.map(lambda x: (x[1], x[0])).sortByKey() # Experimente adicionar o argumento ascending=False

# word_count = rdd_sorted.collect()
# for count, word in word_count:
#     print("{}: {}".format(count, word))

# with open("text.txt", "w") as text_file:
#     text_file.write("Um texto simples\n")
#     text_file.write("Um texto de exemplo\n")
#     text_file.write("Exemplo de texto")

# Build the sample text in memory, one sentence per element.
text = sc.parallelize([
    "Um texto simples",
    "Um texto de exemplo",
    "Exemplo de texto",
])

# Alternative: read the sentences from a local file instead
# (replace hadoop with your JupyterHub login):
# text = sc.textFile("file:///home/hadoop/text.txt")

# Lower-case before splitting so "Exemplo" and "exemplo" count as the same word.
rdd = text.flatMap(lambda sentence: sentence.lower().split())
rdd_counted = rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Swap to (count, word) so sortByKey orders by count.
# Try removing ascending=False to get ascending order instead.
rdd_sorted = (
    rdd_counted
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)

word_count = rdd_sorted.collect()
for count, word in word_count:
    line_template = "{}: {}"
    print(line_template.format(count, word))



# broadcast

In [None]:
def parse_movie_count(line):
    """Turn one raw ratings line into a ``(movie_id, 1)`` pair.

    The movie id is the second whitespace-separated field, converted to
    ``int``. The constant ``1`` lets the pairs be summed per movie.
    """
    return (int(line.split()[1]), 1)

# u.item is pipe-separated: the first column is the movie id, the second its title.
movies = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.item").map(lambda line: line.split("|"))
dict_movies_names = movies.map(lambda cols: (int(cols[0]), cols[1])).collectAsMap()

# Broadcast the dict so it is sent to the executors only once.
movies_dict = sc.broadcast(dict_movies_names)

ratings = sc.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings_count_by_movie = ratings.map(parse_movie_count).reduceByKey(lambda a, b: a + b)

# Swap to (count, movie_id) so sortByKey orders by the number of ratings, largest first.
ratings_count_by_movie = (
    ratings_count_by_movie
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)

# Replace each movie id with its title, read from the broadcast value.
ratings_count_by_movie = ratings_count_by_movie.map(
    lambda pair: (pair[0], movies_dict.value[pair[1]])
)

result = ratings_count_by_movie.collect()
for count, movie in result:
    line_template = "{}: {}"
    print(line_template.format(count, movie))

# Spark SQL

In [None]:
from pyspark.sql import SparkSession, Row

def parse_ratings(line):
    """Build a ``Row`` with user_id, movie_id, rating and timestamp from one line.

    The first four whitespace-separated fields are each converted to ``int``.
    """
    columns = line.split()
    return Row(
        user_id=int(columns[0]),
        movie_id=int(columns[1]),
        rating=int(columns[2]),
        timestamp=int(columns[3]),
    )

def parse_movies(line):
    """Build a ``Row`` with movie_id and name from one pipe-separated line.

    The first field is converted to ``int``; the second is kept as text.
    """
    columns = line.split("|")
    return Row(
        movie_id=int(columns[0]),
        name=columns[1],
    )

spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

# Ratings: parse the text file into Rows, cache the DataFrame and expose it to SQL.
data = spark.sparkContext.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.data")
ratings = data.map(parse_ratings)
ratings_df = spark.createDataFrame(ratings).cache()
ratings_df.createOrReplaceTempView("ratings")

# Movies: same steps for the pipe-separated u.item file.
data = spark.sparkContext.textFile("s3a://data-eng-t2-school/spark/ml-100k/u.item")
movies = data.map(parse_movies)
movies_df = spark.createDataFrame(movies).cache()
movies_df.createOrReplaceTempView("movies")

# WARNING: remember to run spark.stop() in the last code cell when you are done

# SELECT

In [None]:
# Run a plain SQL query against the "ratings" temporary view; LIMIT keeps it to 10 rows.
result = spark.sql("SELECT * FROM ratings LIMIT 10")

# collect() brings the rows back to the driver; each Row is printed on its own line.
for r in result.collect():
    print(r)

# Filmes com mais notas registradas

In [None]:
# Movies with the most ratings: group by movie_id and order by the count, largest first.
# (Grouping by user_id would rank the most active users, not the most rated movies.)
ratings_df.groupBy("movie_id").count().orderBy("count", ascending=False).show()

# Filmes com mais notas registradas (melhorado)

In [None]:
# Join ratings with movie titles, count ratings per title and show the most rated first.
(
    ratings_df
    .join(movies_df, ratings_df.movie_id == movies_df.movie_id)
    .groupBy(movies_df.name)
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
# Shut down the Spark session once the queries above have finished.
spark.stop()

# Atividades

## Dados

**Descrição das colunas:**  
timestamp,user_id,action,adId,campaignId 

**Amostra:**  
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01  
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02  
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02  
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03

**Path:**  
s3a://data-eng-t2-school/spark/ad_action.csv

## Atividade 1

Qual é o anúncio mais popular?

**Resposta:**  
adId_06

## Atividade 2

Quantos clicks gerou a campanha mais popular?

**Resposta:**  
63983 clicks

## Atividade 3

Algum usuário apenas visualizou anúncios, sem nunca clicar? Quantas actions foram enviadas por cada usuário que só visualizou?

**Resposta:**  
Sim, o usuário d99871cb-98b7-4ac5-97a5-b9a26c0f897b enviou apenas 1 action

## Atividade 4

Dos 10411 usuários, quantos usuários clicam mais que visualizam?

**Resposta:**  
410

## Atividade 5

Calcule a quantidade de clicks por dia da semana e apresente o resultado em ordem decrescente de quantidade de clicks.

**Resposta:** (formato: quantidade de clicks, dia da semana)  
27918, 4  
25424, 5  
25028, 0  
25027, 3  
25020, 6  
24973, 1  
24915, 2

In [None]:
from pyspark.sql import SparkSession, Row

def parse_ad_action(line):
    """Build a ``Row`` describing one ad action from a comma-separated line.

    The first five fields are mapped, in order, to timestamp, user_id,
    action, adId and campaignId. Every value is kept as text.
    """
    columns = line.split(",")
    return Row(
        timestamp=columns[0],
        user_id=columns[1],
        action=columns[2],
        adId=columns[3],
        campaignId=columns[4],
    )

# def parse_movies(line):
#     fields = line.split("|")
#     return Row(movie_id=int(fields[0]), 
#                name=fields[1])

spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

# Parse the CSV lines into Rows, cache the DataFrame and expose it to SQL as "ad_action".
data = spark.sparkContext.textFile("s3a://data-eng-t2-school/spark/ad_action.csv")
ad_action = data.map(parse_ad_action)
# Prints the RDD object itself, not its contents.
print(ad_action)
ad_action_df = spark.createDataFrame(ad_action).cache()
ad_action_df.createOrReplaceTempView("ad_action")

In [None]:
# Count the actions per ad and list the ads with the most actions first.
(
    ad_action_df
    .groupBy("adId")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
# Keep only the click actions, then rank the campaigns by their number of clicks.
ad_action_df2 = ad_action_df.filter(
    ad_action_df['action'] == "click"
)
(
    ad_action_df2
    .groupBy("campaignId")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
# Split the action log by action type.
ad_action_df3 = ad_action_df.filter(ad_action_df['action'] == "view")
ad_action_df4 = ad_action_df.filter(ad_action_df['action'] == "click")

# Keep only the view rows whose user never appears among the click rows.
# A left anti join returns the left-side rows that have no match on the right.
# The previous "!=" join paired almost every row with every other row and its
# result was discarded, so it never isolated the view-only users.
ad_action_df5 = ad_action_df3.join(
    ad_action_df4.select("user_id").distinct(),
    on="user_id",
    how="left_anti",
)

# Number of actions sent by each user that only viewed, largest first.
ad_action_df5.groupBy("user_id").count().orderBy("count", ascending=False).show()

# ratings_df.join(movies_df, ratings_df.movie_id == movies_df.movie_id).groupBy(movies_df.name).count().orderBy("count", ascending=False).show()
# ad_action_df4.show(5)
# ad_action_df3.groupBy("user_id").count().orderBy("count", ascending=False).show()

In [None]:
# Shut down the Spark session now that the exercises above are done.
spark.stop()