In [None]:
# Mount Google Drive so the course CSV under MyDrive can be read by Spark below.
from google.colab import drive

drive.mount(mountpoint='/content/gdrive')

Mounted at /content/gdrive


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 41.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=fdd99ef29b70cdeb7fc7b7ddb6aef034a7e6ed61e6a8ea1dde67e3aa4f5ccffd
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, BooleanType, DateType, FloatType
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [None]:
# One Spark session shared by every cell below; getOrCreate() returns an existing
# session if one is already running.
spark = (
    SparkSession.builder
    .appName('DSCI632_Project')
    .getOrCreate()
)

In [None]:
# Load the raw Udemy course table; the header row supplies column names and Spark
# infers column types from the values.
udemy = spark.read.csv(
    "/content/gdrive/MyDrive/dsci632/FinalProject/udemy_courses.csv",
    header=True,
    inferSchema=True,
)
udemy.printSchema()

root
 |-- course_id: integer (nullable = true)
 |-- course_title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: integer (nullable = true)
 |-- num_subscribers: integer (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- num_lectures: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- content_duration: string (nullable = true)
 |-- published_timestamp: timestamp (nullable = true)
 |-- subject: string (nullable = true)



In [None]:
# Spark ML needs numeric inputs: cast the boolean paid flag and the numeric columns to float.
# Each column must be cast from its OWN values -- casting `price` and `content_duration`
# from `is_paid` would silently overwrite them with the 0.0/1.0 paid flag.
# (With Spark's default non-ANSI casting, strings that don't parse as numbers become null.)
udemy = (
    udemy
    .withColumn("is_paid", col("is_paid").cast(FloatType()))
    .withColumn("price", col("price").cast(FloatType()))
    .withColumn("content_duration", col("content_duration").cast(FloatType()))
)

In [None]:
# Preview the first 20 rows (Spark's default) with long cell values truncated.
udemy.show(n=20, truncate=True)

+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+----------------+-------------------+----------------+
|course_id|        course_title|                 url|is_paid|price|num_subscribers|num_reviews|num_lectures|             level|content_duration|published_timestamp|         subject|
+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+----------------+-------------------+----------------+
|  1070968|Ultimate Investme...|https://www.udemy...|    1.0|  1.0|           2147|         23|          51|        All Levels|             1.0|2017-01-18 20:58:58|Business Finance|
|  1113822|Complete GST Cour...|https://www.udemy...|    1.0|  1.0|           2792|        923|         274|        All Levels|             1.0|2017-03-09 16:34:20|Business Finance|
|  1006314|Financial Modelin...|https://www.udemy...|    1.0|  1.0|           2174|       

## Using some course features to predict whether the course is free or paid
We will use the subject, num_subscribers, num_reviews, num_lectures and level attributes to predict whether the course is paid or free.

We will use the following supervised classification algorithms to experiment with the data - 
1.   Logistic Regression
2.   Decision Tree
3.   Support Vector Machines



Constructing Feature Vectors

In [None]:
# Index the categorical columns; rows with unseen/invalid labels are skipped.
cols = ["level", "subject"]
stages = [
    StringIndexer(inputCol=column, outputCol=column + "_idx").setHandleInvalid("skip").fit(udemy)
    for column in cols
]

# Assemble the numeric and indexed categorical features into one vector column
feature_cols = ["num_subscribers", "num_reviews", "num_lectures", "level_idx", "subject_idx"]
stages.append(VectorAssembler(inputCols=feature_cols, outputCol='features'))

# Build the preprocessing pipeline and apply it
pipeline = Pipeline(stages=stages)
udemy_trx = pipeline.fit(udemy).transform(udemy)

# Keep only the feature vector and the target label (is_paid)
udemy_trx = udemy_trx.select("features", "is_paid")
# Shuffle with a fixed seed: an unseeded rand() changes the row order on every run,
# which in turn changes the rows the seeded randomSplit below puts in train/test.
udemy_trx = udemy_trx.orderBy(rand(seed=12345))
udemy_trx.show(10)

+--------------------+-------+
|            features|is_paid|
+--------------------+-------+
|[540.0,13.0,15.0,...|    1.0|
|[45.0,4.0,48.0,1....|    1.0|
|[16158.0,159.0,12...|    1.0|
|[1018.0,12.0,22.0...|    1.0|
|[1393.0,27.0,21.0...|    1.0|
|[1186.0,12.0,7.0,...|    1.0|
|[1085.0,8.0,13.0,...|    1.0|
|[106.0,13.0,12.0,...|    1.0|
|[35.0,4.0,26.0,0....|    1.0|
|[15493.0,415.0,25...|    0.0|
+--------------------+-------+
only showing top 10 rows



Splitting data into train and test

In [None]:
# Hold out 20% of the rows for testing; the split itself uses a fixed seed.
train, test = udemy_trx.randomSplit(weights=[0.8, 0.2], seed=12345)

**Logistic Regression**

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression predicting the 0/1 is_paid label from the assembled feature vector.
log_reg = LogisticRegression(labelCol='is_paid', featuresCol='features')
log_reg_model = log_reg.fit(train)
print(f"Coefficients: {log_reg_model.coefficients}")
print(f"Intercept: {log_reg_model.intercept}")

Coefficients: [-0.00012719069800616606,0.00028373218077309796,0.04398570656526946,-0.15014748052190857,0.1478932495897718]
Intercept: 1.6178828694889427


In [None]:
# Score the held-out set with the logistic-regression model and preview a few rows.
predictions = log_reg_model.transform(test)
# limit() before toPandas(): only 5 rows are collected to the driver, not the whole test set.
predictions.select('is_paid', 'features', 'rawPrediction', 'prediction', 'probability').limit(5).toPandas()

Unnamed: 0,is_paid,features,rawPrediction,prediction,probability
0,1.0,"(0.0, 0.0, 6.0, 0.0, 1.0)","[-2.029690358470331, 2.029690358470331]",1.0,"[0.11612069887085782, 0.8838793011291421]"
1,1.0,"(0.0, 0.0, 20.0, 0.0, 1.0)","[-2.6454902503841033, 2.6454902503841033]",1.0,"[0.06626751010598507, 0.9337324898940149]"
2,1.0,"(0.0, 0.0, 20.0, 0.0, 1.0)","[-2.6454902503841033, 2.6454902503841033]",1.0,"[0.06626751010598507, 0.9337324898940149]"
3,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)","[-1.7657761190787145, 1.7657761190787145]",1.0,"[0.1460683965916868, 0.8539316034083132]"
4,1.0,"[0.0, 0.0, 7.0, 1.0, 1.0]","[-1.9235285845136922, 1.9235285845136922]",1.0,"[0.12746859974373384, 0.8725314002562662]"


In [None]:
# Metrics For Logistic Regression
# MulticlassMetrics consumes an RDD of (prediction, label) pairs; the scores below are
# for the positive class, is_paid == 1.0 (paid courses).
positive_label = 1.0
predictionAndLabels = predictions.rdd.map(lambda row: (row.prediction, row.is_paid))
metrics = MulticlassMetrics(predictionAndLabels)
precision, recall, f1Score = (
    metrics.precision(positive_label),
    metrics.recall(positive_label),
    metrics.fMeasure(positive_label),
)
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)



Summary Stats
Precision = 0.9353932584269663
Recall = 0.9940298507462687
F1 Score = 0.9638205499276411


**Decision Tree**

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Single decision tree that chooses splits by Gini impurity.
dec_tree = DecisionTreeClassifier(
    labelCol='is_paid',
    featuresCol='features',
    impurity='gini',
)
dec_tree_model = dec_tree.fit(train)

In [None]:
# Score the held-out set with the decision tree and preview a few rows.
predictions = dec_tree_model.transform(test)
# limit() before toPandas(): only 5 rows are collected to the driver, not the whole test set.
predictions.select('is_paid', 'features', 'rawPrediction', 'prediction', 'probability').limit(5).toPandas()

Unnamed: 0,is_paid,features,rawPrediction,prediction,probability
0,1.0,"(0.0, 0.0, 6.0, 0.0, 1.0)","[40.0, 1806.0]",1.0,"[0.021668472372697724, 0.9783315276273022]"
1,1.0,"(0.0, 0.0, 20.0, 0.0, 1.0)","[40.0, 1806.0]",1.0,"[0.021668472372697724, 0.9783315276273022]"
2,1.0,"(0.0, 0.0, 20.0, 0.0, 1.0)","[40.0, 1806.0]",1.0,"[0.021668472372697724, 0.9783315276273022]"
3,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)","[40.0, 1806.0]",1.0,"[0.021668472372697724, 0.9783315276273022]"
4,1.0,"[0.0, 0.0, 7.0, 1.0, 1.0]","[40.0, 1806.0]",1.0,"[0.021668472372697724, 0.9783315276273022]"


In [None]:
# Metrics For Decision Trees
# Same evaluation as for logistic regression: (prediction, label) pairs fed to
# MulticlassMetrics, scored on the paid class (label 1.0).
positive_label = 1.0
predictionAndLabels = predictions.rdd.map(lambda row: (row["prediction"], row["is_paid"]))
metrics = MulticlassMetrics(predictionAndLabels)
precision, recall, f1Score = (
    metrics.precision(positive_label),
    metrics.recall(positive_label),
    metrics.fMeasure(positive_label),
)
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)



Summary Stats
Precision = 0.9507246376811594
Recall = 0.9791044776119403
F1 Score = 0.9647058823529412


**Support Vector Machine**

In [None]:
from pyspark.ml.classification import LinearSVC

# Linear support vector classifier on the assembled feature vector.
# NOTE(review): the features are not standardized (StandardScaler is imported but never
# applied) and num_subscribers spans a much larger range than the other features;
# SVMs are scale-sensitive, so consider scaling before comparing models.
lsvm = LinearSVC(labelCol='is_paid', featuresCol='features')
lsvm_model = lsvm.fit(train)

In [None]:
# Score the held-out set with the linear SVM and preview a few rows
# (LinearSVC outputs no probability column).
predictions = lsvm_model.transform(test)
# limit() before toPandas(): only 5 rows are collected to the driver, not the whole test set.
predictions.select('is_paid', 'features', 'rawPrediction', 'prediction').limit(5).toPandas()

Unnamed: 0,is_paid,features,rawPrediction,prediction
0,1.0,"(0.0, 0.0, 6.0, 0.0, 1.0)","[-1.0115035936446453, 1.0115035936446453]",1.0
1,1.0,"(0.0, 0.0, 20.0, 0.0, 1.0)","[-1.0608045105554709, 1.0608045105554709]",1.0
2,1.0,"(0.0, 0.0, 20.0, 0.0, 1.0)","[-1.0608045105554709, 1.0608045105554709]",1.0
3,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)","[-0.9903746292542915, 0.9903746292542915]",1.0
4,1.0,"[0.0, 0.0, 7.0, 1.0, 1.0]","[-1.0113916176638944, 1.0113916176638944]",1.0


In [None]:
# Metrics For Support Vector Machine
# (prediction, label) pairs -> MulticlassMetrics, scored on the paid class (label 1.0).
positive_label = 1.0
predictionAndLabels = predictions.rdd.map(lambda row: (row.prediction, row.is_paid))
metrics = MulticlassMetrics(predictionAndLabels)
precision, recall, f1Score = (
    metrics.precision(positive_label),
    metrics.recall(positive_label),
    metrics.fMeasure(positive_label),
)
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)



Summary Stats
Precision = 0.9241379310344827
Recall = 1.0
F1 Score = 0.9605734767025089


#### How well did our models predict whether a course is paid or free?

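Model | Precision | Recall | F1 Score
--- | --- | --- | ---
Logistic Regression | 0.935 | 0.994 | 0.964
Decision Tree | 0.951 | 0.979 | 0.965
Linear SVM | 0.924 | 1.0 | 0.961

All three scores are computed for the paid class (label 1.0). Paid courses appear to dominate the data (9 of the 10 shuffled preview rows above are paid), so these numbers should be judged against the share of paid courses in the test set rather than against 0.5. For example, the Linear SVM's recall of 1.0 means it labelled every paid test course as paid.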

## Using some course features to predict the price of a course
We will use the content_duration, num_lectures, subject and level attributes to predict the price of a course. Only paid courses are kept for this task.

We will use the following supervised regression algorithms to experiment with the data - 
1.   Linear Regression
2.   Decision Tree Regression
3.   Random Forest Regression

In [None]:
# Reload the raw table so the regression task starts from unmodified columns.
udemy = spark.read.csv(
    "/content/gdrive/MyDrive/dsci632/FinalProject/udemy_courses.csv",
    header=True,
    inferSchema=True,
)
udemy.printSchema()

root
 |-- course_id: integer (nullable = true)
 |-- course_title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: integer (nullable = true)
 |-- num_subscribers: integer (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- num_lectures: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- content_duration: string (nullable = true)
 |-- published_timestamp: timestamp (nullable = true)
 |-- subject: string (nullable = true)



In [None]:
# Cast the paid flag, the price target and content_duration (read as a string) to float.
udemy = (
    udemy
    .withColumn("is_paid", col("is_paid").cast(FloatType()))
    .withColumn("price", col("price").cast(FloatType()))
    .withColumn("content_duration", col("content_duration").cast(FloatType()))
)

In [None]:
# Selecting only paid courses. is_paid was cast to float above, so compare it with 1.0
# explicitly instead of relying on Spark's implicit boolean-to-float coercion of True.
udemy = udemy.filter(udemy.is_paid == 1.0)

In [None]:
# Preview the first 20 paid courses (Spark's default) with long values truncated.
udemy.show(n=20, truncate=True)

+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+----------------+-------------------+----------------+
|course_id|        course_title|                 url|is_paid|price|num_subscribers|num_reviews|num_lectures|             level|content_duration|published_timestamp|         subject|
+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+----------------+-------------------+----------------+
|  1070968|Ultimate Investme...|https://www.udemy...|    1.0|200.0|           2147|         23|          51|        All Levels|             1.5|2017-01-18 20:58:58|Business Finance|
|  1113822|Complete GST Cour...|https://www.udemy...|    1.0| 75.0|           2792|        923|         274|        All Levels|            39.0|2017-03-09 16:34:20|Business Finance|
|  1006314|Financial Modelin...|https://www.udemy...|    1.0| 45.0|           2174|       

Constructing Feature Vectors

In [None]:
# Index the categorical columns on the paid-course data; rows with invalid labels are skipped.
cols = ["level", "subject"]
stages = [
    StringIndexer(inputCol=column, outputCol=column + "_idx").setHandleInvalid("skip").fit(udemy)
    for column in cols
]

# Assemble the numeric and indexed categorical features into one vector column.
# NOTE(review): VectorAssembler's default handleInvalid is "error", so a null
# content_duration (an unparseable string after the float cast) would fail here.
feature_cols = ["content_duration", "num_lectures", "level_idx", "subject_idx"]
stages.append(VectorAssembler(inputCols=feature_cols, outputCol='features'))

# Build the preprocessing pipeline and apply it
pipeline = Pipeline(stages=stages)
udemy_trx = pipeline.fit(udemy).transform(udemy)

# Keep only the feature vector and the regression target (price)
udemy_trx = udemy_trx.select("features", "price")
udemy_trx.show()

+--------------------+-----+
|            features|price|
+--------------------+-----+
|  [1.5,51.0,0.0,0.0]|200.0|
|[39.0,274.0,0.0,0.0]| 75.0|
|  [2.5,51.0,2.0,0.0]| 45.0|
|  [3.0,36.0,0.0,0.0]| 95.0|
|  [2.0,26.0,2.0,0.0]|200.0|
|  [3.0,25.0,0.0,0.0]|150.0|
|  [1.0,26.0,1.0,0.0]| 65.0|
|  [2.5,23.0,0.0,0.0]| 95.0|
|  [2.5,38.0,3.0,0.0]|195.0|
|  [1.0,15.0,0.0,0.0]|200.0|
|  [5.0,76.0,0.0,0.0]|200.0|
|  [1.0,17.0,0.0,0.0]|200.0|
|  [1.5,19.0,0.0,0.0]| 30.0|
|  [2.0,16.0,0.0,0.0]|195.0|
|  [7.0,42.0,0.0,0.0]|200.0|
|  [1.5,19.0,2.0,0.0]| 75.0|
|  [1.5,16.0,1.0,0.0]| 20.0|
|  [4.0,52.0,0.0,0.0]|200.0|
|  [1.5,23.0,0.0,0.0]| 50.0|
|[0.58333331346511...| 95.0|
+--------------------+-----+
only showing top 20 rows



Splitting data into train and test

In [None]:
# 80/20 train/test split of the paid courses, with a fixed seed for the split.
train, test = udemy_trx.randomSplit(weights=[0.8, 0.2], seed=12345)

**Linear Regression**

In [None]:
from pyspark.ml.regression import LinearRegression

# Ordinary linear regression of price on the assembled feature vector.
lin_reg = LinearRegression(labelCol='price', featuresCol='features')
lin_reg_model = lin_reg.fit(train)
print(f"Coefficients: {lin_reg_model.coefficients}")
print(f"Intercept: {lin_reg_model.intercept}")

Coefficients: [0.6515697103980651,0.3020862440156468,-3.2300138957749946,-7.079583171447448]
Intercept: 66.22341429536631


In [None]:
# Predict prices for the held-out courses and compare a few with the true price.
lin_reg_preds = lin_reg_model.transform(test)
lin_reg_preds.select("prediction", "price", "features").show(n=5)

+------------------+-----+--------------------+
|        prediction|price|            features|
+------------------+-----+--------------------+
| 53.90046402774868|135.0|   [0.5,5.0,0.0,2.0]|
|62.199251660075596| 20.0|[0.51666665077209...|
|  58.9692377643006| 50.0|[0.51666665077209...|
| 67.86133839984193|200.0|[0.51666665077209...|
|54.224269280234665|130.0|[0.53333336114883...|
+------------------+-----+--------------------+
only showing top 5 rows



In [None]:
# Root mean squared error of the linear-regression predictions (same units as price).
lin_reg_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
lin_reg_rmse = lin_reg_evaluator.evaluate(lin_reg_preds)
print("Root Mean Squared Error (RMSE) on test data = %g" % lin_reg_rmse)

Root Mean Squared Error (RMSE) on test data = 59.4739


**Decision Tree Regressor**

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

# Single regression tree predicting price, with Spark's default tree settings.
dec_tree_reg = DecisionTreeRegressor(labelCol='price', featuresCol='features')
dec_tree_reg_model = dec_tree_reg.fit(train)

In [None]:
# Predict prices for the held-out courses with the regression tree; show a few.
dec_tree_reg_preds = dec_tree_reg_model.transform(test)
dec_tree_reg_preds.select("prediction", "price", "features").show(n=5)

+------------------+-----+--------------------+
|        prediction|price|            features|
+------------------+-----+--------------------+
|43.982300884955755|135.0|   [0.5,5.0,0.0,2.0]|
| 39.82142857142857| 20.0|[0.51666665077209...|
| 39.82142857142857| 50.0|[0.51666665077209...|
| 48.14159292035398|200.0|[0.51666665077209...|
|43.982300884955755|130.0|[0.53333336114883...|
+------------------+-----+--------------------+
only showing top 5 rows



In [None]:
# Root mean squared error of the regression-tree predictions (same units as price).
dec_tree_reg_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
dec_tree_reg_rmse = dec_tree_reg_evaluator.evaluate(dec_tree_reg_preds)
print("Root Mean Squared Error (RMSE) on test data = %g" % dec_tree_reg_rmse)

Root Mean Squared Error (RMSE) on test data = 58.2871


**Random Forest Regressor**

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Random-forest ensemble of regression trees predicting price, with Spark's defaults.
rf_reg = RandomForestRegressor(labelCol='price', featuresCol='features')
rf_reg_model = rf_reg.fit(train)

In [None]:
# Predict prices for the held-out courses with the random forest; show a few.
rf_reg_preds = rf_reg_model.transform(test)
rf_reg_preds.select("prediction", "price", "features").show(n=5)

+------------------+-----+--------------------+
|        prediction|price|            features|
+------------------+-----+--------------------+
| 40.42444770475924|135.0|   [0.5,5.0,0.0,2.0]|
|  41.4649543762185| 20.0|[0.51666665077209...|
|41.624703015852404| 50.0|[0.51666665077209...|
| 52.38912740304287|200.0|[0.51666665077209...|
| 40.42444770475924|130.0|[0.53333336114883...|
+------------------+-----+--------------------+
only showing top 5 rows



In [None]:
# Root mean squared error of the random-forest predictions (same units as price).
rf_reg_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rf_reg_rmse = rf_reg_evaluator.evaluate(rf_reg_preds)
print("Root Mean Squared Error (RMSE) on test data = %g" % rf_reg_rmse)

Root Mean Squared Error (RMSE) on test data = 57.867


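#### How well did our models predict the price of a paid course?

Regression Model | RMSE (lower is better; same units as price)
--- | ---
Linear Regression | 59.4739
Decision Tree Regression | 58.2871
Random Forest Regression | 57.867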