In [1]:
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from functools import reduce
from datetime import datetime

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.types import FloatType

# Ask PySpark to pull in the external spark-avro package; the "avro" read
# format used by load_avro_from_HDFS depends on it. This is set here, ahead of
# the cell that builds the SparkSession.
# NOTE(review): the artifact version (2.12 / 2.4.8) presumably has to match the
# Scala/Spark version of the cluster — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:2.4.8 pyspark-shell'

In [2]:
# Auxiliary functions


def load_parquet_from_HDFS(spark, fpath):
    """Read the Parquet data at ``fpath`` through ``spark`` and return the result.

    Args:
        spark: Session object exposing a ``read`` attribute.
        fpath: Location handed unchanged to the Parquet reader.

    Returns:
        Whatever ``spark.read.parquet(fpath)`` returns (a DataFrame when
        ``spark`` is a SparkSession).
    """
    reader = spark.read
    return reader.parquet(fpath)


def load_avro_from_HDFS(spark, fpath):
    """Read the Avro data at ``fpath`` through ``spark`` and return the result.

    Args:
        spark: Session object exposing a ``read`` attribute.
        fpath: Location (or locations) handed unchanged to the reader's
            ``load`` call.

    Returns:
        Whatever ``spark.read.format("avro").load(fpath)`` returns.
    """
    avro_reader = spark.read.format("avro")
    return avro_reader.load(fpath)


def load_weather_obs(spark, weather_fpath):
    """Load weather observations and return the regressors plus the timestamp.

    Relies on the notebook-level constants ``LOCATION``, ``REGRESSORS`` and
    ``DT_COLUMN``.

    Args:
        spark: Session used to read the Avro input.
        weather_fpath: Location passed to ``load_avro_from_HDFS``.

    Returns:
        DataFrame restricted to ``REGRESSORS + [DT_COLUMN]``.
    """
    raw_df = load_avro_from_HDFS(spark, weather_fpath)

    # Keep the timestamp column plus every column whose name mentions LOCATION.
    location_columns = [name for name in raw_df.columns if LOCATION in name]
    parsed_df = (
        raw_df.select(["Date_time"] + location_columns)
        .withColumnRenamed("Date_time", "date")
        .withColumn("date", F.to_timestamp(F.col("date"), "yyyy-MM-dd HH:mm:ss"))
    )

    # Strip the location and any underscores from every column name.
    renamed_df = parsed_df
    for current_name in parsed_df.columns:
        stripped_name = current_name.replace(LOCATION, "").replace("_", "")
        renamed_df = renamed_df.withColumnRenamed(current_name, stripped_name)

    return renamed_df.select(REGRESSORS + [DT_COLUMN])


def load_ggtrends_obs(spark, ggtrends_fpath):
    """Load Google Trends observations and return timestamp and target columns.

    Relies on the notebook-level constants ``LOCATION``, ``WEBSITE`` and
    ``DT_COLUMN``.

    Args:
        spark: Session used to read the Avro input.
        ggtrends_fpath: Location passed to ``load_avro_from_HDFS``.

    Returns:
        DataFrame with two columns: the parsed ``date`` column aliased to
        ``DT_COLUMN`` and the ``WEBSITE`` column aliased to ``"y"``.
    """
    raw_df = load_avro_from_HDFS(spark, ggtrends_fpath)

    # Keep the date column plus every column whose name mentions LOCATION.
    location_columns = [name for name in raw_df.columns if LOCATION in name]
    selected_df = raw_df.select(["date"] + location_columns)

    # Strip the location and any underscores from every column name.
    renamed_df = selected_df
    for current_name in selected_df.columns:
        stripped_name = current_name.replace(LOCATION, "").replace("_", "")
        renamed_df = renamed_df.withColumnRenamed(current_name, stripped_name)

    timestamp_col = F.to_timestamp(F.col("date"), "yyyy-MM-dd HH:mm:ss").alias(DT_COLUMN)
    target_col = F.col(WEBSITE).alias("y")
    return renamed_df.select(timestamp_col, target_col)


