In [0]:
!pip install kagglehub

In [0]:
import kagglehub

# Download the latest version of the Kaggle dataset. The value returned by
# dataset_download() is kept in `path` and printed as the dataset location.
path = kagglehub.dataset_download("stefanoleone992/mutual-funds-and-etfs")

print("Path to dataset files:", path)

In [0]:
import shutil
import os
import glob
from pathlib import Path

# Local directory where kagglehub stored the files. Use the path returned by
# dataset_download() in the previous cell rather than a hard-coded
# ".../versions/4" directory, which stops matching as soon as the dataset
# version or the cache location changes.
local_path = Path(path)

# Target path in DBFS
dbfs_path = "dbfs:/tmp/bronze/mutual-funds-and-etfs"

# Copy each downloaded file to DBFS ("file:" marks the source as driver-local)
for file in glob.glob(str(local_path / "*")):
    file_name = os.path.basename(file)
    dbutils.fs.cp(f"file:{file}", f"{dbfs_path}/{file_name}")


In [0]:
# List the files that were copied to the bronze folder in DBFS.
display(dbutils.fs.ls("dbfs:/tmp/bronze/mutual-funds-and-etfs/"))


# Context

ETFs are a low-cost alternative to Mutual Funds, and they have grown quickly in recent years thanks to their passive approach (and the consequently lower fees).
This dataset contains financial information collected from Yahoo Finance for all U.S. Mutual Funds and ETFs, along with their historical prices.
The updated version reflects financial values as of November 2021.

Content
The file contains 23,783 Mutual Funds and 2,310 ETFs with:

General fund aspects (e.g. total_net_assets, fund family, inception date, etc.)
Portfolio indicators (e.g. cash, stocks, bonds, sectors, etc.)
Historical yearly and quarterly returns (e.g. year_to_date, 1-year, 3-years, etc.)
Financial ratios (price/earning, Treynor and Sharpe ratios, alpha, and beta)
ESG scores


In [0]:
def ingest_data():
    """Download the Kaggle dataset and copy every downloaded entry into DBFS.

    The dataset
    (https://www.kaggle.com/datasets/stefanoleone992/mutual-funds-and-etfs/data)
    is fetched with kagglehub; each entry of the download directory is then
    copied with ``dbutils.fs.cp`` into the bronze folder.
    """
    # Directory where kagglehub stored the files
    download_dir = Path(kagglehub.dataset_download("stefanoleone992/mutual-funds-and-etfs"))

    # Target folder in DBFS
    target_root = "dbfs:/tmp/bronze/mutual-funds-and-etfs"

    for source in glob.glob(str(download_dir / "*")):
        destination = f"{target_root}/{os.path.basename(source)}"
        dbutils.fs.cp(f"file:{source}", destination)
    

In [0]:
from concurrent.futures import ThreadPoolExecutor

# Bronze CSV locations in DBFS, keyed by a short dataset name.
paths = {
    name: f"dbfs:/tmp/bronze/mutual-funds-and-etfs/{file_name}"
    for name, file_name in [
        ("fund_a_e", "MutualFund prices - A-E.csv"),
        ("fund_f_k", "MutualFund prices - F-K.csv"),
        ("fund_l_p", "MutualFund prices - L-P.csv"),
        ("d_q_z", "MutualFund prices - Q-Z.csv"),
        ("etf_prices", "ETF prices.csv"),
        ("etfs", "ETFs.csv"),
        ("funds", "MutualFunds.csv"),
    ]
}


def load_csv(name, path):
    """Read one CSV (header row, inferred schema) and return ``(name, DataFrame)``."""
    return name, spark.read.csv(path, header=True, inferSchema=True)


# Historical prices available on YahooFinance for the 2,310 scraped ETFs.
etf_prices = load_csv('etf_prices', paths['etf_prices'])[1]
del paths['etf_prices']

# Data for 2,310 ETFs with 142 different attributes.
etfs = load_csv('etfs', paths['etfs'])[1]
del paths['etfs']

# Data for 23,783 Mutual Funds with 298 different attributes.
funds = load_csv('funds', paths['funds'])[1]
del paths['funds']

# Only the four mutual fund price files are left in `paths` at this point
# (fund symbols A-E, F-K, L-P and Q-Z); read them through a thread pool.
with ThreadPoolExecutor() as executor:
    results = executor.map(load_csv, paths.keys(), paths.values())

