In [1]:
import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

properties = {
    'username': 'postgres',
    'password': '20020202',
    'url': "jdbc:postgresql://localhost:5432/postgres",
    'table': 'fifa.player_data',
    'driver': 'org.postgresql.Driver'
}

def write_to_pgadmin(df, mode='overwrite'):
    df.write.format('jdbc').mode(mode)\
        .option("url", properties['url'])\
        .option("dbtable", properties['table'])\
        .option("user", properties['username'])\
        .option("password", properties['password'])\
        .option("Driver", properties['driver'])\
        .save()

def read_from_pgadmin():
    return spark.read.format("jdbc")\
        .option("url", properties['url'])\
        .option("dbtable", properties['table'])\
        .option("user", properties['username'])\
        .option("password", properties['password'])\
        .option("Driver", properties['driver'])\
        .load()

appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .set('spark.jars.packages', 'org.postgresql:postgresql:42.7.0')\
    .setAppName(appName)\
    .setMaster(master)

# conf = pyspark.SparkConf().\
#     set('spark.jars.packages', 'org.postgresql:postgresql:42.7.0')\
#     .setAppName(appName).setMaster(master)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()



In [3]:
# from preprocess import *
# df = read_from_pgadmin()
# preprocess_pipeline = get_preprocess_pipeline()
# preprocess_pipeline_model = preprocess_pipeline.fit(df)
# df_processed = preprocess_pipeline_model.transform(df)

from preprocess import *
properties['table'] = 'fifa.clean_data'
df_new = read_from_pgadmin()
df_new.show(5)
preprocess_pipeline = get_preprocess_pipeline()
preprocess_pipeline_model = preprocess_pipeline.fit(df_new)
df_processed = preprocess_pipeline_model.transform(df_new)

+-------+---+---------+---------+---------+-----------+------------------------+-------------+----------------+---------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+------------------------+------------------+--------------------+-------------------+-----------------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+
|overall|age|height_cm

In [16]:
df_processed.show(5)

+-------+--------------------+
|outcome|            features|
+-------+--------------------+
|   94.0|[6.00629302994062...|
|   93.0|[6.43531396065066...|
|   90.0|[6.64982442600569...|
|   90.0|[6.22080349529564...|
|   90.0|[6.00629302994062...|
+-------+--------------------+
only showing top 5 rows



In [5]:
train_df, test_df = df_processed.randomSplit([0.8, 0.2], seed=42)

In [7]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(featuresCol='features', labelCol='outcome')
evaluator = RegressionEvaluator(metricName="rmse", labelCol="outcome", predictionCol="prediction")

# Set up parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

cross_validator = CrossValidator(estimator=lr,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5)

lr_cv_model = cross_validator.fit(train_df)

# # Get the best model
# lr_best_model = lr_cv_model.bestModel

# # Evaluate the best model on the test set
# predictions = lr_best_model.transform(test_df)
# rmse = evaluator.evaluate(predictions)
# print(f"Best Model Test RMSE: {rmse}")
# print(f"Best Model Parameters: regParam={lr_best_model._java_obj.getRegParam()}, elasticNetParam={lr_best_model._java_obj.getElasticNetParam()}, maxIter={lr_best_model._java_obj.getMaxIter()}")

Best Model Test RMSE: 2.5087356589009353
Best Model Parameters: regParam=0.1, elasticNetParam=0.0, maxIter=100


In [17]:
lr_best_model = lr_cv_model.bestModel

# Evaluate the best model on the test set
predictions = lr_best_model.transform(test_df)
rmse = evaluator.evaluate(predictions)
print(f"Best Model Test RMSE: {rmse}")
print(f"Best Model Parameters: regParam={lr_best_model._java_obj.getRegParam()}, elasticNetParam={lr_best_model._java_obj.getElasticNetParam()}, maxIter={lr_best_model._java_obj.getMaxIter()}")



Best Model Test RMSE: 2.5087356589009353
Best Model Parameters: regParam=0.1, elasticNetParam=0.0, maxIter=100


In [14]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features", labelCol="outcome")
evaluator = RegressionEvaluator(metricName="rmse", labelCol="outcome", predictionCol="prediction")

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 5, 10]) \
    .addGrid(gbt.maxIter, [10, 20, 50]) \
    .addGrid(gbt.stepSize, [0.1, 0.2, 0.3]) \
    .build()

cross_validator = CrossValidator(estimator=gbt,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5)

gbt_cv_model = cross_validator.fit(train_df)

# # Get the best model
# gbt_best_model = gbt_cv_model.bestModel

# # Evaluate the best model on the test set
# predictions = gbt_best_model.transform(test_df)
# rmse = evaluator.evaluate(predictions)
# print(f"Best Model Test RMSE: {rmse}")
# print(f"Best Model Parameters: maxDepth={gbt_best_model._java_obj.getMaxDepth()}, maxIter={gbt_best_model._java_obj.getMaxIter()}, stepSize={gbt_best_model._java_obj.getStepSize()}")

In [18]:
# Get the best model
gbt_best_model = gbt_cv_model.bestModel

# Evaluate the best model on the test set
predictions = gbt_best_model.transform(test_df)
rmse = evaluator.evaluate(predictions)
print(f"Best Model Test RMSE: {rmse}")
print(f"Best Model Parameters: maxDepth={gbt_best_model._java_obj.getMaxDepth()}, maxIter={gbt_best_model._java_obj.getMaxIter()}, stepSize={gbt_best_model._java_obj.getStepSize()}")

Best Model Test RMSE: 1.6506834424683523
Best Model Parameters: maxDepth=5, maxIter=20, stepSize=0.1