def merge_sources(weather_df, website_df):
    """Inner-join weather and website frames on ``date`` and fix the column order.

    Args:
        weather_df: Frame providing the weather regressors and ``date``.
        website_df: Frame providing ``y`` and ``date``.

    Returns:
        The joined frame restricted to ``date``, ``Temperature``,
        ``RelativeHumidity``, ``WindSpeed`` and ``y``, in that order.
    """
    joined = weather_df.join(website_df, how="inner", on="date")
    return joined.select(
        "date",
        "Temperature",
        "RelativeHumidity",
        "WindSpeed",
        "y",
    )


def __unify_dtypes(df, ref_df):
    """Project ``df`` onto the columns of ``ref_df``, casting each to its type there.

    Args:
        df: Frame to convert; it must contain every column named in
            ``ref_df.dtypes``.
        ref_df: Frame whose ``dtypes`` (name, type pairs) define the target
            columns, their order and their types.

    Returns:
        ``df`` selected down to the reference columns with each one cast.
    """
    casted_columns = []
    for column_name, type_name in ref_df.dtypes:
        casted_columns.append(F.col(column_name).cast(type_name))

    return df.select(*casted_columns)


def extend_mod_df(mod_df, new_obs):
    """Append new observations to the modelling frame and label day/night rows.

    Args:
        mod_df: Existing modelling frame; its dtypes are the reference.
        new_obs: New observations, cast to ``mod_df``'s column types before
            being appended by name.

    Returns:
        The combined frame with an extra ``filter_col`` column holding
        ``"day"`` when the time part of ``date`` is between 07:00:00 and
        23:00:00, and ``"night"`` otherwise.
    """
    aligned_obs = __unify_dtypes(df=new_obs, ref_df=mod_df)
    combined = mod_df.unionByName(aligned_obs)

    time_of_day = F.date_format(F.col("date"), "HH:mm:ss")
    is_daytime = time_of_day.between("07:00:00", "23:00:00")
    period_label = F.when(is_daytime, "day").otherwise("night")

    return combined.withColumn("filter_col", period_label)


def split_days_and_nights(df):
    """Split a labelled frame into its day rows and its night rows.

    Args:
        df: Frame carrying a ``filter_col`` column whose values are ``"day"``
            or ``"night"`` (as added by ``extend_mod_df``).

    Returns:
        Tuple ``(df_day, df_night)``; the ``filter_col`` helper column is
        removed from both frames.
    """
    day_rows = df.filter(F.col("filter_col") == "day")
    night_rows = df.filter(F.col("filter_col") == "night")

    # Drop the helper column by its real name. The previous target,
    # "filter_F.col", matches no column, so the helper column was never removed.
    return day_rows.drop("filter_col"), night_rows.drop("filter_col")


def train_test_split(df, columns):
    """Split a frame chronologically into train and test parts.

    Rows are ranked by ``date`` with ``percent_rank``; rows with rank <= 0.8
    go to the training frame and the remainder to the test frame.

    Args:
        df: Frame containing a ``date`` column and every name in ``columns``.
        columns: Column names kept in both outputs.

    Returns:
        Tuple ``(df_train, df_test)``, each restricted to ``columns`` and
        sorted by ``date``.
    """
    # Imported locally so the function does not depend on the notebook's
    # temporary merge cell (flagged for deletion) to bring Window into scope.
    from pyspark.sql.window import Window

    # A single unpartitioned window: the rank is computed over the whole frame.
    chronological = Window.partitionBy().orderBy("date")
    ranked = df.withColumn("rank", F.percent_rank().over(chronological))

    df_train = ranked.where("rank <= .8").select(*columns).sort("date")
    df_test = ranked.where("rank > .8").select(*columns).sort("date")
    return df_train, df_test