# Collect the loaded frames by name and stack them into one price table.
csv_dfs = dict(results)
fund_prices = csv_dfs["fund_a_e"]
for part in ("fund_f_k", "fund_l_p", "d_q_z"):
    fund_prices = fund_prices.unionByName(csv_dfs[part])



# Datasets Discrimination

In [0]:
# Preview the ETF historical price table loaded above.
display(etf_prices)

In [0]:
# Summary statistics for the ETF historical price table.
display(etf_prices.summary())

# FINE!

In [0]:
# Preview the ETF attribute table read from ETFs.csv.
display(etfs)

In [0]:
# Summary statistics for the ETF attribute table.
display(etfs.summary())

In [0]:
# Preview the combined mutual fund price history (union of the four price files).
display(fund_prices) # THIS IS FINE!

In [0]:
# Summary statistics for the combined mutual fund price history.
display(fund_prices.summary())

In [0]:
# Preview the mutual fund attribute table read from MutualFunds.csv.
display(funds)

In [0]:
# Summary statistics for the mutual fund attribute table.
display(funds.summary())


# Data Cleaning

This dataset has many tables, each with many columns. There are too many columns to handle the data cleaning process in a single pass, given the many different problems that can occur. To solve this, we can apply the Medallion Architecture and break the problem down.

The Medallion Architecture relies on three layers. The first (bronze) receives the raw data as loaded; this data is then processed into the next layer (silver), where it is formally treated (no nulls and appropriate data types). The silver layer is already ML-ready and can be used by downstream algorithms.

First, we would create a categorical table that summarizes most of the information in our tables, and then split these big tables into smaller, domain-specific ones, similar to a star-schema design in which dimension tables are connected to fact tables.

This fact table could be the "golden"-layer equivalent: it is the table that holds the output of the model, while a second golden-layer table would be a recreation of the ingested raw data, already properly treated.


In [0]:
# Show the first 10 rows of the combined mutual fund prices.
display(fund_prices.head(10)) # Seems already OK for the silver layer

In [0]:
# Summary statistics for the combined mutual fund prices
# (the same call already appears in the exploration section above).
display(fund_prices.summary())

In [0]:
# Show the first 10 rows of the ETF prices.
display(etf_prices.head(10)) # They seem already OK

In [0]:
from pyspark.sql.functions import col, lit, current_date
from functools import reduce
from pyspark.sql import DataFrame
import time

# Clean one column group, cast its value columns to double, stamp the load date
# and write it as Parquet partitioned by insertion_date.
def clean_and_store(df: DataFrame, name: str, id_columns: list, output_path: str):
    """Clean one group of columns and write it to ``{output_path}/{name}/``.

    Steps:
      1. Drop rows in which every non-ID column is null.
      2. Convert every non-ID column to ``DoubleType``:
         * columns that are already numeric are cast directly, so nulls and
           values Spark renders in scientific notation are kept;
         * any other column is read as text, thousands separators (",") are
           removed, the literals "NULL" and "" become null, rows whose
           remaining text contains characters other than digits, ".", "," or
           "-" are dropped, and the text is cast to double.
      3. Add an ``insertion_date`` column holding the current date.
      4. Repartition to 100 partitions and overwrite the Parquet output,
         partitioned by ``insertion_date``.

    NOTE(review): rows holding non-numeric text in a value column are still
    dropped, as in the previous implementation. For groups whose columns are
    genuinely textual this removes every row with a value - confirm that this
    is intended.

    Args:
        df: Spark DataFrame holding the ID columns plus the group's columns.
        name: Group name; used in log messages and as the output sub-folder.
        id_columns: Columns that are passed through without any cleaning.
        output_path: Base output location.

    Returns:
        None. A failed write is reported with ``print`` and is not re-raised.
    """
    # Imported here because the notebook never binds `F`, `DoubleType` or
    # `NumericType` at module level.
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType, NumericType

    print(f"Starting processing for group: {name}")
    start_time = time.time()

    key_columns = set(id_columns)

    # Remove rows where all non-ID columns are null
    cols_to_check = [c for c in df.columns if c not in key_columns]
    df = df.dropna(subset=cols_to_check, how="all")

    numeric_columns = {
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, NumericType)
    }

    # Build one projection instead of a chain of withColumn calls.
    converted = []
    for c in df.columns:
        if c in key_columns:
            converted.append(F.col(c))
        elif c in numeric_columns:
            converted.append(F.col(c).cast(DoubleType()).alias(c))
        else:
            text = F.regexp_replace(F.col(c).cast("string"), ",", "")
            text = F.when(text.isin("NULL", ""), None).otherwise(text)
            # A null predicate is treated as false by filter(), so nulls have
            # to be kept explicitly or one empty cell would drop the whole row.
            df = df.filter(text.isNull() | ~text.rlike("[^0-9.,-]"))
            converted.append(text.cast(DoubleType()).alias(c))
    df = df.select(*converted)

    # Add insertion_date column to fit with good practices of data engineering
    df = df.withColumn("insertion_date", F.current_date())

    # Repartition so the write is spread over several tasks
    df = df.repartition(100)

    # The frame feeds exactly one action (the write), so it is not cached.
    output_dir = f"{output_path}/{name}/"
    try:
        df.write.mode("overwrite").partitionBy("insertion_date").parquet(output_dir)
    except Exception as e:
        print(f'Error Processing group {name}, Exception {e}')
        # Do not report the group as saved when the write failed.
        return

    end_time = time.time()
    print(f"Finished processing group: {name} in {end_time - start_time:.2f} seconds. Saved to {output_dir}")



