In [1]:
import findspark
findspark.init("/opt/manual/spark/")

In [2]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd
pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)

In [3]:
spark = SparkSession.builder \
.appName("Spark_ML_HW_advertising") \
.master("yarn") \
.enableHiveSupport() \
.getOrCreate()

In [4]:
df = spark.read.format("csv") \
.option("header",True) \
.option("inferSchema",True) \
.option("sep",",") \
.load("file:///home/train/datasets/Advertising.csv")

In [5]:
df.limit(5).toPandas()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


In [6]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [7]:
df.cache()

DataFrame[ID: int, TV: double, Radio: double, Newspaper: double, Sales: double]

In [8]:
df.count()

200

# null values



In [9]:
avg_dict = {}

In [10]:
df_count = df.count()

In [11]:
# Seperate categorical and numeric null including columns
cat_nulls = []

num_nulls = []

In [12]:
def null_count(df, col_name):
    nc = df.filter( 
        (F.col(col_name).isNull()) | 
        (F.col(col_name) == "") | 
        (F.col(col_name) == "NA")
    ).count()
    
    return nc

In [13]:
for col_name in df.dtypes:
    
    nc = null_count(df, col_name[0])
    
    
    if(  nc > 0 ):
        print("{} {} type has {} null values, % {}".format(col_name[0], col_name[1], nc, (nc/df_count)))
        if col_name[1] == 'string':
            cat_nulls.append(col_name[0])
        else:
            num_nulls.append(col_name[0])

In [14]:
print(cat_nulls)

[]


In [15]:
print(num_nulls)

[]


In [16]:
# no any null column founded


# Group columns

In [17]:
categoric_cols = []

numeric_cols = []

discarted_cols = ['ID', 'TV', 'Radio', 'Newspaper']


label_col = ['Sales']

# VectorAssembler

In [18]:
from pyspark.ml.feature import VectorAssembler

In [19]:
assembler = VectorAssembler().setHandleInvalid("skip") \
.setInputCols(discarted_cols) \
.setOutputCol('unscaled_features')

# Feature Scaling

In [20]:
from pyspark.ml.feature import StandardScaler

In [21]:
scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("scaled_features")

# Estimator

In [22]:
from pyspark.ml.regression import RandomForestRegressor

In [23]:
estimator = RandomForestRegressor(numTrees=400) \
.setFeaturesCol("unscaled_features") \
.setLabelCol(label_col[0])

# Pipeline

In [24]:
from pyspark.ml import Pipeline

In [25]:
pipeline_obj = Pipeline().setStages([assembler, estimator])

# Split test train

In [26]:
train_df, test_df = df.randomSplit([.8, .2], seed=142)

In [27]:
print(train_df.count())
print(test_df.count())

161
39


In [28]:
train_df.show(5)

+---+-----+-----+---------+-----+
| ID|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
+---+-----+-----+---------+-----+
only showing top 5 rows



# Train model

In [29]:
pipeline_model = pipeline_obj.fit(train_df)

# Prediction

In [30]:
transformed_df = pipeline_model.transform(test_df)

In [31]:
transformed_df.select(label_col[0],'prediction').show(truncate=False)

+-----+------------------+
|Sales|prediction        |
+-----+------------------+
|18.5 |17.715681269671787|
|4.8  |7.514044193931963 |
|8.6  |9.67198538394973  |
|19.0 |18.922156649101318|
|9.7  |9.807370970230103 |
|18.9 |18.074029811510524|
|17.4 |16.202892903674602|
|16.6 |15.638277841580125|
|10.6 |10.384047162888058|
|9.7  |9.705710429069129 |
|22.6 |21.304019185388587|
|21.2 |20.85319822180858 |
|24.2 |23.192859322308692|
|11.3 |11.242467359929606|
|15.2 |15.941876721288942|
|11.5 |11.880594248306684|
|11.7 |13.11886633855837 |
|14.7 |14.439603525992029|
|20.7 |19.741722818849336|
|21.8 |20.839537345103672|
+-----+------------------+
only showing top 20 rows



In [32]:
transformed_df.show(5)

