In [1]:
import feedparser
import csv
import pandas as pd
import re
import findspark
findspark.init()
findspark.find()
import pyspark
import os
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
import datetime
import time
from sqlalchemy import create_engine
import psycopg2
import requests
import xml.etree.ElementTree as ET

In [2]:
def insertIntoTable(df, table):
        """
        Using cursor.executemany() to insert the dataframe
        """
        # Create a list of tupples from the dataframe values
        tuples = list(set([tuple(x) for x in df.to_numpy()]))
    
        # Comma-separated dataframe columns
        cols = ','.join(list(df.columns))
        # SQL query to execute
        query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
            table, cols)
    
        try:
            cur.executemany(query, tuples)
            conn.commit()

        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1

In [3]:
def parseRSS( rss_url ): #функция получает линк на рсс ленту, возвращает распаршенную ленту с помощью feedpaeser
    return feedparser.parse( rss_url )  

    
def getHeadlines( rss_url ): #функция для получения заголовков новости
    headlines = []
    feed = parseRSS( rss_url )
    for newsitem in feed['items']:
        headlines.append(newsitem['title'])
    return headlines

def getCategory( rss_url ): #функция для получения описания новости
    category = []
    feed = parseRSS( rss_url )
    for newsitem in feed['items']:
        category.append(newsitem['category'])
    return category

def getLinks( rss_url ): #функция для получения ссылки на источник новости
    links = []
    feed = parseRSS( rss_url )
    for newsitem in feed['items']:
        links.append(newsitem['link'])
    return links

def getDates( rss_url ): #функция для получения даты публикации новости
    dates = []
    feed = parseRSS( rss_url )
    for newsitem in feed['items']:
        dates.append(newsitem['published'])
    return dates

In [4]:
def getUrl( rss_url ): #функция для получения названия издания
    urls = []
    feed = parseRSS( rss_url )
    for newsitem in feed['items']:
        urls.append(rss_url)
    return urls

In [5]:
#Список источников

newsurls = {'Kommersant': 'https://www.kommersant.ru/RSS/news.xml',
            'Lenta.ru': 'https://lenta.ru/rss/',
            'Vedomosti':'https://www.vedomosti.ru/rss/news',
            'Tass':'https://tass.ru/rss/v2.xml'} 

In [6]:
allheadlines = []
allcategory = []
alllinks = []
alldates = []
allurls = []
# Прогоняем нашии URL и добавляем их в наши пустые списки
for key,url in newsurls.items():
    allheadlines.extend( getHeadlines( url ) )
    
for key,url in newsurls.items():
    allcategory.extend( getCategory( url ) )
    
for key,url in newsurls.items():
    alllinks.extend( getLinks( url ) )
    
for key,url in newsurls.items():
    alldates.extend( getDates( url ) )

for key,url in newsurls.items():
    allurls.extend(getUrl( url ))

In [7]:
df = pd.DataFrame(
    {'source':allurls,
     'date_new': alldates,
     'category': allcategory,
     'headline': allheadlines
    })

In [8]:
df.head()

Unnamed: 0,source,date_new,category,headline
0,https://www.kommersant.ru/RSS/news.xml,"Mon, 26 Dec 2022 21:07:39 +0300",Бизнес,Украина с 2023 года поднимет тарифы на прокачк...
1,https://www.kommersant.ru/RSS/news.xml,"Mon, 26 Dec 2022 20:53:25 +0300",Мир,Эрдоган заявил о намерении Турции закрыть бреш...
2,https://www.kommersant.ru/RSS/news.xml,"Mon, 26 Dec 2022 20:29:33 +0300",Мир,Премьер Индии и президент Украины обсудили укр...
3,https://www.kommersant.ru/RSS/news.xml,"Mon, 26 Dec 2022 20:19:13 +0300",Экономика,ЦБ РФ сообщил о росте индикатора бизнес-климат...
4,https://www.kommersant.ru/RSS/news.xml,"Mon, 26 Dec 2022 20:08:06 +0300",Общество,Ялта оказалась перед угрозой повторного затопл...


In [9]:
df=df.drop_duplicates()

## Подключаемся к БД PostgreSql

In [23]:
conn = psycopg2.connect(
  database="postgres",
  user="admin",
  
  password="admin"
)

In [24]:
cur = conn.cursor()

In [14]:
#создаем таблицу сырых данных, первый слой

sql = '''CREATE TABLE IF NOT EXISTS public.news_raw_2(source varchar(200),
                                            date_new date,
                                            category varchar(200),
                                            headline varchar(200))
TABLESPACE pg_default;

ALTER TABLE IF EXISTS public.news_raw_2
    OWNER to postgres;
        '''