In [0]:

# Spread the ETF table over 200 partitions and mark it for caching.
etfs = etfs.repartition(200)
etfs.cache()
# Run an action so the cache is populated right away.
etfs.count()


In [0]:
# Creating CATALOG table 

# Identifying key columns for the 2 biggest dataframes: funds and etfs

from pyspark.sql.functions import col
from functools import reduce

# Column groups for the ETF table; every group starts with the two join keys.
join_keys = ['fund_symbol', 'exchange_code']

# Group 1: Identification
id_columns = [
    *join_keys,
    'quote_type', 'region',
    'fund_short_name', 'fund_long_name',
    'currency', 'fund_category', 'fund_family',
    'exchange_name', 'exchange_timezone',
]

# Group 2: Volume and Assets
volume_columns = [*join_keys, 'avg_vol_3month', 'avg_vol_10day', 'total_net_assets']

# Group 3: Moving Averages and High/Low Stats
trend_columns = [
    *join_keys,
    'day50_moving_average', 'day200_moving_average',
    'week52_high_low_change', 'week52_high_low_change_perc',
    'week52_high', 'week52_high_change', 'week52_high_change_perc',
    'week52_low', 'week52_low_change', 'week52_low_change_perc',
]

# Group 4: Strategy & Yield
strategy_columns = [
    *join_keys,
    'investment_strategy', 'fund_yield', 'inception_date',
    'annual_holdings_turnover', 'investment_type', 'size_type',
]

# Group 5: Expense Ratios
expense_columns = [
    *join_keys,
    'fund_annual_report_net_expense_ratio',
    'category_annual_report_net_expense_ratio',
]

# Group 6: Asset Allocation
asset_columns = [*join_keys, 'asset_stocks', 'asset_bonds']

# Group 7: Sectors
sector_columns = [
    *join_keys,
    *(
        f'fund_sector_{sector}'
        for sector in (
            'basic_materials', 'communication_services', 'consumer_cyclical',
            'consumer_defensive', 'energy', 'financial_services', 'healthcare',
            'industrials', 'real_estate', 'technology', 'utilities',
        )
    ),
]

# Group 8: Valuation Ratios
valuation_columns = [
    *join_keys,
    *(f'fund_price_{ratio}_ratio' for ratio in ('book', 'cashflow', 'earning', 'sales')),
]

# Group 9: Bond Ratings
bond_columns = [
    *join_keys,
    'fund_bond_maturity', 'fund_bond_duration',
    *(
        f'fund_bonds_{rating}'
        for rating in ('us_government', 'aaa', 'aa', 'a', 'bbb', 'bb', 'b', 'below_b', 'others')
    ),
]

# Group 10: Holdings
holding_columns = [*join_keys, 'top10_holdings', 'top10_holdings_total_assets']

# Group 11: Returns - Periods (fund/category pair for each period)
return_columns = [
    *join_keys,
    'returns_as_of_date',
    *(
        f'{owner}_return_{period}'
        for period in ('ytd', '1month', '3months', '1year', '3years', '5years', '10years')
        for owner in ('fund', 'category')
    ),
    'years_up', 'years_down',
]

