## Data Wrangling

##### Import necessary modules

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import os

##### Configure the hadoop dir path

In [11]:
BASE_PATH = 'hdfs://localhost:9000/user/hadoop'
INPUT_PATH = f'{BASE_PATH}/inputs'
MERGED_PATH = f'{BASE_PATH}/merged'

print('input path {} , merge path {}'.format(INPUT_PATH, MERGED_PATH))

input path hdfs://localhost:9000/user/hadoop/inputs , merge path hdfs://localhost:9000/user/hadoop/merged


##### Function for create spark session that connect to hadoop

In [12]:
def create_spark_session():
    """
    Creates and configures a SparkSession with minimal memory settings.
    """
    spark = SparkSession.builder \
    .appName("Weather Data Combination") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.ui.showConsoleProgress", "true") \
    .getOrCreate()

    return spark


##### Create a function to load data from hadoop

In [13]:
def load_excel_files(spark, file_path):
    """
    Load all Excel files from the given base path.
    """
    df = spark.read.format("com.crealytics.spark.excel") \
                .option("header", "false") \
                .option("dataAddress", "'RUA Data'!A6") \
                .option("maxRowsInMemory", 1000) \
                .option("treatEmptyValuesAsNulls", "true") \
                .load(file_path)
    return df

##### Test read data as xlsx file

In [14]:
spark = create_spark_session()
hdfs_path = f'{INPUT_PATH}/APRIL-2021.xlsx'
df = load_excel_files(spark, hdfs_path)
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



##### Configure the files that we need to work with

In [15]:
file_paths = [f"{INPUT_PATH}/{filename}" for filename in [
    "APRIL-2021.xlsx", "APRIL-2022.xlsx", "AUGUST-2021.xlsx",
    "DECEMBER-2020.xlsx", "DECEMBER-2021.xlsx", "FEBRUARY-2021.xlsx",
    "FEBRUARY-2022.xlsx", "JANUARY-2021.xlsx", "JANUARY-2022.xlsx",
    "JULY-2021.xlsx", "MARCH-2021.xlsx", "MARCH-2022.xlsx",
    "MAY-2021.xlsx", "NOVEMBER-2020.xlsx", "NOVEMBER-2021.xlsx",
    "OCTOBER-2020.xlsx", "OCTOBER-2021.xlsx", "SEPTEMBER-2020.xlsx",
    "SEPTEMBER-2021.xlsx", "jUNE-2021.xlsx"
]]
print(file_paths)

['hdfs://localhost:9000/user/hadoop/inputs/APRIL-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/APRIL-2022.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/AUGUST-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/DECEMBER-2020.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/DECEMBER-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/FEBRUARY-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/FEBRUARY-2022.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/JANUARY-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/JANUARY-2022.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/JULY-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/MARCH-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/MARCH-2022.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/MAY-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/NOVEMBER-2020.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/NOVEMBER-2021.xlsx', 'hdfs://localhost:9000/user/hadoop/inputs/OCTOBER-2020.xlsx', 'hdfs://localhost

In [16]:
# Stop any existing SparkSession
if 'spark' in locals():
    spark.stop()
    
# Create new SparkSession with minimal memory settings
spark = SparkSession.builder \
    .appName("Weather Data Combination") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "2") \
    .config("spark.driver.maxResultSize", "1g") \
    .getOrCreate()

