In [None]:
# Display existing tables
dbutils.fs.ls("/FileStore/tables")


In [None]:
# Read in parquet table of the Open Library author data dump and display
file_location = '/FileStore/tables/author_key_df_snappy.parquet'
ol_authors = spark.read.parquet(file_location)
display(ol_authors)

In [None]:
# pull in authors.csv data from previous projects on NYT bestsellers, display.
file_location = '/FileStore/tables/authors.csv'
nyt_authors_df = spark.read.csv(file_location, inferSchema=True, header=False)
nyt_authors_df = nyt_authors_df.withColumnRenamed("_c0", "author_id")
nyt_authors_df = nyt_authors_df.withColumnRenamed("_c1", "author_name")
display(nyt_authors_df)

In [None]:
# import necessary functions
from pyspark.sql.functions import col, when, split, length, substring, expr, regexp_extract, upper, size, lower


In [None]:
# Create joined table between OL authors and the NYT bestseller authors
nyt_author_key = ol_authors.join(nyt_authors_df, on='author_name', how='inner')

# Choose only OL key and name
nyt_ol_joined_inner = nyt_author_key.select('author_key', 'author_name')

# Add the column from the NYT bestseller list to indicate ID from previous projects
nyt_author_add_id = nyt_ol_joined_inner.join(nyt_authors_df, on='author_name', how='inner')

# Select the final desired columns
nyt_author_key_id = nyt_author_add_id.select('author_key', 'author_name', 'author_id')
nyt_author_key_id = nyt_author_key_id.orderBy(col('author_id'))

display(nyt_author_key_id)

In [None]:
# Perform a left join to combine OL and bestsellers based on "author_id" and "author_name"
joined_df = ol_authors.join(nyt_author_key_id, on=["author_name","author_key"], how="left")

# Add a new column "present_in_nyt" to indicate if the author is also a bestseller
joined_df = joined_df.withColumn("nyt_bestseller", when(col("author_id").isNotNull(), 1).otherwise(0))

# Select the desired columns: author_id, author_name, and present_in_nyt
authors_for_model_df = joined_df.select("author_key", "author_name", "nyt_bestseller")

# Show the result DataFrame
display(authors_for_model_df)

In [None]:
# Count how many are on the Bestseller list ("1") and how many are not ("0")
unique_values_counts = authors_for_model_df.groupBy("nyt_bestseller").count()

display(unique_values_counts)

In [None]:
from pyspark.sql.functions import col, split, length, substring, expr, regexp_extract

# Use regexp_extract to pull the last word from the "author_name" column
result_df = authors_for_model_df.withColumn("last_name", regexp_extract(col("author_name"), r"\b(\w+)\s*$", 1))

# Filter out the empties and spaces from the "last_name" column
result_df = result_df.withColumn("last_name", expr("CASE WHEN trim(last_name) = '' THEN NULL ELSE last_name END"))

# Get the first letter of the last name using the substring function
authors_for_model_df = result_df.withColumn("last_initial", substring(col("last_name"), 1, 1))


display(authors_for_model_df)


In [None]:
# Display the counts of authors per first letter of last name.
alphabetical_last_name_counts = authors_for_model_df.groupBy("last_initial").count()

# Order it alphabetically by first letter of last name
alphabetical_last_name_counts = alphabetical_last_name_counts.orderBy("last_initial")


display(alphabetical_last_name_counts)

In [None]:
# Register the DataFrame as a temporary view
alphabetical_last_name_counts.createOrReplaceTempView("last_initial_counts")

numeric_last_initial_total = spark.sql("""
    SELECT SUM(count) AS numeric_last_initial_count
    FROM last_initial_counts
    WHERE last_initial IN ("1","2","3","4","5","6","7","8","9","0")
    """).collect()
                                                    
# Extract the count of numeric values from the result
numeric_last_initial_count = numeric_last_initial_total[0]["numeric_last_initial_count"]


# Use Spark SQL query to count the occurrences of null values in "last_initial" column
null_last_initial_result = spark.sql("""
    SELECT count AS null_last_initial_count
    FROM last_initial_counts
    WHERE last_initial IS NULL
""").collect()

# Extract the count of null values from the result
null_last_initial_count = null_last_initial_result[0]["null_last_initial_count"]

# Assign a variable to total rows for calculations
total_authors = authors_for_model_df.count()

# Calculate percentage of null and numeric for deletion consideration
total_special_character_initial = numeric_last_initial_count + null_last_initial_count
percentage_of_authors = (total_special_character_initial/total_authors) * 100

# Display the counts
print("-----------------------------------")
print("Count of Numeric Last Initials: ", numeric_last_initial_count)
print("Count of Null Last Initials: ", null_last_initial_count)
print("Null and Numeric Last Initials Represent ",percentage_of_authors,"%")



In [None]:
# Remove rows with "null" or numbers in the "last_initial" column
author_model_data = authors_for_model_df.filter(
    col("last_initial").isNotNull()  
    & ~col("last_initial").rlike(r"^\d")  # Remove rows starting with numbers
)

# Replace lower-case letters with their upper-case counterparts in the "last_name_initial" column
author_model_data = author_model_data.withColumn("last_initial", upper(col("last_initial")))

# Display the cleaned DataFrame
author_model_data.show()

