## Creating Spark session

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

## Schema definition for the two files
The dataset had to be divided into two files. Defining the schema here. Being an astronomical data and a regression task, all the values are continuous.

In [2]:
# schema 1
gal_info_schema = T.StructType([
    T.StructField('specObjID', T.LongType(), True),
    T.StructField('class', T.StringType(), True),
    T.StructField('z', T.FloatType(), True),
    T.StructField('zErr', T.FloatType(), True),
    T.StructField('ra', T.FloatType(), True),
    T.StructField('dec', T.FloatType(), True),
    T.StructField('zphot', T.FloatType(), True),
    T.StructField('dzphot', T.FloatType(), True),
    T.StructField('l', T.FloatType(), True),
    T.StructField('b', T.FloatType(), True),
])

# schema 2
gal_petro_schema = T.StructType([
    T.StructField('specObjID', T.LongType(), True),
    T.StructField('petroMag_u', T.FloatType(), True),
    T.StructField('petroMag_g', T.FloatType(), True),
    T.StructField('petroMag_r', T.FloatType(), True),
    T.StructField('petroMag_i', T.FloatType(), True),
    T.StructField('petroMag_z', T.FloatType(), True),
    T.StructField('petroMagErr_u', T.FloatType(), True),
    T.StructField('petroMagErr_g', T.FloatType(), True),
    T.StructField('petroMagErr_r', T.FloatType(), True),
    T.StructField('petroMagErr_i', T.FloatType(), True),
    T.StructField('petroMagErr_z', T.FloatType(), True),
])

## Reading of the files with the created schema

In [3]:
gal_info = spark.read.csv(
    path="gs://dataproc-staging-us-central1-931970996334-cxascqdj/notebooks/jupyter/gal.csv",
    sep=",",
    header=True,
    schema=gal_info_schema
)

gal_petro = spark.read.csv(
    path="gs://dataproc-staging-us-central1-931970996334-cxascqdj/notebooks/jupyter/gal_petro.csv",
    sep=",",
    header=True,
    schema=gal_petro_schema
)

## Joining the two DataFrames

In [4]:
gal_data = gal_info.join(
    gal_petro,
    gal_info["specObjID"] == gal_petro["specObjID"]
)

## Removing one key from joined data
Since the dataframes were joined on 'spechObjID', the joined table would have two instances of the primary key, removing one to avoid any ambiguity.

In [5]:
gal_data = gal_data.drop(gal_petro["specObjID"])
gal_data.printSchema()

root
 |-- specObjID: long (nullable = true)
 |-- class: string (nullable = true)
 |-- z: float (nullable = true)
 |-- zErr: float (nullable = true)
 |-- ra: float (nullable = true)
 |-- dec: float (nullable = true)
 |-- zphot: float (nullable = true)
 |-- dzphot: float (nullable = true)
 |-- l: float (nullable = true)
 |-- b: float (nullable = true)
 |-- petroMag_u: float (nullable = true)
 |-- petroMag_g: float (nullable = true)
 |-- petroMag_r: float (nullable = true)
 |-- petroMag_i: float (nullable = true)
 |-- petroMag_z: float (nullable = true)
 |-- petroMagErr_u: float (nullable = true)
 |-- petroMagErr_g: float (nullable = true)
 |-- petroMagErr_r: float (nullable = true)
 |-- petroMagErr_i: float (nullable = true)
 |-- petroMagErr_z: float (nullable = true)



## Creation of assembler and scaler

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import time

In [7]:
feature_columns = [
    "z",
    "zErr",
    "ra",
    "dec",
    "dzphot",
    "petroMag_u",
    "petroMag_g",
    "petroMag_r",
    "petroMag_i",
    "petroMag_z",
    "petroMagErr_u",
    "petroMagErr_g",
    "petroMagErr_r",
    "petroMagErr_i",
    "petroMagErr_z"
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

In [8]:
# Linear Regression pipeline
train_data, test_data = gal_data.randomSplit([0.8, 0.2], seed=42)
evaluator = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="r2")

## Linear Regression

In [9]:
lr_best = LinearRegression(featuresCol="scaled_features", labelCol="zphot", regParam=0.2, elasticNetParam=1.0, loss='squaredError')
pipeline = Pipeline(stages=[assembler, scaler, lr_best])

In [10]:
evaluation_results = []
percentage_list = [0.25, 0.5, 0.75, 1.0]

# Loop through different percentages
for percentage in percentage_list:
    start = time.time()
    sampled_data = gal_data.sample(fraction=percentage, seed=42)
    
    # Split data, train on train set, make predictions on test set
    train_data, test_data = sampled_data.randomSplit([0.8, 0.2], seed=42)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    # Evaluation - R2
    evaluator1 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="r2")
    r2 = evaluator1.evaluate(predictions)
    # Evaluation - RMSE
    evaluator2 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="rmse")
    rmse = evaluator2.evaluate(predictions)

    # Append results to the list
    evaluation_results.append((percentage, r2, rmse))
    
    # Time for x% of data to complete
    print(f"With {percentage * 100}% data: {(time.time() - start):.2f} seconds.")

# Print the evaluation results
for percentage, r2, rmse in evaluation_results:
    print(f"With {percentage * 100}% data | R2: {r2:.4f} | RMSE: {rmse:.4f}")

23/12/17 00:10:07 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

With 25.0% data: 165.73 seconds.


                                                                                

With 50.0% data: 74.98 seconds.


                                                                                

With 75.0% data: 68.22 seconds.




With 100.0% data: 63.88 seconds.
With 25.0% data | R2: 1.0000 | RMSE: 0.2816
With 50.0% data | R2: 1.0000 | RMSE: 0.2809
With 75.0% data | R2: 1.0000 | RMSE: 0.2806
With 100.0% data | R2: 1.0000 | RMSE: 0.2812


                                                                                

