In [18]:
import sys
from pathlib import Path

# Absolute path of the directory one level above the current working directory.
parent_dir = str(Path().resolve().parent)

# Make that directory importable, adding it to sys.path only if it is not there yet.
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

# Forward-slash form with a trailing slash, used as a prefix when building data paths.
parent_dir = parent_dir.replace("\\", "/") + "/"

In [19]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists; otherwise create one for this notebook.
spark: SparkSession = (
    SparkSession.builder
    .appName("process_ukrainian_price")
    .getOrCreate()
)

### USD CURRENCY PROCESSING

In [98]:
from data.schema.usd_currency_dataset import STOCK_PRICE_UNPROCESED_COLUMNS, STOCK_PRICE_UNPROCESED_SCHEMA
# Read the raw USD/UAH CSV with an explicit schema; the first line of the file is a header row.
csv_file_path = f"{parent_dir}data/raw/usd_uah_currency.csv"

df = spark.read.load(
    csv_file_path,
    format="csv",
    schema=STOCK_PRICE_UNPROCESED_SCHEMA,
    header="true",
)

In [99]:
# Preview the first five rows of the frame loaded from the USD/UAH CSV.
df.show(5)

+----------+-------+-------+-------+-------+--------+
|      Date|  Price|   Open|   High|    Low|Change %|
+----------+-------+-------+-------+-------+--------+
|10/01/2024|41.3097|41.2118|41.4322|41.1114|    NULL|
|09/01/2024|   41.1|  41.15| 41.583|40.8925|    NULL|
|08/01/2024|   41.0|  41.05|41.5253| 40.441|    NULL|
|07/01/2024|   40.9|  40.55|41.7096|40.3965|    NULL|
|06/01/2024|   40.4|  40.65|40.8663|40.0075|    NULL|
+----------+-------+-------+-------+-------+--------+
only showing top 5 rows



In [112]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, year, month, avg, first, row_number, lit, to_date, when
from pyspark.sql.window import Window

from entities.base_period_model import ENG_BASE_PERIOD_VALUES


def transform_to_basis(df, basis_period, value_col=None, date_col=None):
    """Add a column expressing the value as a percentage of a base-period value.

    Args:
        df: Spark DataFrame containing a date column and a numeric value column.
        basis_period: One of the ``ENG_BASE_PERIOD_VALUES`` constants handled
            below. It is also used as the name of the column that is added.
        value_col: Name of the value column. Defaults to ``"Price"``.
        date_col: Name of the date column. Defaults to
            ``STOCK_PRICE_UNPROCESED_COLUMNS.DATE``.

    Returns:
        ``df`` with the ``basis_period`` column added. The helper columns
        created by the chosen branch (``Year``, ``DecemberPrice``,
        ``PreviousDecemberPrice``, ``PreviousMonthPrice`` or
        ``PreviousYearPrice``) are left on the returned frame.

    Raises:
        ValueError: If ``basis_period`` is not recognised, or if the
            December 2010 base is requested and ``df`` has no row for
            December 2010.
    """
    if value_col is None:
        value_col = "Price"
    if date_col is None:
        date_col = STOCK_PRICE_UNPROCESED_COLUMNS.DATE

    # Un-partitioned window ordered by date. The row-offset lags below treat
    # "N rows back" as "N months back", i.e. they assume one row per month
    # with no gaps -- NOTE(review): confirm this holds for the input data.
    window_spec_month = Window.orderBy(date_col)
    value = col(value_col)

    if basis_period == ENG_BASE_PERIOD_VALUES.DecemberPreviousYearPrice:
        december_prices = (
            df.filter(month(date_col) == 12)
            .select(year(date_col).alias("Year"), value.alias("DecemberPrice"))
        )

        # Left-join onto every year present in the data so that a year without
        # a December row still gets a (null) entry before lagging by one year.
        all_years = df.select(year(date_col).alias("Year")).distinct()
        december_prices = all_years.join(
            december_prices, on="Year", how="left"
        ).withColumn(
            "PreviousDecemberPrice",
            lag("DecemberPrice", 1).over(Window.orderBy("Year")),
        )

        df = df.withColumn("Year", year(date_col)).join(
            december_prices.select(
                "Year", "DecemberPrice", "PreviousDecemberPrice"),
            on="Year",
            how="left",
        )

        # A null base propagates through the division, so rows without a
        # previous December come out as null without an explicit guard.
        df = df.withColumn(
            basis_period, value / col("PreviousDecemberPrice") * 100)

    elif basis_period == ENG_BASE_PERIOD_VALUES.PreviousMonthPrice:
        df = df.withColumn(
            "PreviousMonthPrice", lag(value_col, 1).over(window_spec_month))
        df = df.withColumn(
            basis_period, value / col("PreviousMonthPrice") * 100)

    elif basis_period == ENG_BASE_PERIOD_VALUES.December2010Price:
        december_2010_row = (
            df.filter((year(date_col) == 2010) & (month(date_col) == 12))
            .select(value_col)
            .first()
        )
        # first() returns None on an empty result; fail with a clear message
        # ("no data for December 2010") instead of a TypeError on indexing.
        if december_2010_row is None:
            raise ValueError("Немає даних за грудень 2010 року")
        df = df.withColumn(
            basis_period, value / december_2010_row[0] * 100)

    elif basis_period in (
        ENG_BASE_PERIOD_VALUES.SamePeriodPreviousYearPrice,
        ENG_BASE_PERIOD_VALUES.SameMonthPreviousYearPrice,
    ):
        # Both bases are computed the same way: the value twelve rows earlier.
        df = df.withColumn(
            "PreviousYearPrice", lag(value_col, 12).over(window_spec_month))
        df = df.withColumn(
            basis_period, value / col("PreviousYearPrice") * 100)

    else:
        raise ValueError("Невідомий базисний період")

    return df


