In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l- done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425350 sha256=2558db08aaaaef9da96a0baaec9031142f071048b1e504b0defdf8884a47c164
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Tokenizer, HashingTF,IDF, Normalizer, VectorAssembler

In [3]:
# Create (or reuse) a Spark session running locally on two threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("testing-NLP")
    .getOrCreate()
)
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/06 15:34:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Bind the session's underlying SparkContext to `sc`; the bare expression displays it as the cell output.
sc = spark.sparkContext
sc

In [5]:
# Toy corpus: three short sentences.
sentences_list = [
    "This is the first sentence.",
    "Here is the second sentence.",
    "And here's a third sentence.",
]

# One row per sentence, plus a "words" column holding its space-delimited tokens
# (punctuation and case are kept as-is).
df = (
    spark.createDataFrame([(text,) for text in sentences_list], ["sentence"])
    .withColumn("words", F.split("sentence", " "))
)

# One row per token, with each token wrapped in a single-element string array.
df_words = (
    df.select("sentence", F.explode("words").alias("word"))
    .select("sentence", F.array("word").alias("word_array"))
)

df_words.show()

                                                                                

+--------------------+-----------+
|            sentence| word_array|
+--------------------+-----------+
|This is the first...|     [This]|
|This is the first...|       [is]|
|This is the first...|      [the]|
|This is the first...|    [first]|
|This is the first...|[sentence.]|
|Here is the secon...|     [Here]|
|Here is the secon...|       [is]|
|Here is the secon...|      [the]|
|Here is the secon...|   [second]|
|Here is the secon...|[sentence.]|
|And here's a thir...|      [And]|
|And here's a thir...|   [here's]|
|And here's a thir...|        [a]|
|And here's a thir...|    [third]|
|And here's a thir...|[sentence.]|
+--------------------+-----------+



In [6]:
# Collect the total token count; it is used below to size the hashing space.
df_words = df_words.withColumn('wordCount', F.size(col('word_array')))
df_words.show()
# printSchema() prints the schema itself and returns None, so it must not be wrapped in print().
df_words.printSchema()
# Sum of the per-row token counts; `or 0` covers an empty DataFrame, where the SQL sum is NULL (None).
wordCount = df_words.agg(F.sum("wordCount").alias("sum_wordCount")).collect()[0]["sum_wordCount"] or 0
# Smallest power of two (minimum 2) that is at least the word count.
powerof2 = 2
while powerof2 < wordCount:
    powerof2 *= 2
print(powerof2)
# The helper column is only needed for the count above.
df_words = df_words.drop("wordCount")
# Compute Term Frequency (TF): hash each token array into a vector with `powerof2` buckets.
hashingTF = HashingTF(inputCol="word_array", outputCol="rawFeatures", numFeatures=powerof2)
df_words = hashingTF.transform(df_words)
df_words.show()


+--------------------+-----------+---------+
|            sentence| word_array|wordCount|
+--------------------+-----------+---------+
|This is the first...|     [This]|        1|
|This is the first...|       [is]|        1|
|This is the first...|      [the]|        1|
|This is the first...|    [first]|        1|
|This is the first...|[sentence.]|        1|
|Here is the secon...|     [Here]|        1|
|Here is the secon...|       [is]|        1|
|Here is the secon...|      [the]|        1|
|Here is the secon...|   [second]|        1|
|Here is the secon...|[sentence.]|        1|
|And here's a thir...|      [And]|        1|
|And here's a thir...|   [here's]|        1|
|And here's a thir...|        [a]|        1|
|And here's a thir...|    [third]|        1|
|And here's a thir...|[sentence.]|        1|
+--------------------+-----------+---------+

root
 |-- sentence: string (nullable = true)
 |-- word_array: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- wor

In [7]:
# Compute Inverse Document Frequency (IDF): fit on the term-frequency vectors,
# then rescale them into the "features" column.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_words)
df_words = idfModel.transform(df_words)
print('checking IDF:')
# show() prints the table itself and returns None; wrapping it in print() emits a stray "None".
df_words.show()

                                                                                

