In [0]:
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import numpy as np

Get Raw BTP Core Data

In [0]:
# Connection settings for the HANA JDBC source.
# The password itself is not stored in the notebook: it is looked up at run time
# through dbutils.secrets using the scope and key below.

scope = "xxxxxxx"
username = "xxxxxxx"
password_key = "xxxxx"
password = dbutils.secrets.get(key=password_key, scope=scope)


jdbc_url = "xxx"
jdbc_driver = "xxx"

# NOTE(review): certificate validation is switched off here — confirm this is intended.
jdbc_properties = dict(
    user=username,
    password=password,
    driver=jdbc_driver,
    encrypt="true",
    validateCertificate="false",
)

In [0]:
#get data from BWP

def spark_read_hana(query, show=True):
    """Run a SQL query against the HANA source over JDBC and return the result.

    Connection details (``jdbc_driver``, ``jdbc_url``, ``username`` and
    ``password``) are taken from the notebook-level variables set in the
    credentials cell.

    Args:
        query: SQL text handed to the JDBC reader through its ``query`` option.
        show: When true (the default, which matches the previous behaviour)
            a preview of the result is printed with ``df.show()``. Pass
            ``False`` to skip the preview and avoid executing the query
            only to display rows.

    Returns:
        The Spark DataFrame produced by the JDBC reader.
    """
    reader = (
        spark.read
        .format("jdbc")
        .option("driver", jdbc_driver)
        .option("url", jdbc_url)
        .option("user", username)
        .option("password", password)
        .option("encrypt", True)
        # NOTE(review): certificate validation is disabled — confirm this is intended.
        .option("validateCertificate", False)
        .option("query", query)
    )
    df = reader.load()
    if show:
        df.show()
    return df

In [0]:
# Consumed-ACV query for BTP core.
# - Measures are aggregated (sum, and avg for CONSUMPTION_RATIO) over every dimension
#   listed in the GROUP BY clause.
# - Rows whose MONTH matches '%202408%' are excluded.
# - Only rows with SOURCE LIKE 'BTP_CORE' are kept.
cacv_query = 'SELECT "MONTH", "CUSTOMER_ID", "IS_BUNDLE", "SOURCE", "ORDER_ID", "IS_OVERCONSUMPTION", "CONTRACT_START_DATE", "CONTRACT_END_DATE", "TENANT_EXTERNAL_ID", "ACCOUNT_ID", "MATERIAL", "METRIC_ID", "PLANNING_ENTITY_ID", "PLANNING_ENTITY_NAME", "SERVICE_SHORT_NAME", sum("TOTAL_CONSUMPTION") AS "TOTAL_CONSUMPTION", sum("TOTAL_USAGE") AS "TOTAL_USAGE", avg("CONSUMPTION_RATIO") AS "CONSUMPTION_RATIO", sum("TOTAL_CONSUMPTION_KEUR") AS "TOTAL_CONSUMPTION_KEUR", sum("MONTHLY_CONTRACT_NET_VALUE") AS "MONTHLY_CONTRACT_NET_VALUE", sum("OVERAGE_CONSUMPTION") AS "OVERAGE_CONSUMPTION", sum("TRAFFIC") AS "TRAFFIC", sum("LICENSE_COUNT") AS "LICENSE_COUNT" FROM "_SYS_BIC"."corp.clus.cacv.c4s/CL_CORP_CLUS_CACV_C4S_CONSUMED_ACV_UNITED" WHERE ( UPPER("MONTH") NOT LIKE  UPPER(\'%202408%\')) AND "SOURCE" LIKE \'BTP_CORE\' GROUP BY "MONTH", "CUSTOMER_ID", "IS_BUNDLE", "IS_OVERCONSUMPTION", "SOURCE","ORDER_ID", "CONTRACT_START_DATE", "CONTRACT_END_DATE", "TENANT_EXTERNAL_ID", "ACCOUNT_ID", "MATERIAL", "METRIC_ID","PLANNING_ENTITY_ID", "PLANNING_ENTITY_NAME", "SERVICE_SHORT_NAME"'

