## Set up SparkSession and Feathr client

#### Imports

In [1]:
import glob
import os
from pathlib import Path

import feathr
import pandas as pd
from feathr import (
    BOOLEAN,
    FLOAT,
    INPUT_CONTEXT,
    INT32,
    BackfillTime,
    DerivedFeature,
    FeathrClient,
    Feature,
    FeatureAnchor,
    FeatureQuery,
    HdfsSource,
    MaterializationSettings,
    ObservationSettings,
    RedisSink,
    TypedKey,
    ValueType,
    WindowAggTransformation,
)
from feathr.datasets.utils import maybe_download
from feathr.utils.config import generate_config
from feathr.utils.job_utils import get_result_df
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand
from pyspark.sql.types import DoubleType, IntegerType

# Report the installed Feathr release so the captured output records which version ran.
print(f"Feathr version: {feathr.__version__}")

# Root URI under which this notebook stores its datasets.
# HDFS alternative: PATH_TO_APP_DATA = "hdfs://namenode:9000/data"
PATH_TO_APP_DATA = "s3a://data-bucket"

Feathr version: 1.0.0


#### SparkSession

In [None]:
# Connection settings for the S3-compatible object store. Each value can be
# overridden through the environment so credentials need not live in the
# notebook; the fallbacks reproduce the previous local development values.
s3a_endpoint = os.environ.get("S3A_ENDPOINT", "http://localhost:9007")
s3a_access_key = os.environ.get("S3A_ACCESS_KEY", "minioadmin")
s3a_secret_key = os.environ.get("S3A_SECRET_KEY", "minioadmin")

# Directory holding the pre-downloaded hadoop-aws / AWS SDK jars. Defaults to the
# *current* user's Ivy cache instead of one specific user's home directory.
spark_jars_dir = Path(os.environ.get("SPARK_JARS_DIR", Path.home() / ".ivy2" / "jars"))
s3a_jars = [
    spark_jars_dir / "org.apache.hadoop_hadoop-aws-3.3.4.jar",
    spark_jars_dir / "com.amazonaws_aws-java-sdk-bundle-1.12.406.jar",
]

# Alternative to local jar files: let Spark resolve the same artifacts itself with
#   .config("spark.jars.packages",
#           "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.406")
spark = (
    SparkSession.builder.appName("write-synthetic-parquet-to-hdfs")  # type: ignore[attr-defined]
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
    .config("spark.hadoop.fs.s3a.endpoint", s3a_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # spark.jars expects a single comma-separated list of jar paths.
    .config("spark.jars", ",".join(str(jar) for jar in s3a_jars))
    .getOrCreate()
)

26/01/26 09:34:43 WARN Utils: Your hostname, PF5L73QZ resolves to a loopback address: 127.0.1.1; using 192.168.123.113 instead (on interface wlp9s0f0)
26/01/26 09:34:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/26 09:36:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Feathr client

In [3]:
# Environment for processes started from this kernel: bind Spark to loopback and
# use an empty Redis password.
# NOTE(review): presumably read by Feathr / the Spark jobs it launches — confirm.
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"
os.environ['REDIS_PASSWORD'] = ""

# Locate a jar next to the notebook. Sorting makes the choice deterministic when
# several jars are present, and the explicit check replaces the bare IndexError
# that `glob.glob(...)[0]` raised when none was found.
jar_candidates = sorted(glob.glob("./*.jar"))
if not jar_candidates:
    raise FileNotFoundError(
        "No file matching './*.jar' was found in the current working directory."
    )
jar_name = jar_candidates[0]
print(f"Found jar file at {jar_name}")

# Despite its name, this points at the Feathr YAML config file, not a folder.
feathr_workspace_folder = Path("./feathr_config.yaml")

client = FeathrClient(str(feathr_workspace_folder))

2026-01-26 14:37:01.031 | INFO     | feathr.utils._env_config_reader:get:60 - Config secrets__azure_key_vault__name is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2026-01-26 14:37:01.036 | INFO     | feathr.utils._env_config_reader:get:60 - Config offline_store__s3__s3_enabled is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2026-01-26 14:37:01.038 | INFO     | feathr.utils._env_config_reader:get:60 - Config offline_store__adls__adls_enabled is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2026-01-26 14:37:01.040 | INFO     | feathr.utils._env_config_reader:get:60 - Config offline_store__wasb__wasb_enabled is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2026-01-26 14:37:01

2026-01-26 14:37:01.055 | INFO     | feathr.utils._env_config_reader:get:60 - Config feature_registry__purview__purview_name is not found in the environment variable, configuration file, or the remote key value store. Returning the default value: None.
2026-01-26 14:37:01.067 | INFO     | feathr.client:__init__:216 - Feathr client 1.0.0 initialized successfully.


Found jar file at ./feathr_2.12-1.0.0.jar


## Code

### Upload quick start data to storage (S3A by default, or HDFS)

In [None]:
# List the CSV files with pathlib instead of the IPython-only `!ls` capture, so the
# cell is plain Python and only feeds real CSV files to pd.read_csv.
# Sorted for a deterministic upload order.
quick_start_data_list = sorted(
    entry.name for entry in Path("feathr_data").glob("*.csv") if entry.is_file()
)
if not quick_start_data_list:
    raise FileNotFoundError("No CSV files found in 'feathr_data/'.")

for file_name in quick_start_data_list:
    # Dataset name = file name up to the first dot (e.g. "a.csv" -> "a").
    df_name = file_name.split(".")[0]
    hdfs_path = f"{PATH_TO_APP_DATA}/{df_name}"

    # Load the CSV through pandas, then write it as a single-partition Parquet
    # dataset, replacing whatever was stored at that location before.
    df = spark.createDataFrame(pd.read_csv(f"feathr_data/{file_name}"))
    df.repartition(1).write.mode("overwrite").parquet(hdfs_path)

    # Read observation datasets back and show a few rows as a sanity check.
    if "observation" in df_name:
        print(f"====== {df_name} ======")
        spark.read.parquet(hdfs_path).show(5)

26/01/26 09:37:06 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
26/01/26 09:37:08 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Try Feathr

#### Define Feathr source

In [52]:
# Feathr source named "user_observation", located under the application data root.
# Events are timestamped by the `event_timestamp` column, whose values are
# declared to use the yyyy-MM-dd pattern.
batch_source = HdfsSource(
    timestamp_format="yyyy-MM-dd",
    event_timestamp_column="event_timestamp",
    path="/".join((PATH_TO_APP_DATA, "user_observation")),
    name="user_observation",
)

#### 