# Notebook to restore the Amazon book and rating data.
### Source: https://www.kaggle.com/datasets/saurabhbagchi/books-dataset/data
The goal is to load this data into a relational database with books, users, and ratings, and then build a recommendation model on top of it.

The data is split across two CSV files, one for books (books.csv) and one for ratings (ratings.csv). According to the dataset documentation, the two files are joined on the book title. The title is not a good key for this relationship; the ISBN would be more appropriate, but the books data does not have that information. I will do what I can to make the data minimally usable.
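
Since the ISBN is the more suitable key, a quick format check can help spot malformed keys before any join. The sketch below validates the ISBN-10 check digit in plain Python. The function name `is_valid_isbn10` is illustrative and not part of this notebook, and the sample values are ISBNs that appear in ratings.csv.

```python
def is_valid_isbn10(isbn: str) -> bool:
    """Return True if `isbn` is a well-formed ISBN-10 with a correct check digit."""
    isbn = isbn.strip().upper()
    if len(isbn) != 10:
        return False
    total = 0
    for position, char in enumerate(isbn):
        if char in "0123456789":
            value = int(char)
        elif char == "X" and position == 9:
            value = 10  # 'X' stands for 10 and is only allowed as the check digit
        else:
            return False
        total += (10 - position) * value  # weights run from 10 down to 1
    return total % 11 == 0


# ISBNs that appear in ratings.csv
sample = ["034545104X", "0155061224", "0446520802"]
results = {isbn: is_valid_isbn10(isbn) for isbn in sample}
print(results)
```

A check like this only says whether a key is well formed; it does not say whether the book exists in books.csv.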

There is no need to run this notebook again: the data is written to MySQL, and a SQL dump of it will be placed at mysql/db_backup.sql in this project.

In [2]:
from pyspark.sql import SparkSession

# Start Spark

In [3]:
spark_executor_cores = "8"

spark = SparkSession.builder \
    .config("spark.driver.extraClassPath", "../lib/mysql-connector-j-9.1.0.jar") \
    .config("spark.executor.cores", spark_executor_cores) \
    .appName('SparkByExamples.com') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/16 16:17:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/16 16:17:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Read the book ratings from the CSV

In [15]:
spark.read.option("header","true").option("delimiter",";").option("escape",'"').csv("../datasets/books/ratings.csv").createOrReplaceTempView("tmp_ratings")

In [16]:
spark.sql("select * from tmp_ratings").show(5, True)

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
+-------+----------+-----------+
only showing top 5 rows
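
For a quick look at the file without starting Spark, the same delimiter and quoting can be handled with Python's `csv` module. This is a minimal sketch on an in-memory stand-in for ratings.csv; the quoting style of the sample is an assumption about the real file.

```python
import csv
import io

# Tiny in-memory stand-in for ratings.csv (same header and delimiter;
# the double-quoted fields are an assumption about the real file)
sample_csv = io.StringIO(
    '"User-ID";"ISBN";"Book-Rating"\n'
    '"276725";"034545104X";"0"\n'
    '"276726";"0155061224";"5"\n'
)

reader = csv.DictReader(sample_csv, delimiter=";", quotechar='"')
rows = list(reader)

# Every value arrives as text, just like a Spark CSV read without a schema,
# so numeric columns need an explicit conversion.
scores = [int(row["Book-Rating"]) for row in rows]
print(rows[0], scores)
```

With the real file, `io.StringIO(...)` would be replaced by `open("../datasets/books/ratings.csv", newline="")`, with an `encoding` argument if the file is not UTF-8.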



### Clean up columns, values, and metadata

In [22]:
spark.sql("""
    SELECT 
        ISBN as isbn,
        `User-id` as user_id,
        `Book-Rating` as review_score
    FROM tmp_ratings
""").createOrReplaceTempView("tmp_reviews_parsed")

In [23]:
spark.sql("select * from tmp_reviews_parsed").show(5, False)

+----------+-------+------------+
|isbn      |user_id|review_score|
+----------+-------+------------+
|034545104X|276725 |0           |
|0155061224|276726 |5           |
|0446520802|276727 |0           |
|052165615X|276729 |3           |
|0521795028|276729 |6           |
+----------+-------+------------+
only showing top 5 rows



### Checking repeated titles and ISBNs
Although many titles are repeated, they have different ISBNs and their own ratings. This may be an error in the dataset, or they may really be different editions/revisions of the same book.

In [25]:
spark.sql("select isbn, count(1) as total, avg(review_score) as score from tmp_reviews_parsed group by 1 order by 2 desc").show(20, False)



+----------+-----+------------------+
|isbn      |total|score             |
+----------+-----+------------------+
|0971880107|2502 |1.0195843325339728|
|0316666343|1295 |4.468725868725869 |
|0385504209|883  |4.652321630804077 |
|0060928336|732  |3.448087431693989 |
|0312195516|723  |4.334716459197787 |
|044023722X|647  |3.187017001545595 |
|0679781587|639  |4.381846635367762 |
|0142001740|615  |4.219512195121951 |
|067976402X|614  |3.255700325732899 |
|0671027360|586  |3.718430034129693 |
|0446672211|585  |4.105982905982906 |
|059035342X|571  |4.900175131348512 |
|0316601950|568  |3.5933098591549295|
|0375727345|552  |3.039855072463768 |
|044021145X|529  |3.0756143667296785|
|0452282152|526  |4.218631178707224 |
|0440214041|523  |2.5239005736137665|
|0804106304|519  |3.0635838150289016|
|0440211727|517  |2.97678916827853  |
|0345337662|506  |3.535573122529644 |
+----------+-----+------------------+
only showing top 20 rows
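
The averages above include ratings of 0. If 0 marks an implicit interaction rather than an explicit score (an assumption worth checking against the dataset documentation), it pulls the mean down. The sketch below compares both readings in plain Python; the ISBN labels and scores are invented for illustration.

```python
from collections import defaultdict

# Hypothetical (isbn, score) pairs; the values are invented for illustration
ratings = [
    ("A", 0), ("A", 0), ("A", 8),
    ("B", 5), ("B", 7),
]


def summarize(pairs, skip_zero=False):
    """Return {isbn: (count, mean)}, optionally ignoring zero scores."""
    scores = defaultdict(list)
    for isbn, score in pairs:
        if skip_zero and score == 0:
            continue
        scores[isbn].append(score)
    return {isbn: (len(values), sum(values) / len(values)) for isbn, values in scores.items()}


with_zeros = summarize(ratings)
without_zeros = summarize(ratings, skip_zero=True)
print(with_zeros)
print(without_zeros)
```

In the SQL above, the equivalent would be something like `avg(case when review_score > 0 then review_score end)`, since `avg` ignores NULLs.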



                                                                                

### Checking the number of distinct users and the average number of reviews per user

In [26]:
spark.sql("select count(1) as lines, count(distinct isbn) as isbns, count(distinct User_id) as users, count(1)/count(distinct User_id) as avg_user_review from tmp_reviews_parsed").show()



+-------+------+------+------------------+
|  lines| isbns| users|   avg_user_review|
+-------+------+------+------------------+
|1149780|340556|105283|10.920851419507423|
+-------+------+------+------------------+



                                                                                

# Read the book data from the CSV

In [78]:
spark.read.option("header","true").option("delimiter",";").csv("../datasets/books/books.csv").createOrReplaceTempView("tmp_books")

In [79]:
spark.sql("select * from tmp_books").show(5, False)

+----------+--------------------------------------------------------------------------------------------------+--------------------+-------------------+--------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|ISBN      |Book-Title                                                                                        |Book-Author         |Year-Of-Publication|Publisher                 |Image-URL-S                                                 |Image-URL-M                                                 |Image-URL-L                                                 |
+----------+--------------------------------------------------------------------------------------------------+--------------------+-------------------+--------------------------+------------------------------------------------------------+----------------------------------

## Clean up the columns

In [80]:
spark.sql("""
    SELECT
        UPPER(ISBN) as isbn,
        `Book-Title` as title,
        REGEXP_REPLACE(`Book-Author`,'[\\\[\\\]\\\']','') as author,
        `Image-URL-L` as image_url,
        publisher,
        `Year-Of-Publication` as publication_year
    FROM
        tmp_books
""").createOrReplaceTempView("tmp_books_parsed")

In [81]:
spark.sql("select * from tmp_books_parsed where isbn = '085409878X'").show(5, True)

+----------+--------------------+-------------------+--------------------+-------------+----------------+
|      isbn|               title|             author|           image_url|    publisher|publication_year|
+----------+--------------------+-------------------+--------------------+-------------+----------------+
|085409878X|"Pie-powder"; bei...|John Alderson Foote|http://images.ama...|EP Publishing|            1973|
+----------+--------------------+-------------------+--------------------+-------------+----------------+



### Check for duplicate titles

In [82]:
spark.sql("select count(1) as lines, count(distinct title) as titles, SUM(case when title is null then 1 end) as no_titles from tmp_books_parsed").show(10, True)

+------+------+---------+
| lines|titles|no_titles|
+------+------+---------+
|271379|242154|     NULL|
+------+------+---------+
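
The `NULL` under `no_titles` means that no row has a null title: `SUM` over a `CASE` with no `ELSE` branch only ever sees NULLs when nothing matches, and the sum of no non-null values is NULL rather than 0. The sketch below reproduces this with `sqlite3` as a stand-in for Spark SQL; the table and its two rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE books (title TEXT)")
conn.executemany("INSERT INTO books VALUES (?)", [("Dracula",), ("Little Women",)])

# Same aggregate as in the notebook, next to a variant with an explicit ELSE 0
no_else, with_else = conn.execute("""
    SELECT
        SUM(CASE WHEN title IS NULL THEN 1 END),
        SUM(CASE WHEN title IS NULL THEN 1 ELSE 0 END)
    FROM books
""").fetchone()
conn.close()

print(no_else, with_else)  # None 0
```

Adding `ELSE 0` (or wrapping the sum in `COALESCE(..., 0)`) makes the report show 0 instead of NULL.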



In [83]:
spark.sql("select title, count(1) as lines from tmp_books_parsed group by 1 order by lines desc").show(5, False)

+-----------------+-----+
|title            |lines|
+-----------------+-----+
|Selected Poems   |27   |
|Little Women     |24   |
|Wuthering Heights|21   |
|The Secret Garden|20   |
|Dracula          |20   |
+-----------------+-----+
only showing top 5 rows



# Check the relationship between books and ratings
We need to know whether every rating can be linked to its book.

In [84]:
spark.sql("""
          select
            count(1) as review_lines,
            count(b.title) as books_lines,
            count(r.isbn) as reviews,
            count(DISTINCT r.isbn) as unique_isbns,
            count(distinct b.title) as book_titles,
            sum(case when b.title is null then 1 end) as no_books
          from tmp_reviews_parsed r
          left join tmp_books_parsed b
            on b.isbn = r.isbn
""").show(10, False)



+------------+-----------+-------+------------+-----------+--------+
|review_lines|books_lines|reviews|unique_isbns|book_titles|no_books|
+------------+-----------+-------+------------+-----------+--------+
|1163192     |1044068    |1163192|340556      |241005     |119124  |
+------------+-----------+-------+------------+-----------+--------+
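
Note that `review_lines` (1163192) is larger than the 1149780 rating rows counted earlier. A left join returns more rows than the left table holds when the join key is not unique on the right side, so some ISBNs probably occur more than once in `tmp_books_parsed`. One plausible cause, not verified here, is that `UPPER(ISBN)` merges ISBNs that differ only in the case of a trailing `x`. The sketch below shows the effect with `sqlite3` as a stand-in for Spark SQL; all rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE reviews (isbn TEXT, user_id TEXT);
    CREATE TABLE books (isbn TEXT, title TEXT);
    INSERT INTO reviews VALUES ('000000000X', 'u1'), ('1111111111', 'u2');
    -- the same book stored twice, differing only in the case of the final letter
    INSERT INTO books VALUES ('000000000X', 'Some Title'), ('000000000x', 'Some Title');
""")

review_rows = conn.execute("SELECT COUNT(*) FROM reviews").fetchone()[0]

# Left join against the upper-cased key: the first review matches two book rows
joined_rows, unmatched = conn.execute("""
    SELECT COUNT(*), SUM(CASE WHEN b.title IS NULL THEN 1 ELSE 0 END)
    FROM reviews r
    LEFT JOIN (SELECT UPPER(isbn) AS isbn, title FROM books) b
        ON b.isbn = r.isbn
""").fetchone()

# Keys that would fan out the join
duplicated_keys = conn.execute("""
    SELECT UPPER(isbn), COUNT(*) FROM books GROUP BY UPPER(isbn) HAVING COUNT(*) > 1
""").fetchall()
conn.close()

print(review_rows, joined_rows, unmatched, duplicated_keys)
```

Running the `HAVING COUNT(*) > 1` query against `tmp_books_parsed` would confirm or rule out this explanation for the real data.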



                                                                                

# Write the books to the transactional database

In [92]:
spark.sql("""
    SELECT 
        *
    FROM
        (select 
            b.isbn,
            b.title,
            b.author,
            b.publication_year,
            b.publisher,
            b.image_url,
            SUM(1) OVER (PARTITION BY b.isbn ROWS UNBOUNDED PRECEDING) AS indx
        from tmp_books_parsed b 
            left join tmp_reviews_parsed r
            on r.isbn = b.isbn
        where 
            b.title is not null and r.isbn is not null
        group by
            1,2,3,4,5,6
        ) as t
    WHERE indx = 1
""").drop("indx").repartition(1).write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306") \
    .option("dbtable", "book_store.books") \
    .option("user", "book_store") \
    .option("password", "1234") \
    .mode("append") \
    .save()
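
The query above keeps one row per ISBN, but the window has no `ORDER BY`, so which of several candidate rows survives is not defined. The sketch below shows one deterministic alternative in plain Python. The helper `keep_one_per_isbn`, its tie-break rule, and the sample rows are all illustrative assumptions, not the notebook's method.

```python
def keep_one_per_isbn(rows):
    """Keep a single row per ISBN, chosen deterministically.

    The tie-break rule (prefer the row with more non-empty fields, then the
    alphabetically first title) is an illustrative assumption.
    """
    best = {}
    for row in rows:
        filled = sum(1 for value in row.values() if value)
        rank = (-filled, row["title"] or "")
        current = best.get(row["isbn"])
        if current is None or rank < current[0]:
            best[row["isbn"]] = (rank, row)
    return [row for _, row in best.values()]


# Invented rows: the first ISBN appears twice, once with a missing publisher
books = [
    {"isbn": "000000000X", "title": "Some Title", "publisher": ""},
    {"isbn": "000000000X", "title": "Some Title", "publisher": "Some Press"},
    {"isbn": "1111111111", "title": "Other Title", "publisher": "Other Press"},
]
unique_books = keep_one_per_isbn(books)
print(unique_books)
```

In Spark SQL, the same idea could be expressed with `ROW_NUMBER() OVER (PARTITION BY isbn ORDER BY ...)` and an explicit ordering.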

                                                                                

# Extract the users from the ratings data

### Create numeric IDs for the users and use the original user identifier as the username

In [93]:
spark.sql("""
    select 
        RANK() OVER (PARTITION BY 1 ORDER BY user_id) as id,
        user_id AS username
    FROM
        tmp_reviews_parsed
    WHERE
        user_id is not null
    group by
        2
""").createOrReplaceTempView("tmp_users")

In [94]:
spark.sql("select * from tmp_users").show(5, False)

+---+--------+
|id |username|
+---+--------+
|1  |10      |
|2  |100     |
|3  |1000    |
|4  |100001  |
|5  |100002  |
+---+--------+
only showing top 5 rows
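
The sequence 10, 100, 1000, 100001 shows that `user_id` is ordered as text, because the CSV was read without a schema. That is harmless for surrogate keys, but it matters if numeric order is expected. A small sketch of the difference in plain Python, with a hand-picked list of IDs:

```python
user_ids = ["100001", "10", "1000", "100", "2"]

as_text = sorted(user_ids)              # lexicographic, like ORDER BY on a string column
as_numbers = sorted(user_ids, key=int)  # numeric, like ORDER BY CAST(user_id AS INT)

# Surrogate IDs starting at 1, similar to RANK() over distinct values
surrogate_ids = {username: idx for idx, username in enumerate(as_numbers, start=1)}

print(as_text)     # ['10', '100', '1000', '100001', '2']
print(as_numbers)  # ['2', '10', '100', '1000', '100001']
```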



### Check that there are no duplicates

In [95]:
spark.sql("select count(1) as lines, count(distinct id) as ids, count(distinct username) as names from tmp_users").show()

+------+------+------+
| lines|   ids| names|
+------+------+------+
|105283|105283|105283|
+------+------+------+



### Write the user data to the relational database

In [96]:
spark.sql("""
    select 
        id,
        username
    from tmp_users
    where
        id is not null
""").write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306") \
    .option("dbtable", "book_store.users") \
    .option("user", "book_store") \
    .option("password", "1234") \
    .mode("append") \
    .save()
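
The JDBC options, including the password, are repeated inline in each write. One way to keep credentials out of the notebook is to read them from environment variables. The sketch below builds the options dictionary in plain Python; the helper `jdbc_options` and the variable names `BOOK_STORE_DB_URL`, `BOOK_STORE_DB_USER`, and `BOOK_STORE_DB_PASSWORD` are hypothetical and not defined anywhere in this project.

```python
import os


def jdbc_options(table, env=os.environ):
    """Build JDBC writer options from environment variables.

    The variable names below are hypothetical, chosen for this sketch only.
    """
    return {
        "url": env.get("BOOK_STORE_DB_URL", "jdbc:mysql://localhost:3306"),
        "dbtable": table,
        "user": env.get("BOOK_STORE_DB_USER", "book_store"),
        "password": env["BOOK_STORE_DB_PASSWORD"],  # no default: fail loudly if unset
    }


# Passing a dict instead of os.environ keeps the example self-contained
options = jdbc_options("book_store.users", env={"BOOK_STORE_DB_PASSWORD": "example-only"})
print(sorted(options))
```

The resulting dictionary could then be passed to the writer with `.format("jdbc").options(**options)`.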

                                                                                

## Check for duplicate ratings

In [99]:
spark.sql("""
    select
        r.isbn,
        u.id as user_id,
        review_score as rating
        -- review_time 
    from tmp_reviews_parsed r
        left join tmp_users u on u.username = r.user_id 
        left join tmp_books_parsed b on b.isbn = r.isbn
    where 
        r.isbn is not null and u.id is not null and b.title is not null
""").createOrReplaceTempView("tmp_db_ratings")

In [100]:
spark.sql("""
    select 
        count(1) as lines, 
        count(distinct user_id) as users, 
        count(distinct isbn) as isbns,
        count(distinct isbn || user_id) as keys 
    from tmp_db_ratings
""").show()

                                                                                

+-------+-----+------+-------+
|  lines|users| isbns|   keys|
+-------+-----+------+-------+
|1044068|92092|269764|1030656|
+-------+-----+------+-------+
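
The `keys` column is built with `isbn || user_id`, with no separator. Concatenating two variable-length strings this way can map different pairs to the same key. With 10-character ISBNs a collision is unlikely here, but a separator or a multi-column distinct count removes the risk. A sketch in plain Python with invented values:

```python
# Invented (isbn, user_id) pairs; the first and third are the same pair
pairs = [("12", "34"), ("123", "4"), ("12", "34")]

concatenated = {isbn + user_id for isbn, user_id in pairs}     # like isbn || user_id
separated = {isbn + "_" + user_id for isbn, user_id in pairs}  # like isbn || '_' || user_id
as_tuples = set(pairs)                                         # like count(distinct isbn, user_id)

print(len(concatenated), len(separated), len(as_tuples))  # 1 2 2
```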



In [102]:
spark.sql("""
    select 
        isbn ||'_' ||user_id as key,
        count(1) as duplications
    from tmp_db_ratings
    group by 1
    order by 2 desc
""").show()



+-----------------+------------+
|              key|duplications|
+-----------------+------------+
| 002542730X_66897|           2|
| 033026012X_57922|           2|
| 031242227X_75232|           2|
| 002089130X_12747|           2|
| 044651652X_76667|           2|
| 042516828X_83678|           2|
| 006091291X_77502|           2|
|000649840X_101080|           2|
| 080410753X_78240|           2|
| 080211671X_77040|           2|
| 031242227X_77435|           2|
| 080410526X_68165|           2|
| 067976402X_66626|           2|
| 059035342X_54772|           2|
| 043935806X_83737|           2|
| 080410526X_75939|           2|
| 051513449X_67931|           2|
|074322535X_104237|           2|
| 044022103X_89520|           2|
| 044651862X_27425|           2|
+-----------------+------------+
only showing top 20 rows



                                                                                

In [106]:
spark.sql("select * from tmp_users where id = 66897").show()

+-----+--------+
|   id|username|
+-----+--------+
|66897|   25981|
+-----+--------+



In [110]:
spark.sql("""
    select * from tmp_db_ratings where isbn = '002542730X' and user_id = 66897
""").show(10, False)

+----------+-------+------+
|isbn      |user_id|rating|
+----------+-------+------+
|002542730X|66897  |0     |
|002542730X|66897  |0     |
+----------+-------+------+
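
The two rows above are identical, which suggests the duplication comes from the join with the books table rather than from the ratings file itself. The counts printed in this notebook are consistent with that reading, although they do not prove it. The sketch below reconciles them. It assumes the `keys` count, which was built without a separator, equals the number of distinct (isbn, user_id) pairs.

```python
# Counts copied from the outputs in this notebook
rating_rows = 1_149_780     # rows in tmp_reviews_parsed
joined_rows = 1_163_192     # rows after the left join with books
unmatched_rows = 119_124    # joined rows without a book
db_rating_rows = 1_044_068  # rows in tmp_db_ratings
distinct_keys = 1_030_656   # distinct isbn/user_id keys in tmp_db_ratings

extra_from_join = joined_rows - rating_rows           # rows added by the join
extra_in_ratings = db_rating_rows - distinct_keys     # duplicated rating rows
matched_without_fanout = rating_rows - unmatched_rows  # ratings that found a book

print(extra_from_join, extra_in_ratings, matched_without_fanout)
```

Both differences come out to 13412, and the number of ratings that found a book equals the number of distinct keys. That is what one would expect if each duplicated rating is a single rating matched to two book rows.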



# Write the ratings to the database

In [111]:
spark.sql("""
    select
        isbn,
        user_id,
        rating
        -- review_time as created_at
    from
        (select 
            *,
            SUM(1) OVER(PARTITION BY isbn,r.user_id ROWS UNBOUNDED PRECEDING) as indx
        from tmp_db_ratings r
        ) as t
    where
        indx = 1
""").drop('indx').write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306") \
    .option("dbtable", "book_store.ratings") \
    .option("user", "book_store") \
    .option("password", "1234") \
    .mode("append") \
    .save()

                                                                                