def scale_df(df, columns_to_scale):
    """Min-max scale the given columns and return a frame with ``ds``/``y`` naming.

    A pipeline of one VectorAssembler and one MinMaxScaler per column is
    fitted on ``df`` itself and then applied to it.

    Args:
        df: Frame containing ``date``, ``y`` and every column to scale.
        columns_to_scale: Names of the columns to scale.

    Returns:
        Frame whose columns are, in order: each scaled column under its
        original name (as a float), ``date`` renamed to ``ds``, and ``y``.
    """
    vector_stages = []
    scaling_stages = []
    for name in columns_to_scale:
        vector_stages.append(VectorAssembler(inputCols=[name], outputCol=name + "_vec"))
        scaling_stages.append(
            MinMaxScaler(inputCol=name + "_vec", outputCol=name + "_scaled")
        )

    fitted_pipeline = Pipeline(stages=vector_stages + scaling_stages).fit(df)
    transformed = fitted_pipeline.transform(df)

    # Maps each intermediate column name to the name it gets in the output.
    output_names = {name + "_scaled": name for name in columns_to_scale}

    # The scaler emits one-element vectors; unwrap them into plain floats.
    unwrap = F.udf(lambda vec: float(vec[0]), FloatType())
    unwrapped_columns = [unwrap(scaled).alias(scaled) for scaled in output_names.keys()]
    flattened = transformed.select([F.col("date"), F.col("y")] + unwrapped_columns)

    output_names["date"] = "ds"
    output_names["y"] = "y"

    renamed_columns = [
        F.col(source).alias(target) for source, target in output_names.items()
    ]
    return flattened.select(renamed_columns)


def get_latest_train_details(root_dir="hdfs://cluster-bda2-m/user/root"):
    ls = !hdfs dfs -ls modeling/in
    avail_files = [c for c in ls if f"{str.lower(WEBSITE)}_{str.lower(LOCATION)}_ver" in c]

    files_dict = {}

    for f in avail_files:
        date, time, name = f.split()[-3:]
        creation_timestamp = datetime.strptime(" ".join([date, time]), "%Y-%m-%d %H:%M")

        files_dict[name] = creation_timestamp
    
    newest_fname = max(files_dict, key=files_dict.get)

    return os.path.join(root_dir, newest_fname), files_dict[newest_fname]


def get_version():
    ls = !hdfs dfs -ls modeling/in
    existing_versions = [c for c in ls if f"{str.lower(WEBSITE)}_{str.lower(LOCATION)}_ver" in c]
    return len(existing_versions)+1


def get_new_ggtrends(ref_date):
    ls = !hdfs dfs -ls ggtrends

    avail_files = [c for c in ls if "ggtrends_obs" in c]

    files_dict = {}

    for f in avail_files:
        date, time, name = f.split()[-3:]
        creation_timestamp = datetime.strptime(" ".join([date, time]), "%Y-%m-%d %H:%M")

        files_dict[name] = creation_timestamp

    new_obs = []

    for (key, value) in files_dict.items():
       # Check if key is even then add pair to new dictionary
       if value < ref_date:  # TODO: replace '<' with '>'
            new_obs.append(key)
    return new_obs


def get_new_weather_obs(ref_date):
    ls = !hdfs dfs -ls weather

    avail_files = [c for c in ls if "weather_obs" in c]

    files_dict = {}

    for f in avail_files:
        date, time, name = f.split()[-3:]
        creation_timestamp = datetime.strptime(" ".join([date, time]), "%Y-%m-%d %H:%M")

        files_dict[name] = creation_timestamp

    new_obs = []

    for (key, value) in files_dict.items():
       # Check if key is even then add pair to new dictionary
       if value < ref_date:  # TODO: replace '<' with '>'
            new_obs.append(key)
    return new_obs


In [3]:
# Parameters: everything a reader might want to tune lives in this cell.

LOCATION = "Hamburg"    # substring used to pick location-specific source columns
WEBSITE = "Instagram"   # Google Trends column that becomes the target
REGRESSORS = ["Temperature", "RelativeHumidity", "WindSpeed"]
TARGET_COLUMN = "y"
DT_COLUMN = "date"

# Column layout of the modelling frame: timestamp, regressors, target.
columns = [DT_COLUMN, *REGRESSORS, TARGET_COLUMN]

# Only the regressors are scaled; this is a separate list object from REGRESSORS.
columns_to_scale = list(REGRESSORS)

In [4]:
# Create (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("Time series data analysis with Spark")
    .config("spark.redis.ssl", "true")
    .getOrCreate()
)

In [5]:
# Locate the latest modelling dataset, then list the observation files
# selected relative to its creation time.
(prev_mod_df_fpath, prev_mod_df_creat_dt) = get_latest_train_details()
ggtrends_fpaths = get_new_ggtrends(ref_date=prev_mod_df_creat_dt)
weather_fpaths = get_new_weather_obs(ref_date=prev_mod_df_creat_dt)

