In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute, sin, cos, radians
from pyspark.ml.feature import StandardScaler, MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType


team = 29

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()


table_name = "team29_projectdb.dataset_part"
df = spark.read.table(table_name)


print("Shape of DataFrame: ({}, {})".format(df.count(), len(df.columns)))

Shape of DataFrame: (771456, 26)


In [2]:
# Convert columns to right type
df = df.withColumn("date_time", df["date_time"].cast("timestamp"))
df = df.withColumn("city_id", df["city_id"].cast("int"))

df.show(5)

+-------------------+--------+--------+----------+--------+----------+----------+-----------------+--------+--------+--------+--------+---------+----------+----------+----------+---------+----------+--------+-------+--------+----+----------+--------+----------+-------+
|          date_time|max_temp|min_temp|total_snow|sun_hour|uv_index_1|uv_index_2|moon_illumunation|moonrise| moonset| sunrise|  sunset|dew_point|feels_like|heat_index|wind_chill|wind_gust|cloudcover|humidity| precip|pressure|temp|visibility|wind_dir|wind_speed|city_id|
+-------------------+--------+--------+----------+--------+----------+----------+-----------------+--------+--------+--------+--------+---------+----------+----------+----------+---------+----------+--------+-------+--------+----+----------+--------+----------+-------+
|2008-12-31 21:00:00|      31|      13|   0.00000|11.00000|         6|         1|               31|26220000|69360000|14820000|54540000|        7|        18|        18|        18|        7|  

In [3]:
# Define columns for scaling with standard scaling
sc_columns = [
    "max_temp",
    "min_temp",
    "feels_like",
    "heat_index",
    "wind_chill",
    "precip",
    "pressure",
    "temp"
]
# Define columns for scaling with min-max scaling

minmax_columns = [
    "sun_hour",
    "uv_index_1",
    "uv_index_2",
    "moon_illumunation",
    "dew_point",
    "wind_gust",
    "cloudcover",
    "humidity",
    "visibility",
    "wind_speed",
]

to_drop = [
    "sunrise",
    "sunset",
    "moonrise",
    "moonset"
]

df = df.drop(*to_drop)

In [4]:
from tqdm import tqdm

# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Iterating over columns to be scaled
for i in tqdm(minmax_columns):
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

100%|██████████| 10/10 [01:03<00:00,  6.31s/it]


In [5]:
# Iterating over columns to be scaled
for i in tqdm(sc_columns):
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    scaler = StandardScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")


df = df.drop(*sc_columns)
df = df.drop(*minmax_columns)

100%|██████████| 8/8 [00:48<00:00,  6.12s/it]


In [6]:
df.show(5)

+-------------------+----------+--------+-------+---------------+-----------------+-----------------+------------------------+----------------+----------------+-----------------+---------------+-----------------+-----------------+---------------+---------------+-----------------+-----------------+-----------------+-------------+---------------+-----------+
|          date_time|total_snow|wind_dir|city_id|sun_hour_Scaled|uv_index_1_Scaled|uv_index_2_Scaled|moon_illumunation_Scaled|dew_point_Scaled|wind_gust_Scaled|cloudcover_Scaled|humidity_Scaled|visibility_Scaled|wind_speed_Scaled|max_temp_Scaled|min_temp_Scaled|feels_like_Scaled|heat_index_Scaled|wind_chill_Scaled|precip_Scaled|pressure_Scaled|temp_Scaled|
+-------------------+----------+--------+-------+---------------+-----------------+-----------------+------------------------+----------------+----------------+-----------------+---------------+-----------------+-----------------+---------------+---------------+-----------------+--

In [7]:
# Convert wind_dir to radians and calculate sine and cosine values
df = df.withColumn("wind_dir_rad", radians(df["wind_dir"]))
df = df.withColumn("wind_dir_sin", sin(df["wind_dir_rad"]))
df = df.withColumn("wind_dir_cos", cos(df["wind_dir_rad"]))

