In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, greatest, sum as spark_sum, mean, count,substring, lpad, create_map, expr,trim,round
from functools import reduce
from MHI.MHI_FORMATS import *
from itertools import chain
from pyspark import StorageLevel
from pyspark.sql import DataFrame
from typing import List
import time
import argparse
import concurrent.futures
import os
import sys

In [None]:
#parser = argparse.ArgumentParser()
#parser.add_argument("--inFile", help="Input data path", default='/Users/mshaque/Workarea/Projects/qi-pyspark-poc/DATA/sid_2021_10K.csv')
#parser.add_argument("--refFile", help="Input data path", default='/Users/mshaque/Workarea/Projects/qi-pyspark-poc/DATA/MHAO_v2024_21_10K.TXT')
#args, unknown = parser.parse_known_args()

#data_path = args.inFile
#sas_result_file = args.refFile

# Positional command-line arguments:
#   1. data_path                  - input CSV that is read into Spark
#   2. output_path                - destination of the aggregated report CSV
#   3. execution_time_output_path - text file that receives the total run time
# Fail early with a usage message instead of a bare IndexError when an
# argument is missing.
if len(sys.argv) < 4:
    sys.exit(
        f"Usage: {sys.argv[0]} <data_path> <output_path> <execution_time_output_path>"
    )

data_path = sys.argv[1]
output_path = sys.argv[2]
execution_time_output_path = sys.argv[3]

# NOTE(review): the existence check on data_path stays disabled, as in the
# original; presumably the path is not always on the local file system - confirm.
# if not os.path.exists(data_path):
#     sys.exit('Data file path does not exist')

# Columns used as the grouping keys of the final report.
key_columns = ["PAYCAT", "YEAR", "HOSPST", "POVCAT", "RACECAT"]

In [None]:
# Spark settings applied to the session builder, in this order.
spark_settings = {
    "spark.sql.shuffle.partitions": "200",
    "spark.executor.memory": "8g",
    "spark.driver.memory": "4g",
    "spark.executor.cores": "4",
    "spark.driver.cores": "2",
    "spark.sql.codegen.wholeStage": "false",
    "spark.sql.autoBroadcastJoinThreshold": "-1",
}

session_builder = SparkSession.builder.appName("MHI_AREA")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

sc = spark.sparkContext

# Only FATAL messages are logged; "WARN" or "ERROR" can be used instead for
# more verbose output.
sc.setLogLevel("FATAL")
print("Log Level Set to FATAL")

In [None]:
# Reference point for every "Total Execution Time" message below.
start_time = time.time()

# Read the input CSV; the first line is the header and column types are inferred.
input_data = spark.read.csv(
    f"{data_path}",
    header=True,
    inferSchema=True,
)

# Record whether the optional PAY1 and RACE columns are present.
columns = input_data.columns
pay1_provided = "PAY1" in columns
race_provided = "RACE" in columns

# Warn about each optional column that is missing from the input.
for column_provided, missing_warning in (
    (pay1_provided, "WARNING: The input data does not have PAY1. The software creates a fake PAY1 as PAY1=999 for the programs to run"),
    (race_provided, "WARNING: The input data does not have RACE. The software creates a fake RACE as RACE=999 for the programs to run"),
):
    if not column_provided:
        print(missing_warning)

In [None]:
def _is_numbered(column_name: str, prefix: str) -> bool:
    """Return True when column_name is prefix followed only by digits (e.g. DX1, PR12)."""
    return column_name.startswith(prefix) and column_name[len(prefix):].isdigit()


# Input columns named DX<digits> and PR<digits>, in input order; mdx and mpr scan these.
dx_columns = [name for name in input_data.columns if _is_numbered(name, "DX")]
pr_columns = [name for name in input_data.columns if _is_numbered(name, "PR")]

