In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, when, round
spark = SparkSession.builder.getOrCreate()

In [2]:
df = spark.read.format('csv').options(header='true').load('./southeast.csv')

In [3]:
# Show the first few rows of the DataFrame
df.show(5)

+-----+----------+-----+--------------------------------+-----------------------------------------------------+-----------------------------------------------+------------------------------------------------+-----------------------+--------------------------------------------+------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------------+------------------------------------------------+----------------------------------------+----------------------------------------+-----------------------------------+------------------------------------+--------------------------+-------------------------------+------+-----+-----------------+------------+--------+------------+------+
|index|      Data| Hora|PRECIPITAÇÃO TOTAL, HORÁRIO (mm)|PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)|PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)|PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)|RADIAC

In [4]:
#change columns name

column_mapping = {
    "index": "id",
    "Data": "date",
    "Hora": "time",
    "PRECIPITAÇÃO TOTAL, HORÁRIO (mm)": "precipitation",
    "PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)": "atmospheric_pressure",
    "PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)": "max_atmospheric_pressure",
    "PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)": "min_atmospheric_pressure",
    "RADIACAO GLOBAL (Kj/m²)": "radiation",
    "TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)":"temperature",
    "TEMPERATURA DO PONTO DE ORVALHO (°C)": "dewpoint_temp",
    "TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)": "max_temp",
    "TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)": "min_temp",
    "TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)": "max_dewpoint_temp_",
    "TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)": "min_dewpoint_temp",
    "UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)": "max_humidity",
    "UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)": "min_humidity",
    "UMIDADE RELATIVA DO AR, HORARIA (%)": "humidity",
    "VENTO, DIREÇÃO HORARIA (gr) (° (gr))": "wind_direction",
    "VENTO, RAJADA MAXIMA (m/s)": "max_wind_speed",
    "VENTO, VELOCIDADE HORARIA (m/s)": "wind_speed"
}

for old_col, new_col in column_mapping.items():
    df = df.withColumnRenamed(old_col, new_col)

df_original = df


In [5]:
df.show(5)

+---+----------+-----+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+--------------+----------+------+-----+-----------------+------------+--------+------------+------+
| id|      date| time|precipitation|atmospheric_pressure|max_atmospheric_pressure|min_atmospheric_pressure|radiation|temperature|dewpoint_temp|max_temp|min_temp|max_dewpoint_temp_|min_dewpoint_temp|max_humidity|min_humidity|humidity|wind_direction|max_wind_speed|wind_speed|region|state|          station|station_code|latitude|   longitude|height|
+---+----------+-----+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+--------------+----------+------+-----+-----------------

In [6]:
# Preprocessing
# 1. Replace missing values (-9999.0) with None
from pyspark.sql.functions import col,when

replace_cols = ['id',
                'precipitation',
                'atmospheric_pressure',
                'max_atmospheric_pressure',
                'min_atmospheric_pressure',
                'radiation',
                'temperature',
                'dewpoint_temp',
                'max_temp',
                'min_temp',
                'max_dewpoint_temp_',
                'min_dewpoint_temp',
                'max_humidity',
                'min_humidity',
                'humidity',
                'wind_direction',
                'wind_speed',
                'region',
                'station',
                'station_code',
                'latitude',
                'longitude',
                'height']

# Use list comprehension to construct the replacement logic for each specified column
replacement_expr = [when(col(c) == -9999.0, None).otherwise(col(c)).alias(c) for c in replace_cols]

# Include 'Date' and 'Time' in the final select statement without applying replacement logic to them
final_select_expr = [col('date'), col('time')] + replacement_expr

# Apply the replacement logic and keep 'Date' and 'Time'
df_withNULL = df.select(final_select_expr)

df_withNULL.show(50)
df_replaceNULL = df_withNULL

+----------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+------------+--------+------------+------+
|      date| time| id|precipitation|atmospheric_pressure|max_atmospheric_pressure|min_atmospheric_pressure|radiation|temperature|dewpoint_temp|max_temp|min_temp|max_dewpoint_temp_|min_dewpoint_temp|max_humidity|min_humidity|humidity|wind_direction|wind_speed|region|          station|station_code|latitude|   longitude|height|
+----------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+------------+--------+------------+------+
|2000-05-07|00:00| 

In [7]:
df_withNULL.show(22)

+----------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+------------+--------+------------+------+
|      date| time| id|precipitation|atmospheric_pressure|max_atmospheric_pressure|min_atmospheric_pressure|radiation|temperature|dewpoint_temp|max_temp|min_temp|max_dewpoint_temp_|min_dewpoint_temp|max_humidity|min_humidity|humidity|wind_direction|wind_speed|region|          station|station_code|latitude|   longitude|height|
+----------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+------------+--------+------------+------+
|2000-05-07|00:00| 