In [8]:
# # # Drop the original wind_dir column
df_scaled = df.drop("wind_dir")
df_scaled = df_scaled.drop("wind_dir_rad")

In [9]:
# # Extract date components
df_scaled = df_scaled.withColumn("year", year("date_time"))
df_scaled = df_scaled.withColumn("month", month("date_time"))
df_scaled = df_scaled.withColumn("day", dayofmonth("date_time"))
df_scaled = df_scaled.withColumn("hour", hour("date_time"))
df_scaled = df_scaled.withColumn("minute", minute("date_time"))

In [10]:
# Calculate sine and cosine values for date components
df_scaled = df_scaled.withColumn("date_time_year_sin", sin(2 * 3.141592653589793238 * (year(df["date_time"]) - 2000) / 100))
df_scaled = df_scaled.withColumn("date_time_year_cos", cos(2 * 3.141592653589793238 * (year(df["date_time"]) - 2000) / 100))

df_scaled = df_scaled.withColumn("date_time_month_sin", sin(2 * 3.141592653589793238 * month(df["date_time"]) / 12))
df_scaled = df_scaled.withColumn("date_time_month_cos", cos(2 * 3.141592653589793238 * month(df["date_time"]) / 12))

df_scaled = df_scaled.withColumn("date_time_day_sin", sin(2 * 3.141592653589793238 * dayofmonth(df["date_time"]) / 31))
df_scaled = df_scaled.withColumn("date_time_day_cos", cos(2 * 3.141592653589793238 * dayofmonth(df["date_time"]) / 31))

df_scaled = df_scaled.withColumn("date_time_hour_sin", sin(2 * 3.141592653589793238 * hour(df["date_time"]) / 24))
df_scaled = df_scaled.withColumn("date_time_hour_cos", cos(2 * 3.141592653589793238 * hour(df["date_time"]) / 24))

df_scaled = df_scaled.withColumn("date_time_minute_sin", sin(2 * 3.141592653589793238 * minute(df["date_time"]) / 60))
df_scaled = df_scaled.withColumn("date_time_minute_cos", cos(2 * 3.141592653589793238 * minute(df["date_time"]) / 60))

# Drop original date_time and date components columns
df_scaled = df_scaled.drop("date_time", "year", "month", "day", "hour", "minute")

In [11]:
df_scaled.columns

['total_snow',
 'city_id',
 'sun_hour_Scaled',
 'uv_index_1_Scaled',
 'uv_index_2_Scaled',
 'moon_illumunation_Scaled',
 'dew_point_Scaled',
 'wind_gust_Scaled',
 'cloudcover_Scaled',
 'humidity_Scaled',
 'visibility_Scaled',
 'wind_speed_Scaled',
 'max_temp_Scaled',
 'min_temp_Scaled',
 'feels_like_Scaled',
 'heat_index_Scaled',
 'wind_chill_Scaled',
 'precip_Scaled',
 'pressure_Scaled',
 'temp_Scaled',
 'wind_dir_sin',
 'wind_dir_cos',
 'date_time_year_sin',
 'date_time_year_cos',
 'date_time_month_sin',
 'date_time_month_cos',
 'date_time_day_sin',
 'date_time_day_cos',
 'date_time_hour_sin',
 'date_time_hour_cos',
 'date_time_minute_sin',
 'date_time_minute_cos']

In [12]:
print("Shape of DataFrame: ({}, {})".format(df_scaled.count(), len(df_scaled.columns)))

Shape of DataFrame: (771456, 32)


In [13]:
df.dropna()
print("Shape of DataFrame: ({}, {})".format(df_scaled.count(), len(df_scaled.columns)))

Shape of DataFrame: (771456, 32)


In [14]:
# One-hot encode categorical variable 'city_id'
# encoder = OneHotEncoder(inputCols=["city_id"], outputCols=["city_id_encoded"])
# df_encoded = encoder.fit(df_scaled).transform(df_scaled)