def mdx(df: DataFrame, column: str, fmt_list: set) -> DataFrame:
    """Add a 0/1 flag that is 1 when any column in ``dx_columns`` holds a listed code.

    Args:
        df: DataFrame containing every column named in the module-level ``dx_columns``.
        column: Name of the flag column to add (an existing column of that name is replaced).
        fmt_list: Codes to look for.

    Returns:
        ``df`` with ``column`` set to 1 on rows where at least one ``dx_columns``
        value is non-null and contained in ``fmt_list``, and 0 on all other rows.
    """
    matches = [col(c).isNotNull() & col(c).isin(fmt_list) for c in dx_columns]
    if not matches:
        # reduce() without an initial value raises TypeError on an empty
        # sequence; with no columns to scan nothing can match, so flag 0.
        return df.withColumn(column, lit(0))
    any_match = reduce(lambda a, b: a | b, matches)
    return df.withColumn(column, when(any_match, 1).otherwise(0))

def mpr(df: DataFrame, column: str, fmt_list: set) -> DataFrame:
    """Add a 0/1 flag that is 1 when any column in ``pr_columns`` holds a listed code.

    Args:
        df: DataFrame containing every column named in the module-level ``pr_columns``.
        column: Name of the flag column to add (an existing column of that name is replaced).
        fmt_list: Codes to look for.

    Returns:
        ``df`` with ``column`` set to 1 on rows where at least one ``pr_columns``
        value is non-null and contained in ``fmt_list``, and 0 on all other rows.
    """
    matches = [col(c).isNotNull() & col(c).isin(fmt_list) for c in pr_columns]
    if not matches:
        # reduce() without an initial value raises TypeError on an empty
        # sequence; with no columns to scan nothing can match, so flag 0.
        return df.withColumn(column, lit(0))
    any_match = reduce(lambda a, b: a | b, matches)
    return df.withColumn(column, when(any_match, 1).otherwise(0))

def create_fake_pay1_race(df, pay1_provided, race_provided):
    """Add constant placeholder PAY1 / RACE columns for whichever one is missing.

    Args:
        df: DataFrame to extend.
        pay1_provided: Falsy when the input has no PAY1 column.
        race_provided: Falsy when the input has no RACE column.

    Returns:
        ``df`` with PAY1 and/or RACE set to the literal 999 where the
        corresponding flag is falsy; unchanged when both flags are truthy.
    """
    placeholders = (("PAY1", pay1_provided), ("RACE", race_provided))
    for column_name, provided in placeholders:
        if not provided:
            df = df.withColumn(column_name, lit(999))
    return df

# Report the wall-clock seconds elapsed since start_time was set (before the CSV read).
print(f"Total Execution Time Part 1: {time.time() - start_time}")

In [None]:
#preprocessing data
# ICDVER rules as (condition, version) pairs, checked in order (first match wins).
# Each version covers Q4 of one year plus Q1-Q3 of the following year:
# version 33 starts at 2015 Q4 and version 41 ends at 2024 Q3.
icdver_rules = []
for _offset, _year in enumerate(range(2015, 2024)):
    _version = 33 + _offset
    icdver_rules.append(((col("YEAR") == _year) & (col("DQTR") == 4), _version))
    icdver_rules.append(((col("YEAR") == _year + 1) & (col("DQTR").isin(1, 2, 3)), _version))

_first_condition, _first_version = icdver_rules[0]
icdver_expr = when(_first_condition, _first_version)
for _condition, _rule_version in icdver_rules[1:]:
    icdver_expr = icdver_expr.when(_condition, _rule_version)

# Any YEAR/DQTR combination not covered by the rules falls back to version 41.
input_data = input_data.withColumn("ICDVER", icdver_expr.otherwise(41))

# Make sure PAY1 and RACE exist; placeholders are added when the input lacks them.
input_data = create_fake_pay1_race(input_data, pay1_provided, race_provided)

# Keep records with AGE from 12 to 55 (inclusive) and with SEX, DX1, DQTR and YEAR present.
required_columns = ["SEX", "DX1", "DQTR", "YEAR"]
age_in_range = (col("AGE") >= 12) & (col("AGE") <= 55)
keep_record = reduce(
    lambda condition, name: condition & col(name).isNotNull(),
    required_columns,
    age_in_range,
)
filtered_data = input_data.filter(keep_record)