In [8]:
#replace NULL value with the mean of the day (df_withNULL)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_withNULL.createOrReplaceTempView("data_tb")

df_null = spark.sql("SELECT * FROM data_tb")
df_null.show()

+----------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+------------+--------+------------+------+
|      date| time| id|precipitation|atmospheric_pressure|max_atmospheric_pressure|min_atmospheric_pressure|radiation|temperature|dewpoint_temp|max_temp|min_temp|max_dewpoint_temp_|min_dewpoint_temp|max_humidity|min_humidity|humidity|wind_direction|wind_speed|region|          station|station_code|latitude|   longitude|height|
+----------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+------------+--------+------------+------+
|2000-05-07|00:00| 

In [12]:
from pyspark.sql.functions import mean, col, round, when

#replace NULL value of column precipitation to the mean of the day
mean_precipitation = df_null.groupBy('date', 'station_code').agg(round(mean('precipitation'), 1).alias('mean_precipitation'))
mean_atmospheric_pressure = df_null.groupBy('date', 'station_code').agg(mean('atmospheric_pressure').alias('mean_atmospheric_pressure'))
mean_max_atmospheric_pressure = df_with_mean.groupBy('date', 'station_code').agg(mean('max_atmospheric_pressure').alias('mean_max_atmospheric_pressure'))
mean_min_atmospheric_pressure = df_with_mean.groupBy('date', 'station_code').agg(mean('min_atmospheric_pressure').alias('mean_min_atmospheric_pressure'))
mean_min_atmospheric_pressure = df_with_mean.groupBy('date', 'station_code').agg(mean('min_atmospheric_pressure').alias('mean_min_atmospheric_pressure'))
mean_radiation = df_with_mean.groupBy('date', 'station_code').agg(mean('radiation').alias('mean_radiation'))
mean_temperature = df_with_mean.groupBy('date', 'station_code').agg(mean('temperature').alias('mean_temperature'))
mean_dewpoint_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('dewpoint_temp').alias('mean_dewpoint_temp'))
mean_max_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('max_temp').alias('mean_max_temp'))
mean_min_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('min_temp').alias('mean_min_temp'))




df_with_mean = df_null.join(mean_precipitation, on=['date', 'station_code'], how='left_outer')
df_with_mean = df_with_mean.withColumn('precipitation', when(col('precipitation').isNull(), col('mean_precipitation')).otherwise(col('precipitation')))
df_with_mean = df_with_mean.drop('mean_precipitation')

# df_with_mean = df_with_mean.join(mean_atmospheric_pressure, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('atmospheric_pressure', when(col('atmospheric_pressure').isNull(), col('mean_atmospheric_pressure')).otherwise(col('atmospheric_pressure')))
# df_with_mean = df_with_mean.drop('mean_atmospheric_pressure')

# df_with_mean = df_with_mean.join(mean_max_atmospheric_pressure, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('max_atmospheric_pressure', when(col('max_atmospheric_pressure').isNull(), col('mean_max_atmospheric_pressure')).otherwise(col('max_atmospheric_pressure')))
# df_with_mean = df_with_mean.drop('mean_max_atmospheric_pressure')


# df_with_mean = df_with_mean.join(mean_min_atmospheric_pressure, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('min_atmospheric_pressure', when(col('min_atmospheric_pressure').isNull(), col('mean_min_atmospheric_pressure')).otherwise(col('min_atmospheric_pressure')))
# df_with_mean = df_with_mean.drop('mean_min_atmospheric_pressure')

# mean_radiation = df_with_mean.groupBy('date', 'station_code').agg(mean('radiation').alias('mean_radiation'))
# df_with_mean = df_with_mean.join(mean_radiation, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('radiation', when(col('radiation').isNull(), col('mean_radiation')).otherwise(col('radiation')))
# df_with_mean = df_with_mean.drop('mean_radiation')

# mean_temperature = df_with_mean.groupBy('date', 'station_code').agg(mean('temperature').alias('mean_temperature'))
# df_with_mean = df_with_mean.join(mean_temperature, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('temperature', when(col('temperature').isNull(), col('mean_temperature')).otherwise(col('temperature')))
# df_with_mean = df_with_mean.drop('mean_temperature')

# mean_dewpoint_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('dewpoint_temp').alias('mean_dewpoint_temp'))
# df_with_mean = df_with_mean.join(mean_dewpoint_temp, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('dewpoint_temp', when(col('dewpoint_temp').isNull(), col('mean_dewpoint_temp')).otherwise(col('dewpoint_temp')))
# df_with_mean = df_with_mean.drop('mean_dewpoint_temp')

