In [None]:
from lib.llm.model import (
    model_api_client,
    make_impact_from_news,
    make_reasons_from_news,
    make_operational_countries,
    make_summary_from_news,
)
from lib.scraping.scrap import extract_text_to_dataframe
import pandas as pd

In [None]:
# Build the model API client once; it is passed to every LLM helper call in the cells below.
client = model_api_client()

In [None]:
def _load_clean_table(query):
    """Run `query` through Spark and return the result as a pandas DataFrame.

    Rows containing a null in any column are dropped and the index is
    renumbered from 0.
    """
    return spark.sql(query).toPandas().dropna().reset_index(drop=True)


# News rows dated within the last 7 days, up to and including today.
news_data = _load_clean_table("select * from hive_metastore.default.news_data WHERE Date >= DATE_SUB(CURRENT_DATE(), 7) AND Date <= CURRENT_DATE()")
# Stock rows that carry a stockID.
stock_df = _load_clean_table("select * from hive_metastore.default.stock_data WHERE stockID is not NULL")
# Event-code table, joined onto the news rows by EventCode in a later cell.
event_df = _load_clean_table("select * from hive_metastore.default.cameo_event")

In [None]:
# Ask the LLM helper, row by row, for the countries attached to each stock.
# presumably each result is a list-like of country codes (it is exploded below) — TODO confirm
operational_countries = stock_df.apply(
    lambda stock_row: make_operational_countries(stock_row, client),
    axis=1,
)
stock_df['Countries'] = operational_countries

# One output row per (stock, country) pair.
stock_df = stock_df.explode('Countries')

In [None]:
# Rows with fewer mentions than this are discarded.
MIN_MENTIONS = 50

# Pair every news row with each stock row whose 'Countries' value equals the
# news row's 'ActionCountryCode'.
filtered_data = pd.merge(news_data, stock_df, left_on=['ActionCountryCode'], right_on=['Countries'], how='inner')

# Cast the lookup key to int on both sides so the values compare equal.
filtered_data['EventCode'] = filtered_data['EventCode'].astype(int)
event_df['EventCode'] = event_df['EventCode'].astype(int)

# Attach the event-table columns by EventCode.
# DataFrame.join(on='EventCode') would match the codes against event_df's
# positional row index instead of its 'EventCode' column, so a
# column-on-column merge is used. The 'r' suffix for any other overlapping
# event_df column is kept.
# NOTE(review): assumes event_df has one row per EventCode; duplicate codes
# would multiply the news rows — confirm.
filtered_data = filtered_data.merge(event_df, on='EventCode', how='left', suffixes=('', 'r'))

filtered_data = filtered_data.loc[filtered_data['NumMentions'] >= MIN_MENTIONS]

# Keep only the columns used downstream. 'industry' is listed once: selecting
# it twice would create duplicate column labels.
filtered_data = filtered_data[
    ['Date', 'EventId', 'stockID', 'company_name', 'industry', 'Countries', 'ArticleUrl', 'NumMentions', 'EventName', 'AvgTone', 'GoldsteinScale', 'trading_market', 'position']
    ].reset_index(drop=True)

In [None]:
# Fetch the article text behind each 'ArticleUrl' into a 'news_content' column.
news_df_processed = extract_text_to_dataframe(
    filtered_data,
    url_column='ArticleUrl',
    output_column='news_content',
)

# Discard rows whose content is the placeholder string.
# NOTE(review): this compares against the two-character string `""`, not the
# empty string — confirm that is what extract_text_to_dataframe emits for
# articles with no text.
has_content = news_df_processed['news_content'].ne('""')
news_df_processed = news_df_processed[has_content].reset_index(drop=True)

In [None]:
# Output column -> LLM helper that produces it. Order matters: each helper
# receives the full row, including the columns added by the helpers before it.
llm_columns = (
    ('news_summary', make_summary_from_news),
    ('impact', make_impact_from_news),
    ('reasons', make_reasons_from_news),
)

for column_name, make_column in llm_columns:
    news_df_processed[column_name] = news_df_processed.apply(
        lambda news_row: make_column(news_row, client),
        axis=1,
    )

In [None]:
# Save a CSV copy of the enriched news rows (no index column).
# NOTE(review): the path is relative to the kernel's working directory — confirm that location is intended.
output_csv_path = 'news_data_processed.csv'
news_df_processed.to_csv(output_csv_path, index=False)

In [None]:
# Convert the enriched pandas frame to a Spark DataFrame and replace the dashboard table with it.
# NOTE(review): the source tables are read with an explicit `hive_metastore.` catalog prefix,
# while this write relies on the session's current catalog — confirm both resolve to the same place.
dashboard_sdf = spark.createDataFrame(news_df_processed)
dashboard_sdf.write.mode("overwrite").saveAsTable("default.output_table_dashboard")