# Add the category columns AGECAT, RACECAT, PAYCAT and SEXCAT.
# PAYCAT: PAY1 values 1-5 are kept as-is; every other value (including null) maps to 6.
# The original chain tested PAY1 == 5 twice; the unreachable duplicate is removed.
filtered_data = filtered_data.withColumn("PAYCAT",
    when(col("PAY1") == 1, 1)
    .when(col("PAY1") == 2, 2)
    .when(col("PAY1") == 3, 3)
    .when(col("PAY1") == 4, 4)
    .when(col("PAY1") == 5, 5)
    .otherwise(6)
)

# RACECAT: RACE values 1-5 are kept as-is; every other value maps to 6.
race = col("RACE")
racecat_expr = when(race == 1, 1)
for race_code in range(2, 6):
    racecat_expr = racecat_expr.when(race == race_code, race_code)
filtered_data = filtered_data.withColumn("RACECAT", racecat_expr.otherwise(6))

# AGECAT: 0 for <18, 1 for 18-39, 2 for 40-64, 3 for 65-74, 4 for 75+; 0 when nothing matches.
age = col("AGE")
agecat_expr = (
    when(age < 18, 0)
    .when((age >= 18) & (age < 40), 1)
    .when((age >= 40) & (age < 65), 2)
    .when((age >= 65) & (age < 75), 3)
    .when(age >= 75, 4)
    .otherwise(0)
)
filtered_data = filtered_data.withColumn("AGECAT", agecat_expr)

# SEXCAT: SEX values 1 and 2 are kept as-is; every other value maps to 0.
sex = col("SEX")
sexcat_expr = when(sex == 1, 1).when(sex == 2, 2).otherwise(0)
filtered_data = filtered_data.withColumn("SEXCAT", sexcat_expr)

print(f"Total Execution Time Part 2: {time.time() - start_time}")

In [None]:
#%%time
# KEY HOSPID YEAR DQTR AGE SEX
#               AGECAT SEXCAT PAYCAT RACECAT POVCAT LOS HOSPST
# 			  MYOCARD ANEURYSM ACUTE_RENAL RESP_DISTRESS AM_FLUID CARD_ARR CONV_CARD DISS_INTRA ECLAMPSIA 
# 			  HEART_FAIL PUERP PULM_ED ANES_COMP SEPSIS SHOCK SICKLE AIR_THROMB HYSTER TRACHEO VENT
# 			  MATH_ABORT DECEASED_FLAG ACUTE_RENAL3 DISS_INTRA3
#               TAMH01 TAMH02 TAMH03
# Calculate indicator
# Convert dictionary to PySpark mapping
# Flatten the POVCAT dictionary into key1, value1, key2, value2, ... and build a Spark map from it.
mapping_expr = create_map([lit(item) for pair in POVCAT.items() for item in pair])
# Keys of POVCAT, used to decide which FIPSTCO values get a POVCAT.
povcat = [fips_key for fips_key in POVCAT.keys()]

# A record is kept when either delivery flag is set.
is_delivery = (col("deliv_dx") == 1) | (col("deliv_pr") == 1)
fips_code = col("FIPSTCO")

filtered_data = (
    # Flag delivery diagnoses and delivery procedures.
    mpr(mdx(filtered_data, "deliv_dx", DX_Delivery_Keys), "deliv_pr", PR_Delivery_Keys)
    # Keep rows where deliv_dx or deliv_pr is 1.
    .filter(is_delivery)
    # Left-pad PSTCO with zeros to a 5-character FIPSTCO, equivalent to `put(PSTCO,Z5.)`.
    .withColumn("FIPSTCO", lpad(col("PSTCO"), 5, "0"))
    # Look up POVCAT by FIPSTCO; rows whose FIPSTCO is not a POVCAT key get null.
    .withColumn("POVCAT", when(fips_code.isin(povcat), mapping_expr[fips_code]))
)

