In [1]:
import pyspark
from utils import Utils

In [2]:
INGEST_DATA_REPO_PATHS = {
    'weather_data_path': "https://raw.githubusercontent.com/dmatrix/olt-mlflow/master/model_registery/notebooks/data/windfarm_data.csv",
    'serve_data_path': "https://raw.githubusercontent.com/dmatrix/olt-mlflow/master/model_registery/notebooks/data/score_windfarm_data.csv"

}

#### Create PySpark session

In [3]:
spark = pyspark.sql.SparkSession.builder.appName("DeltaLakeToFeast") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

#### Read data from the CSV files 

In [5]:
weather_data_path = INGEST_DATA_REPO_PATHS.get('weather_data_path')
serve_data_path = INGEST_DATA_REPO_PATHS.get('serve_data_path')
weather_data = Utils.load_csv_data(weather_data_path)
serve_data = Utils.load_csv_data(serve_data_path)

In [6]:
weather_data.head()

Unnamed: 0,year_month_day,temperature_00,wind_direction_00,wind_speed_00,temperature_08,wind_direction_08,wind_speed_08,temperature_16,wind_direction_16,wind_speed_16,power
0,2014-01-01,4.702022,106.74259,4.743292,7.189482,100.41638,6.593832,8.172301,99.288,5.967206,1959.3535
1,2014-01-02,7.695733,98.036705,6.142715,9.977118,94.03181,4.383676,9.690135,204.25444,1.696528,1266.6239
2,2014-01-03,9.608235,274.0612,10.514304,10.840864,242.87563,16.869741,8.991079,250.2683,12.038399,7545.6797
3,2014-01-04,6.955563,257.91022,7.18917,5.317223,254.2617,9.069233,3.021174,284.06537,4.590843,3791.0408
4,2014-01-05,0.830547,265.3944,4.263086,2.480239,104.79496,3.042063,4.227131,263.4169,3.899182,880.6115


#### Add `datetime` and `created` for the offline table for Feast to ingest from

In [8]:
weather_data = Utils.to_feast_fmt(weather_data)
serve_data = Utils.to_feast_fmt(serve_data)
weather_data.head(5)

Unnamed: 0,year_month_day,temperature_00,wind_direction_00,wind_speed_00,temperature_08,wind_direction_08,wind_speed_08,temperature_16,wind_direction_16,wind_speed_16,power,datetime,created
0,2014-01-01,4.702022,106.74259,4.743292,7.189482,100.41638,6.593832,8.172301,99.288,5.967206,1959.3535,2014-01-01,2021-05-26 11:47:21.800
1,2014-01-02,7.695733,98.036705,6.142715,9.977118,94.03181,4.383676,9.690135,204.25444,1.696528,1266.6239,2014-01-02,2021-05-26 11:47:21.800
2,2014-01-03,9.608235,274.0612,10.514304,10.840864,242.87563,16.869741,8.991079,250.2683,12.038399,7545.6797,2014-01-03,2021-05-26 11:47:21.800
3,2014-01-04,6.955563,257.91022,7.18917,5.317223,254.2617,9.069233,3.021174,284.06537,4.590843,3791.0408,2014-01-04,2021-05-26 11:47:21.800
4,2014-01-05,0.830547,265.3944,4.263086,2.480239,104.79496,3.042063,4.227131,263.4169,3.899182,880.6115,2014-01-05,2021-05-26 11:47:21.800


#### Convert to Spark DataFrame so we can save as Delta Lalke tables

In [13]:
spark_weather_data = Utils.create_spark_df(spark, weather_data)
spark_score_data = Utils.create_spark_df(spark, serve_data)
spark_weather_data.show(2)

+--------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+---------+-------------------+--------------------+
|year_month_day|   temperature_00| wind_direction_00|     wind_speed_00|   temperature_08|wind_direction_08|     wind_speed_08|   temperature_16|wind_direction_16|     wind_speed_16|    power|           datetime|             created|
+--------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+---------+-------------------+--------------------+
|    2014-01-01|4.702021725972501|106.74258999999999| 4.743291999999999|7.189482116699223|        100.41638|6.5938324999999995|8.172300847371405|           99.288|          5.967206|1959.3535|2014-01-01 00:00:00|2021-05-26 11:47:...|
|    2014-01-02|7.695733197530104|         98.036705|6.142715499

In [17]:
#### Create Delta Lake tables 

In [16]:
table_names = ["data/weather_forecast_features", "data/serve_weather_forecast_features"]
parquet_files = ["data/weather_forecast_features_parquet", "data/serve_weather_forecast_features_parquet"]
data_frames = [spark_weather_data, spark_score_data]
[Utils.create_delta_table(f, t) for t, f in zip(table_names, data_frames)]

[None, None]

#### Read the Delta table features back into a Spark DataFrame

In [21]:
for t in table_names:
    df = Utils.read_data(spark, t, "delta")
    print("Delta Table: {}".format(t))
    df.show(1)
    print(df.schema)

Delta Table: data/weather_forecast_features
+--------------+------------------+-----------------+------------------+-----------------+------------------+-------------+------------------+-----------------+-------------+---------+-------------------+--------------------+
|year_month_day|    temperature_00|wind_direction_00|     wind_speed_00|   temperature_08| wind_direction_08|wind_speed_08|    temperature_16|wind_direction_16|wind_speed_16|    power|           datetime|             created|
+--------------+------------------+-----------------+------------------+-----------------+------------------+-------------+------------------+-----------------+-------------+---------+-------------------+--------------------+
|    2020-05-23|5.8407705307006825|         258.7054|3.1650392999999997|8.923618189493816|223.65662000000003|    2.2269764|11.021574338277176|        276.32333|     7.281217|1632.3582|2020-05-23 00:00:00|2021-05-26 11:47:...|
+--------------+------------------+-----------------

#### Create equivalent parquet files


In [19]:
[Utils.save_data(f, t, "parquet") for t, f in zip(parquet_files, data_frames)]

[None, None]

#### Read the Parquet features back into a Spark DataFrame

In [22]:
for t in parquet_files:
    df = Utils.read_data(spark, t, "parquet")
    print("Parquet DataFrame: {}".format(t))
    df.show(1)
    print(df.schema)

Parquet DataFrame: data/weather_forecast_features_parquet
+--------------+------------------+-----------------+------------------+-----------------+------------------+-------------+------------------+-----------------+-------------+---------+-------------------+--------------------+
|year_month_day|    temperature_00|wind_direction_00|     wind_speed_00|   temperature_08| wind_direction_08|wind_speed_08|    temperature_16|wind_direction_16|wind_speed_16|    power|           datetime|             created|
+--------------+------------------+-----------------+------------------+-----------------+------------------+-------------+------------------+-----------------+-------------+---------+-------------------+--------------------+
|    2020-05-23|5.8407705307006825|         258.7054|3.1650392999999997|8.923618189493816|223.65662000000003|    2.2269764|11.021574338277176|        276.32333|     7.281217|1632.3582|2020-05-23 00:00:00|2021-05-26 11:47:...|
+--------------+------------------+---