In [None]:
import numpy as np
import pandas as pd
import os
import numpy as np
from pmdarima.arima import auto_arima
import pmdarima as pmd
import warnings
warnings.filterwarnings('ignore')
import mysql.connector
import findspark

import pyspark
from pyspark.sql import SparkSession
import json
from pyspark.sql.functions import col, lit, udf,trunc,concat_ws
from pyspark.sql.types import IntegerType,BooleanType,DateType
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display
from IPython.core.display import HTML, display
from builtins import max
from builtins import min
from tqdm import tqdm


In [2]:
# Show the notebook's current working directory (IPython automagic form of %pwd).
pwd

'/home/VECVNET/zzasom/jupyter/01_Developement_work/01_Codes/Black Forecasting/Monte Carlo/Dec/1153/Builds'

In [3]:
# Read the pointer file; its first value is the directory that holds the
# credential/config JSON (config.json is opened from it further down).
# The context manager guarantees the handle is closed even if json.load() raises.
with open("/home/VECVNET/CONFIG/path.json", "r") as filepath:
    path = json.load(filepath)
path = list(path.values())[0]

In [4]:
# Restore `enteredCred` from IPython's %store persistence; it has to have been
# saved with `%store enteredCred` beforehand (that step is not part of this notebook).
%store -r enteredCred
# Positional layout as unpacked here: [user, password, db, url, driver].
# user/password/url/driver feed the JDBC write at the end of the notebook.
user=enteredCred[0]
password=enteredCred[1]
db=enteredCred[2]
url=enteredCred[3]
driver=enteredCred[4]

In [5]:
# Parse config.json from the configured directory; the file is only needed while parsing.
with open(path + '/config.json') as f:
    data = json.load(f)

# Expose each setting as a module-level variable for the cells below.
growthRate = data['growthRate']
cores_max = data['cores_max']
executor_memory = data['executor_memory']
driver_memory = data['driver_memory']
debug_maxToStringFields = data['debug_maxToStringFields']
default_parallelism = data['default_parallelism']
driver_maxResultSize = data['driver_maxResultSize']
spark_network_timeout = data['spark_network_timeout']

In [6]:
# Locate the Spark installation so pyspark can be used from this kernel.
# This does not create the Spark session; that happens in the next cell.
findspark.init()

In [7]:
# Build (or reuse) the Spark session with the configuration required on the server.
# These settings help the Spark session run uninterrupted.
# The session configuration can readily be updated to improve how the code runs.
# The session binds a port in the Spark UI, where progress can also be monitored.
# Warnings can be turned off to suppress the log messages.
# NOTE(review): spark.cores.max (2) and spark.executor.memory ('20g') are hard-coded here
# although cores_max and executor_memory are loaded from config.json — confirm which is intended.

spark = SparkSession.builder.master("spark://10.106.111.117:7077") \
        .appName("Monte Carlo - purchasing group buyer counts") \
        .config("spark.cores.max",2) \
        .config("spark.driver.memory",driver_memory) \
        .config("spark.executor.memory", '20g') \
        .config("spark.debug.maxToStringFields",debug_maxToStringFields) \
        .config("spark.default.parallelism",default_parallelism) \
        .config("spark.driver.maxResultSize",driver_maxResultSize) \
        .config("spark.network.timeout",spark_network_timeout) \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/30 10:51:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
# Enable Arrow-based data transfer so pandas <-> Spark DataFrame conversion runs faster
# (nothing is imported here; this only sets a session config flag).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

#### Code for the day-wise processing loop

In [9]:
import os
import pandas as pd
from datetime import datetime, timedelta
from functools import reduce

# === Constants ===
NUM_SIMULATIONS = 10000  # Simulation count embedded in the expected output file names
INPUT_FOLDER = "/home/VECVNET/zzasom/jupyter/01_Development_work/Black_Forecasting/Deployment/Input"
OUTPUT_FOLDER = "/home/VECVNET/zzasom/jupyter/01_Development_work/Black_Forecasting/Deployment/Output"
BASE_DATE = datetime(2025, 5, 26)  # Fixed base date: 26 May 2025
DAYS_AHEAD = 3  # Base date + next 3 days