# Group 12: Returns - Annual (2020 down to 2000, fund columns first)
annual_return_columns = [
    *join_keys,
    *(f'fund_return_{year}' for year in reversed(range(2000, 2021))),
    *(f'category_return_{year}' for year in reversed(range(2000, 2021))),
]

# Group 13: Risk Metrics (same seven metrics for the 3, 5 and 10 year horizons)
risk_columns = [
    *join_keys,
    *(
        f'fund_{metric}_{horizon}years'
        for horizon in (3, 5, 10)
        for metric in (
            'alpha', 'beta', 'mean_annual_return', 'r_squared',
            'stdev', 'sharpe_ratio', 'treynor_ratio',
        )
    ),
]

# Group DataFrames: one projection of `etfs` per column group.
_etf_column_groups = {
    "identification": id_columns,
    "volume": volume_columns,
    "trend": trend_columns,
    "strategy": strategy_columns,
    "expenses": expense_columns,
    "assets": asset_columns,
    "sectors": sector_columns,
    "valuation": valuation_columns,
    "bonds": bond_columns,
    "holdings": holding_columns,
    "returns_period": return_columns,
    "returns_annual": annual_return_columns,
    "risk": risk_columns,
}
group_dfs_etfs = {
    label: etfs.select(*columns)
    for label, columns in _etf_column_groups.items()
}

print(f"Created {len(group_dfs_etfs)} grouped DataFrames by column pattern.")


In [0]:
# Clean and store every ETF column group; the calls are submitted to a thread
# pool so the Spark jobs for different groups can overlap.
dbfs_output = "dbfs:/tmp/silver/etfs"

with ThreadPoolExecutor() as executor:
    futures = [
        # Pass join_keys directly: id_columns[:2] only equals the key columns
        # by list order, and id_columns is rebound by the funds cell.
        executor.submit(clean_and_store, df, name, join_keys, dbfs_output)
        for name, df in group_dfs_etfs.items()
    ]
    for future in futures:
        # result() re-raises any exception raised inside the worker thread.
        future.result()


In [0]:
# Release the cached ETF table now that its column groups have been written.
etfs.unpersist()


In [0]:
# Spread the mutual fund table over 200 partitions and mark it for caching.
funds = funds.repartition(200)
funds.cache()
# Run an action so the cache is populated right away.
funds.count()


In [0]:
from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import DataFrame

# Column groups for the mutual fund table; every group starts with the join keys.
join_keys = ['fund_symbol', 'exchange_code']

# Identification
id_columns = [
    *join_keys,
    'quote_type', 'region',
    'fund_short_name', 'fund_long_name', 'currency',
    'initial_investment', 'subsequent_investment',
    'fund_category', 'fund_family',
    'exchange_name', 'exchange_timezone',
    'management_name', 'management_bio', 'management_start_date',
]

# Volume and assets
volume_columns = [*join_keys, 'total_net_assets', 'year_to_date_return']

# Moving averages and 52-week high/low statistics
trend_columns = [
    *join_keys,
    'day50_moving_average', 'day200_moving_average',
    'week52_high_low_change', 'week52_high_low_change_perc',
    'week52_high', 'week52_high_change', 'week52_high_change_perc',
    'week52_low', 'week52_low_change', 'week52_low_change_perc',
]

# DISCARD
strategy_columns = [
    *join_keys,
    'investment_strategy', 'fund_yield',
    'morningstar_overall_rating', 'morningstar_risk_rating',
    'inception_date', 'last_dividend', 'last_cap_gain',
    'annual_holdings_turnover', 'investment_type', 'size_type',
]

# Expense ratios, fees and sales loads
expense_columns = [
    *join_keys,
    'fund_annual_report_net_expense_ratio',
    'category_annual_report_net_expense_ratio',
    'fund_prospectus_net_expense_ratio',
    'fund_prospectus_gross_expense_ratio',
    'fund_max_12b1_fee',
    'fund_max_front_end_sales_load',
    'category_max_front_end_sales_load',
    'fund_max_deferred_sales_load',
    'category_max_deferred_sales_load',
    'fund_year3_expense_projection',
    'fund_year5_expense_projection',
    'fund_year10_expense_projection',
]

