# Bonus Question 1)

## 1.1)
## Using PySpark

I first installed PySpark with:
$ pip install pyspark

- I then created a Spark session with 'SparkSession.builder.appName("FilterAuthors").getOrCreate()'. The session is the entry point to Spark's DataFrame API.
- I read the JSON file with the '.read.json()' method, which by default expects one JSON object per line.
- PySpark is designed for large datasets: it splits the data into partitions (built on resilient distributed datasets, RDDs) and processes them in parallel. This pays off on large inputs; on small inputs pandas can be faster, because Spark has startup and scheduling overhead.
- PySpark evaluates lazily: transformations such as 'filter' and 'sort' only build a plan, and they are executed when an action such as 'show()' is called. pandas, in contrast, loads the whole dataset into memory, so its memory consumption is typically higher.
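
The lazy-evaluation point can be illustrated without Spark. The sketch below is only a loose, pure-Python analogy built on generators; the names 'read_records' and 'lazy_filter' and the sample records are made up for illustration, and this is not how Spark is implemented. It shows the same idea: building the pipeline reads nothing, and work happens only when a result is requested.

```python
from itertools import islice

# Counts how many records were actually pulled from the "source"
stats = {"reads": 0}

def read_records(records):
    """Yield records one at a time, counting each read (stands in for a file scan)."""
    for record in records:
        stats["reads"] += 1
        yield record

def lazy_filter(records, min_reviews):
    """Lazily keep records with at least `min_reviews` text reviews."""
    return (r for r in records if r["text_reviews_count"] >= min_reviews)

# Hypothetical sample data, not taken from lighter_authors.json
sample = [
    {"name": "A", "text_reviews_count": 50},
    {"name": "B", "text_reviews_count": 150},
    {"name": "C", "text_reviews_count": 300},
    {"name": "D", "text_reviews_count": 20},
]

# "Transformation": only describes the work, nothing is read yet
pipeline = lazy_filter(read_records(sample), min_reviews=100)
reads_before_action = stats["reads"]

# "Action": pull a single result; only the records needed for it are read
first_match = list(islice(pipeline, 1))
reads_after_action = stats["reads"]

print(reads_before_action, reads_after_action, first_match)
```

Here the pipeline stops as soon as one matching record is found, so the remaining records are never touched.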

In [35]:
%%time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("FilterAuthors").getOrCreate()

# Read the JSON file
df = spark.read.json("lighter_authors.json")

# Filter the DataFrame based on the text_reviews_count property
filtered_df = df.filter(col("text_reviews_count") >= 100)

# Sort the DataFrame in descending order by 'text_reviews_count'
sorted_df = filtered_df.sort(col("text_reviews_count").desc())

# Select specific columns and return the first 5 rows
result_df = sorted_df.select("id", "name", "text_reviews_count").limit(5)

# Show the resulting DataFrame
result_df.show()

# Stop the Spark session
spark.stop()

                                                                                

+-------+---------------+------------------+
|     id|           name|text_reviews_count|
+-------+---------------+------------------+
|   3389|   Stephen King|            608956|
|1077326|   J.K. Rowling|            606373|
| 153394|Suzanne Collins|            427224|
| 150038|Cassandra Clare|            416177|
|3433047|  Sarah J. Maas|            372923|
+-------+---------------+------------------+

CPU times: user 38.1 ms, sys: 11.3 ms, total: 49.5 ms
Wall time: 2.61 s


## Using Pandas

In [34]:
%%time
import pandas as pd

authors_json = pd.read_json('./lighter_authors.json', lines = True, chunksize=100)

authors_df = pd.concat(authors_json, ignore_index=False)

# Filter the DataFrame based on 'text_reviews_count'
filtered_df = authors_df[authors_df['text_reviews_count'] >= 100]

# Sort the filtered DataFrame in descending order by 'text_reviews_count'
sorted_df = filtered_df.sort_values(by='text_reviews_count', ascending=False)

# Select specific columns and return the first 5 rows
result_df = sorted_df[['id', 'name', 'text_reviews_count']].head(5)

# Print the resulting DataFrame
print(result_df)


             id             name  text_reviews_count
1017       3389     Stephen King              608956
86500   1077326     J.K. Rowling              606373
27522    153394  Suzanne Collins              427224
27110    150038  Cassandra Clare              416177
157593  3433047    Sarah J. Maas              372923
CPU times: user 23.2 s, sys: 1.93 s, total: 25.1 s
Wall time: 25.1 s
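
Note that the pandas cell reads the file in chunks of 100 rows and then concatenates all of them, so the full dataset still ends up in memory, and the many small chunks may add overhead. A possible alternative is sketched below on a tiny in-memory sample. The helper name 'top_authors', the sample records, and the chunk sizes are assumptions for illustration, and the timing above was not measured with this code.

```python
import io
import pandas as pd

def top_authors(source, min_reviews=100, n=5, chunksize=10_000):
    """Return the n rows with the highest text_reviews_count, reading in chunks."""
    columns = ["id", "name", "text_reviews_count"]
    candidates = []
    for chunk in pd.read_json(source, lines=True, chunksize=chunksize):
        kept = chunk.loc[chunk["text_reviews_count"] >= min_reviews, columns]
        if not kept.empty:
            # Keep at most n rows per chunk so memory use stays small
            candidates.append(kept.nlargest(n, "text_reviews_count"))
    if not candidates:
        return pd.DataFrame(columns=columns)
    return pd.concat(candidates).nlargest(n, "text_reviews_count")

# Hypothetical line-delimited JSON sample (not the real dataset)
sample = io.StringIO(
    '{"id": 1, "name": "A", "text_reviews_count": 50}\n'
    '{"id": 2, "name": "B", "text_reviews_count": 150}\n'
    '{"id": 3, "name": "C", "text_reviews_count": 300}\n'
    '{"id": 4, "name": "D", "text_reviews_count": 120}\n'
)

result = top_authors(sample, min_reviews=100, n=2, chunksize=2)
print(result)
```

With the real file, the call would be 'top_authors("./lighter_authors.json")', assuming the same column names as in the cell above.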


## Comparing the performance of PySpark vs. Pandas

| Library | CPU user | CPU sys | CPU total | Wall time |
| ------- | -------- | ------- | --------- | --------- |
| PySpark | 38.1 ms | 11.3 ms | 49.5 ms | 2.61 s |
| Pandas | 23.2 s | 1.93 s | 25.1 s | 25.1 s |
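
One caveat when reading these numbers: '%%time' reports CPU time for the Python process only, while Spark does most of its work in a separate JVM process. The PySpark CPU total therefore does not reflect the real computation, and wall time is the more comparable column. The minimal sketch below shows how the two clocks can diverge; the helper name 'measure' is hypothetical, and 'time.sleep' merely stands in for work done outside the Python process.

```python
import time

def measure(func):
    """Run func() and return (wall_seconds, cpu_seconds) for the current process."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func()
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    return wall, cpu

# Sleeping consumes wall-clock time but almost no CPU time in this process,
# similar to Python waiting while another process does the heavy lifting.
wall, cpu = measure(lambda: time.sleep(0.2))
print(f"wall: {wall:.3f} s, cpu: {cpu:.3f} s")
```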

## 1.2)

### First approach

In [43]:
# Create a Spark session
spark = SparkSession.builder.appName("JoinBooksAndAuthors").getOrCreate()

# Read the 'lighter_books.json' and 'lighter_authors.json' files
books_df = spark.read.json("lighter_books.json")
authors_df = spark.read.json("lighter_authors.json")

                                                                                

In [38]:
# Get the column names for each DataFrame for better understanding of the structure of datasets.
books_columns = books_df.columns
authors_columns = authors_df.columns

# Print the column names
print("Columns in 'lighter_books.json':", books_columns)
print("------------------------------------------------------------")
print("Columns in 'lighter_authors.json':", authors_columns)


Columns in 'lighter_books.json': ['asin', 'author_id', 'author_name', 'authors', 'average_rating', 'description', 'edition_information', 'format', 'id', 'image_url', 'isbn', 'isbn13', 'language', 'num_pages', 'original_publication_date', 'publication_date', 'publisher', 'rating_dist', 'ratings_count', 'series_id', 'series_name', 'series_position', 'shelves', 'text_reviews_count', 'title', 'work_id']
------------------------------------------------------------
Columns in 'lighter_authors.json': ['about', 'average_rating', 'book_ids', 'fans_count', 'gender', 'id', 'image_url', 'name', 'ratings_count', 'text_reviews_count', 'work_ids', 'works_count']


In [39]:
# Perform a left outer join: each book is matched to the author whose 'id' equals the book's 'author_id'.
joined_df = books_df.join(authors_df, books_df['author_id'] == authors_df['id'], 'left_outer')

# Count the books without a matching author (the author's 'id' is null after the left join)
books_without_author = joined_df.filter(authors_df['id'].isNull()).count()

# Show the count
print(f"Number of books without a matching author: {books_without_author}")

# Stop the Spark session
spark.stop()

                                                                                

Number of books without a matching author: 0


### Second approach

In this approach:
- First, we count the rows in lighter_books.json and store the result in 'num_rows_books'.
- Then, we inner-join the books with the authors and count the rows of the result ('joined_df_count'). An inner join keeps only the books whose 'author_id' matches an author 'id'.
- Finally, we compute the difference between 'joined_df_count' and 'num_rows_books'. A difference of 0 means that every book was matched, provided that author ids are unique in lighter_authors.json.
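
Why the comparison of counts depends on unique author ids can be checked on toy data. The following pure-Python sketch uses hypothetical ids, with list comprehensions standing in for Spark's joins. It shows a case where the inner-join count equals the number of books even though one book has no author, and how listing the unmatched books directly avoids the ambiguity. In PySpark, the direct check would correspond to a join of type 'left_anti'.

```python
# Hypothetical toy data: (title, author_id) pairs and a list of author ids
# in which id 1 appears twice (a duplicate).
books = [("book_a", 1), ("book_b", 2), ("book_c", 99)]  # 99 has no author
author_ids = [1, 1, 2]

# Inner join: one output row per (book, matching author row) pair
inner_join = [(title, a) for title, b in books for a in author_ids if a == b]

# Anti join: books whose author_id matches no author at all
known_ids = set(author_ids)
unmatched = [title for title, author_id in books if author_id not in known_ids]

# The counts agree although one book is unmatched
print(len(books), len(inner_join))
print(unmatched)
```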


In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("JoinAndCount").getOrCreate()

# Read 'lighter_books.json' and 'lighter_authors.json'
books_df = spark.read.json("lighter_books.json")
authors_df = spark.read.json("lighter_authors.json")

# Join the two DataFrames based on 'author_id' and 'id'
joined_df = books_df.join(authors_df, books_df['author_id'] == authors_df['id'], 'inner')

# Calculate the number of rows in 'lighter_books.json'
num_rows_books = books_df.count()

# Count the rows of the joined DataFrame (books with a matching author)
joined_df_count = joined_df.count()

# Calculate the difference between 'joined_df_count' and the number of rows in 'lighter_books.json'
difference = joined_df_count - num_rows_books

# Print the result
print("Number of Rows in lighter_books.json:", num_rows_books)
print("Value of 'joined_df_count':", joined_df_count)
print("Difference:", difference)

# Stop the Spark session
spark.stop()


23/10/31 12:26:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Number of Rows in lighter_books.json: 7027431
Value of 'joined_df_count': 7027431
Difference: 0


### Conclusion
As can be seen, every book has an 'author_id' that exists in lighter_authors.json.

# Bonus Question 2)

## 2.1) & 2.2)

In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType

# Create a Spark session
spark = SparkSession.builder.appName("GroupAuthorsIntoGenres").getOrCreate()

# Read 'lighter_authors.json' and 'lighter_books.json'
authors_df = spark.read.json("lighter_authors.json")
books_df = spark.read.json("lighter_books.json")

# Define triggers for genres
genre_triggers = {
    "Romance": ["love", "relationship", "heart", "passion", "kiss", "romantic", "affection"],
    "Mystery": ["mystery", "detective", "crime", "suspense", "puzzle", "enigma", "whodunit"],
    "Fantasy": ["fantasy", "magic", "kingdom", "mythical", "enchanted", "magical"],
    "Science Fiction": ["science fiction", "space", "alien", "future", "technology", "extraterrestrial"],
    "Non-Fiction": ["non-fiction", "memoir", "history", "biography", "documentary", "autobiography"],
    "Adventure": ["adventure", "quest", "journey", "exploration", "excitement", "expedition"],
    "Horror": ["horror", "scary", "fear", "terror", "supernatural", "haunting"],
    "Comedy": ["comedy", "funny", "humor", "laughter", "comic", "hilarious"],
    "Drama": ["drama", "tragedy", "emotional", "theatre", "intense", "performing"],
    "Thriller": ["thriller", "suspenseful", "intense", "nail-biting", "tension", "exciting"],
    "Biography": ["biography", "life story", "autobiography", "history", "memoir", "personal journey"],
    "Historical": ["historical", "period", "past", "epoch", "history", "retro"],
    "Self-Help": ["self-help", "personal growth", "motivation", "positive", "self-improvement", "inspiration"],
    "Cooking": ["cooking", "culinary", "recipes", "food", "chef", "cuisine"],
    "Travel": ["travel", "adventure", "exploration", "journey", "destination", "vacation"],
    "Science": ["science", "scientific", "discovery", "research", "knowledge", "experiment"],
    "Children": ["children", "kids", "juvenile", "childhood", "youth", "picture book"],
    "Poetry": ["poetry", "verse", "rhyme", "lyrical", "poem", "stanza"],
}

# Define a UDF to assign genres based on triggers.
# assign_genre iterates through the genre_triggers dictionary in insertion order and,
# for each genre, through the list of triggers associated with that genre.
# It checks whether a trigger occurs as a substring of the lowercased text
# (the author's 'about' or the book's 'description'), for case-insensitive matching.
# The first genre with a matching trigger is returned; None means no trigger matched.
def assign_genre(text):
    # Guard against null values, in case Spark calls the UDF on them
    if text is None:
        return None
    lowered = text.lower()  # lowercase once instead of once per trigger
    for genre, triggers in genre_triggers.items():
        for trigger in triggers:
            if trigger in lowered:
                return genre
    return None

# Register the UDF with Spark
assign_genre_udf = udf(assign_genre, StringType())

# Create a new column 'genre' based on the 'about' field
authors_df = authors_df.withColumn("genre", when(col("about").isNotNull(), assign_genre_udf(col("about"))).otherwise(None))

# Create a new column 'genre' based on the 'description' field
books_df = books_df.withColumn("genre", when(col("description").isNotNull(), assign_genre_udf(col("description"))).otherwise(None))

# Select and show the resulting DataFrame with 'id', 'name', and 'genre' columns
result_authors_df = authors_df.select("id", "name", "genre")
result_authors_df.show()

# Select and show the resulting DataFrame with 'id', 'title', and 'genre' columns
result_books_df = books_df.select("id", "title", "genre")
result_books_df.show()

# Stop the Spark session
spark.stop()


                                                                                

+---+--------------------+---------------+
| id|                name|          genre|
+---+--------------------+---------------+
|  4|       Douglas Adams|        Romance|
|  7|         Bill Bryson|    Non-Fiction|
| 10|         Jude Fisher|        Fantasy|
| 12|James Hamilton-Pa...|    Non-Fiction|
| 14|         Mark Watson|         Comedy|
| 16|       Edith Wharton|        Romance|
| 17|       Luther Butler|        Romance|
| 18|        Gary Paulsen|        Romance|
| 20|           Dale Peck|       Children|
| 23|       Angela Knight|        Romance|
| 24|       Delia Sherman|        Fantasy|
| 25|Patricia A. McKillip|        Romance|
| 26|      Anne McCaffrey|Science Fiction|
| 27|Zilpha Keatley Sn...|       Children|
| 29|        Kate Horsley|        Romance|
| 31|   Elaine Cunningham|        Fantasy|
| 32|       Philippa Carr|     Historical|
| 33|     Edward P. Jones|       Children|
| 36|        Satyajit Das|           NULL|
| 38|         Mark Smylie|        Fantasy|
+---+--------------------+---------------+

## 2.3)

Increasing the number of triggers per genre should raise the share of authors and books that receive a genre. Because matching is by substring and the first matching genre wins, some assignments are still questionable (for example, Douglas Adams is labelled Romance above).
- After reading the two datasets with PySpark, we defined a dictionary of key-value pairs. The keys are the genres we want to assign to each author or book; the values are the lists of words we search for in the author's 'about' text and the book's 'description' text.
- Then, we defined a user-defined function (UDF) that assigns a genre to authors and books based on the "triggers" found in those texts.
- A UDF is a custom Python function that can be applied to the columns of a PySpark DataFrame.
- Finally, we added a new 'genre' column to the authors and books DataFrames to hold the assigned genres.
- The same approach works in both frameworks, PySpark and pandas.
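
To illustrate the limitation of first-match substring search, here is a small pure-Python sketch of one possible refinement: count whole-word trigger hits per genre and pick the genre with the most hits. The function names 'first_match' and 'score_genres', the reduced trigger dictionary, and the sample sentence are all made up for illustration; this is not the method used in the cell above.

```python
import re

# A reduced, hypothetical trigger dictionary (same shape as genre_triggers)
triggers_by_genre = {
    "Romance": ["love", "heart", "kiss"],
    "Science Fiction": ["science fiction", "space", "alien"],
    "Comedy": ["comedy", "funny", "humor"],
}

def first_match(text):
    """Substring search; the first genre with any matching trigger wins."""
    lowered = text.lower()
    for genre, triggers in triggers_by_genre.items():
        if any(trigger in lowered for trigger in triggers):
            return genre
    return None

def score_genres(text):
    """Count whole-word trigger hits per genre and return the best genre, or None."""
    lowered = text.lower()
    scores = {}
    for genre, triggers in triggers_by_genre.items():
        hits = sum(
            len(re.findall(r"\b" + re.escape(trigger) + r"\b", lowered))
            for trigger in triggers
        )
        if hits:
            scores[genre] = hits
    return max(scores, key=scores.get) if scores else None

about = "He loved writing funny science fiction about space and alien bureaucracy."
print(first_match(about))   # the substring 'love' inside 'loved' triggers Romance
print(score_genres(about))  # whole-word counting favours Science Fiction
```

A function like 'score_genres' could be wrapped in a UDF in the same way as 'assign_genre', at the cost of more regular-expression work per row.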
