In [0]:
import numpy as np
import pyspark as ps
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql import SparkSession, Window
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
np.random.seed(42)

# All columns required for the calculation must be part of the same DataFrame.
# NOTE: `pd` is pyspark.pandas here (pandas-on-Spark), not plain pandas.
df = pd.DataFrame({
    'taux_hyp': np.random.rand(2000),
    'frequence': np.random.rand(2000),
    'maturity': np.random.randint(200, size=2000),
    'versement': np.random.rand(2000),
    'solde_capital': np.random.rand(2000),
    'tx_interet': 0,
    'cf_mortgage': 0
})

# Add the 1200 cash-flow placeholder columns, each initialised to 0.
colones = {f'Cash Flow {i + 1}': 0 for i in range(1200)}

# reindex() fills newly created columns with NaN unless fill_value is given,
# which would silently ignore the 0 declared in `colones`.
df = df.reindex(columns=list(df.columns) + list(colones.keys()), fill_value=0)

# Convert the pandas-on-Spark frame to a Spark DataFrame.
df = df.to_spark()

# Drop the rows whose maturity is 0.
df = df.filter(col("maturity") > 0)

# Operations - each withColumn returns a new DataFrame with the computed column.
df = df.withColumn("tx_interet", col("taux_hyp") / col("frequence"))
df = df.withColumn("cf_mortgage", col("tx_interet") * col("versement"))

# limit() keeps the preview as a Spark DataFrame; head(100) would instead
# collect a Python list of Row objects to the driver.
display(df.limit(100))


In [0]:
def projection_cf_mortgage(data, n_periods=1200):
    """Project per-period cash flows for every row of ``data``.

    For each row whose ``maturity`` is strictly positive:

    * the periodic rate is ``taux_hyp / frequence``;
    * the balance starts at ``solde_capital`` and is rolled forward
      ``maturity`` times with ``balance - versement + balance * rate``;
    * ``versement`` is written to each of the first ``maturity`` cash-flow
      columns, and the balance left after the last roll is added to the
      cash flow of period ``maturity``.

    Rows with ``maturity <= 0`` (or NaN) keep all-zero cash flows.

    Parameters
    ----------
    data : DataFrame
        Must expose the columns ``taux_hyp``, ``frequence``, ``maturity``,
        ``versement`` and ``solde_capital``. A fractional ``maturity`` is
        truncated toward zero.
    n_periods : int, default 1200
        Number of ``'Cash Flow i'`` columns to produce.

    Returns
    -------
    DataFrame
        ``data`` with its index reset, followed by the columns
        ``'Cash Flow 1'`` .. ``'Cash Flow {n_periods}'``.
        NOTE(review): if ``data`` already carries ``'Cash Flow i'`` columns
        the result will contain duplicate column names - confirm against the
        caller.

    Raises
    ------
    IndexError
        If a row's maturity exceeds ``n_periods``.
    """
    n_rows = len(data)
    cfs = np.zeros((n_rows, n_periods))

    # Pull each input column out once. Going through data.iloc[j][...] builds
    # a whole row Series per access, and upcasts integer columns to float
    # when the frame mixes dtypes.
    taux_hyp = data['taux_hyp'].to_numpy()
    frequence = data['frequence'].to_numpy()
    maturities = data['maturity'].to_numpy()
    versements = data['versement'].to_numpy()
    soldes = data['solde_capital'].to_numpy()

    for j in range(n_rows):
        # Written as "not > 0" so that NaN maturities are skipped as well.
        if not maturities[j] > 0:
            continue

        # range() and slicing need a real int, not a NumPy float.
        maturity = int(maturities[j])
        if maturity > n_periods:
            raise IndexError(
                f"maturity {maturity} in row {j} exceeds the "
                f"{n_periods} projected periods"
            )

        tx_interet = taux_hyp[j] / frequence[j]
        pay_cap = versements[j]

        # Roll the outstanding balance forward one period at a time.
        notionnel = soldes[j]
        for _ in range(maturity):
            notionnel = notionnel - pay_cap + notionnel * tx_interet

        cfs[j, :maturity] = pay_cap
        # Whatever balance remains is settled together with the last payment.
        cfs[j, maturity - 1] += notionnel

    # Convert the NumPy array to a DataFrame
    cfs_df = pd.DataFrame(
        cfs, columns=[f'Cash Flow {i}' for i in range(1, n_periods + 1)]
    )

    # Concatenate the data with the cash flows
    result = pd.concat([data.reset_index(drop=True), cfs_df], axis=1)
    return result