In [1]:
#Import needed libraries
import polars as pl
import pandas as pd
from polars import col
import yfinance as yf
from ta import add_all_ta_features
from pandas.tseries.holiday import AbstractHolidayCalendar, Holiday, nearest_workday, USMartinLutherKingJr, USPresidentsDay, GoodFriday, USMemorialDay, USLaborDay, USThanksgivingDay

### Functions

#### Tweet Resources

In [2]:
def get_moving_stats(df, cols, periods=(2, 7, 15, 30)):
    """
    Build lagged and rolling-window features for the given columns.

    For every column in `cols` the returned frame gains:

    * ``PWD <col>``: the value of the previous row (``shift(1)``);
    * ``<col>_MA_<p>``, ``<col>_MAX_<p>``, ``<col>_MIN_<p>``: rolling mean, maximum and
      minimum over the last `p` rows, for every `p` in `periods`.

    The columns listed in `cols` themselves are dropped from the result.
    Windows are counted in rows (not calendar days) and include the current row,
    so the first ``p - 1`` rows of every rolling column are NaN.
    The input DataFrame is left untouched; all work happens on a copy.

    :param df: DataFrame containing data to calculate moving statistics on
    :type df: pandas.DataFrame
    :param cols: List of column names to calculate moving statistics on
    :type cols: list[str]
    :param periods: Window sizes, in rows, for the rolling statistics
    :type periods: Iterable[int]
    :return: New DataFrame with the statistic columns added and `cols` removed
    :rtype: pandas.DataFrame
    """

    # Work on a copy so the caller's frame is never modified as a side effect
    result = df.copy()

    for name in cols:
        series = df[name]

        # Previous-row value of the column
        result[f'PWD {name}'] = series.shift(1)

        for p in periods:
            window = series.rolling(p)
            result[f"{name}_MA_{p}"] = window.mean()
            result[f"{name}_MAX_{p}"] = window.max()
            result[f"{name}_MIN_{p}"] = window.min()

    # Remove current row's input columns; only the derived features remain
    return result.drop(columns=list(cols))

In [3]:
# Define a function that slices and casts a column
def slice_and_cast(col):
    """
    Build a polars expression that turns a timestamp-like string column into a Date.

    The expression keeps the first 10 characters of each value, parses them with
    the format "%Y-%m-%d" as a Datetime and finally casts the result to a Date.
    No alias is applied inside this function.

    Parameters
    ----------
    col : str
        The name of the column to slice and cast.

    Returns
    -------
    pl.Expr
        An expression that can be used to slice and cast the specified column.
    """

    # Keep only the leading "YYYY-MM-DD" part of the string
    date_text = pl.col(col).str.slice(0, 10)

    # Parse to a Datetime first, then narrow to a plain Date
    parsed = date_text.str.strptime(pl.Datetime, "%Y-%m-%d")
    return parsed.cast(pl.Date)

In [4]:
def split_column(df):
    """
    Derive 'Query_Type', 'Query' and 'FileName' columns from the 'File' column.

    The 'File' column is first replaced by a substring of itself obtained with
    ``str.slice(22, 30)`` (polars signature: offset, length). That substring is
    then split on '/', and parts 0, 1 and 2 become the new columns 'Query_Type',
    'Query' and 'FileName' respectively. The (sliced) 'File' column stays in the
    result.

    Parameters
    ----------
    df : DataFrame
        The input polars DataFrame with a 'File' column to be split.

    Returns
    -------
    DataFrame
        A new DataFrame with the sliced 'File' column and the three derived columns.
    """

    # NOTE(review): offset 22 presumably skips a fixed directory prefix - confirm against the data
    trimmed = df.with_columns(pl.col("File").str.slice(22, 30))

    part_names = ['Query_Type', 'Query', 'FileName']

    expressions = [col('File')]
    for position, part_name in enumerate(part_names):
        # Bind `position` as a default argument so each lambda keeps its own index
        part = col('File').apply(lambda path, position=position: path.split('/')[position])
        expressions.append(part.alias(part_name))

    return trimmed.with_columns(expressions)