import pyspark.sql.functions as F 

categ = df_scaled.select('city_id').distinct().rdd.flatMap(lambda x:x).collect()
exprs = [F.when(F.col('city_id') == cat,1).otherwise(0)\
            .alias(str(cat)) for cat in categ]
df_encoded = df_scaled.select(exprs+df_scaled.columns)

# Drop original 'city_id' column
df_encoded = df_encoded.drop("city_id")

print(df_encoded.columns)
print((df_encoded.count(), len(df_encoded.columns)))

['3', '4', '8', '6', '5', '7', '1', '2', 'total_snow', 'sun_hour_Scaled', 'uv_index_1_Scaled', 'uv_index_2_Scaled', 'moon_illumunation_Scaled', 'dew_point_Scaled', 'wind_gust_Scaled', 'cloudcover_Scaled', 'humidity_Scaled', 'visibility_Scaled', 'wind_speed_Scaled', 'max_temp_Scaled', 'min_temp_Scaled', 'feels_like_Scaled', 'heat_index_Scaled', 'wind_chill_Scaled', 'precip_Scaled', 'pressure_Scaled', 'temp_Scaled', 'wind_dir_sin', 'wind_dir_cos', 'date_time_year_sin', 'date_time_year_cos', 'date_time_month_sin', 'date_time_month_cos', 'date_time_day_sin', 'date_time_day_cos', 'date_time_hour_sin', 'date_time_hour_cos', 'date_time_minute_sin', 'date_time_minute_cos']
(771456, 39)


In [15]:
df_encoded.printSchema()

root
 |-- 3: integer (nullable = false)
 |-- 4: integer (nullable = false)
 |-- 8: integer (nullable = false)
 |-- 6: integer (nullable = false)
 |-- 5: integer (nullable = false)
 |-- 7: integer (nullable = false)
 |-- 1: integer (nullable = false)
 |-- 2: integer (nullable = false)
 |-- total_snow: decimal(10,5) (nullable = true)
 |-- sun_hour_Scaled: double (nullable = true)
 |-- uv_index_1_Scaled: double (nullable = true)
 |-- uv_index_2_Scaled: double (nullable = true)
 |-- moon_illumunation_Scaled: double (nullable = true)
 |-- dew_point_Scaled: double (nullable = true)
 |-- wind_gust_Scaled: double (nullable = true)
 |-- cloudcover_Scaled: double (nullable = true)
 |-- humidity_Scaled: double (nullable = true)
 |-- visibility_Scaled: double (nullable = true)
 |-- wind_speed_Scaled: double (nullable = true)
 |-- max_temp_Scaled: double (nullable = true)
 |-- min_temp_Scaled: double (nullable = true)
 |-- feels_like_Scaled: double (nullable = true)
 |-- heat_index_Scaled: double (

In [16]:
df_encoded.show(10)

+---+---+---+---+---+---+---+---+----------+---------------+-----------------+-----------------+------------------------+----------------+----------------+-----------------+---------------+-----------------+-----------------+---------------+---------------+-----------------+-----------------+-----------------+-------------+---------------+-----------+-------------------+-------------------+------------------+------------------+--------------------+-------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|  3|  4|  8|  6|  5|  7|  1|  2|total_snow|sun_hour_Scaled|uv_index_1_Scaled|uv_index_2_Scaled|moon_illumunation_Scaled|dew_point_Scaled|wind_gust_Scaled|cloudcover_Scaled|humidity_Scaled|visibility_Scaled|wind_speed_Scaled|max_temp_Scaled|min_temp_Scaled|feels_like_Scaled|heat_index_Scaled|wind_chill_Scaled|precip_Scaled|pressure_Scaled|temp_Scaled|       wind_dir_sin|       wind_dir_cos|date_time_

In [17]:
# Save the preprocessed data to a new table
table_name = "team29_projectdb.dataset_prepared_for_modeling"
df_encoded.write.mode("overwrite").saveAsTable(table_name)