# Execute the query through the JDBC helper; the result is a Spark DataFrame.
cacv_raw_df = spark_read_hana(cacv_query)


In [0]:
# Turn on Arrow-based transfer before collecting the Spark result into pandas.
# NOTE(review): newer Spark releases name this setting
# "spark.sql.execution.arrow.pyspark.enabled" — confirm against the cluster version.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Aggregated measures are cast from DecimalType to DoubleType, one column at a time.
measure_columns = (
    'TOTAL_CONSUMPTION',
    'TOTAL_USAGE',
    'CONSUMPTION_RATIO',
    'MONTHLY_CONTRACT_NET_VALUE',
    'TOTAL_CONSUMPTION_KEUR',
    'TRAFFIC',
    'LICENSE_COUNT',
    'OVERAGE_CONSUMPTION',
)
for measure in measure_columns:
    cacv_raw_df = cacv_raw_df.withColumn(measure, col(measure).cast(DoubleType()))

# Collect the Spark DataFrame into a pandas DataFrame on the driver.
cacv_raw_pd_df = cacv_raw_df.toPandas()

In [0]:
def convert_specific_columns_to_float(df, columns):
    """Cast the named columns of ``df`` to float, reporting the outcome per column.

    The frame is modified in place and also returned. A column that is missing
    or that cannot be cast is left untouched and a message is printed instead
    of raising, so one bad column does not abort the remaining conversions.

    Args:
        df: pandas DataFrame to update.
        columns: Iterable of column names to cast.

    Returns:
        The same DataFrame object that was passed in.
    """
    for column in columns:
        if column not in df.columns:
            print(f"Column '{column}' not found in the dataframe.")
            continue
        try:
            df[column] = df[column].astype(float)
        except (ValueError, TypeError):
            # astype raises ValueError for unparsable strings, but TypeError for
            # values that have no float conversion at all (e.g. dicts, datetimes).
            print(f"Column '{column}' cannot be converted to float. Non-numeric data found.")
        else:
            print(f"Column '{column}' converted to float.")
    return df

In [0]:
# Measure columns that must be float in the pandas frame
columns_to_convert = [
    'TOTAL_CONSUMPTION',
    'TOTAL_USAGE',
    'CONSUMPTION_RATIO',
    'TOTAL_CONSUMPTION_KEUR',
    'MONTHLY_CONTRACT_NET_VALUE',
    'TRAFFIC',
    'LICENSE_COUNT',
    'OVERAGE_CONSUMPTION',
]

cacv_raw_pd_df = convert_specific_columns_to_float(df=cacv_raw_pd_df, columns=columns_to_convert)

In [0]:
# Location of the raw BTP core snapshot, relative to the notebook
dataset_path = '../Datasets/btp_core_raw.parquet'

# Persist the raw pandas frame, then load it straight back so the following
# cells work from the stored copy
cacv_raw_pd_df.to_parquet(dataset_path, compression='snappy', engine='pyarrow')
cacv_raw_pd_df = pd.read_parquet(dataset_path, engine='pyarrow')

print(cacv_raw_pd_df)

BTP Core Data Feature Enhancement

In [0]:
def _months_between(later, earlier):
    """Whole calendar months from ``earlier`` to ``later`` (day of month is ignored).

    Both arguments are datetime Series; rows where either date is missing
    yield <NA> in the nullable Int64 result.
    """
    year_gap = later.dt.year - earlier.dt.year
    month_gap = later.dt.month - earlier.dt.month
    return (year_gap * 12 + month_gap).astype('Int64')


# Step 1: Parse 'MONTH' (YYYYMM) into the first day of that month.
# The parse is vectorised instead of one pd.to_datetime call per row, and it is
# skipped when the column is already datetime: re-running this cell would
# otherwise fail to match the format and coerce every value to NaT.
if cacv_raw_pd_df['MONTH'].dtype.kind != 'M':
    cacv_raw_pd_df['MONTH'] = pd.to_datetime(
        cacv_raw_pd_df['MONTH'].astype(str) + '01', format='%Y%m%d', errors='coerce'
    )

