In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create the notebook's SparkSession (getOrCreate returns the active session if
# one already exists) with app name 'MLOps' and 400 shuffle partitions.
spark = (
    SparkSession.builder
    .appName('MLOps')
    .config('spark.sql.shuffle.partitions', 400)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/07 00:06:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/07 00:06:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
#a lot of pyspark & spark SQL ETL
# Read the raw CSV feed and total Amount per (Product, Country).
# NOTE(review): the previous chain could not run. It grouped by 'key' (output:
# key, sum_col), then re-grouped by Product/Country, which that output no longer
# had, pivoted on a column that was also a grouping key, and summed a
# non-existent 'sum(Amount)' column. The column names used here come from that
# chain; confirm them against the feed's header row.
# `Country` is kept as a row-level column rather than pivoted, because the
# feature-engineering cell later one-hot encodes `country` itself.
# header=True is required for named columns; without it Spark names them _c0, _c1, ...
input_1 = spark.read.csv('/hdfs/raw/flat/feed/raw', header=True, inferSchema=True)

processed_data = (
    input_1
    .groupBy('Product', 'Country')
    .agg(F.sum('Amount').alias('sum_amount'))
)

processed_data.createOrReplaceTempView('processed')

In [None]:
#more ETL
# Persist the `processed` temp view as the silver Delta dataset.
# mode('overwrite') lets the cell be re-run. The default save mode
# (errorifexists) fails once the path already holds data.
spark.sql("SELECT * FROM processed").write.format('delta').mode('overwrite').save('/to/hdfs_or_s3/path/silver')


In [None]:
%sql
-- Silver data: register the Delta files written to the silver path as
-- dbname.tablename so later cells can read them by name.
CREATE DATABASE IF NOT EXISTS dbname;
USE dbname;
-- USING must name the data source; the files at this location are Delta.
CREATE TABLE IF NOT EXISTS tablename USING DELTA LOCATION '/to/hdfs_or_s3/path/silver';

In [None]:
# Data science: load the silver table registered as dbname.tablename and
# display its per-column summary statistics.
input_df = spark.read.table('dbname.tablename')
silver_summary_df = input_df.summary()
display(silver_summary_df)

In [None]:
#feature engineering - silver to gold - one hot encoding
# Collect the distinct non-null country values in sorted order. Nulls are
# skipped so that sorted() never has to compare None with str.
countries = sorted(
    row['country']
    for row in input_df.select('country').distinct().dropna().collect()
)

# Add one boolean indicator column Country_<value> (country == value) per
# distinct value in a single select. Calling withColumn in a loop adds one
# projection to the plan per iteration.
with_countries_df = input_df.select(
    '*',
    *[(F.col('country') == country).alias(f'Country_{country}') for country in countries],
).drop('country')  # DataFrames are immutable: the drop only takes effect if its result is kept

# Overwrite so re-running the cell does not fail on an existing gold path.
with_countries_df.write.format('delta').mode('overwrite').save('/to/hdfs_or_s3/path/gold')

In [None]:
%sql
-- Gold data: register the Delta files written to the gold path as
-- dbname.gold_table.
USE dbname;
-- USING must name the data source; the files at this location are Delta.
CREATE TABLE IF NOT EXISTS gold_table USING DELTA LOCATION '/to/hdfs_or_s3/path/gold';

In [None]:
# Turn on MLflow autologging for Spark data sources. This must run before the
# data sources to be logged are read.
from mlflow import spark as mlflow_spark

mlflow_spark.autolog()

In [None]:
#modeling - training & testing data
# Importing databricks.koalas is what makes `.to_koalas()` available on Spark
# DataFrames below.
# TODO(review): Koalas is superseded by pyspark.pandas (Spark 3.2+); consider migrating.
import databricks.koalas as ks

# Temporal split. Rows up to TRAIN_END_YEAR are training data, the years after
# it up to MAX_YEAR are test data, and anything later than MAX_YEAR is excluded.
MAX_YEAR = 2016
TRAIN_END_YEAR = 2014
LABEL_COL = 'label_col'

input_ks = spark.read.table('dbname.gold_table').to_koalas()
input_ks = input_ks[input_ks['year'] <= MAX_YEAR]

input_ks_train = input_ks[input_ks['year'] <= TRAIN_END_YEAR]
input_ks_test = input_ks[input_ks['year'] > TRAIN_END_YEAR]

X_ks_train = input_ks_train.drop(LABEL_COL, axis=1)
y_ks_train = input_ks_train[LABEL_COL]

X_ks_test = input_ks_test.drop(LABEL_COL, axis=1)
y_ks_test = input_ks_test[LABEL_COL]

# Materialize both splits on the driver for pandas-based libraries.
# to_pandas() collects all rows to the driver.
X_train = X_ks_train.to_pandas()
y_train = y_ks_train.to_pandas()
X_test = X_ks_test.to_pandas()
y_test = y_ks_test.to_pandas()


In [None]:
#hyperopt - hyperparameter tuning at scale on the Spark cluster
#with hyperopt, multiple models with different parameters can be built and trained in parallel

