# Importing Necessary Libraries

In [1]:
!pip install statsmodels dash dash_bootstrap_components pyspark





In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import plotly.express as px
from statsmodels.tsa.arima.model import ARIMA
import matplotlib.pyplot as plt
import warnings
from pyspark.sql import functions as F
from pyspark.sql.types import *
import dash_bootstrap_components as dbc
# Suppress specific warnings
warnings.filterwarnings("ignore", category=UserWarning)

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pandas as pd

# Importing data

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when
from pyspark.sql.types import StringType, IntegerType, DoubleType
import pandas as pd

# Create a Spark session
spark = SparkSession.builder \
    .appName("VA Dataset Preparation") \
    .getOrCreate()

# File paths
file_paths = {
    "house_index_data": "Dataset-VA\18100205.csv",
    "study_permit_by_country": "Dataset-VA\EN_ODP-TR-Study-IS_CITZ_sign_date(TR - SP CITZ).csv",
    "temp_worker_by_province": "EN_ODP-TR-Work-IMP_PT_NOC4.csv",
    "cpi_data" : "CPI_MONTHLY_BankOC.csv"
}

# Function to preprocess CPI data, including filtering for 2015 onwards and calculating yearly averages with 2 decimal places
def preprocess_cpi_data(file_path):
    # Load the CPI data
    cpi_df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Convert 'date' column to DateType and filter for years 2015 onwards
    cpi_df = cpi_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    cpi_df = cpi_df.withColumn("Year", year(col("date"))).filter(col("Year") >= 2015)

    # Calculate yearly averages for each CPI-related column, rounding to 2 decimal places
    cpi_avg_df = cpi_df.groupBy("Year").agg(
        round(avg("Total CPI"), 2).alias("Avg_Total_CPI"),
        round(avg("Total_CPI_Seasonally_Adjusted)"), 2).alias("Avg_Total_CPI_Seasonally_Adjusted"),
        round(avg("STATIC_TOTALCPICHANGE"), 2).alias("Avg_STATIC_TOTALCPICHANGE"),
        round(avg("CPI_TRIM"), 2).alias("Avg_CPI_TRIM"),
        round(avg("CPI_MEDIAN"), 2).alias("Avg_CPI_MEDIAN")
    )

    return cpi_avg_df

# Function to read and preprocess CSV files
def read_and_preprocess_csv(file_path):
    df = spark.read.csv(file_path, header=True, inferSchema=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)

    # Replace '--' with 0 in all columns
    for column in df.columns:
        column_type = df.schema[column].dataType

        # Replace '--' with 0 for string or numeric columns
        if isinstance(column_type, (StringType, IntegerType, DoubleType)):
            df = df.withColumn(column, when(col(column) == '--', 0).otherwise(col(column)))

    df = df.na.fill(0)  # Replace any remaining nulls with 0
    return df

# Reusable function to perform common transformations
def transform_dataframe(df, rename_columns=None, skip_columns=0, convert_to_int=True):
    # Rename columns if necessary
    if rename_columns:
        for old_name, new_name in rename_columns.items():
            df = df.withColumnRenamed(old_name, new_name)

    # Convert columns to IntegerType (from `skip_columns` onward)
    for column in df.columns[skip_columns:]:
        column_type = df.schema[column].dataType

        # Only convert columns to IntegerType if they are numeric
        if isinstance(column_type, (IntegerType, DoubleType)):
            df = df.withColumn(column, col(column).cast("int"))

    return df