In [5]:
def pivot_sentiment(df):
    """
    Aggregate tweet-level sentiment and signal labels into one row per group.

    Rows are grouped by 'Created_at', 'Query' and 'Query_Type'. For each group the
    function counts the rows labelled "Positive" / "Negative" in 'Sentiment_Label',
    the rows labelled "Bullish" / "Bearish" in 'Signal_Label', and the total number
    of rows. Output column names are prefixed with the 'Query_Type' value found in
    the first row of the input. The result is sorted by 'Created_at' ascending.

    :param df: A polars DataFrame with columns 'Query', 'Query_Type', 'Sentiment_Label', 'Signal_Label' and 'Created_at'
    :return: A new DataFrame with aggregated sentiment and signal data
    """
    # The first row's query type names every aggregated column
    prefix = df["Query_Type"][0]

    def _count_matches(column, label):
        # Summing a boolean comparison counts the rows where it holds
        return (pl.col(column) == label).sum()

    aggregations = [
        _count_matches('Sentiment_Label', "Positive").alias(f'{prefix} Sentiment Positive'),
        _count_matches('Sentiment_Label', "Negative").alias(f'{prefix} Sentiment Negative'),
        _count_matches('Signal_Label', "Bullish").alias(f'{prefix} Signal Bullish'),
        _count_matches('Signal_Label', "Bearish").alias(f'{prefix} Signal Bearish'),
        pl.col('Created_at').count().alias(f'{prefix} Total Daily Tweets'),
    ]

    grouped = df.lazy().groupby(["Created_at", "Query", "Query_Type"]).agg(aggregations)
    return grouped.sort("Created_at", descending=False).collect()

In [6]:
def add_company_column(df):
    """
    Return the DataFrame with an added 'Company' column derived from 'Query'.

    Mapping applied to the 'Query' value:

    * 'AMZN' or 'Jeff Bezos'    -> 'AMAZON'
    * 'AAPL' or 'Tim Cook'      -> 'APPLE'
    * 'MSFT' or 'Satya Nadella' -> 'MICROSOFT'
    * 'TSLA' or 'Elon Musk'     -> 'TESLA'

    Any other value yields 'ERROR'.

    :param df: A polars DataFrame with a 'Query' column
    :return: A new DataFrame with an added 'Company' column
    """
    # (ticker, CEO) pairs and the company label they map to, checked in this order
    query_to_company = [
        (("AMZN", "Jeff Bezos"), "AMAZON"),
        (("AAPL", "Tim Cook"), "APPLE"),
        (("MSFT", "Satya Nadella"), "MICROSOFT"),
        (("TSLA", "Elon Musk"), "TESLA"),
    ]

    query = pl.col("Query")
    branches = [
        ((query == ticker) | (query == ceo), pl.lit(company))
        for (ticker, ceo), company in query_to_company
    ]

    # Build the when/then chain one branch at a time
    first_condition, first_value = branches[0]
    mapping = pl.when(first_condition).then(first_value)
    for condition, value in branches[1:]:
        mapping = mapping.when(condition).then(value)

    company = mapping.otherwise(pl.lit("ERROR")).alias("Company")
    return df.with_columns(company)

In [7]:
def find_columns(df, keywords):
    """
    Return the names of the columns whose name contains at least one keyword.

    Matching is a plain, case-sensitive substring test and the original column
    order is preserved.

    :param df: A DataFrame (anything exposing an iterable `columns` attribute of strings)
    :param keywords: A list of strings representing the keywords to search for in the column names
    :return: A list of column names that contain any of the specified keywords
    """
    return [
        name
        for name in df.columns
        if any(keyword in name for keyword in keywords)
    ]

#### TS Resources