# Step 2: Parse the contract dates; unparsable values become NaT
cacv_raw_pd_df['CONTRACT_START_DATE'] = pd.to_datetime(cacv_raw_pd_df['CONTRACT_START_DATE'], errors='coerce')
cacv_raw_pd_df['CONTRACT_END_DATE'] = pd.to_datetime(cacv_raw_pd_df['CONTRACT_END_DATE'], errors='coerce')

# Step 3: LATEST_CONTRACT = months elapsed between the contract start and the reporting month
cacv_raw_pd_df['LATEST_CONTRACT'] = _months_between(
    cacv_raw_pd_df['MONTH'], cacv_raw_pd_df['CONTRACT_START_DATE'])

# Step 4: CONTRACT_DURATION = months between contract start and contract end
cacv_raw_pd_df['CONTRACT_DURATION'] = _months_between(
    cacv_raw_pd_df['CONTRACT_END_DATE'], cacv_raw_pd_df['CONTRACT_START_DATE'])

# Step 5: Keep only the columns needed downstream, including the two derived ones
columns_to_keep = ['MONTH', 'CUSTOMER_ID', 'TOTAL_CONSUMPTION', 'TOTAL_USAGE', 'CONSUMPTION_RATIO', 
                   'MONTHLY_CONTRACT_NET_VALUE', 'OVERAGE_CONSUMPTION', 'TRAFFIC', 'LICENSE_COUNT', 
                   'ORDER_ID', 'LATEST_CONTRACT', 'CONTRACT_DURATION', 'IS_OVERCONSUMPTION', 'IS_BUNDLE', 'SERVICE_SHORT_NAME']

filtered_df = cacv_raw_pd_df[columns_to_keep]

# Step 6: Show the result
print(filtered_df)


In [0]:
# Location of the feature-enhanced BTP core snapshot, relative to the notebook
dataset_path = '../Datasets/btp_core_raw_feature_enhanced.parquet'

# Write the enhanced frame and read it back under the name used by the next section
filtered_df.to_parquet(dataset_path, compression='snappy', engine='pyarrow')
btp_core_raw_feature_enhanced = pd.read_parquet(dataset_path, engine='pyarrow')

print(btp_core_raw_feature_enhanced)

BTP Core Data Aggregated on Month/Customer and Enhancement

In [0]:
# Step 1: Flag rows whose SERVICE_SHORT_NAME mentions Integration Suite / Cloud Integration.
# na=False makes rows with a missing service name count as "no match". Without it,
# str.contains returns NaN for those rows and np.where treats NaN as truthy,
# which would flag them with 1.
btp_core_raw_feature_enhanced['Integration_Suite'] = np.where(
    btp_core_raw_feature_enhanced['SERVICE_SHORT_NAME'].str.contains('Integration Suite', case=False, na=False), 1, 0)

btp_core_raw_feature_enhanced['Cloud_Integration'] = np.where(
    btp_core_raw_feature_enhanced['SERVICE_SHORT_NAME'].str.contains('Cloud Integration', case=False, na=False), 1, 0)

# Step 2: Aggregate to one row per (MONTH, CUSTOMER_ID)
aggregated_df = btp_core_raw_feature_enhanced.groupby(['MONTH', 'CUSTOMER_ID']).agg({
    'TOTAL_CONSUMPTION': 'sum',
    'TOTAL_USAGE': 'sum',
    'CONSUMPTION_RATIO': 'mean',
    'OVERAGE_CONSUMPTION': 'sum',
    'TRAFFIC': 'sum',
    'LICENSE_COUNT': 'sum',
    'MONTHLY_CONTRACT_NET_VALUE': 'sum',
    'LATEST_CONTRACT': 'min',  # Smallest months-since-start, i.e. the most recently started contract
    'CONTRACT_DURATION': ['sum', 'mean'],  # Both the sum and the mean of contract durations
    'IS_OVERCONSUMPTION': lambda x: (x == 'Y').sum(),  # Number of rows marked as overconsuming
    'IS_BUNDLE': lambda x: 1 if 'Y' in x.values else 0,  # 1 if any row in the group is a bundle
    'ORDER_ID': 'nunique',  # Number of distinct orders
    'Integration_Suite': 'max',  # 1 if any row in the group matched
    'Cloud_Integration': 'max'   # 1 if any row in the group matched
}).reset_index()

