In [0]:
# Required for installation of Spark in Google Colab

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

import findspark
findspark.init()

# Creates a Spark session

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

In [0]:
# Can be used to import local files to Colab
from google.colab import files
files.upload()

{}

In [0]:
data = spark.read.csv('/content/sample_data/california_housing_train.csv',inferSchema=True, header=True)

In [0]:
# show top 5 rows of the dataframe
data.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [0]:
# to get the shape of the data
data.count(),  len(data.columns) # number of rows, number of columns

(17000, 9)

In [0]:
# prints the schema of the dataframe.
# it will return the datatypes and other details about the dataset.
# similar to the pandas df.describe() method.
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [0]:
# only display selected columns 
data.select("median_income","median_house_value","housing_median_age") \
.show(15, truncate = False)

+-------------+------------------+------------------+
|median_income|median_house_value|housing_median_age|
+-------------+------------------+------------------+
|1.4936       |66900.0           |15.0              |
|1.82         |80100.0           |19.0              |
|1.6509       |85700.0           |17.0              |
|3.1917       |73400.0           |14.0              |
|1.925        |65500.0           |20.0              |
|3.3438       |74000.0           |29.0              |
|2.6768       |82400.0           |25.0              |
|1.7083       |48500.0           |41.0              |
|2.1782       |58400.0           |34.0              |
|2.1908       |48100.0           |46.0              |
|2.6797       |86500.0           |16.0              |
|1.625        |62000.0           |21.0              |
|2.1571       |48600.0           |48.0              |
|3.212        |70400.0           |31.0              |
|0.8585       |45000.0           |15.0              |
+-------------+-------------

In [0]:
# to get the statistical summary of the selected columns
data.describe(["median_income","median_house_value","housing_median_age", "longitude", "latitude"]).show()

+-------+------------------+------------------+------------------+-------------------+------------------+
|summary|     median_income|median_house_value|housing_median_age|          longitude|          latitude|
+-------+------------------+------------------+------------------+-------------------+------------------+
|  count|             17000|             17000|             17000|              17000|             17000|
|   mean| 3.883578100000021|207300.91235294117| 28.58935294117647|-119.56210823529375|  35.6252247058827|
| stddev|1.9081565183791036|115983.76438720895|12.586936981660406| 2.0051664084260357|2.1373397946570867|
|    min|            0.4999|           14999.0|               1.0|            -124.35|             32.54|
|    max|           15.0001|          500001.0|              52.0|            -114.31|             41.95|
+-------+------------------+------------------+------------------+-------------------+------------------+



In [0]:
# using the audible dataset

data = spark.read.csv('/content/all_english_audible.csv',inferSchema=True, header=True)

In [0]:
data.show(5)

+----------+--------------------+--------------------+------------------+--------------------+--------------------+-----+------+------------+------------+--------------------+
|      asin|              author|            category|            length|                link|            narrator|price|rating|rating_count|release_date|               title|
+----------+--------------------+--------------------+------------------+--------------------+--------------------+-----+------+------------+------------+--------------------+
|      null|        Paul Stanley|Arts & Entertainment| 4 hrs and 38 mins|https://www.audib...|          Sean Pratt|20.52|   4.0|        78.0|  2019-04-30|      Backstage Pass|
|B0052OM9XK|          Simon Pegg|Arts & Entertainment| 4 hrs and 50 mins|https://www.audib...|          Simon Pegg| 24.5|   4.5|       872.0|  2011-06-09|        Nerd Do Well|
|B077XN66F8|        Sidney Lumet|Arts & Entertainment| 7 hrs and 23 mins|https://www.audib...| Richard M. Davidson|20.99

In [0]:
data.count(), len(data.columns)

(44384, 11)

In [0]:
data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- author: string (nullable = true)
 |-- category: string (nullable = true)
 |-- length: string (nullable = true)
 |-- link: string (nullable = true)
 |-- narrator: string (nullable = true)
 |-- price: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_count: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- title: string (nullable = true)



In [0]:
data.describe().show()

+-------+--------------------+-----------------+-------------------+--------------------+--------------------+-----------------+--------------------+------------------+------------------+-----------------+--------------------+
|summary|                asin|           author|           category|              length|                link|         narrator|               price|            rating|      rating_count|     release_date|               title|
+-------+--------------------+-----------------+-------------------+--------------------+--------------------+-----------------+--------------------+------------------+------------------+-----------------+--------------------+
|  count|               41907|            44267|              44384|               44381|               44384|            43962|               44371|             44333|             44378|            44379|               44384|
|   mean|     1.49163573135E9|             null|               null|                null|   

