In [58]:
#! pip install nltk
# Download the NLTK "stopwords" corpus; preprocess_message_nltk() reads it
# through stopwords.words("english").
import nltk
nltk.download('stopwords')


[nltk_data] Downloading package stopwords to /usr/share/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [59]:
import nltk
# Download the NLTK "punkt" package.
# NOTE(review): presumably the tokenizer data that word_tokenize() needs — confirm against the NLTK docs.
nltk.download('punkt')


[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [60]:
pip install pyspark nltk

Note: you may need to restart the kernel to use updated packages.


In [61]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from nltk.util import ngrams
from collections import Counter
import re

def preprocess_message_nltk(message):
    """Normalize a message: lowercase, strip punctuation, drop stop words, stem.

    Args:
        message: The raw message text, or None.

    Returns:
        The remaining stemmed tokens joined by single spaces, or None when
        ``message`` is None.
    """
    # This function is wrapped as a Spark UDF in main(); Spark hands null
    # column values to Python UDFs as None, so propagate them instead of
    # failing on ``None.lower()``.
    if message is None:
        return None

    # Convert to lowercase
    message = message.lower()

    # Remove punctuation (every character listed below, including backslash)
    punctuation = r'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
    message = re.sub("[" + re.escape(punctuation) + "]", "", message)

    # Tokenization
    tokens = word_tokenize(message)

    # Remove English stop words, then reduce each remaining token to its
    # Porter stem.
    stop_words = set(stopwords.words("english"))
    stemmer = PorterStemmer()
    stemmed = [stemmer.stem(token) for token in tokens if token not in stop_words]

    # Return preprocessed message as string
    return " ".join(stemmed)


def create_ngrams(text, n):
    """Tokenize ``text`` and return its word n-grams.

    Args:
        text: The text to tokenize, or None.
        n: Number of tokens per n-gram.

    Returns:
        A list of n-gram tuples as produced by ``nltk.util.ngrams``; an empty
        list when ``text`` is None or empty.
    """
    # A null/empty message has no tokens; return early so None does not reach
    # the tokenizer (this function runs inside a Spark UDF in main()).
    if not text:
        return []

    tokens = word_tokenize(text)
    return list(ngrams(tokens, n))


def analyze_ngrams(ngrams_list, column=None):
    """Count n-grams and summarize the distribution of their counts.

    Args:
        ngrams_list: Iterable of hashable n-grams (e.g. tuples of tokens).
        column: Optional column to group the count table by before
            describing. The table only has the columns "N-gram" and "Count",
            so any other name makes ``groupby`` raise ``KeyError``.

    Returns:
        A ``(df, stats)`` pair: ``df`` has one row per distinct n-gram with
        columns "N-gram" and "Count"; ``stats`` is ``describe()`` of "Count",
        grouped by ``column`` when it is given.
    """
    # Imported locally because the cell defining this function never imports
    # pandas; without this the function only works if a later cell has
    # already bound ``pd``.
    import pandas as pd

    # Count the n-grams
    ngram_counts = Counter(ngrams_list)

    # One row per distinct n-gram. Force an integer dtype so an empty input
    # still yields numeric descriptive statistics rather than object ones.
    df = pd.DataFrame(list(ngram_counts.items()), columns=["N-gram", "Count"])
    df = df.astype({"Count": "int64"})

    if column:
        # Group by column
        grouped_stats = df.groupby(column)["Count"].describe()
        return df, grouped_stats

    # Descriptive statistics
    descriptive_stats = df["Count"].describe()
    return df, descriptive_stats


def create_graph(ngrams_list):
    """Plot a directed graph that chains each n-gram's prefixes together.

    For every n-gram, the space-joined first k words form a node with an edge
    pointing to the space-joined first k + 1 words, for each k from 1 up to
    one less than the n-gram's length. The graph is drawn with a spring
    layout and shown; nothing is returned.

    NOTE(review): this relies on ``nx`` and ``plt`` being bound at notebook
    level (presumably networkx and matplotlib.pyplot); the cell that defines
    this function does not import them.
    """
    graph = nx.DiGraph()

    # Lazily enumerate (prefix, longer prefix) pairs in n-gram order.
    prefix_edges = (
        (" ".join(ngram[:size]), " ".join(ngram[:size + 1]))
        for ngram in ngrams_list
        for size in range(1, len(ngram))
    )
    graph.add_edges_from(prefix_edges)

    # Render the graph on a 10x6 figure.
    plt.figure(figsize=(10, 6))
    layout = nx.spring_layout(graph)
    nx.draw_networkx(graph, layout, node_size=500, font_size=10, with_labels=True)
    plt.show()


def main(spark, df, column_name, n, group_column=None):
    """Preprocess a text column, build its n-grams and summarize their counts.

    Args:
        spark: Accepted for interface compatibility; not used by this function.
        df: Spark DataFrame holding the text column.
        column_name: Name of the text column; it is overwritten with the
            preprocessed text.
        n: Number of tokens per n-gram.
        group_column: Optional column forwarded to ``analyze_ngrams`` as its
            ``column`` argument.
            NOTE(review): the table built by ``analyze_ngrams`` only has the
            columns "N-gram" and "Count", so other names will fail there.

    Returns:
        ``(df, ngrams_df, df_stats, stats)``: the preprocessed DataFrame, the
        same DataFrame with an added "ngrams" column, and the two values
        returned by ``analyze_ngrams``.
    """
    # Local import: the cell that defines main() imports only StringType.
    from pyspark.sql.types import ArrayType

    preprocess_udf = udf(preprocess_message_nltk, StringType())
    df = df.withColumn(column_name, preprocess_udf(df[column_name]))

    # Declare the return type explicitly. With no type the UDF defaults to a
    # string column, and flatMap below would then iterate over characters of
    # that string instead of over n-grams.
    ngrams_udf = udf(
        lambda text: [list(gram) for gram in create_ngrams(text, n)],
        ArrayType(ArrayType(StringType())),
    )
    ngrams_df = df.withColumn("ngrams", ngrams_udf(df[column_name]))

    # Array values come back as lists; convert each n-gram to a tuple so it
    # is hashable for counting. ``or []`` skips rows whose value is null.
    ngrams_list = (
        ngrams_df.select("ngrams")
        .rdd.flatMap(lambda row: [tuple(gram) for gram in (row[0] or [])])
        .collect()
    )

    # analyze_ngrams treats a falsy ``column`` the same as an omitted one, so
    # a single call covers both the grouped and the ungrouped case.
    df_stats, stats = analyze_ngrams(ngrams_list, column=group_column)
    return df, ngrams_df, df_stats, stats




In [62]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Create (or reuse) the SparkSession for this notebook
spark = SparkSession.builder.appName("BigramAnalysis").getOrCreate()

import pandas as pd

# Sample conversation rows; the fields are named (User, Message, AtendimentoID)
# when the DataFrame is created below.
data = [
    ("Customer1", "Hello, I have a problem with my order.", "Atendimento1"),
    ("Agent1", "How may I assist you?", "Atendimento1"),
    ("Customer1", "I received the wrong item.", "Atendimento1"),
    ("Agent1", "I apologize for the inconvenience. Could you please provide your order number?", "Atendimento1"),
    ("Customer1", "Sure, my order number is ABC123.", "Atendimento1"),
    ("Agent1", "Thank you for providing the order number. Let me check the details.", "Atendimento1"),
    ("Agent1", "I see that there was a mistake in shipping. We will arrange a replacement for you.", "Atendimento1"),
    ("Customer1", "That sounds good. When can I expect the replacement?", "Atendimento1"),
    ("Agent1", "The replacement will be shipped within 24 hours. You will receive a confirmation email.", "Atendimento1"),
    ("Customer1", "Great, thank you for your help!", "Atendimento1"),
    ("Customer2", "Hello, I have a question.", "Atendimento2"),
    ("Agent2", "Sure, what is your question?", "Atendimento2"),
    ("Customer2", "I want to know the status of my order.", "Atendimento2"),
    ("Agent2", "Could you please provide your order number?", "Atendimento2"),
    ("Customer2", "My order number is XYZ456.", "Atendimento2"),
    ("Agent2", "Let me check the status for you.", "Atendimento2"),
    ("Agent2", "Your order is out for delivery and will be delivered today.", "Atendimento2"),
    ("Customer2", "That's great!", "Atendimento2"),
    ("Agent2", "Is there anything else I can assist you with?", "Atendimento2"),
    ("Customer2", "No, that's all. Thank you!", "Atendimento2"),
    ("Agent2", "No, that's all. Thank you!", "Atendimento2")
]

# Build a copy of the data in which every row appears twice in a row.
duplicated_data = []
for line in data:
    duplicated_data.append(line)
    duplicated_data.append(line)

# Prints the entire duplicated list.
print(duplicated_data)

# NOTE(review): the DataFrame is built from `data`, not `duplicated_data`;
# confirm which of the two was intended.
df = spark.createDataFrame(data, ["User", "Message", "AtendimentoID"])

# Define a preprocessing function to convert text to lowercase and remove punctuation
def preprocess_text(text):
    """Lowercase ``text`` and drop everything except ASCII letters, digits and whitespace.

    Args:
        text: The raw message text, or None.

    Returns:
        The cleaned string, or None when ``text`` is None.
    """
    # Imported here so the function carries its own dependency.
    import re

    # This function is registered as a Spark UDF, and Spark passes null column
    # values to Python UDFs as None; propagate them instead of raising.
    if text is None:
        return None

    text = text.lower()
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return text

# Wrap preprocess_text as a Spark UDF that returns a string column
preprocess_udf = udf(preprocess_text, StringType())

# Add a "PreprocessedMessage" column computed from "Message"; `df` is rebound
# to the resulting DataFrame.
df = df.withColumn("PreprocessedMessage", preprocess_udf(df["Message"]))

# Define a tokenizer function to tokenize the preprocessed message
def tokenize_text(text):
    """Split ``text`` on runs of whitespace.

    Args:
        text: The preprocessed message text, or None.

    Returns:
        The list of whitespace-separated tokens; an empty list when ``text``
        is None or contains no tokens.
    """
    # This function is registered as a Spark UDF, and Spark passes null column
    # values to Python UDFs as None; treat them as "no tokens".
    if text is None:
        return []
    return text.split()

# Wrap tokenize_text as a Spark UDF that returns an array of strings
tokenize_udf = udf(tokenize_text, ArrayType(StringType()))

# Add a "Tokens" column computed from "PreprocessedMessage"; `df` is rebound
# to the resulting DataFrame. The NGram cells read this column.
df = df.withColumn("Tokens", tokenize_udf(df["PreprocessedMessage"]))


[('Customer1', 'Hello, I have a problem with my order.', 'Atendimento1'), ('Customer1', 'Hello, I have a problem with my order.', 'Atendimento1'), ('Agent1', 'How may I assist you?', 'Atendimento1'), ('Agent1', 'How may I assist you?', 'Atendimento1'), ('Customer1', 'I received the wrong item.', 'Atendimento1'), ('Customer1', 'I received the wrong item.', 'Atendimento1'), ('Agent1', 'I apologize for the inconvenience. Could you please provide your order number?', 'Atendimento1'), ('Agent1', 'I apologize for the inconvenience. Could you please provide your order number?', 'Atendimento1'), ('Customer1', 'Sure, my order number is ABC123.', 'Atendimento1'), ('Customer1', 'Sure, my order number is ABC123.', 'Atendimento1'), ('Agent1', 'Thank you for providing the order number. Let me check the details.', 'Atendimento1'), ('Agent1', 'Thank you for providing the order number. Let me check the details.', 'Atendimento1'), ('Agent1', 'I see that there was a mistake in shipping. We will arrange a

In [63]:
from pyspark.ml.feature import NGram

# Bigrams: configure an NGram transformer that reads "Tokens" and writes "Bigrams"
ngram = NGram(inputCol="Tokens", outputCol="Bigrams", n=2)

# Run the transformer over df to obtain the frame with the extra column
bigram_df = ngram.transform(df)

# Display the original fields next to the generated bigrams, without truncation
bigram_df.select(["User", "Message", "AtendimentoID", "Bigrams"]).show(truncate=False)

+---------+---------------------------------------------------------------------------------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|User     |Message                                                                                |AtendimentoID|Bigrams                                                                                                                                                                        |
+---------+---------------------------------------------------------------------------------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Customer1|Hello, I have a problem with my order.                                                 |Atendimento1 |[hello i, i have,

In [64]:
from pyspark.ml.feature import NGram

# Trigrams: configure an NGram transformer that reads "Tokens" and writes "Trigrams"
ngram = NGram(inputCol="Tokens", outputCol="Trigrams", n=3)

# Run the transformer over df to obtain the frame with the extra column
trigram_df = ngram.transform(df)

# Display the original fields next to the generated trigrams, without truncation
trigram_df.select(["User", "Message", "AtendimentoID", "Trigrams"]).show(truncate=False)


+---------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|User     |Message                                                                                |AtendimentoID|Trigrams                                                                                                                                                                                                                                     |
+---------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [65]:
from pyspark.ml.feature import NGram

# Tetragrams: configure an NGram transformer that reads "Tokens" and writes "Tetragrams"
ngram = NGram(inputCol="Tokens", outputCol="Tetragrams", n=4)

# Run the transformer over df to obtain the frame with the extra column
tetragram_df = ngram.transform(df)

# Display the original fields next to the generated tetragrams, without truncation
tetragram_df.select(["User", "Message", "AtendimentoID", "Tetragrams"]).show(truncate=False)


+---------+---------------------------------------------------------------------------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|User     |Message                                                                                |AtendimentoID|Tetragrams                                                                                                                                                                                                                                                                                    |
+---------+---------------------------------------------------------------------------------------+-------------+-------------------------------------------------------------------------------------

In [67]:
from collections import Counter
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def _collect_repeated_ngrams(data, n):
    """Return (ngram, count) pairs for word n-grams that occur more than once."""
    # Messages are split on whitespace only, so punctuation stays attached to
    # words. Pairs keep the order in which each n-gram was first seen.
    counter = Counter(
        " ".join(words[i:i + n])
        for words in (item[1].split() for item in data)
        for i in range(len(words) - n + 1)
    )
    return [(ngram, count) for ngram, count in counter.items() if count > 1]


def print_repeated_ngrams(data, n):
    """Print the word n-grams that repeat across messages and show them via Spark.

    Args:
        data: Iterable of records whose element at index 1 is the message text.
        n: Number of words per n-gram; must be at least 1.

    Raises:
        ValueError: If ``n`` is smaller than 1. Such values previously
            produced empty-string or otherwise meaningless "n-grams".

    The repeated n-grams are first printed as a plain-text table, then loaded
    into a Spark DataFrame ordered by descending count and shown.
    """
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    repeated_ngrams = _collect_repeated_ngrams(data, n)

    print(f"{n}-grams repetidos:")
    print("{:<20} {:<10}".format("N-grama", "Quantidade"))
    for ngram, count in repeated_ngrams:
        print("{:<20} {:<10}".format(ngram, count))

    # Build the Spark DataFrame; the explicit schema also lets an empty result
    # be created without type inference.
    spark = SparkSession.builder.getOrCreate()
    schema = StructType([
        StructField("N-grama", StringType(), nullable=False),
        StructField("Quantidade", IntegerType(), nullable=False)
    ])
    df = spark.createDataFrame(repeated_ngrams, schema=schema)
    df = df.orderBy(col("Quantidade").desc())
    df.show()

# Usage example.
# Note: this rebinds the notebook-level `data` to a shorter list (the first 11
# rows of the list defined in the earlier cell).
data = [
    ("Customer1", "Hello, I have a problem with my order.", "Atendimento1"),
    ("Agent1", "How may I assist you?", "Atendimento1"),
    ("Customer1", "I received the wrong item.", "Atendimento1"),
    ("Agent1", "I apologize for the inconvenience. Could you please provide your order number?", "Atendimento1"),
    ("Customer1", "Sure, my order number is ABC123.", "Atendimento1"),
    ("Agent1", "Thank you for providing the order number. Let me check the details.", "Atendimento1"),
    ("Agent1", "I see that there was a mistake in shipping. We will arrange a replacement for you.", "Atendimento1"),
    ("Customer1", "That sounds good. When can I expect the replacement?", "Atendimento1"),
    ("Agent1", "The replacement will be shipped within 24 hours. You will receive a confirmation email.", "Atendimento1"),
    ("Customer1", "Great, thank you for your help!", "Atendimento1"),
    ("Customer2", "Hello, I have a question.", "Atendimento2")
]

print_repeated_ngrams(data, 2)  # For bigrams
print_repeated_ngrams(data, 3)  # For trigrams


2-grams repetidos:
N-grama              Quantidade
Hello, I             2         
I have               2         
have a               2         
you for              2         
+--------+----------+
| N-grama|Quantidade|
+--------+----------+
| you for|         2|
|Hello, I|         2|
|  I have|         2|
|  have a|         2|
+--------+----------+

3-grams repetidos:
N-grama              Quantidade
Hello, I have        2         
I have a             2         
+-------------+----------+
|      N-grama|Quantidade|
+-------------+----------+
|Hello, I have|         2|
|     I have a|         2|
+-------------+----------+