# Function to preprocess and group study permit data by Year and Country
def preprocess_study_permit_data(df):
    # Rename first column to 'Country'
    rename_columns = {df.columns[0]: "Country"}
    df = transform_dataframe(df, rename_columns=rename_columns)

    # Convert df2 to Pandas DataFrame for reshaping and further processing
    df_pandas = df.toPandas()

    # Reshape the dataset from wide to long format
    df_reshaped = df_pandas.melt(id_vars=["Country"], var_name="Date", value_name="Value")

    # Convert 'Value' column to numeric, errors='coerce' will convert non-numeric values to NaN
    df_reshaped['Value'] = pd.to_numeric(df_reshaped['Value'], errors='coerce')

    # Extract Year and Month from the Date column and reformat it to YYYY-MM
    df_reshaped['Year'] = df_reshaped['Date'].str[:2].astype(int) + 2000
    df_reshaped['Month'] = df_reshaped['Date'].str[3:6]
    df_reshaped['Date'] = pd.to_datetime(df_reshaped['Month'] + '-' + df_reshaped['Year'].astype(str), format='%b-%Y')

    # Drop unnecessary columns
    df_reshaped.drop(columns=["Month", "Year"], inplace=True)

    # Group by Country and Year, summing up the values
    df_grouped = df_reshaped.groupby([df_reshaped['Country'], df_reshaped['Date'].dt.year])['Value'].sum().reset_index()

    df_grouped.rename(columns={'Date': 'Year'}, inplace=True)

    # Convert the resulting DataFrame back to Spark DataFrame
    df_grouped_spark = spark.createDataFrame(df_grouped)

    return df_grouped_spark

# Function to preprocess and group temp worker data by Province, Class Title, and Year
def preprocess_temp_worker_data(df):
    # Rename first two columns to 'Province' and 'Class Title'
    rename_columns = {df.columns[0]: "Province", df.columns[1]: "Class Title"}
    df = transform_dataframe(df, rename_columns=rename_columns, skip_columns=2)

    # Convert df3 to Pandas DataFrame for reshaping and further processing
    df_pandas = df.toPandas()

    # Melt the DataFrame to long format
    df_long_melted = pd.melt(df_pandas, id_vars=['Province', 'Class Title'],
                             var_name='Date', value_name='Count')

    # Extract Year and Month from the 'Date' column (assuming it's in the format like '15-Jan')
    df_long_melted['Year'] = df_long_melted['Date'].str[:2].astype(int) + 2000  # Extract year (e.g., '15' -> '2015')
    df_long_melted['Month'] = df_long_melted['Date'].str[3:]  # Extract month (e.g., 'Jan')

    # Convert 'Count' to numeric, forcing any errors to NaN and then filling NaN with 0
    df_long_melted['Count'] = pd.to_numeric(df_long_melted['Count'], errors='coerce').fillna(0).astype(int)

    # Group by Province, Class Title, and Year, summing up the counts
    df_grouped = df_long_melted.groupby(['Province', 'Class Title', 'Year'], as_index=False)['Count'].sum()

    # Convert the resulting DataFrame back to Spark DataFrame
    df_grouped_spark = spark.createDataFrame(df_grouped)

    # Create a separate DataFrame to count occurrences of each "Class Title" for each year
    class_title_count_df = df.groupBy("Class Title").agg(
        *[F.sum(F.col(year)).alias(year) for year in df.columns[2:]]
    )

    return df_grouped_spark, class_title_count_df


def prepare_datasets(file_paths):
    processed_dfs = {}

    for key, file_path in file_paths.items():
        # Step 1: Read and preprocess CSV file
        df = read_and_preprocess_csv(file_path)

        # Load and preprocess each dataset based on its specific processing needs
        if key == "cpi_data":
            cpi_data = preprocess_cpi_data(file_path)
            processed_dfs["cpi_data_filtered"] = cpi_data

        elif key == "house_index_data":
            # House index data specific transformations
            df = transform_dataframe(df)  # Apply common transformations

            # Extract only the year part of REF_DATE
            if "REF_DATE" in df.columns:
                df = df.withColumn("REF_DATE", year(to_date(col("REF_DATE"), "yyyy-MM-dd")))

            # Filter by the year range (2015 to 2023)
            start_year = 2015
            end_year = 2023
            df = df.filter((col("REF_DATE") >= start_year) & (col("REF_DATE") <= end_year))

            # Drop unnecessary columns
            columns_to_drop = ['DGUID', 'STATUS', 'SYMBOL', 'TERMINATED', 'UOM', 'UOM_ID', 'SCALAR_ID', 'VECTOR', 'DECIMALS', 'SCALAR_FACTOR', 'COORDINATE']
            df = df.drop(*columns_to_drop)

            # Group by GEO, REF_DATE, and keep other columns while calculating average of 'Value'
            df = df.groupBy("REF_DATE", "GEO", "New housing price indexes").agg(
                F.avg("Value").alias("Avg_Value"),
                *[F.first(c).alias(c) for c in df.columns if c not in ["REF_DATE", "GEO", "VALUE", "New housing price indexes"]]  # Retain all other columns
            )

            # Round the avg_house_index to 2 decimal places
            df = df.withColumn("Avg_Value", F.round(col("Avg_Value"), 2))

            # Rename columns for readability
            df = df.withColumnRenamed("GEO", "Location") \
            .withColumnRenamed("REF_DATE", "Year") \
            .withColumnRenamed("New housing price indexes", "New Housing Price Index")

            # Handle cases where Location has no comma or extra spaces
            df = df.withColumn(
                "Province",
                F.when(
                    F.col("Location").contains(","),  # Check if Location contains a comma
                   F.when(F.col("Location").contains(","), F.split(F.trim(df["Location"]), ",")[1])  # Extract the part after the comma
                ).otherwise(F.col("Location"))  # If no comma, keep the Location value itself
            )

            # Optionally clean up by trimming spaces in Province
            df = df.withColumn("Province", F.trim(df["Province"]))

        elif key == "study_permit_by_country":
            # Process study permit data using the preprocessing function
            study_grouped = preprocess_study_permit_data(df)
            processed_dfs["study_permit_grouped_by_year"] = study_grouped

        elif key == "temp_worker_by_province":
            # Process temp worker data and obtain both grouped and class title count DataFrames
            temp_worker_grouped, class_title_count_df = preprocess_temp_worker_data(df)
            processed_dfs["temp_worker_grouped_by_year"] = temp_worker_grouped
            processed_dfs["class_title_count_per_year"] = class_title_count_df

        # Store processed dataframe in dictionary
        processed_dfs[key] = df

    return processed_dfs