In [0]:
data.groupBy("author") \
.count() \
.orderBy("count", ascending=False) \
.show(10)

+--------------------+-----+
|              author|count|
+--------------------+-----+
|Charles River Edi...|  641|
|Harvard Business ...|  335|
|   Geoffrey Giuliano|  219|
|        Holly Newson|  195|
|Robin Morgan-Bentley|  164|
|          IntroBooks|  162|
|      Hourly History|  122|
|           Instaread|  119|
|                null|  117|
|  Edward Lucie-Smith|  102|
+--------------------+-----+
only showing top 10 rows



In [0]:
# filtering the null values
condition1 = data.rating.isNotNull()

data = data.filter(condition1)
condition1 = data.price.isNotNull()
data = data.filter(condition1)
condition1 = data.rating_count.isNotNull()
data = data.filter(condition1)

In [178]:
data.describe().show()

+-------+--------------------+-----------------+-------------------+--------------------+--------------------+-----------------+--------------------+------------------+------------------+-----------------+--------------------+
|summary|                asin|           author|           category|              length|                link|         narrator|               price|            rating|      rating_count|     release_date|               title|
+-------+--------------------+-----------------+-------------------+--------------------+--------------------+-----------------+--------------------+------------------+------------------+-----------------+--------------------+
|  count|               41843|            44198|              44315|               44312|               44315|            43895|               44315|             44315|             44315|            44315|               44315|
|   mean|     1.49163573135E9|             null|               null|                null|   

In [148]:
data.show(2)

+----------+------------+--------------------+-----------------+--------------------+----------+-----+------+------------+------------+--------------+
|      asin|      author|            category|           length|                link|  narrator|price|rating|rating_count|release_date|         title|
+----------+------------+--------------------+-----------------+--------------------+----------+-----+------+------------+------------+--------------+
|      null|Paul Stanley|Arts & Entertainment|4 hrs and 38 mins|https://www.audib...|Sean Pratt|20.52|   4.0|        78.0|  2019-04-30|Backstage Pass|
|B0052OM9XK|  Simon Pegg|Arts & Entertainment|4 hrs and 50 mins|https://www.audib...|Simon Pegg| 24.5|   4.5|       872.0|  2011-06-09|  Nerd Do Well|
+----------+------------+--------------------+-----------------+--------------------+----------+-----+------+------------+------------+--------------+
only showing top 2 rows



In [0]:
# converting the values from string to double

from pyspark.sql.types import DoubleType

data = data.withColumn("price", data["price"].cast(DoubleType()))
data = data.withColumn("rating_count", data["rating_count"].cast(DoubleType()))
data = data.withColumn("rating", data["rating"].cast(DoubleType()))

In [150]:
data.show()

+----------+--------------------+--------------------+------------------+--------------------+--------------------+-----+------+------------+------------+--------------------+
|      asin|              author|            category|            length|                link|            narrator|price|rating|rating_count|release_date|               title|
+----------+--------------------+--------------------+------------------+--------------------+--------------------+-----+------+------------+------------+--------------------+
|      null|        Paul Stanley|Arts & Entertainment| 4 hrs and 38 mins|https://www.audib...|          Sean Pratt|20.52|   4.0|        78.0|  2019-04-30|      Backstage Pass|
|B0052OM9XK|          Simon Pegg|Arts & Entertainment| 4 hrs and 50 mins|https://www.audib...|          Simon Pegg| 24.5|   4.5|       872.0|  2011-06-09|        Nerd Do Well|
|B077XN66F8|        Sidney Lumet|Arts & Entertainment| 7 hrs and 23 mins|https://www.audib...| Richard M. Davidson|20.99

In [0]:
# vector assembler to create the model from the data

# VectorAssembler can take inout columns and transform them into a single column (named “predictors”)

from pyspark.ml.feature import VectorAssembler
inputcols = ["price",  "rating_count"]
assembler = VectorAssembler(inputCols= inputcols,
                            outputCol = "predictors")

In [181]:
predictors = assembler.setHandleInvalid("skip").transform(data)
predictors.columns

['asin',
 'author',
 'category',
 'length',
 'link',
 'narrator',
 'price',
 'rating',
 'rating_count',
 'release_date',
 'title',
 'predictors']

In [186]:
# collect the predictors column and target column in their separate datframe

model_data = predictors.select("predictors", "rating")
model_data.show(10,truncate=False)
model_data.na.drop().show()

