In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
# Build (or reuse an already-running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("CSV Reader")
    .getOrCreate()
)

In [2]:
# Load the raw CSV: the first row supplies column names and Spark infers
# the column types from the data. Then display the leading rows for a quick look.
df = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("unclean_data.csv")
)
df.show()

+----+---+---+-----+-----+-----+------+---------+
|YEAR| MO| DY|  T2M|WS50M|   TS| WD50M|ALLSKY_KT|
+----+---+---+-----+-----+-----+------+---------+
|2003|  1|  1|24.12| 3.21|24.65|130.81|     0.62|
|2003|  1|  2|24.12| 5.09|24.43|  82.0|     0.63|
|2003|  1|  3| 23.9| 6.34|24.47| 65.69|     0.57|
|2003|  1|  4|24.52| 6.48|25.08| 47.88|     0.59|
|2003|  1|  5|24.65|  6.7|25.18|  47.5|     0.64|
|2003|  1|  6|24.48| 6.55|25.03| 50.88|     0.62|
|2003|  1|  7|23.74| 5.98|24.33| 49.06|     0.64|
|2003|  1|  8|24.05| 5.04|24.83| 56.75|     0.64|
|2003|  1|  9| 24.4| 5.87|25.23| 53.56|     0.51|
|2003|  1| 10|24.41| 6.36|25.16| 55.44|     0.55|
|2003|  1| 11|24.06| 5.94|24.91|  51.5|     0.56|
|2003|  1| 12|23.68| 6.39|24.28| 44.38|     0.57|
|2003|  1| 13|23.92| 6.47|24.58| 46.69|     0.55|
|2003|  1| 14|23.82| 6.76|24.07| 42.06|     0.61|
|2003|  1| 15|24.11| 7.09|24.75| 47.38|     0.51|
|2003|  1| 16|23.88|  7.1|24.66| 39.94|     0.57|
|2003|  1| 17|23.33| 7.06|24.12|  44.0|      0.6|


In [3]:
file_path = "unclean_data.csv"  # Update with the actual file path
output_path = "path/to/cleaned_data.csv"  # Update with the desired output location

# Cleaning thresholds.
MISSING_SENTINEL = -999  # placeholder value used in the raw data for missing measurements
ABSOLUTE_ZERO_C = -273.15  # absolute zero in °C; temperatures are presumably in °C — confirm
MAX_WIND_SPEED = 30  # m/s; wind speeds above this are treated as outliers

data = spark.read.csv(file_path, header=True, inferSchema=True)

# Show initial data
data.show()

# Data cleaning steps

# 1. Replace the -999 sentinel with null so missing values become real nulls
cleaned_data = data.replace(MISSING_SENTINEL, None)

# 2. Drop rows with any null values (optional)
cleaned_data = cleaned_data.na.drop()

# 3. Filter out physically impossible temperatures (below absolute zero)
cleaned_data = cleaned_data.filter(
    (col("T2M") >= ABSOLUTE_ZERO_C) & (col("TS") >= ABSOLUTE_ZERO_C)
)

# 4. Keep wind speed (WS50M) within a reasonable range. A speed is a magnitude,
#    so negative readings are invalid; values above MAX_WIND_SPEED are outliers.
cleaned_data = cleaned_data.filter(col("WS50M").between(0, MAX_WIND_SPEED))

# 5. Wind direction (WD50M) must lie within 0..360 degrees (inclusive)
cleaned_data = cleaned_data.filter(col("WD50M").between(0, 360))

# 6. All Sky Insolation Clearness Index (ALLSKY_KT) must lie within 0..1 (inclusive)
cleaned_data = cleaned_data.filter(col("ALLSKY_KT").between(0, 1))

# 7. Remove duplicate rows based on all columns
cleaned_data = cleaned_data.dropDuplicates()

# 8. Show cleaned data. dropDuplicates() shuffles rows, so their order is not
#    guaranteed; sort by date for a stable, reproducible display.
cleaned_data.orderBy("YEAR", "MO", "DY").show()

# Save cleaned data. Spark writes a *directory* of part-files at output_path,
# not a single CSV file. mode="overwrite" keeps this cell re-runnable: the
# default save mode ("errorifexists") fails once the path exists from a prior run.
cleaned_data.write.csv(output_path, header=True, mode="overwrite")

+----+---+---+-----+-----+-----+------+---------+
|YEAR| MO| DY|  T2M|WS50M|   TS| WD50M|ALLSKY_KT|
+----+---+---+-----+-----+-----+------+---------+
|2003|  1|  1|24.12| 3.21|24.65|130.81|     0.62|
|2003|  1|  2|24.12| 5.09|24.43|  82.0|     0.63|
|2003|  1|  3| 23.9| 6.34|24.47| 65.69|     0.57|
|2003|  1|  4|24.52| 6.48|25.08| 47.88|     0.59|
|2003|  1|  5|24.65|  6.7|25.18|  47.5|     0.64|
|2003|  1|  6|24.48| 6.55|25.03| 50.88|     0.62|
|2003|  1|  7|23.74| 5.98|24.33| 49.06|     0.64|
|2003|  1|  8|24.05| 5.04|24.83| 56.75|     0.64|
|2003|  1|  9| 24.4| 5.87|25.23| 53.56|     0.51|
|2003|  1| 10|24.41| 6.36|25.16| 55.44|     0.55|
|2003|  1| 11|24.06| 5.94|24.91|  51.5|     0.56|
|2003|  1| 12|23.68| 6.39|24.28| 44.38|     0.57|
|2003|  1| 13|23.92| 6.47|24.58| 46.69|     0.55|
|2003|  1| 14|23.82| 6.76|24.07| 42.06|     0.61|
|2003|  1| 15|24.11| 7.09|24.75| 47.38|     0.51|
|2003|  1| 16|23.88|  7.1|24.66| 39.94|     0.57|
|2003|  1| 17|23.33| 7.06|24.12|  44.0|      0.6|