# Asset allocation
asset_columns = [
    *join_keys,
    *(
        f'asset_{kind}'
        for kind in ('cash', 'stocks', 'bonds', 'others', 'preferred', 'convertible')
    ),
]

# Sectors
sector_columns = [
    *join_keys,
    *(
        f'fund_sector_{sector}'
        for sector in (
            'basic_materials', 'communication_services', 'consumer_cyclical',
            'consumer_defensive', 'energy', 'financial_services', 'healthcare',
            'industrials', 'real_estate', 'technology', 'utilities',
        )
    ),
]

# Valuation (fund/category pair for each metric)
valuation_columns = [
    *join_keys,
    *(
        f'{owner}_{metric}'
        for metric in (
            'price_book_ratio', 'price_cashflow_ratio', 'price_earning_ratio',
            'price_sales_ratio', 'median_market_cap', 'year3_earnings_growth',
        )
        for owner in ('fund', 'category')
    ),
]

# Bond maturity, duration and ratings
bond_columns = [
    *join_keys,
    'fund_bond_maturity', 'category_bond_maturity',
    'fund_bond_duration', 'category_bond_duration',
    *(
        f'fund_bonds_{rating}'
        for rating in ('us_government', 'aaa', 'aa', 'a', 'bbb', 'bb', 'b', 'below_b', 'others')
    ),
]

# Holdings
holding_columns = [*join_keys, 'top10_holdings', 'top10_holdings_total_assets']

# Returns by period (fund/category pair for each period)
return_columns = [
    *join_keys,
    'morningstar_return_rating', 'returns_as_of_date',
    *(
        f'{owner}_return_{period}'
        for period in (
            'ytd', '1month', '3months', '1year', '3years', '5years', '10years',
            'last_bull_market', 'last_bear_market',
        )
        for owner in ('fund', 'category')
    ),
    'years_up', 'years_down',
]

# Annual returns (2020 down to 2000, fund columns first)
annual_return_columns = [
    *join_keys,
    *(f'fund_return_{year}' for year in reversed(range(2000, 2021))),
    *(f'category_return_{year}' for year in reversed(range(2000, 2021))),
]

# Quarterly returns (2021 down to 2000, Q4 to Q1), leaving out 2021 Q4
quarter_return_columns = [
    *join_keys,
    *(
        f'fund_return_{year}_q{quarter}'
        for year in reversed(range(2000, 2022))
        for quarter in (4, 3, 2, 1)
        if (year, quarter) != (2021, 4)
    ),
]

# Risk metrics (same seven metrics for the 3, 5 and 10 year horizons)
risk_columns = [
    *join_keys,
    *(
        f'fund_{metric}_{horizon}years'
        for horizon in (3, 5, 10)
        for metric in (
            'alpha', 'beta', 'mean_annual_return', 'r_squared',
            'stdev', 'sharpe_ratio', 'treynor_ratio',
        )
    ),
]

# Category ranks and load-adjusted returns
rank_columns = [
    *join_keys,
    *(
        f'fund_return_category_rank_{period}'
        for period in ('ytd', '1month', '3months', '1year', '3years', '5years')
    ),
    'load_adj_return_1year', 'load_adj_return_3years',
    'load_adj_return_5years', 'load_adj_return_10years',
]

# Sustainability / ESG scores
sustainability_columns = [
    *join_keys,
    'sustainability_score', 'sustainability_rank',
    'esg_peer_group', 'esg_peer_count',
    'esg_score', 'peer_esg_min', 'peer_esg_avg', 'peer_esg_max',
    'environment_score',
    'peer_environment_min', 'peer_environment_avg', 'peer_environment_max',
    'social_score',
    'peer_social_min', 'peer_social_avg', 'peer_social_max',
    'governance_score',
    'peer_governance_min', 'peer_governance_avg', 'peer_governance_max',
]

# Grouped DataFrames: one projection of `funds` per column group
# (strategy_columns is deliberately not part of the mapping).
_fund_column_groups = {
    "identification": id_columns,
    "volume": volume_columns,
    "trend": trend_columns,
    "expenses": expense_columns,
    "assets": asset_columns,
    "sectors": sector_columns,
    "valuation": valuation_columns,
    "bonds": bond_columns,
    "holdings": holding_columns,
    "returns_period": return_columns,
    "returns_annual": annual_return_columns,
    "returns_quarter": quarter_return_columns,
    "risk": risk_columns,
    "rank": rank_columns,
    "sustainability": sustainability_columns,
}
group_dfs = {
    label: funds.select(*columns)
    for label, columns in _fund_column_groups.items()
}

