# In [None]:
# This is the working version of the code; reuse it if further analysis is needed.

import re

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType

# Create the Spark session for this analysis, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("ProjectGutenberg")
    .getOrCreate()
)

# Function to extract necessary fields from each file's text
def extract_fields(file_content):
    """Pull the title, release date, language and encoding out of a file's text.

    Each field is read from the first occurrence of its label (``Title:``,
    ``Release Date:``, ``Language:``, ``Character set encoding:``) and covers
    the remainder of that line only.

    Args:
        file_content: Full text of one file.

    Returns:
        A ``(title, release_date, language, encoding)`` tuple of stripped
        strings. An entry is ``None`` when its label is absent or when the
        label has no value on its own line.
    """
    labels = ("Title", "Release Date", "Language", "Character set encoding")
    values = []
    for label in labels:
        # [^\S\r\n]* skips spaces/tabs but never a line break, so a label
        # with an empty value cannot swallow the following line by mistake.
        match = re.search(re.escape(label) + r':[^\S\r\n]*(.*)', file_content)
        value = match.group(1).strip() if match else ""
        # Treat "label present but empty" the same as "label missing".
        values.append(value or None)
    return tuple(values)

# Load the text files into an RDD; only element [1] of each record (the file's
# text) is used below
input_folder = "/home/jovyan/data/"  # Replace with your folder path
text_files_rdd = spark.sparkContext.wholeTextFiles(input_folder)

# Extract the relevant fields from each file
extracted_fields_rdd = text_files_rdd.map(lambda file: extract_fields(file[1]))

# Convert the RDD to a DataFrame.
# An explicit all-string schema is passed instead of bare column names so Spark
# does not have to infer column types from the data: inference fails when the
# folder yields no files or when a field is None in every sampled row.
columns = ["Title", "ReleaseDate", "Language", "Encoding"]
schema = StructType([StructField(name, StringType(), True) for name in columns])
df = spark.createDataFrame(extracted_fields_rdd, schema)

# Several actions are run on this DataFrame; caching avoids re-reading and
# re-parsing every file for each of them.
df = df.cache()

# Print the DataFrame content after loading
df.show(truncate=False)

# Print the number of rows in the DataFrame
print(f"Number of rows in the DataFrame: {df.count()}")

# Function to extract year from Release Date
def extract_year(release_date):
    """Return the four-digit year that follows a comma in ``release_date``.

    The year is returned as a string. ``None`` is returned when
    ``release_date`` is empty or ``None``, or when no comma followed by four
    digits is found.
    """
    if not release_date:
        return None
    found = re.search(r',\s*(\d{4})', release_date)
    if found is None:
        return None
    return found.group(1)

# Register the function as a UDF (User Defined Function)
extract_year_udf = F.udf(extract_year)

# Extract the year and add it as a new column
df = df.withColumn("ReleaseYear", extract_year_udf(df.ReleaseDate))

# Calculate the number of books released each year
books_per_year = df.groupBy("ReleaseYear").count().orderBy("ReleaseYear")
books_per_year.show()

# Find the most common language in the dataset.
# Rows without a language are left out so that "no language" can never be
# reported as the most common one.
most_common_language = (
    df.where(F.col("Language").isNotNull())
    .groupBy("Language")
    .count()
    .orderBy("count", ascending=False)
    .first()
)
# first() gives None when no row carries a language, so guard before indexing
if most_common_language is None:
    print("The most common language could not be determined: no language information found")
else:
    print(f"The most common language is: {most_common_language['Language']} with {most_common_language['count']} books")


from pyspark.sql.functions import length

# Determine the average length of book titles (in characters)
df = df.withColumn("TitleLength", length(df.Title))
average_title_length = df.agg({"TitleLength": "avg"}).first()
# The aggregate is None when no row has a title; formatting None with :.2f
# would raise a TypeError, so report that case explicitly instead.
mean_title_length = average_title_length['avg(TitleLength)']
if mean_title_length is None:
    print("The average length of book titles could not be computed: no titles found")
else:
    print(f"The average length of book titles is: {mean_title_length:.2f} characters")



# Earlier draft of the same computation, left disabled; the working version
# above uses pyspark.sql.functions.length instead of df.Title.length()
#average_title_length = df.withColumn("TitleLength", df.Title.length()).agg({"TitleLength": "avg"}).first()
#print(f"The average length of book titles is: {average_title_length['avg(TitleLength)']:.2f} characters")
