In [1]:
# package python
from pprint import pprint 
import json
from datetime import date
from datetime import datetime
import math

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf
from pyspark.ml import Pipeline

# Build (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark-Learning")
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)

# Derive the lower-level handles from the session:
# Spark session -> Spark context -> SQL context.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# A threshold of -1 turns automatic broadcast joins off. Kept as the last
# expression of the cell so the notebook displays the returned DataFrame.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

DataFrame[key: string, value: string]

In [2]:
from pyspark.sql import Row

# Toy data set: one date string and one sales figure per record.
date_col = ["2018-12-22", "2017-01-08", "2015-08-25", "2015-03-12"]
sales = [17, 22, 48, 150]

# Pair every date with its sales figure and distribute the pairs as an RDD.
l = list(zip(date_col, sales))
rdd = sc.parallelize(l)

# NOTE: `sales` is rebound here from the plain list above to an RDD of Rows.
sales = rdd.map(lambda pair: Row(date=pair[0], sales=int(pair[1])))

# Turn the Rows into a DataFrame and print it.
df = sqlContext.createDataFrame(sales)
df.show()

+----------+-----+
|      date|sales|
+----------+-----+
|2018-12-22|   17|
|2017-01-08|   22|
|2015-08-25|   48|
|2015-03-12|  150|
+----------+-----+



In [7]:
from pyspark.ml.pipeline import Transformer

class DayExtractor(Transformer):
    """Pipeline stage that adds the day of the month of a date column.

    Parameters
    ----------
    inputCol : str
        Name of the column to read. It must have type ``DateType``.
    outputCol : str, optional
        Name of the column to add (default ``'dayofmonth'``).

    Notes
    -----
    ``copy`` is deliberately not overridden: the implementation inherited
    from ``Transformer`` is used, which requires the base-class initialiser
    to have run. The former ``this``/``copy`` methods were direct ports of
    Scala idioms that referenced undefined names and lacked ``self``, so
    they could never be called successfully.
    """

    def __init__(self, inputCol, outputCol='dayofmonth'):
        # Set up the base-class state (uid, param maps) that the inherited
        # copy() and the surrounding ML tooling rely on.
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def check_input_type(self, schema):
        """Validate that ``inputCol`` in ``schema`` is a ``DateType`` column.

        Raises
        ------
        TypeError
            If the column has any other data type. ``TypeError`` is a
            subclass of ``Exception``, so handlers catching ``Exception``
            keep working.
        """
        field = schema[self.inputCol]
        if field.dataType != DateType():
            raise TypeError('DayExtractor input type %s did not match input type DateType' % field.dataType)

    def _transform(self, df):
        """Return ``df`` with ``outputCol`` set to the day of month of ``inputCol``."""
        self.check_input_type(df.schema)
        return df.withColumn(self.outputCol, F.dayofmonth(df[self.inputCol]))
    
class MonthQuarterExtractor(Transformer):
    """Pipeline stage that buckets a day-of-month column into four "quarters".

    The bucket written to ``outputCol`` is:

    * ``0`` when the day is ``<= 8``
    * ``1`` when the day is ``<= 16``
    * ``2`` when the day is ``<= 24``
    * ``3`` otherwise

    Parameters
    ----------
    inputCol : str, optional
        Name of the column to read (default ``'day'``). It must have type
        ``IntegerType``.
    outputCol : str, optional
        Name of the column to add (default ``'monthquarter'``).

    Notes
    -----
    ``copy`` is deliberately not overridden: the implementation inherited
    from ``Transformer`` is used, which requires the base-class initialiser
    to have run. The former ``this``/``copy`` methods were direct ports of
    Scala idioms that referenced undefined names and lacked ``self``, so
    they could never be called successfully.
    """

    def __init__(self, inputCol='day', outputCol='monthquarter'):
        # Set up the base-class state (uid, param maps) that the inherited
        # copy() and the surrounding ML tooling rely on.
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def check_input_type(self, schema):
        """Validate that ``inputCol`` in ``schema`` is an ``IntegerType`` column.

        Raises
        ------
        TypeError
            If the column has any other data type. ``TypeError`` is a
            subclass of ``Exception``, so handlers catching ``Exception``
            keep working.
        """
        field = schema[self.inputCol]
        if field.dataType != IntegerType():
            raise TypeError('monthQuarterExtractor input type %s did not match input type IntegerType' % field.dataType)

    def _transform(self, df):
        """Return ``df`` with ``outputCol`` holding the bucket (0-3) of ``inputCol``."""
        self.check_input_type(df.schema)
        day = df[self.inputCol]
        # Conditions are tried in order; the first one that holds wins, so
        # this chain is equivalent to nesting each test in the previous
        # otherwise().
        quarter = (F.when(day <= 8, 0)
                   .when(day <= 16, 1)
                   .when(day <= 24, 2)
                   .otherwise(3))
        return df.withColumn(self.outputCol, quarter)
#============== year ================
class YearExtractor(Transformer):
    """Pipeline stage that adds the year of a date column.

    Parameters
    ----------
    inputCol : str
        Name of the column to read. It must have type ``DateType``.
    outputCol : str, optional
        Name of the column to add (default ``'year'``).

    Notes
    -----
    ``copy`` is deliberately not overridden: the implementation inherited
    from ``Transformer`` is used, which requires the base-class initialiser
    to have run. The former ``this``/``copy`` methods were direct ports of
    Scala idioms that referenced undefined names and lacked ``self``, so
    they could never be called successfully.
    """

    def __init__(self, inputCol, outputCol='year'):
        # Set up the base-class state (uid, param maps) that the inherited
        # copy() and the surrounding ML tooling rely on.
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def check_input_type(self, schema):
        """Validate that ``inputCol`` in ``schema`` is a ``DateType`` column.

        Raises
        ------
        TypeError
            If the column has any other data type. ``TypeError`` is a
            subclass of ``Exception``, so handlers catching ``Exception``
            keep working.
        """
        field = schema[self.inputCol]
        if field.dataType != DateType():
            raise TypeError('YearExtractor input type %s did not match input type DateType' % field.dataType)

    def _transform(self, df):
        """Return ``df`` with ``outputCol`` set to the year of ``inputCol``."""
        self.check_input_type(df.schema)
        return df.withColumn(self.outputCol, F.year(df[self.inputCol]))

In [9]:
from pyspark.ml import Pipeline
# The date extractors reject anything but a DateType input, so convert the
# string dates before building the pipeline.
df = df.withColumn("date", df["date"].cast(DateType()))

# Stage order matters: mqex reads the "dayofmonth" column, which dex's
# default outputCol produces.
dex = DayExtractor(inputCol="date")
yex = YearExtractor(inputCol="date")
mqex = MonthQuarterExtractor(inputCol="dayofmonth")
FeaturesPipeline = Pipeline(stages=[dex, yex, mqex])

# Fit the pipeline, then apply it to append the three feature columns.
Featpip = FeaturesPipeline.fit(df)
df = Featpip.transform(df)
df.show()

+----------+-----+----------+----+------------+
|      date|sales|dayofmonth|year|monthquarter|
+----------+-----+----------+----+------------+
|2018-12-22|   17|        22|2018|           2|
|2017-01-08|   22|         8|2017|           0|
|2015-08-25|   48|        25|2015|           3|
|2015-03-12|  150|        12|2015|           1|
+----------+-----+----------+----+------------+