# Run the function to prepare the datasets
processed_dfs = prepare_datasets(file_paths)

# Example to access processed dataframes:
cpi_data_filtered = processed_dfs["cpi_data_filtered"]
house_index_data = processed_dfs["house_index_data"]
study_permit_data = processed_dfs["study_permit_by_country"]
temp_worker_data = processed_dfs["temp_worker_by_province"]
study_permit_grouped_by_year = processed_dfs["study_permit_grouped_by_year"]
temp_worker_grouped_by_year = processed_dfs["temp_worker_grouped_by_year"]
class_title_count_per_year = processed_dfs["class_title_count_per_year"]

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/C:/Users/ovaiz/Desktop/MACS/Semester 3/CSCI6612 - Visual Analytics/Project/Dataset-VA8100205.csv.

In [None]:
house_index_data.show(5)

In [5]:
study_permit_data.show(5)

+-----------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|        _c0|15-Jan|15-Feb|15-Mar|15-Apr|15-May|15-Jun|15-Jul|15-Aug|15-Sep|15-Oct|15-Nov|15-Dec|16-Jan|16-Feb|16-Mar|16-Apr|16-May|16-Jun|16-Jul|16-Aug|16-Sep|16-Oct|16-Nov|16-Dec|

In [7]:
study_permit_grouped_by_year.show(5)

Py4JJavaError: An error occurred while calling o1060.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more


In [None]:
temp_worker_data.show(5)

In [None]:
temp_worker_grouped_by_year.show(5)

In [None]:
cpi_data_filtered.show(5)

In [None]:
# Aggregate by Country and Date (Year), summing the 'Value' column for study permits
df_aggregated = study_permit_grouped_by_year.groupBy("Year").agg(
    F.sum("Value").alias("Number_of_Study_Permits_Issued")
)

# Join the aggregated study permit data with house_index_data on the 'Year' column
df_combined = house_index_data.join(df_aggregated, on="Year", how="left")

# Calculate the Impact Metric: Ratio of Study Permits Issued to Housing Price Index
df_combined = df_combined.withColumn(
    "Study_Permit_To_Value_Impact_Metric",
    F.col("Number_of_Study_Permits_Issued") / F.col("Avg_Value")
)

# Round the Study_Permit_To_Value_Impact_Metric to 2 decimal places
df_combined = df_combined.withColumn("Study_Permit_To_Value_Impact_Metric", F.round(F.col("Study_Permit_To_Value_Impact_Metric"), 2))

