# Part 1. Data Preprocessing

## **I. Excel Sheet Extraction and CSV Conversion for PySpark Preprocessing**

The original raw data file is an Excel workbook (`.xlsx`) containing three sheets: `VNI`, `XAUUSD`, and `BTCUSD`. PySpark does not natively read Excel files, so each sheet is first converted into its own CSV file. The script below reads the three sheets with pandas and exports each one as an individual CSV. Each CSV can then be loaded into PySpark independently for cleaning, transformation, and analysis.


In [4]:
import pandas as pd
import openpyxl 

# Path to your Excel file
excel_file = 'C:/Users/ADMIN88/Documents/Big data analysis/Final project/price_raw.xlsx'

# Read all specified sheets into a dictionary (using openpyxl since it's an .xlsx file)
all_sheets = pd.read_excel(excel_file, sheet_name=['VNI', 'XAUUSD', 'BTCUSD'], engine='openpyxl')

# Loop through each sheet and save it as a separate CSV file
for sheet_name, df in all_sheets.items():
    csv_file = f"{sheet_name}.csv"
    df.to_csv(csv_file, index=False, encoding='utf-8-sig')
    print(f"✅ Sheet '{sheet_name}' has been saved as: {csv_file}")

print("🎉 All sheets have been successfully exported to CSV files!")

✅ Sheet 'VNI' has been saved as: VNI.csv
✅ Sheet 'XAUUSD' has been saved as: XAUUSD.csv
✅ Sheet 'BTCUSD' has been saved as: BTCUSD.csv
🎉 All sheets have been successfully exported to CSV files!
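
The export above writes each file with `encoding='utf-8-sig'`, which prepends a UTF-8 byte-order mark (BOM). The sketch below uses only the standard library and an in-memory buffer rather than the real files, and the variable names are illustrative. It shows what the BOM means for a downstream reader: decoding as plain `utf-8` leaves it attached to the first header name, while `utf-8-sig` strips it.

```python
import csv
import io

# Simulate a CSV written with encoding='utf-8-sig' (BOM + UTF-8 text).
raw_bytes = "Date,Price,Open\n2025-05-10,103511.7,102974.7\n".encode("utf-8-sig")

# Decoding as plain UTF-8 keeps the BOM glued to the first column name.
header_plain = next(csv.reader(io.StringIO(raw_bytes.decode("utf-8"))))

# Decoding as utf-8-sig removes the BOM, so the header is clean.
header_sig = next(csv.reader(io.StringIO(raw_bytes.decode("utf-8-sig"))))

print(repr(header_plain[0]))  # '\ufeffDate'
print(repr(header_sig[0]))    # 'Date'
```

If a first column name ever fails to match `Date` in a later step, a leftover BOM is one thing worth ruling out.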


## **II. BTCUSD Data Cleaning and Preprocessing with PySpark**

### 1. Initialize SparkSession for BTC Data Preprocessing

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, when, col
from pyspark.sql.types import DoubleType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("BTC Data Cleaning") \
    .getOrCreate()

#### Import Libraries and Initialize SparkSession for BTC Data Cleaning

This section sets up the PySpark environment needed to perform data preprocessing tasks on the BTC (Bitcoin) historical price data. 

1. **Imports required PySpark libraries:**
   - `SparkSession`: Main entry point for working with DataFrames and the Spark SQL engine.
   - `regexp_replace`, `when`, `col`: Common PySpark functions for cleaning and transforming data.
   - `DoubleType`: Used to explicitly cast columns to `double` type.

2. **Initializes a Spark session:**
   - Names the application `"BTC Data Cleaning"` for tracking in the Spark UI.
   - Uses `.getOrCreate()` to avoid creating a new session if one already exists.

### 2. Data preprocessing

#### Load BTCUSD CSV File into a PySpark DataFrame

In this step, we load the raw historical BTCUSD data from a CSV file into a PySpark DataFrame for further processing.

1. **Read the CSV file into a DataFrame:**
   - Uses `spark.read.csv()` to read the file located at the specified path.
   - `header=True`: Treats the first row of the file as column headers.
   - `inferSchema=True`: Automatically infers data types for each column.

2. **Preview the DataFrame:**
   - `.show(10)`: Displays the first 10 rows of the DataFrame to give a quick look at the data structure and content.

In [2]:
# Read CSV file
btc_df = spark.read.csv("C:/Users/ADMIN88/Documents/Big data analysis/Final project/BTCUSD.csv", header=True, inferSchema=True)
# Show the first few rows
btc_df.show(10)

+----------+--------+--------+--------+--------+-------+--------+
|      Date|   Price|    Open|    High|     Low|   Vol.|Change %|
+----------+--------+--------+--------+--------+-------+--------+
|2025-05-10|103511.7|102974.7|104072.0|102837.2| 55.57K|  0.0052|
|2025-05-09|102975.1|103270.7|104308.7|102365.7| 87.34K| -0.0029|
|2025-05-08|103274.3| 97035.1|103885.4| 96901.9|110.60K|  0.0643|
|2025-05-07| 97035.1| 96817.9| 97647.5| 95798.4| 59.98K|  0.0022|
|2025-05-06| 96825.4| 94745.9| 96893.9| 93408.8| 55.76K|   0.022|
|2025-05-05| 94745.2| 94314.6| 95206.8| 93590.8| 59.21K|  0.0045|
|2025-05-04| 94316.9| 95885.5| 96297.8| 94206.1| 41.31K| -0.0164|
|2025-05-03| 95885.8| 96894.3| 96922.2| 95785.5| 36.24K| -0.0104|
|2025-05-02| 96894.4| 96497.3| 97881.8| 96359.3| 51.94K|  0.0041|
|2025-05-01| 96499.3| 94181.3| 97394.3| 94168.1| 74.56K|  0.0246|
+----------+--------+--------+--------+--------+-------+--------+
only showing top 10 rows



#### 📊 Dataset Preview: BTCUSD Historical Price Data

This dataset contains historical daily trading data for Bitcoin (BTC) priced in USD, ranging from 2015 to 2025. It includes price movements, trading volume, and daily percentage changes. The data appears to be structured and time-series oriented, suitable for financial analysis and machine learning preprocessing.

#### 🧾 Column Descriptions:

| Column     | Description                                                                 |
|------------|-----------------------------------------------------------------------------|
| `Date`     | The calendar date of the BTC trading session (format: YYYY-MM-DD)          |
| `Price`    | The closing price of Bitcoin on that date (in USD)                         |
| `Open`     | The opening price of Bitcoin for that date                                  |
| `High`     | The highest price Bitcoin reached during the trading day                    |
| `Low`      | The lowest price Bitcoin reached during the trading day                     |
| `Vol.`     | The trading volume for that day, abbreviated with a K/M/B suffix (e.g., 55.57K = 55,570 BTC) |
| `Change %` | The change in closing price from the previous day, stored as a fraction (e.g., 0.0052 = 0.52%) |

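The `Change %` values are stored as fractions rather than percentages, which can be checked against the closing prices in the preview. A minimal sketch, assuming the column is defined as the day's close divided by the previous day's close, minus one (the function name `daily_change` is illustrative, not part of the notebook):

```python
# Closing prices copied from the preview above, newest first.
closes = [103511.7, 102975.1, 103274.3, 97035.1]

def daily_change(today: float, previous: float) -> float:
    """Fractional change in closing price relative to the previous day."""
    return today / previous - 1

# Pair each close with the close of the day before it.
changes = [round(daily_change(t, p), 4) for t, p in zip(closes, closes[1:])]
print(changes)  # [0.0052, -0.0029, 0.0643]
```

The three results match the `Change %` values shown for 2025-05-10, 2025-05-09, and 2025-05-08.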

#### 🔄 Renaming Columns for Cleaner Processing

To prepare the dataset for smoother data manipulation and avoid issues caused by special characters, we renamed the following columns:

- **`Vol.` → `Vol`** : Spark reads a period in a column reference as a nested-field accessor, so a column named `Vol.` has to be wrapped in backticks whenever it is referenced. Removing the period avoids that.

- **`Change %` → `Change`** : The space and the `%` symbol likewise require backtick quoting inside Spark SQL expressions and are easy to mistype. A plain name is simpler to reference during transformation and analysis.
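
The two renames above are done by hand. For files with many awkward headers, the same idea could be generalized with a small helper. This is a sketch in plain Python; `sanitize_column_name` is a hypothetical function, not something the notebook or PySpark provides.

```python
import re

def sanitize_column_name(name: str) -> str:
    """Hypothetical helper: keep letters, digits, and underscores only."""
    # Replace each run of other characters with one underscore, then trim
    # underscores left dangling at either end.
    return re.sub(r"[^0-9A-Za-z_]+", "_", name).strip("_")

raw_columns = ["Date", "Price", "Open", "High", "Low", "Vol.", "Change %"]
clean_columns = [sanitize_column_name(c) for c in raw_columns]
print(clean_columns)
# ['Date', 'Price', 'Open', 'High', 'Low', 'Vol', 'Change']
```

For this dataset the helper produces the same names as the manual renames, so the explicit `withColumnRenamed` calls remain the simpler choice.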

In [3]:
# Rename columns
btc_df = btc_df.withColumnRenamed("Vol.", "Vol") \
               .withColumnRenamed("Change %", "Change")

# Show the result
btc_df.show(10)

+----------+--------+--------+--------+--------+-------+-------+
|      Date|   Price|    Open|    High|     Low|    Vol| Change|
+----------+--------+--------+--------+--------+-------+-------+
|2025-05-10|103511.7|102974.7|104072.0|102837.2| 55.57K| 0.0052|
|2025-05-09|102975.1|103270.7|104308.7|102365.7| 87.34K|-0.0029|
|2025-05-08|103274.3| 97035.1|103885.4| 96901.9|110.60K| 0.0643|
|2025-05-07| 97035.1| 96817.9| 97647.5| 95798.4| 59.98K| 0.0022|
|2025-05-06| 96825.4| 94745.9| 96893.9| 93408.8| 55.76K|  0.022|
|2025-05-05| 94745.2| 94314.6| 95206.8| 93590.8| 59.21K| 0.0045|
|2025-05-04| 94316.9| 95885.5| 96297.8| 94206.1| 41.31K|-0.0164|
|2025-05-03| 95885.8| 96894.3| 96922.2| 95785.5| 36.24K|-0.0104|
|2025-05-02| 96894.4| 96497.3| 97881.8| 96359.3| 51.94K| 0.0041|
|2025-05-01| 96499.3| 94181.3| 97394.3| 94168.1| 74.56K| 0.0246|
+----------+--------+--------+--------+--------+-------+-------+
only showing top 10 rows



#### 🧹 Cleaning the 'Vol' Column for Numeric Representation

The `Vol` column in the dataset contains values with suffixes like 'K', 'M', and 'B', representing thousands, millions, and billions respectively. To perform accurate calculations and analyses, these values need to be converted into their actual numeric form. Here's how we cleaned the column:

1. **Handling 'K' (Thousands)**:  
   If the `Vol` value ends with 'K', we remove the 'K' and multiply the remaining number by 1,000 to obtain the actual volume.

2. **Handling 'M' (Millions)**:  
   If the `Vol` value ends with 'M', we remove the 'M' and multiply the remaining number by 1,000,000.

3. **Handling 'B' (Billions)**:  
   If the `Vol` value ends with 'B', we remove the 'B' and multiply the remaining number by 1,000,000,000.

4. **Other Cases (No Suffix)**:  
   If the `Vol` value does not contain any suffix, we simply remove any commas and cast it into a numeric type (`DoubleType`).

This cleaning ensures that the `Vol` column is consistently represented as a numeric value, making it easier to perform analysis, comparisons, and calculations. By transforming the 'K', 'M', and 'B' suffixes into their actual values, we ensure that the data is more suitable for numerical processing and analysis.
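
The four cases above can be sketched in plain Python, outside Spark, to make the rule easy to test on individual strings. The function name `parse_volume` and the `SUFFIX_MULTIPLIERS` table are illustrative. Unlike the Spark expression, this sketch also strips commas before a suffix, accepts lowercase suffixes, and returns `None` for text it cannot parse.

```python
from typing import Optional

# Multipliers for the suffixes described above.
SUFFIX_MULTIPLIERS = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}

def parse_volume(text: Optional[str]) -> Optional[float]:
    """Convert strings such as '55.57K' or '1,234' to a float, or None."""
    if text is None:
        return None
    cleaned = text.strip().replace(",", "")
    if not cleaned:
        return None
    multiplier = SUFFIX_MULTIPLIERS.get(cleaned[-1].upper(), 1)
    number = cleaned[:-1] if multiplier != 1 else cleaned
    try:
        return float(number) * multiplier
    except ValueError:
        return None  # unparsable input yields a missing value

print(parse_volume("1,234"))  # 1234.0
print(parse_volume("-"))      # None
```

Calling `parse_volume("55.57K")` gives a value equal to 55,570 up to floating-point rounding, which matches the first row of the cleaned preview.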


In [4]:
# Clean 'Vol' column
btc_df = btc_df.withColumn(
    "Vol_clean",
    when(col("Vol").endswith("K"), regexp_replace(col("Vol"), "K", "").cast(DoubleType()) * 1_000)
    .when(col("Vol").endswith("M"), regexp_replace(col("Vol"), "M", "").cast(DoubleType()) * 1_000_000)
    .when(col("Vol").endswith("B"), regexp_replace(col("Vol"), "B", "").cast(DoubleType()) * 1_000_000_000)
    .otherwise(regexp_replace(col("Vol"), ",", "").cast(DoubleType()))
)

#### 🧹 Finalizing the 'Vol' Column Clean-up

After transforming the `Vol` column into numeric values and creating a new column `Vol_clean`, we need to finalize the changes. This involves removing the old `Vol` column and renaming the `Vol_clean` column to `Vol` for consistency.

1. **Dropping the Old 'Vol' Column**:  
   The old `Vol` column, which still contained non-numeric values (e.g., 'K', 'M', 'B'), is dropped to ensure that only the cleaned data is retained.

2. **Renaming 'Vol_clean' to 'Vol'**:  
   The `Vol_clean` column, which now contains numeric values, is renamed back to `Vol` to maintain the original column name while ensuring the data is in a usable format.

In [5]:
# Drop the old 'Vol' column
btc_df = btc_df.drop("Vol")

# Rename the cleaned column
btc_df = btc_df.withColumnRenamed("Vol_clean", "Vol")

# ✅ Check: show sample
btc_df.show(10)

+----------+--------+--------+--------+--------+-------+--------+
|      Date|   Price|    Open|    High|     Low| Change|     Vol|
+----------+--------+--------+--------+--------+-------+--------+
|2025-05-10|103511.7|102974.7|104072.0|102837.2| 0.0052| 55570.0|
|2025-05-09|102975.1|103270.7|104308.7|102365.7|-0.0029| 87340.0|
|2025-05-08|103274.3| 97035.1|103885.4| 96901.9| 0.0643|110600.0|
|2025-05-07| 97035.1| 96817.9| 97647.5| 95798.4| 0.0022| 59980.0|
|2025-05-06| 96825.4| 94745.9| 96893.9| 93408.8|  0.022| 55760.0|
|2025-05-05| 94745.2| 94314.6| 95206.8| 93590.8| 0.0045| 59210.0|
|2025-05-04| 94316.9| 95885.5| 96297.8| 94206.1|-0.0164| 41310.0|
|2025-05-03| 95885.8| 96894.3| 96922.2| 95785.5|-0.0104| 36240.0|
|2025-05-02| 96894.4| 96497.3| 97881.8| 96359.3| 0.0041| 51940.0|
|2025-05-01| 96499.3| 94181.3| 97394.3| 94168.1| 0.0246| 74560.0|
+----------+--------+--------+--------+--------+-------+--------+
only showing top 10 rows



#### 🔄 Reordering Columns for Better Structure

In the preview of the dataset, we noticed that the `Vol` column currently appears after the `Change` column. For consistency and clarity in data presentation, we decided to reorder the columns so that the `Vol` column appears before the `Change` column.

1. **Reordering Columns**:  
   We defined the desired column order, specifying that `Vol` should come before `Change`. This ensures that the columns are logically arranged for easier analysis and reporting.

2. **Applying the New Order**:  
   Using the `select()` method, we applied the new column order to the DataFrame, which repositions `Vol` before `Change`.

In [6]:
# Reorder columns (move 'Vol' before 'Change')
desired_column_order = ['Date', 'Price', 'Open', 'High', 'Low', 'Vol', 'Change']
btc_df = btc_df.select(desired_column_order)

# Show the reordered DataFrame
btc_df.show(10)


+----------+--------+--------+--------+--------+--------+-------+
|      Date|   Price|    Open|    High|     Low|     Vol| Change|
+----------+--------+--------+--------+--------+--------+-------+
|2025-05-10|103511.7|102974.7|104072.0|102837.2| 55570.0| 0.0052|
|2025-05-09|102975.1|103270.7|104308.7|102365.7| 87340.0|-0.0029|
|2025-05-08|103274.3| 97035.1|103885.4| 96901.9|110600.0| 0.0643|
|2025-05-07| 97035.1| 96817.9| 97647.5| 95798.4| 59980.0| 0.0022|
|2025-05-06| 96825.4| 94745.9| 96893.9| 93408.8| 55760.0|  0.022|
|2025-05-05| 94745.2| 94314.6| 95206.8| 93590.8| 59210.0| 0.0045|
|2025-05-04| 94316.9| 95885.5| 96297.8| 94206.1| 41310.0|-0.0164|
|2025-05-03| 95885.8| 96894.3| 96922.2| 95785.5| 36240.0|-0.0104|
|2025-05-02| 96894.4| 96497.3| 97881.8| 96359.3| 51940.0| 0.0041|
|2025-05-01| 96499.3| 94181.3| 97394.3| 94168.1| 74560.0| 0.0246|
+----------+--------+--------+--------+--------+--------+-------+
only showing top 10 rows



#### 📋 Checking the Schema of the DataFrame

In [7]:
# Check the schema
btc_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Price: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Vol: double (nullable = true)
 |-- Change: double (nullable = true)



#### ✅ Schema Validation Result

After running `printSchema()`, we confirmed that all columns in the DataFrame have been correctly interpreted and cast to appropriate data types:

- `Date`: `date` — correctly parsed from string format.
- `Price`, `Open`, `High`, `Low`, `Vol`, `Change`: all are `double` — ensuring numerical operations can be performed efficiently.

#### 🔎 Missing value check in Dataset

In [8]:
# Import under an alias so Spark's sum() does not shadow Python's built-in sum()
from pyspark.sql import functions as F

# Count nulls in each column
null_counts = btc_df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in btc_df.columns
])

# Collect the result to the driver
null_counts_dict = null_counts.collect()[0].asDict()

null_counts.show()

# Print detailed results
print("🔍 Null Value Check:")
for column, null_count in null_counts_dict.items():
    if null_count == 0:
        print(f"✅ Column '{column}' has no missing values.")
    else:
        print(f"⚠️ Column '{column}' has {null_count} missing value(s).")

+----+-----+----+----+---+---+------+
|Date|Price|Open|High|Low|Vol|Change|
+----+-----+----+----+---+---+------+
|   0|    0|   0|   0|  0|  0|     0|
+----+-----+----+----+---+---+------+

🔍 Null Value Check:
✅ Column 'Date' has no missing values.
✅ Column 'Price' has no missing values.
✅ Column 'Open' has no missing values.
✅ Column 'High' has no missing values.
✅ Column 'Low' has no missing values.
✅ Column 'Vol' has no missing values.
✅ Column 'Change' has no missing values.


#### 🔎 Duplicate Row Check in Dataset

In [9]:
# ✅ Check for duplicate rows
total_rows = btc_df.count()
distinct_rows = btc_df.distinct().count()
duplicate_count = total_rows - distinct_rows

# 📝 Print result
print("🔎 Duplicate Row Check:")
if duplicate_count == 0:
    print("✅ No duplicate rows found in the dataset.")
else:
    print(f"⚠️ Found {duplicate_count} duplicate row(s) in the dataset.")

🔎 Duplicate Row Check:
✅ No duplicate rows found in the dataset.
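
The check above compares whole rows. A small plain-Python sketch on a constructed sample shows the same arithmetic, plus a stricter variant that flags repeated dates even when the other fields differ. The sample rows and variable names are made up for illustration.

```python
from collections import Counter

# Tiny constructed sample: (Date, Price) tuples with one repeated row.
rows = [
    ("2025-05-01", 96499.3),
    ("2025-05-02", 96894.4),
    ("2025-05-02", 96894.4),  # exact duplicate of the previous row
    ("2025-05-03", 95885.8),
]

# Same arithmetic as the cell above: total rows minus distinct rows.
duplicate_count = len(rows) - len(set(rows))

# Stricter variant: dates that occur more than once, even if prices differ.
date_counts = Counter(d for d, _ in rows)
repeated_dates = [d for d, n in date_counts.items() if n > 1]

print(duplicate_count)  # 1
print(repeated_dates)   # ['2025-05-02']
```

For a daily time series, the date-level variant is often the more relevant one, since two rows for the same day with different prices would pass the whole-row check.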


#### 📋 Statistical results summary

In [44]:
# Detailed summary statistics including percentiles
btc_df.summary().show()

+-------+-----------------+------------------+------------------+------------------+--------------------+--------------------+
|summary|            Price|              Open|              High|               Low|                 Vol|              Change|
+-------+-----------------+------------------+------------------+------------------+--------------------+--------------------+
|  count|             3783|              3783|              3783|              3783|                3783|                3783|
|   mean|22607.29299497749|22580.113269891546|23094.781205392585|22040.250489029742| 1.632940622521808E7|0.002199444885011892|
| stddev|25494.95324271896|25463.596431494963|25996.768121511122|  24916.4600436307|1.7787544331153402E8| 0.03641571685211734|
|    min|            164.9|             164.9|             212.6|             157.3|               260.0|             -0.3918|
|    25%|           2900.3|            2883.3|            2977.9|            2807.4|             56940.0|      

#### 📊 Key Interpretations:

This summary table covers **3,783 daily records** of Bitcoin (BTC) trading data. It provides insights into BTC's price behavior, volatility, trading volume, and daily returns over a long time horizon, from 2015 through May 2025.

- **Price Trends**:
  - The **average closing price** was approximately **$22,607.29**, more than double the median, because the high prices of recent years pull the mean upward.
  - Prices ranged from as low as **$164.90** to a staggering **$106,157.20**, highlighting Bitcoin's **explosive price evolution** over the years.
  - The **median (50th percentile)** price was **$10,184.80**, while the 75th percentile jumped to **$36,982.10**, showing a **positively skewed distribution** influenced by recent bull markets.

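- **Volatility**:
  - The **standard deviation of the closing price** is very high (~$25,495), larger than the mean itself, although over a ten-year span this figure reflects the long-run price trend as much as day-to-day volatility.
  - The dispersion is nearly identical across `Open`, `High`, and `Low`, which is expected because all four price columns track the same underlying asset; intraday swings are better measured by the daily `High` minus `Low` range.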

- **Volume**:
  - The **average daily trading volume** was **~16.33 million**, but the **maximum recorded volume reached 4.47 billion**, indicating **extreme spikes in market activity**, likely during speculative bubbles or market corrections.
  - High **stddev of volume (~178M)** suggests **inconsistent liquidity**, often driven by sentiment and news cycles.

- **Daily Change %**:
  - The **mean daily return** is **0.22%**, indicating a general upward bias in price action.
  - However, the **standard deviation** of daily returns is **3.64%**, showcasing **frequent and large fluctuations** in day-to-day returns.
  - Extreme values range from **-39.18%** to **+27.2%**, underscoring Bitcoin's susceptibility to sharp market moves.
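
The skew noted under Price Trends (a mean of about $22,607 against a median of about $10,185) can be illustrated with the standard library. The price list below is made up for illustration and is not drawn from the dataset.

```python
import statistics

# Made-up prices: many low values and a few very high ones.
prices = [200, 400, 900, 3000, 9000, 11000, 40000, 65000, 100000]

mean_price = statistics.mean(prices)
median_price = statistics.median(prices)

# A mean well above the median is a simple sign of right skew.
print(mean_price > median_price)  # True
print(median_price)               # 9000
```

A handful of large values is enough to pull the mean far above the median, which is the pattern the summary table shows for the BTC closing price.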


#### 📅 Sorting by Date for Time Series Consistency
When working with time series data such as Bitcoin historical prices, maintaining chronological order is essential. The raw file lists the most recent dates first, and Spark does not guarantee row order unless the data is explicitly sorted, which can lead to incorrect results in trend analysis, forecasting, or visualization. To ensure the dataset follows the correct timeline, we sort the data by the `Date` column in ascending order:

In [10]:
# Sort by ascending date
df_order = btc_df.orderBy("Date")

# Print confirmation message
print("✅ The dataset has been sorted by date in ascending order.")

✅ The dataset has been sorted by date in ascending order.
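
The confirmation message is printed unconditionally, so it does not verify the order by itself. One way to verify it, sketched here in plain Python on a few dates from the preview, is to test that each date is strictly later than the one before it. The function name `is_ascending` is illustrative.

```python
from datetime import date
from typing import List

# Dates in the order the raw file lists them (newest first).
raw_dates = ["2025-05-10", "2025-05-09", "2025-05-08", "2025-05-07"]

def is_ascending(iso_dates: List[str]) -> bool:
    """True when every date is strictly later than the one before it."""
    parsed = [date.fromisoformat(d) for d in iso_dates]
    return all(a < b for a, b in zip(parsed, parsed[1:]))

print(is_ascending(raw_dates))          # False
print(is_ascending(sorted(raw_dates)))  # True
```

The same test could be applied to the `Date` column after it has been collected to the driver, for example from the Pandas DataFrame produced in the export step.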


### 3. Export Cleaned BTC Data to CSV and Stop Spark Session

In [12]:
# Save the result to a CSV file
df_order.toPandas().to_csv("C:/Users/ADMIN88/Documents/Big data analysis/Final project/BTC_cleaned.csv", index=False)

# Print confirmation message
print("✅ The CSV file has been saved successfully.")

# Stop the Spark session
spark.stop()

✅ The CSV file has been saved successfully.


#### 💾 Saving the Cleaned Dataset to CSV

After completing all necessary data cleaning and preprocessing steps, we export the final DataFrame to a CSV file for further analysis or sharing. The code performs the following:

1. **Convert to Pandas DataFrame**:  
   We convert the Spark DataFrame `df_order` to a Pandas DataFrame using `.toPandas()` so that the `.to_csv()` method is available. This collects every row onto the driver, which is acceptable for a dataset of a few thousand rows.

2. **Export to CSV**:  
   The cleaned dataset is saved to the specified path.

3. **Terminate Spark Session**:  
   Finally, we stop the active Spark session with `spark.stop()` to release resources.

This step finalizes our data pipeline and prepares the cleaned dataset for downstream tasks such as visualization, modeling, or reporting.




## **III. XAU Data Cleaning and Preprocessing with PySpark**

### 1. Initialize SparkSession for XAU Data Preprocessing

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, when, col
from pyspark.sql.types import DoubleType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("XAU Data Cleaning") \
    .getOrCreate()

#### Import Libraries and Initialize SparkSession for XAU Data Cleaning

This section sets up the PySpark environment needed to perform data preprocessing tasks on the XAU (Gold) historical price data. 

1. **Imports required PySpark libraries:**
   - `SparkSession`: Main entry point for working with DataFrames and the Spark SQL engine.
   - `regexp_replace`, `when`, `col`: Common PySpark functions for cleaning and transforming data.
   - `DoubleType`: Used to explicitly cast columns to `double` type.

2. **Initializes a Spark session:**
   - Names the application `"XAU Data Cleaning"` for tracking in the Spark UI.
   - Uses `.getOrCreate()` to avoid creating a new session if one already exists.

### 2. Data preprocessing

#### Load XAUUSD CSV File into a PySpark DataFrame

In this step, we load the raw historical XAUUSD data from a CSV file into a PySpark DataFrame for further processing.

1. **Read the CSV file into a DataFrame:**
   - Uses `spark.read.csv()` to read the file located at the specified path.
   - `header=True`: Treats the first row of the file as column headers.
   - `inferSchema=True`: Automatically infers data types for each column.

2. **Preview the DataFrame:**
   - `.show(10)`: Displays the first 10 rows of the DataFrame to give a quick look at the data structure and content.

In [23]:
# Read CSV file
xau_df = spark.read.csv("C:/Users/ADMIN88/Documents/Big data analysis/Final project/XAUUSD.csv", header=True, inferSchema=True)
# Show the first few rows
xau_df.show(10)

+----------+------+------+------+------+-------+--------+
|      Date| Price|  Open|  High|   Low|   Vol.|Change %|
+----------+------+------+------+------+-------+--------+
|2025-05-09|3344.0|3310.2|3352.7|3278.9|210.27K|  0.0115|
|2025-05-08|3306.0|3373.1|3422.0|3293.3|327.13K| -0.0253|
|2025-05-07|3391.9|3448.1|3448.2|3367.0|304.24K|  -0.009|
|2025-05-06|3422.8|3345.7|3444.5|3332.1|268.10K|  0.0303|
|2025-05-05|3322.3|3247.1|3346.7|3243.1|184.90K|  0.0244|
|2025-05-02|3243.3|3247.6|3277.0|3229.5|210.99K|  0.0065|
|2025-05-01|3222.2|3299.0|3300.6|3209.4|214.69K| -0.0292|
|2025-04-30|3319.1|3324.5|3337.6|3275.6|207.72K| -0.0043|
|2025-04-29|3333.6|3354.9|3359.3|3309.2|170.15K|  2.0E-4|
|2025-04-28|3333.0|3318.3|3347.7|3266.3|  1.30K|  0.0105|
+----------+------+------+------+------+-------+--------+
only showing top 10 rows



#### 📊 Dataset Preview: XAUUSD Historical Price Data

This dataset contains historical daily trading data for Gold (XAU) priced in USD, ranging from **January 2, 2015** to **May 9, 2025**. It captures daily market activity including price levels, trading volumes, and day-over-day percentage changes. 
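
The preview has no rows for 3 and 4 May 2025, a Saturday and a Sunday, whereas the BTCUSD preview has a row for both dates. The gold series therefore appears to skip days on which the market is closed. A minimal sketch for locating such gaps, in plain Python with an illustrative function name `find_gaps`:

```python
from datetime import date

# Dates copied from the XAUUSD preview above, newest first.
preview_dates = ["2025-05-06", "2025-05-05", "2025-05-02", "2025-05-01"]

def find_gaps(iso_dates):
    """Return (earlier, later, days) for consecutive dates over 1 day apart."""
    ordered = sorted(date.fromisoformat(d) for d in iso_dates)
    return [
        (a.isoformat(), b.isoformat(), (b - a).days)
        for a, b in zip(ordered, ordered[1:])
        if (b - a).days > 1
    ]

print(find_gaps(preview_dates))  # [('2025-05-02', '2025-05-05', 3)]
```

This difference in calendars matters if the gold and Bitcoin series are later joined on `Date`.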

#### 🧾 Column Descriptions:

| Column     | Description                                                                 |
|------------|-----------------------------------------------------------------------------|
| `Date`     | The calendar date of the XAU trading session (format: YYYY-MM-DD)          |
| `Price`    | The closing price of Gold on that date (in USD per ounce)                  |
| `Open`     | The opening price of Gold on that date                                      |
| `High`     | The highest price Gold reached during the trading day                       |
| `Low`      | The lowest price Gold reached during the trading day                        |
| `Vol.`     | The trading volume for that day, abbreviated with a K/M/B suffix (e.g., 210.27K = 210,270 oz) |
| `Change %` | The change in closing price from the previous day, stored as a fraction (e.g., 0.0115 = 1.15%) |


#### 🔄 Renaming Columns for Cleaner Processing

To prepare the dataset for smoother data manipulation and avoid issues caused by special characters, we renamed the following columns:

- **`Vol.` → `Vol`** : Spark reads a period in a column reference as a nested-field accessor, so a column named `Vol.` has to be wrapped in backticks whenever it is referenced. Removing the period avoids that.

- **`Change %` → `Change`** : The space and the `%` symbol likewise require backtick quoting inside Spark SQL expressions and are easy to mistype. A plain name is simpler to reference during transformation and analysis.

In [24]:
# Rename columns
xau_df = xau_df.withColumnRenamed("Vol.", "Vol") \
               .withColumnRenamed("Change %", "Change")

# Show the result
xau_df.show(10)

+----------+------+------+------+------+-------+-------+
|      Date| Price|  Open|  High|   Low|    Vol| Change|
+----------+------+------+------+------+-------+-------+
|2025-05-09|3344.0|3310.2|3352.7|3278.9|210.27K| 0.0115|
|2025-05-08|3306.0|3373.1|3422.0|3293.3|327.13K|-0.0253|
|2025-05-07|3391.9|3448.1|3448.2|3367.0|304.24K| -0.009|
|2025-05-06|3422.8|3345.7|3444.5|3332.1|268.10K| 0.0303|
|2025-05-05|3322.3|3247.1|3346.7|3243.1|184.90K| 0.0244|
|2025-05-02|3243.3|3247.6|3277.0|3229.5|210.99K| 0.0065|
|2025-05-01|3222.2|3299.0|3300.6|3209.4|214.69K|-0.0292|
|2025-04-30|3319.1|3324.5|3337.6|3275.6|207.72K|-0.0043|
|2025-04-29|3333.6|3354.9|3359.3|3309.2|170.15K| 2.0E-4|
|2025-04-28|3333.0|3318.3|3347.7|3266.3|  1.30K| 0.0105|
+----------+------+------+------+------+-------+-------+
only showing top 10 rows



#### 🧹 Cleaning the 'Vol' Column for Numeric Representation

The `Vol` column in the dataset contains values with suffixes like 'K', 'M', and 'B', representing thousands, millions, and billions respectively. To perform accurate calculations and analyses, these values need to be converted into their actual numeric form. Here's how we cleaned the column:

1. **Handling 'K' (Thousands)**:  
   If the `Vol` value ends with 'K', we remove the 'K' and multiply the remaining number by 1,000 to obtain the actual volume.

2. **Handling 'M' (Millions)**:  
   If the `Vol` value ends with 'M', we remove the 'M' and multiply the remaining number by 1,000,000.

3. **Handling 'B' (Billions)**:  
   If the `Vol` value ends with 'B', we remove the 'B' and multiply the remaining number by 1,000,000,000.

4. **Other Cases (No Suffix)**:  
   If the `Vol` value does not contain any suffix, we simply remove any commas and cast it into a numeric type (`DoubleType`).

This cleaning ensures that the `Vol` column is consistently represented as a numeric value, making it easier to perform analysis, comparisons, and calculations. By transforming the 'K', 'M', and 'B' suffixes into their actual values, we ensure that the data is more suitable for numerical processing and analysis.


In [25]:
# Clean 'Vol' column
xau_df = xau_df.withColumn(
    "Vol_clean",
    when(col("Vol").endswith("K"), regexp_replace(col("Vol"), "K", "").cast(DoubleType()) * 1_000)
    .when(col("Vol").endswith("M"), regexp_replace(col("Vol"), "M", "").cast(DoubleType()) * 1_000_000)
    .when(col("Vol").endswith("B"), regexp_replace(col("Vol"), "B", "").cast(DoubleType()) * 1_000_000_000)
    .otherwise(regexp_replace(col("Vol"), ",", "").cast(DoubleType()))
)

#### 🧹 Finalizing the 'Vol' Column Clean-up

After transforming the `Vol` column into numeric values and creating a new column `Vol_clean`, we need to finalize the changes. This involves removing the old `Vol` column and renaming the `Vol_clean` column to `Vol` for consistency.

1. **Dropping the Old 'Vol' Column**:  
   The old `Vol` column, which still contained non-numeric values (e.g., 'K', 'M', 'B'), is dropped to ensure that only the cleaned data is retained.

2. **Renaming 'Vol_clean' to 'Vol'**:  
   The `Vol_clean` column, which now contains numeric values, is renamed back to `Vol` to maintain the original column name while ensuring the data is in a usable format.

In [26]:
# Drop the old 'Vol' column
xau_df = xau_df.drop("Vol")

# Rename the cleaned column
xau_df = xau_df.withColumnRenamed("Vol_clean", "Vol")

# ✅ Check: show sample
xau_df.show(10)

+----------+------+------+------+------+-------+--------+
|      Date| Price|  Open|  High|   Low| Change|     Vol|
+----------+------+------+------+------+-------+--------+
|2025-05-09|3344.0|3310.2|3352.7|3278.9| 0.0115|210270.0|
|2025-05-08|3306.0|3373.1|3422.0|3293.3|-0.0253|327130.0|
|2025-05-07|3391.9|3448.1|3448.2|3367.0| -0.009|304240.0|
|2025-05-06|3422.8|3345.7|3444.5|3332.1| 0.0303|268100.0|
|2025-05-05|3322.3|3247.1|3346.7|3243.1| 0.0244|184900.0|
|2025-05-02|3243.3|3247.6|3277.0|3229.5| 0.0065|210990.0|
|2025-05-01|3222.2|3299.0|3300.6|3209.4|-0.0292|214690.0|
|2025-04-30|3319.1|3324.5|3337.6|3275.6|-0.0043|207720.0|
|2025-04-29|3333.6|3354.9|3359.3|3309.2| 2.0E-4|170150.0|
|2025-04-28|3333.0|3318.3|3347.7|3266.3| 0.0105|  1300.0|
+----------+------+------+------+------+-------+--------+
only showing top 10 rows



#### 🔄 Reordering Columns for Better Structure

In the preview of the dataset, we noticed that the `Vol` column currently appears after the `Change` column. For consistency and clarity in data presentation, we decided to reorder the columns so that the `Vol` column appears before the `Change` column.

1. **Reordering Columns**:  
   We defined the desired column order, specifying that `Vol` should come before `Change`. This ensures that the columns are logically arranged for easier analysis and reporting.

2. **Applying the New Order**:  
   Using the `select()` method, we applied the new column order to the DataFrame, which repositions `Vol` before `Change`.

In [27]:
# Reorder columns (move 'Vol' before 'Change')
desired_column_order = ['Date', 'Price', 'Open', 'High', 'Low', 'Vol', 'Change']
xau_df = xau_df.select(desired_column_order)

# Show the reordered DataFrame
xau_df.show(10)


+----------+------+------+------+------+--------+-------+
|      Date| Price|  Open|  High|   Low|     Vol| Change|
+----------+------+------+------+------+--------+-------+
|2025-05-09|3344.0|3310.2|3352.7|3278.9|210270.0| 0.0115|
|2025-05-08|3306.0|3373.1|3422.0|3293.3|327130.0|-0.0253|
|2025-05-07|3391.9|3448.1|3448.2|3367.0|304240.0| -0.009|
|2025-05-06|3422.8|3345.7|3444.5|3332.1|268100.0| 0.0303|
|2025-05-05|3322.3|3247.1|3346.7|3243.1|184900.0| 0.0244|
|2025-05-02|3243.3|3247.6|3277.0|3229.5|210990.0| 0.0065|
|2025-05-01|3222.2|3299.0|3300.6|3209.4|214690.0|-0.0292|
|2025-04-30|3319.1|3324.5|3337.6|3275.6|207720.0|-0.0043|
|2025-04-29|3333.6|3354.9|3359.3|3309.2|170150.0| 2.0E-4|
|2025-04-28|3333.0|3318.3|3347.7|3266.3|  1300.0| 0.0105|
+----------+------+------+------+------+--------+-------+
only showing top 10 rows



#### 📋 Checking the Schema of the DataFrame

In [28]:
# Check the schema
xau_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Price: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Vol: double (nullable = true)
 |-- Change: double (nullable = true)



#### ✅ Schema Validation Result

After running `printSchema()`, we confirmed that all columns in the DataFrame have been correctly interpreted and cast to appropriate data types:

- `Date`: `date` — correctly parsed from string format.
- `Price`, `Open`, `High`, `Low`, `Vol`, `Change`: all are `double` — ensuring numerical operations can be performed efficiently.

#### 🔎 Missing value check in Dataset

In [29]:
# Import under an alias so Spark's sum() does not shadow Python's built-in sum()
from pyspark.sql import functions as F

# Count nulls in each column
null_counts = xau_df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in xau_df.columns
])

# Collect the result to the driver
null_counts_dict = null_counts.collect()[0].asDict()

null_counts.show()

# Print detailed results
print("🔍 Null Value Check:")
for column, null_count in null_counts_dict.items():
    if null_count == 0:
        print(f"✅ Column '{column}' has no missing values.")
    else:
        print(f"⚠️ Column '{column}' has {null_count} missing value(s).")

+----+-----+----+----+---+---+------+
|Date|Price|Open|High|Low|Vol|Change|
+----+-----+----+----+---+---+------+
|   0|    0|   0|   0|  0|  2|     0|
+----+-----+----+----+---+---+------+

🔍 Null Value Check:
✅ Column 'Date' has no missing values.
✅ Column 'Price' has no missing values.
✅ Column 'Open' has no missing values.
✅ Column 'High' has no missing values.
✅ Column 'Low' has no missing values.
⚠️ Column 'Vol' has 2 missing value(s).
✅ Column 'Change' has no missing values.


#### 📋 Handling Missing Data

In [32]:
# Import necessary functions
from pyspark.sql import functions as F

# Calculate mean of the 'Vol' column
mean_vol = xau_df.select(F.mean("Vol")).collect()[0][0]

# Fill missing values in the 'Vol' column with the mean
xau_df = xau_df.fillna({'Vol': mean_vol})

# Print confirmation message (no placeholders, so a plain string is enough)
print("✅ Missing values in 'Vol' column have been filled with the mean value.")


✅ Missing values in 'Vol' column have been filled with the mean value.
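
Mean imputation is simple, but the mean is pulled toward extreme values, so the fill value may not resemble a typical day. The plain-Python sketch below compares a mean fill with a median fill on made-up numbers (not taken from the dataset). The helper `fill_missing` is hypothetical; like Spark's `mean`, it ignores missing entries when computing the fill value.

```python
from statistics import mean, median


def fill_missing(values, strategy="mean"):
    """Replace None entries with the mean or median of the non-missing values."""
    observed = [v for v in values if v is not None]
    if not observed:
        raise ValueError("cannot impute: every value is missing")
    fill_value = mean(observed) if strategy == "mean" else median(observed)
    return [fill_value if v is None else v for v in values]


# Made-up volumes with one extreme day; not taken from the dataset.
volumes = [1000.0, 1200.0, None, 1100.0, 500000.0, None]

print(fill_missing(volumes, "mean"))    # gaps become 125825.0
print(fill_missing(volumes, "median"))  # gaps become 1150.0
```

Which strategy suits the `Vol` column is a judgment call that depends on how skewed the volumes are. The notebook uses the mean, and this sketch only illustrates the trade-off.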


#### ✅ Recheck if there are any remaining missing values in "Vol"

In [34]:
# Recheck if there are any remaining missing values in "Vol"
missing_count = xau_df.filter(F.col("Vol").isNull()).count()

if missing_count == 0:
    print("No missing values left in the 'Vol' column.")
else:
    print(f"There are {missing_count} missing values in the 'Vol' column.")

No missing values left in the 'Vol' column.


#### 🔎 Duplicate Row Check in Dataset

In [35]:
# ✅ Check for duplicate rows
total_rows = xau_df.count()
distinct_rows = xau_df.distinct().count()
duplicate_count = total_rows - distinct_rows

# 📝 Print result
print("🔎 Duplicate Row Check:")
if duplicate_count == 0:
    print("✅ No duplicate rows found in the dataset.")
else:
    print(f"⚠️ Found {duplicate_count} duplicate row(s) in the dataset.")

🔎 Duplicate Row Check:
✅ No duplicate rows found in the dataset.
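
Note that `distinct()` compares entire rows, so two rows that share a date but differ in any other column would not be counted as duplicates. For a daily time series it may be worth also checking that each date appears only once. The following is a minimal plain-Python sketch of that idea; the helper `find_duplicate_keys` and the sample dates are hypothetical.

```python
from collections import Counter


def find_duplicate_keys(keys):
    """Return {key: count} for every key that appears more than once."""
    counts = Counter(keys)
    return {key: n for key, n in counts.items() if n > 1}


# Made-up dates for illustration; in the notebook these could come from
# something like [row["Date"] for row in xau_df.select("Date").collect()].
dates = ["2025-05-05", "2025-05-06", "2025-05-06", "2025-05-07"]
print(find_duplicate_keys(dates))  # prints {'2025-05-06': 2}
```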


#### 📋 Statistical results summary

In [36]:
# Detailed summary statistics including percentiles
xau_df.summary().show()

+-------+------------------+------------------+------------------+------------------+------------------+--------------------+
|summary|             Price|              Open|              High|               Low|               Vol|              Change|
+-------+------------------+------------------+------------------+------------------+------------------+--------------------+
|  count|              2653|              2653|              2653|              2653|              2653|                2653|
|   mean|1653.6124387485856|1653.5597625329815|1665.0171880889557| 1642.046683000381|129573.40626178801|4.353938937052375...|
| stddev|462.44189424631185| 462.0938485287363|466.74527542432367|457.76725241069994|130175.02274962829|0.009356735632787645|
|    min|            1049.6|            1051.5|            1062.7|            1045.4|             120.0|             -0.0499|
|    25%|            1269.3|            1268.5|            1275.9|            1262.4|            1150.0|             -

#### 📈 Key Interpretations:

This summary table offers a statistical overview of 2,653 daily records for Gold (XAU) traded in USD from 2015 to 2025. The data includes key indicators such as daily price points, trading volume, and percentage changes. Here's a breakdown of the most notable observations:

- **Price and Volatility**:
  - The average closing price is **$1,653.61**, with a standard deviation of **$462.44** (about 28% of the mean), indicating that price levels varied considerably over the observed period.
  - Prices ranged from a **minimum of $1,049.60** to a **maximum of $3,425.30**, capturing significant long-term growth and fluctuations in the gold market.

- **Trading Volume**:
  - The average volume is around **129,573 ounces per day**, with a high standard deviation (**130,175**), reflecting substantial variation in daily trading activity.
  - Volume ranged from **just 120 oz** to a peak of **816,530 oz**, suggesting some days had exceptionally high market participation.

- **Price Movement Range**:
  - The median values (50%) for `Open`, `High`, `Low`, and `Price` all hover around **$1,627–$1,646**, aligning with the overall mean and indicating a relatively symmetric distribution of central prices.

- **Daily Change (%)**:
  - The mean daily change is approximately **0.000435**, or about **0.04%** (the table prints the value in scientific notation and truncates it to `4.3539...`), with a standard deviation of **0.0094**, reflecting mild daily price fluctuations.
  - The most extreme recorded daily changes are **+5.95%** and **-4.99%**, suggesting rare but impactful volatility events.

#### 📅 Sorting by Date for Time Series Consistency
Time series data such as historical gold prices must be kept in chronological order. Spark does not guarantee row order unless the data is sorted explicitly, and out-of-order rows can distort trend analysis, forecasting, and visualizations. To ensure the dataset follows the correct timeline, we sort it by the `Date` column in ascending order:

In [37]:
# Sort by ascending date
df_order = xau_df.orderBy("Date")

# Print confirmation message
print("✅ The dataset has been sorted by date in ascending order.")

✅ The dataset has been sorted by date in ascending order.
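
The confirmation message is printed unconditionally, so it does not verify the order by itself. A minimal plain-Python sketch of an explicit check is shown below. The helper `is_chronological` is hypothetical and the dates are made up; in the notebook, the input could be the `Date` values collected from `df_order`.

```python
from datetime import date


def is_chronological(dates):
    """True when each date is later than or equal to the one before it."""
    return all(earlier <= later for earlier, later in zip(dates, dates[1:]))


# Made-up dates for illustration.
ordered = [date(2025, 4, 28), date(2025, 4, 29), date(2025, 4, 30)]
shuffled = [date(2025, 4, 29), date(2025, 4, 28), date(2025, 4, 30)]

print(is_chronological(ordered))   # prints True
print(is_chronological(shuffled))  # prints False
```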


### 3. Export Cleaned XAU Data to CSV and Stop Spark Session

In [38]:
# Save the result to a CSV file
df_order.toPandas().to_csv("C:/Users/ADMIN88/Documents/Big data analysis/Final project/XAU_cleaned.csv", index=False)

# Print confirmation message
print("✅ The CSV file has been saved successfully.")

# Stop the Spark session
spark.stop()

✅ The CSV file has been saved successfully.


#### 💾 Saving the Cleaned Dataset to CSV

After completing all necessary data cleaning and preprocessing steps, we export the final DataFrame to a CSV file for further analysis or sharing. The code performs the following:

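1. **Convert to Pandas DataFrame**:  
   We convert the Spark DataFrame `df_order` to a Pandas DataFrame using `.toPandas()` so that it can be written to a single file with `.to_csv()`. This collects every row onto the driver, which is acceptable for a dataset of this size.

2. **Export to CSV**:  
   The cleaned dataset is saved to the specified path, with `index=False` so that the Pandas row index is not written as an extra column.

3. **Terminate Spark Session**:  
   Finally, we stop the active Spark session with `spark.stop()` to release resources.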

This step finalizes our data pipeline and prepares the cleaned dataset for downstream tasks such as visualization, modeling, or reporting.
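
Before handing the file to downstream tasks, it can be useful to read it back and confirm the header and row count. The sketch below uses only the standard library and runs on a temporary file with made-up rows. The helper `summarize_csv` and the file name `demo_cleaned.csv` are hypothetical; in the notebook, the path of the exported file would be used instead.

```python
import csv
import os
import tempfile


def summarize_csv(path):
    """Return (header, row_count) for a CSV file, excluding the header row."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle)
        header = next(reader)
        row_count = sum(1 for _ in reader)
    return header, row_count


# Demo on a temporary file with made-up rows.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "demo_cleaned.csv")
    with open(demo_path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(["Date", "Price", "Vol"])
        writer.writerow(["2025-04-28", 3333.0, 1300.0])
        writer.writerow(["2025-04-29", 3333.6, 170150.0])
    header, row_count = summarize_csv(demo_path)

print(header, row_count)  # prints ['Date', 'Price', 'Vol'] 2
```

The row count returned here should match the `count` row of the summary statistics if the export succeeded.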




## **IV. VNI Data Cleaning and Preprocessing with PySpark**

### 1. Initialize SparkSession for VNI Data Preprocessing

In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, when, col
from pyspark.sql.types import DoubleType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("VNI Data Cleaning") \
    .getOrCreate()

#### Import Libraries and Initialize SparkSession for VNI Data Cleaning

This section sets up the PySpark environment needed to perform data preprocessing tasks on the VNI historical data. 

1. **Imports required PySpark libraries:**
   - `SparkSession`: Main entry point for working with DataFrames and the Spark SQL engine.
   - `regexp_replace`, `when`, `col`: Common PySpark functions for cleaning and transforming data.
   - `DoubleType`: Used to explicitly cast columns to `double` type.

2. **Initializes a Spark session:**
   - Names the application `"VNI Data Cleaning"` for tracking in the Spark UI.
   - Uses `.getOrCreate()` to avoid creating a new session if one already exists.

### 2. Data preprocessing

#### Load VNI CSV File into a PySpark DataFrame

In this step, we load the raw historical VNI data from a CSV file into a PySpark DataFrame for further processing.

1. **Read the CSV file into a DataFrame:**
   - Uses `spark.read.csv()` to read the file located at the specified path.
   - `header=True`: Treats the first row of the file as column headers.
   - `inferSchema=True`: Automatically infers data types for each column.

2. **Preview the DataFrame:**
   - `.show(10)`: Displays the first 10 rows of the DataFrame to give a quick look at the data structure and content.

In [40]:
# Read CSV file
vni_df = spark.read.csv("C:/Users/ADMIN88/Documents/Big data analysis/Final project/VNI.csv", header=True, inferSchema=True)
# Show the first few rows
vni_df.show(10)

+----------+-------+-------+-------+-------+-------+--------+
|      Date|  Price|   Open|   High|    Low|   Vol.|Change %|
+----------+-------+-------+-------+-------+-------+--------+
|2025-05-09| 1267.3| 1269.8|1279.61|1264.87|693.91M|  -0.002|
|2025-05-08| 1269.8|1250.37|1271.44|1250.37|780.07M|  0.0155|
|2025-05-07|1250.37|1241.95|1250.79| 1240.2|696.10M|  0.0068|
|2025-05-06|1241.95|1240.05|1251.02|1240.05|723.04M|  0.0015|
|2025-05-05|1240.05| 1226.3|1241.51| 1226.3|562.29M|  0.0112|
|2025-04-29| 1226.3| 1226.8| 1229.1| 1222.3|675.96M| -4.0E-4|
|2025-04-28| 1226.8| 1234.3| 1234.3|1222.56|679.79M|  -0.002|
|2025-04-25|1229.23|1223.35|1230.72|1220.67|863.02M|  0.0048|
|2025-04-24|1223.35|1214.78|1224.66|1210.37|798.64M|  0.0102|
|2025-04-23| 1211.0|1197.13|1216.28|1197.13|855.21M|  0.0116|
+----------+-------+-------+-------+-------+-------+--------+
only showing top 10 rows



#### 📊 Dataset Preview: VNI Index Historical Price Data

This dataset contains historical daily trading data for the **VN-Index (VNI)** — the benchmark stock index of Vietnam — spanning from **May 1, 2015, to May 9, 2025**. It captures essential market information such as opening and closing prices, intraday highs and lows, trading volume, and daily percentage changes. 

#### 🧾 Column Descriptions:

| Column     | Description                                                                 |
|------------|-----------------------------------------------------------------------------|
| `Date`     | The calendar date of the VNI trading session (format: YYYY-MM-DD)          |
| `Price`    | The closing value of the VN-Index on that date                              |
| `Open`     | The opening value of the VN-Index for that trading day                     |
| `High`     | The highest index value recorded during the trading session                |
| `Low`      | The lowest index value recorded during the trading session                 |
| `Vol.`     | The total trading volume on that day  |
| `Change %` | The daily percentage change in closing value compared to the previous day  |



#### 🔄 Renaming Columns for Cleaner Processing

To prepare the dataset for smoother data manipulation and avoid issues caused by special characters, we renamed the following columns:

- **`Vol.` → `Vol`**: Spark interprets a period in a column reference as access to a nested field, so a name like `Vol.` has to be wrapped in backticks every time it is used. Removing the period avoids this.

- **`Change %` → `Change`**: The space and the percent sign would likewise require backtick quoting in Spark SQL expressions. Renaming the column eliminates these problems during transformation and analysis.

In [41]:
# Rename columns
vni_df = vni_df.withColumnRenamed("Vol.", "Vol") \
               .withColumnRenamed("Change %", "Change")

# Show the result
vni_df.show(10)

+----------+-------+-------+-------+-------+-------+-------+
|      Date|  Price|   Open|   High|    Low|    Vol| Change|
+----------+-------+-------+-------+-------+-------+-------+
|2025-05-09| 1267.3| 1269.8|1279.61|1264.87|693.91M| -0.002|
|2025-05-08| 1269.8|1250.37|1271.44|1250.37|780.07M| 0.0155|
|2025-05-07|1250.37|1241.95|1250.79| 1240.2|696.10M| 0.0068|
|2025-05-06|1241.95|1240.05|1251.02|1240.05|723.04M| 0.0015|
|2025-05-05|1240.05| 1226.3|1241.51| 1226.3|562.29M| 0.0112|
|2025-04-29| 1226.3| 1226.8| 1229.1| 1222.3|675.96M|-4.0E-4|
|2025-04-28| 1226.8| 1234.3| 1234.3|1222.56|679.79M| -0.002|
|2025-04-25|1229.23|1223.35|1230.72|1220.67|863.02M| 0.0048|
|2025-04-24|1223.35|1214.78|1224.66|1210.37|798.64M| 0.0102|
|2025-04-23| 1211.0|1197.13|1216.28|1197.13|855.21M| 0.0116|
+----------+-------+-------+-------+-------+-------+-------+
only showing top 10 rows
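
When many columns need cleaning, the new names can be derived with a rule rather than written out by hand. The sketch below is one possible rule, not the notebook's method: it collapses every run of non-alphanumeric characters to an underscore and trims the ends. The helper `sanitize_column_name` is hypothetical.

```python
import re


def sanitize_column_name(name):
    """Collapse runs of non-alphanumeric characters to '_' and trim the ends."""
    return re.sub(r"[^0-9A-Za-z]+", "_", name).strip("_")


raw_columns = ["Date", "Price", "Open", "High", "Low", "Vol.", "Change %"]
print([sanitize_column_name(c) for c in raw_columns])
# prints ['Date', 'Price', 'Open', 'High', 'Low', 'Vol', 'Change']
```

The resulting list could then be applied in one call with the DataFrame's `toDF(*names)` method, provided the sanitized names are still unique.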



#### 🧹 Cleaning the 'Vol' Column for Numeric Representation

The `Vol` column in the dataset contains values with suffixes like 'K', 'M', and 'B', representing thousands, millions, and billions respectively. To perform accurate calculations and analyses, these values need to be converted into their actual numeric form. Here's how we cleaned the column:

1. **Handling 'K' (Thousands)**:  
   If the `Vol` value ends with 'K', we remove the 'K' and multiply the remaining number by 1,000.

2. **Handling 'M' (Millions)**:  
   If the `Vol` value ends with 'M', we remove the 'M' and multiply the remaining number by 1,000,000, so that `693.91M` becomes 693,910,000.

3. **Handling 'B' (Billions)**:  
   If the `Vol` value ends with 'B', we remove the 'B' and multiply the remaining number by 1,000,000,000.

4. **Other Cases (No Suffix)**:  
   If the `Vol` value has no suffix, we remove any commas and cast it to a numeric type (`DoubleType`).

After this step, the volume values are stored as plain numbers expressed in actual units, so they can be aggregated, compared, and summarized directly.


In [42]:
# Clean 'Vol' column
vni_df = vni_df.withColumn(
    "Vol_clean",
    when(col("Vol").endswith("K"), regexp_replace(col("Vol"), "K", "").cast(DoubleType()) * 1_000)
    .when(col("Vol").endswith("M"), regexp_replace(col("Vol"), "M", "").cast(DoubleType()) * 1_000_000)
    .when(col("Vol").endswith("B"), regexp_replace(col("Vol"), "B", "").cast(DoubleType()) * 1_000_000_000)
    .otherwise(regexp_replace(col("Vol"), ",", "").cast(DoubleType()))
)
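
The same conversion rules can be written as a small plain-Python function, which is handy for spot-checking individual values against the Spark result. This is a sketch under assumptions rather than a copy of the Spark logic: unlike the expression above, it also accepts lowercase suffixes and strips commas before a suffix. The names `parse_volume` and `SUFFIX_MULTIPLIERS` are hypothetical.

```python
SUFFIX_MULTIPLIERS = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}


def parse_volume(text):
    """Convert strings such as '693.91M' or '12,345' to a float; None stays None."""
    if text is None:
        return None
    cleaned = text.strip().replace(",", "")
    if not cleaned:
        return None
    suffix = cleaned[-1].upper()
    if suffix in SUFFIX_MULTIPLIERS:
        # Drop the suffix character and scale the remaining number
        return float(cleaned[:-1]) * SUFFIX_MULTIPLIERS[suffix]
    return float(cleaned)


for raw in ["693.91M", "61.68K", "1.99B", "12,345"]:
    print(raw, "->", parse_volume(raw))
```

Because the multiplication is done in floating point, results should be compared with a small tolerance rather than with exact equality.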

#### 🧹 Finalizing the 'Vol' Column Clean-up

After transforming the `Vol` column into numeric values and creating a new column `Vol_clean`, we need to finalize the changes. This involves removing the old `Vol` column and renaming the `Vol_clean` column to `Vol` for consistency.

1. **Dropping the Old 'Vol' Column**:  
   The old `Vol` column, which still contained non-numeric values (e.g., 'K', 'M', 'B'), is dropped to ensure that only the cleaned data is retained.

2. **Renaming 'Vol_clean' to 'Vol'**:  
   The `Vol_clean` column, which now contains numeric values, is renamed back to `Vol` to maintain the original column name while ensuring the data is in a usable format.

In [43]:
# Drop old 'Vol' column
vni_df = vni_df.drop("Vol")

# Rename cleaned column
vni_df = vni_df.withColumnRenamed("Vol_clean", "Vol")

# ✅ Check: show sample
vni_df.show(10)

+----------+-------+-------+-------+-------+-------+--------+
|      Date|  Price|   Open|   High|    Low| Change|     Vol|
+----------+-------+-------+-------+-------+-------+--------+
|2025-05-09| 1267.3| 1269.8|1279.61|1264.87| -0.002|6.9391E8|
|2025-05-08| 1269.8|1250.37|1271.44|1250.37| 0.0155|7.8007E8|
|2025-05-07|1250.37|1241.95|1250.79| 1240.2| 0.0068| 6.961E8|
|2025-05-06|1241.95|1240.05|1251.02|1240.05| 0.0015|7.2304E8|
|2025-05-05|1240.05| 1226.3|1241.51| 1226.3| 0.0112|5.6229E8|
|2025-04-29| 1226.3| 1226.8| 1229.1| 1222.3|-4.0E-4|6.7596E8|
|2025-04-28| 1226.8| 1234.3| 1234.3|1222.56| -0.002|6.7979E8|
|2025-04-25|1229.23|1223.35|1230.72|1220.67| 0.0048|8.6302E8|
|2025-04-24|1223.35|1214.78|1224.66|1210.37| 0.0102|7.9864E8|
|2025-04-23| 1211.0|1197.13|1216.28|1197.13| 0.0116|8.5521E8|
+----------+-------+-------+-------+-------+-------+--------+
only showing top 10 rows



#### 🔄 Reordering Columns for Better Structure

In the preview of the dataset, we noticed that the `Vol` column currently appears after the `Change` column. For consistency and clarity in data presentation, we decided to reorder the columns so that the `Vol` column appears before the `Change` column.

1. **Reordering Columns**:  
   We defined the desired column order, specifying that `Vol` should come before `Change`. This ensures that the columns are logically arranged for easier analysis and reporting.

2. **Applying the New Order**:  
   Using the `select()` method, we applied the new column order to the DataFrame, which repositions `Vol` before `Change`.

In [44]:
# Reorder columns (move 'Vol' before 'Change')
desired_column_order = ['Date', 'Price', 'Open', 'High', 'Low', 'Vol', 'Change']
vni_df = vni_df.select(desired_column_order)

# Show the reordered DataFrame
vni_df.show(10)


+----------+-------+-------+-------+-------+--------+-------+
|      Date|  Price|   Open|   High|    Low|     Vol| Change|
+----------+-------+-------+-------+-------+--------+-------+
|2025-05-09| 1267.3| 1269.8|1279.61|1264.87|6.9391E8| -0.002|
|2025-05-08| 1269.8|1250.37|1271.44|1250.37|7.8007E8| 0.0155|
|2025-05-07|1250.37|1241.95|1250.79| 1240.2| 6.961E8| 0.0068|
|2025-05-06|1241.95|1240.05|1251.02|1240.05|7.2304E8| 0.0015|
|2025-05-05|1240.05| 1226.3|1241.51| 1226.3|5.6229E8| 0.0112|
|2025-04-29| 1226.3| 1226.8| 1229.1| 1222.3|6.7596E8|-4.0E-4|
|2025-04-28| 1226.8| 1234.3| 1234.3|1222.56|6.7979E8| -0.002|
|2025-04-25|1229.23|1223.35|1230.72|1220.67|8.6302E8| 0.0048|
|2025-04-24|1223.35|1214.78|1224.66|1210.37|7.9864E8| 0.0102|
|2025-04-23| 1211.0|1197.13|1216.28|1197.13|8.5521E8| 0.0116|
+----------+-------+-------+-------+-------+--------+-------+
only showing top 10 rows
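
Listing every column by hand works well for seven columns, but the desired order can also be computed from the current one. The sketch below shows one way to build such a list in plain Python. The helper `move_before` is hypothetical, and its result could be passed to `select()` in the same way as `desired_column_order`.

```python
def move_before(columns, name, anchor):
    """Return a new list with `name` placed immediately before `anchor`."""
    if name not in columns or anchor not in columns:
        raise ValueError(f"both {name!r} and {anchor!r} must be present")
    remaining = [c for c in columns if c != name]
    position = remaining.index(anchor)
    return remaining[:position] + [name] + remaining[position:]


current = ["Date", "Price", "Open", "High", "Low", "Change", "Vol"]
print(move_before(current, "Vol", "Change"))
# prints ['Date', 'Price', 'Open', 'High', 'Low', 'Vol', 'Change']
```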



#### 📋 Checking the Schema of the DataFrame

In [45]:
# Check the schema
vni_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Price: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Vol: double (nullable = true)
 |-- Change: double (nullable = true)



#### ✅ Schema Validation Result

After running `printSchema()`, we confirmed that all columns in the DataFrame have been correctly interpreted and cast to appropriate data types:

- `Date`: `date` — correctly parsed from string format.
- `Price`, `Open`, `High`, `Low`, `Vol`, `Change`: all are `double` — ensuring numerical operations can be performed efficiently.

#### 🔎 Missing value check in Dataset

In [46]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum

# Count nulls in each column
null_counts = vni_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in vni_df.columns
])

# Collect the result to the driver
null_counts_dict = null_counts.collect()[0].asDict()

null_counts.show()

# Print detailed results
print("🔍 Null Value Check:")
for column, null_count in null_counts_dict.items():
    if null_count == 0:
        print(f"✅ Column '{column}' has no missing values.")
    else:
        print(f"⚠️ Column '{column}' has {null_count} missing value(s).")

+----+-----+----+----+---+---+------+
|Date|Price|Open|High|Low|Vol|Change|
+----+-----+----+----+---+---+------+
|   0|    0|   0|   0|  0|  0|     0|
+----+-----+----+----+---+---+------+

🔍 Null Value Check:
✅ Column 'Date' has no missing values.
✅ Column 'Price' has no missing values.
✅ Column 'Open' has no missing values.
✅ Column 'High' has no missing values.
✅ Column 'Low' has no missing values.
✅ Column 'Vol' has no missing values.
✅ Column 'Change' has no missing values.


#### 🔎 Duplicate Row Check in Dataset

In [47]:
# ✅ Check for duplicate rows
total_rows = vni_df.count()
distinct_rows = vni_df.distinct().count()
duplicate_count = total_rows - distinct_rows

# 📝 Print result
print("🔎 Duplicate Row Check:")
if duplicate_count == 0:
    print("✅ No duplicate rows found in the dataset.")
else:
    print(f"⚠️ Found {duplicate_count} duplicate row(s) in the dataset.")

🔎 Duplicate Row Check:
✅ No duplicate rows found in the dataset.


#### 📋 Statistical results summary

In [48]:
# Detailed summary statistics including percentiles
vni_df.summary().show()

+-------+------------------+-----------------+-----------------+------------------+--------------------+--------------------+
|summary|             Price|             Open|             High|               Low|                 Vol|              Change|
+-------+------------------+-----------------+-----------------+------------------+--------------------+--------------------+
|  count|              2581|             2581|             2581|              2581|                2581|                2581|
|   mean| 987.2235761332812|987.1161139093396|993.4640759395588|  980.269651297945|2.5384480379697792E7|3.930647036032540...|
| stddev|261.97210258799754|262.1617610204898| 263.935589281456|259.84101849529037|1.5117566805804768E8|0.011492879930301337|
|    min|            521.88|           511.13|           525.83|            511.13|             61680.0|             -0.0668|
|    25%|            767.49|            767.3|           769.99|            762.53|            159910.0|             -

#### 📈 Key Interpretations:

The table above presents descriptive statistics for the VN-Index (VNI) dataset, spanning **2,581 daily records** from 2015 to 2025. Here's a breakdown of key insights derived from the summary:

- **Price Levels**:
  - The **average closing price** of the VN-Index was approximately **987.22**, with values ranging from a minimum of **521.88** to a maximum of **1,528.57**.
  - This reflects significant market growth over the decade and possible cyclical fluctuations.

- **Volatility & Range**:
  - The **standard deviation** of around **261.97** for `Price` and similar values across `Open`, `High`, and `Low` suggest moderate-to-high volatility in the VN-Index over time.
  - The **interquartile range (IQR)** — from 25th percentile (~767.49) to 75th percentile (~1213.93) — further supports notable variation in price movements.

- **Volume**:
  - The `Vol` column shows high dispersion in daily trading volume, with an average of **~25.38 million** shares per day and a maximum approaching **1.99 billion**.
  - The large standard deviation (≈151M) indicates occasional surges in trading activity, likely due to macroeconomic events or investor sentiment shifts.

- **Daily Change %**:
  - The `Change` column reflects a **mean daily return** of approximately **0.00039 (about 0.04%)** (the table prints the value in scientific notation and truncates it to `3.9306...`), with a **maximum daily gain** of **6.77%** and a **maximum loss** of **-6.68%**.
  - The relatively low median (0.11%) and standard deviation (~1.15%) point toward small daily fluctuations on most trading days, punctuated by occasional sharp moves.


#### 📅 Sorting by Date for Time Series Consistency
Time series data such as the VNI historical data must be kept in chronological order. Spark does not guarantee row order unless the data is sorted explicitly, and out-of-order rows can distort trend analysis, forecasting, and visualizations. To ensure the dataset follows the correct timeline, we sort it by the `Date` column in ascending order:

In [49]:
# Sort by ascending date
df_order = vni_df.orderBy("Date")

# Print confirmation message
print("✅ The dataset has been sorted by date in ascending order.")

✅ The dataset has been sorted by date in ascending order.


### 3. Export Cleaned VNI Data to CSV and Stop Spark Session

In [50]:
# Save the result to a CSV file
df_order.toPandas().to_csv("C:/Users/ADMIN88/Documents/Big data analysis/Final project/VNI_cleaned.csv", index=False)

# Print confirmation message
print("✅ The CSV file has been saved successfully.")

# Stop the Spark session
spark.stop()

✅ The CSV file has been saved successfully.


#### 💾 Saving the Cleaned Dataset to CSV

After completing all necessary data cleaning and preprocessing steps, we export the final DataFrame to a CSV file for further analysis or sharing. The code performs the following:

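1. **Convert to Pandas DataFrame**:  
   We convert the Spark DataFrame `df_order` to a Pandas DataFrame using `.toPandas()` so that it can be written to a single file with `.to_csv()`. This collects every row onto the driver, which is acceptable for a dataset of this size.

2. **Export to CSV**:  
   The cleaned dataset is saved to the specified path, with `index=False` so that the Pandas row index is not written as an extra column.

3. **Terminate Spark Session**:  
   Finally, we stop the active Spark session with `spark.stop()` to release resources.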

This step finalizes our data pipeline and prepares the cleaned dataset for downstream tasks such as visualization, modeling, or reporting.


