# Counting the indicators to be used in the evaluation of companies

In [0]:
%run "../utils/mount_configuration"

In [0]:
%run "../utils/incremental_load"

In [0]:
tickers_financial_df = spark.read \
    .format('delta') \
    .load(f'{processed_folder_path}/tickers_financial')

In [0]:
tickers_price_df = spark.read \
    .format('delta') \
    .load(f'{processed_folder_path}/tickers_price') \
    .select('ticker', 'close_price', 'date')

In [0]:
tickers_details_df = spark.read \
    .format('delta') \
    .load(f'{processed_folder_path}/tickers_details') \
    .drop('market_cap', 'total_employees')

In [0]:
tickers_dividend_df = spark.read \
    .format('delta') \
    .load(f'{processed_folder_path}/tickers_dividend') \
    .select("ticker", "pay_date") 

In [0]:
from pyspark.sql.functions import year, desc, lead, col, when, sum, max
from pyspark.sql import Window

window_spec = Window.partitionBy("ticker").orderBy(desc("year"))
count_window = Window.partitionBy("ticker")

# Count continuous dividend years for each company
tickers_dividend_df = tickers_dividend_df \
    .withColumn("year", year("pay_date")) \
    .select("ticker", "year") \
    .distinct() \
    .withColumn("next_year", lead("year", 1).over(window_spec)) \
    .withColumn("is_continuous", when(col("year") - col("next_year") == 1, 1).otherwise(0)) \
    .withColumn("continuous_sum", sum("is_continuous").over(count_window) + 1)

# Extract final result for each company
tickers_cont_div_years_df = tickers_dividend_df \
    .groupBy("ticker") \
    .agg(max("continuous_sum").alias("continuous_dividend_years"))

# display(tickers_cont_div_years_df)


In [0]:
from pyspark.sql.functions import lag, round, isnan, row_number, abs


window_spec = Window.partitionBy("cagr_ticker").orderBy("fiscal_year")

# Filter data for fiscal year "FY", add lag and calculate CAGR
net_income_3_year_cagr = tickers_financial_df \
    .filter(col("fiscal_period") == "FY") \
    .withColumnRenamed("ticker", "cagr_ticker") \
    .withColumn("net_income_lag_3y", lag("net_income_loss", 2).over(window_spec)) \
    .withColumn(
        "net_income_3_year_cagr",
        ((col("net_income_loss") - col("net_income_lag_3y")) / 3) / abs(col("net_income_lag_3y"))) \
    .withColumn("net_income_3_year_cagr_pct", round(col("net_income_3_year_cagr") * 100, 2)) \
    .filter(col("net_income_3_year_cagr_pct").isNotNull()) \
    .select("cagr_ticker", "fiscal_year", "net_income_loss", "net_income_lag_3y", "net_income_3_year_cagr_pct")


latest_window = Window.partitionBy("cagr_ticker").orderBy(col("fiscal_year").desc())

# Add row number and select only the first (latest) one
latest_net_income_cagr = net_income_3_year_cagr \
    .withColumn("row_num", row_number().over(latest_window)) \
    .filter(col("row_num") == 1) \
    .select("cagr_ticker", "net_income_3_year_cagr_pct")

# display(net_income_3_year_cagr)
# display(latest_net_income_cagr)

In [0]:
from pyspark.sql.functions import collect_set, array_contains


window_spec = Window.partitionBy("ticker").orderBy(col("start_date").desc())

# Determining which ones are valid for quarterly fiscal periods
valid_tickers_for_quarters = tickers_financial_df \
    .filter(col("fiscal_period").like("Q%")) \
    .withColumn("row_num", row_number().over(window_spec)) \
    .filter((col("row_num") <= 4)) \
    .groupBy("ticker") \
    .agg(collect_set("fiscal_period").alias("quarters")) \
    .filter(array_contains(col("quarters"), "Q1") &
        array_contains(col("quarters"), "Q2") &
        array_contains(col("quarters"), "Q3") &
        array_contains(col("quarters"), "Q4"))

# display(valid_tickers_for_quarters)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, desc

window_spec = Window.partitionBy("ticker").orderBy(desc("start_date"))

