

Set up Pyspark

In [None]:
!pip install pyspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 54.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=fe31766a4b670ea2cd0125b79b7015371ab6c93a7822baa2003a3a32f35a80fd
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark

In [None]:
import pandas as pd
import pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

Import additional libraries

In [None]:
!pip install pandas_bokeh

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pandas_bokeh
  Downloading pandas_bokeh-0.5.5-py2.py3-none-any.whl (29 kB)
Installing collected packages: pandas-bokeh
Successfully installed pandas-bokeh-0.5.5


In [None]:
import pyspark.pandas as ps
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.pyplot as plt
from pandas_bokeh import *



In [None]:
df = spark.read.csv('/content/UdemyCourses.csv', header=True, inferSchema=True)

AnalysisException: ignored

Check Data types

In [None]:
#displays the dataschema of dataframe
df.printSchema()
df.dtypes

First five rows before cleaning

In [None]:
df.show(5)

Last five rows before cleaning

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import desc
 
df2 = df.withColumn("index", monotonically_increasing_id())
df2.orderBy(desc("index")).drop("index").show(5)

Data Cleaning

In [None]:
#changed published_timestamp from string to date data type
from pyspark.sql.functions import *
df.withColumn("published_timestamp",to_timestamp("published_timestamp"))
df.withColumn("published_timestamp",df.published_timestamp.cast('date'))

Rename Columns

In [None]:
df = df.withColumnRenamed(existing="num_subscribers", new="number_subscribers")
df = df.withColumnRenamed(existing="num_reviews", new="number_reviews")
df = df.withColumnRenamed(existing="num_lectures", new="number_lectures")

In [None]:
df.show()

In [None]:
#check for duplicates rows using pyspark(not pyspark.pandas)

from pyspark.sql import functions as F

cols = df.columns

counts_df = df.select([
    F.countDistinct(*cols).alias('n_unique'),
    F.count('*').alias('n_rows')
])
n_unique, n_rows = counts_df.collect()[0]
print(n_unique)
print(n_rows)

In [None]:
df = df.dropDuplicates()
print("Distinct count: "+str(df.count()))
df.show(truncate=False)

In [None]:
#check again for good measure

from pyspark.sql import functions as F

cols = df.columns

counts_df = df.select([
    F.countDistinct(*cols).alias('n_unique'),
    F.count('*').alias('n_rows')
])
n_unique, n_rows = counts_df.collect()[0]
print(n_unique)
print(n_rows)

In [None]:
### Get count of null values in pyspark
 
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [None]:
#drop nulls
df.na.drop()

First Five Rows after cleaning

In [None]:
df.show(5)

Last five rows after cleaning

In [None]:
df2 = df.withColumn("index", monotonically_increasing_id())
df2.orderBy(desc("index")).drop("index").show(5)

Additional Information about dataset

In [None]:
print("Total number of rows: ", df.count())
print("Total number of columns: ", df.columns)

In [None]:
df.describe().show()

Convert Spark Dataframe to Pandas Dataframe

In [None]:
pdf = ps.DataFrame(df)

In [None]:
print("Percentage of duplicated data/rows within dataset: ", pdf.duplicated().sum() * 100 / len(pdf))

In [None]:
print("Percentage of Missing data within dataset: ", pdf.isnull().sum() * 100 / len(pdf))

In [None]:
pdf.dropna(inplace=True)

12.Find out number of courses per subjects

In [None]:
num_courses_per_subject_df = pdf['subject'].value_counts().reset_index()
num_courses_per_subject_df

In [None]:
num_courses_per_subject = pdf.groupby('subject').count().reset_index()
num_courses_per_subject2 = num_courses_per_subject[['subject', 'course_id']].sort_values(by='course_id', ascending=False).reset_index()
num_courses_per_subject3 = num_courses_per_subject2[~num_courses_per_subject2.subject.str.contains('2014')\
                                                    & ~num_courses_per_subject2.subject.str.contains('2015')\
                                                    & ~num_courses_per_subject2.subject.str.contains('2016')].reset_index()
num_courses_per_subject3 = num_courses_per_subject3.set_index('level_0')
num_courses_per_subject3

In [None]:
num_courses_per_subject3.dtypes

In [None]:
num_courses_per_subject3['course_id'] = num_courses_per_subject3['course_id'].astype(int)
x = ['Web Development', 'Business Finance	', 'Musical Instruments', 'Graphic Design']
y = num_courses_per_subject3['course_id'].to_numpy()
plt.figure(figsize=(20,10))
sns.barplot(x = x, y = y, data = num_courses_per_subject3).set(title="Number of Courses Per Subject")