In [8]:
def shift_columns(df):
    """
    Add a one-row-lagged copy of every column except "Company" and drop most originals.

    Each column other than "Company" gets a companion column named ``PWD <col>``
    holding the value of the previous row (``shift(1)``). Of the original columns
    only 'Company', 'Open', 'Close', 'Adj Close', 'Movement' and 'Price Change'
    are kept; every other original column is removed.

    Column order of the result: the kept original columns (in their original
    order) followed by the ``PWD`` columns (in original column order).
    The input DataFrame is not modified.

    :param df: DataFrame containing data to shift
    :type df: pandas.DataFrame
    :return: New DataFrame with shifted columns
    :rtype: pandas.DataFrame
    """

    # Original (unshifted) columns that survive alongside their lagged copies
    keep_unshifted = ['Open', 'Close', 'Adj Close', 'Movement', 'Price Change']

    kept = [name for name in df.columns if name == "Company" or name in keep_unshifted]

    # Lag every non-"Company" column by one row
    lagged = [
        df[name].shift(1).rename(f'PWD {name}')
        for name in df.columns
        if name != "Company"
    ]

    # Assemble the result in one step instead of inserting/dropping column by column
    return pd.concat([df[kept]] + lagged, axis=1)

In [9]:
def fill_missing_dates(df):
    """
    Expand a date-indexed frame to one row per calendar day, forward-filling gaps.

    The frame is reindexed onto a daily range running from its earliest to its
    latest index value. Days that were absent (weekends, holidays, ...) become new
    rows whose values are copied from the closest preceding row. An empty input is
    returned as an empty copy.

    :param df: The input dataframe, indexed by a DatetimeIndex.
    :type df: pd.DataFrame
    :return: A new dataframe with missing dates filled.
    :rtype: pd.DataFrame
    """

    # Nothing to span: min()/max() of an empty index cannot seed a date range
    if df.empty:
        return df.copy()

    # Every calendar day (freq='D') between the first and last date of the input
    all_dates = pd.date_range(start=df.index.min(), end=df.index.max(), freq='D')

    # Reindex onto the daily range, then copy values forward into the new rows.
    # `ffill()` is the non-deprecated spelling of `fillna(method='ffill')`.
    return df.reindex(all_dates).ffill()

In [10]:
def map_trading_days(df):
    """
    Flag each row of a date-indexed frame as a trading day (1) or not (0).

    A date counts as a non-trading day when it is a Saturday or Sunday, or when it
    is returned by ``USTradingCalendar().holidays`` for the span covered by the
    index. The flag is written to a new 'Is Trading Day' column on the frame that
    was passed in, and that same frame is returned.

    :param df: DataFrame with a DatetimeIndex
    :type df: pandas.DataFrame
    :return: The input DataFrame with the new 'Is Trading Day' column
    :rtype: pandas.DataFrame
    """

    dates = df.index

    # Holidays falling inside the date span of the frame
    calendar = USTradingCalendar()
    holiday_dates = calendar.holidays(start=dates.min(), end=dates.max())

    is_holiday = dates.isin(holiday_dates)
    # dayofweek: Monday=0 ... Sunday=6, so 5 and 6 are the weekend
    is_weekend = dates.dayofweek >= 5

    # 1 for trading days, 0 for weekends and holidays
    is_trading_day = ~(is_holiday | is_weekend)
    df['Is Trading Day'] = is_trading_day.astype(int)

    return df