# Basis periods to compute; each one becomes a column of the result,
# in this order.
basis_periods = [
    ENG_BASE_PERIOD_VALUES.DecemberPreviousYearPrice,
    ENG_BASE_PERIOD_VALUES.PreviousMonthPrice,
    ENG_BASE_PERIOD_VALUES.December2010Price,
    ENG_BASE_PERIOD_VALUES.SamePeriodPreviousYearPrice,
    ENG_BASE_PERIOD_VALUES.SameMonthPreviousYearPrice,
]

# Parse the MM/dd/yyyy strings into dates so that year()/month() and
# chronological ordering work inside transform_to_basis.
df = df.withColumn(STOCK_PRICE_UNPROCESED_COLUMNS.DATE, to_date(col(
    STOCK_PRICE_UNPROCESED_COLUMNS.DATE), 'MM/dd/yyyy'))

# Apply the transforms one after another on the same frame. Each call keeps
# the existing columns and adds its own basis column, so no surrogate row id
# and no self-join is needed to stitch the results together. (A row_number()
# over a constant ordering is not a deterministic join key.)
df_final = df.select(STOCK_PRICE_UNPROCESED_COLUMNS.DATE,
                     STOCK_PRICE_UNPROCESED_COLUMNS.PRICE)

for period in basis_periods:
    df_final = transform_to_basis(df_final, period)

# Keep only the date, the price and the basis columns (dropping the helper
# columns the transforms leave behind), then sort once, newest first.
df_final = df_final.select(
    STOCK_PRICE_UNPROCESED_COLUMNS.DATE,
    STOCK_PRICE_UNPROCESED_COLUMNS.PRICE,
    *basis_periods,
).orderBy("Date", ascending=False)

In [116]:
# Print up to 1000 rows of the USD/UAH result with its basis columns.
df_final.show(1000)

+----------+-------+--------------------------------+---------------------+------------------+---------------------------------------+--------------------------------------+
|      Date|  Price|To December of the previous year|To the previous month|  To December 2010|To the same period of the previous year|To the same month of the previous year|
+----------+-------+--------------------------------+---------------------+------------------+---------------------------------------+--------------------------------------+
|2024-10-01|41.3097|              108.56688788569421|   100.51022273934626| 518.6402915357785|                     114.11519099883209|                    114.11519099883209|
|2024-09-01|   41.1|              108.01576688098815|   100.24389871736852| 516.0075039140756|                      111.3670229876218|                     111.3670229876218|
|2024-08-01|   41.0|              107.75295879655673|   100.24449503762933| 514.7520303145102|                     111.42818872213

In [114]:
# Write the result as comma-delimited CSV with a header row, replacing any previous output at this path.
(
    df_final.write
    .mode("overwrite")
    .options(header=True, delimiter=",")
    .csv(f"{parent_dir}data/processed/usd_uah_currency_with_basis.csv")
)

