In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "1g").\
        config("spark.executor.cores", 1).\
        getOrCreate()

hdfs = "hdfs://namenode:8020"

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/11 02:03:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# READ DATA

In [35]:
trip_data = spark.read.option("header", True).csv(f"{hdfs}/data/yellow_tripdata_2022-01.csv")

In [36]:
trip_data.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [37]:
trip_data.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2022-01-01 00:35:40|  2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.65|         0.0|                  0.3

# FEATURE ENGINEERING

In [38]:
from pyspark.sql import functions as F

trip_data = trip_data.select("VendorID", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "tip_amount")

trip_data.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- tip_amount: string (nullable = true)



In [39]:
trip_data = trip_data.\
withColumn("created", F.to_date(F.col("tpep_dropoff_datetime"))).\
withColumn("tpep_dropoff_datetime", F.date_format(
    F.to_timestamp(
        F.col("tpep_dropoff_datetime")
    ), "dd-MM-yyyy HH:00:00"
))

In [45]:
trip_stats = trip_data.\
groupby("VendorID", "tpep_dropoff_datetime").\
agg(
    F.avg(F.col("passenger_count")),
    F.avg(F.col("trip_distance")),
    F.avg(F.col("tip_amount")),
    F.count(F.col("VendorID")),
    F.min(F.col("trip_distance")),
    F.max(F.col("trip_distance"))
).\
withColumnRenamed("avg(passenger_count)", "avg_pass_count").\
withColumnRenamed("avg(trip_distance)", "avg_trip_dist").\
withColumnRenamed("avg(tip_amount)", "avg_tip").\
withColumnRenamed("count(VendorID)", "num_trip").\
withColumnRenamed("min(trip_distance)", "min_trip_dist").\
withColumnRenamed("max(trip_distance)", "max_trip_dist").\
withColumn("created", F.to_date(F.col("tpep_dropoff_datetime"), "dd-MM-yyyy HH:00:00"))

In [46]:
trip_stats.show()



+--------+---------------------+------------------+------------------+------------------+--------+-------------+-------------+----------+
|VendorID|tpep_dropoff_datetime|    avg_pass_count|     avg_trip_dist|           avg_tip|num_trip|min_trip_dist|max_trip_dist|   created|
+--------+---------------------+------------------+------------------+------------------+--------+-------------+-------------+----------+
|       1|  01-01-2022 00:00:00|1.4976580796252927| 2.637687861271676|2.1924046242774566|     865|          0.0|          9.8|2022-01-01|
|       1|  01-01-2022 01:00:00|1.4939673527324344| 3.248910751932531| 2.607554462403373|    1423|          0.0|          9.9|2022-01-01|
|       1|  01-01-2022 02:00:00|1.3895985401459854|3.0356820234868995|2.4877326106594446|    1107|          0.0|          9.8|2022-01-01|
|       1|  01-01-2022 03:00:00|1.4297297297297298| 3.200942126514135|2.2994751009421255|     743|          0.0|          9.8|2022-01-01|
|       1|  01-01-2022 04:00:00|1.

                                                                                

In [47]:
trip_stats.write.mode("overwrite").partitionBy("created").save(f"{hdfs}/g_features/trip_stats")

                                                                                

# REGISTER FEATURES TO FEATURE STORE

In [13]:
import os 
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource

os.environ["AZURE_TENANT_ID"]="f35cc17d-4ea3-4b5f-9c1e-e6770f7c7603"
os.environ["AZURE_CLIENT_ID"]="5baa3265-c1e8-44fb-bb35-c448ae261d4a"
os.environ["AZURE_CLIENT_SECRET"]="Src8Q~7jJtvkbnsWEzJOu4nS5LnqZOpD4Z_5ia0a"

hdfs = "hdfs://namenode:8020"

# Feature Source Definition
trip_stats_source = SparkSource(
    file_format="parquet",
    path=f"{hdfs}/g_features/trip_stats",
    timestamp_field="tpep_dropoff_datetime",
    created_timestamp_column="created",
    name="trip_stats_hourly"
)



In [15]:
from feast import Feature, FeatureView, ValueType
from datetime import timedelta
from feast import Entity

# Feature Definition
trip_stats_fv = FeatureView(
    name="trip_stats_hour.y",
    entities=["VendorID"],
    features=[
        Feature(name="avg_trip_dist", dtype=ValueType.FLOAT),
        Feature(name="avg_pass_count", dtype=ValueType.FLOAT),
        Feature(name="avg_tip", dtype=ValueType.FLOAT),
    ],
    batch_source=trip_stats_source,
    ttl=timedelta(days=2)
)

# Entity definition => entity == primary key 

vendor = Entity(name="VendorID", value_type=ValueType.INT64)




In [16]:
from feast import FeatureStore
os.environ["AZURE_TENANT_ID"]="f35cc17d-4ea3-4b5f-9c1e-e6770f7c7603"
os.environ["AZURE_CLIENT_ID"]="5baa3265-c1e8-44fb-bb35-c448ae261d4a"
os.environ["AZURE_CLIENT_SECRET"]="Src8Q~7jJtvkbnsWEzJOu4nS5LnqZOpD4Z_5ia0a"
# connect to FS Registry and apply 
fs = FeatureStore("/opt/workspace/feature_repo")
fs.apply([vendor, trip_stats_fv])

In [21]:
fs_sources = fs.list_data_sources()

fs_sources

[<feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource at 0x7f924143de20>]

In [22]:
feature_views = fs.list_feature_views()



In [23]:
feature_views

[<FeatureView(name = trip_stats_hour.y, entities = ['VendorID'], stream_source = None, batch_source = {
   "type": "BATCH_SPARK",
   "timestampField": "tpep_dropoff_datetime",
   "createdTimestampColumn": "created",
   "dataSourceClassType": "feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource",
   "name": "trip_stats_hourly",
   "sparkOptions": {
     "path": "hdfs://namenode:8020/g_features/trip_stats",
     "fileFormat": "parquet"
   }
 }, source = {
   "type": "BATCH_SPARK",
   "timestampField": "tpep_dropoff_datetime",
   "createdTimestampColumn": "created",
   "dataSourceClassType": "feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource",
   "name": "trip_stats_hourly",
   "sparkOptions": {
     "path": "hdfs://namenode:8020/g_features/trip_stats",
     "fileFormat": "parquet"
   }
 }, ttl = 2 days, 0:00:00, schema = [avg_trip_dist-Float32, avg_pass_count-Float32, avg_tip-Float32], features = [avg_trip_dist-Float32, avg_pass_c

In [24]:
import pandas

In [25]:
import yaml

In [1]:
import cookiecutter