In [None]:
num_courses_per_subject3['Percent% of total courses'] = num_courses_per_subject3['course_id'] * 100 / len(pdf)
num_courses_per_subject3

13.For which levels, udemy courses providing the courses

In [None]:
courses_by_lvl_dftemp = pdf['level'].value_counts().reset_index()
courses_by_lvl_dftemp

In [None]:
courses_by_lvl_df = pdf.groupby('level').count().reset_index()
courses_by_lvl_df2 = courses_by_lvl_df[['level', 'course_id']].sort_values(by='course_id', ascending=False).reset_index()

courses_by_lvl_df3 = courses_by_lvl_df2[~courses_by_lvl_df2.level.str.contains('34')\
                                                    & ~courses_by_lvl_df2.level.str.contains('9')\
                                                    & ~courses_by_lvl_df2.level.str.contains('20') \
                                                    & ~courses_by_lvl_df2.level.str.contains('56') \
                                                    & ~courses_by_lvl_df2.level.str.contains('63')].reset_index()
courses_by_lvl_df3

In [None]:
courses_by_lvl_df3['course_id'] = courses_by_lvl_df3['course_id'].astype(int)
x = ['All Level', 'Beginner Level', 'Intermediate Level', 'Expert Level']
y = courses_by_lvl_df3['course_id'].to_numpy()
plt.figure(figsize=(20,10))
sns.barplot(x = x, y = y, data = courses_by_lvl_df3).set(title="Number of Courses Per Level")

In [None]:
courses_by_lvl_df3['Percentage'] = courses_by_lvl_df3['course_id'] * 100 / len(pdf)
courses_by_lvl_df3

14.Display the count of paid and free courses

In [None]:
courses_by_paidfree_df = pdf.groupby('is_paid').count().reset_index()
courses_by_paidfree_df2 = courses_by_paidfree_df[['is_paid', 'course_id']].sort_values(by='course_id', ascending=False).reset_index()
courses_by_paidfree_df3 = courses_by_paidfree_df2[~courses_by_paidfree_df2.is_paid.str.contains('http')].reset_index()
courses_by_paidfree_df3

In [None]:
courses_by_paidfree_df3['course_id'] = courses_by_paidfree_df3['course_id'].astype(int)
x = ['TRUE', 'False']
y = courses_by_paidfree_df3['course_id'].to_numpy()
plt.figure(figsize=(20,10))
sns.barplot(x = x, y = y, data = courses_by_lvl_df3).set(title="Courses that Are Paid(True = Yes, False = No)")

In [None]:
courses_by_paidfree_df3['Percentage'] = courses_by_paidfree_df3['course_id'] * 100 / len(pdf)
courses_by_paidfree_df3

15.Which course has more lectures (free or paid)?

In [None]:
pdf['content_duration'] = pdf['content_duration'].map(lambda x: x.lstrip('').rstrip(' hours'))
pdf['content_duration'] = pdf['content_duration'].astype('float')

In [None]:
pdf.head()

In [None]:
#Total number of lecture hours for paid classes
num_lectures_is_paid = pdf[pdf.is_paid == True]['content_duration'].sum()
print("Total number of lecture hours for paid classes: ", num_lectures_is_paid)

In [None]:
#Total number of lecture hours for paid classes
num_lectures_is_not_paid = pdf[pdf.is_paid == False]['content_duration'].sum()
print("Total number of lecture hours for unpaid classes: ", num_lectures_is_not_paid)

16.Which courses have a higher number of subscribers free or paid?

In [None]:
pdf['number_subscribers'] = pdf['number_subscribers'].astype('int')

In [None]:
#Total number of subscribers for paid classes
num_subsriberes_is_paid = pdf[pdf.is_paid == True]['number_subscribers'].sum()
print("Total number of subscribers for paid classes: ", num_subsriberes_is_paid)

In [None]:
#Total number of subscribers for unpaid classes
num_subsriberes_is_not_paid = pdf[pdf.is_paid == False]['number_subscribers'].sum()
print("Total number of subscribers for unpaid classes: ", num_subsriberes_is_not_paid)

17.Which level has the highest number of subscribers?