In [117]:
# Print the column names, types and nullability of the result frame.
df_final.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Price: float (nullable = true)
 |-- To December of the previous year: double (nullable = true)
 |-- To the previous month: double (nullable = true)
 |-- To December 2010: double (nullable = true)
 |-- To the same period of the previous year: double (nullable = true)
 |-- To the same month of the previous year: double (nullable = true)



### Average Salary Processing

In [123]:
from data.schema.average_salary import AVERAGE_SALARY_INITIAL_COLUMNS, AVERAGE_SALARY_INITIAL_SCHEMA
# Read the raw average-salary CSV with an explicit schema; the first line of the file is a header row.
csv_file_path = f"{parent_dir}data/raw/average_salary.csv"

df = spark.read.load(
    csv_file_path,
    format="csv",
    schema=AVERAGE_SALARY_INITIAL_SCHEMA,
    header="true",
)

In [124]:
# Preview the first five rows of the frame loaded from the average-salary CSV.
df.show(5)

+-------+------+
|   Date|Salary|
+-------+------+
|2003-01|332.11|
|2003-02|326.96|
|2003-03|346.18|
|2003-04|347.13|
|2003-05|353.19|
+-------+------+
only showing top 5 rows



In [125]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, year, month, avg, first, row_number, lit, to_date, when
from pyspark.sql.window import Window

from entities.base_period_model import ENG_BASE_PERIOD_VALUES


def transform_to_basis(df, basis_period, value_col=None, date_col=None):
    """Add a column expressing the value as a percentage of a base-period value.

    Args:
        df: Spark DataFrame containing a date column and a numeric value column.
        basis_period: One of the ``ENG_BASE_PERIOD_VALUES`` constants handled
            below. It is also used as the name of the column that is added.
        value_col: Name of the value column. Defaults to
            ``AVERAGE_SALARY_INITIAL_COLUMNS.SALARY``.
        date_col: Name of the date column. Defaults to
            ``AVERAGE_SALARY_INITIAL_COLUMNS.DATE``.

    Returns:
        ``df`` with the ``basis_period`` column added. The helper columns
        created by the chosen branch (``Year``, ``DecemberPrice``,
        ``PreviousDecemberPrice``, ``PreviousMonthPrice`` or
        ``PreviousYearPrice``) are left on the returned frame.

    Raises:
        ValueError: If ``basis_period`` is not recognised, or if the
            December 2010 base is requested and ``df`` has no row for
            December 2010.
    """
    if value_col is None:
        value_col = AVERAGE_SALARY_INITIAL_COLUMNS.SALARY
    if date_col is None:
        date_col = AVERAGE_SALARY_INITIAL_COLUMNS.DATE

    # Un-partitioned window ordered by date. The row-offset lags below treat
    # "N rows back" as "N months back", i.e. they assume one row per month
    # with no gaps -- NOTE(review): confirm this holds for the input data.
    window_spec_month = Window.orderBy(date_col)
    value = col(value_col)

    if basis_period == ENG_BASE_PERIOD_VALUES.DecemberPreviousYearPrice:
        december_prices = (
            df.filter(month(date_col) == 12)
            .select(year(date_col).alias("Year"), value.alias("DecemberPrice"))
        )

        # Left-join onto every year present in the data so that a year without
        # a December row still gets a (null) entry before lagging by one year.
        all_years = df.select(year(date_col).alias("Year")).distinct()
        december_prices = all_years.join(
            december_prices, on="Year", how="left"
        ).withColumn(
            "PreviousDecemberPrice",
            lag("DecemberPrice", 1).over(Window.orderBy("Year")),
        )

        df = df.withColumn("Year", year(date_col)).join(
            december_prices.select(
                "Year", "DecemberPrice", "PreviousDecemberPrice"),
            on="Year",
            how="left",
        )

        # A null base propagates through the division, so rows without a
        # previous December come out as null without an explicit guard.
        df = df.withColumn(
            basis_period, value / col("PreviousDecemberPrice") * 100)

    elif basis_period == ENG_BASE_PERIOD_VALUES.PreviousMonthPrice:
        df = df.withColumn(
            "PreviousMonthPrice", lag(value_col, 1).over(window_spec_month))
        df = df.withColumn(
            basis_period, value / col("PreviousMonthPrice") * 100)

    elif basis_period == ENG_BASE_PERIOD_VALUES.December2010Price:
        december_2010_row = (
            df.filter((year(date_col) == 2010) & (month(date_col) == 12))
            .select(value_col)
            .first()
        )
        # first() returns None on an empty result; fail with a clear message
        # ("no data for December 2010") instead of a TypeError on indexing.
        if december_2010_row is None:
            raise ValueError("Немає даних за грудень 2010 року")
        df = df.withColumn(
            basis_period, value / december_2010_row[0] * 100)

    elif basis_period in (
        ENG_BASE_PERIOD_VALUES.SamePeriodPreviousYearPrice,
        ENG_BASE_PERIOD_VALUES.SameMonthPreviousYearPrice,
    ):
        # Both bases are computed the same way: the value twelve rows earlier.
        df = df.withColumn(
            "PreviousYearPrice", lag(value_col, 12).over(window_spec_month))
        df = df.withColumn(
            basis_period, value / col("PreviousYearPrice") * 100)

    else:
        raise ValueError("Невідомий базисний період")

    return df