# Define the mapping with individual `when` clauses
df_combined = df_combined.withColumn(
    "Province",
    when(F.col("Province") == "Quebec part", "Quebec")
    .when(F.col("Province") == "Ontario part", "Ontario")
    .when(F.col("Province") == "Fredericton", "New Brunswick")
    .when(F.col("Province") == "Prairie Region", "Alberta")
    .when(F.col("Province") == "Atlantic Region", "Nova Scotia")  # Adjust if necessary
    .when(F.col("Province") == "Canada", None)  # Exclude "Canada" from provinces
    .otherwise(F.col("Province"))
)

# Aggregate total count of temporary workers by Year and Province
workers_aggregated = temp_worker_grouped_by_year.groupBy("Year", "Province").agg(
    F.sum("Count").alias("Total_Workers_Count")
)

# Count the number of unique Class Titles by Year and Province
class_titles_count = temp_worker_grouped_by_year.groupBy("Year", "Province").agg(
    F.countDistinct("Class Title").alias("Number_of_Class_Titles")
)

# Find the most common Class Title per Year and Province
most_common_class = temp_worker_grouped_by_year.groupBy("Year", "Province").agg(
    F.first("Class Title").alias("Most_Common_Class_Title")
)

# Join the workers, class titles count, and common class data with the existing DataFrame
df_combined = df_combined \
    .join(workers_aggregated, on=["Year", "Province"], how="left") \
    .join(class_titles_count, on=["Year", "Province"], how="left") \
    .join(most_common_class, on=["Year", "Province"], how="left")

# Calculate the Impact Metric: Ratio of Workers Count to Housing Price Index
df_combined = df_combined.withColumn(
    "Workers_To_Housing_Impact_Ratio",
    F.col("Total_Workers_Count") / F.col("Avg_Value")
)

# Round the Workers_To_Housing_Impact_Ratio to 2 decimal places
df_combined = df_combined.withColumn("Workers_To_Housing_Impact_Ratio", F.round(F.col("Workers_To_Housing_Impact_Ratio"), 2))

# Add combined impact ratio (students and workers)
df_combined = df_combined.withColumn(
    "Combined_Impact_Ratio",
    (F.col("Study_Permit_To_Value_Impact_Metric") + F.col("Workers_To_Housing_Impact_Ratio")) / 2
)

# Round the Combined_Impact_Ratio to 2 decimal places
df_combined = df_combined.withColumn("Combined_Impact_Ratio", F.round(F.col("Combined_Impact_Ratio"), 2))

# Join with CPI data
df_combined = df_combined.join(cpi_data_filtered, on="Year", how="left")

# Display the combined DataFrame with all the added columns
df_combined.show(truncate=False)

In [None]:
# import dash
# from dash import dcc, html, Input, Output
# import pandas as pd
# import plotly.express as px
# import numpy as np

# # Assuming df_combined is a DataFrame, converting it to pandas DataFrame
# data = df_combined.toPandas()

# # Remove rows where 'Province' is null or NaN
# data = data.dropna(subset=["Province"])

# # Filter out only the numeric columns for correlation calculation
# numeric_data = data.select_dtypes(include=[np.number])
# correlation_matrix = numeric_data.corr()

# # Initialize the Dash app
# app = dash.Dash(__name__)

# app.layout = html.Div([
#     html.H1("Correlation Analysis for Housing Prices and Temporary Residents"),

#     # Dropdown to select province
#     html.Div([
#         html.Label("Select Province:"),
#         dcc.Dropdown(
#             id="province-dropdown",
#             options=[{"label": province, "value": province} for province in data["Province"].unique()],
#             value="British Columbia",
#             multi=False,
#             clearable=False,
#             style={'width': '50%'}
#         ),
#     ], style={'padding': '10px'}),

#     # Correlation heatmap
#     html.Div([
#         dcc.Graph(id="correlation-heatmap")
#     ], style={'padding': '20px', 'width': '90%', 'margin': 'auto'}),
# ])

# # Callback for the correlation heatmap
# @app.callback(
#     Output("correlation-heatmap", "figure"),
#     Input("province-dropdown", "value")
# )
# def update_correlation_heatmap(selected_province):
#     # Filter the data based on the selected province
#     province_data = data[data["Province"] == selected_province]
#     # Only numeric columns for correlation calculation
#     numeric_province_data = province_data.select_dtypes(include=[np.number])
#     corr_matrix = numeric_province_data.corr()