# Step 3: Flatten the two-level column index into '<column>_<aggregation>' names.
# The group keys have an empty second level, so the trailing '_' is stripped.
aggregated_df.columns = ['_'.join(parts).strip('_') for parts in aggregated_df.columns.values]

# Step 4: Give the lambda-based and multi-aggregation columns readable names
aggregated_df.rename(columns={
    'ORDER_ID_nunique': 'ORDER_COUNT',
    'IS_OVERCONSUMPTION_<lambda>': 'OVERCONSUMPTION_COUNT',
    'IS_BUNDLE_<lambda>': 'BUNDLE_INDICATOR',
    'CONTRACT_DURATION_sum': 'CONTRACT_DURATION_SUM',
    'CONTRACT_DURATION_mean': 'CONTRACT_DURATION_MEAN',
    'Integration_Suite_max': 'Integration_Suite',
    'Cloud_Integration_max': 'Cloud_Integration'
}, inplace=True)

# Print the resulting DataFrame
print(aggregated_df)

In [0]:
# Flag every row of aggregated_df with Active_Contract = 1.
# Each row here is a (MONTH, CUSTOMER_ID) pair that came out of the aggregation.
aggregated_df['Active_Contract'] = 1


In [0]:
# Make sure MONTH is datetime before the frame is written out
# (pd.to_datetime leaves an already-datetime column as datetime).
aggregated_df['MONTH'] = pd.to_datetime(aggregated_df['MONTH'])



In [0]:
# Location of the month/customer aggregate, relative to the notebook
dataset_path = '../Datasets/aggregated_btp_core.parquet'

# Store the aggregate and reload it under the name used by the join section
aggregated_df.to_parquet(dataset_path, compression='snappy', engine='pyarrow')
aggregated_btp_core = pd.read_parquet(dataset_path, engine='pyarrow')

print(aggregated_btp_core)

Get Raw Account Main Data

In [0]:
# Fetch the account master data (MADA): every column of the ACCOUNT_MAIN view.
mada_query = 'SELECT * FROM "_SYS_BIC"."corp.clus.crossCacv.core/CL_CORP_CLUS_CROSS_CACV_MADA/dp/ACCOUNT_MAIN"'

# Execute the query through the JDBC helper; the result is a Spark DataFrame.
mada_df = spark_read_hana(mada_query)

In [0]:
# Collect the MADA Spark DataFrame into a pandas DataFrame
mada_pd_df = mada_df.toPandas()

In [0]:
# Location of the MADA snapshot, relative to the notebook
dataset_path = '../Datasets/Mada.parquet'

In [0]:
# Write the MADA pandas frame to the path currently held in dataset_path
mada_pd_df.to_parquet(dataset_path, engine='pyarrow', compression='snappy')

In [0]:
# Location of the MADA snapshot, relative to the notebook
dataset_path = '../Datasets/Mada.parquet'

# Load the stored master data under the name used by the join below
Mada = pd.read_parquet(path=dataset_path, engine='pyarrow')

print(Mada)

Join Mada Account Main Data with BTP Core Data

In [0]:
# Left join: keep every row of aggregated_btp_core and attach the MADA columns
# where CUSTOMER_ID equals PBUP_AC. Customers with no MADA match keep NaN in the MADA columns.
merged_df = pd.merge(aggregated_btp_core, Mada, left_on='CUSTOMER_ID', right_on='PBUP_AC', how='left')

# Print the resulting DataFrame
print(merged_df)

In [0]:
# Columns removed from the joined frame
columns_to_drop = [
    'PBUP_AC',
    'CUSTOMER_TEXT',
    'ISS_ID',
    'PLANNING_ENTITY_ID',
    'PLANNING_ENTITY_TEXT',
    'TOTAL_USAGE_sum',
    'CONSUMPTION_RATIO_mean',
    'OVERAGE_CONSUMPTION_sum',
    'TRAFFIC_sum',
]