In [15]:
cur.execute(sql)  

In [18]:
insertIntoTable(df, 'news_raw_2')

In [19]:
sql_final_drop = '''
DROP TABLE public.news_final;
COMMIT;
'''

In [20]:
cur.execute(sql_final_drop) 

In [27]:
#формирование итоговой витрины с необходимыми данными

sql_final = '''
CREATE TABLE IF NOT EXISTS public.news_final as
with table_all as --все записи по категории
(SELECT a.category,
        
        count(*) as count_news_all,
 		round(count(*)/(current_date-min(a.date_new)+1)) avg_news,
 		count(case when a.source like '%kommersant%' then a.source end) count_news_all_kommersant,
 		count(case when a.source like '%lenta%' then a.source end) count_news_all_lenta,
 		count(case when a.source like '%vedomosti%' then a.source end) count_news_all_vedomosti,
 		count(case when a.source like '%tass%' then a.source end) count_news_all_tass
from news_raw_2 as a
group by a.category),

table_day as --все записи по категории за сутки
(SELECT a.category,
        
        count(*) as count_news_day,
 		count(case when a.source like '%kommersant%' then a.source end) count_news_day_kommersant,
 		count(case when a.source like '%lenta%' then a.source end) count_news_day_lenta,
 		count(case when a.source like '%vedomosti%' then a.source end) count_news_day_vedomosti,
 		count(case when a.source like '%tass%' then a.source end) count_news_day_tass
from news_raw_2 as a
where (current_date - a.date_new)<1
group by a.category),

average_news as --день когда было сделано максимальное кол-во публикаций по данной новости
(SELECT d.category,
		max(date_new) date_max_count
FROM 
(SELECT c.category,
		c.date_new
FROM
(SELECT b.*,
		max(b.count_news_date) over (partition by b.category) max_count
FROM
(SELECT a.category,
        a.date_new,
        count(*) as count_news_date
from news_raw_2 as a
GROUP BY a.category,
        a.date_new) b
WHERE count_news_date <> 0

) c
WHERE c.max_count=c.count_news_date) d
GROUP BY d.category),

news_mon as
(SELECT a.category,
        
        count(*) as count_news_mon
from news_raw_2 as a
where extract(dow from date_new::timestamp)=1
GROUP BY a.category),

news_tue as
(SELECT a.category,
        
        count(*) as count_news_tue
from news_raw_2 as a
where extract(dow from date_new::timestamp)=2
GROUP BY a.category),

news_wed as
(SELECT a.category,
        
        count(*) as count_news_wed
from news_raw_2 as a
where extract(dow from date_new::timestamp)=3
GROUP BY a.category),

news_thu as
(SELECT a.category,
        
        count(*) as count_news_thu
from news_raw_2 as a
where extract(dow from date_new::timestamp)=4
GROUP BY a.category),

news_fri as
(SELECT a.category,
        
        count(*) as count_news_fri
from news_raw_2 as a
where extract(dow from date_new::timestamp)=5
GROUP BY a.category),

news_sat as
(SELECT a.category,
        
        count(*) as count_news_sat
from news_raw_2 as a
where extract(dow from date_new::timestamp)=6
GROUP BY a.category),

news_sun as
(SELECT a.category,
        
        count(*) as count_news_sun
from news_raw_2 as a
where extract(dow from date_new::timestamp)=7
GROUP BY a.category)


select table_all.*,
		
		table_day.count_news_day,
		table_day.count_news_day_kommersant,
		table_day.count_news_day_lenta,
		table_day.count_news_day_vedomosti,
		table_day.count_news_day_tass,
		average_news.date_max_count,
		news_mon.count_news_mon,
		news_tue.count_news_tue,
		news_wed.count_news_wed,
		news_thu.count_news_thu,
		news_fri.count_news_fri,
		news_sat.count_news_sat,
		news_sun.count_news_sun
from table_all left join table_day on table_all.category=table_day.category
				left join average_news on table_all.category=average_news.category
				left join news_mon on table_all.category=news_mon.category
				left join news_tue on table_all.category=news_tue.category
				left join news_wed on table_all.category=news_wed.category
				left join news_thu on table_all.category=news_thu.category
				left join news_fri on table_all.category=news_fri.category
				left join news_sat on table_all.category=news_sat.category
				left join news_sun on table_all.category=news_sun.category
order by table_all.count_news_all desc;
        

        '''

In [28]:
cur.execute(sql_final)  