In [6]:
# Load the previous modelling frame (restricted to the configured columns)
# and the listed weather / Google Trends observations.
prev_mod_frame = load_parquet_from_HDFS(spark=spark, fpath=prev_mod_df_fpath).select(columns)

weather_df = load_weather_obs(spark=spark, weather_fpath=weather_fpaths)
ggtrends_df = load_ggtrends_obs(spark=spark, ggtrends_fpath=ggtrends_fpaths)

In [7]:
# TEMPORARY: overwrite column for merge
# The real "date" column of both frames is replaced by a synthetic row number
# so that merge_sources can join them positionally instead of on timestamps.
# NOTE(review): the window is ordered by a constant literal, so the numbering
# presumably follows whatever order Spark happens to produce the rows in —
# confirm that this pairing of weather and trend rows is acceptable.
# NOTE(review): train_test_split references Window too — check it still has
# the name in scope before deleting this import.
from pyspark.sql.window import Window  # TODO: delete

w = Window().orderBy(F.lit('A'))  # TODO: delete
__weather_df = weather_df.drop("date").withColumn("date", F.row_number().over(w))  # TODO: delete
__ggtrends_df = ggtrends_df.drop("date").withColumn("date", F.row_number().over(w))  # TODO: delete

# Merge Google Trends and Weather observed values
obs_df = merge_sources(__weather_df, __ggtrends_df)  # TODO: replace __weather_df, __ggtrends_df with weather_df, ggtrends_df
obs_df.printSchema()

# TEMPORARY: restore column after merge
# The synthetic "date" is dropped and the real one is recovered by joining
# back to ggtrends_df on the target value "y".
# NOTE(review): if several rows share the same "y", this join yields one row
# per matching pair — verify the row count of obs_df before relying on it.
obs_df = obs_df.drop("date").join(ggtrends_df, how="inner", on="y")  # TODO: delete

root
 |-- date: integer (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- RelativeHumidity: string (nullable = true)
 |-- WindSpeed: string (nullable = true)
 |-- y: string (nullable = true)



In [8]:
# Append the new observations to the previous modelling frame and label each
# row as "day" or "night".
merged_df = extend_mod_df(
    mod_df=prev_mod_frame,
    new_obs=obs_df,
)

In [9]:
# Persist the merged frame (original column layout only) as the next dataset
# version so it can be used for later re-estimation.
version = get_version()

(
    merged_df
    .select(prev_mod_frame.columns)
    .write
    .mode("overwrite")
    .parquet(
        f"hdfs://cluster-bda2-m/user/root/modeling/in/{str.lower(WEBSITE)}_{str.lower(LOCATION)}_ver{version}.parquet"
    )
)

In [10]:
# Separate the merged frame into its "day" rows and its "night" rows.
(df_day, df_night) = split_days_and_nights(df=merged_df)

In [11]:
# Chronological train/test split, done independently for day and night data.
(df_day_train, df_day_test) = train_test_split(df=df_day, columns=columns)
(df_night_train, df_night_test) = train_test_split(df=df_night, columns=columns)

In [12]:
# Scale the regressors of every split. Each call fits its own scaler on the
# frame it receives.
# NOTE(review): the test splits are therefore not scaled with the training
# splits' min/max — confirm this is intended.
df_day_train_scaled = scale_df(df=df_day_train, columns_to_scale=columns_to_scale)
df_day_test_scaled = scale_df(df=df_day_test, columns_to_scale=columns_to_scale)
df_night_train_scaled = scale_df(df=df_night_train, columns_to_scale=columns_to_scale)
df_night_test_scaled = scale_df(df=df_night_test, columns_to_scale=columns_to_scale)

In [16]:
# Write the four scaled splits to HDFS under the current dataset version,
# in the order: day train, day test, night train, night test.
for _split_name, _scaled_frame in (
    ("day_train", df_day_train_scaled),
    ("day_test", df_day_test_scaled),
    ("night_train", df_night_train_scaled),
    ("night_test", df_night_test_scaled),
):
    _scaled_frame.write.mode("overwrite").parquet(
        f"hdfs://cluster-bda2-m/user/root/modeling/in/{str.lower(WEBSITE)}_{str.lower(LOCATION)}_{_split_name}_ver{version}.parquet"
    )