In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 4.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=f5f077b972e5c2c1236284c1e18637c4d0f2d8761bcb9ab7c1c951a852dc80ed
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark.sql import SparkSession

In [12]:
# Obtain the Spark session for this notebook (an already-running session is
# reused), registered under the application name "NLP".
spark = (
    SparkSession.builder
    .appName("NLP")
    .getOrCreate()
)

In [13]:
# Course titles: the first CSV row supplies the column names and Spark infers
# the column types from the data.
title_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/drive/MyDrive/Datasets/Course Title.csv')
)

In [14]:
# Course categories: the first CSV row supplies the column names and Spark
# infers the column types from the data.
category_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/drive/MyDrive/Datasets/Course Category.csv')
)

In [15]:
# Numeric/boolean course features: the first CSV row supplies the column names
# and Spark infers the column types from the data.
feature_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/drive/MyDrive/Datasets/Course Features.csv')
)

In [20]:
# Preview the feature table (show()'s defaults, spelled out: first 20 rows,
# long cell values truncated).
feature_df.show(n=20, truncate=True)

+-------+------+---------------+----------+-----------+------------+------------+------------------+
|is_paid| price|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|
+-------+------+---------------+----------+-----------+------------+------------+------------------+
|   true| 24.99|           2231|      3.75|        134|          42|          37|              1268|
|  false|   0.0|          26474|       4.5|        709|         112|           9|                88|
|   true| 19.99|           1713|       4.4|         41|          13|          14|                82|
|   true|199.99|           4988|       4.8|        395|          88|          36|              1511|
|   true|159.99|           1266|      4.75|         38|          12|          38|               569|
|   true| 29.99|          20505| 4.5301204|        796|         135|          31|              1163|
|   true| 39.99|           3309|      3.85|        958|         241|           8|          

In [16]:
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

In [17]:
# Tag every row with its 0-based position so the separately loaded CSVs can be
# stitched together row by row.
#
# monotonically_increasing_id() on its own is unique but NOT consecutive: the
# partition id is packed into the upper bits, so two DataFrames that are
# partitioned differently receive different ids for the same row position and
# an inner join on those ids silently drops or mismatches rows. Ranking by that
# id turns it into a dense 0..N-1 index that depends only on row order.
#
# NOTE: a window without partitionBy moves all rows into a single partition;
# that is acceptable for this one-off alignment step.
row_position = (
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
).cast("long")

DF1 = title_df.withColumn("row_id", row_position)
DF2 = category_df.withColumn("row_id", row_position)

# Inner join on the shared positional key; row_id is kept for the next join.
result_df = DF1.join(DF2, "row_id")

In [21]:
# Give the feature rows a dense 0-based positional key. A raw
# monotonically_increasing_id() embeds the partition id and is not consecutive,
# so it cannot be relied on to line up with the ids of the other DataFrames;
# ranking by it yields 0..N-1 in row order instead.
# NOTE: a window without partitionBy moves all rows into a single partition.
DF3 = feature_df.withColumn(
    "row_id",
    (row_number().over(Window.orderBy(monotonically_increasing_id())) - 1).cast("long"),
)

# Rebuild the full join from DF1/DF2 rather than from result_df itself, so this
# cell can be re-run safely (the helper key is dropped at the end and would be
# missing from result_df on a second run).
result_df = (
    DF1.join(DF2, "row_id")
    .join(DF3, "row_id")
    .drop("row_id")
)

In [22]:
# Sanity-check the combined table: title, category and feature columns side by side.
result_df.show(n=5)

+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+
|               title| category|is_paid| price|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|
+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+
|Online Vegan Vege...|Lifestyle|   true| 24.99|           2231|      3.75|        134|          42|          37|              1268|
|The Lean Startup ...| Business|  false|   0.0|          26474|       4.5|        709|         112|           9|                88|
|How To Become a V...|Lifestyle|   true| 19.99|           1713|       4.4|         41|          13|          14|                82|
|How to Train a Puppy|Lifestyle|   true|199.99|           4988|       4.8|        395|          88|          36|              1511|
|Web Design from t...|   Design|   true|159.99|           1266|      4.75|  

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer

In [24]:
# Text branch: title -> tokens -> tokens without stop words -> term counts -> IDF-weighted vector.
tokenize = Tokenizer(
    inputCol="title",
    outputCol="wordtokens",
)
stopwords_remove = StopWordsRemover(
    inputCol="wordtokens",
    outputCol="cleanwords",
)
vectorizer = CountVectorizer(
    inputCol="cleanwords",
    outputCol="features",
)
idf = IDF(
    inputCol="features",
    outputCol="features_vector",
)

# Target branch: StringIndexer maps each category string to a numeric index
# stored in `label` (an index encoding, not one-hot/dummy columns).
dummyencode = StringIndexer(
    inputCol="category",
    outputCol="label",
)

In [25]:
# Chain the stages in execution order: every text stage reads the column written
# by the stage before it; the label indexer only reads `category`.
pipeline = Pipeline(
    stages=[
        tokenize,
        stopwords_remove,
        vectorizer,
        idf,
        dummyencode,
    ]
)

In [26]:
# Fit all pipeline stages on the joined table.
# NOTE(review): despite its name, `df` holds the fitted pipeline model here, not
# a DataFrame; the following cell calls .transform() on it.
# NOTE(review): fitting happens on the full table before the train/test split
# further down, so the vocabulary and IDF statistics also see the test rows.
df = pipeline.fit(dataset=result_df)

In [27]:
# Apply the fitted pipeline to the joined table; the result carries the added
# wordtokens, cleanwords, features, features_vector and label columns.
# NOTE(review): `df` is rebound from the fitted model to the transformed
# DataFrame, so this cell cannot be re-run without re-running the fit cell first.
df = df.transform(result_df)

In [29]:
# Inspect the first rows of the transformed table, including the engineered columns.
df.show(n=5)

+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+-----+
|               title| category|is_paid| price|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|          wordtokens|          cleanwords|            features|     features_vector|label|
+--------------------+---------+-------+------+---------------+----------+-----------+------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+-----+
|Online Vegan Vege...|Lifestyle|   true| 24.99|           2231|      3.75|        134|          42|          37|              1268|[online, vegan, v...|[online, vegan, v...|(102545,[39,536,6...|(102545,[39,536,6...|  8.0|
|The Lean Startup ...| Business|  false|   0.0|          26474|       4.5|        709|         112|           9|

In [30]:
# Random 70/30 train/test split with a fixed seed.
df_train, df_test = df.randomSplit([0.7, 0.3], seed=42)

In [31]:
from pyspark.ml.classification import LogisticRegression

In [32]:
# Logistic-regression classifier: IDF-weighted title vectors as input, indexed
# category as target.
logit = LogisticRegression(
    labelCol="label",
    featuresCol="features_vector",
)

In [33]:
# Train the classifier on the training split only.
logitmodel = logit.fit(dataset=df_train)