In [11]:
def get_stock_data(company: str, ticker: str, start: str = "2018-01-01", end: str = "2021-12-31"):
    """
    Download daily prices for one ticker and turn them into a lagged feature frame.

    Processing steps, in order:

    1. download the price history with ``yf.download``;
    2. add the 'Company', 'Price Change' and 'Movement' columns;
    3. add technical-analysis features with ``add_all_ta_features``;
    4. expand to one row per calendar day (``fill_missing_dates``);
    5. lag the columns by one row (``shift_columns``);
    6. add the 'Is Trading Day' flag (``map_trading_days``).

    :param company: The name of the company (e.g. "APPLE").
    :type company: str
    :param ticker: The ticker symbol for the company (e.g. "AAPL").
    :type ticker: str
    :param start: Start date passed to ``yf.download``.
    :type start: str
    :param end: End date passed to ``yf.download``.
    :type end: str
    :return: A dataframe containing stock data and technical analysis features for the specified company.
    :rtype: pd.DataFrame

    Columns added before the lagging step:
        - Company: The name of the company.
        - Price Change: The change in adjusted closing price from the previous row.
        - Movement: "Up" when 'Price Change' is greater than 0, otherwise "Down".

    NOTE(review): the download must contain 'Open', 'High', 'Low', 'Close',
    'Adj Close' and 'Volume' columns; whether 'Adj Close' is present depends on the
    installed yfinance version and its defaults - confirm.
    """

    df = yf.download(ticker,
                     start=start,
                     end=end)

    df["Company"] = company

    # Row-over-row change in adjusted close; the first row has no predecessor and is NaN
    df["Price Change"] = df["Adj Close"] - df["Adj Close"].shift(1)

    # NaN and 0 both fail the `> 0` test, so the first row and flat days are labelled "Down"
    df["Movement"] = df["Price Change"].apply(lambda x: "Up" if x > 0 else "Down")

    # Add all technical analysis features
    df = add_all_ta_features(
        df, open="Open", high="High", low="Low", close="Close", volume="Volume")

    # Fill missing dates by copying values from previous row
    df = fill_missing_dates(df)

    # Shift columns one day behind to avoid leakage
    df = shift_columns(df)

    # Add trading days
    df = map_trading_days(df)

    return df

In [12]:
class USTradingCalendar(AbstractHolidayCalendar):
    """
    Holiday calendar used to decide which weekdays are not US trading days.

    Rules, in order: New Year's Day, Martin Luther King Jr. Day, Presidents' Day,
    Good Friday, Memorial Day, Independence Day, Labor Day, Thanksgiving Day and
    Christmas. The three fixed-date holidays (Jan 1, Jul 4, Dec 25) use the
    `nearest_workday` observance, which moves a Saturday holiday to the Friday
    before and a Sunday holiday to the Monday after.

    NOTE(review): only the nine rules listed here are applied (Juneteenth, for
    example, is not among them) - confirm against the exchange schedule before
    using this calendar for other date ranges.
    """

    # Holiday rules consumed by AbstractHolidayCalendar.holidays()
    rules = [
        Holiday(
            'NewYearsDay',
            month=1,
            day=1,
            observance=nearest_workday,
        ),
        USMartinLutherKingJr,
        USPresidentsDay,
        GoodFriday,
        USMemorialDay,
        Holiday(
            'USIndependenceDay',
            month=7,
            day=4,
            observance=nearest_workday,
        ),
        USLaborDay,
        USThanksgivingDay,
        Holiday(
            'Christmas',
            month=12,
            day=25,
            observance=nearest_workday,
        ),
    ]


In [13]:
def extract_date_features(df, date_column):
    """
    Derive calendar features from a date column of a pandas DataFrame.

    The date column is parsed once with ``pd.to_datetime`` and eight columns are
    added: 'day_of_week' (Monday=0), 'day_of_month', 'day_of_year', 'month',
    'quarter', 'year', 'week_of_year' (ISO week number) and 'day_of_quarter'
    (1 on the first day of the quarter).

    The input frame is not modified; a new frame is returned.

    Parameters:
    df (DataFrame): The input DataFrame.
    date_column (str): The name of the date column in the input DataFrame.

    Returns:
    DataFrame: A copy of the input DataFrame with the eight new columns appended.
    """
    # Parse once and reuse for every feature
    dates = pd.to_datetime(df[date_column])

    # `Series.dt.weekofyear` was removed in pandas 2.0; isocalendar().week is the replacement.
    # It comes back as a nullable UInt32, so convert to a plain numeric dtype
    # (float only when missing dates make NaN necessary).
    iso_week = dates.dt.isocalendar().week
    if iso_week.isna().any():
        iso_week = iso_week.astype("float64")
    else:
        iso_week = iso_week.astype("int64")

    day_of_year = dates.dt.dayofyear
    # Day-of-year on which the date's quarter begins
    quarter_start_day = dates.dt.to_period('Q').dt.start_time.dt.dayofyear

    return df.assign(
        day_of_week=dates.dt.dayofweek,
        day_of_month=dates.dt.day,
        day_of_year=day_of_year,
        month=dates.dt.month,
        quarter=dates.dt.quarter,
        year=dates.dt.year,
        week_of_year=iso_week,
        day_of_quarter=day_of_year - quarter_start_day + 1,
    )