#     # Create the heatmap using Plotly
#     fig = px.imshow(
#         corr_matrix,
#         text_auto=True,
#         color_continuous_scale="Viridis",
#         labels={"color": "Correlation"},
#         title=f"Correlation Heatmap for {selected_province}",
#         width=800,  # Width of the heatmap in pixels
#         height=600  # Height of the heatmap in pixels
#     )
#     return fig

# if __name__ == "__main__":
#     app.run_server(debug=True)

In [None]:
df_combined.head(5)

In [None]:
import pandas as pd
import dash
import plotly.express as px
import plotly.graph_objects as go
from dash import dcc, html
from dash.dependencies import Input, Output
from statsmodels.tsa.arima.model import ARIMA
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest
import numpy as np

# Load your dataframe (df_combined)
# df_combined = df_combined.toPandas()

# Convert 'Year' to datetime
df_combined['Year'] = pd.to_datetime(df_combined['Year'], format='%Y')

# Initialize the Dash app
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Define the layout of the dashboard
app.layout = dbc.Container(
    [
        html.H1("Housing Price Analysis Dashboard", style={'text-align': 'center'}),
        html.Hr(),

        # 3x2 Grid Layout
        dbc.Row(
            [
                dbc.Col([  # Factors Affecting Housing Prices
                    html.H4("Factors Affecting Housing Prices"),
                    dcc.Dropdown(
                        id='province-dropdown',
                        options=[{'label': i, 'value': i} for i in df_combined['Province'].unique()],
                        multi=True,
                        value=['Ontario']
                    ),
                    dcc.Graph(id='scatter-factors')
                ], md=6),

                dbc.Col([  # Forecasting Housing Prices (ARIMA)
                    html.H4("Forecasting Housing Prices"),
                    dcc.Graph(id='forecast-chart'),
                    dcc.Slider(
                        id='forecast-slider',
                        min=1,
                        max=5,
                        step=1,
                        marks={i: str(i) for i in range(1, 6)},
                        value=1
                    )
                ], md=6),
            ],
            className="mb-4"
        ),

        # 2x2 Grid Layout for clustering and anomaly detection
        dbc.Row(
            [
                dbc.Col([  # Regional Clustering
                    html.H4("Regional Clustering"),
                    dcc.Graph(id='cluster-map'),
                ], md=6),

                dbc.Col([  # Anomaly Detection
                    html.H4("Anomaly Detection"),
                    dcc.Graph(id='anomaly-chart'),
                ], md=6),
            ],
            className="mb-4"
        ),
    ]
)

# Callback for visualizing factors affecting housing prices
@app.callback(
    Output('scatter-factors', 'figure'),
    [Input('province-dropdown', 'value')]
)
def update_scatter(selected_provinces):
    df_filtered = df_combined[df_combined['Province'].isin(selected_provinces)]
    fig = px.scatter(df_filtered, x='Study_Permit_To_Value_Impact_Metric',
                     y='New Housing Price Index', color='Province',
                     title="Study Permits vs Housing Price Index")
    return fig

# Callback for forecasting housing prices (ARIMA)
@app.callback(
    Output('forecast-chart', 'figure'),
    [Input('forecast-slider', 'value')]
)
def update_forecast(years_to_forecast):
    df_sorted = df_combined.sort_values('Year')
    model = ARIMA(df_sorted['New Housing Price Index'], order=(1, 1, 1))
    model_fit = model.fit()
    forecast = model_fit.forecast(steps=years_to_forecast)
    forecast_years = pd.date_range(start=df_sorted['Year'].max(), periods=years_to_forecast+1, freq='Y')[1:]

    fig = px.line(x=forecast_years, y=forecast, title="Forecasted Housing Prices")
    return fig

# Callback for clustering regions
@app.callback(
    Output('cluster-map', 'figure'),
    [Input('province-dropdown', 'value')]
)
def update_clustering(selected_provinces):
    df_filtered = df_combined[df_combined['Province'].isin(selected_provinces)]
    kmeans = KMeans(n_clusters=3)
    clusters = kmeans.fit_predict(df_filtered[['New Housing Price Index', 'Avg_Value', 'Total_Workers_Count']])

    df_filtered['Cluster'] = clusters
    fig = px.scatter(df_filtered, x='New Housing Price Index',
                     y='Avg_Value', color='Cluster',
                     title="Housing Price Clusters")
    return fig

