In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=e965dcbf60dd930be92fd38e9551ec16afc18e9442c3ffb36b507b69219829ac
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [18]:
import holidays
from pyspark.sql.types import *
from pyspark.ml import Transformer
from pyspark.sql.functions import lit, udf
from pyspark.ml.param.shared import HasInputCols, HasOutputCol
from datetime import date

In [26]:
# Create the SparkSession, or reuse the one already running in this kernel,
# so the cell also works after Restart Kernel -> Run All.
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

# Sample rows: (date as ISO string, company, product, barcode)
data = [
    ('2022-02-01', 1, 1, 13),
    ('2023-01-02', 1, 1, 11),
    ('2024-04-23', 1, 1, 14),
    ('2023-03-04', 1, 1, 10),
    ('2021-05-05', 1, 1, 10),
]
cols = ['date', 'company', 'product', 'barcode']

# Convert the string column to a date column; 'yyyy' is the conventional
# four-digit year pattern for the ISO strings above.
df = spark.createDataFrame(data, cols).withColumn('date', F.to_date('date', 'yyyy-MM-dd'))

In [27]:
# Print the sample DataFrame to inspect its rows and the converted 'date' column.
df.show()

+----------+-------+-------+-------+
|      date|company|product|barcode|
+----------+-------+-------+-------+
|2022-02-01|      1|      1|     13|
|2023-01-02|      1|      1|     11|
|2024-04-23|      1|      1|     14|
|2023-03-04|      1|      1|     10|
|2021-05-05|      1|      1|     10|
+----------+-------+-------+-------+



In [19]:
class AddDateFeaturesTransformer(Transformer, HasInputCols, HasOutputCol):
    """Append calendar features derived from a date column.

    Four columns are added, named by the first four entries of ``outputCols``
    in this order: year, month, day of week (Spark's ``dayofweek``) and a
    0/1 public-holiday flag for ``country_code``.

    NOTE(review): ``inputCol``, ``outputCols`` and ``country_code`` are stored
    as plain Python attributes; the ``inputCols``/``outputCol`` Params implied
    by the mixin names are neither set nor read by this class.

    Args:
        inputCol: Name of the date column the features are derived from.
        outputCols: Sequence holding at least four output column names
            (year, month, day of week, holiday flag).
        country_code: Country code understood by the ``holidays`` package.
            Falls back to ``DEFAULT_COUNTRY_CODE`` when not given.
    """

    # Calendar used when no country code is supplied (the historical default).
    DEFAULT_COUNTRY_CODE = 'TR'

    def __init__(self, inputCol=None, outputCols=None, country_code=None):
        super(AddDateFeaturesTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCols = outputCols
        self.country_code = country_code

    @staticmethod
    def _holiday_flag(day, calendar):
        """Return 1/0 for whether ``day`` is in ``calendar``; None for a null day."""
        if day is None:
            # Propagate nulls instead of failing on date.fromisoformat('None').
            return None
        if not isinstance(day, date):
            # Accept ISO 'YYYY-MM-DD' strings as well as date objects.
            day = date.fromisoformat(str(day))
        return 1 if day in calendar else 0

    def is_holiday(self, date_str: date, country_code: str = None):
        """Tell whether a single date is a public holiday.

        Args:
            date_str: A ``datetime.date`` or an ISO ``YYYY-MM-DD`` string.
            country_code: Calendar to check against. When omitted, the
                transformer's own ``country_code`` is used, and
                ``DEFAULT_COUNTRY_CODE`` if that is unset too.

        Returns:
            1 if the date is a holiday, 0 if it is not, None if ``date_str``
            is None.
        """
        if country_code is None:
            country_code = self.country_code or self.DEFAULT_COUNTRY_CODE
        return self._holiday_flag(date_str, holidays.country_holidays(country_code))

    def _transform(self, df):
        """Return ``df`` with the year, month, day-of-week and holiday columns added.

        Raises:
            ValueError: If ``inputCol`` is not set or ``outputCols`` holds
                fewer than four names.
        """
        if not self.inputCol:
            raise ValueError("inputCol must be set to the name of the date column")
        if self.outputCols is None or len(self.outputCols) < 4:
            raise ValueError(
                "outputCols must contain four names: year, month, day of week, holiday flag"
            )

        year_col, month_col, dow_col, holiday_col = self.outputCols[:4]

        # Capture plain values only, so the UDF does not have to serialize the
        # whole transformer instance along with the function.
        country_code = self.country_code or self.DEFAULT_COUNTRY_CODE
        holiday_flag = self._holiday_flag
        calendar_cache = {}

        def _flag(day):
            # Build the calendar on first use and reuse it for later rows
            # handled by this copy of the function, instead of once per row.
            calendar = calendar_cache.get(country_code)
            if calendar is None:
                calendar = holidays.country_holidays(country_code)
                calendar_cache[country_code] = calendar
            return holiday_flag(day, calendar)

        is_holiday = udf(_flag, IntegerType())

        return (
            df.withColumn(year_col, F.year(self.inputCol))
            .withColumn(month_col, F.month(self.inputCol))
            .withColumn(dow_col, F.dayofweek(self.inputCol))
            .withColumn(holiday_col, is_holiday(self.inputCol))
        )

In [31]:
# Run the transformer on the sample DataFrame and print the result,
# including the four derived columns.
AddDateFeaturesTransformer(
    inputCol='date',
    outputCols=['year', 'month', 'dayofweek', 'is_holiday'],
    country_code='TR',
).transform(df).show()

+----------+-------+-------+-------+----+-----+---------+----------+
|      date|company|product|barcode|year|month|dayofweek|is_holiday|
+----------+-------+-------+-------+----+-----+---------+----------+
|2022-02-01|      1|      1|     13|2022|    2|        3|         0|
|2023-01-02|      1|      1|     11|2023|    1|        2|         0|
|2024-04-23|      1|      1|     14|2024|    4|        3|         1|
|2023-03-04|      1|      1|     10|2023|    3|        7|         0|
|2021-05-05|      1|      1|     10|2021|    5|        4|         0|
+----------+-------+-------+-------+----+-----+---------+----------+