### Tweet Features

In [14]:
# Load the scored tweet datasets ('~'-separated, UTF-8): ticker queries and CEO queries
scored_csv_options = dict(sep='~', encoding='utf-8')
tickers_df = pl.read_csv('../Data/ScoredDf/Tickers.csv', **scored_csv_options)
ceos_df = pl.read_csv('../Data/ScoredDf/Ceos.csv', **scored_csv_options)

In [15]:
# Replace spaces in the column names with underscores
tickers_df.columns = [name.replace(" ", "_") for name in tickers_df.columns]
ceos_df.columns = [name.replace(" ", "_") for name in ceos_df.columns]

In [16]:
# Convert the "Created_at" strings of both datasets into dates
created_at_as_date = slice_and_cast("Created_at")
tickers_df = tickers_df.with_columns(created_at_as_date)
ceos_df = ceos_df.with_columns(created_at_as_date)

In [17]:
# Derive the query columns from the 'File' column of both datasets
tickers_df, ceos_df = map(split_column, (tickers_df, ceos_df))

In [18]:
# Aggregate sentiment / signal counts per day and query
tickers_sentiment_features, ceos_sentiment_features = (
    pivot_sentiment(tickers_df),
    pivot_sentiment(ceos_df),
)

In [19]:
# Map each query to its company
tickers_sentiment_features, ceos_sentiment_features = (
    add_company_column(features)
    for features in (tickers_sentiment_features, ceos_sentiment_features)
)

In [20]:
# Find the count columns and build their lagged / rolling features
keywords = ["Sentiment", "Signal", "Total Daily"]


def _moving_stats_by_company(features, cols):
    """
    Apply `get_moving_stats` separately to each company's rows.

    The aggregated frame holds one row per (date, query), so rows of different
    companies are interleaved. Shifting or rolling over the whole frame would mix
    companies (and "previous row" could be another company's same-day row), so the
    statistics are computed on each company's date-ordered rows and stitched back.

    NOTE(review): windows still count observed rows, not calendar days - dates with
    no tweets for a company are simply absent.

    :param features: polars DataFrame with 'Created_at', 'Company' and the `cols` columns
    :param cols: List of column names to calculate moving statistics on
    :return: pandas DataFrame sorted by 'Created_at' then 'Company' with a fresh RangeIndex
    """
    ordered = features.to_pandas().sort_values(["Company", "Created_at"])
    per_company = [
        # Hand each group over as its own copy so no slice of `ordered` is modified
        get_moving_stats(group.copy(), cols)
        for _, group in ordered.groupby("Company", sort=False)
    ]
    return (
        pd.concat(per_company)
        .sort_values(["Created_at", "Company"])
        .reset_index(drop=True)
    )


cols_tickers = find_columns(tickers_sentiment_features, keywords)
tickers_sentiment_features = _moving_stats_by_company(tickers_sentiment_features, cols_tickers)

cols_ceos = find_columns(ceos_sentiment_features, keywords)
ceos_sentiment_features = _moving_stats_by_company(ceos_sentiment_features, cols_ceos)

In [21]:
# List the ("Created_at", "Company") pairs that appear in the ticker features but not in the CEO features
idx1 = pd.Index(tickers_sentiment_features[["Created_at", "Company"]])
idx2 = pd.Index(ceos_sentiment_features[["Created_at", "Company"]])
idx1.difference(idx2)

