In [21]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import numpy as np
import pandas as pd
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
sc = spark.sparkContext

In [2]:
from pyspark.sql import functions as f

In [3]:
books = spark.read.csv('csv_dump/BX-CSV-Dump/books.csv', sep='";"', header=True)
books = books.drop('Image-URL-M', 'Image-URL-S', 'Image-URL-L""')
books = books.withColumnRenamed('"ISBN', 'ISBN') \
                .withColumnRenamed('Book-Author', 'Book_Author') \
                .withColumnRenamed('Year-Of-Publication', 'Year_of_Publication') \
                .withColumnRenamed('Book-Title', 'Book_Title')

books = books.withColumn('ISBN', f.regexp_replace('ISBN', '"', ''))
books = books.withColumn('Publisher', f.regexp_replace('Publisher', '""', '')) 

users = spark.read.csv('csv_dump/BX-CSV-Dump/users.csv', sep='";"', header=True)
users = users.withColumnRenamed('"User-ID', 'User_ID').withColumnRenamed('Age""', 'Age')
users = users.withColumn('User_ID', f.regexp_replace('User_ID', '"', '')).withColumn('Age', f.regexp_replace('Age', '""', ''))

ratings = spark.read.csv('csv_dump/BX-CSV-Dump/ratings.csv',sep='";"', header=True)
ratings = ratings.withColumnRenamed('"User-ID', 'User_ID')  \
                    .withColumnRenamed('Book-Rating""', 'Rating')
ratings = ratings.withColumn('User_ID', f.regexp_replace('User_ID', '"', '')) \
                    .withColumn('Rating', f.regexp_replace('Rating', '""', '')) 

In [4]:
users.show(5)

+-------+--------------------+----+
|User_ID|            Location| Age|
+-------+--------------------+----+
|      1|nyc, new york, us...|null|
|      2|stockton, califor...|  18|
|      3|moscow, yukon ter...|null|
|      4|porto, v.n.gaia, ...|  17|
|      5|farnborough, hant...|null|
+-------+--------------------+----+
only showing top 5 rows



In [5]:
books.show(5)

+----------+--------------------+--------------------+-------------------+--------------------+
|      ISBN|          Book_Title|         Book_Author|Year_of_Publication|           Publisher|
+----------+--------------------+--------------------+-------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|               1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|               1999|W. W. Norton &amp...|
+----------+--------------------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [6]:
ratings.show(5)

+-------+----------+------+
|User_ID|      ISBN|Rating|
+-------+----------+------+
| 276725|034545104X|     0|
| 276726|0155061224|     5|
| 276727|0446520802|     0|
| 276729|052165615X|     3|
| 276729|0521795028|     6|
+-------+----------+------+
only showing top 5 rows



In [7]:
ratings.createOrReplaceTempView("ratings")
books.createOrReplaceTempView("books")
users.createOrReplaceTempView("users")

## Task 1

#### Lista de usuarios junto con el número de libros que han valorado

In [8]:
libros_valorados = ratings.groupBy(ratings["User_ID"]) \
                            .agg(f.count("User_ID").alias("Libros")) \
                            .sort(f.asc("User_ID")) 

libros_valorados.show(5)



+-------+------+
|User_ID|Libros|
+-------+------+
|     10|     2|
|    100|     1|
|   1000|     1|
| 100001|     1|
| 100002|     1|
+-------+------+
only showing top 5 rows



                                                                                

In [9]:
libros_valorados = spark.sql(" \
                            SELECT User_ID, COUNT(User_ID) as libros \
                                FROM ratings \
                                GROUP BY User_ID \
                                ORDER BY User_ID \
                            ")

libros_valorados.show(5)



+-------+------+
|User_ID|libros|
+-------+------+
|     10|     2|
|    100|     1|
|   1000|     1|
| 100001|     1|
| 100002|     1|
+-------+------+
only showing top 5 rows



                                                                                

#### Rating máximo recibido por cada editorial

In [10]:
rating_maximo = books.select(books["Publisher"], "ISBN")\
                    .join(ratings, books.ISBN == ratings.ISBN, 'inner') \
                    .groupBy(books["Publisher"]) \
                    .agg(f.max(ratings.Rating).alias("max_rating"))

rating_maximo.show(5)

[Stage 16:>                                                         (0 + 8) / 8]