# Basis periods to compute; each one becomes a column of the result,
# in this order.
basis_periods = [
    ENG_BASE_PERIOD_VALUES.DecemberPreviousYearPrice,
    ENG_BASE_PERIOD_VALUES.PreviousMonthPrice,
    ENG_BASE_PERIOD_VALUES.December2010Price,
    ENG_BASE_PERIOD_VALUES.SamePeriodPreviousYearPrice,
    ENG_BASE_PERIOD_VALUES.SameMonthPreviousYearPrice,
]

# Parse the yyyy-MM strings into dates so that year()/month() and
# chronological ordering work inside transform_to_basis.
df = df.withColumn(AVERAGE_SALARY_INITIAL_COLUMNS.DATE, to_date(col(
    AVERAGE_SALARY_INITIAL_COLUMNS.DATE), 'yyyy-MM'))

# Apply the transforms one after another on the same frame. Each call keeps
# the existing columns and adds its own basis column, so no surrogate row id
# and no self-join is needed to stitch the results together. (A row_number()
# over a constant ordering is not a deterministic join key.)
df_final = df.select(AVERAGE_SALARY_INITIAL_COLUMNS.DATE,
                     AVERAGE_SALARY_INITIAL_COLUMNS.SALARY)

for period in basis_periods:
    df_final = transform_to_basis(df_final, period)

# Keep only the date, the salary and the basis columns (dropping the helper
# columns the transforms leave behind), then sort once, newest first.
df_final = df_final.select(
    AVERAGE_SALARY_INITIAL_COLUMNS.DATE,
    AVERAGE_SALARY_INITIAL_COLUMNS.SALARY,
    *basis_periods,
).orderBy("Date", ascending=False)

In [126]:
# Print up to 1000 rows of the salary result with its basis columns.
df_final.show(1000)

+----------+--------+--------------------------------+---------------------+------------------+---------------------------------------+--------------------------------------+
|      Date|  Salary|To December of the previous year|To the previous month|  To December 2010|To the same period of the previous year|To the same month of the previous year|
+----------+--------+--------------------------------+---------------------+------------------+---------------------------------------+--------------------------------------+
|2024-06-01|18806.63|              111.69929175489666|   109.47537178044293| 808.2824440336623|                     117.45056635560974|                    117.45056635560974|
|2024-05-01|17178.87|              102.03143404611028|   101.96396637857421| 738.3235433579562|                     121.75271352537615|                    121.75271352537615|
|2024-04-01|16847.98|              100.06616814736844|   109.16160288567814| 724.1024153735754|                     123.64565

In [127]:
# Write the result as comma-delimited CSV with a header row, replacing any previous output at this path.
(
    df_final.write
    .mode("overwrite")
    .options(header=True, delimiter=",")
    .csv(f"{parent_dir}data/processed/average_salary.csv")
)

In [128]:
# Print the column names, types and nullability of the result frame.
df_final.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Salary: float (nullable = true)
 |-- To December of the previous year: double (nullable = true)
 |-- To the previous month: double (nullable = true)
 |-- To December 2010: double (nullable = true)
 |-- To the same period of the previous year: double (nullable = true)
 |-- To the same month of the previous year: double (nullable = true)

