In [1]:
#import modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

#create session to access Spark API
spark = SparkSession \
    .builder \
    .appName("Video Data Analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#define data schema for the file based on your columns
videoSchema = StructType([
    StructField("video_id", StringType(), True),
    StructField("day", StringType(), True),
    StructField("views", IntegerType(), True),
    StructField("redViews", IntegerType(), True),
    StructField("comments", IntegerType(), True),
    StructField("likes", IntegerType(), True),
    StructField("dislikes", IntegerType(), True),
    StructField("videosAddedToPlaylists", IntegerType(), True),
    StructField("videosRemovedFromPlaylists", IntegerType(), True),
    StructField("shares", IntegerType(), True),
    StructField("estimatedMinutesWatched", IntegerType(), True),
    StructField("estimatedRedMinutesWatched", IntegerType(), True),
    StructField("averageViewDuration", FloatType(), True),
    StructField("averageViewPercentage", FloatType(), True),
    StructField("annotationClickThroughRate", FloatType(), True),
    StructField("annotationCloseRate", FloatType(), True),
    StructField("annotationImpressions", IntegerType(), True),
    StructField("annotationClickableImpressions", IntegerType(), True),
    StructField("annotationClosableImpressions", IntegerType(), True),
    StructField("annotationClicks", IntegerType(), True),
    StructField("annotationCloses", IntegerType(), True),
    StructField("cardClickRate", FloatType(), True),
    StructField("cardTeaserClickRate", FloatType(), True),
    StructField("cardImpressions", IntegerType(), True),
    StructField("cardTeaserImpressions", IntegerType(), True),
    StructField("cardClicks", IntegerType(), True),
    StructField("cardTeaserClicks", IntegerType(), True),
    StructField("subscribersGained", IntegerType(), True),
    StructField("subscribersLost", IntegerType(), True),
])    

# read csv file with the defined schema into a Spark DataFrame
videoDataframe = spark.read.csv(
    "youtube_analytics_data.csv", 
    header=True, schema=videoSchema, sep=",")  # assuming CSV uses comma delimiter

In [2]:
import pandas as pd

In [3]:
df = pd.read_csv('youtube_analytics_data.csv')

In [4]:
df

Unnamed: 0,video_id,day,views,redViews,comments,likes,dislikes,videosAddedToPlaylists,videosRemovedFromPlaylists,shares,...,annotationClicks,annotationCloses,cardClickRate,cardTeaserClickRate,cardImpressions,cardTeaserImpressions,cardClicks,cardTeaserClicks,subscribersGained,subscribersLost
0,VID000001,2023-01-14,6158,119,2780,12309,73,89,46,209,...,828,810,0.10,3.10,6635,4498,1869,1519,857,162
1,VID000002,2020-04-02,69584,906,647,3342,402,66,26,449,...,1797,960,0.54,2.05,323,6506,4828,4356,149,89
2,VID000003,2021-06-06,20303,205,1095,7879,488,10,5,120,...,907,625,8.05,0.95,1527,3474,886,1633,173,51
3,VID000004,2022-04-08,57208,963,4458,12378,34,70,19,234,...,873,276,7.60,7.89,183,757,3314,4127,866,487
4,VID000005,2021-04-11,7318,712,926,8927,448,4,26,58,...,205,473,1.06,0.22,1012,6681,2575,3767,339,472
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,VID000996,2022-05-13,43026,951,2323,7512,315,58,37,357,...,172,799,4.02,3.93,9287,205,4991,4446,813,137
996,VID000997,2021-11-09,31020,482,1474,13294,54,38,39,151,...,1295,735,2.93,6.25,6338,2912,1145,2068,725,327
997,VID000998,2020-03-02,44038,687,1259,11887,309,31,29,168,...,769,129,3.06,1.15,3290,6960,2877,4896,796,353
998,VID000999,2021-11-02,52447,163,1689,60,199,61,19,108,...,539,129,4.69,7.65,9770,9318,310,4344,188,428


In [5]:
videoDataframe = videoDataframe.dropna()

In [6]:
videoDataframe = videoDataframe.fillna({'views': 0, 'likes': 0, 'dislikes': 0})

In [7]:
videoDataframe = videoDataframe.withColumn("views", videoDataframe["views"].cast(IntegerType()))
videoDataframe = videoDataframe.withColumn("likes", videoDataframe["likes"].cast(IntegerType()))

In [8]:
videoDataframe.printSchema()  # Check data types and schema

root
 |-- video_id: string (nullable = true)
 |-- day: string (nullable = true)
 |-- views: integer (nullable = false)
 |-- redViews: integer (nullable = true)
 |-- comments: integer (nullable = true)
 |-- likes: integer (nullable = false)
 |-- dislikes: integer (nullable = false)
 |-- videosAddedToPlaylists: integer (nullable = true)
 |-- videosRemovedFromPlaylists: integer (nullable = true)
 |-- shares: integer (nullable = true)
 |-- estimatedMinutesWatched: integer (nullable = true)
 |-- estimatedRedMinutesWatched: integer (nullable = true)
 |-- averageViewDuration: float (nullable = true)
 |-- averageViewPercentage: float (nullable = true)
 |-- annotationClickThroughRate: float (nullable = true)
 |-- annotationCloseRate: float (nullable = true)
 |-- annotationImpressions: integer (nullable = true)
 |-- annotationClickableImpressions: integer (nullable = true)
 |-- annotationClosableImpressions: integer (nullable = true)
 |-- annotationClicks: integer (nullable = true)
 |-- annotati

In [9]:
df.column

Index(['video_id', 'day', 'views', 'redViews', 'comments', 'likes', 'dislikes',
       'videosAddedToPlaylists', 'videosRemovedFromPlaylists', 'shares',
       'estimatedMinutesWatched', 'estimatedRedMinutesWatched',
       'averageViewDuration', 'averageViewPercentage',
       'annotationClickThroughRate', 'annotationCloseRate',
       'annotationImpressions', 'annotationClickableImpressions',
       'annotationClosableImpressions', 'annotationClicks', 'annotationCloses',
       'cardClickRate', 'cardTeaserClickRate', 'cardImpressions',
       'cardTeaserImpressions', 'cardClicks', 'cardTeaserClicks',
       'subscribersGained', 'subscribersLost'],
      dtype='object')

In [22]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Step 1: Feature engineering
# Define your feature columns (you can select the relevant ones)
feature_columns = [
    "views", "redViews", "comments", "likes", "dislikes", 
    "videosAddedToPlaylists", "videosRemovedFromPlaylists", "shares", 
    "estimatedMinutesWatched", "estimatedRedMinutesWatched", 
    "averageViewDuration", "averageViewPercentage", 
    "annotationClickThroughRate", "annotationCloseRate", 
    "annotationImpressions", "annotationClickableImpressions", 
    "annotationClosableImpressions", "annotationClicks", "annotationCloses",
    "cardClickRate", "cardTeaserClickRate", "cardImpressions", 
    "cardTeaserImpressions", "cardClicks", "cardTeaserClicks", 
    "subscribersGained", "subscribersLost"
]

# Handle missing values by filling them (you can use other imputation strategies too)
videoDataframe = videoDataframe.na.fill(0, subset=feature_columns)

# Step 2: Index labels (convert 'high_views' into a numerical label)
indexer = StringIndexer(inputCol="high_views", outputCol="label")

# Step 3: Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Step 4: Define the Random Forest Classifier
rf_classifier = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Step 5: Create the pipeline with the stages
pipeline = Pipeline(stages=[indexer, assembler, rf_classifier])

# Step 6: Split the data into training and test sets
train_data, test_data = videoDataframe.randomSplit([0.8, 0.2], seed=1234)

# Step 7: Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Step 8: Make predictions on the test data
predictions = model.transform(test_data)

# Step 9: Evaluate the model (Optional, but useful)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy}")


Accuracy: 1.0


In [24]:
# Step 1: Get predictions on the original dataset
predictions = model.transform(videoDataframe)

# Step 2: Select the relevant columns: actual values (label) and predicted values (prediction)
result = predictions.select("views", "prediction", "high_views")

# Step 3: Show the first 5 rows of the actual vs predicted values
result.show(10)


+-----+----------+----------+
|views|prediction|high_views|
+-----+----------+----------+
| 6158|       0.0|         1|
|69584|       0.0|         1|
|20303|       0.0|         1|
|57208|       0.0|         1|
| 7318|       0.0|         1|
| 8944|       0.0|         1|
|69311|       0.0|         1|
|32782|       0.0|         1|
|45202|       0.0|         1|
|22939|       0.0|         1|
+-----+----------+----------+
only showing top 10 rows