# Start from the filtered delivery records and mark them for persistence
# (added for optimization) before the indicator flags are derived.
output_data = filtered_data
output_data.persist(StorageLevel.MEMORY_AND_DISK)

# (flag column, helper, code list) triples, applied in this order.
# mdx scans the diagnosis columns, mpr scans the procedure columns.
indicator_specs = [
    ("MYOCARD", mdx, DX_Acute_MyoCard_Infarct_Keys),
    ("ANEURYSM", mdx, DX_Aneurysm_Keys),
    ("ACUTE_RENAL", mdx, DX_Acute_Renal_Fail_Keys),
    ("ACUTE_RENAL3", mdx, DX_Acute_Renal_Fail_Keys),
    ("ACUTE_RENAL_DIAL", mpr, DIALYIP_Keys),  # additional column for ACUTE_RENAL3
    ("RESP_DISTRESS", mdx, DX_Acute_Resp_Distress_Keys),
    ("AM_FLUID", mdx, DX_Amniotic_Fluid_Emb_Keys),
    ("CARD_ARR", mdx, DX_Card_Arrest_Vent_Fib_Keys),
    ("CONV_CARD", mpr, PR_Conv_Cardiac_Rhythm_Keys),
    ("DISS_INTRA", mdx, DX_Diss_Intravasc_Coagul_Keys),
    ("DISS_INTRA3", mdx, DX_Diss_Intravasc_Coagul3_Keys),
    ("ECLAMPSIA", mdx, DX_Eclampsia_Keys),
    ("HEART_FAIL", mdx, DX_Heart_Fail_Surgery_Keys),
    ("PUERP", mdx, DX_Puerp_Cerebrovascular_Keys),
    ("PULM_ED", mdx, DX_Pulmonary_Edema_Keys),
    ("ANES_COMP", mdx, DX_Severe_Anesth_Comp_Keys),
    ("SEPSIS", mdx, DX_Sepsis_Keys),
    ("SHOCK", mdx, DX_Shock_Keys),
    ("SICKLE", mdx, DX_Sickle_Cell_Crisis_Keys),
    ("AIR_THROMB", mdx, DX_Air_Thrombotic_Embolism_Keys),
    ("HYSTER", mpr, PR_Hysterectomy_Keys),
    ("TRACHEO", mpr, PR_Temp_Tracheostomy_Keys),
    ("VENT", mpr, PR_Ventilation_Keys),
    ("DX_ABORTION", mdx, DX_Abortion_Keys),
    ("PR_ABORTION", mpr, PR_Abortion_Keys),
]
for flag_name, flag_helper, code_keys in indicator_specs:
    output_data = flag_helper(output_data, flag_name, code_keys)

# DECEASED_FLAG is 1 when DISP equals 20, otherwise 0.
output_data = output_data.withColumn("DECEASED_FLAG", when(col("DISP") == 20, 1).otherwise(0))

# ACUTE_RENAL3 is 1 only when both the renal-failure diagnosis flag and the
# dialysis procedure flag are set; otherwise it is null.
renal_with_dialysis = (col("ACUTE_RENAL3") == 1) & (col("ACUTE_RENAL_DIAL") == 1)
output_data = output_data.withColumn("ACUTE_RENAL3", when(renal_with_dialysis, 1).otherwise(None))

# Indicator flags that feed TAMH01.
tamh01_columns = [
    "MYOCARD", "ANEURYSM", "ACUTE_RENAL", "RESP_DISTRESS", "AM_FLUID",
    "CARD_ARR", "CONV_CARD", "DISS_INTRA", "ECLAMPSIA", "HEART_FAIL",
    "PUERP", "PULM_ED", "ANES_COMP", "SEPSIS", "SHOCK",
    "SICKLE", "AIR_THROMB", "HYSTER", "TRACHEO", "VENT",
]
# TAMH02 uses the TAMH01 flags plus DECEASED_FLAG.
tamh02_columns = tamh01_columns + ["DECEASED_FLAG"]
# TAMH03 is TAMH02 with ACUTE_RENAL3 and DISS_INTRA3 in place of ACUTE_RENAL and DISS_INTRA.
tamh03_swaps = {"ACUTE_RENAL": "ACUTE_RENAL3", "DISS_INTRA": "DISS_INTRA3"}
tamh03_columns = [tamh03_swaps.get(flag, flag) for flag in tamh02_columns]


