# Generic ETL Pipeline Demo

In [1]:
import os
import sys
import json
import dill
import pyspark
import datetime
import pandas as pd
import sqlalchemy
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
sys.path.append(os.path.join(os.getcwd(), '..'))
from kcu.functiontransform import FunctionTransform
from pathlib import Path

# --- Configuration ---------------------------------------------------------
# SQLAlchemy URL of the SQLite file in the current working directory; the same
# URL, prefixed with "jdbc:", is handed to Spark further down.
sqlurl = "sqlite:///" +  os.getcwd() + "/save_pandas.db"
table = "person_data"
pluginpath = os.getcwd() + "/../../plugins"
# Location of the SQLite JDBC driver, needed both as a Spark jar and on the
# driver class path.
jdbc_jar = "{}/sqlite-jdbc-3.34.0.jar".format(pluginpath)

# --- Seed the SQLite table with a small pandas frame -----------------------
engine = sqlalchemy.create_engine(sqlurl, echo=False)

# Kept under its own name so that `df` only ever refers to the Spark DataFrame.
person_pdf = pd.DataFrame({
    "id": [0, 1, 2],
    "alter": [3.4, 0.3, 7.2]
})

# The context manager closes the connection once the table has been written,
# so no handle on the SQLite file is left open while Spark reads it via JDBC.
with engine.connect() as sqlite_connection:
    person_pdf.to_sql(table, sqlite_connection, if_exists="replace")

ids = person_pdf["id"].tolist()

# --- Spark session and JDBC read -------------------------------------------
sess = (
    SparkSession.builder
    .config("spark.jars", jdbc_jar)
    .config("spark.driver.extraClassPath", jdbc_jar)
    .getOrCreate()
)

# Spark DataFrame backed by the table that was just written.
df = (
    sess.read.format('jdbc')
    .options(driver='org.sqlite.JDBC', dbtable=table,
             url='jdbc:' + sqlurl)
    .load()
)

23/05/21 13:59:53 WARN Utils: Your hostname, florian-G11CD-K resolves to a loopback address: 127.0.1.1; using 192.168.178.39 instead (on interface enp3s0)
23/05/21 13:59:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/05/21 13:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
def feature_add_some(df, id, id_column, feature_column, some,
                  out_feature_column):
    """Select the rows of ``df`` for the given ids and add an offset feature.

    A small "anchor" DataFrame is built from ``id`` (with a running ``idx_``
    position column) and left-joined onto ``df``; the anchor's copy of the id
    column is then dropped and ``out_feature_column`` is added as
    ``feature_column + some``.

    Because the join is a left join and the anchor's id column is the one that
    gets dropped, a requested id that has no match in ``df`` still produces a
    row, but every column coming from ``df`` (including its id) is null.

    Args:
        df: Spark DataFrame containing ``id_column`` and ``feature_column``.
        id: Iterable of id values to keep (the name shadows the builtin but is
            part of the keyword interface used by callers).
        id_column: Name of the id column in ``df``.
        feature_column: Name of the column the offset is added to.
        some: Value added to ``feature_column``.
        out_feature_column: Name of the resulting column.

    Returns:
        The joined Spark DataFrame with ``out_feature_column`` appended.
    """
    # Imported locally so the function carries its own dependency when it is
    # serialised and executed from a different session.
    import pyspark.sql.functions as F

    # Use the session that owns `df` instead of a notebook-global variable;
    # `DataFrame.sparkSession` exists from PySpark 3.3, older versions expose
    # the session through `sql_ctx`.
    spark = getattr(df, "sparkSession", None) or df.sql_ctx.sparkSession

    # One row per requested id: (position in `id`, id value).
    anchor_df = spark.createDataFrame(
        data=[(position, value) for position, value in enumerate(id)],
        schema=["idx_", id_column],
    )

    joined = anchor_df.join(
        df,
        anchor_df[id_column] == df[id_column],
        how="left",
    ).drop(anchor_df[id_column])

    return joined.withColumn(
        out_feature_column, F.col(feature_column) + F.lit(some)
    )

In [3]:
# Parameter factory for the "alter" feature: called with the ids to process,
# it returns the keyword arguments passed on to feature_add_some.
feature_alter_parameters = lambda ids: dict(
    feature_column="alter",
    id_column="id",
    some=1,
    id=ids,
    out_feature_column="alter_postprocessed",
)