# Customers whose rows are removed entirely
excluded_customer_ids = ['0018894795', '0050124806', '0050145084']

# Final upper-case names for the remaining columns
column_renames = {
    'MONTH': 'DATE',
    'MONTHLY_CONTRACT_NET_VALUE_sum': 'MONTHLY_CONTRACT_NET_VALUE_SUM',
    'LICENSE_COUNT_sum': 'LICENSE_COUNT_SUM',
    'LATEST_CONTRACT_min': 'LATEST_CONTRACT_MIN',
    'Integration_Suite': 'INTEGRATION_SUITE',
    'Cloud_Integration': 'CLOUD_INTEGRATION',
    'Active_Contract': 'ACTIVE_CONTRACT',
    'TOTAL_CONSUMPTION_sum': 'TOTAL_CONSUMPTION_SUM',
}

# Drop columns, remove the excluded customers, then rename — in that order
filtered_merged_df = (
    merged_df
    .drop(columns=columns_to_drop)
    .loc[lambda frame: ~frame['CUSTOMER_ID'].isin(excluded_customer_ids)]
    .rename(columns=column_renames)
)

# Show the resulting frame
print(filtered_merged_df)

# Show the remaining column names
print(filtered_merged_df.columns)


In [0]:
# Location of the joined BTP core + MADA dataset, relative to the notebook
dataset_path = '../Datasets/df_aggregated_btp_core_with_mada.parquet'

# Store the joined frame
filtered_merged_df.to_parquet(dataset_path, compression='snappy', engine='pyarrow')

DataFrame with missing months populated and filled with 0

In [0]:
# Location of the joined BTP core + MADA dataset, relative to the notebook
dataset_path = '../Datasets/df_aggregated_btp_core_with_mada.parquet'

# Reload the joined frame from disk
filtered_merged_df = pd.read_parquet(path=dataset_path, engine='pyarrow')

print(filtered_merged_df)

In [0]:
# DATE must be datetime for the month grid below
filtered_merged_df['DATE'] = pd.to_datetime(filtered_merged_df['DATE'])

# Every month start between the first and the last DATE in the data
first_month = filtered_merged_df['DATE'].min()
last_month = filtered_merged_df['DATE'].max()
all_DATEs = pd.date_range(start=first_month, end=last_month, freq='MS')

# Full grid: one entry per (customer, month) pair
customer_ids = filtered_merged_df['CUSTOMER_ID'].unique()
customer_DATE_combinations = pd.MultiIndex.from_product(
    [customer_ids, all_DATEs], names=['CUSTOMER_ID', 'DATE'])

# Left-join the observed rows onto the grid; months without data end up as NaN
complete_df = (
    pd.DataFrame(index=customer_DATE_combinations)
    .reset_index()
    .merge(filtered_merged_df, how='left', on=['CUSTOMER_ID', 'DATE'])
)


In [0]:
# Columns whose value is expected to be constant per customer
stable_columns = [
    'CUSTOMER_ID', 'ISS_TEXT', 
    'GLOBAL_REGION', 'COUNTRY', 'SAP_MASTER_CODE'
]

# CUSTOMER_ID is the grouping key itself, so only the remaining attributes are filled.
attribute_columns = [name for name in stable_columns if name != 'CUSTOMER_ID']

# Forward fill, then backward fill, each strictly within a customer.
# Chaining .bfill() directly onto the grouped .ffill() result would run on the
# ungrouped frame and copy values from the next customer into a customer that
# has no value of its own. Such customers now keep their missing values here.
complete_df[attribute_columns] = complete_df.groupby('CUSTOMER_ID')[attribute_columns].ffill()
complete_df[attribute_columns] = complete_df.groupby('CUSTOMER_ID')[attribute_columns].bfill()


# Display the resulting DataFrame to verify the filling
complete_df


In [0]:
# Replace every remaining missing value with 0, in place.
# NOTE(review): no column subset is given, so non-numeric columns that still
# contain missing values receive the integer 0 as well — confirm this is intended.
complete_df.fillna(0, inplace=True)

