# Homework 2: Question 4 - Extending the table

Haiya Niraj Shah \
Andrew id - haiyas

# Prerequisites from previous questions

In [None]:
#Setup Spark
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
import requests
import xml.etree.ElementTree as ET
# Single local-mode Spark application; the driver is bound to the loopback
# address (presumably to sidestep hostname-resolution issues on this machine).
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName("NewsData")
    .setMaster("local")
)
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
# `builder` lives on the SparkSession class, so this attaches to the context above.
spark = SparkSession.builder.getOrCreate()

In [None]:
#Database configuration
# Credentials come from the standard libpq environment variables when set, so
# they need not live in the notebook; the local-dev values are only fallbacks.
import os

db_config = {
    'username': os.environ.get("PGUSER", "postgres"),
    'password': os.environ.get("PGPASSWORD", "password"),
    'url': "jdbc:postgresql://localhost:5432/postgres",
    'table': "news.google_newsFeed",
    'driver': "org.postgresql.Driver",
}

In [None]:
#Fetching data
rss_url = "https://news.google.com/rss/search?q=technology&hl=en-US&gl=US&ceid=US:en"
# A timeout keeps Restart & Run All from hanging on a stalled connection, and
# raise_for_status() stops here on an HTTP error instead of handing the error
# response body to the XML parser in the next cell.
response = requests.get(rss_url, timeout=30)
response.raise_for_status()
xml_data = response.content

In [None]:
#Standard date conversion
from datetime import datetime

# strptime format for RFC 822-style dates such as "Thu, 18 Sep 2025 13:05:56 GMT".
RSS_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'


def _parse_rss_date(text):
    """Parse an RSS date string into a naive datetime; None if missing or malformed."""
    if not text:
        return None
    try:
        return datetime.strptime(text, RSS_DATE_FORMAT)
    except ValueError:
        # Best effort: a malformed date becomes NULL rather than aborting the load.
        return None


def _child_text(element, tag):
    """Text of the first direct child `tag` of `element`, or None if that child is absent."""
    child = element.find(tag)
    return child.text if child is not None else None


root = ET.fromstring(xml_data)

channel = root.find('channel')
if channel is None:
    raise ValueError("RSS response has no <channel> element")
build_date = _parse_rss_date(_child_text(channel, 'lastBuildDate'))

#Extract news: one dict per <item>, with every field nullable.
news_data = [
    {
        'lastBuildDate': build_date,
        'title': _child_text(item, 'title'),
        'link': _child_text(item, 'link'),
        'pubDate': _parse_rss_date(_child_text(item, 'pubDate')),
        'description': _child_text(item, 'description'),
        'source': _child_text(item, 'source'),
    }
    for item in root.findall('.//item')
]

print(f"Parsed {len(news_data)} news items")

Parsed 100 news items


In [None]:
#Inserting data
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Explicit schema: inferring from dicts raises when a column is None in every row
# (e.g. the feed's lastBuildDate could not be parsed) or when there are no items.
# Fields follow the alphabetical column order that inference produced before.
news_schema = StructType([
    StructField('description', StringType(), True),
    StructField('lastBuildDate', TimestampType(), True),
    StructField('link', StringType(), True),
    StructField('pubDate', TimestampType(), True),
    StructField('source', StringType(), True),
    StructField('title', StringType(), True),
])

df = spark.createDataFrame(news_data, schema=news_schema)
df.show(3, truncate=True)

(
    df.write.format("jdbc")
    .mode("overwrite")
    .option("url", db_config['url'])
    .option("dbtable", db_config['table'])
    .option("user", db_config['username'])
    .option("password", db_config['password'])
    .option("driver", db_config['driver'])
    .save()
)

# The DataFrame was built from this local list, so its length is the row count
# without launching another Spark job.
print(f"Inserted {len(news_data)} records")