# Each entry pairs a feature function with the factory producing its parameters.
feature_param_pairs = [(feature_add_some, feature_alter_parameters)]

stages = list()
# Stage uid -> parameter factory; persisted below so the parameters can be
# rebuilt for other ids after the pipeline has been reloaded.
dict_to_save = {}

for feature_fn, param_factory in feature_param_pairs:
    stage = FunctionTransform(
        default_value=feature_fn,
        parameter_value=param_factory(ids),
    )
    stages.append(stage)
    dict_to_save[stage.uid] = param_factory

# Fit the pipeline and persist the fitted model.
pipe = Pipeline(stages=stages)
newpipe = pipe.fit(df)
newpipe.write().overwrite().save("models/testpipe")

# Show the transformed data as a sanity check.
newpipe.transform(df).show()

# dill (rather than plain pickle) is used for the factories, which are lambdas.
with open('fpp.pickle', 'wb') as pickle_file:
    dill.dump(dict_to_save, pickle_file, protocol=dill.HIGHEST_PROTOCOL)


{'default_value': <function feature_add_some at 0x7f768373bf40>, 'parameter_value': {'feature_column': 'alter', 'id_column': 'id', 'some': 1, 'id': [0, 1, 2], 'out_feature_column': 'alter_postprocessed'}}


                                                                                

+----+-----+---+-----+-------------------+
|idx_|index| id|alter|alter_postprocessed|
+----+-----+---+-----+-------------------+
|   0|    0|  0|  3.4|                4.4|
|   1|    1|  1|  0.3|                1.3|
|   2|    2|  2|  7.2|                8.2|
+----+-----+---+-----+-------------------+



### KERNEL NEUSTARTEN (restart the kernel before running the cells below)

In [4]:
import os
import sys
import dill
import pyspark
import datetime
import pandas as pd
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
sys.path.append(os.path.join(os.getcwd(), '..'))
from kcu.functiontransform import FunctionTransform


# Same SQLite file and table that the first part of the notebook wrote.
sqlurl = "sqlite:///" +  os.getcwd() + "/save_pandas.db"
table = "person_data"
pluginpath = os.getcwd() + "/../../plugins"
# SQLite JDBC driver jar, registered both as a Spark jar and on the driver
# class path.
jdbc_jar = "{}/sqlite-jdbc-3.34.0.jar".format(pluginpath)

sess = (
    SparkSession.builder
    .config("spark.jars", jdbc_jar)
    .config("spark.driver.extraClassPath", jdbc_jar)
    .getOrCreate()
)

df = (
    sess.read.format('jdbc')
    .options(driver='org.sqlite.JDBC', dbtable=table,
             url='jdbc:' + sqlurl)
    .load()
)

# This run only processes a subset of the ids, in a different order.
ids = [2, 0]

newpipe = PipelineModel.load("models/testpipe")

# Stage uid -> parameter factory, as stored alongside the pipeline.
# Unpickling executes code, so only load a file this notebook wrote itself.
with open('fpp.pickle', 'rb') as pickle_file:
    fpp = dill.load(pickle_file)

# Re-parameterise every stage whose uid has a stored factory.
for stage_uid in fpp:
    print(stage_uid)
    for position, stage in enumerate(newpipe.stages):
        if stage.uid != stage_uid:
            continue
        # The new parameters are passed as a dill payload decoded to str.
        # NOTE(review): presumably FunctionTransform reverses this with the
        # same "raw_unicode_escape" codec - confirm against its implementation.
        serialized_params = dill.dumps(fpp[stage_uid](ids)).decode(
            encoding="raw_unicode_escape"
        )
        newpipe.stages[position] = stage.setParameterValue(serialized_params)

transformed = newpipe.transform(df)

transformed.show()

23/05/21 14:02:30 WARN Utils: Your hostname, florian-G11CD-K resolves to a loopback address: 127.0.1.1; using 192.168.178.39 instead (on interface enp3s0)
23/05/21 14:02:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/05/21 14:02:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

{}
FunctionTransform_04cb01fe8f38
+----+-----+---+-----+-------------------+
|idx_|index| id|alter|alter_postprocessed|
+----+-----+---+-----+-------------------+
|   0|    2|  2|  7.2|                8.2|
|   1|    0|  0|  3.4|                4.4|
+----+-----+---+-----+-------------------+