In [None]:
num_subscribers_per_lvl = pdf.groupby('level')['number_subscribers'].sum().reset_index()
num_subscribers_per_lvl2 = num_subscribers_per_lvl[~num_subscribers_per_lvl.level.str.contains('34')\
                                                    & ~num_subscribers_per_lvl.level.str.contains('9')\
                                                    & ~num_subscribers_per_lvl.level.str.contains('20') \
                                                    & ~num_subscribers_per_lvl.level.str.contains('56') \
                                                    & ~num_subscribers_per_lvl.level.str.contains('63') \
                                                    & ~num_subscribers_per_lvl.level.str.contains('115')].reset_index()
num_subscribers_per_lvl2                            

In [None]:
num_subscribers_per_lvl2['Percentage'] = num_subscribers_per_lvl2['number_subscribers'] * 100 / pdf['number_subscribers'].sum()
num_subscribers_per_lvl2

In [None]:
num_subscribers_per_lvl2['number_subscribers'] = num_subscribers_per_lvl2['number_subscribers'].astype(int)
x = ['Expert Level', 'Intermediate Level', 'All Levels', 'Beginner Level']
y = num_subscribers_per_lvl2['number_subscribers'].to_numpy()
plt.figure(figsize=(20,10))
sns.barplot(x = x, y = y, data = num_subscribers_per_lvl2).set(title="Number of Subscribers Per Level")

18.Find most popular course title
19.Display 10 most popular courses as per number of subscribers 

In [None]:
#Most popular
mostpopularcourse = pdf[['course_title', 'number_subscribers']].sort_values(by=["number_subscribers"], ascending = False).reset_index()
mostpopularcourse.head(20)

In [None]:
#Least popular courses
leastpopularcourse = pdf[['course_title', 'number_subscribers']].sort_values(by=["number_subscribers"], ascending = True).reset_index()
leastpopularcourse.head(100)

20.Find the course which is having the highest number of reviews.

In [None]:
#Most reviewed courses
highestnumreviews = pdf[['course_title', 'number_reviews']].sort_values(by=["number_reviews"], ascending = False).reset_index()
highestnumreviews.head(100)

In [None]:
#least reviewed courses
lowestnumreviews = pdf[['course_title', 'number_reviews']].sort_values(by=["number_reviews"], ascending = True).reset_index()
lowestnumreviews.head(100)

21.Does price affect the number of reviews?

In [None]:
pdf['price'] = pdf['price'].astype(int)
pdf['number_reviews'] = pdf['number_reviews'].astype(int)
x=pdf['price'].to_numpy()
y=pdf['number_reviews'].to_numpy()
sns.scatterplot(data=pdf, x=x, y=y)

In [None]:
pdf['price'] = pdf['price'].astype(int)
pdf['number_reviews'] = pdf['number_reviews'].astype(int)
x=pdf['price'].to_numpy()
y=pdf['number_reviews'].to_numpy()
sns.lineplot(data=pdf, x=x, y=y)

22.Find total number of courses related to python

In [None]:
coursesrealtedtopython = pdf[pdf['course_title' ].str.contains('python') | pdf['course_title' ].str.contains('Python')]['course_title'].count()
print('There are a total of',  coursesrealtedtopython, 'courses related to python.')

23.Display 10 most popular python courses as per number of subscribers

In [None]:
#Most popular python related courses
mostpopularcourse = pdf[pdf['course_title' ].str.contains('python') | pdf['course_title' ].str.contains('Python')].sort_values(by=["number_subscribers"], ascending = False).reset_index()
mostpopularcourse.head(20)

In [None]:
#Least popular python related courses
mostpopularcourse = pdf[pdf['course_title' ].str.contains('python') | pdf['course_title' ].str.contains('Python')].sort_values(by=["number_subscribers"], ascending = True).reset_index()
mostpopularcourse.head(20)

24.In which year the highest number of courses were posted?

In [None]:
pdf.dtypes

In [None]:
from pyspark.pandas import DatetimeIndex
pdf['year'] = DatetimeIndex(pdf['published_timestamp']).year
courses_by_year = pdf.groupby(pdf['year']).count().reset_index()
courses_by_year2 = courses_by_year[['year', 'course_id']].reset_index()
courses_by_year2.sort_values(by=["course_id"], ascending = False).reset_index()

25.Display category-wise count of posted subjects [year wise]

In [None]:
#They mean number of courses by subject?

In [None]:
num_courses_per_subject = pdf.groupby(['subject', 'year'])['course_id'].count().reset_index()
num_courses_per_subject2 = num_courses_per_subject.sort_values(by=["year"], ascending = True).reset_index()
num_courses_per_subject2