def _composite_flag(flag_columns):
    """Return a column that is null for abortion records, else the greatest of flag_columns."""
    has_abortion = (col("DX_ABORTION") == 1) | (col("PR_ABORTION") == 1)
    return when(has_abortion, None).otherwise(greatest(*[col(flag) for flag in flag_columns]))


output_data = (
    output_data
    .withColumn("TAMH01", _composite_flag(tamh01_columns))
    .withColumn("TAMH02", _composite_flag(tamh02_columns))
    .withColumn("TAMH03", _composite_flag(tamh03_columns))
)

print(f"Total Execution Time Part 3: {time.time() - start_time}")

In [None]:
#%%time
# Aggregate the measure columns overall and by each grouping column
def report_aggregations(
    input_data: DataFrame,
    group_columns: List[str],
    measure_columns: List[str]
) -> DataFrame:
    """
    Build the summary report: one overall row plus one row per value of each grouping column.

    For every measure column the report holds three aggregates:
      * the sum, under the measure's own name;
      * the count, under the name with its first character replaced by "P";
      * the mean, under the name with its first character replaced by "O".
    The OAMH01, OAMH02 and OAMH03 columns, when present, are multiplied by 10000.

    Each grouping column is aggregated on its own (no cross-combinations). In
    every row the grouping columns that were not grouped on are null. Rows are
    ordered by the grouping columns.

    The aggregations are built sequentially. groupBy/agg only define lazy
    DataFrames, so the thread pool used previously ran no Spark work in
    parallel and only made the union order depend on thread completion order.

    Args:
        input_data: The Spark DataFrame to aggregate; it is marked for caching.
        group_columns: Columns to group by, one separate aggregation per column.
        measure_columns: Columns to compute the sum, count and mean of.

    Returns:
        The combined, sorted report, marked for persistence (MEMORY_AND_DISK).

    Raises:
        RuntimeError: If none of the aggregations could be built.
    """
    # Cache the input so the separate aggregations do not recompute it
    output_data = input_data.cache()

    # Create aggregation expressions
    sum_exprs = [spark_sum(mcol).alias(mcol) for mcol in measure_columns]
    count_exprs = [count(mcol).alias(f"P{mcol[1:]}") for mcol in measure_columns]
    mean_exprs = [mean(mcol).alias(f"O{mcol[1:]}") for mcol in measure_columns]
    all_exprs = sum_exprs + count_exprs + mean_exprs

    result_dfs = []

    # Overall summary: every grouping column is null.
    # A failing aggregation is reported and skipped (best effort, as before).
    try:
        global_df = output_data.agg(*all_exprs)
        for col_name in group_columns:
            global_df = global_df.withColumn(col_name, lit(None))
        result_dfs.append(global_df)
    except Exception as e:
        print(f"Global aggregation failed: {e}")

    # One aggregation per grouping column; the other grouping columns are null.
    for col_name in group_columns:
        try:
            col_df = output_data.groupBy(col_name).agg(*all_exprs)
            for other_col in group_columns:
                if other_col != col_name:
                    col_df = col_df.withColumn(other_col, lit(None))
            result_dfs.append(col_df)
        except Exception as e:
            print(f"Aggregation for {col_name} failed: {e}")

    # reduce() without an initial value raises an opaque TypeError on an empty
    # list; report the real problem instead.
    if not result_dfs:
        raise RuntimeError(
            "No aggregation could be built; see the failure messages printed above"
        )

    # Union by column name so differing column order between pieces does not matter
    final_result = reduce(
        lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True),
        result_dfs
    )

    # Scale the listed mean columns by 10000
    for col_name in ["OAMH01", "OAMH02", "OAMH03"]:
        if col_name in final_result.columns:
            final_result = final_result.withColumn(col_name, col(col_name) * lit(10000))

    # Sort once at the end, by the grouping columns that exist in the result.
    # orderBy() rejects an empty column list, so skip sorting in that case.
    existing_sort_columns = [c for c in group_columns if c in final_result.columns]
    if existing_sort_columns:
        sorted_result = final_result.orderBy(*existing_sort_columns)
    else:
        sorted_result = final_result

    # Mark the final result for persistence
    sorted_result = sorted_result.persist(StorageLevel.MEMORY_AND_DISK)

    return sorted_result

