In [None]:
import pyspark #For accesing dataframe etc with the prefix.

from pyspark.ml import Pipeline
from pyspark.sql import DataFrame as SparkDataFrame, SparkSession
# Bind the builder to an explicitly annotated name. The `type: ignore` silences a
# spurious Pylance warning on `SparkSession.builder`, and the annotation lets the
# chained builder calls below be type-checked properly.
spark_session_builder: SparkSession.Builder = SparkSession.builder  # type: ignore
# Get the Spark session for this notebook: getOrCreate() reuses an existing one if
# it is already running, otherwise it starts one locally with 3 worker threads
# ('local[3]') under the given application name.
spark: SparkSession = spark_session_builder.master('local[3]').appName(
    'TimeSeriesForecastStoreSales Data Engineering'
).getOrCreate()

# Small in-memory sample DataFrame (four employees) to experiment with
# transformations and pipelines; Spark infers the schema from the dict keys.
# The rows appear to have been pasted from a doctest: the "... " continuation
# prompts each row used to start with are not valid Python here (they made this
# cell a SyntaxError), so they have been removed.
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200},
])





#https://stackoverflow.com/questions/44570118/join-two-spark-mllib-pipelines-together
#https://mallikarjuna_g.gitbooks.io/spark/content/spark-mllib/spark-mllib-transformers.html#custom-transformer
#https://mallikarjuna_g.gitbooks.io/spark/content/spark-mllib/spark-mllib-estimators.html
#https://mallikarjuna_g.gitbooks.io/spark/content/spark-mllib/spark-mllib-pipelines.html
# Simple pipeline example: https://blog.insightdatascience.com/spark-pipelines-elegant-yet-powerful-7be93afcdd42
# Conditional (if/else) pipelines: https://towardsdatascience.com/pipeline-oriented-data-analytics-with-spark-ml-part-2-3088d7a3c1b5
# How to pass params to Spark ML pipeline stages (the counterpart of scikit-learn pipeline params): https://spark.apache.org/docs/latest/ml-pipeline.html#parameters
# End-to-end pipeline example, including checking the predictions: https://medium.com/pythoneers/pipelines-in-spark-ml-7ffd57ecad18
# Naive approach to parallelism in Spark/Python: https://www.youtube.com/watch?v=oOFrUm6JC-0
# More on Spark parallelism (spark.default.parallelism): https://stackoverflow.com/questions/74762704/understanding-spark-default-parallelism

# Parallelizing Spark: https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473 https://medium.com/analytics-vidhya/boosting-apache-spark-application-by-running-multiple-parallel-jobs-25d13ee7d2a6 https://thedatafreak.medium.com/spark-jobs-induce-parallelism-d94d6a51badb
# Cross-validation parallelism: https://bryancutler.github.io/cv-parallel/
# Reading Spark DAGs: https://dzone.com/articles/reading-spark-dags

#Modern python open source stack, including spark and more:
#https://es.slideshare.net/slideshow/optimizing-your-sparkml-pipelines-using-the-latest-features-in-spark-23/103202560

# Similar to scikit-learn's FunctionTransformer, you can easily implement user-defined functions (UDFs) in Spark.
# You write a plain Python function taking one or more column values and returning one value, and wrap it with the @udf decorator (or udf(f, returnType)), declaring the return type.
# You then call it with the column(s) to apply it to, e.g. df.select(my_udf("col")); it is evaluated row by row.
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html
# User-defined table functions (UDTFs) return an entire table (zero or more rows per call) as output, instead of a single value:
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udtf.html
# Pandas UDFs allow creating vectorized functions (they process pandas Series/DataFrames in batches instead of one row at a time):
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html
# Calling a registered UDF by name: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.call_udf.html


# Two (currently empty) sub-pipelines chained inside an outer pipeline. A Pipeline
# is itself an Estimator, so it can be a stage of another Pipeline; fitting the
# outer one fits the sub-pipelines in order and returns a PipelineModel.
# NOTE(review): the original intent was "no parallelism inside each sub-pipeline,
# parallelism across them". Pipeline has no parallelism setting, though: its stages
# are fit one after another. A `parallelism` param only exists on tuning
# estimators such as CrossValidator / TrainValidationSplit. TODO: decide how the
# sub-pipelines should actually run in parallel.
first_pipeline = Pipeline(stages=[])
second_pipeline = Pipeline(stages=[])
full_pipeline = Pipeline(stages=[first_pipeline, second_pipeline])
# Estimator.fit(dataset, params=None): per-call param overrides go in `params` as a
# {Param: value} dict (there is no `params_to_override` keyword). `df` was never
# defined in this cell, so fit on the sample `people` DataFrame instead.
full_model = full_pipeline.fit(people)

# I'm fairly sure that spark.read, as a first step, could automatically load only the columns the rest of the process needs, but I'm unsure whether it does. That said, you probably wouldn't save much.
# The load operation is not lazily evaluated if you set the inferSchema option to True: in that case Spark launches a job to scan the file and infer the column types.
# That probably means it is lazy if you pass the schema explicitly, which would mean I'm right and a column that is never used won't be loaded, but I'm unsure how to verify it.
# One way to check: call df.explain() and look at the ReadSchema of the file scan; for columnar formats such as Parquet/ORC only the referenced columns should be listed there.
# You can use the samplingRatio option to reduce the number of rows scanned to infer the schema.
# You can view jobs in the local Spark UI (http://localhost:4040 by default).
# Spark transformations are lazily evaluated as well.
# sc.parallelize isn't executed until you call an action either, i.e. it's lazy.

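# Actions are the only Spark operations that trigger computations.
# Actions include: collect, count, first, take, show, foreach, reduce, lookup (RDD), write/save, and RDD aggregations such as mean. Aggregations on a grouped DataFrame (df.groupBy(...).mean()) return a new DataFrame and are lazy.
# Not actions (lazy transformations): join, groupBy, values (RDD), DataFrame.toDF, and also persist/cache, which only mark the data to be stored the next time an action runs. (RDD.toDF without a schema does run a small job to infer it.)
# Also not actions, and they usually run no job: columns, dtypes, schema, explain, createTempView. Exception: DataFrame.checkpoint() is eager by default and does run a job.
# Non-exhaustive but fairly extensive list: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
# Once an action is called, the chain of transformations is handed to the Catalyst optimizer (for DataFrames/SQL), optimized, and then executed.

# cache() is itself lazy and does not trigger computation on its own. To force Spark to materialize a DataFrame (if you feel you need to, although you probably shouldn't), follow it with an action, e.g. df.cache().count().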

# Please note that while all actions force computation, they may not force the computation of the entire dataset.
# show, for example, computes only the rows it needs to display, unless combined with orderBy, in which case it needs to compute the entire dataset.

# select is not an action; it can be used to reduce the load to only the requested columns (the optimizer prunes the unused ones).

# https://mallikarjuna_g.gitbooks.io/spark/content/spark-mllib/spark-mllib-pipelines.html#PipelineStage

# PipelineStage has multiple direct implementations: Pipeline, Transformers (including UnaryTransformer, which may be Spark's equivalent of scikit-learn's FunctionTransformer), 