# GaiaLens Data Engineering Task
Author: Rhys Cooper

### Summary:

This code extracts news sentiment data for Tesla Inc between 25-02-2023 and 25-08-2023 using GaiaLens’ news API, saves it as 'Tesla_newssent_extraction.csv' and then aggregates it. Before aggregating, each sentiment score is down-weighted by 5% for every full month that has elapsed between the article date and the current date. The down-weighted scores are then averaged for each metric column specified in 'sentiment_columns.csv' and saved as a CSV called 'Tesla_newssent_aggregation.csv'. The final output is therefore a dataframe containing companyname and all the sentiment columns, where each sentiment column holds the mean of the down-weighted values across all dates.


## Section 1:  Prerequisites
- Import required packages.
- Load the given CSVs into preliminary dataframes.

In [6]:
import requests
import pandas as pd
import numpy as np
import datetime

#load provided csvs as dataframes 
dates_tesla_df = pd.read_csv(filepath_or_buffer= '/Users/rhyscooper/Downloads/dates_tesla.csv')
sentiment_columns_df = pd.read_csv(filepath_or_buffer='/Users/rhyscooper/Downloads/sentiment_columns.csv')

## Section 2: Data extraction tools definitions
- Defines the 'data_extraction_stats' class, whose primary function is to calculate all the dates in the date range.
- Defines the 'API_call' class used to call the GaiaLens API.
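
As a minimal sketch of the date-range step, the snippet below builds an inclusive list of dates between the two endpoints given in the Summary. The function name `inclusive_date_range` is hypothetical and is not part of the notebook; the date formats mirror those used in `get_date_range`.

```python
import datetime

def inclusive_date_range(first, last, fmt="%d-%m-%Y"):
    """Return every date from first to last (both included) as 'YYYY-MM-DD' strings."""
    start = datetime.datetime.strptime(first, fmt).date()
    end = datetime.datetime.strptime(last, fmt).date()
    # +1 so that the end date itself is included in the list.
    n_days = (end - start).days + 1
    return [(start + datetime.timedelta(days=i)).strftime("%Y-%m-%d") for i in range(n_days)]

dates = inclusive_date_range("25-02-2023", "25-08-2023")
print(len(dates), dates[0], dates[-1])
```

For this range the list holds 182 dates, which agrees with the number of possible days reported by the extraction run in Section 4.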

In [7]:
# This class extracts the relevant statistics required for the data extraction.
# the "get_date_range" method creates a list of all dates between the first and last date of the 'dates_tesla_csv'.
class data_extraction_stats():
    def __init__(self, dates_df) -> None:
        self.first_date = dates_df.iloc[0, 0]
        self.last_date = dates_df.iloc[-1, 0]
        self.get_date_range()
        
    def get_date_range(self):
        first_date_DTD = datetime.datetime.strptime(self.first_date, '%d-%m-%Y').date()
        last_date_DTD = datetime.datetime.strptime(self.last_date, '%d-%m-%Y').date()
        # +1 added below so end date is included in the list
        self.date_list = [first_date_DTD + datetime.timedelta(days=x) for x in range((last_date_DTD  - first_date_DTD).days + 1)]
        self.date_list = [date.strftime('%Y-%m-%d') for date in self.date_list]
        self.possible_dates = len(self.date_list)

In [8]:
# This class houses the API call metadata and call functionality. The 'push_call' method takes 'date' as its input argument and returns the response from the GaiaLens news API.
# Any failed calls are caught and the failed date is printed. The user is required to input their own 'X-RapidAPI-Key'.
class API_call():
    def __init__(self) -> None:
        self.url  = "https://gaialens-esg-news.p.rapidapi.com/news"
        self.headers = {"X-RapidAPI-Key": '',
                        "X-RapidAPI-Host": "gaialens-esg-news.p.rapidapi.com"}
    
    def push_call(self, date):
        self.querystring = {"isin":"US88160R1014","date":date}
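        try:
            response = requests.get(self.url, headers=self.headers, params=self.querystring)
            return response
        except requests.RequestException:
            # The request itself failed (e.g. a network error): report the date and return None.
            print("Failed API call for date", date)
            return None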

## Section 3: Main data extractor class definition
- The 'data_extractor' class is a multiple-inheritance child class of the aforementioned 'data_extraction_stats' and 'API_call' parent classes.
- It collects all articles for all available dates, turns them into dataframe rows and concatenates them into an overall dataframe, which is saved as a CSV.
- Includes a validation check to ensure the dataframe is of the expected dimensions.
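
The collection and validation steps above can be sketched on toy data as follows. The payloads and the helper name `to_article_list` are invented for illustration; only the field names 'companyname', 'date' and 'GHG_emissions' come from the notebook, and real API responses carry many more columns.

```python
import pandas as pd

# Toy payloads, invented for illustration only.
toy_responses = {
    "2023-02-25": {"companyname": "Tesla Inc", "date": "2023-02-25", "GHG_emissions": 0.5},
    "2023-02-26": [
        {"companyname": "Tesla Inc", "date": "2023-02-26", "GHG_emissions": 0.0},
        {"companyname": "Tesla Inc", "date": "2023-02-26", "GHG_emissions": -0.5},
    ],
}

def to_article_list(payload):
    """Wrap a single article (dict) in a list so every date maps to a list of articles."""
    return [payload] if isinstance(payload, dict) else list(payload)

news_data = {date: to_article_list(payload) for date, payload in toy_responses.items()}

# One single-row dataframe per article, concatenated into the overall dataframe.
rows = [pd.DataFrame([article]) for articles in news_data.values() for article in articles]
overall_df = pd.concat(rows, ignore_index=True)

# Validation: the unique dates and the row count should match what was collected.
expected_rows = sum(len(articles) for articles in news_data.values())
shape_ok = (overall_df["date"].nunique() == len(news_data)
            and overall_df.shape[0] == expected_rows)
print(overall_df.shape, shape_ok)
```

Normalising every payload to a list first keeps the row-building loop free of special cases for dates with a single article.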

In [9]:

class data_extractor(data_extraction_stats, API_call):
    def __init__(self, dates_DF, Filepath) -> None:
        data_extraction_stats.__init__(self, dates_df = dates_DF)
        API_call.__init__(self)
        self.news_data = {}
        self.filepath = Filepath

    # This method stores the responses for all possible dates in the 'news_data' attribute.
    # Converting the response into JSON format will not work if there are no articles available for that date.
    # That exception is caught and the empty dates are recorded and counted.
    
    def call_for_all_dates(self):
        self.empty_dates = []
        for date in self.date_list:
            response_raw = self.push_call(date)
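            try:
                response = response_raw.json()
            except Exception:
                # No parsable JSON (or a failed call) means there are no articles for this date.
                # Skip it so a stale response from a previous date is never stored.
                self.empty_dates.append(date)
                continue
            if isinstance(response, dict):
                self.news_data[date] = [response]
            else:
                self.news_data[date] = response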
                  
        self.n_empty_dates = len(self.empty_dates)
        self.filled_dates = len(self.date_list) - len(self.empty_dates)
    
    # Each article for each date is turned into one dataframe row, i.e. a date with several articles yields several rows.
    def get_per_date_df_rows(self):
        self.per_date_rows = {date: [] for date in self.news_data.keys()}
        for date, day_articles in self.news_data.items():
            for article in day_articles:
                self.per_date_rows[date].append(pd.DataFrame([article]))
    
    #unpacks the date:articles dictionary and stores all rows in a list, which are then concatenated to form the overall dataframe.               
    def construct_df(self):
        all_rows = []
        for dfs in self.per_date_rows.values():
            for df in dfs:
                all_rows.append(df)
        self.overall_df = pd.concat(all_rows, axis='rows')

    # Validates the true number of available dates (all possible dates minus the dates with no articles) against the number of unique dates in the dataframe.
    # Also checks that the number of rows in the dataframe equals the total number of articles available across the date range.
    def validation_check(self):

        true_n_days = len(self.date_list) - self.n_empty_dates
        provided_n_days = self.overall_df['date'].nunique()
        print(true_n_days == provided_n_days)
        
        true_rows = sum([len(articles) for articles in self.news_data.values()])
        
        self.correct_shape = true_n_days ==provided_n_days and true_rows == self.overall_df.shape[0]
        
        return true_rows
    
    # the below method returns statements describing the success of the data extraction and if the resultant dataframe is of the expected dimensions.
    def report_extractions_success(self):
        expected_rows = self.validation_check()
        print("Extracted data for {suc} out of {tot} days as empty data for a total of {fail} days.".format(suc = self.filled_dates, tot = self.possible_dates, fail = self.n_empty_dates))
        if self.correct_shape:
            print("\n", "The returned dataframe is of the expected dimensions of", self.overall_df.shape)
        
        else:
            print("\n", "The returned dataframe is not of the expected dimensions of 60 by", expected_rows, "but instead", self.overall_df.shape)
    
    def run_all(self):
        self.call_for_all_dates()
        self.get_per_date_df_rows()
        self.construct_df()
        self.report_extractions_success()
        self.save_as_csv()       
            
    def save_as_csv(self):
        self.overall_df.to_csv(self.filepath + 'Tesla_newssent_extraction.csv', index=False)


## Section 4: Run data extraction
- The user should replace the Filepath argument with their own. This w

In [10]:
#create class instance and run all methods.
de = data_extractor(dates_DF= dates_tesla_df, Filepath='/Users/rhyscooper/')
de.run_all()

True
Extracted data for 167 out of 182 days as empty data for a total of 15 days.

 The returned dataframe is of the expected dimensions of (1312, 60)


## Section 5: Data aggregation class definition
- The main purpose of this class is to aggregate the data held in the data extraction object.
- It down-weights the sentiment scores by 5% for every full 30-day month that has elapsed between the article date and the current date.
- Mean values are calculated only for the columns listed in 'sentiment_columns.csv', as per the brief.
- Non-required columns are given a mean value of NaN and are subsequently dropped.
- Includes a validation test to check that the sentiment values have been down-weighted correctly and that the resultant dataframe only includes the required columns.
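
The down-weighting rule can be illustrated with a small standalone sketch. The reference date below is fixed and hypothetical (the notebook uses `datetime.date.today()`), and the function names are chosen for this example only; the 30-day month and the 5% linear discount mirror `elapsed_months` and `pre_calc_weights`.

```python
import datetime

def elapsed_months(date_str, reference_date, days_per_month=30):
    """Number of full 30-day months between date_str ('YYYY-MM-DD') and reference_date."""
    article_date = datetime.datetime.strptime(date_str, "%Y-%m-%d").date()
    return (reference_date - article_date).days // days_per_month

def down_weight(months, discount=0.05):
    """Linear down-weight: 5% less for every full month elapsed."""
    return 1 - months * discount

reference_date = datetime.date(2024, 1, 1)  # hypothetical stand-in for "today"
months = elapsed_months("2023-03-15", reference_date)
weight = down_weight(months)
print(months, round(weight, 2), round(0.5 * weight, 3))
```

Here 292 days have elapsed, which is 9 full 30-day months, so the weight is 1 - 9 × 0.05 = 0.55 and a score of 0.5 becomes 0.275. Because the scheme is linear, the weight reaches zero after 20 months and turns negative beyond that; the notebook does not clamp it, so results depend on the date the code is run.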

In [11]:
# This data aggregation class does not inherit from the data extraction class but does have the data extraction class instance saved as a class attribute.
class data_agg():
    def __init__(self, sent_cols_df, Data_extractor_obj) -> None:
        self.sent_cols = sent_cols_df
        self.DE_obj = Data_extractor_obj
        self.date_list =Data_extractor_obj.date_list
        self.current_date = datetime.date.today()
        self.discount_perc = 0.05
        self.pre_calc_weights()
    
    # Calculates the number of full, 30-day months that have elapsed between the inputted date and the current date.
    def elapsed_months(self, date):
        date_dt = datetime.datetime.strptime(date, '%Y-%m-%d').date()
        date_diff = self.current_date - date_dt
        elapsed_full_months = date_diff.days // 30
        
        return elapsed_full_months    

    # To reduce run time, each down weight is pre-calculated for every date in the data range and stored in the weight dictionary.
    def pre_calc_weights(self):
        self.weight_dict = {}
        self.non_metric_cols = list(self.DE_obj.overall_df.columns[:5])
        self.metric_cols = list((self.sent_cols.iloc[:,0]))
        self.all_cols = list(self.DE_obj.overall_df.columns)
        for date in self.date_list:
            elapsed_full_months = self.elapsed_months(date)
            down_weight = 1- elapsed_full_months*self.discount_perc
            self.weight_dict[date] = down_weight

    # Function used by the 'apply_discounter_to_df' method below to multiply each metric value in a row by that row's down weight.
    # 'metric_cols' hold numeric (or occasionally string) values, which are only weighted when numeric, whereas 'non_metric_cols' include companyname, date etc.
    def discounter(self, row):
        weight = self.weight_dict[row['date']]
        for col in row.index:
            if col in self.metric_cols and col not in self.non_metric_cols:
                    row[col] = row[col] * weight if not isinstance(row[col], (str)) else row[col]
        return row            
                
    def apply_discounter_to_df(self):
        df = self.DE_obj.overall_df.copy()
        df = df.apply(self.discounter, axis=1)
        self.DA_overall = df
    
    # Below method checks that the downweighted values present in the data aggregation dataframe equal the expected weighted down value based on its corresponding
    # value in the data extraction dataframe. 
    # It also checks the included columns in the final dataframe equal the specified columns to be included.  
    def DA_validation_check(self):
        
        df_DE = self.DE_obj.overall_df.copy()
        df_DE.reset_index(drop=True, inplace=True)
        df_DA = self.DA_overall.copy()
        df_DA.reset_index(drop=True, inplace=True)
        
        # Selects the first specified sentiment columns to test (GHG_emissions). The first non-zero metric score in the column is used in the test. 
        first_sent_col = self.sent_cols.iloc[0,0]
        DE_first_index = df_DE.loc[df_DE[first_sent_col] != 0].first_valid_index()
        DE_first_value =  df_DE[first_sent_col].loc[DE_first_index]
        DE_first_date = df_DE['date'].iloc[DE_first_index]
        
        DA_first_value = df_DA[first_sent_col].iloc[DE_first_index]

        test_elapsed_months= self.elapsed_months(DE_first_date)
        down_weight = self.weight_dict[DE_first_date]
        expected_value = DE_first_value * down_weight
        
        # Since the final data aggregation dataframe only includes companyname and the sentiment columns listed in the provided 'sentiment_columns.csv', these form the expected columns.
        expected_cols = ['companyname'] + self.metric_cols
        returned_cols = list(self.DA_mn_only_sent.columns)
        
        # If both tests are passed, prints comments describing the tests and demonstrates the down weighting equation being applied correctly.
        if all([expected_value == DA_first_value, expected_cols ==  returned_cols]):
            print("Returned columns match the", len(expected_cols), 'specified columns.')
            print('Down weighting has been applied correctly where a score of {DEFV} from {TEM} complete months ago is weighted down by {W} to {DAFV}'.format(DEFV = DE_first_value,TEM = test_elapsed_months,  W=down_weight, DAFV = DA_first_value))
            
        return all([expected_value == DA_first_value, expected_cols ==  returned_cols])

    # First calculates the mean sentiment score for every column. If a column isn't in the required columns, a value of NaN is used as its mean.
    # The mean values per column, including NaN, are stored in a dictionary that is then turned into a one-row dataframe of the mean values for all columns.
    def calc_mean_sents(self):
        all_cols =  {col: np.nan  for col in list(self.DA_overall.columns)}

        for col in self.DA_overall.columns:
            if col in self.metric_cols and col not in self.non_metric_cols:
                all_cols[col] = round(self.DA_overall[col].mean(), 5)
                
        #company name added as the first column according to the specification. 
        all_cols['companyname'] = self.DA_overall['companyname'].iloc[0]
        self.DA_mn = pd.DataFrame([all_cols])

    # This method defines all the required columns (company name + specified sentiment columns) and returns the dataframe with only these columns, effectively dropping the non-required columns.
    def drop_columns(self):
        self.required_info = ['companyname']
        self.required_info.extend(self.metric_cols)
        
        self.DA_mn_only_sent = self.DA_mn[[col for col in self.required_info]]

    # Saves the aggregated dataframe as a CSV in the filepath held by the data extraction object.
    def DA_save(self):
        self.DA_mn_only_sent.to_csv(self.DE_obj.filepath + 'Tesla_newssent_aggregation.csv', index=False)
 
    def run_all(self):
        self.apply_discounter_to_df()
        self.calc_mean_sents()
        self.drop_columns()
        if self.DA_validation_check():
            self.DA_save()
        else:
            print("error in data aggregation")
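
As an alternative to the row-wise `apply` used in `apply_discounter_to_df`, the same weighting and averaging can be sketched in vectorised form. This is not the notebook's method: the data and weights below are invented toy values, and the sketch assumes every metric column is purely numeric, whereas the class above also tolerates string values.

```python
import pandas as pd

# Toy data and weights, invented for illustration only.
df = pd.DataFrame({
    "companyname": ["Tesla Inc"] * 3,
    "date": ["2023-02-25", "2023-02-25", "2023-03-30"],
    "GHG_emissions": [0.5, -0.5, 1.0],
})
weight_by_date = {"2023-02-25": 0.5, "2023-03-30": 0.75}  # hypothetical down weights
metric_cols = ["GHG_emissions"]

# Look up each row's weight from its date, then scale all metric columns at once.
weights = df["date"].map(weight_by_date)
weighted = df[metric_cols].mul(weights, axis=0)

# Mean of the down-weighted values per metric column, with companyname as the first column.
means = weighted.mean().round(5)
result = pd.DataFrame([{"companyname": df["companyname"].iloc[0], **means.to_dict()}])
print(result)
```

Mapping dates to weights once and multiplying column-wise avoids a Python-level loop over rows, which may matter for larger extractions.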
        

## Section 6: Run data aggregation

In [12]:
da = data_agg(sentiment_columns_df, de)
da.run_all()

Returned columns match the 49 specified columns.
Down weighting has been applied correctly where a score of 0.5 from 9 complete months ago is weighted down by 0.55 to 0.275