# Report configuration: the measures to summarise and the grouping keys
# (key_columns is PAYCAT, YEAR, HOSPST, POVCAT, RACECAT).
measure_columns = ["TAMH01", "TAMH02", "TAMH03"]
group_columns = key_columns

# Build the aggregated report from the indicator data
result = report_aggregations(
    input_data=output_data,
    group_columns=group_columns,
    measure_columns=measure_columns,
)

# Show the result
#result.show()

In [None]:
# Collapse the report to one partition and write it as CSV with a header row,
# replacing any existing output at output_path.
report_writer = result.coalesce(1).write.mode("overwrite")
report_writer.csv(output_path, header=True)

In [None]:
#%%time
# # Replace these with your actual column names and values
# #custom_stratum = "your_custom_stratum_column"  # Replace with actual column name
# group_columns = ["PAYCAT", "YEAR", "HOSPST", "POVCAT", "RACECAT"]
# measure_columns = ["TAMH01", "TAMH02", "TAMH03"]

# # Create a list to store all the aggregation results
# result_dfs = []

# # Create aggregation expressions for sum, count, and mean
# sum_exprs = [spark_sum(mcol).alias(mcol) for mcol in measure_columns]
# count_exprs = [count(mcol).alias(f"P{mcol[1:]}") for mcol in measure_columns]
# mean_exprs = [mean(mcol).alias(f"O{mcol[1:]}") for mcol in measure_columns]

# # Combine all expressions
# all_exprs = sum_exprs + count_exprs + mean_exprs

# # First do the global summary (ways 0)
# global_summary = output_data.agg(*all_exprs)

# result_dfs.append(global_summary)

# # Then do individual column groupings (ways 1)
# for group_column in group_columns:
#     col_summary = output_data.groupBy(group_column).agg(*all_exprs)
#     result_dfs.append(col_summary)

# # Union all the results
# final_result = result_dfs[0]
# for i in range(1, len(result_dfs)):
#     final_result = final_result.unionByName(result_dfs[i], allowMissingColumns=True)

# # Sort the result by all the grouping columns
# sorted_result = final_result.orderBy(
#      "PAYCAT", "YEAR", "HOSPST", "POVCAT", "RACECAT"
# )

# # for measure_col in measure_columns:
# #     sorted_result = sorted_result.filter(sorted_result[measure_col].isNotNull())
# #     # For empty strings
# #     sorted_result = sorted_result.filter(sorted_result[measure_col] != "")

# # sorted_result.show()

# sorted_result = sorted_result.withColumn("OAMH01", col("OAMH01") * lit(10000))
# sorted_result = sorted_result.withColumn("OAMH02", col("OAMH02") * lit(10000))
# sorted_result = sorted_result.withColumn("OAMH03", col("OAMH03") * lit(10000))

# #print(sorted_result)
# sorted_result.show()
# Save the sorted result
#sorted_result.coalesce(1).write.mode("overwrite").csv(f"/Users/mshaque/Workarea/Projects/qi-pyspark-poc/DATA/MHI-report-{data_size}.csv", header=True)

