In [None]:
# Import required libraries
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


In [None]:

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the Spark session.
# The master URL is set exactly once via .master(): a later
# .config("spark.master", ...) on the same builder overwrites it, which would
# pin Spark to a single worker thread and defeat spark.default.parallelism.
# getOrCreate() returns an already-running session if one exists (e.g. when this
# cell is re-run), in which case JVM-level settings such as spark.driver.memory
# are not re-applied.
logger.info("Starting Spark session")
spark = (
    SparkSession.builder
    .appName("Python Spark Home Property")
    .master("local[*]")  # use all local cores
    .config("spark.driver.memory", "20g")
    # NOTE(review): in local mode the executor runs inside the driver JVM, so this is likely ignored.
    .config("spark.executor.memory", "2g")
    # NOTE(review): memoryOverhead is documented for cluster mode; likely no effect locally.
    .config("spark.driver.memoryOverhead", "4g")
    .config("spark.sql.debug.maxToStringFields", "1000")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.default.parallelism", "16")
    .getOrCreate()
)


In [None]:

# Spark's own log verbosity; INFO is verbose — raise to "WARN" to quiet notebook output.
spark.sparkContext.setLogLevel("INFO")

# Define the file paths. Both can be overridden through environment variables so the
# notebook runs on other machines; the defaults reproduce the original local setup.
home_property_path = os.environ.get("HOME_PROPERTY_PATH", "C:/Users/armen/Desktop/HomeProperty")
base_filename = os.environ.get("HOME_PROPERTY_FILE", "Property_202503_test.csv")


In [None]:

# Read the property CSV (first row is the header; column types are inferred,
# which costs an extra pass over the file).
home_property_file = f"{home_property_path}/{base_filename}"
logger.info(f"Reading input CSV file: {home_property_file}")
df_home_property = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(home_property_file)
)
# Log the actual file name rather than a hard-coded one so the message stays
# correct when base_filename changes.
logger.info(f"Read {base_filename} successfully")


In [None]:

# Give the indicator column a space-free name and keep only rows that actually
# carry an indicator code.
# NOTE: withColumnRenamed is a no-op when "PROPERTY INDICATOR CODE" is absent; the
# null check below then fails unless a PROPERTY_INDICATOR_CODE column already exists.
cleaned_df = (
    df_home_property
    .withColumnRenamed("PROPERTY INDICATOR CODE", "PROPERTY_INDICATOR_CODE")
    .where(F.col("PROPERTY_INDICATOR_CODE").isNotNull())
)


In [None]:

# Read the Data Dictionary CSV (same reader options as the property file).
data_dictionary_file = f"{home_property_path}/Property_DataDictionary.csv"
logger.info(f"Reading data dictionary CSV: {data_dictionary_file}")
df_data_dictionary = spark.read.option("header", True).option("inferSchema", True).csv(data_dictionary_file)

# Join each property row to its dictionary entry. Columns are referenced through
# their parent DataFrame so the condition stays unambiguous even if both files
# happen to share a column name.
# NOTE(review): this is an inner join (Spark's default), so property rows whose code
# has no dictionary entry are dropped, and duplicate "Property Indicator" rows in the
# dictionary would duplicate property rows — confirm that is intended.
joined_df = cleaned_df.join(
    df_data_dictionary,
    df_data_dictionary["Property Indicator"] == cleaned_df["PROPERTY_INDICATOR_CODE"],
)


In [None]:

# Get the distinct, non-blank indicator codes listed in the data dictionary.
# With inferSchema the codes may be read as numbers, so blankness is tested on
# str(code) — calling .strip() on the raw value would raise AttributeError for
# numeric codes. The collected values keep their inferred type.
logger.info("Processing each PROPERTY_INDICATOR_CODE value")

distinct_values = [
    code
    for (code,) in df_data_dictionary.select("Property Indicator").distinct().collect()
    if code is not None and str(code).strip() != ""
]


In [None]:

logger.info(f"Distinct PROPERTY_INDICATOR_CODE values: {distinct_values}")

# Write one CSV output directory per indicator code (overwriting previous runs).
# joined_df is reused once per code; persisting it avoids re-scanning both CSV
# files and recomputing the join on every iteration.
# NOTE(review): codes are interpolated into the path verbatim — assumes they
# contain no path separators.
failed_codes = []
joined_df.persist()
try:
    for property_indicator_code in distinct_values:
        output_path = f"{home_property_path}/property_indicator_codes/property_indicator_code_{property_indicator_code}"
        try:
            logger.info(f"Writing data for PROPERTY_INDICATOR_CODE = {property_indicator_code} to: {output_path}")

            filtered_df = joined_df.filter(F.col("PROPERTY_INDICATOR_CODE") == property_indicator_code)
            filtered_df.write.mode("overwrite").option("header", "true").csv(output_path)

            logger.info(f"Finished writing data for PROPERTY_INDICATOR_CODE = {property_indicator_code}")
        except Exception as e:
            # Best effort: keep going with the remaining codes, but log the full
            # traceback and remember the failure so it is reported below.
            logger.exception(f"Error processing {property_indicator_code}: {e}")
            failed_codes.append(property_indicator_code)
finally:
    # Release the cached join even if the loop is interrupted.
    joined_df.unpersist()

if failed_codes:
    logger.warning(
        f"Failed to write {len(failed_codes)} of {len(distinct_values)} "
        f"PROPERTY_INDICATOR_CODE values: {failed_codes}"
    )
logger.info("Finished processing all PROPERTY_INDICATOR_CODE values.")


In [None]:

# Shut down the Spark session created at the top of the notebook; run this
# cell last, since every Spark call after it will fail.
logger.info("Stopping Spark session")
spark.stop()