<a href="https://colab.research.google.com/github/CostrunLarisa/Big-Data/blob/main/YoutubeComments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Citirea datelor

Link catre dataset: https://www.kaggle.com/datasets/nipunarora8/most-liked-comments-on-youtube

In [4]:
pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ytcomments').getOrCreate()
data = spark.read.csv('sample_data/youtube_dataset.csv',inferSchema=True,
                     header=True)
data.printSchema()

root
 |-- Video Name: string (nullable = true)
 |-- Channel Name: string (nullable = true)
 |-- Comment Id: string (nullable = true)
 |-- User Name: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Likes: string (nullable = true)



## Data Preprocessing

Vom elimina coloana "User Name", deoarece nu este un element relevant in analiza noastra. La fel si pentru "Comment Id"

In [7]:
# Stergem liniile care au coloana de like-uri sau comentariu null

data = data.na.drop(subset=["Likes", "Comment"])
data = data.drop("User Name")
data = data.drop("Comment Id")
data.printSchema()
data.columns

root
 |-- Video Name: string (nullable = true)
 |-- Channel Name: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Likes: string (nullable = true)



['Video Name', 'Channel Name', 'Comment', 'Date', 'Likes']

In [8]:
from pyspark.sql.functions import length

#Adaugam o coloana noua, Comment_Length, pentru a evita procesarea fiecarei linii

data = data.withColumn("Comment_Length", length(data["Comment"]))
data.columns

['Video Name', 'Channel Name', 'Comment', 'Date', 'Likes', 'Comment_Length']

Vom transforma coloana Date in tipul de date Date in format 'yyyy-MM-dd' pentru a calcula care a fost nr. de zile care a trecut de la data publicarii comentariului pana in prezent.

In [9]:
from pyspark.sql.functions import current_date, datediff
from pyspark.sql.functions import substring
from pyspark.sql.functions import to_date

data = data.withColumn('Date', substring(data['Date'], 1, 10))
data = data.withColumn('Date', to_date(data['Date'], 'yyyy-MM-dd'))
updated_data = data.withColumn('Days_Passed', datediff(current_date(), data['Date']))
updated_data = updated_data.drop('Date')

In [10]:
data = updated_data
data.show()

+--------------------+-------------+--------------------+------+--------------+-----------+
|          Video Name| Channel Name|             Comment| Likes|Comment_Length|Days_Passed|
+--------------------+-------------+--------------------+------+--------------+-----------+
|Luis Fonsi - Desp...|LuisFonsiVEVO|The people who li...| 98280|            63|       1043|
|Luis Fonsi - Desp...|LuisFonsiVEVO|Let's be honest t...|    13|            67|       1013|
|Luis Fonsi - Desp...|LuisFonsiVEVO|3.2 Million comme...|370547|            51|       1191|
|Luis Fonsi - Desp...|LuisFonsiVEVO|claim your “here ...|   763|            77|       1014|
|Luis Fonsi - Desp...|LuisFonsiVEVO|The ones who are ...|    94|            56|       1014|
|Luis Fonsi - Desp...|LuisFonsiVEVO|  Kimler burda😂🥰🌹|    45|            15|       1014|
|Luis Fonsi - Desp...|LuisFonsiVEVO|The first viewer ...| 36446|            56|       1032|
|Luis Fonsi - Desp...|LuisFonsiVEVO|Just imagine how ...|   142|            51|    

## Adaugarea unui UDF care calculeaza nr. de emoji-uri dintr-un comentariu

