In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, to_timestamp, year, col, udf, sqrt
from pyspark.ml.feature import StringIndexer, VectorAssembler, PCA, Normalizer
from pyspark.ml.linalg import Vectors, VectorUDT
import math
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator



# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("ReadParquet")
    .getOrCreate()
)

# Load the video dataset from its Parquet directory into a DataFrame.
df_video = spark.read.parquet("/content/videos-tratados-parquet")

# Preview the first five rows, then print the column names and types.
df_video.show(5)
df_video.printSchema()

+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+
|Unnamed: 0|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+
|         0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|
|         1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|
|         2|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825|    3338|1564007|    1631170|
|         3|Petrol Vs Liquid ...|kXiYSI7H2b0|  2022-08-23|   tech|71566|    1426| 922918|     995910|
|         4|Best Back to Scho...|ErMwWXQxHp0|  2022-08-08|   tech|96513|    5155|1855644|    1957312|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+
only showing top 5 rows
root
 |-- Unnamed: 0: long (nullable = true)
 |-- Title: s

In [3]:
# Parse "Published At" as a timestamp and store its month number in a new
# "Month" column.
published_month = month(to_timestamp(col("Published At")))
df_video = df_video.withColumn("Month", published_month)

# Inspect the result: first five rows plus the updated schema.
df_video.show(5)
df_video.printSchema()

+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+
|Unnamed: 0|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Month|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+
|         0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|    8|
|         1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|    8|
|         2|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825|    3338|1564007|    1631170|    8|
|         3|Petrol Vs Liquid ...|kXiYSI7H2b0|  2022-08-23|   tech|71566|    1426| 922918|     995910|    8|
|         4|Best Back to Scho...|ErMwWXQxHp0|  2022-08-08|   tech|96513|    5155|1855644|    1957312|    8|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+
only showing top 5 rows
root

In [4]:
# Encode the categorical "Keyword" strings as numeric indices in a new
# "Keyword_Index" column (each distinct keyword maps to one double value).
indexer = StringIndexer(inputCol="Keyword", outputCol="Keyword_Index")

# StringIndexer refuses to fit or transform a DataFrame that already contains
# its output column. Because this cell rebinds `df_video`, running it a second
# time used to fail with "Output column Keyword_Index already exists".
# Dropping the column first (a no-op when it is absent) keeps the cell
# re-runnable without changing the first-run result.
videos_without_index = df_video.drop(indexer.getOutputCol())
df_video = indexer.fit(videos_without_index).transform(videos_without_index)

# Inspect the result: first five rows plus the updated schema.
df_video.show(5)
df_video.printSchema()

+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+
|Unnamed: 0|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Month|Keyword_Index|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+
|         0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|    8|         23.0|
|         1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|    8|         23.0|
|         2|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825|    3338|1564007|    1631170|    8|         23.0|
|         3|Petrol Vs Liquid ...|kXiYSI7H2b0|  2022-08-23|   tech|71566|    1426| 922918|     995910|    8|         23.0|
|         4|Best Back to Scho...|ErMwWXQxHp0|  2022-08-08|   tech|96513|    5155|1855644|    1957312|    8|         23.0|
+----------+------------

In [5]:
# Extract the publication year so it can be used as a numeric feature.
df_video = df_video.withColumn("Year", year(to_timestamp("Published At")))

# Columns packed, in this order, into the single "Features" vector column.
input_cols = ["Likes", "Views", "Year", "Month", "Keyword_Index"]

assembler = VectorAssembler(inputCols=input_cols, outputCol="Features")

# VectorAssembler raises "Output column Features already exists" when the
# DataFrame already carries its output column, which happened whenever this
# cell was re-run. Dropping the column first (a no-op when it is absent) makes
# the cell safe to execute repeatedly.
df_video = assembler.transform(df_video.drop(assembler.getOutputCol()))

# Inspect the result: first five rows plus the updated schema.
df_video.show(5)
df_video.printSchema()