+---+-----+-----+---------+-----+--------------------+------------------+
| ID|   TV|Radio|Newspaper|Sales|   unscaled_features|        prediction|
+---+-----+-----+---------+-----+--------------------+------------------+
|  4|151.5| 41.3|     58.5| 18.5|[4.0,151.5,41.3,5...|17.715681269671787|
|  9|  8.6|  2.1|      1.0|  4.8|   [9.0,8.6,2.1,1.0]| 7.514044193931963|
| 11| 66.1|  5.8|     24.2|  8.6|[11.0,66.1,5.8,24.2]|  9.67198538394973|
| 15|204.1| 32.9|     46.0| 19.0|[15.0,204.1,32.9,...|18.922156649101318|
| 25| 62.3| 12.6|     18.3|  9.7|[25.0,62.3,12.6,1...| 9.807370970230103|
+---+-----+-----+---------+-----+--------------------+------------------+
only showing top 5 rows



# Evaluate the model

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

In [34]:
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName='r2')

In [35]:
evaluator.evaluate(transformed_df)

0.9135148698784985

# Hyperparameter Tuning


In [36]:
from pyspark.ml.tuning import CrossValidatorModel, ParamGridBuilder, CrossValidator

In [37]:
paramGrid = ParamGridBuilder() \
.addGrid(estimator.maxDepth, [5, 8]) \
.addGrid(estimator.maxBins, [30,32,33,35]) \
.addGrid(estimator.numTrees, [15,20,25]) \
.build()

In [38]:
crossval = CrossValidator(estimator=pipeline_obj,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5) 

In [39]:
cvModel = crossval.fit(train_df)

In [40]:
type(cvModel)

pyspark.ml.tuning.CrossValidatorModel

In [41]:
cvModel.params

[Param(parent='CrossValidatorModel_6b094fd56be1', name='estimator', doc='estimator to be cross-validated'),
 Param(parent='CrossValidatorModel_6b094fd56be1', name='estimatorParamMaps', doc='estimator param maps'),
 Param(parent='CrossValidatorModel_6b094fd56be1', name='evaluator', doc='evaluator used to select hyper-parameters that maximize the validator metric'),
 Param(parent='CrossValidatorModel_6b094fd56be1', name='foldCol', doc="Param for the column name of user specified fold number. Once this is specified, :py:class:`CrossValidator` won't do random k-fold split. Note that this column should be integer type with range [0, numFolds) and Spark will throw exception on out-of-range fold numbers."),
 Param(parent='CrossValidatorModel_6b094fd56be1', name='numFolds', doc='number of folds for cross validation'),
 Param(parent='CrossValidatorModel_6b094fd56be1', name='seed', doc='random seed.')]

In [42]:
bestModel = cvModel.bestModel

In [43]:
evaluator.evaluate(bestModel.transform(test_df))

0.9328937316552548

# Save model to disk

In [81]:
bestModel.write().overwrite().save("file:///home/train/saved_models/pipeline_model")

In [92]:
!ls -l home/train/saved_models/pipeline_model


ls: cannot access home/train/saved_models/pipeline_model: No such file or directory


In [53]:
type(bestModel)

pyspark.ml.pipeline.PipelineModel

# load model from disk

In [55]:
from pyspark.ml.pipeline import PipelineModel

In [82]:
pipeline_model_loaded = PipelineModel.load("file:///home/train/saved_models/pipeline_model")

# prediction

In [71]:
transformed_pipeline_model = pipeline_model_loaded.transform(test_df)

In [75]:
predicted_df = transformed_pipeline_model.select(*discarted_cols,'prediction')

In [76]:
predicted_df.show()

+---+-----+-----+---------+------------------+
| ID|   TV|Radio|Newspaper|        prediction|
+---+-----+-----+---------+------------------+
|  4|151.5| 41.3|     58.5|17.238333333333333|
|  9|  8.6|  2.1|      1.0| 6.834166666666666|
| 11| 66.1|  5.8|     24.2|10.371085470085466|
| 15|204.1| 32.9|     46.0|18.453333333333333|
| 25| 62.3| 12.6|     18.3| 9.699146825396824|
| 29|248.8| 27.1|     22.9| 19.20052631578947|
| 34|265.6| 20.0|      0.3|15.780166666666664|
| 41|202.5| 22.3|     31.6|15.969859649122807|
| 47| 89.7|  9.9|     35.7| 10.32390293040293|
| 50| 66.9| 11.7|     36.8|10.501680708180706|
| 53|216.4| 41.7|     39.6|21.993869047619047|
| 54|182.6| 46.2|     58.7|19.848499999999994|
| 62|261.3| 42.7|     54.7|23.967535714285713|
| 83| 75.3| 20.3|     32.5|11.666176470588237|
| 86|193.2| 18.4|     65.7|15.633026315789474|
| 95|107.4| 14.0|     10.9|12.282176470588235|
| 97|197.6|  3.5|      5.9| 12.92797222222222|
|104|187.9| 17.2|     17.9|15.123248538011694|
|105|238.2| 3

In [77]:
predicted_df.count()

39

In [101]:
predicted_df2 = predicted_df.withColumn("current_date", F.current_timestamp())

In [103]:
predicted_df2.show(n=5, truncate=False)

+---+-----+-----+---------+------------------+-----------------------+
|ID |TV   |Radio|Newspaper|prediction        |current_date           |
+---+-----+-----+---------+------------------+-----------------------+
|4  |151.5|41.3 |58.5     |17.238333333333333|2021-10-13 17:35:30.349|
|9  |8.6  |2.1  |1.0      |6.834166666666666 |2021-10-13 17:35:30.349|
|11 |66.1 |5.8  |24.2     |10.371085470085466|2021-10-13 17:35:30.349|
|15 |204.1|32.9 |46.0     |18.453333333333333|2021-10-13 17:35:30.349|
|25 |62.3 |12.6 |18.3     |9.699146825396824 |2021-10-13 17:35:30.349|
+---+-----+-----+---------+------------------+-----------------------+
only showing top 5 rows



# save predicted table to postgresql

In [104]:
jdbcUrl = "jdbc:postgresql://localhost/traindb?user=train&password=Ankara06"

In [105]:
predicted_df2.write.jdbc(url=jdbcUrl,
              table="advertising_pred", 
              mode="overwrite", 
              properties={"driver": 'org.postgresql.Driver'})

In [106]:
(spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("driver", 'org.postgresql.Driver')
.option("query","select * from advertising_pred")
.load()).limit(5).toPandas()

Unnamed: 0,ID,TV,Radio,Newspaper,prediction,current_date
0,4,151.5,41.3,58.5,17.238333,2021-10-13 17:36:00.818
1,9,8.6,2.1,1.0,6.834167,2021-10-13 17:36:00.818
2,11,66.1,5.8,24.2,10.371085,2021-10-13 17:36:00.818
3,15,204.1,32.9,46.0,18.453333,2021-10-13 17:36:00.818
4,25,62.3,12.6,18.3,9.699147,2021-10-13 17:36:00.818
