In [1]:
import findspark;
findspark.init()

## Create a spark session 
- Spark session is the `entry point for Spark` library.
- `pyspark.sql` can be considered as an interface between datasources (e.g Hive, csv files, databases etc)
- `pyspark.sql` provided streamlined SQL-like interface to interact with DataFrame obtained from datasource.

In [58]:
import pyspark
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("lin_reg_1").getOrCreate()

In [59]:
X_Cols = ["Freq_Hz", "AoA_Deg", "Chord_m", "V_inf_mps", "displ_thick_m"]
Y_Col = "sound_db"
df = sparkSession.read.csv('./airfoil_self_noise.csv', header=True, inferSchema=True)
#df_X = df.select(X_Cols)
#df_Y = df.select(Y_Col)
df.show()
type(df)

+-------+-------+-------+---------+-------------+--------+
|Freq_Hz|AoA_Deg|Chord_m|V_inf_mps|displ_thick_m|sound_db|
+-------+-------+-------+---------+-------------+--------+
|    800|    0.0| 0.3048|     71.3|    0.0026634|   126.2|
|   1000|    0.0| 0.3048|     71.3|    0.0026634|   125.2|
|   1250|    0.0| 0.3048|     71.3|    0.0026634|  125.95|
|   1600|    0.0| 0.3048|     71.3|    0.0026634|  127.59|
|   2000|    0.0| 0.3048|     71.3|    0.0026634|  127.46|
|   2500|    0.0| 0.3048|     71.3|    0.0026634|  125.57|
|   3150|    0.0| 0.3048|     71.3|    0.0026634|   125.2|
|   4000|    0.0| 0.3048|     71.3|    0.0026634|  123.06|
|   5000|    0.0| 0.3048|     71.3|    0.0026634|   121.3|
|   6300|    0.0| 0.3048|     71.3|    0.0026634|  119.54|
|   8000|    0.0| 0.3048|     71.3|    0.0026634|  117.15|
|  10000|    0.0| 0.3048|     71.3|    0.0026634|  115.39|
|  12500|    0.0| 0.3048|     71.3|    0.0026634|  112.24|
|  16000|    0.0| 0.3048|     71.3|    0.0026634|  108.7

pyspark.sql.dataframe.DataFrame

In [60]:
df.printSchema()

root
 |-- Freq_Hz: integer (nullable = true)
 |-- AoA_Deg: double (nullable = true)
 |-- Chord_m: double (nullable = true)
 |-- V_inf_mps: double (nullable = true)
 |-- displ_thick_m: double (nullable = true)
 |-- sound_db: double (nullable = true)