# === Load PG Mapping File ===
# The mapping CSV must exist in INPUT_FOLDER; fail early with a clear error otherwise.
pg_file_path = os.path.join(INPUT_FOLDER, "MRPcontrollerPurchasingGrp.csv")
if os.path.exists(pg_file_path):
    df_pg = pd.read_csv(pg_file_path)
else:
    raise FileNotFoundError("PG file not found.")

# Composite "<material>_<purchasing group>" key on the mapping frame
df_pg['Material_PG_Concat'] = df_pg['Material'].astype(str) + "_" + df_pg['Purchasing Group Details'].astype(str)

# Slim copy used for joining: 'Material' is renamed to 'Material Code' to match the join key
df_pg_merge = df_pg.rename(columns={'Material': 'Material Code'})[
    ['Material Code', 'Purchasing Group Details', 'MRP controller details']
]

# === Function to Process a Single Date ===
def process_day(date_obj, filter_from_df):
    """Extract one run date's high-risk materials and count them per purchasing group.

    Reads ``MC_day_wise_{NUM_SIMULATIONS}_{YYYYMMDD}.xlsx`` from ``OUTPUT_FOLDER``,
    takes the first non-empty sheet, inner-joins it with ``df_pg_merge`` on
    'Material Code', and keeps rows with 'Stockout Probability (%)' >= 50 and
    'GIT' == 0.

    Args:
        date_obj: datetime of the run to process; it determines the file name and
            the ``Buyer_counts_<DDMon>`` column label.
        filter_from_df: Optional DataFrame with a 'Material_PG_Concat' column. When
            given, only rows whose key also appears in it are kept. ``None``
            disables this filter.

    Returns:
        ``(filtered_df, pg_counts)``: the retained rows and a frame with one row per
        'Purchasing Group Details' holding the row count. ``(None, None)`` is
        returned when the file is missing, every sheet is empty, or any exception
        is raised while processing (the reason is printed, not re-raised).
    """
    date_str = date_obj.strftime('%Y%m%d')
    file_name = f"MC_day_wise_{NUM_SIMULATIONS}_{date_str}.xlsx"
    file_path = os.path.join(OUTPUT_FOLDER, file_name)

    if not os.path.exists(file_path):
        print(f"[{date_str}] File not found: {file_name}")
        return None, None

    try:
        # Open the workbook in a context manager so its handle is always released,
        # including when a sheet fails to parse.
        df_mc = None
        with pd.ExcelFile(file_path) as xls:
            for sheet_name in xls.sheet_names:
                candidate = pd.read_excel(xls, sheet_name=sheet_name)
                if not candidate.empty:
                    df_mc = candidate
                    break

        if df_mc is None:
            print(f"[{date_str}] All sheets are empty.")
            return None, None

        # Merge PG info
        df_mc = pd.merge(df_mc, df_pg_merge, on='Material Code', how='inner')
        df_mc['Material_PG_Concat'] = df_mc['Material Code'].astype(str) + "_" + df_mc['Purchasing Group Details'].astype(str)
        # Non-numeric probabilities become NaN and therefore fail the >= 50 test below
        df_mc['Stockout Probability (%)'] = pd.to_numeric(df_mc['Stockout Probability (%)'], errors='coerce')

        # Filter
        high_risk_df = df_mc[(df_mc['Stockout Probability (%)'] >= 50) & (df_mc['GIT'] == 0)]
        filtered_df = high_risk_df[['Running Date', 'Material Code', 'GIT', 'Purchasing Group Details', 'Material_PG_Concat']]

        if filter_from_df is not None:
            filtered_df = filtered_df[filtered_df['Material_PG_Concat'].isin(filter_from_df['Material_PG_Concat'])]

        # Count high-risk parts per PG
        col_label = f"Buyer_counts_{date_obj.strftime('%d%b')}"
        pg_counts = (
            filtered_df.groupby('Purchasing Group Details')
            .size()
            .reset_index(name=col_label)
        )

        return filtered_df, pg_counts

    except Exception as e:
        # Best-effort by design: a bad day is reported and skipped rather than aborting the run
        print(f"[{date_str}] Error processing: {e}")
        return None, None

# === Process Multiple Days ===
# filtered_df_acc holds the high-risk rows of the most recent successfully processed
# day; each later day is restricted to the material/PG keys still present in it.
filtered_df_acc = None
pg_counts_all = []