+--------------------+-------------------+--------------------+-------------------+------------------+--------------------+
|         description|      lastBuildDate|                link|            pubDate|            source|               title|
+--------------------+-------------------+--------------------+-------------------+------------------+--------------------+
|<a href="https://...|2025-09-18 17:42:32|https://news.goog...|2025-09-18 13:05:56|            GOV.UK|Memorandum of Und...|
|<a href="https://...|2025-09-18 17:42:32|https://news.goog...|2025-09-17 13:37:37|      Fox Business|Expert predicts A...|
|<a href="https://...|2025-09-18 17:42:32|https://news.goog...|2025-09-18 16:57:47|The Times of India|New technology la...|
+--------------------+-------------------+--------------------+-------------------+------------------+--------------------+
only showing top 3 rows
Inserted 100 records


# You decided to extend your table to include non-technology related news as well. Update your table to add a new column called “category” and pre-populate it with the value of “technology” for existing records. Then, populate the table with the following feeds:
- Business: https://news.google.com/rss/search?q=business&hl=en-US&gl=US&ceid=US:en
  Use category value of “business” for these news records.
- Sports: https://news.google.com/rss/search?q=sports&hl=en-US&gl=US&ceid=US:en
  Use category value of “sports” for these news records.

In [None]:
from pyspark.sql.functions import lit

In [None]:
# Read the current technology-only table. A later cell overwrites this *same*
# JDBC table, and Spark evaluates lazily: unless these rows are already
# materialized, the overwrite can read the table back after it has been emptied
# and store 0 rows (see the Stack Overflow reference on overwrite creating an
# empty table). cache() is itself lazy, so run an action here instead of relying
# on another cell's count() having filled the cache first.
# NOTE(review): cached blocks can still be evicted under memory pressure; for a
# hard guarantee consider .localCheckpoint(eager=True) — confirm for your Spark version.
existing_df = (
    spark.read.format("jdbc")
    .option("url", db_config['url'])
    .option("dbtable", db_config['table'])
    .option("user", db_config['username'])
    .option("password", db_config['password'])
    .option("driver", db_config['driver'])
    .load()
    .cache()
)
existing_row_count = existing_df.count()  # forces the cache to be filled now

In [None]:
#Technology added: every row already in the table came from the technology
# feed, so all of them get the same constant category value.
existing_with_category = existing_df.withColumn(
    "category",
    lit("technology"),
)
row_total = existing_with_category.count()
print(f"Existing records with category added: {row_total}")

Existing records with category added: 100


In [None]:
# Replace the table with the category-tagged rows. This writes to the same table
# existing_df was read from, which is why that read was cached.
(
    existing_with_category.write.format("jdbc")
    .options(
        url=db_config['url'],
        dbtable=db_config['table'],
        user=db_config['username'],
        password=db_config['password'],
        driver=db_config['driver'],
    )
    .mode("overwrite")
    .save()
)