In [None]:
# Stop the clock; end_time is reused below to compute the total written to the timing file.
end_time=time.time()
# Total wall-clock seconds between start_time and end_time.
print(f"Total Execution Time: {end_time - start_time}")

#sas_result_file=f"MHAO_v2024_21_{data_size}.txt"
#/Users/mshaque/Workarea/Projects/qi-pyspark-poc/DATA
# sas_outcome = spark.read.option("header", True).csv(f"{sas_result_file}")#spark.read.option("header", True).csv(f"{data_path}/{sas_result_file}")

# # Remove extra spaces from all string columns
# for col_name in sas_outcome.columns:
#     sas_outcome = sas_outcome.withColumn(col_name, trim(col(col_name)))  # Remove leading/trailing spaces


# column_mapping = {
#     "Race": "RACECAT",
#     "Poverty": "POVCAT",
#     "State": "HOSPST",
#     "Year":"YEAR",
#     "Payer":"PAYCAT",
#     "TAMH01":"TAMH01",
#     "TAMH02":"TAMH02",
#     "TAMH03":"TAMH03",
#     "PAMH01":"PAMH01",
#     "PAMH02":"PAMH02",
#     "PAMH03":"PAMH03",
#     "OAMH01":"OAMH01",
#     "OAMH02":"OAMH02",
#     "OAMH03":"OAMH03"
# }

# # Rename columns in the text DataFrame using the dictionary
# for old_col, new_col in column_mapping.items():
#     sas_outcome = sas_outcome.withColumnRenamed(old_col, new_col)
 
# # columns to compare
# non_key_columns = [col for col in result.columns if col not in key_columns]

# null_replacements = ["."]  # Define text values to treat as null

# for col_name in key_columns:
#     sas_outcome = sas_outcome.withColumn(col_name, when(col(col_name) == ".", "").otherwise(col(col_name)))
#     result = result.withColumn(col_name, when(col(col_name).isNull() , "").otherwise(col(col_name)))

# # result.show()
# # sas_outcome.show()

# # Perform inner join on key columns
# combined_outcome = result.alias("result").join(sas_outcome.alias("ref"), key_columns, "inner")

# #combined_outcome.show()

# # Compare non-key columns
# for col_name in non_key_columns:
#     combined_outcome = combined_outcome.withColumn(
#         f"{col_name}_match",
#         when(round(col(f"result.{col_name}"),5) == round(col(f"ref.{col_name}"),5), lit(1)).otherwise(lit(0))
#     )

# # combined_outcome.show()
# # combined_outcome.write.mode("overwrite").csv(f"/Users/mshaque/Workarea/Projects/qi-pyspark-poc/DATA/Comparison-{data_size}.csv", header=True)

# # Calculate accuracy by #rows
# total_records = combined_outcome.count()
# matching_records = combined_outcome.filter(expr(" and ".join([f"{col}_match = 1" for col in non_key_columns]))).count()
# accuracy = (matching_records / total_records) * 100 if total_records > 0 else 0


# print(f"Accuracy: {accuracy:.2f}% ({matching_records}/{total_records})")

# # Calculate Accuracy by data points
# total_data_points = combined_outcome.count() * len(non_key_columns)
# matching_data_points = combined_outcome.select([spark_sum(col(f"{col_header}_match")) for col_header in non_key_columns]).collect()[0]
# matching_data_points = sum(matching_data_points)

# accuracy = (matching_data_points / total_data_points) * 100 if total_data_points > 0 else 0
# print(f"Accuracy by Data Points: {accuracy:.2f}% ({matching_data_points}/{total_data_points})")


In [None]:
# Stop the Spark session; the report has already been written by this point.
spark.stop()

In [None]:
# Total run time in seconds, measured between start_time and end_time.
total_time = end_time - start_time
print(execution_time_output_path)
# Write the total in milliseconds. The context manager closes the file even
# if the write raises, which the previous open()/close() pair did not.
with open(execution_time_output_path, "w") as fexecution:
    fexecution.write(str(total_time * 1000))