In [0]:
# Rows must be in chronological order within each customer; the index is rebuilt
# so that positional and label order agree for the group-wise features below.
complete_df = complete_df.sort_values(by=['CUSTOMER_ID', 'DATE'], ascending=[True, True]).reset_index(drop=True)


def _consumption_by_customer():
    """Return TOTAL_CONSUMPTION_SUM of the current complete_df grouped by CUSTOMER_ID."""
    return complete_df.groupby('CUSTOMER_ID')['TOTAL_CONSUMPTION_SUM']


# Lag features: consumption 1, 2 and 3 months earlier for the same customer
for lag in (1, 2, 3):
    complete_df[f'TOTAL_CONSUMPTION_LAG_{lag}'] = _consumption_by_customer().shift(lag)

# Rolling means over 3 and 6 months. The group-wise rolling result carries the
# customer as an extra index level, which is dropped to align with complete_df.
for window in (3, 6):
    complete_df[f'TOTAL_CONSUMPTION_ROLLING_{window}'] = (
        _consumption_by_customer()
        .rolling(window=window)
        .mean()
        .reset_index(level=0, drop=True)
    )

# Seasonality: calendar month plus its sine/cosine encoding on a 12-month cycle
complete_df['MONTH'] = complete_df['DATE'].dt.month
month_angle = 2 * np.pi * complete_df['MONTH'] / 12
complete_df['MONTH_SIN'] = np.sin(month_angle)
complete_df['MONTH_COS'] = np.cos(month_angle)

# Trend: 1-based running row number per customer
complete_df['TREND'] = complete_df.groupby('CUSTOMER_ID').cumcount() + 1

# Month-over-month change in consumption
complete_df['TOTAL_CONSUMPTION_DIFF_1'] = _consumption_by_customer().diff(1)

# Running total of consumption per customer
complete_df['TOTAL_CONSUMPTION_CUMSUM'] = (
    _consumption_by_customer()
    .cumsum()
    .reset_index(level=0, drop=True)
)

# Exponential moving average with span 3 (adjust=False), per customer
complete_df['TOTAL_CONSUMPTION_EMA_3'] = (
    _consumption_by_customer()
    .ewm(span=3, adjust=False)
    .mean()
    .reset_index(level=0, drop=True)
)

# Calendar year of each row
complete_df['YEAR'] = complete_df['DATE'].dt.year


In [0]:
# Replace missing values with 0 again, in place; this now also covers the
# feature columns added in the previous cell.
# NOTE(review): presumably meant for the leading gaps left by the lag, rolling
# and diff features — confirm that 0 is the right placeholder for them.
complete_df.fillna(0, inplace=True)

In [0]:
# Location of the month-populated dataset, relative to the notebook
dataset_path = '../Datasets/df_aggregated_month_populated_btp_core.parquet'

# Store the populated frame and reload it under the name used by the final section
complete_df.to_parquet(dataset_path, compression='snappy', engine='pyarrow')
df_aggregated_month_populated_btp_core = pd.read_parquet(dataset_path, engine='pyarrow')

print(df_aggregated_month_populated_btp_core)

Subset of the DataFrame with customers having an active contract for the whole period

In [0]:
# Number of months in which each customer has an active contract
customer_active_counts = df_aggregated_month_populated_btp_core.groupby('CUSTOMER_ID')['ACTIVE_CONTRACT'].sum()

# Length of the whole period, taken from the data rather than hard-coded, so the
# filter stays correct when the extracted date range changes.
months_in_period = len(df_aggregated_month_populated_btp_core['DATE'].unique())

# Customers that are active in every month of the period
customers_with_full_active_contracts = customer_active_counts[customer_active_counts == months_in_period].index

# Keep only the rows of those customers
df_filtered_active_customers = df_aggregated_month_populated_btp_core[
    df_aggregated_month_populated_btp_core['CUSTOMER_ID'].isin(customers_with_full_active_contracts)
]

# Save the filtered DataFrame to a new Parquet file
filtered_dataset_path = '../Datasets/df_filtered_active_customers.parquet'
df_filtered_active_customers.to_parquet(filtered_dataset_path, engine='pyarrow', compression='snappy')

# Print the filtered DataFrame
print(df_filtered_active_customers)