+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+----+--------------------+
|Unnamed: 0|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Month|Keyword_Index|Year|            Features|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+----+--------------------+
|         0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|    8|         23.0|2022|[3407.0,135612.0,...|
|         1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|    8|         23.0|2022|[76779.0,1758063....|
|         2|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825|    3338|1564007|    1631170|    8|         23.0|2022|[63825.0,1564007....|
|         3|Petrol Vs Liquid ...|kXiYSI7H2b0|  2022-08-23|   tech|71566|    1426| 922918|     995910|    8|     

In [11]:
# Keep only the rows whose "Features" vector is present.
# NOTE(review): with VectorAssembler's default handleInvalid="error", null
# inputs raise instead of producing a null vector, so this is probably a
# no-op safeguard - confirm before relying on it.
df_video = df_video.where(col("Features").isNotNull())

def l2_normalize_vector(vector):
    """Scale ``vector`` to unit Euclidean (L2) length.

    Args:
        vector: A sized, iterable collection of numbers, or ``None``.

    Returns:
        ``None`` when ``vector`` is ``None``. Otherwise the result of
        ``Vectors.dense`` applied to every component divided by the L2 norm.
        When the norm is exactly ``0.0``, a dense vector of ``len(vector)``
        zeros is returned instead of dividing.
    """
    if vector is None:
        return None

    # Euclidean length: square root of the sum of squared components.
    length = math.sqrt(sum(component * component for component in vector))

    # Guard against dividing by a zero norm.
    if length == 0.0:
        return Vectors.dense([0.0] * len(vector))
    return Vectors.dense([component / length for component in vector])

# The Python UDF wrapper is still defined so that any other cell referencing
# `l2_normalize_udf` keeps working, but it is no longer used below.
l2_normalize_udf = udf(l2_normalize_vector, VectorUDT())

# Scale every row's "Features" vector to unit Euclidean length with Spark's
# built-in Normalizer (p=2.0 selects the L2 norm). It computes the same
# per-row scaling as the hand-written UDF, including leaving an all-zero
# vector as zeros, without passing each row through a Python worker.
normalizer = Normalizer(inputCol="Features", outputCol="Features Normal", p=2.0)

# Normalizer raises if its output column already exists, so drop it first
# (a no-op when it is absent) to keep this cell safe to re-run.
df_video = normalizer.transform(df_video.drop(normalizer.getOutputCol()))

# Inspect the result: first five rows plus the updated schema.
df_video.show(5)
df_video.printSchema()

+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+----+--------------------+--------------------+
|Unnamed: 0|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Month|Keyword_Index|Year|            Features|     Features Normal|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+----+--------------------+--------------------+
|         0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|    8|         23.0|2022|[3407.0,135612.0,...|[0.02511243077059...|
|         1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|    8|         23.0|2022|[76779.0,1758063....|[0.04363087906195...|
|         2|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825|    3338|1564007|    1631170|    8|         23.0|2022|[63825.0,1564007....|[0.04077

In [12]:
# Project each L2-normalised feature vector onto its first principal
# component (k=1), stored as a one-element vector in "Features PCA".
# NOTE(review): the PCA model is fit on the full dataset before the
# train/test split, and "Features PCA" is not consumed by the cells visible
# here - confirm whether it should be fit on the training split only.
pca = PCA(k=1, inputCol="Features Normal", outputCol="Features PCA")

# PCA validates that its output column is absent both when fitting and when
# transforming, so re-running this cell used to fail once "Features PCA"
# existed. Dropping it first (a no-op when it is absent) keeps the cell
# re-runnable without changing the first-run result.
pca_input = df_video.drop(pca.getOutputCol())

pca_model = pca.fit(pca_input)

df_video = pca_model.transform(pca_input)

# Inspect the result: first five rows plus the updated schema.
df_video.show(5)
df_video.printSchema()

+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+----+--------------------+--------------------+--------------------+
|Unnamed: 0|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Month|Keyword_Index|Year|            Features|     Features Normal|        Features PCA|
+----------+--------------------+-----------+------------+-------+-----+--------+-------+-----------+-----+-------------+----+--------------------+--------------------+--------------------+
|         0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|    8|         23.0|2022|[3407.0,135612.0,...|[0.02511243077059...| [0.427441419713353]|
|         1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|    8|         23.0|2022|[76779.0,1758063....|[0.04363087906195...|[0.4391255482465112]|
|         2|My New House Gami...|4mgePWWCAmA|  202

In [13]:
# Randomly partition the rows with weights 0.8 (training) and 0.2 (test);
# the fixed seed is there to make the split repeatable.
split_weights = [0.8, 0.2]
train_df, test_df = df_video.randomSplit(split_weights, seed=42)

# Each count() runs a Spark job over the corresponding split.
train_rows = train_df.count()
test_rows = test_df.count()
print("Training DataFrame count:", train_rows)
print("Test DataFrame count:", test_rows)

Training DataFrame count: 1541
Test DataFrame count: 328


In [14]:
# Fit a linear regression that predicts "Comments" from the L2-normalised
# feature vectors, using the training split only.
# NOTE(review): the captured test R2 for this cell is negative; row-wise
# normalisation may be discarding the magnitude of Likes/Views - confirm
# whether training on "Features" was intended.
lr = LinearRegression(
    featuresCol='Features Normal',
    labelCol='Comments',
)
lr_model = lr.fit(train_df)

# Score the held-out rows; transform() adds the "prediction" column.
predictions = lr_model.transform(test_df)

# A single evaluator is reused for both metrics: RMSE first, then R2.
evaluator = RegressionEvaluator(
    labelCol='Comments',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

# setMetricName changes the evaluator in place, so it reports R2 from here on.
evaluator.setMetricName('r2')
r2 = evaluator.evaluate(predictions)
print(f"R-squared (R2) on test data = {r2}")

# Show a handful of actual vs. predicted comment counts with their features.
predictions.select("Comments", "prediction", "Features Normal").show(5)

Root Mean Squared Error (RMSE) on test data = 19117.245960448807
R-squared (R2) on test data = -0.023688671400983186
+--------+------------------+--------------------+
|Comments|        prediction|     Features Normal|
+--------+------------------+--------------------+
|    3338| 9670.831105204954|[0.04077466900432...|
|    9367|10461.384965563513|[0.02273776566764...|
|    2882|11160.426267629213|[0.00650799846771...|
|     157| 9593.528753213963|[0.04011981375200...|
|   13609|10095.162212723226|[0.03182956269131...|
+--------+------------------+--------------------+
only showing top 5 rows