We can see our schema is full of string (if we don't infer schema while reading from datasource), so we need to transform it into `numbers` in order to do some operations.

- This casting can be done using `Column.cast()` 

In [61]:
from pyspark.sql.types import DoubleType

def cast_col():    
    casted_freq = df['Freq_Hz'].cast(DoubleType())
    return df.withColumn('_Freq_Hz', casted_freq)

`VectorAssembler` assembles all the feature into one. 

- This is required because Apis in spark expects only 2 columns, i.e feature column and output column.

In [62]:
from pyspark.ml.feature import VectorAssembler;

"""
def has_column(colName="", df):
    return df.columns.contains(colName)
"""

def assemble_features(df):
    _col_to_add = "feature"
    df = df.drop(_col_to_add)
    assembler = VectorAssembler(inputCols=X_Cols, outputCol="unscaled_features")
    df = assembler.transform(df)
    return df.select(["unscaled_features", Y_Col])

clean_df = assemble_features(df)
clean_df.show(3)

+--------------------+--------+
|   unscaled_features|sound_db|
+--------------------+--------+
|[800.0,0.0,0.3048...|   126.2|
|[1000.0,0.0,0.304...|   125.2|
|[1250.0,0.0,0.304...|  125.95|
+--------------------+--------+
only showing top 3 rows



Need to bring data on same/similar scale before fitting it into the algorithm. We have 2 options here:

- Standardization of data (`pyspark.ml.feature.StandardScaler`)
- Normalization of data (`pyspark.ml.feature.Normalizer`)

In [64]:
from pyspark.ml.feature import StandardScaler

def std_scale_df(df, _input_col="unscaled_features"):
    freq_scaler = StandardScaler(inputCol=_input_col, outputCol="features")
    return freq_scaler.fit(df).transform(df);

scale_df = std_scale_df(clean_df);
scale_df = scale_df.select(["features", Y_Col])
scale_df.show(3)

+--------------------+--------+
|            features|sound_db|
+--------------------+--------+
|[0.25376096453669...|   126.2|
|[0.31720120567086...|   125.2|
|[0.39650150708858...|  125.95|
+--------------------+--------+
only showing top 3 rows



`randomSplit()` generates the splitted dataframe based on split configuration passed in argumet.

**Note**: splitted dataframe is not guaranteed to have exact number(proportion) of element as specified in split confguration. 
Every element in dataframe is guaranteed to have equally-likeliness to be in either of the splits.

In [65]:
train_df, cv_df, test_df = scale_df.randomSplit([0.6, 0.2, 0.2])

In [9]:
"""
desc_train = train_df.describe()
desc_cv = cv_df.describe()
desc_test = test_df.describe()
print(desc_train[desc_train.summary == 'count'].show())
print(desc_cv[desc_cv.summary == 'count'].show())
print(desc_test[desc_test.summary == 'count'].show())
"""

"\ndesc_train = train_df.describe()\ndesc_cv = cv_df.describe()\ndesc_test = test_df.describe()\nprint(desc_train[desc_train.summary == 'count'].show())\nprint(desc_cv[desc_cv.summary == 'count'].show())\nprint(desc_test[desc_test.summary == 'count'].show())\n"

In [66]:
train_df.show(3)

+--------------------+--------+
|            features|sound_db|
+--------------------+--------+
|[0.06344024113417...|  117.19|
|[0.06344024113417...|  118.13|
|[0.06344024113417...|  128.68|
+--------------------+--------+
only showing top 3 rows



In [67]:
from pyspark.ml.regression import LinearRegression;

estimator = LinearRegression(labelCol=Y_Col)
model = estimator.fit(train_df)

model.summary
model.evaluate(train_df).rootMeanSquaredError

4.754475122299981

## Model evaluation 

- R^2 - This is a typical guideline(but not thumb rule), `Higher the R2 better the model`.
- R2 - (Variance explained by Model **/** Total variance)

In [68]:
cv_result = model.evaluate(cv_df)
print(cv_result.rootMeanSquaredError)
print(cv_result.r2)

4.925295283614273
0.4653855397200154


Our model is not able to nicely fit into the data. 

In [69]:
test_result = model.evaluate(test_df)
print(test_result.r2)

0.48183407493301744


# Persist a model

- When all looks good, save a model.

In [70]:
mlwriter = model.write();
mlwriter.overwrite().save('model_lin_reg')

- Model gets saved into current directory.
- Model is saved in `parquet` structure.

- We can also store model in `PMML` structure.

## Reloading a model

In [71]:
from pyspark.ml.regression import LinearRegressionModel

In [72]:
loaded_model = LinearRegressionModel.load('model_lin_reg')

In [75]:
user_sample = df.sample(False, 0.01).limit(1)
_assembler = VectorAssembler(inputCols=X_Cols, outputCol="_features")
tuned_sample = _assembler.transform(user_sample).select('_features')
tuned_sample = std_scale_df(tuned_sample, _input_col="_features")
_user_prediction = loaded_model.transform(tuned_sample)
_user_prediction.show()

+--------------------+--------------------+-----------------+
|           _features|            features|       prediction|
+--------------------+--------------------+-----------------+
|[2500.0,4.0,0.228...|[0.0,0.0,0.0,0.0,...|131.6272735005637|
+--------------------+--------------------+-----------------+



In [77]:
def foo():
    a = "s";
    b = 21
    return a, b

x, y = foo()
print(x, y)

s 21
