## Software Development Lifecycle
### Requirement Gathering

This process is currently managed via Excel VBA, using data collected from a system known as InvestOne. The requirement for the information is to transform it into reports that are in a format accepted by external companies, as well as in a way that is more useful for internal users.

### Design

The output for these reports are simple excel formats, with data summarised into several columns. There are two main versions of these reports, list of holdings (from SecurityDesc) and cash and fund values (from Fund Trend). The other version is similar, but also adds an additional column for holdings as a % of fund value. To streamline this process, we will implement one table which houses a concatenation of data from both SecuirtyDesc and FundTrend, adding in the % column, to then be further transfromed into the final reports.

# Important Note

### As this report involves confidential data, sample datasets cannot be provided. The code should be evaluated on its quality, readability, and implementation approach.

### Implementation

## Imports

In [None]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from notebookutils import fs

## Get Data

In [None]:
# Gathered data from a OneLake lakehouse using pyspark SQL functions and converts them to pandas to be amended via Python
dfsec = spark.sql("SELECT * FROM Marlborough_Lakehouse.brz_securitydesc").toPandas()
dftren = spark.sql("SELECT * FROM Marlborough_Lakehouse.brz_fundtrendclass").toPandas()

## Remove Unnecessary Columns

In [None]:
# Trim each report down to only the columns required for the final report
dfsec = dfsec[['Security_Number',
'Shares_Par',
'Security_Description_Short',
'Strike_Price',
'Traded_Market_Value_Base',
'Trading_Currency',
'Position_Date',
'Account_Name',
'Security_ISIN']]

dftren = dftren[['Account_Name',
'Net_Income_Amount',
'OEIC_Uninvested_Principal_Cash_Bid',
'OEIC_Security_Value',
'Cancellation_Value',
'Net_Unrealized_G/L_On_FEC',
'Account_Class',
'Date_As_Date']]

# Process Data

In [None]:
#Rename column from dftren to match dfsec, for easier filtering
dftren.rename(columns={"Date_As_Date": "Position_Date"}, inplace=True)

#Filter dftren to only account class 0, which is the total AUM for each fund.
dftren = dftren[dftren['Account_Class'] == 0]

# Merge dfsec with a subset of dftren on 'Account_Number' and 'Position_Date'
# This brings in 'Cancellation_Value' from dftren where matching keys exist
# Code generated using information from https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html
merged_df = dfsec.merge(dftren[["Account_Number","Position_Date",'Cancellation_Value']], on=["Account_Number","Position_Date"], how="left")

#Calcualte the AUM percentage for each holding within dfsec, using the AUM from dftren (now combined)
merged_df['%'] = merged_df['Traded_Market_Value_Base'] / merged_df['Cancellation_Value']

# Set variables for funds and output to be used in a loop
funds = merged_df['Account_Name'].unique() # Will need to add testing here once mapping is available to ensure all desired accounts are captured.
output = pd.DataFrame()

# Using a f string to create the final output in the desired format and loop through each fund
for f in funds:
    # Loads each fund by account name into the output dataframe
    output = pd.concat([output, merged_df.loc[merged_df['Account_Name'] == f]], ignore_index=True)

    # Store the AUM value to be added into the summary row
    aum = merged_df.loc[merged_df['Account_Name'] == f, 'Cancellation_Value'].iloc[0]

    # Add a mostly blank summary row after each list of securities with the total AUM, as required in the final output
    summary = pd.DataFrame(columns=merged_df.columns)
    summary.loc[0, 'Account_Name'] = f
    summary.loc[0, 'Cancellation_Value'] = aum

    # Brings the final output together, each value per Account Name from the original dfsec table, summerised by the total AUM from the dftren table.
    output = pd.concat([output, summary])

# Load to new table within the Lakehouse

### For completeness only, do not review

In [None]:
# Default table upload process used within Fabric for the organisation
# Student did not write this code manually, only added it and amended it to show the final step in the process

spark_df = spark.createDataFrame(output)
spark_df.withColumn('processed_date',current_timestamp()).write.mode('overwrite').format('delta').saveAsTable('daily_portfolios')

### Deployment and Maintenance (Future Scope)

Whilst the transformation is available, due to the hard coded elements which will need to be developed and called upon from elsewhere within the data environment, this process cannot yet be deployed. When it can, it will done seamlessly, as the output should match the historic files already produced in the manual/VBA process.

This process also currently runs and overwrites all data daily. Whilst this works short term, this will need to be updated long term to only process new data as it arrives.