# Get Blogs

Get the new blog posts based on the latest feeds and store them in the lakehouse. 

## Library dependency

- __[pyspark.sql.functions.regexp_replace](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_replace.html)__: Replace all substrings of the specified string value that match regexp with replacement.
- __[trafilatura](https://pypi.org/project/trafilatura/0.5.0/)__: Trafilatura is a Python package and command-line tool that downloads, parses, and scrapes web page data. It can extract metadata, main body text, and comments while preserving part of the text formatting and page structure, and the output can be converted to different formats. _Requires installation_.
- __[os](https://docs.python.org/3/library/os.html)__: This module provides a portable way of using operating system dependent functionality. 
- __[pathlib](https://docs.python.org/3/library/pathlib.html)__: This module offers classes representing filesystem paths with semantics appropriate for different operating systems. Path classes are divided between pure paths, which provide purely computational operations without I/O, and concrete paths, which inherit from pure paths but also provide I/O operations.
- __[pyspark.sql.types](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html)__: Define a schema structure and data types.


In [None]:
%run CONFIG

## Retrieve new blog posts

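Retrieve new blog posts by left-joining the latest feeds with the _blog_posts_ table and keeping only the links that are not stored yet. If the table does not exist yet, all feed entries are treated as new.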
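
The selection logic can be illustrated without Spark. The sketch below is a plain-Python stand-in for the SQL query in this section, not the notebook's implementation. The sample rows, the links, and the helper name `new_posts` are made up for illustration. It keeps feed entries whose link is not stored yet and drops links that contain `/tv/`.

```python
# Hypothetical sample data standing in for the feeds and blog_posts tables
feeds_rows = [
    {"title": "Post A", "link": "https://example.com/a"},
    {"title": "Post B", "link": "https://example.com/b"},
    {"title": "Video C", "link": "https://example.com/tv/c"},
]
stored_links = {"https://example.com/a"}


def new_posts(feed_rows, known_links):
    """Return feed rows that are not stored yet and are not /tv/ links."""
    return [
        row for row in feed_rows
        if row["link"] not in known_links and "/tv/" not in row["link"]
    ]


new_rows = new_posts(feeds_rows, stored_links)
print([row["title"] for row in new_rows])  # ['Post B']
```

Passing an empty set as `known_links` corresponds to the case where the table does not exist yet: every non-video entry is returned.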

In [None]:
#####################################
# Check if there are new blog posts #
#####################################

from pyspark.sql.functions import regexp_replace

# exclude URLs that contain /tv/ because they are videos from SQL BI (TO BE CHANGED)
# if the blog_posts table already exists, left-join the feeds with it to keep only new posts;
# otherwise take the entire feeds table
if spark.catalog.tableExists("blog_posts"):
    query = f"SELECT f.title, f.author, f.blog, f.link, f.published FROM {lh_raw}.feeds f LEFT JOIN {lh_raw}.blog_posts p ON f.link = p.link WHERE p.link IS NULL AND f.link NOT LIKE '%/tv/%'"
else:
    query = f"SELECT title, author, blog, link, published FROM {lh_raw}.feeds WHERE link NOT LIKE '%/tv/%'"

feeds = spark.sql(query)

# add a column with the title stripped of spaces and special characters, used for file naming
feeds = feeds.withColumn('ctitle', regexp_replace('title', '[^a-zA-Z0-9]', ''))

display(feeds)
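
To see what the `ctitle` pattern does, here is a small stand-alone sketch that uses Python's `re` module instead of Spark. The helper name `clean_title` and the sample titles are illustrative only. Note that non-ASCII letters are removed as well, and that different titles can collapse to the same cleaned value.

```python
import re


def clean_title(title):
    """Remove every character that is not an ASCII letter or digit."""
    return re.sub(r"[^a-zA-Z0-9]", "", title)


print(clean_title("What's new in DAX? (2024)"))  # WhatsnewinDAX2024
print(clean_title("Café & Data"))                # CafData
```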

## Function: get_blogpost

Function to retrieve the contents of a given URL and return the text. 

In [None]:
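##########################
# Function: get_blogpost #
##########################

from trafilatura import fetch_url, extract

def get_blogpost(url):
    """
    Returns the main text content of the web page at a given URL.

    Args:
        url (string): URL of the blog post to read.
    Returns:
        text (string): Blog post content as text; empty string if the page
            could not be downloaded or no text could be extracted.
    Library dependencies:
        trafilatura
    """
    # download the page; fetch_url returns None if the download fails
    downloaded = fetch_url(url)
    if downloaded is None:
        return ""

    # extract the main text; extract returns None if no content is found
    article = extract(downloaded, favor_precision=True)

    return article or ""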
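
Downloads and extraction can fail. To my understanding, trafilatura's `fetch_url` and `extract` both return `None` in that case, so callers need a guard before using the result as a string. The sketch below shows one way to do that. It takes the download and extraction functions as parameters, which is an illustrative design and not part of the notebook, so the logic can be tried with stubs and without network access.

```python
def get_text_or_empty(url, fetch, extract_text):
    """Return the extracted text for url, or "" if download or extraction fails.

    fetch and extract_text are injected callables (hypothetical parameters);
    in the notebook they would correspond to trafilatura's fetch_url and extract.
    """
    downloaded = fetch(url)
    if downloaded is None:
        return ""
    return extract_text(downloaded) or ""


# Stub functions that mimic a successful and a failed download
def fake_fetch(url):
    return "<html><body><p>Hello blog</p></body></html>" if "ok" in url else None


def fake_extract(html):
    return "Hello blog" if "<p>" in html else None


print(repr(get_text_or_empty("https://example.com/ok", fake_fetch, fake_extract)))
print(repr(get_text_or_empty("https://example.com/missing", fake_fetch, fake_extract)))
```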


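## Get blog posts and store them

Loop through the new feed entries, download each blog post with `get_blogpost`, optionally store it as a text file in the lakehouse, and append all new posts to the _blog_posts_ table.
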
In [None]:
#################################
# Get blog posts and store them #
#################################

import os
import pathlib
from pyspark.sql.types import StructType, StructField, StringType

# define schema for blog_posts table
schema = StructType([
  StructField('title', StringType(), True),
  StructField('author', StringType(), True),
  StructField('blog', StringType(), True),
  StructField('link', StringType(), True),
  StructField('post', StringType(), True),
  StructField('post_length', StringType(), True),  
  StructField('published', StringType(), True)
  ])
# create empty dataframe based on schema
emptyRDD = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(emptyRDD,schema)

# define default path to store blog posts
path = "/lakehouse/default/Files/blog_posts"

# loop through all the feeds and call the get_blogpost function for each blog post URL
for blog in feeds.collect():
    # call function; treat a failed download or extraction (None) as an empty post
    blogpost = get_blogpost(blog.link) or ""

    # if store_blog_posts is true (parameter can be set in CONFIG), each blog post is also stored as a txt file in the lakehouse
    # these files can be used to train a custom ChatGPT instance
    # skip empty blog posts, as empty files lead to errors when indexing
    if store_blog_posts and len(blogpost) > 0:
        # define metadata to be added to txt file
        blog_details = f"[BlogTitleStart] {blog.title} [BlogTItleEnd]\n[BlogUrlStart] {blog.link} [BlogUrlEnd]\n"

        # create folder if it doesn't exist
        pathlib.Path(f"{path}/{blog.blog}").mkdir(parents=True, exist_ok=True)

        try:
            # mode "x" creates the file and fails if it already exists
            with open(f"{path}/{blog.blog}/{blog.ctitle}.txt", "x") as f:
                # write metadata and blog post to file in the lakehouse
                f.write(blog_details + blogpost)
                print(f"[Log] Stored blog as Files/blog_posts/{blog.blog}/{blog.ctitle}.txt")

        except Exception as ex:
            print(f"[Error] Couldn't store blog {blog.ctitle}: " + str(ex))

    # independently of storing the blogs as txt files, they are added to a dataframe
    try:
        # post_length is a string column in the schema, so the length is converted explicitly
        row = spark.createDataFrame([(blog.title, blog.author, blog.blog, blog.link, blogpost, str(len(blogpost)), blog.published)], schema)
        df = df.union(row)
    except Exception as ex:
        print(f"[Error] Couldn't add blog {blog.ctitle} to dataframe: " + str(ex))
    
# store the new blog posts to the blog_posts table (append)
df.write.format("delta").mode("append").saveAsTable("blog_posts")
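
The file-writing step in the loop relies on open mode `"x"`, which creates a new file and raises `FileExistsError` if the file already exists, so a stored post is never overwritten. Below is a minimal sketch of that behaviour. It uses a temporary directory in place of the lakehouse path, and the helper `store_post` and the sample values are hypothetical.

```python
import pathlib
import tempfile


def store_post(base_dir, blog, ctitle, content):
    """Write one post to <base_dir>/<blog>/<ctitle>.txt; return False if it already exists."""
    folder = pathlib.Path(base_dir) / blog
    folder.mkdir(parents=True, exist_ok=True)
    try:
        # "x" = exclusive creation: fails instead of overwriting an existing file
        with open(folder / f"{ctitle}.txt", "x") as f:
            f.write(content)
        return True
    except FileExistsError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    first = store_post(tmp, "myblog", "HelloWorld", "first version")
    second = store_post(tmp, "myblog", "HelloWorld", "second version")
    stored = (pathlib.Path(tmp) / "myblog" / "HelloWorld.txt").read_text()

print(first, second, stored)  # True False first version
```

A consequence of this design is that re-running the notebook for a post that was already written leaves the existing file untouched and only logs an error.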