# Callback for anomaly detection
@app.callback(
    Output('anomaly-chart', 'figure'),
    [Input('province-dropdown', 'value')]
)
def update_anomalies(selected_provinces):
    df_filtered = df_combined[df_combined['Province'].isin(selected_provinces)]
    model = IsolationForest(contamination=0.1)
    df_filtered['Anomaly'] = model.fit_predict(df_filtered[['New Housing Price Index']])

    fig = px.scatter(df_filtered, x='Year',
                     y='New Housing Price Index', color='Anomaly',
                     title="Anomaly Detection in Housing Prices")
    return fig

# Run the app
if __name__ == '__main__':
    app.run_server(debug=True)

In [None]:
# import dash
# from dash import dcc, html, Input, Output
# import pandas as pd
# import plotly.express as px
# from sklearn.cluster import KMeans
# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA

# # Sample DataFrame - Replace this with your actual combined DataFrame
# # Assuming df_combined is your combined data with numeric columns for clustering
# df_combined = pd.DataFrame({
#     'Feature1': [1, 2, 3, 4, 5, 6],
#     'Feature2': [2, 3, 4, 5, 6, 7],
#     'Feature3': [5, 4, 3, 2, 1, 0],
# })

# # Initialize the Dash app
# app = dash.Dash(__name__)

# app.layout = html.Div([
#     html.H1("Clustering Analysis Dashboard"),

#     # Dropdown to select number of clusters
#     html.Div([
#         html.Label("Select Number of Clusters:"),
#         dcc.Slider(
#             id="num-clusters",
#             min=2,
#             max=10,
#             step=1,
#             value=3,
#             marks={i: str(i) for i in range(2, 11)},
#         ),
#     ], style={'padding': '10px'}),

#     # Dropdowns to select features for clustering
#     html.Div([
#         html.Label("Select Feature for X-axis:"),
#         dcc.Dropdown(
#             id="x-axis-feature",
#             options=[{"label": col, "value": col} for col in df_combined.columns],
#             value="Feature1",  # Default to the first feature
#             multi=False,
#             clearable=False,
#             style={'width': '45%', 'display': 'inline-block'}
#         ),
#         html.Label("Select Feature for Y-axis:"),
#         dcc.Dropdown(
#             id="y-axis-feature",
#             options=[{"label": col, "value": col} for col in df_combined.columns],
#             value="Feature2",  # Default to the second feature
#             multi=False,
#             clearable=False,
#             style={'width': '45%', 'display': 'inline-block', 'marginLeft': '10px'}
#         ),
#     ], style={'padding': '10px'}),

#     # Scatter plot for clustering visualization
#     html.Div([
#         dcc.Graph(id="cluster-plot")
#     ], style={'padding': '20px'}),
# ])

# # Callback to update the clustering plot
# @app.callback(
#     Output("cluster-plot", "figure"),
#     [Input("num-clusters", "value"),
#      Input("x-axis-feature", "value"),
#      Input("y-axis-feature", "value")]
# )
# def update_cluster_plot(num_clusters, x_feature, y_feature):
#     # Select the features for clustering
#     X = df_combined[[x_feature, y_feature]]

#     # Ensure the features are numeric and scale them
#     scaler = StandardScaler()
#     X_scaled = scaler.fit_transform(X)

#     # Perform clustering
#     kmeans = KMeans(n_clusters=num_clusters, random_state=0)
#     clusters = kmeans.fit_predict(X_scaled)
#     df_combined["Cluster"] = clusters

#     # Create the scatter plot
#     fig = px.scatter(
#         df_combined,
#         x=x_feature,
#         y=y_feature,
#         color="Cluster",
#         title=f"Clustering Analysis with {num_clusters} Clusters",
#         color_continuous_scale="Viridis",
#         labels={"Cluster": "Cluster"},
#     )

#     # Add cluster centers to the plot
#     cluster_centers = kmeans.cluster_centers_
#     fig.add_scatter(
#         x=cluster_centers[:, 0], y=cluster_centers[:, 1],
#         mode="markers",
#         marker=dict(color="red", size=10, symbol="x"),
#         name="Cluster Centers"
#     )

#     # Customize layout
#     fig.update_layout(
#         xaxis_title=x_feature,
#         yaxis_title=y_feature,
#         template="plotly_white"
#     )