# Filter 'FY' periods, exclude tickers with all 4 quarters (Q1-Q4)
tickers_financial_fy_df = tickers_financial_df \
      .filter(col("fiscal_period") == "FY") \
      .join(valid_tickers_for_quarters, "ticker", "left_anti") \
      .withColumn("row_num", row_number().over(window_spec)) \
      .filter(col("row_num") == 1) \
      .withColumn("net_income_loss_4q_sum", col("net_income_loss")) \
      .withColumnRenamed("interest_expense_operating", "interest_expense_operating_4q_sum") \
      .withColumnRenamed("income_tax_expense_benefit", "income_tax_expense_benefit_4q_sum") \
      .withColumnRenamed("revenues", "revenues_4q_sum") \
      .select('ticker', 'net_income_loss', 'net_income_loss_4q_sum', 'liabilities', 
            'equity', 'assets', 'current_assets', 'current_liabilities', 'long_term_debt', 
            'interest_expense_operating_4q_sum', 'income_tax_expense_benefit_4q_sum',
            'depreciation_and_amortization', 'revenues_4q_sum', 'fiscal_period',
            'fiscal_year', 'start_date', 'end_date')                                              

# display(tickers_financial_fy_df)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, sum, desc, year, current_date

window_spec = Window.partitionBy('ticker').orderBy('end_date').rowsBetween(-3, 0)

last_4_quarters_window_spec = Window.partitionBy('ticker').orderBy(desc('end_date'))

# Calculate the sum of 4 quarters' data, as it is typically done for annual calculations.
tickers_financial_quarter_df = tickers_financial_df \
     .filter("fiscal_period != 'FY' ") \
     .join(valid_tickers_for_quarters, "ticker", "inner") \
     .withColumn('net_income_loss_4q_sum', sum('net_income_loss').over(window_spec)) \
     .withColumn('interest_expense_operating_4q_sum', sum('interest_expense_operating').over(window_spec)) \
     .withColumn('income_tax_expense_benefit_4q_sum', sum('income_tax_expense_benefit').over(window_spec)) \
     .withColumn('revenues_4q_sum', sum('revenues').over(window_spec)) \
     .withColumn('row_num', row_number().over(last_4_quarters_window_spec)) \
     .filter(col("row_num") <= 4) \
     .select('ticker', 'net_income_loss', 'net_income_loss_4q_sum', 'liabilities', 
             'equity', 'assets', 'current_assets', 'current_liabilities', 'long_term_debt', 
             'interest_expense_operating_4q_sum', 'income_tax_expense_benefit_4q_sum',
             'depreciation_and_amortization', 'revenues_4q_sum', 'fiscal_period',
             'fiscal_year', 'start_date', 'end_date')

# display(tickers_financial_quarter_df)
                                                   

In [0]:
# Combine quarterly and fiscal year data into a single DataFrame.
tickers_financial_merged_df = tickers_financial_quarter_df \
    .union(tickers_financial_fy_df)

In [0]:
# Join the financial data with the latest net income CAGR values based on ticker
merged_net_income_cagr_df = tickers_financial_merged_df \
      .join(latest_net_income_cagr,
           (tickers_financial_merged_df.ticker == latest_net_income_cagr.cagr_ticker),
           "left") \
      .drop(latest_net_income_cagr.cagr_ticker)

# display(merged_net_income_cagr_df)

In [0]:
# Add dividend data to the dataframe.
merged_dividends_df = merged_net_income_cagr_df \
    .join(tickers_cont_div_years_df,
         (merged_net_income_cagr_df.ticker == tickers_cont_div_years_df.ticker),
         'left') \
    .drop(tickers_cont_div_years_df.ticker)

# display(merged_dividends_df)

In [0]:
from pyspark.sql.functions import col, to_date

details_df = tickers_details_df.orderBy('date')
merged_dividends_df = merged_dividends_df.orderBy('start_date')

# Add pre-processed financial data (including dividend information) to the dataset.
merged_details_df = details_df \
    .join(merged_dividends_df, 
         (details_df.ticker == merged_dividends_df.ticker) & (details_df.date >= merged_dividends_df.start_date) & 
         (details_df.date <= merged_dividends_df.end_date),
         'inner')