checking IDF:
+--------------------+-----------+---------------+--------------------+
|            sentence| word_array|    rawFeatures|            features|
+--------------------+-----------+---------------+--------------------+
|This is the first...|     [This]| (16,[3],[1.0])|(16,[3],[1.163150...|
|This is the first...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...|
|This is the first...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...|
|This is the first...|    [first]| (16,[3],[1.0])|(16,[3],[1.163150...|
|This is the first...|[sentence.]| (16,[8],[1.0])|(16,[8],[1.163150...|
|Here is the secon...|     [Here]|(16,[12],[1.0])|(16,[12],[2.07944...|
|Here is the secon...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...|
|Here is the secon...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...|
|Here is the secon...|   [second]| (16,[8],[1.0])|(16,[8],[1.163150...|
|Here is the secon...|[sentence.]| (16,[8],[1.0])|(16,[8],[1.163150...|
|And here's a thir...|      [And]| (16,[1],[1.0])|

In [8]:
# show() prints the table itself and returns None; wrapping it in print() emits a stray "None".
df_words.show()

+--------------------+-----------+---------------+--------------------+
|            sentence| word_array|    rawFeatures|            features|
+--------------------+-----------+---------------+--------------------+
|This is the first...|     [This]| (16,[3],[1.0])|(16,[3],[1.163150...|
|This is the first...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...|
|This is the first...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...|
|This is the first...|    [first]| (16,[3],[1.0])|(16,[3],[1.163150...|
|This is the first...|[sentence.]| (16,[8],[1.0])|(16,[8],[1.163150...|
|Here is the secon...|     [Here]|(16,[12],[1.0])|(16,[12],[2.07944...|
|Here is the secon...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...|
|Here is the secon...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...|
|Here is the secon...|   [second]| (16,[8],[1.0])|(16,[8],[1.163150...|
|Here is the secon...|[sentence.]| (16,[8],[1.0])|(16,[8],[1.163150...|
|And here's a thir...|      [And]| (16,[1],[1.0])|(16,[1],[1.386

In [9]:
# Scale each TF-IDF vector to unit norm (Normalizer's default p-norm).
# NOTE: every row holds a single token, so each vector has one non-zero entry and
# normalising it always yields 1.0, which discards the IDF weight.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
df_words_norm = normalizer.transform(df_words)
# show() prints the table itself and returns None; wrapping it in print() emits a stray "None".
df_words_norm.show()

+--------------------+-----------+---------------+--------------------+---------------+
|            sentence| word_array|    rawFeatures|            features|   normFeatures|
+--------------------+-----------+---------------+--------------------+---------------+
|This is the first...|     [This]| (16,[3],[1.0])|(16,[3],[1.163150...| (16,[3],[1.0])|
|This is the first...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...| (16,[9],[1.0])|
|This is the first...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...| (16,[1],[1.0])|
|This is the first...|    [first]| (16,[3],[1.0])|(16,[3],[1.163150...| (16,[3],[1.0])|
|This is the first...|[sentence.]| (16,[8],[1.0])|(16,[8],[1.163150...| (16,[8],[1.0])|
|Here is the secon...|     [Here]|(16,[12],[1.0])|(16,[12],[2.07944...|(16,[12],[1.0])|
|Here is the secon...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...| (16,[9],[1.0])|
|Here is the secon...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...| (16,[1],[1.0])|
|Here is the secon...|   [second

In [10]:
# Function to convert sparse vector to dense vector
def sparse_to_dense(vector):
    """Convert a vector into a plain Python list of its dense values.

    Args:
        vector: An object exposing ``toArray()`` whose result exposes ``tolist()``,
            or ``None`` for a null cell.

    Returns:
        The list produced by ``vector.toArray().tolist()``, or ``None`` when
        ``vector`` is ``None`` so that null rows stay null instead of raising.
    """
    if vector is None:
        return None
    return vector.toArray().tolist()

# UDF to apply the conversion function.
# DoubleType keeps the values as the 64-bit Python floats the function returns;
# FloatType would narrow them to 32-bit.
sparse_to_dense_udf = udf(sparse_to_dense, ArrayType(DoubleType()))

# Apply the UDF to the normalised vectors and add the result as a new column 'tfidf_dense'
df_words_dense = df_words_norm.withColumn('tfidf_dense', sparse_to_dense_udf(df_words_norm['normFeatures']))
df_words_dense.show()

                                                                                

+--------------------+-----------+---------------+--------------------+---------------+--------------------+
|            sentence| word_array|    rawFeatures|            features|   normFeatures|         tfidf_dense|
+--------------------+-----------+---------------+--------------------+---------------+--------------------+
|This is the first...|     [This]| (16,[3],[1.0])|(16,[3],[1.163150...| (16,[3],[1.0])|[0.0, 0.0, 0.0, 1...|
|This is the first...|       [is]| (16,[9],[1.0])|(16,[9],[1.673976...| (16,[9],[1.0])|[0.0, 0.0, 0.0, 0...|
|This is the first...|      [the]| (16,[1],[1.0])|(16,[1],[1.386294...| (16,[1],[1.0])|[0.0, 1.0, 0.0, 0...|
|This is the first...|    [first]| (16,[3],[1.0])|(16,[3],[1.163150...| (16,[3],[1.0])|[0.0, 0.0, 0.0, 1...|
|This is the first...|[sentence.]| (16,[8],[1.0])|(16,[8],[1.163150...| (16,[8],[1.0])|[0.0, 0.0, 0.0, 0...|
|Here is the secon...|     [Here]|(16,[12],[1.0])|(16,[12],[2.07944...|(16,[12],[1.0])|[0.0, 0.0, 0.0, 0...|
|Here is the secon.

In [11]:
# Strip the intermediate TF and IDF vector columns from df_words.
vector_columns = ("rawFeatures", "features")
df_words = df_words.drop(*vector_columns)
df_words.show()

+--------------------+-----------+
|            sentence| word_array|
+--------------------+-----------+
|This is the first...|     [This]|
|This is the first...|       [is]|
|This is the first...|      [the]|
|This is the first...|    [first]|
|This is the first...|[sentence.]|
|Here is the secon...|     [Here]|
|Here is the secon...|       [is]|
|Here is the secon...|      [the]|
|Here is the secon...|   [second]|
|Here is the secon...|[sentence.]|
|And here's a thir...|      [And]|
|And here's a thir...|   [here's]|
|And here's a thir...|        [a]|
|And here's a thir...|    [third]|
|And here's a thir...|[sentence.]|
+--------------------+-----------+



In [12]:
# Sample data: three rows, each pairing a constant-valued 4-element array
# with an all-ones 4-element array and a label of 1.0.
data = [([value] * 4, [1.0] * 4, 1.0) for value in (1.0, 2.0, 3.0)]

# Schema: two nullable arrays of doubles and a nullable double label.
schema = (
    StructType()
    .add("features", ArrayType(DoubleType()), True)
    .add("features2", ArrayType(DoubleType()), True)
    .add("label", DoubleType(), True)
)

# Build and display the DataFrame (this rebinds the name `df` used earlier for the sentences).
df = spark.createDataFrame(data, schema=schema)
df.show()

+--------------------+--------------------+-----+
|            features|           features2|label|
+--------------------+--------------------+-----+
|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|
|[2.0, 2.0, 2.0, 2.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|
|[3.0, 3.0, 3.0, 3.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|
+--------------------+--------------------+-----+



In [13]:
# Element-wise product of the two arrays. Despite its name, the "dot_product" column
# holds the per-index terms, not their sum.
elementwise_product = F.expr("transform(features, (x, i) -> x * features2[i])")
df_dot_product = df.withColumn("dot_product", elementwise_product)
df_dot_product.show()

+--------------------+--------------------+-----+--------------------+
|            features|           features2|label|         dot_product|
+--------------------+--------------------+-----+--------------------+
|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[1.0, 1.0, 1.0, 1.0]|
|[2.0, 2.0, 2.0, 2.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[2.0, 2.0, 2.0, 2.0]|
|[3.0, 3.0, 3.0, 3.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[3.0, 3.0, 3.0, 3.0]|
+--------------------+--------------------+-----+--------------------+



In [14]:
# Fold the per-index products into one double per row (starting from 0D)
# and store the total in a new column.
product_total = F.expr('aggregate(dot_product, 0D, (acc, x) -> acc + x)')
df_dot_product = df_dot_product.withColumn("dot_product_sum", product_total)
df_dot_product.show()

+--------------------+--------------------+-----+--------------------+---------------+
|            features|           features2|label|         dot_product|dot_product_sum|
+--------------------+--------------------+-----+--------------------+---------------+
|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[1.0, 1.0, 1.0, 1.0]|            4.0|
|[2.0, 2.0, 2.0, 2.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[2.0, 2.0, 2.0, 2.0]|            8.0|
|[3.0, 3.0, 3.0, 3.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[3.0, 3.0, 3.0, 3.0]|           12.0|
+--------------------+--------------------+-----+--------------------+---------------+



In [15]:
# Square every element of each array; the squares feed the magnitude computation.
df_dot_product = (
    df_dot_product
    .withColumn("mag_list", F.expr("transform(features, x -> x * x)"))
    .withColumn("mag_list2", F.expr("transform(features2, x -> x * x)"))
)
df_dot_product.show()

+--------------------+--------------------+-----+--------------------+---------------+--------------------+--------------------+
|            features|           features2|label|         dot_product|dot_product_sum|            mag_list|           mag_list2|
+--------------------+--------------------+-----+--------------------+---------------+--------------------+--------------------+
|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[1.0, 1.0, 1.0, 1.0]|            4.0|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|
|[2.0, 2.0, 2.0, 2.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[2.0, 2.0, 2.0, 2.0]|            8.0|[4.0, 4.0, 4.0, 4.0]|[1.0, 1.0, 1.0, 1.0]|
|[3.0, 3.0, 3.0, 3.0]|[1.0, 1.0, 1.0, 1.0]|  1.0|[3.0, 3.0, 3.0, 3.0]|           12.0|[9.0, 9.0, 9.0, 9.0]|[1.0, 1.0, 1.0, 1.0]|
+--------------------+--------------------+-----+--------------------+---------------+--------------------+--------------------+



In [16]:
# Drop the label column (the original cell repeated this statement; once is enough).
df_dot_product = df_dot_product.drop('label')
# Magnitude of each vector: square root of the sum of its squared elements.
df_dot_product = df_dot_product.withColumn("mag_list_sum",
                    F.sqrt(F.expr('aggregate(mag_list, 0D, (acc, x) -> acc + x)')))
df_dot_product = df_dot_product.withColumn("mag_list_sum2",
                    F.sqrt(F.expr('aggregate(mag_list2, 0D, (acc, x) -> acc + x)')))
df_dot_product.show()

+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+------------+-------------+
|            features|           features2|         dot_product|dot_product_sum|            mag_list|           mag_list2|mag_list_sum|mag_list_sum2|
+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+------------+-------------+
|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|            4.0|[1.0, 1.0, 1.0, 1.0]|[1.0, 1.0, 1.0, 1.0]|         2.0|          2.0|
|[2.0, 2.0, 2.0, 2.0]|[1.0, 1.0, 1.0, 1.0]|[2.0, 2.0, 2.0, 2.0]|            8.0|[4.0, 4.0, 4.0, 4.0]|[1.0, 1.0, 1.0, 1.0]|         4.0|          2.0|
|[3.0, 3.0, 3.0, 3.0]|[1.0, 1.0, 1.0, 1.0]|[3.0, 3.0, 3.0, 3.0]|           12.0|[9.0, 9.0, 9.0, 9.0]|[1.0, 1.0, 1.0, 1.0]|         6.0|          2.0|
+--------------------+--------------------+--------------------+---------------+--------------------

In [17]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the Kaggle input directory.
input_paths = (
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session