In [1]:
pip install pyspark pyyaml

Defaulting to user installation because normal site-packages is not writeable
Looking in links: /usr/share/pip-wheels
Note: you may need to restart the kernel to use updated packages.


In [10]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StringType, ArrayType, IntegerType
from pyspark.sql.functions import sum as sum_agg
import pyspark.sql.functions as F
import yaml
import re

In [3]:
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    return text

def tokenize(text):
    return text.split()

def preprocess_and_tokenize(text):
    text = preprocess_text(text)
    return tokenize(text)

# UDF for preprocessing and tokenizing
preprocess_and_tokenize_udf = udf(preprocess_and_tokenize, ArrayType(StringType()))

In [4]:
spark = SparkSession.builder.appName("IntentRecognition").getOrCreate()

23/12/18 16:06:20 WARN Utils: Your hostname, blue-nbjupyterhub5 resolves to a loopback address: 127.0.0.1; using 10.0.0.155 instead (on interface ens5)
23/12/18 16:06:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/18 16:06:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read the YAML file
with open('nlu.yml', 'r') as file:
    file_content = file.read()

In [6]:
# Parse the YAML file
parsed_data = yaml.safe_load(file_content)
data = [(intent["intent"], example) for intent in parsed_data["nlu"] for example in intent["examples"].split("\n")[1:] if example.strip()]

In [12]:
# Create DataFrame
df = spark.createDataFrame(data, ["intent", "example"])

# Map Phase: Preprocess and tokenize examples
df = df.withColumn("tokens", preprocess_and_tokenize_udf(df["example"]))

# Reduce Phase 1: Explode tokens and count occurrences
df_token_count = df.withColumn("token", explode(df["tokens"])).groupBy("intent", "token").count()

In [16]:
# Test cases
test_inputs = ["when was the company founded?", "search ekber", "So I'm here Today to ask one very simple question, what are you ?"]

for user_input in test_inputs:
    user_tokens = set(preprocess_and_tokenize(user_input))
    # Broadcast user input tokens
    broadcast_user_input = spark.sparkContext.broadcast(user_tokens)

    # Reduce Phase 2: Calculate match score
    def match_score(count, token):
        if token in broadcast_user_input.value:
            return count
        else:
            return 0

    match_score_udf = udf(match_score, IntegerType())

    # Add match score to DataFrame
    df_score = df_token_count.withColumn("match_score", match_score_udf(df_token_count["count"], df_token_count["token"]))

    # Aggregate scores for each intent
    df_intent_score = df_score.groupBy("intent").agg(F.sum("match_score").alias("total_score"))

    # Find the intent with the highest match score
    result_intent = df_intent_score.orderBy("total_score", ascending=False).limit(1)

    result_intent.show()

                                                                                

+--------------------+-----------+
|              intent|total_score|
+--------------------+-----------+
|faq/when_was_eCom...|        128|
+--------------------+-----------+



                                                                                

+--------------+-----------+
|        intent|total_score|
+--------------+-----------+
|search_product|         36|
+--------------+-----------+





+--------------------+-----------+
|              intent|total_score|
+--------------------+-----------+
|chitchat/ask_lang...|        193|
+--------------------+-----------+



                                                                                