+--------------+------+
|predictors    |rating|
+--------------+------+
|[20.52,78.0]  |4.0   |
|[24.5,872.0]  |4.5   |
|[20.99,90.0]  |5.0   |
|[20.99,805.0] |4.5   |
|[52.95,585.0] |5.0   |
|[29.65,390.0] |4.5   |
|[24.95,1398.0]|4.5   |
|[19.95,60.0]  |4.5   |
|[40.59,4.0]   |3.0   |
|[42.0,841.0]  |4.5   |
+--------------+------+
only showing top 10 rows

+--------------+------+
|    predictors|rating|
+--------------+------+
|  [20.52,78.0]|   4.0|
|  [24.5,872.0]|   4.5|
|  [20.99,90.0]|   5.0|
| [20.99,805.0]|   4.5|
| [52.95,585.0]|   5.0|
| [29.65,390.0]|   4.5|
|[24.95,1398.0]|   4.5|
|  [19.95,60.0]|   4.5|
|   [40.59,4.0]|   3.0|
|  [42.0,841.0]|   4.5|
| [21.48,176.0]|   4.5|
|    [0.0,53.0]|   4.5|
|    [0.0,16.0]|   4.0|
|   [6.95,30.0]|   4.5|
| [27.37,144.0]|   4.5|
| [39.95,381.0]|   4.0|
|   [0.0,162.0]|   4.5|
|   [6.95,58.0]|   4.5|
|[27.99,1675.0]|   4.5|
| [27.99,355.0]|   4.0|
+--------------+------+
only showing top 20 rows



In [0]:
# split the data into testing and training

train_data,test_data = model_data.randomSplit([0.8,0.2])

In [0]:
# training the regression model

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol = 'predictors', 
    labelCol = 'rating')

lrModel = lr.fit(train_data)

In [0]:
pred = lrModel.evaluate(test_data)

In [188]:
# acces the model parameters

lrModel.coefficients

DenseVector([0.0499, 0.0])

In [189]:
lrModel.intercept

2.607785884371491

In [193]:
# View the final model predictions on the test data

pred.predictions.show(10)

+----------+------+------------------+
|predictors|rating|        prediction|
+----------+------+------------------+
| (2,[],[])|   0.0| 2.607785884371491|
| (2,[],[])|   0.0| 2.607785884371491|
| (2,[],[])|   0.0| 2.607785884371491|
| [0.0,1.0]|   2.0|2.6078319015612497|
| [0.0,1.0]|   3.0|2.6078319015612497|
| [0.0,1.0]|   3.0|2.6078319015612497|
| [0.0,1.0]|   3.0|2.6078319015612497|
| [0.0,1.0]|   3.0|2.6078319015612497|
| [0.0,1.0]|   3.0|2.6078319015612497|
| [0.0,1.0]|   3.0|2.6078319015612497|
+----------+------+------------------+
only showing top 10 rows



In [0]:
# For more detailed information on the model performance

from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(
    labelCol="rating", 
    predictionCol="prediction", 
    metricName="rmse")

In [197]:
# Computing some statistics for the model

rmse = eval.evaluate(pred.predictions)
print("rmse: ", rmse)
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("mse: ", mse)
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("mae: ", mae)
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: ", r2)

rmse:  1.659140253662445
mse:  2.752746381323083
mae:  1.2934172060348403
r2:  0.07246204701103753


In [199]:
# Gradient boosted tree regression

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'predictors', labelCol = 'rating', maxIter=10)
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(test_data)
gbt_predictions.select('prediction', 'rating', 'predictors').show(5)

+--------------------+------+----------+
|          prediction|rating|predictors|
+--------------------+------+----------+
|-0.04924867870855...|   0.0| (2,[],[])|
|-0.04924867870855...|   0.0| (2,[],[])|
|-0.04924867870855...|   0.0| (2,[],[])|
|   3.684271727528901|   2.0| [0.0,1.0]|
|   3.684271727528901|   3.0| [0.0,1.0]|
+--------------------+------+----------+
only showing top 5 rows



In [200]:
gbt_evaluator = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.6546


In [205]:
# Decision Tree Regression

from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='predictors', labelCol = 'rating')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_predictions.select('prediction', 'rating', 'predictors').show(5)

+------------------+------+----------+
|        prediction|rating|predictors|
+------------------+------+----------+
|               0.0|   0.0| (2,[],[])|
|               0.0|   0.0| (2,[],[])|
|               0.0|   0.0| (2,[],[])|
|3.8414264036418815|   2.0| [0.0,1.0]|
|3.8414264036418815|   3.0| [0.0,1.0]|
+------------------+------+----------+
only showing top 5 rows



In [207]:
dt_evaluator = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.656283