# mean_max_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('max_temp').alias('mean_max_temp'))
# df_with_mean = df_with_mean.join(mean_max_temp, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('max_temp', when(col('max_temp').isNull(), col('mean_max_temp')).otherwise(col('max_temp')))
# df_with_mean = df_with_mean.drop('mean_max_temp')

# mean_min_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('min_temp').alias('mean_min_temp'))
# df_with_mean = df_with_mean.join(mean_min_temp, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('min_temp', when(col('min_temp').isNull(), col('mean_min_temp')).otherwise(col('min_temp')))
# df_with_mean = df_with_mean.drop('mean_min_temp')

# mean_max_dewpoint_temp_ = df_with_mean.groupBy('date', 'station_code').agg(mean('max_dewpoint_temp_').alias('mean_max_dewpoint_temp_'))
# df_with_mean = df_with_mean.join(mean_max_dewpoint_temp_, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('max_dewpoint_temp_', when(col('max_dewpoint_temp_').isNull(), col('mean_max_dewpoint_temp_')).otherwise(col('max_dewpoint_temp_')))
# df_with_mean = df_with_mean.drop('mean_max_dewpoint_temp_')

# mean_min_dewpoint_temp = df_with_mean.groupBy('date', 'station_code').agg(mean('min_dewpoint_temp').alias('mean_min_dewpoint_temp'))
# df_with_mean = df_with_mean.join(mean_min_dewpoint_temp, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('min_dewpoint_temp', when(col('min_dewpoint_temp').isNull(), col('mean_min_dewpoint_temp')).otherwise(col('min_dewpoint_temp')))
# df_with_mean = df_with_mean.drop('mean_min_dewpoint_temp')

# mean_max_humidity = df_with_mean.groupBy('date', 'station_code').agg(mean('max_humidity').alias('mean_max_humidity'))
# df_with_mean = df_with_mean.join(mean_max_humidity, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('max_humidity', when(col('max_humidity').isNull(), col('mean_max_humidity')).otherwise(col('max_humidity')))
# df_with_mean = df_with_mean.drop('mean_max_humidity')

# mean_min_humidity = df_with_mean.groupBy('date', 'station_code').agg(mean('min_humidity').alias('mean_min_humidity'))
# df_with_mean = df_with_mean.join(mean_min_humidity, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('min_humidity', when(col('min_humidity').isNull(), col('mean_min_humidity')).otherwise(col('min_humidity')))
# df_with_mean = df_with_mean.drop('mean_min_humidity')

# mean_humidity = df_with_mean.groupBy('date', 'station_code').agg(mean('humidity').alias('mean_humidity'))
# df_with_mean = df_with_mean.join(mean_humidity, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('humidity', when(col('humidity').isNull(), col('mean_humidity')).otherwise(col('humidity')))
# df_with_mean = df_with_mean.drop('mean_humidity')

# mean_wind_direction = df_with_mean.groupBy('date', 'station_code').agg(mean('wind_direction').alias('mean_wind_direction'))
# df_with_mean = df_with_mean.join(mean_wind_direction, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('wind_direction', when(col('wind_direction').isNull(), col('mean_wind_direction')).otherwise(col('wind_direction')))
# df_with_mean = df_with_mean.drop('mean_wind_direction')

# mean_wind_speed = df_with_mean.groupBy('date', 'station_code').agg(mean('wind_speed').alias('mean_wind_speed'))
# df_with_mean = df_with_mean.join(mean_wind_speed, on=['date', 'station_code'], how='left_outer')
# df_with_mean = df_with_mean.withColumn('wind_speed', when(col('wind_speed').isNull(), col('mean_wind_speed')).otherwise(col('wind_speed')))
# df_with_mean = df_with_mean.drop('mean_wind_speed')

+----------+------------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+--------+------------+------+
|      date|station_code| time| id|precipitation|atmospheric_pressure|max_atmospheric_pressure|min_atmospheric_pressure|radiation|temperature|dewpoint_temp|max_temp|min_temp|max_dewpoint_temp_|min_dewpoint_temp|max_humidity|min_humidity|humidity|wind_direction|wind_speed|region|          station|latitude|   longitude|height|
+----------+------------+-----+---+-------------+--------------------+------------------------+------------------------+---------+-----------+-------------+--------+--------+------------------+-----------------+------------+------------+--------+--------------+----------+------+-----------------+--------+------------+------+
|2000-05-07|       

In [None]:
df_filled = df_with_mean.filter(col('date') == '2000-05-07')
df_filled.show()