Index([(2019-12-08 00:00:00, 'AMAZON')], dtype='object')

In [22]:
# Left-join the CEO features onto the ticker features (every ticker row is kept)
twitter_features = pd.merge(
    tickers_sentiment_features,
    ceos_sentiment_features,
    on=["Created_at", "Company"],
    how="left",
)

In [23]:
# Fill missing values with the next available value further down the frame.
# `bfill()` is the non-deprecated spelling of `fillna(method="backfill")`.
# NOTE(review): back-filling copies values from later rows (look-ahead) and does not
# respect company boundaries - confirm this is intended for the modeling step.
twitter_features = twitter_features.bfill()

In [24]:
# Keep only the columns without "Query" in their name, then rename "Created_at" to "Date"
cols = [name for name in twitter_features.columns if "Query" not in name]
twitter_features = twitter_features[cols]
twitter_features = twitter_features.rename(columns={'Created_at': 'Date'})

### Technical Analysis Features

In [25]:
# Download prices and build the technical-analysis features, one frame per company
aapl_ta, msft_ta, amzn_ta, tsla_ta = (
    get_stock_data(company, ticker)
    for company, ticker in [
        ("APPLE", "AAPL"),
        ("MICROSOFT", "MSFT"),
        ("AMAZON", "AMZN"),
        ("TESLA", "TSLA"),
    ]
)

[*********************100%***********************]  1 of 1 completed


  dip[idx] = 100 * (self._dip[idx] / value)
  din[idx] = 100 * (self._din[idx] / value)


[*********************100%***********************]  1 of 1 completed


  dip[idx] = 100 * (self._dip[idx] / value)
  din[idx] = 100 * (self._din[idx] / value)


[*********************100%***********************]  1 of 1 completed


  dip[idx] = 100 * (self._dip[idx] / value)
  din[idx] = 100 * (self._din[idx] / value)


[*********************100%***********************]  1 of 1 completed


  dip[idx] = 100 * (self._dip[idx] / value)
  din[idx] = 100 * (self._din[idx] / value)


In [26]:
# Stack the per-company frames and expose the date index as a "Date" column
# so the result can be merged with the tweet features
company_frames = [amzn_ta, msft_ta, aapl_ta, tsla_ta]
technical_df = (
    pd.concat(company_frames)
    .reset_index()
    .rename(columns={'index': 'Date'})
)

In [27]:
# Keep only the rows dated 2019-01-01 or later
from_2019_onwards = technical_df['Date'] >= '2019-01-01'
technical_df = technical_df.loc[from_2019_onwards]

In [28]:
# Extract the calendar features from the "Date" column
technical_df = extract_date_features(technical_df,"Date")

  df['week_of_year'] = pd.to_datetime(df[date_column]).dt.weekofyear


### Final DataFrame Creation

In [29]:
# Left-join the tweet features onto the technical-analysis frame (every TA row is kept)
final_df = pd.merge(
    technical_df,
    twitter_features,
    on=["Date", "Company"],
    how="left",
)

In [30]:
# Attach the calendar columns, adjusted close and Movement label to the tweet features
movement_columns = [
    "Date",
    "day_of_week",
    "day_of_month",
    "day_of_year",
    "month",
    "year",
    "Company",
    "Adj Close",
    "Movement",
]
movementDf = technical_df[movement_columns]
tweet_df = pd.merge(twitter_features, movementDf, on=["Date", "Company"], how="left")

In [31]:
# Persist the three feature sets as '~'-separated UTF-8 CSV files for the modeling step
output_frames = {
    "../Data/FinalDF/TweetDF.csv": tweet_df,
    "../Data/FinalDF/TaDF.csv": technical_df,
    "../Data/FinalDF/FinalDF.csv": final_df,
}
for output_path, output_frame in output_frames.items():
    output_frame.to_csv(output_path, encoding="utf-8", sep="~", index=False)