for i in range(DAYS_AHEAD + 1):  # base date through base date + DAYS_AHEAD
    run_date = BASE_DATE + timedelta(days=i)
    day_filtered, pg_count = process_day(run_date, filtered_df_acc)
    if pg_count is None:
        # Missing or unreadable day: keep the previous filter. Resetting it to None
        # would make the next day be counted unfiltered.
        continue
    filtered_df_acc = day_filtered
    pg_counts_all.append(pg_count)

# === Merge and Compute Efficiency ===
if not pg_counts_all:
    print("No data processed.")
else:
    # Outer-join the per-day tables so a purchasing group seen on any day is kept
    merged_pg_counts = pg_counts_all[0]
    for daily_counts in pg_counts_all[1:]:
        merged_pg_counts = pd.merge(
            merged_pg_counts, daily_counts, on='Purchasing Group Details', how='outer'
        )

    # Days on which a group has no rows come out of the join as NaN; store them as integer 0
    count_cols = [name for name in merged_pg_counts.columns if name.startswith("Buyer_counts_")]
    merged_pg_counts[count_cols] = merged_pg_counts[count_cols].fillna(0).astype(int)

    if len(count_cols) >= 2:
        first_col = count_cols[0]
        last_col = count_cols[-1]

        # A zero first-day count is swapped for 1 so the division cannot fail
        safe_first = merged_pg_counts[first_col].replace(0, 1)
        pending_pct = (merged_pg_counts[last_col] / safe_first) * 100
        efficiency_pct = 100 - pending_pct

        # Stored as display strings such as "34.29%"
        merged_pg_counts['pending %'] = pending_pct.round(2).astype(str) + '%'
        merged_pg_counts['Efficiency %'] = efficiency_pct.round(2).astype(str) + '%'

    print(merged_pg_counts.head(2))


  Purchasing Group Details  Buyer_counts_26May  Buyer_counts_27May  \
0               Akshay Raj                  35                  27   
1           Amarjeet Yadav                  37                  28   

   Buyer_counts_28May  Buyer_counts_29May pending % Efficiency %  
0                  20                  12    34.29%       65.71%  
1                  15                  13    35.14%       64.86%  


In [10]:
# Preview the first seven rows of the merged per-purchasing-group table
merged_pg_counts.head(7)

Unnamed: 0,Purchasing Group Details,Buyer_counts_26May,Buyer_counts_27May,Buyer_counts_28May,Buyer_counts_29May,pending %,Efficiency %
0,Akshay Raj,35,27,20,12,34.29%,65.71%
1,Amarjeet Yadav,37,28,15,13,35.14%,64.86%
2,Arun Kumar Shah,38,24,15,11,28.95%,71.05%
3,Ashish Purohit,12,9,7,7,58.33%,41.67%
4,Deepak Deshpande,32,29,28,26,81.25%,18.75%
5,Deepak Raipure,48,32,21,13,27.08%,72.92%
6,Girish R Sonawane,52,31,21,16,30.77%,69.23%


In [11]:
# Export the dashboard table to Excel in the working directory. The date tag in the
# file name is derived from BASE_DATE (same '%d%b' format as the count columns) so it
# cannot drift from the run that was actually processed.
merged_pg_counts.to_excel(f"BF_{BASE_DATE.strftime('%d%b')}_Dashboard_file.xlsx", index=False)


In [12]:
# Convert Pandas DataFrame to PySpark DataFrame
# ('pending %' and 'Efficiency %', when present, are already strings like "34.29%")
spark_df = spark.createDataFrame(merged_pg_counts)


In [13]:
# Persist the Spark DataFrame to the dashboard table over JDBC.
# "overwrite" mode replaces whatever the target table currently holds.
(
    spark_df.write
    .mode("overwrite")
    .format("jdbc")
    .option("driver", driver)
    .option("user", user)
    .option("password", password)
    .option("url", url)
    .option("dbtable", "Black_Forecasting_Dashboard")
    .save()
)

                                                                                

In [None]:
# Completion marker printed once the JDBC write cell has run
print("final output store in the table")

In [14]:
# Stop the Spark session now that the results have been written
spark.stop()