+--------------------+----------+
|           Publisher|max_rating|
+--------------------+----------+
| Editions P. Terrail|        10|
|     Tri-State Press|         6|
|             'K' Pub|         8|
|(3 Queen Sq., WC1...|         0|
|(49 Poland St., W...|         0|
+--------------------+----------+
only showing top 5 rows



                                                                                

In [11]:
rating_maximo = spark.sql(" \
                          SELECT Publisher, MAX(Rating) as max_rating \
                          FROM books \
                          INNER JOIN ratings \
                          ON books.ISBN  == ratings.ISBN \
                          GROUP BY Publisher \
                        ")

rating_maximo.show(5)



+--------------------+----------+
|           Publisher|max_rating|
+--------------------+----------+
| Editions P. Terrail|        10|
|     Tri-State Press|         6|
|             'K' Pub|         8|
|(3 Queen Sq., WC1...|         0|
|(49 Poland St., W...|         0|
+--------------------+----------+
only showing top 5 rows



                                                                                

#### Nombre del autor que ha recibido más ratings

In [31]:
author_max = books.join(ratings, books.ISBN == ratings.ISBN) \
                .groupBy("Book_Author").count().orderBy("count", ascending=[0])

author_max.show(1)



+------------+-----+
| Book_Author|count|
+------------+-----+
|Stephen King|10053|
+------------+-----+
only showing top 1 row



                                                                                

## Task 2

In [24]:
from pyspark.sql import Window


#### ¿Cuál es el título del libro con mayor número de ratings para cada editorial?

In [16]:
books.show(5)

+----------+--------------------+--------------------+-------------------+--------------------+
|      ISBN|          Book_Title|         Book_Author|Year_of_Publication|           Publisher|
+----------+--------------------+--------------------+-------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|               1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|               1999|W. W. Norton &amp...|
+----------+--------------------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [17]:
num_ratings = spark.sql("   \
                        SELECT Book_Title, Publisher, MAX(rating) as rating FROM( \
                        SELECT Book_Title, Publisher, VIEW.rating \
                        FROM books \
                        INNER JOIN ( \
                            SELECT ISBN, COUNT(ISBN) AS rating \
                            FROM ratings \
                            GROUP BY ISBN \
                            ) VIEW \
                        ON books.ISBN == VIEW.ISBN \
                        ) VIEW2 \
                        GROUP BY Book_Title, Publisher \
")

num_ratings.show(5)



+--------------------+--------------------+------+
|          Book_Title|           Publisher|rating|
+--------------------+--------------------+------+
|                Liar|    Harpercollins Uk|     1|
|     Punch With Judy|Simon &amp; Schuster|     1|
|Prayers of the Co...|  HarperSanFrancisco|     2|
|Postcards from Fr...|         HarperTorch|     2|
|Fire and Rain : F...|         HarperTorch|     4|
+--------------------+--------------------+------+
only showing top 5 rows



                                                                                

In [18]:
overCategory = Window.partitionBy(num_ratings.Publisher) \
                        .orderBy(num_ratings.rating.desc())

In [27]:
ranked = num_ratings.withColumn("rating", f.dense_rank() \
                    .over(overCategory)) 
ranked = ranked.where(ranked.rating == 1)
ranked.show(5)



+--------------------+--------------------+------+
|          Book_Title|           Publisher|rating|
+--------------------+--------------------+------+
|Manna: foods of t...|101 Productions; ...|     1|
| Portraits of Cities|22nd. Century, Ne...|     1|
|Restoring Intimac...|                   3|     1|
|Denver Hiking Gui...|            3D Press|     1|
|Haley's Cleaning ...|3H Productions, Inc.|     1|
+--------------------+--------------------+------+
only showing top 5 rows



                                                                                

#### ¿Cuál es la diferencia entre el número de ratings de cada libro y el número de ratings del libro con mayor número de ratings de la misma editorial?

In [33]:
overCategory = Window.partitionBy(num_ratings.Publisher) \
                        .orderBy(num_ratings.rating.desc())
ranked = num_ratings.withColumn("rating_diff", f.max(num_ratings.rating) \
                    .over(overCategory) - num_ratings.rating) 

ranked.show(5)



+--------------------+--------------------+------+-----------+
|          Book_Title|           Publisher|rating|rating_diff|
+--------------------+--------------------+------+-----------+
|Manna: foods of t...|101 Productions; ...|     2|          0|
| Portraits of Cities|22nd. Century, Ne...|     1|          0|
|Restoring Intimac...|                   3|     1|          0|
|Denver Hiking Gui...|            3D Press|     1|          0|
|Haley's Cleaning ...|3H Productions, Inc.|     1|          0|
+--------------------+--------------------+------+-----------+
only showing top 5 rows



                                                                                