print(f"Created {len(group_dfs)} grouped DataFrames by column pattern.")


In [0]:
# Clean and store every mutual fund column group; the calls are submitted to a
# thread pool so the Spark jobs for different groups can overlap.
dbfs_output = "dbfs:/tmp/silver/funds"

with ThreadPoolExecutor() as executor:
    futures = [
        # Pass join_keys directly instead of relying on the first two entries
        # of id_columns happening to be the key columns.
        executor.submit(clean_and_store, df, name, join_keys, dbfs_output)
        for name, df in group_dfs.items()
    ]
    for future in futures:
        # result() re-raises any exception raised inside the worker thread.
        future.result()

In [0]:
# Release the cached mutual fund table now that its column groups have been written.
funds.unpersist()

In [0]:
# Example: list the files written for the funds "assets" group in the silver folder.
display(dbutils.fs.ls("dbfs:/tmp/silver/funds/assets/"))


In [0]:
# List the group folders written under the ETF silver location.
display(dbutils.fs.ls("dbfs:/tmp/silver/etfs/"))



# Final Work

In [0]:
# Recursively delete the bronze folder in DBFS.
dbutils.fs.rm("dbfs:/tmp/bronze/", recurse=True)

In [0]:
%run ./funds_env

In [0]:
%run ./etfs_env

In [0]:


