## Learning Spark (O'Reilly) Book Study

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import warnings

In [2]:
ss= SparkSession.builder.master("local[*]").appName("example_schema").getOrCreate()  # create (or reuse) the session
sc= ss.sparkContext  # the SparkContext that backs the session

**Defining Schema** <br/>
There are two ways to define a schema: programmatically, with `StructType` and `StructField`, or with a DDL (Data Definition Language) string. We will use the simpler one, the DDL string.

In [4]:
schema= "name STRING, team STRING, assist INT"  # DDL string: column names and their types

In [6]:
data= [["gedson", "galatasaray", 12], ["pelkas", "fenerbahce", 7], ["josef", "besiktas", 8]]
df= ss.createDataFrame(data, schema)

### Machine Learning With MLlib
`spark.mllib` is the original machine learning API, based on the RDD API (which has been in maintenance mode since Spark 2.0), while `spark.ml` is the newer API, based on DataFrames.

In [3]:
airbnbDf= ss.read.csv("airbnb.csv", header=True, inferSchema=False)
airbnbDf.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



In [99]:
df= airbnbDf.select("latitude", "longitude", "price", "minimum_nights" ,
                "number_of_reviews" ,"last_review", "reviews_per_month")
df.show(5, truncate=10)

+--------+----------+-----+--------------+-----------------+-----------+-----------------+
|latitude| longitude|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|
+--------+----------+-----+--------------+-----------------+-----------+-----------------+
|37.77028|-122.43317|  150|             2|              277| 2021-04-05|             1.94|
|37.74474|-122.42089|  195|            30|              111| 2017-08-06|             0.76|
|37.76555|-122.45213|   56|            32|               19| 2020-03-06|             0.13|
|37.76555|-122.45213|   56|            32|                8| 2018-09-12|             0.10|
|37.77564|-122.43642|  795|             7|               28| 2019-06-28|             0.20|
+--------+----------+-----+--------------+-----------------+-----------+-----------------+
only showing top 5 rows



In [100]:
df.count()

6689

In [101]:
df.printSchema()

root
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)



In [102]:
df= df.drop("last_review")

In [104]:
for i in df.columns:
    df= df.withColumn(i, df[i].cast(IntegerType()))  # cast every column to integer; this truncates fractional values such as latitude (DoubleType() would keep them)

In [62]:
df.printSchema()

root
 |-- latitude: integer (nullable = true)
 |-- longitude: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- reviews_per_month: integer (nullable = true)



In [117]:
df= df.na.drop("any")
df.show(5)

+--------+---------+-----+--------------+-----------------+-----------------+
|latitude|longitude|price|minimum_nights|number_of_reviews|reviews_per_month|
+--------+---------+-----+--------------+-----------------+-----------------+
|      37|     -122|  150|             2|              277|                1|
|      37|     -122|  195|            30|              111|                0|
|      37|     -122|   56|            32|               19|                0|
|      37|     -122|   56|            32|                8|                0|
|      37|     -122|  795|             7|               28|                0|
+--------+---------+-----+--------------+-----------------+-----------------+
only showing top 5 rows



In [109]:
df_train, df_test= df.randomSplit([.8, .2], seed=42)
print(f"""There are {df_train.count()} rows in train set.  \nThere are {df_test.count()} rows in test set.""")

There are 4048 rows in train set.  
There are 951 rows in test set.


Linear regression (like many other algorithms in Spark) requires that all the input features are contained within a single vector in your DataFrame. Thus, we need to ***transform*** our data. <br/>
To put all the features into a single vector, we will use the **VectorAssembler transformer**.

In [112]:
from pyspark.ml.feature import VectorAssembler

In [113]:
vecAssembler= VectorAssembler(inputCols=["number_of_reviews"], outputCol="features")
vecTrainDf= vecAssembler.transform(df_train)

In [116]:
vecTrainDf.show(5)

+--------+---------+-----+--------------+-----------------+-----------------+--------+
|latitude|longitude|price|minimum_nights|number_of_reviews|reviews_per_month|features|
+--------+---------+-----+--------------+-----------------+-----------------+--------+
|      37|     -122|   10|            30|                1|                0|   [1.0]|
|      37|     -122|   25|            30|                2|                0|   [2.0]|
|      37|     -122|   25|            30|                4|                0|   [4.0]|
|      37|     -122|   25|            30|                4|                0|   [4.0]|
|      37|     -122|   25|            30|                5|                0|   [5.0]|
+--------+---------+-----+--------------+-----------------+-----------------+--------+
only showing top 5 rows
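
Conceptually, the assembler takes the values of the input columns for each row and packs them, in order, into one numeric vector. The plain-Python sketch below only illustrates that idea on a dictionary. `assemble` is a hypothetical helper, not part of Spark; the real `VectorAssembler` works on DataFrame columns and emits Spark vector types.

```python
def assemble(row, input_cols):
    """Pack the values of input_cols from one row into a list of floats."""
    return [float(row[col]) for col in input_cols]

# One record shaped like the training rows shown above.
row = {"price": 25, "minimum_nights": 30, "number_of_reviews": 4}

single = assemble(row, ["number_of_reviews"])
multi = assemble(row, ["minimum_nights", "number_of_reviews"])

print(single)  # [4.0]
print(multi)   # [30.0, 4.0]
```

With more than one column in `inputCols`, the `features` vector simply gets longer, following the order in which the columns are listed.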



**Building Model**

In [118]:
from pyspark.ml.regression import LinearRegression

In [119]:
lr= LinearRegression(featuresCol="features", labelCol="price")
lrModel= lr.fit(vecTrainDf)
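
With a single feature, fitting a line by ordinary least squares has a closed form: the slope is the covariance of feature and label divided by the variance of the feature, and the intercept follows from the two means. The sketch below shows this in plain Python on made-up toy numbers. `fit_simple_ols` is a hypothetical helper; Spark's `LinearRegression` uses its own solvers and supports regularization, so this is not a description of how `lr.fit` is implemented.

```python
def fit_simple_ols(xs, ys):
    """Return (slope, intercept) minimizing squared error for y ~ slope*x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of cross-deviations and squared deviations (the 1/n factors cancel).
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept

# Toy data lying exactly on y = 2x + 1 (invented for illustration).
slope, intercept = fit_simple_ols([0, 1, 2, 3], [1, 3, 5, 7])
print(slope, intercept)
```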

**Inspecting the parameters** <br/>
**Note:** `round(x, 2)` rounds the value to two decimal places.

In [131]:
c= round(lrModel.coefficients[0], 2)
i= round(lrModel.intercept, 2)

print("The formula for the linear regression is: \nprice= {}*number_of_reviews + {}".format(c,i))

The formula for the linear regression is: 
price= -0.29*number_of_reviews + 218.67
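
To see what the fitted line implies, the rounded parameters printed above can be plugged in by hand. This small sketch assumes those rounded values (-0.29 and 218.67), so its results will differ slightly from what the unrounded model predicts. `predict_price` is a hypothetical helper.

```python
COEFFICIENT = -0.29   # rounded slope taken from the output above
INTERCEPT = 218.67    # rounded intercept taken from the output above

def predict_price(number_of_reviews):
    """Apply price = coefficient * number_of_reviews + intercept."""
    return COEFFICIENT * number_of_reviews + INTERCEPT

# Predicted price for listings with 0, 10 and 100 reviews.
for n in (0, 10, 100):
    print(n, round(predict_price(n), 2))
```

Under this formula, each additional review lowers the predicted price by about 0.29, and a listing with no reviews is predicted at the intercept.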


#### Creating a Pipeline

In [133]:
from pyspark.ml import Pipeline

In [135]:
pipeline= Pipeline(stages=[vecAssembler, lr])
pipelineModel= pipeline.fit(df_train)

**Applying it to test data**

In [137]:
predDf= pipelineModel.transform(df_test)
predDf.select("number_of_reviews", "features", "price" ,"prediction").show(5)

+-----------------+--------+-----+------------------+
|number_of_reviews|features|price|        prediction|
+-----------------+--------+-----+------------------+
|                2|   [2.0]|   10|218.09041633638142|
|               19|  [19.0]|   10|213.19946428258487|
|                8|   [8.0]|   23|216.36419796445324|
|                2|   [2.0]|   27|218.09041633638142|
|                4|   [4.0]|   27|217.51501021240537|
+-----------------+--------+-----+------------------+
only showing top 5 rows



### SparseVector 
How SparseVectors work: <br/>
    DenseVector(0, 0, 0, 7, 0, 2, 0, 0, 0, 0) <br/>
    SparseVector(10, [3, 5], [7, 2]) <br/>
The DenseVector in this example contains 10 values, all but 2 of which are 0. To create a SparseVector, we need to keep track of the size of the vector, the indices of the nonzero elements, and the corresponding values at those indices. In this example the size of the vector is 10, there are two nonzero values at indices 3 and 5, and the corresponding values at those indices are 7 and 2.
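
The conversion described above can be sketched in plain Python. `to_sparse` and `to_dense` are hypothetical helpers written for illustration only; in PySpark the corresponding constructors are `Vectors.dense` and `Vectors.sparse` from `pyspark.ml.linalg`.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the nonzero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values

def to_dense(size, indices, values):
    """Rebuild the full list from its sparse description."""
    dense = [0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

dense = [0, 0, 0, 7, 0, 2, 0, 0, 0, 0]
sparse = to_sparse(dense)

print(sparse)             # (10, [3, 5], [7, 2])
print(to_dense(*sparse))  # [0, 0, 0, 7, 0, 2, 0, 0, 0, 0]
```

The sparse form stores five numbers plus the size instead of ten values, and the saving grows with the share of zeros in the vector.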

#### Evaluating Model

In [138]:
from pyspark.ml.evaluation import RegressionEvaluator

In [146]:
regEvaluator= RegressionEvaluator(predictionCol="prediction",
                                 labelCol="price",
                                 metricName="r2")
model_r2= regEvaluator.evaluate(predDf)  # the metric is R2, so name the variable accordingly
print("R2 value of our model: {}".format(model_r2))

R2 value of our model: 0.006958209446088159
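
R2 compares the model's squared error with that of a baseline that always predicts the mean label: R2 = 1 - SS_res / SS_tot. A value close to 0, like the one above, means the model is barely better than that baseline. The plain-Python sketch below computes R2 and RMSE on made-up toy numbers. `r2_score` and `rmse` are hypothetical helpers, not Spark functions; in Spark, `RegressionEvaluator` also accepts `metricName="rmse"`.

```python
import math

def r2_score(labels, predictions):
    """R2 = 1 - (residual sum of squares) / (total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

def rmse(labels, predictions):
    """Root of the mean squared difference between labels and predictions."""
    mse = sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)
    return math.sqrt(mse)

labels = [10.0, 20.0, 30.0]        # toy labels, invented for illustration
always_mean = [20.0, 20.0, 20.0]   # baseline: always predict the mean label
perfect = [10.0, 20.0, 30.0]       # predictions that match every label

print(r2_score(labels, always_mean))  # 0.0
print(r2_score(labels, perfect))      # 1.0
print(rmse(labels, always_mean))
```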


**Saving and Loading Model**

In [147]:
pipelinePath= "/Users/ahmetemintek/Desktop/new_pyspark/lr_pipeline_model"
pipelineModel.write().save(pipelinePath)

In [148]:
# load the saved pipeline model back from disk
from pyspark.ml import PipelineModel
savedPipelineModel= PipelineModel.load(pipelinePath)