In [None]:
# Display the counts of authors per first letter of last name.
last_initial_counts = author_model_data.groupBy("last_initial").count()

# Order it alphabetically by first letter of last name
last_initial_counts = last_initial_counts.orderBy("last_initial")


display(last_initial_counts)

In [None]:
# Read in parquet table of the Open Library author data dump and display
file_location = '/FileStore/tables/works_key_snappy.parquet'
ol_works = spark.read.parquet(file_location)
display(ol_works)

In [None]:
# Register the DataFrame as a temporary view
ol_works.createOrReplaceTempView("works")


# Use Spark SQL query to count the occurrences of null values in "last_initial" column
null_author_result = spark.sql("""
    SELECT count(*) AS null_author_count
    FROM works
    WHERE author_key IS NULL
""").collect()

# Fetch the count value from the result
null_author_count = null_author_result[0]["null_author_count"]
print("The number of works with a null in the author_key value is:", null_author_count)

In [None]:
# Building dense data
works_data = ol_works.dropna(subset=["author_key"])

display(works_data)


In [None]:
works_data.createOrReplaceTempView("works_data")
author_model_data.createOrReplaceTempView("author_data")

# Create a new DataFrame by joining 'df1' and 'df2' on "author_key"
works_model = spark.sql("""
    SELECT works_data.work_key, works_data.title, author_model_data.author_key, author_model_data.author_name,
           author_model_data.nyt_bestseller, author_model_data.last_initial
    FROM works_data works_data
    INNER JOIN author_data author_model_data ON works_data.author_key = author_model_data.author_key
""")

# Show the new DataFrame
works_model.show()

In [None]:
works_model = works_model.withColumnRenamed("nyt_bestseller", "nyt_best_author")
works_model = works_model.withColumnRenamed("last_initial", "author_last_initial")

display(works_model)

In [None]:
works_model.createOrReplaceTempView("works_model")

# Add a new column "title_word_count" that shows the number of words in the "title" column
works_model_data = works_model.withColumn("title_word_count", size(split(col("title"), " ")))

# Show the DataFrame with the new column "title_word_count"
display(works_model_data)

In [None]:

# dbfs_output_path = "/FileStore/tables/model_test_works_authors"

# works_model_data.write.parquet(dbfs_output_path)

In [None]:
# print(dbfs_output_path)

In [None]:
# pull in authors.csv data from previous projects on NYT bestsellers, display.
file_location = '/FileStore/tables/books.csv'
nyt_books_df = spark.read.csv(file_location, inferSchema=True, header=False)
# nyt_authors_df = nyt_authors_df.withColumnRenamed("_c0", "author_id")
# nyt_authors_df = nyt_authors_df.withColumnRenamed("_c1", "author_name")
display(nyt_books_df)

In [None]:

# Register the DataFrames as temporary tables
works_model.createOrReplaceTempView("table1")
nyt_books_df.createOrReplaceTempView("table2")

# Convert both "title" columns to lowercase before the join
works_model_lower = works_model.withColumn("title_lower", lower(col("title")))
nyt_books_df_lower = nyt_books_df.withColumn("title_lower", lower(col("_c1")))

# Perform the join on the lowercase "title_lower" column
model_data = works_model_lower.join(nyt_books_df_lower, ["title_lower"], "inner").drop("title_lower")

# Show the merged DataFrame with the common "title" column
display(model_data)

In [None]:
# Drop Duplicates
model_data = model_data.dropDuplicates(["work_key"])
display(model_data)

In [None]:
# Perform a left join to combine OL and bestsellers based on "author_id" and "author_name"
attempt = works_model.join(model_data, on=["work_key","title","author_key","author_name","nyt_best_author", "author_last_initial"], how="left")

# Add a new column "work_on_nyt" to indicate if the work is also a bestseller
attempt = attempt.withColumn("work_on_nyt", when(col("_c1").isNotNull(), 1).otherwise(0))

# Select the desired columns: author_id, author_name, and present_in_nyt
attempt_model = attempt.select("work_key", "title", "author_key", "author_name", "nyt_best_author","author_last_initial","work_on_nyt")

# Show the result DataFrame
display(attempt_model)

In [None]:
attempt_model.columns

In [None]:
# Register the DataFrame as a temporary view
attempt_model.createOrReplaceTempView("attempt_model")

works_on_nyt_total = spark.sql("""
    SELECT COUNT(*) AS works_on_nyt_count
    FROM attempt_model
    WHERE work_on_nyt == 1
    """).collect()
                                                    
# Extract the count of numeric values from the result
works_on_nyt_count = works_on_nyt_total[0]["works_on_nyt_count"]

works_NOT_on_nyt_total = spark.sql("""
    SELECT COUNT(*) AS works_NOT_on_nyt_count
    FROM attempt_model
    WHERE work_on_nyt == 0
    """).collect()
                                                    
# Extract the count of numeric values from the result
works_NOT_on_nyt_count = works_NOT_on_nyt_total[0]["works_NOT_on_nyt_count"]

# Assign a variable to total rows for calculations
total_works = attempt_model.count()


# Display the counts
print("-----------------------------------")
print("Count of Works on a NYT Bestseller List: ", works_on_nyt_count)
print("Count of Works NOT on a NYT Bestseller List: ", works_NOT_on_nyt_count)
print("Total works represented ", total_works)