#     return fig

# if __name__ == "__main__":
#     app.run_server(debug=True)

In [None]:
# # Get all unique values in the 'Province' column
# unique_provinces = df_combined.select("Province").distinct().rdd.flatMap(lambda x: x).collect()
# print(unique_provinces)

In [None]:
# # Check for null values in each column of the DataFrame
# df_combined.select([F.count(F.when(col(c).isNull(), c)).alias(c) for c in df_combined.columns]).show()

#### Trend Analysis

In [None]:
# # Convert to Pandas DataFrame for easier manipulation
# pdf = house_index_data.toPandas()

# # Group by REF_DATE and GEO, taking the average value
# time_series_data = pdf.groupby(['REF_DATE', 'GEO']).agg({'VALUE': 'mean'}).reset_index()

# fig = px.line(time_series_data, x='REF_DATE', y='VALUE', color='GEO',
#               title='Average Housing Price Index Over Time',
#               labels={'VALUE': 'Average Housing Price Index', 'REF_DATE': 'Date'})
# fig.show()

#### Compare Housing Price Indexes Across Regions

In [None]:
# # Aggregating data by GEO
# region_comparison = pdf.groupby('GEO').agg({'VALUE': 'mean'}).reset_index()

# # Plotting comparison of housing prices by region
# fig2 = px.bar(region_comparison, x='GEO', y='VALUE',
#                title='Average Housing Price Index by Region',
#                labels={'VALUE': 'Average Housing Price Index', 'GEO': 'Region'})
# fig2.show()

In [None]:
# # Set REF_DATE as index
# # time_series_data = time_series_data.asfreq('MS')
# time_series_data.set_index('REF_DATE', inplace=True)
# ts = time_series_data[time_series_data['GEO'] == 'Canada']['VALUE']

# # Fit ARIMA model (parameters (p, d, q) should be tuned)
# model = ARIMA(ts, order=(1, 1, 1))
# model_fit = model.fit()

# # Make predictions
# forecast = model_fit.forecast(steps=12)  # Forecasting next 12 months

# # Plot the results
# plt.figure(figsize=(10, 5))
# plt.plot(ts, label='Historical Data')
# plt.plot(forecast, label='Forecast', color='red')
# plt.title('ARIMA Forecast for Canada Housing Price Index')
# plt.xlabel('Date')
# plt.ylabel('Housing Price Index')
# plt.legend()
# plt.show()

In [None]:
# # Initialize Spark session
# spark = SparkSession.builder \
#     .appName("Housing Price Clustering") \
#     .getOrCreate()

# # Assuming df1 is your input DataFrame
# # Group by GEO and calculate the average VALUE
# region_data = house_index_data.groupBy("GEO").agg(avg("VALUE").alias("avg_value"))

# # Convert to Pandas DataFrame for clustering
# region_pdf = region_data.toPandas()

# # Create a feature vector from avg_value
# assembler = VectorAssembler(inputCols=["avg_value"], outputCol="features")
# region_data_vector = assembler.transform(region_data)

# # Apply KMeans clustering
# kmeans = KMeans(k=3, seed=1)  # 3 clusters
# model = kmeans.fit(region_data_vector)

# # Make predictions
# predictions = model.transform(region_data_vector)

# # Convert predictions to Pandas DataFrame
# predictions_pdf = predictions.select("GEO", "avg_value", "prediction").toPandas()

# # Define cluster labels
# label_map = {0: 'High', 1: 'Low', 2: 'Medium'}
# predictions_pdf['cluster_label'] = predictions_pdf['prediction'].map(label_map)

# # Plotting the clusters based on region
# fig = px.scatter(predictions_pdf, x='GEO', y='avg_value', color='cluster_label',
#                  title='K-Means Clustering of Housing Price Index by Region',
#                  labels={'avg_value': 'Average Housing Price Index', 'GEO': 'Region'},
#                  color_discrete_sequence=px.colors.qualitative.Set1)

# # Customize layout for better visibility
# fig.update_traces(marker=dict(size=12))
# fig.update_layout(xaxis_title='Region', yaxis_title='Average Housing Price Index')

# # Show plot
# fig.show()

In [None]:
# Stop the Spark session
# spark.stop()