# Define schema with all required columns
schema = StructType([
    StructField("Line#", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Water Content (m3/m3)", FloatType(), True),
    StructField("Solar Radiation (W/m2)", FloatType(), True),
    StructField("Rain (mm)", FloatType(), True),
    StructField("Temperature (Celcius)", FloatType(), True),
    StructField("RH (%)", FloatType(), True),
    StructField("Wind Speed (m/s)", FloatType(), True),
    StructField("Gust Speed (m/s)", FloatType(), True),
    StructField("Wind Direction (Degree)", FloatType(), True),
    StructField("Dew Point (Celcius)", FloatType(), True)
])


def process_and_combine_files(file_paths, schema, output_path):
    """
    Process multiple Excel files and combine them into a single CSV file
    while ensuring all columns are present and properly typed.
    """
    combined_df = None
    
    for i, file_path in enumerate(file_paths):
        try:
            print(f"Processing file {i+1}/{len(file_paths)}: {file_path}")
            
            # Read Excel file
            current_df = spark.read.format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .option("dataAddress", "'RUA Data'!A6") \
                .option("maxRowsInMemory", 1000) \
                .option("treatEmptyValuesAsNulls", "true") \
                .schema(schema) \
                .load(file_path)
            
            # Select columns in specific order to ensure consistency
            current_df = current_df.select(
                "Line#",
                "Date",
                "Time",
                "Water Content (m3/m3)",
                "Solar Radiation (W/m2)",
                "Rain (mm)",
                "Temperature (Celcius)",
                "RH (%)",
                "Wind Speed (m/s)",
                "Gust Speed (m/s)",
                "Wind Direction (Degree)",
                "Dew Point (Celcius)"
            )
            
            # Combine DataFrames
            if combined_df is None:
                combined_df = current_df
            else:
                combined_df = combined_df.union(current_df)
            
            # Clear cache after each file
            current_df.unpersist()
            spark.catalog.clearCache()
            
            print(f"Successfully processed {file_path}")
            
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
            continue
    
    # Write combined data to CSV
    if combined_df is not None:
        combined_df.coalesce(1).write \
            .mode("overwrite") \
            .option("header", "true") \
            .option("compression", "none") \
            .csv(output_path)
    else:
        print("No data was successfully processed")

# Execute the combination process
try:
    process_and_combine_files(file_paths, schema, f'{MERGED_PATH}/combined_raw_data.csv')
    print("Data combination completed successfully")
except Exception as e:
    print(f"Error in main process: {str(e)}")
finally:
    # Clean up
    spark.catalog.clearCache()
    spark.stop()

Processing file 1/20: hdfs://localhost:9000/user/hadoop/inputs/APRIL-2021.xlsx
Successfully processed hdfs://localhost:9000/user/hadoop/inputs/APRIL-2021.xlsx
Processing file 2/20: hdfs://localhost:9000/user/hadoop/inputs/APRIL-2022.xlsx
Successfully processed hdfs://localhost:9000/user/hadoop/inputs/APRIL-2022.xlsx
Processing file 3/20: hdfs://localhost:9000/user/hadoop/inputs/AUGUST-2021.xlsx
Successfully processed hdfs://localhost:9000/user/hadoop/inputs/AUGUST-2021.xlsx
Processing file 4/20: hdfs://localhost:9000/user/hadoop/inputs/DECEMBER-2020.xlsx
Successfully processed hdfs://localhost:9000/user/hadoop/inputs/DECEMBER-2020.xlsx
Processing file 5/20: hdfs://localhost:9000/user/hadoop/inputs/DECEMBER-2021.xlsx
Successfully processed hdfs://localhost:9000/user/hadoop/inputs/DECEMBER-2021.xlsx
Processing file 6/20: hdfs://localhost:9000/user/hadoop/inputs/FEBRUARY-2021.xlsx
Successfully processed hdfs://localhost:9000/user/hadoop/inputs/FEBRUARY-2021.xlsx
Processing file 7/20: hdfs

24/11/30 14:33:20 WARN TaskSetManager: Stage 0 contains a task of very large size (19309 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Data combination completed successfully


##### Verify the file write successfully

In [23]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read CSV from HDFS") \
    .getOrCreate()

# Path to the CSV file on HDFS
file_path = f'{MERGED_PATH}/combined_raw_data.csv'

# Read the CSV file into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)

+-----+--------+-------------------+---------------------+----------------------+---------+---------------------+------+----------------+----------------+-----------------------+-------------------+
|Line#|    Date|               Time|Water Content (m3/m3)|Solar Radiation (W/m2)|Rain (mm)|Temperature (Celcius)|RH (%)|Wind Speed (m/s)|Gust Speed (m/s)|Wind Direction (Degree)|Dew Point (Celcius)|
+-----+--------+-------------------+---------------------+----------------------+---------+---------------------+------+----------------+----------------+-----------------------+-------------------+
|    2|21/04/01|2024-11-30 00:05:00|               0.2532|                   1.0|      0.0|                28.02|  81.0|             0.0|             0.0|                  215.0|              24.49|
|    3|21/04/01|2024-11-30 00:10:00|               0.2524|                   1.0|      0.0|                28.07|  81.0|             0.0|             1.3|                  170.0|              24.53|
|    