In [1]:
# Question No 10-Book Metadata Extraction and Analysis
# create spark session 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("Book Metadata Analysis") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/13 15:00:27 WARN Utils: Your hostname, GOUTAM, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/02/13 15:00:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/13 15:00:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#Load Project Gutenberg Dataset
#Load all text files:
books_df = spark.read.text("/home/goutam/CSL7110/pyspark-tutorial/data/*.txt")

books_df = books_df.withColumnRenamed("value", "text")

books_df.show(5, truncate=False)


26/02/13 15:00:35 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: /home/goutam/CSL7110/pyspark-tutorial/data/*.txt.
java.io.FileNotFoundException: File /home/goutam/CSL7110/pyspark-tutorial/data/*.txt does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:980)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1301)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:970)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.sinks.FileStreamSink$.hasMetadata(FileStreamSink.scala:58)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:384)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.sca

+----------------------------------------------+
|text                                          |
+----------------------------------------------+
|----- BOOK -----                              |
|Title: The Extermination of the American Bison|
|Author: William T. Hornaday                   |
|Link: http://www.gutenberg.org/ebooks/17748   |
|Bookshelf: Animal                             |
+----------------------------------------------+
only showing top 5 rows


In [3]:
#Add filename column
books_df = books_df.withColumn(
    "file_name",
    input_file_name()
)


In [4]:
#Reorder columns:
books_df = books_df.select("file_name", "text")


In [5]:
#Metadata Extraction (Regex)
# Extract File 
books_df = books_df.withColumn(
    "title",
    regexp_extract(
        col("text"),
        r"Title:\s*(.*)",
        1
    )
)


In [6]:
#Extract Release Date
books_df = books_df.withColumn(
    "release_date",
    regexp_extract(
        col("text"),
        r"Release Date:\s*(.*)",
        1
    )
)


In [7]:
#Extract Language
books_df = books_df.withColumn(
    "language",
    regexp_extract(
        col("text"),
        r"Language:\s*(.*)",
        1
    )
)


In [8]:
#Extract Encoding
books_df = books_df.withColumn(
    "encoding",
    regexp_extract(
        col("text"),
        r"Character set encoding:\s*(.*)",
        1
    )
)


In [9]:
#Clean Metadata (Important Step)
metadata_df = books_df.filter(col("title") != "")


In [10]:
# show result
metadata_df.select(
    "file_name",
    "title",
    "release_date",
    "language",
    "encoding"
).show(truncate=False)


+------------------------------------------------------------------------+--------------------------------------------------------------------------------+------------+--------+--------+
|file_name                                                               |title                                                                           |release_date|language|encoding|
+------------------------------------------------------------------------+--------------------------------------------------------------------------------+------------+--------+--------+
|file:///home/goutam/CSL7110/pyspark-tutorial/data/gutenberg_metadata.txt|The Extermination of the American Bison                                         |            |        |        |
|file:///home/goutam/CSL7110/pyspark-tutorial/data/gutenberg_metadata.txt|Deadfalls and Snares                                                            |            |        |        |
|file:///home/goutam/CSL7110/pyspark-tutorial/data/gutenberg_meta

In [40]:
# STEP 5 â€” ANALYSIS PART
#Books Released Per Year
metadata_df = metadata_df.withColumn(
    "year",
    regexp_extract(col("release_date"), r"(\d{4})", 1)
)



In [42]:
# Count books
books_per_year = metadata_df.groupBy("year").count()
books_per_year.show()


+----+-----+
|year|count|
+----+-----+
|    |15331|
+----+-----+



In [43]:
#ðŸ”¹ 2. Most Common Language
metadata_df.groupBy("language") \
    .count() \
    .orderBy(desc("count")) \
    .show(1)


+--------+-----+
|language|count|
+--------+-----+
|        |15329|
+--------+-----+
only showing top 1 row


In [44]:
#ðŸ”¹ 3. Average Title Length

metadata_df = metadata_df.withColumn(
    "title_length",
    length(col("title"))
)

metadata_df.select(
    avg("title_length")
).show()


+-----------------+
|avg(title_length)|
+-----------------+
|35.67118909399257|
+-----------------+



In [45]:
#QUESTION 11 â€” TF-IDF and Book Similarity
#Step 1 â€” Text Preprocessing
# Remove Gutenberg Header/Footer
clean_df = books_df.withColumn(
    "clean_text",
    regexp_replace(col("text"),
    "\\*\\*\\*.*?\\*\\*\\*", "")
)



In [46]:
#Lowercase + Remove Punctuation
clean_df = clean_df.withColumn(
    "clean_text",
    lower(regexp_replace(col("clean_text"), "[^a-zA-Z\\s]", " "))
)


In [47]:
#Tokenization
from pyspark.ml.feature import Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
words_df = tokenizer.transform(clean_df)


In [48]:
#Remove Stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
filtered_df = remover.transform(words_df)


In [49]:
#Step 2 â€” TF-IDF Calculation
#Term Frequency
from pyspark.ml.feature import HashingTF, IDF

tf = HashingTF(inputCol="filtered", outputCol="tf_features")
tf_df = tf.transform(filtered_df)


In [50]:
#IDF
idf = IDF(inputCol="tf_features", outputCol="tfidf_features")
idf_model = idf.fit(tf_df)

tfidf_df = idf_model.transform(tf_df)


                                                                                

In [51]:
#Step 3 â€” Cosine Similarity
#Convert to Vectors
from pyspark.ml.linalg import Vectors


In [52]:
#Self Join for Pairwise Similarity
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType
import numpy as np

def cosine_sim(v1, v2):
    return float(
        v1.dot(v2) /
        (np.linalg.norm(v1.toArray()) *
         np.linalg.norm(v2.toArray()))
    )

cosine_udf = udf(cosine_sim, FloatType())

pairs = tfidf_df.alias("a").crossJoin(
    tfidf_df.alias("b")
).filter(col("a.file_name") != col("b.file_name"))

similarity_df = pairs.withColumn(
    "similarity",
    cosine_udf(
        col("a.tfidf_features"),
        col("b.tfidf_features")
    )
)



In [53]:
#Top 5 Similar Books to "10.txt"
similarity_df.filter(col("a.file_name")=="10.txt") \
.orderBy(desc("similarity")) \
.select("b.file_name","similarity") \
.show(5)


+---------+----------+
|file_name|similarity|
+---------+----------+
+---------+----------+



26/02/13 15:31:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [65]:
#QUESTION 12 â€” Author Influence Network
#Step 1 â€” Extract Author & Year
from pyspark.sql.functions import col, regexp_extract, when
from pyspark.sql.types import IntegerType

author_df = metadata_df.withColumn(
    "author",
    regexp_extract(col("text"), "(?i)author:\\s*(.*)", 1)
).withColumn(
    "year_raw",
    regexp_extract(col("release_date"), r"(\\d{4})", 1)
).withColumn(
    "year",
    when(col("year_raw") == "", None)
    .otherwise(col("year_raw").cast(IntegerType()))
).drop("year_raw")



In [66]:
#Step 2 â€” Build Influence Edges
from pyspark.sql.functions import col, abs

X = 5

a = author_df.alias("a")
b = author_df.alias("b")

edges = a.join(
    b,
    col("a.author") != col("b.author")
).filter(
    abs(col("a.year") - col("b.year")) <= X
).select(
    col("a.author").alias("author1"),
    col("b.author").alias("author2")
)



In [63]:
#Step 3 â€” Degree Analysis
#In-Degree
in_degree = edges.groupBy("author2") \
.count() \
.withColumnRenamed("count","in_degree")


In [67]:
#Out-Degree
out_degree = edges.groupBy("author1") \
.count() \
.withColumnRenamed("count","out_degree")


In [68]:
#Top Authors
from pyspark.sql.functions import desc

in_degree.orderBy(desc("in_degree")).show(5)
out_degree.orderBy(desc("out_degree")).show(5)



+-------+---------+
|author2|in_degree|
+-------+---------+
+-------+---------+

+-------+----------+
|author1|out_degree|
+-------+----------+
+-------+----------+