In [None]:
def add_news_with_category(feed_url, category_name, timeout=30):
    """Download one Google News RSS feed and append its items to the news table.

    Parsing mirrors the Q2 technology load (same columns, same date handling)
    and additionally stores ``category_name`` in a ``category`` column.

    Args:
        feed_url: URL of the RSS feed to fetch.
        category_name: Value written to the ``category`` column of every row.
        timeout: Seconds to wait for the HTTP response before giving up.
            Optional; added so a stalled request cannot hang the notebook.

    Raises:
        requests.HTTPError: if the server answers with an HTTP error status.
        xml.etree.ElementTree.ParseError: if the body is not well-formed XML.
        ValueError: if the document has no top-level ``<channel>`` element.
    """
    from datetime import datetime

    from pyspark.sql.types import StringType, StructField, StructType, TimestampType

    date_format = '%a, %d %b %Y %H:%M:%S %Z'

    def parse_date(text):
        # Missing or unparseable dates become NULL instead of aborting the load;
        # only a format mismatch (ValueError) is swallowed, not every exception.
        if not text:
            return None
        try:
            return datetime.strptime(text, date_format)
        except ValueError:
            return None

    def child_text(element, tag):
        # Text of the first direct child named `tag`, or None when it is absent.
        child = element.find(tag)
        return child.text if child is not None else None

    response = requests.get(feed_url, timeout=timeout)
    response.raise_for_status()  # don't hand an HTTP error body to the XML parser
    root = ET.fromstring(response.content)

    channel = root.find('channel')
    if channel is None:
        raise ValueError(f"RSS feed has no <channel> element: {feed_url}")
    build_date = parse_date(child_text(channel, 'lastBuildDate'))

    #Same as Q2 but with category
    news_data = [
        {
            'lastBuildDate': build_date,
            'title': child_text(item, 'title'),
            'link': child_text(item, 'link'),
            'pubDate': parse_date(child_text(item, 'pubDate')),
            'description': child_text(item, 'description'),
            'source': child_text(item, 'source'),
            'category': category_name,
        }
        for item in root.findall('.//item')
    ]

    # Explicit schema: inferring from dicts raises on an empty feed and on any
    # column that is None in every row (e.g. an unparseable lastBuildDate).
    # Fields are listed in alphabetical order, like the inferred Q2 columns.
    schema = StructType([
        StructField('category', StringType(), True),
        StructField('description', StringType(), True),
        StructField('lastBuildDate', TimestampType(), True),
        StructField('link', StringType(), True),
        StructField('pubDate', TimestampType(), True),
        StructField('source', StringType(), True),
        StructField('title', StringType(), True),
    ])
    df = spark.createDataFrame(news_data, schema=schema)
    (
        df.write.format("jdbc")
        .option("url", db_config['url'])
        .option("dbtable", db_config['table'])
        .option("user", db_config['username'])
        .option("password", db_config['password'])
        .option("driver", db_config['driver'])
        .mode("append")
        .save()
    )

    # The DataFrame was built from this local list, so its length is the row
    # count without running a second Spark job after the write.
    print(f"Added {len(news_data)} {category_name} articles")

In [None]:
business_url = "https://news.google.com/rss/search?q=business&hl=en-US&gl=US&ceid=US:en"
sports_url = "https://news.google.com/rss/search?q=sports&hl=en-US&gl=US&ceid=US:en"

# Append each feed in turn (business first, then sports), tagging its rows
# with the matching category value.
for feed_link, feed_category in ((business_url, "business"), (sports_url, "sports")):
    add_news_with_category(feed_link, feed_category)

Added 102 business articles
Added 100 sports articles


In [None]:
# Re-read the whole table from PostgreSQL to verify what was actually written.
final_df = (
    spark.read.format("jdbc")
    .options(
        url=db_config['url'],
        dbtable=db_config['table'],
        user=db_config['username'],
        password=db_config['password'],
        driver=db_config['driver'],
    )
    .load()
)

In [None]:
# Expose the reloaded table to Spark SQL under the name the query below uses.
final_df.createOrReplaceTempView(name="news_table")

In [None]:
# Row count per category, sorted by category name.
category_counts_sql = """
    SELECT category, COUNT(*) as count
    FROM news_table
    GROUP BY category
    ORDER BY category
"""
distinct_categories = spark.sql(category_counts_sql)

In [None]:
print("Distinct categories in database:")
distinct_categories.show()

# Turn the visual check into an executable one: exactly the three expected
# categories must be present, and a NULL category (a row that missed tagging)
# would show up as None in this set and fail the assertion.
observed_categories = {row['category'] for row in distinct_categories.collect()}
assert observed_categories == {"technology", "business", "sports"}, observed_categories

Distinct categories in database:
+----------+-----+
|  category|count|
+----------+-----+
|  business|  102|
|    sports|  100|
|technology|  100|
+----------+-----+



The notebook displays the categories from the database as a way to verify the output, confirming that the "technology," "business," and "sports" categories have been successfully populated.

### References

### Stack Overflow Solutions
- Spark DataFrame write to JDBC with mode overwrite creates empty table: https://stackoverflow.com/questions/45775495/spark-dataframe-write-to-jdbc-with-mode-overwrite-creates-empty-table
- RSS feed parsing with Python xml.etree: https://stackoverflow.com/questions/1912434/how-do-i-parse-xml-in-python

### Class Materials
- Lecture_2_Introduction_to_Cloud_And_Spark on scalable data processing with PySpark DataFrames
- Lecture_3_SQL_and_SparkSQL for data manipulation operations