In [11]:
pip install emoji

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting emoji
  Downloading emoji-2.5.1.tar.gz (356 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m356.3/356.3 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: emoji
  Building wheel for emoji (setup.py) ... [?25l[?25hdone
  Created wheel for emoji: filename=emoji-2.5.1-py2.py3-none-any.whl size=351210 sha256=a47e933dece01e1ffc6b0bc821ddce12dbc530653362631b5d3a73f15fd16f59
  Stored in directory: /root/.cache/pip/wheels/51/92/44/e2ef13f803aa08711819357e6de0c5fe67b874671141413565
Successfully built emoji
Installing collected packages: emoji
Successfully installed emoji-2.5.1


In [12]:
import emoji
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def count_emojis(text):
    return emoji.emoji_count(text);


# Register the UDF
count_emojis_udf = udf(count_emojis, IntegerType())
spark.udf.register("count_emojis", count_emojis_udf)


<function __main__.count_emojis(text)>

### Adaugam o coloana noua 'Emojis_number'

In [13]:
from pyspark.sql.functions import col

data = data.withColumn('Emojis_number', count_emojis_udf(col('Comment')))
data.show()

+--------------------+-------------+--------------------+------+--------------+-----------+-------------+
|          Video Name| Channel Name|             Comment| Likes|Comment_Length|Days_Passed|Emojis_number|
+--------------------+-------------+--------------------+------+--------------+-----------+-------------+
|Luis Fonsi - Desp...|LuisFonsiVEVO|The people who li...| 98280|            63|       1043|            0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|Let's be honest t...|    13|            67|       1013|            0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|3.2 Million comme...|370547|            51|       1191|            0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|claim your “here ...|   763|            77|       1014|            1|
|Luis Fonsi - Desp...|LuisFonsiVEVO|The ones who are ...|    94|            56|       1014|            0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|  Kimler burda😂🥰🌹|    45|            15|       1014|            3|
|Luis Fonsi - Desp...|LuisFonsiVEVO|The first vie

### Vrem sa analizam cate linii din setul de date au emoji-uri pentru a stabili relevanta acestei coloane (Emojis_number)

In [14]:
from pyspark.sql.functions import col

filtered_data = data.filter(col('Emojis_number') != 0)
print('Total rows: ' + str(data.count()))
print('Rows with emojis: ' + str(filtered_data.count()))

Total rows: 14829
Rows with emojis: 2865


## Adaugam o noua coloana *label*.

Vom adauga o coloana de label pentru eticheta, care va fi 1 sau 0 pentru cazurile in care un comentariu este considerat cel mai apreciat sau nu.

Un comentariu este considerat cel mai apreciat daca nr. de like-uri este > 1000.

In [15]:
from pyspark.sql.functions import lit

data = data.withColumn('Label', lit(0))

### Pentru liniile unde Likes > 1000 modificam valoarea Label in 1.

In [16]:
from pyspark.sql.functions import when

# Create a new column 'NewLabel' with the modified values
updated_data = data.withColumn('Temp_label', when(data.Likes > 1000, 1).otherwise(data.Label))

# Drop the original 'Label' column and rename 'NewLabel' to 'Label'
updated_data = updated_data.drop('Label').withColumnRenamed('Temp_label', 'Label')


In [17]:
data = updated_data
data = data.drop("Likes")
data.show()

+--------------------+-------------+--------------------+--------------+-----------+-------------+-----+
|          Video Name| Channel Name|             Comment|Comment_Length|Days_Passed|Emojis_number|Label|
+--------------------+-------------+--------------------+--------------+-----------+-------------+-----+
|Luis Fonsi - Desp...|LuisFonsiVEVO|The people who li...|            63|       1043|            0|    1|
|Luis Fonsi - Desp...|LuisFonsiVEVO|Let's be honest t...|            67|       1013|            0|    0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|3.2 Million comme...|            51|       1191|            0|    1|
|Luis Fonsi - Desp...|LuisFonsiVEVO|claim your “here ...|            77|       1014|            1|    0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|The ones who are ...|            56|       1014|            0|    0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|  Kimler burda😂🥰🌹|            15|       1014|            3|    0|
|Luis Fonsi - Desp...|LuisFonsiVEVO|The first viewer ...| 

## Formatarea pentru MLlib

Vom lua in considerare datele numerice: ['Days_Passed', 'Comment_Length', 'Emojis_number'].

In [459]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import ArrayType, DoubleType

assembler = VectorAssembler(inputCols=['Days_Passed', 'Comment_Length', 'Emojis_number'],
                            outputCol='features', handleInvalid="skip")

output = assembler.transform(data)
final_data = output.select('features','Label')

# Show the DataFrame
final_data.show()

## Impartirea setului de date

In [414]:
train_yt, test_yt = final_data.randomSplit([0.7,0.3])

## Antrenarea modelului

In [460]:
from pyspark.ml.classification import LogisticRegression

lr_yt = LogisticRegression(labelCol='Label')

In [461]:
fitted_yt_model = lr_yt.fit(train_yt)

## Rezultate

In [250]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [242]:
training_sum = fitted_yt_model.summary

In [243]:
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              Label|         prediction|
+-------+-------------------+-------------------+
|  count|              10275|              10275|
|   mean|0.16875912408759125|0.06832116788321167|
| stddev|0.37455725324436523|0.25230850457721793|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



## Evaluarea rezultatelor

In [277]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

pred_and_labels = fitted_yt_model.evaluate(test_yt)
evaluator = BinaryClassificationEvaluator(labelCol='Label')
accuracy = evaluator.evaluate(pred_and_labels.predictions)
print('Accuracy:', accuracy)

Accuracy: 0.5637523187400532


## Antrenarea unui model nou, de tip SVM pentru o acuratete mai buna

Am ales antrenarea unui model SVM, deoarece in cazul in care eliminam coloana Days_Passed din assembler obtinem o acuratete de numai 56%.

Vom rula pasii de citirea datelor si procesarea datelor. La fel si adaugarea coloanei Label.
Vom lua in considerare si Video Name & Channel Name.

Datele vor fi transformate in date numerice.

In [511]:
data.na.drop()
data = data.withColumnRenamed("Video Name", "video_name")
data = data.withColumnRenamed("Channel Name", "channel_name")

train_yt, test_yt = data.randomSplit([0.7,0.3])

In [512]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Define the string columns to be encoded
string_cols = ['video_name', 'channel_name']

# Create a list to store the stages of the pipeline
stages = []

# Perform string indexing for each string column
for col in string_cols:
    # String Indexing
    indexer = StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
    stages.append(indexer)

    # One-Hot Encoding
    encoder = OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded", handleInvalid="keep")
    stages.append(encoder)

# Assemble the encoded features into a single vector
assembler_inputs = [col + "_encoded" for col in string_cols]
assembler_inputs.append("Emojis_number")
assembler_inputs.append("Comment_Length")
assembler_inputs.append("Days_Passed")
assembler = VectorAssembler(inputCols=assembler_inputs,
                            outputCol="features",
                            handleInvalid="skip")
stages.append(assembler)

# Create a pipeline with all the stages
pipeline = Pipeline(stages=stages)

# Fit the pipeline on the training data
pipeline_model = pipeline.fit(train_yt)



In [None]:
# Transform the training and test data using the pipeline
train_data = pipeline_model.transform(train_yt)
test_data = pipeline_model.transform(test_yt)

# Train the SVM model
svm = LinearSVC(featuresCol="features", labelCol="Label")

# Train the SVM model
svm_model = svm.fit(train_data)

# Make predictions on test data
predictions = svm_model.transform(test_data)

## Evaluarea datelor obtinute cu SVM

Eroare: Caused by: org.apache.spark.SparkException: Unseen label: I am from Bolivia. To handle unseen labels, set Param handleInvalid to keep.

Vedem ca SVM-ul obtine o acuratete de 83.2% chiar daca nu integram parametrul de Days_Passed in antrenarea modelului.

Integrand si campul Days_Passed, SVM-ul obtine o acuratete de 86.6%.

In [364]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create an evaluator for accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="Label",
                                              predictionCol="prediction",
                                              metricName="accuracy")

# Calculate the accuracy
accuracy = evaluator.evaluate(predictions)

# Print the accuracy
print("Accuracy:", accuracy)


Accuracy: 0.8663793103448276


## Predictie pe date noi, neetichetate

Evaluarea datelor din fisierul YouTube_data.csv.

Fiind seturi de date relativ diferite vom incerca sa aducem csv-ul in aceeasi forma.

https://www.kaggle.com/datasets/advaypatil/youtube-statistics
https://www.kaggle.com/datasets/seungguini/youtube-comments-for-covid19-related-videos

In [513]:
data = spark.read.csv('sample_data/covid_2021_1.csv',inferSchema=True,
                              header=True)

In [514]:
data.printSchema()

root
 |-- query: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title: string (nullable = true)
 |-- upload_date: timestamp (nullable = true)
 |-- channel: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- comment_author: string (nullable = true)
 |-- comment_date: string (nullable = true)
 |-- comment_likes: string (nullable = true)
 |-- DATE: string (nullable = true)



Procesam datele.

In [523]:
data = data = data.drop("quey")
data = data.drop("url")
data = data.drop("views")
data = data.drop("dislikes")
data = data.drop("comment_count")
data = data.drop("comment_author")
data = data.drop("upload_date")
data = data.drop("likes")
data = data.drop("DATE")
data = data.drop("Label")

data = data.withColumnRenamed("comment_text","Comment")
data = data.withColumnRenamed("comment_likes","Likes")
data = data.withColumnRenamed("comment_date","Date")
data = data.withColumnRenamed("title","video_name")
data = data.withColumnRenamed("channel","channel_name")

data.show()

+--------------------+--------------------+------------+--------------------+
|               query|          video_name|channel_name|             Comment|
+--------------------+--------------------+------------+--------------------+
|coronavirus|covid...|Which Coronavirus...| India Today|     OMG 😍💋 💝💖❤️|
|coronavirus|covid...|Which Coronavirus...| India Today|I love my Indian ...|
|coronavirus|covid...|Which Coronavirus...| India Today|What about indian...|
|coronavirus|covid...|Which Coronavirus...| India Today|2:32 I like that?...|
|coronavirus|covid...|Growing Number Of...|    NBC News|And all the rich ...|
|coronavirus|covid...|Growing Number Of...|    NBC News|The U.S. Governme...|
|coronavirus|covid...|Growing Number Of...|    NBC News|How can we call o...|
|coronavirus|covid...|Growing Number Of...|    NBC News|Wealthiest countr...|
|coronavirus|covid...|Growing Number Of...|    NBC News|The real irony, t...|
|coronavirus|covid...|Growing Number Of...|    NBC News|The apocalyp

Rulam din nou adaugarea de emoji-uri, comment_size, Days_Passed, etc.

In [483]:
test_new_data = assembler.transform(data)

In [484]:
final_results = fitted_yt_model.transform(test_new_customers)

In [485]:
final_results.select('Comment','prediction').show()

+-------+----------+
|Comment|prediction|
+-------+----------+
+-------+----------+



## Observatie

Mai departe putem observa ca datele au fost antrenate numai pe un set specific de video-uri si anume: muzica/videoclipuri aflate in trending. De aceea am putea lua in considerare analiza comentariilor in-depth si observarea cuvintelor cel mai frecvent utilizate.

## Construirea unui Data Pipeline pentru stabilirea celor mai utilizate cuvinte din comentariile apreciate

Vom normaliza comentariile, deoarece in aceasta etapa ne intereseaza numai textul utilizat. Textul va fi scris cu litere mici si vom elimina toate emoji-urile sau caractere numerice.

In [18]:
from pyspark.sql.functions import col, lower, regexp_replace

data = data.withColumn("Comment", lower(col("Comment")))
data = data.withColumn("Comment", regexp_replace(col("Comment"), "[^a-zA-Z0-9\\s]", ""))

Extragem setul de date concatenand toate comentariile intr-un singur text.

In [19]:
from pyspark.sql.functions import concat_ws, collect_list

comments_concatenated = data.groupBy().agg(concat_ws(" ", collect_list("Comment")).alias("Concatenated_Comments")).first()["Concatenated_Comments"]
comments_concatenated



Cream un nou date frame cu o singura linie astfel incat sa putem rula pipeline-ul definit.

In [20]:
from pyspark.sql import Row
from pyspark.sql.functions import lit

comments_df = spark.createDataFrame([Row(Comments=comments_concatenated)])
comments_df.show()


+--------------------+
|            Comments|
+--------------------+
|the people who li...|
+--------------------+



### Tokenizarea datelor

In [21]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark2 = SparkSession.builder.appName('DataPipeline').getOrCreate()

# Definim stage-urile pipeline-ului

# Tokenization stage
tokenizer = Tokenizer(inputCol='Comments', outputCol='tokens')

# Regular expression tokenizer stage
regexTokenizer = RegexTokenizer(inputCol='Comments', outputCol='tokens', pattern='\\W')

# Stop words removal stage
stopWordsRemover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')

# Count vectorization stage
countVectorizer = CountVectorizer(inputCol='filtered_tokens', outputCol='features')


In [22]:
pipeline = Pipeline(stages=[tokenizer, stopWordsRemover, countVectorizer])

In [23]:
pipelineModel = pipeline.fit(comments_df)

In [24]:
transformedComments = pipelineModel.transform(comments_df)

### Vrem sa vedem daca au fost filtrate tokenurile. Verificam acest lucru prin size-ul lor

In [26]:
vector_size_udf = udf(lambda x: len(x), IntegerType())

transformedComments.withColumn('vector_size', vector_size_udf('filtered_tokens')).show()
transformedComments.withColumn('vector_size', vector_size_udf('tokens')).show()

+--------------------+--------------------+--------------------+--------------------+-----------+
|            Comments|              tokens|     filtered_tokens|            features|vector_size|
+--------------------+--------------------+--------------------+--------------------+-----------+
|the people who li...|[the, people, who...|[people, liked, c...|(14996,[0,1,2,3,4...|     144510|
+--------------------+--------------------+--------------------+--------------------+-----------+

+--------------------+--------------------+--------------------+--------------------+-----------+
|            Comments|              tokens|     filtered_tokens|            features|vector_size|
+--------------------+--------------------+--------------------+--------------------+-----------+
|the people who li...|[the, people, who...|[people, liked, c...|(14996,[0,1,2,3,4...|     213720|
+--------------------+--------------------+--------------------+--------------------+-----------+



In [58]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Declaram un set dummy de comentarii
comments = transformedComments.select('Comments').rdd.flatMap(lambda x: x[0]).collect()
labels = tf.keras.utils.to_categorical([1, 0])


# Selectam token-urile cel mai des intalnite
word_set = transformedComments.select('filtered_tokens').rdd.flatMap(lambda x: x[0]).collect()
word_set = list(filter(lambda word: word != '', word_set))

print(word_set)
#Tokenizam comentariile
tokenizer = Tokenizer()
tokenizer.fit_on_texts(comments)
comment_sequences = tokenizer.texts_to_sequences(comments)

# Pad sequences to a fixed length
max_length = 10  # Maximum sequence length
padded_sequences = pad_sequences(comment_sequences, maxlen=max_length)
print(padded_sequences)


[[0 0 0 ... 0 0 6]
 [0 0 0 ... 0 0 9]
 [0 0 0 ... 0 0 1]
 ...
 [0 0 0 ... 0 0 7]
 [0 0 0 ... 0 0 6]
 [0 0 0 ... 0 0 2]]
[[0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]]


In [56]:
# Definirea modelului
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(len(word_set) + 1, 16, input_length=max_length),
    tf.keras.layers.Conv1D(128, 5, activation='relu'),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(2, activation='softmax')
])


In [None]:
# Antrenarea modelului
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(padded_sequences, labels, epochs=10)

In [None]:
new_comments = ['Really nice 👌 😍💋 💝💖❤️', 'Had the first shot am awaiting my second.  No side effects from the first.  BTW notice you have some Buffalo Trace bourbon my personal favorite!']
new_sequences = tokenizer.texts_to_sequences(new_comments)
new_padded_sequences = pad_sequences(new_sequences, maxlen=max_length)
predictions = model.predict(new_padded_sequences)

In [None]:
# Predictii

for comment, prediction in zip(new_comments, predictions):
    print(f"Comment: {comment}")
    print(f"Prediction: {'Popular' if prediction.argmax() == 1 else 'Not Popular'}")
    print()