class Cleaning:
    """Bronze-to-silver ETL for the Kaggle "mutual-funds-and-etfs" dataset.

    The class relies on names provided by the notebook session: ``spark``,
    ``dbutils``, ``kagglehub``, ``Path``, ``glob``, ``os``, ``time``,
    ``ThreadPoolExecutor``, ``join_keys`` and the ``funds_*_columns`` /
    ``etf_*_columns`` lists (presumably defined by the ``funds_env`` and
    ``etfs_env`` notebooks - confirm).
    """

    def __init__(self):
        # Bronze CSV locations in DBFS, keyed by a short dataset name.
        self.paths_ingest = {
            "fund_a_e": "dbfs:/tmp/bronze/mutual-funds-and-etfs/MutualFund prices - A-E.csv",
            "fund_f_k": "dbfs:/tmp/bronze/mutual-funds-and-etfs/MutualFund prices - F-K.csv",
            "fund_l_p": "dbfs:/tmp/bronze/mutual-funds-and-etfs/MutualFund prices - L-P.csv",
            "d_q_z":   "dbfs:/tmp/bronze/mutual-funds-and-etfs/MutualFund prices - Q-Z.csv",
            "etf_prices": "dbfs:/tmp/bronze/mutual-funds-and-etfs/ETF prices.csv",
            "etfs": "dbfs:/tmp/bronze/mutual-funds-and-etfs/ETFs.csv",
            "funds": "dbfs:/tmp/bronze/mutual-funds-and-etfs/MutualFunds.csv"
        }

    def import_data(self):
        """Download the dataset with kagglehub and copy each file to the bronze DBFS folder.

        Source: https://www.kaggle.com/datasets/stefanoleone992/mutual-funds-and-etfs/data
        """
        path = kagglehub.dataset_download("stefanoleone992/mutual-funds-and-etfs")

        # Moving dataset from the local download directory to DBFS
        local_path = Path(path)
        dbfs_path = "dbfs:/tmp/bronze/mutual-funds-and-etfs"

        for file in glob.glob(str(local_path / "*")):
            file_name = os.path.basename(file)
            dbutils.fs.cp(f"file:{file}", f"{dbfs_path}/{file_name}")

    def ingest_data(self):
        """Read every bronze CSV and return the four working DataFrames.

        ``self.paths_ingest`` is only read, never modified, so the method can
        be called more than once on the same instance.

        Returns:
            Tuple ``(etf_prices, etfs, funds, fund_prices)`` where
            ``fund_prices`` is the union (by column name) of the four mutual
            fund price files.
        """
        def load_csv(name):
            return name, spark.read.csv(self.paths_ingest[name], header=True, inferSchema=True)

        with ThreadPoolExecutor() as executor:
            csv_dfs = dict(executor.map(load_csv, list(self.paths_ingest)))

        fund_prices = (
            csv_dfs["fund_a_e"]
            .unionByName(csv_dfs["fund_f_k"])
            .unionByName(csv_dfs["fund_l_p"])
            .unionByName(csv_dfs["d_q_z"])
        )

        return csv_dfs["etf_prices"], csv_dfs["etfs"], csv_dfs["funds"], fund_prices

    def transform(self, df, pipe):
        """Split ``df`` into one DataFrame per column group.

        Args:
            df: The DataFrame to split; every group is a ``select`` on it.
            pipe: ``'funds'`` or ``'etfs'``; picks the set of column groups.

        Returns:
            Dict mapping group name to the projected DataFrame.

        Raises:
            ValueError: If ``pipe`` is neither ``'funds'`` nor ``'etfs'``.
        """
        if pipe == 'funds':
            funds_quarter_return_columns = join_keys + [
                f'fund_return_{y}_q{q}' for y in range(2021, 1999, -1) for q in range(4, 0, -1)
            ]
            funds_quarter_return_columns.remove('fund_return_2021_q4')

            columns_by_group = {
                "identification": funds_id_columns,
                "volume": funds_volume_columns,
                "trend": funds_trend_columns,
                "expenses": funds_expense_columns,
                "assets": funds_asset_columns,
                "sectors": funds_sector_columns,
                "valuation": funds_valuation_columns,
                "bonds": funds_bond_columns,
                "holdings": funds_holding_columns,
                "returns_period": funds_return_columns,
                "returns_annual": funds_annual_return_columns,
                "returns_quarter": funds_quarter_return_columns,
                "risk": funds_risk_columns,
                "rank": funds_rank_columns,
                "sustainability": funds_sustainability_columns,
            }
            label = 'FUNDS'
        elif pipe == 'etfs':
            # Group 12: Returns - Annual
            etf_annual_return_columns = join_keys + [
                f'fund_return_{y}' for y in range(2020, 1999, -1)
            ] + [
                f'category_return_{y}' for y in range(2020, 1999, -1)
            ]

            columns_by_group = {
                "identification": etf_id_columns,
                "volume": etf_volume_columns,
                "trend": etf_trend_columns,
                "strategy": etf_strategy_columns,
                "expenses": etf_expense_columns,
                "assets": etf_asset_columns,
                "sectors": etf_sector_columns,
                "valuation": etf_valuation_columns,
                "bonds": etf_bond_columns,
                "holdings": etf_holding_columns,
                "returns_period": etf_return_columns,
                "returns_annual": etf_annual_return_columns,
                "risk": etf_risk_columns,
            }
            label = 'ETFs'
        else:
            raise ValueError(f"Unknown pipe {pipe!r}; expected 'funds' or 'etfs'")

        # Project from the DataFrame that was passed in, not from a notebook
        # global, so the groups share the caller's (cached) plan.
        group_dfs = {
            group: df.select(*columns)
            for group, columns in columns_by_group.items()
        }

        print(f"Created {len(group_dfs)} grouped DataFrames by column pattern for {label}.")
        return group_dfs

    def clean_and_store(self, df: "DataFrame", name: str, id_columns: list, output_path: str):
        """Clean one group of columns and write it to ``{output_path}/{name}/``.

        Steps:
          1. Drop rows in which every non-ID column is null.
          2. Convert every non-ID column to ``DoubleType``: numeric columns
             are cast directly; any other column is read as text, "," is
             removed, "NULL" and "" become null, rows whose remaining text
             contains characters other than digits, ".", "," or "-" are
             dropped, and the text is cast to double.
          3. Add an ``insertion_date`` column holding the current date.
          4. Repartition to 100 partitions and overwrite the Parquet output,
             partitioned by ``insertion_date``.

        NOTE(review): rows holding non-numeric text in a value column are
        still dropped, as before; for genuinely textual groups this removes
        every row with a value - confirm that this is intended.

        Args:
            df: Spark DataFrame holding the ID columns plus the group's columns.
            name: Group name; used in log messages and as the output sub-folder.
            id_columns: Columns that are passed through without any cleaning.
            output_path: Base output location.

        Returns:
            None. A failed write is reported with ``print`` and not re-raised.
        """
        # Imported here because the notebook never binds these names globally.
        from pyspark.sql import functions as F
        from pyspark.sql.types import DoubleType, NumericType

        print(f"Starting processing for group: {name}")
        start_time = time.time()

        key_columns = set(id_columns)

        # Remove rows where all non-ID columns are null
        cols_to_check = [c for c in df.columns if c not in key_columns]
        df = df.dropna(subset=cols_to_check, how="all")

        numeric_columns = {
            field.name
            for field in df.schema.fields
            if isinstance(field.dataType, NumericType)
        }

        # Build one projection instead of a chain of withColumn calls.
        converted = []
        for c in df.columns:
            if c in key_columns:
                converted.append(F.col(c))
            elif c in numeric_columns:
                converted.append(F.col(c).cast(DoubleType()).alias(c))
            else:
                text = F.regexp_replace(F.col(c).cast("string"), ",", "")
                text = F.when(text.isin("NULL", ""), None).otherwise(text)
                # A null predicate is treated as false by filter(), so nulls
                # are kept explicitly; otherwise one empty cell drops the row.
                df = df.filter(text.isNull() | ~text.rlike("[^0-9.,-]"))
                converted.append(text.cast(DoubleType()).alias(c))
        df = df.select(*converted)

        # Add insertion_date column
        df = df.withColumn("insertion_date", F.current_date())

        # Repartition so the write is spread over several tasks
        df = df.repartition(100)

        # The frame feeds exactly one action (the write), so it is not cached.
        output_dir = f"{output_path}/{name}/"
        try:
            df.write.mode("overwrite").partitionBy("insertion_date").parquet(output_dir)
        except Exception as e:
            print(f'Error Processing group {name}, Exception {e}')
            # Do not report the group as saved when the write failed.
            return

        end_time = time.time()
        print(f"Finished processing group: {name} in {end_time - start_time:.2f} seconds. Saved to {output_dir}")

    def _store_groups(self, df, pipe, output_path):
        """Cache ``df``, split it into column groups and clean/store each group."""
        # Cache BEFORE projecting: the group DataFrames must be derived from
        # the cached frame for the cache to be used at all.
        cached = df.repartition(200).cache()
        cached.count()  # to trigger caching immediately
        try:
            print(f'Loading treating dic {pipe}...')
            group_dfs = self.transform(cached, pipe)

            with ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(self.clean_and_store, group_df, name, join_keys, output_path)
                    for name, group_df in group_dfs.items()
                ]
                for future in futures:
                    # result() re-raises any exception raised in the worker.
                    future.result()
        finally:
            cached.unpersist()

    def _store_prices(self, df, label, output_path):
        """Stamp a price table with the load date and write it as partitioned Parquet."""
        from pyspark.sql import functions as F

        stamped = df.withColumn("insertion_date", F.current_date()).repartition(200)

        # Write to DBFS partitioned by insertion_date. The frame is written
        # once, so it is not cached.
        output_dir = f"{output_path}/"
        try:
            stamped.write.mode("overwrite").partitionBy("insertion_date").parquet(output_dir)
        except Exception as e:
            print(f'Error Processing group {label}, Exception {e}')

    def run_etl_pipeline(self):
        """Run the whole pipeline: download, ingest, clean and store all four tables."""
        print('Importing Data')
        self.import_data()
        print('Ingesting data')
        etf_prices, etfs, funds, fund_prices = self.ingest_data()

        # ===== ETFS Transform and Loading
        print('Transforming ETFs')
        self._store_groups(etfs, 'etfs', "dbfs:/tmp/silver/etfs")

        # ===== FUNDS Transform and Loading
        print('Transforming Funds')
        self._store_groups(funds, 'funds', "dbfs:/tmp/silver/funds")

        # ===== ETF_PRICES Transform and Loading
        print('Loading ETF PRICES')
        self._store_prices(etf_prices, 'etf_prices', "dbfs:/tmp/silver/etf_prices")

        # ===== FUND_PRICES Transform and Loading
        print('Loading Fund Prices')
        self._store_prices(fund_prices, 'fund_prices', "dbfs:/tmp/silver/fund_prices")

        print('Cleaning pipeline finished')



In [0]:
# Build the pipeline object and run every stage (download, ingest, clean, store).
cleaning = Cleaning()
cleaning.run_etl_pipeline()