# Calculate various financial ratios and metrics (EPS, BVPS, debt rate, ROE, etc.) and add them to the dataset
merged_details_df = merged_details_df \
    .withColumn("EPS", col("net_income_loss_4q_sum") / col("weighted_shares_outstanding")) \
    .withColumn("BVPS", col("equity") / col("weighted_shares_outstanding")) \
    .withColumn("debt_rate", col("liabilities") / col("assets")) \
    .withColumn("D/E", col("long_term_debt") / col("equity")) \
    .withColumn("current_liquidity_rate", col("current_assets") / col("current_liabilities")) \
    .withColumn("ROE", col("net_income_loss_4q_sum") / col("equity")) \
    .withColumn("ROA", col("net_income_loss_4q_sum") / col("assets")) \
    .withColumn("EBIDTA", 
        col("net_income_loss_4q_sum") + 
        col("interest_expense_operating_4q_sum") + 
        col("income_tax_expense_benefit_4q_sum") + 
        col("depreciation_and_amortization")) \
    .withColumn("net_profit_margin", col("net_income_loss_4q_sum") / col("revenues_4q_sum")) \
    .withColumn("debt_EBIDTA", col("long_term_debt") / col("EBIDTA")) \
    .select(details_df["ticker"], "name", "type", "primary_exchange", "weighted_shares_outstanding", "EPS", 
            "BVPS", "debt_rate", "revenues_4q_sum", "D/E", "current_liquidity_rate", "ROE", "ROA", "EBIDTA", "net_profit_margin", 
            "long_term_debt", "debt_EBIDTA", "continuous_dividend_years", "net_income_3_year_cagr_pct", "date", "fiscal_period", 
            "fiscal_year", "start_date", "end_date")

# display(merged_details_df)

In [0]:
tickers_price_df = tickers_price_df.orderBy('date')
merged_details_df = merged_details_df.orderBy('start_date')

# Join the price data with the whole dataset
merged_price_df = tickers_price_df.join(merged_details_df, 
                           (tickers_price_df.ticker == merged_details_df.ticker) & (tickers_price_df.date == merged_details_df.date),    
                           'inner') \
                           .drop(tickers_price_df.ticker, tickers_price_df.date) \
                           .orderBy(merged_details_df['date'])
                           

# display(merged_price_df)

In [0]:
# Calculate and add various financial ratios and metrics (market cap, P/S, P/E, EV, etc.) to the dataset.
final_df = merged_price_df.withColumn("market_cap", col("weighted_shares_outstanding") * col("close_price")) \
                          .withColumn("P/S", col("close_price") / (col("revenues_4q_sum") / col("weighted_shares_outstanding")))\
                          .withColumn("P/E", col("close_price") / col("EPS")) \
                          .withColumn("P/B", col("close_price") / col("BVPS")) \
                          .withColumn("EV", col("market_cap") + col("long_term_debt")) \
                          .withColumn("EV/EBITDA", col("EV") / col("EBIDTA")) \
                          .withColumn("debt_to_market_cap", col("long_term_debt") / col("market_cap")) \
                          .select("ticker", "name", "type", "primary_exchange", "market_cap", "close_price", 
                                  "EPS", "BVPS",  "P/S", "P/E", "P/B", "D/E", "ROE", "ROA", "debt_rate", "debt_EBIDTA",
                                  "debt_to_market_cap", "current_liquidity_rate", "EV", "EBIDTA", "EV/EBITDA", "net_profit_margin", 
                                  "continuous_dividend_years", "net_income_3_year_cagr_pct", "date", "fiscal_period", "fiscal_year", "start_date", "end_date")

# display(final_df)


In [0]:
mergeCondition = """target.ticker = source.ticker AND 
                    target.date = source.date"""

In [0]:
incrementalLoadDelta(input_df=final_df, databaseName="engineering_presentation", tableName="tickers_indicators", 
                     folderPath=presentation_folder_path, partitionField="ticker",mergeCondition=mergeCondition)

In [0]:
%sql
SELECT * FROM engineering_presentation.tickers_indicators;