## Decision Tree Regression

In [11]:
from pyspark.ml.regression import DecisionTreeRegressor

In [12]:
dtr_best = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="zphot", maxDepth=5)
pipeline = Pipeline(stages=[assembler, scaler, dtr_best])

In [13]:
evaluation_results = []
percentage_list = [0.25, 0.5, 0.75, 1.0]

# Loop through different percentages
for percentage in percentage_list:
    start = time.time()
    sampled_data = gal_data.sample(fraction=percentage, seed=42)
    
    # Split data, train on train set, make predictions on test set
    train_data, test_data = sampled_data.randomSplit([0.8, 0.2], seed=42)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    # Evaluation - R2
    evaluator1 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="r2")
    r2 = evaluator1.evaluate(predictions)
    # Evaluation - RMSE
    evaluator2 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="rmse")
    rmse = evaluator2.evaluate(predictions)

    # Append results to the list
    evaluation_results.append((percentage, r2, rmse))
    
    # Time for x% of data to complete
    print(f"With {percentage * 100}% data: {(time.time() - start):.2f} seconds.")

# Print the evaluation results
for percentage, r2, rmse in evaluation_results:
    print(f"With {percentage * 100}% data | R2: {r2:.4f} | RMSE: {rmse:.4f}")

                                                                                

With 25.0% data: 54.55 seconds.


                                                                                

With 50.0% data: 56.85 seconds.


                                                                                

With 75.0% data: 58.41 seconds.




With 100.0% data: 61.23 seconds.
With 25.0% data | R2: 0.9603 | RMSE: 160.4291
With 50.0% data | R2: 0.9521 | RMSE: 174.4568
With 75.0% data | R2: 0.9472 | RMSE: 182.6277
With 100.0% data | R2: 0.9523 | RMSE: 173.8204


                                                                                

## Random forest Regression

In [14]:
from pyspark.ml.regression import RandomForestRegressor

In [15]:
rfr_best = RandomForestRegressor(featuresCol="scaled_features", labelCol="zphot", numTrees=4 , featureSubsetStrategy='onethird')
pipeline = Pipeline(stages=[assembler, scaler, rfr_best])

In [16]:
evaluation_results = []
percentage_list = [0.25, 0.5, 0.75, 1.0]

# Loop through different percentages
for percentage in percentage_list:
    start = time.time()
    sampled_data = gal_data.sample(fraction=percentage, seed=42)
    
    # Split data, train on train set, make predictions on test set
    train_data, test_data = sampled_data.randomSplit([0.8, 0.2], seed=42)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    # Evaluation - R2
    evaluator1 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="r2")
    r2 = evaluator1.evaluate(predictions)
    # Evaluation - RMSE
    evaluator2 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="rmse")
    rmse = evaluator2.evaluate(predictions)

    # Append results to the list
    evaluation_results.append((percentage, r2, rmse))
    
    # Time for x% of data to complete
    print(f"With {percentage * 100}% data: {(time.time() - start):.2f} seconds.")

# Print the evaluation results
for percentage, r2, rmse in evaluation_results:
    print(f"With {percentage * 100}% data | R2: {r2:.4f} | RMSE: {rmse:.4f}")

                                                                                

With 25.0% data: 57.34 seconds.


                                                                                

With 50.0% data: 43.26 seconds.




With 75.0% data: 43.28 seconds.




With 100.0% data: 44.53 seconds.
With 25.0% data | R2: 0.9093 | RMSE: 238.5070
With 50.0% data | R2: 0.9350 | RMSE: 205.2915
With 75.0% data | R2: 0.9318 | RMSE: 211.8675
With 100.0% data | R2: 0.9157 | RMSE: 234.7890


                                                                                

## Gradient Boosted Tree Regression

In [17]:
from pyspark.ml.regression import GBTRegressor

In [18]:
gbtr_best = GBTRegressor(featuresCol="scaled_features", labelCol="zphot", lossType='squared')
pipeline = Pipeline(stages=[assembler, scaler, gbtr_best])

In [19]:
evaluation_results = []
percentage_list = [0.25, 0.5, 0.75, 1.0]

# Loop through different percentages
for percentage in percentage_list:
    start = time.time()
    sampled_data = gal_data.sample(fraction=percentage, seed=42)
    
    # Split data, train on train set, make predictions on test set
    train_data, test_data = sampled_data.randomSplit([0.8, 0.2], seed=42)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    # Evaluation - R2
    evaluator1 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="r2")
    r2 = evaluator1.evaluate(predictions)
    # Evaluation - RMSE
    evaluator2 = RegressionEvaluator(labelCol="zphot", predictionCol="prediction", metricName="rmse")
    rmse = evaluator2.evaluate(predictions)

    # Append results to the list
    evaluation_results.append((percentage, r2, rmse))
    
    # Time for x% of data to complete
    print(f"With {percentage * 100}% data: {(time.time() - start):.2f} seconds.")

# Print the evaluation results
for percentage, r2, rmse in evaluation_results:
    print(f"With {percentage * 100}% data | R2: {r2:.4f} | RMSE: {rmse:.4f}")

                                                                                

With 25.0% data: 69.90 seconds.


                                                                                

With 50.0% data: 76.22 seconds.


                                                                                

With 75.0% data: 81.13 seconds.




With 100.0% data: 86.74 seconds.
With 25.0% data | R2: 0.9613 | RMSE: 155.7864
With 50.0% data | R2: 0.9682 | RMSE: 143.5513
With 75.0% data | R2: 0.9675 | RMSE: 146.3123
With 100.0% data | R2: 0.9678 | RMSE: 145.1515


                                                                                