In [None]:
import os
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql import functions as F
from pyspark.ml.feature import CountVectorizer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# Resolve the data folders relative to the parent of the current working directory.
head, tail = os.path.split(os.getcwd())
data_dir = os.path.join(head, 'data')
data_raw_dir, DATA_INTERIM_DIR = (
    os.path.join(data_dir, subfolder) for subfolder in ('raw', 'interim')
)
data_raw_dir

In [None]:
def init_spark():
    """Return a SparkSession, reusing an existing one when available.

    The session is built with the application name and the single config
    entry set below, then obtained through ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName("Python Spark SQL basic example")
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

In [None]:
# Session handle used by the Spark reads in this notebook.
spark = init_spark()

In [None]:
# Load the cleaned dataset from the interim data folder and preview two rows.
cleaned_path = os.path.join(DATA_INTERIM_DIR, 'cleaned.data')
clean_data = spark.read.parquet(cleaned_path)
clean_data.head(2)

In [None]:
# Print the column names and types of the loaded frame.
clean_data.printSchema()

In [None]:
# Count the rows that contain a null in at least one column.
# `isNull()` is a Column method, not a DataFrame method, so the per-column
# predicates are OR-ed together and applied as a single row filter.
has_null = F.lit(False)
for col_name in clean_data.columns:
    has_null = has_null | clean_data[col_name].isNull()
clean_data.filter(has_null).count()

In [None]:
# Columns removed before building the feature set (includes the label 'Distance_km').
excluded_cols = [
    'DATE_ORIGINE',
    'Distance_km',
    'MOTIF_REMORQUAGE',
    'Date_Time',
    'Year',
    'Spd_of_Max_Gust',
]
features = clean_data.drop(*excluded_cols)
print(features.columns)
features.show(5)

In [None]:
# Print the name of every feature column that contains at least one null.
# All columns are counted in one aggregation instead of launching a separate
# Spark job (filter + count) per column.
null_counts = features.select(
    [
        F.count(F.when(features[col_name].isNull(), 1)).alias(col_name)
        for col_name in features.schema.names
    ]
).first()
for col_name in features.schema.names:
    if null_counts[col_name] > 0:
        print(col_name)

In [None]:
# Pack every remaining feature column into one 'features' vector and keep it
# alongside the 'Distance_km' label.
vectorAssembler = VectorAssembler(
    inputCols=features.columns,
    outputCol='features',
)
training_df = vectorAssembler.transform(clean_data).select(['features', 'Distance_km'])
training_df.show(5)

In [None]:
print(type(training_df))
# 75/25 train/test split. A fixed seed keeps the split (and therefore every
# metric reported from it) the same across kernel restarts.
train, test = training_df.randomSplit([0.75, 0.25], seed=42)

In [None]:
# Show one row of the held-out split.
test.show(1)

In [None]:
# Baseline linear regression using the estimator's default hyperparameters.
lr = LinearRegression(
    labelCol='Distance_km',
    featuresCol='features',
)
lr_model = lr.fit(train)

In [None]:
# Report the fitted parameters of the baseline model.
print(f"Coefficients: {lr_model.coefficients!s}")
print(f"\nIntercept: {lr_model.intercept!s}")

In [None]:
# Report the training-set fit metrics exposed by the model summary.
trainSummary = lr_model.summary
for template, metric in (
    ("RMSE: %f", trainSummary.rootMeanSquaredError),
    ("\nr2: %f", trainSummary.r2),
):
    print(template % metric)

In [None]:
# Regularised variant (regParam / elasticNetParam set) to compare with the
# default-parameter fit.
lr2 = LinearRegression(
    featuresCol='features',
    labelCol='Distance_km',
    maxIter=100,
    regParam=0.12,
    elasticNetParam=0.2,
)
# Fit lr2 itself: fitting `lr` here would retrain the other estimator and the
# hyperparameters above would never be used.
lr_model2 = lr2.fit(train)
print("Coefficients: " + str(lr_model2.coefficients))
print("\nIntercept: " + str(lr_model2.intercept))
trainSummary = lr_model2.summary
print("RMSE: %f" % trainSummary.rootMeanSquaredError)
print("\nr2: %f" % trainSummary.r2)

# One-hot encode

In [None]:
# One encoder per calendar column. dropLast=False keeps a vector slot for the
# last category instead of dropping it.
encoder_year = OneHotEncoder(inputCol="Year", outputCol="Year_OneHotEncoded", dropLast=False)
encoder_month = OneHotEncoder(inputCol="Month", outputCol="Month_OneHotEncoded", dropLast=False)
encoder_day = OneHotEncoder(inputCol="Day", outputCol="Day_OneHotEncoded", dropLast=False)

# Fit each encoder on the cleaned data.
ohe_year, ohe_month, ohe_day = (
    encoder.fit(clean_data)
    for encoder in (encoder_year, encoder_month, encoder_day)
)

# Apply the fitted models one after another so all three encoded columns
# end up on the same frame.
encoded_df = clean_data
for ohe_model in (ohe_year, ohe_month, ohe_day):
    encoded_df = ohe_model.transform(encoded_df)
encoded_df.head(5)

In [None]:
# Assemble the listed columns into a single 'features' vector column.
# NOTE(review): this reads clean_data with the plain 'Month'/'Day' columns, not a
# frame carrying *_OneHotEncoded columns; confirm that is intended.
selected_inputs = [
    'LONGITUDE_ORIGINE',
    'LATITUDE_ORIGINE',
    'Mean_Temp',
    'Total_Rain',
    'Total_Precip',
    'Total_Snow',
    'Spd_of_Max_Gust',
    'Month',
    'Day',
]
vectorAssembler = VectorAssembler(inputCols=selected_inputs, outputCol='features')
training_df = vectorAssembler.transform(clean_data).select(['features', 'Distance_km'])
training_df.show(5)

In [None]:
# 70/30 train/test split. A fixed seed makes the split reproducible across
# kernel restarts. Only two weights are given, so there are exactly two splits.
splits = training_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits
train_df.printSchema()

In [None]:
# Fit a regularised linear regression on train_df.
lr = LinearRegression(
    labelCol='Distance_km',
    featuresCol='features',
    maxIter=100,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)

## Convert column to array

In [None]:
# Add an array-typed copy of each date-part column by splitting its value on a space.
array_col_sources = {
    "Year_Array": "Year",
    "Month_Array": "Month",
    "Day_Array": "Day",
}
for array_col, source_col in array_col_sources.items():
    clean_data = clean_data.withColumn(array_col, F.split(F.col(source_col), " "))
clean_data.head(2)

Reference: [PySpark one-hot encoding with CountVectorizer](https://www.hackdeploy.com/pyspark-one-hot-encoding-with-countvectorizer/)

In [None]:
# Configure one CountVectorizer per *_Array column.
# NOTE(review): presumably each array holds a single token, so the count vector
# acts as a one-hot encoding; the vocabSize values (6 / 12 / 31) look like the
# number of distinct years / months / days. Verify against the data.
year_vectorizer = CountVectorizer(
    inputCol="Year_Array", outputCol="Year_OneHotEncoded", vocabSize=6, minDF=1.0
)
month_vectorizer = CountVectorizer(
    inputCol="Month_Array", outputCol="Month_OneHotEncoded", vocabSize=12, minDF=1.0
)
day_vectorizer = CountVectorizer(
    inputCol="Day_Array", outputCol="Day_OneHotEncoded", vocabSize=31, minDF=1.0
)

# Fit each vectorizer on the frame that carries the *_Array columns.
year_vectorizer_model, month_vectorizer_model, day_vectorizer_model = (
    vectorizer.fit(clean_data)
    for vectorizer in (year_vectorizer, month_vectorizer, day_vectorizer)
)

# Apply the fitted models in sequence so all three encoded columns share one frame.
df_ohe = clean_data
for vectorizer_model in (year_vectorizer_model, month_vectorizer_model, day_vectorizer_model):
    df_ohe = vectorizer_model.transform(df_ohe)
df_ohe.head(2)