In [1]:
## TF-IDF with PySpark

In [3]:
### Feature Transformations

In [4]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

# Read the CSV with a header row and let Spark infer the column types.
data = spark.read.csv(
    "Data/processed_TestData2500_df.csv",
    sep=',',
    header=True,
    inferSchema=True,
)

In [5]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

# Stage 1: split the text column into a list of tokens.
tokenizer = Tokenizer(
    inputCol="short_desc_processed",
    outputCol="token_short_desc_processed",
)

# Stage 2: turn each token list into a sparse term-count vector.
count_vec = CountVectorizer(
    inputCol='token_short_desc_processed',
    outputCol='c_vec',
)

# Stage 3: rescale the term counts by inverse document frequency.
idf = IDF(
    inputCol="c_vec",
    outputCol="tf_idf",
)

In [6]:
### Pipeline

In [7]:
from pyspark.ml import Pipeline

In [8]:
# Chain the three feature stages, fit them on the loaded data, and keep only
# the columns used further down.
data_prep_pipe = Pipeline(stages=[tokenizer, count_vec, idf])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data).select('id', 'product', 'tf_idf')

# Single-column view holding just the TF-IDF vectors.
tfidf = clean_data.select('tf_idf')

In [9]:
# Preview the first 20 rows; long cell values are truncated for display.
clean_data.show(n=20, truncate=True)

+---+--------+--------------------+
| id| product|              tf_idf|
+---+--------+--------------------+
|  2| Log4j -|(1590,[49,404,572...|
|  3| Log4j -|(1590,[4,738],[2....|
| 22|Apache h|(1590,[3,39,62,27...|
| 27| Log4j -|(1590,[3,190,199]...|
| 29| Log4j -|(1590,[3,59,92,11...|
| 31| Log4j -|(1590,[44,49,123,...|
| 32| Log4j -|(1590,[424],[5.76...|
| 34| Log4j -|(1590,[8,95,112,6...|
| 35| Log4j -|(1590,[121,127,47...|
| 43| Log4j -|(1590,[0,193,395,...|
| 44| Log4j -|(1590,[3,156],[2....|
| 45| Batik -|(1590,[30,34,45,2...|
| 46| Batik -|(1590,[30,34,618]...|
| 47| Batik -|(1590,[292,618,88...|
| 48| Batik -|(1590,[0,452,839,...|
| 49| Batik -|(1590,[619,1124,1...|
| 50| Batik -|(1590,[74,190,195...|
| 51| Batik -|(1590,[35,212,541...|
| 52| Batik -|(1590,[93,225,312...|
| 53| Batik -|(1590,[116,219,61...|
+---+--------+--------------------+
only showing top 20 rows



In [10]:
## TF-IDF similarity UDF

In [11]:
# Model-2: Similarity Score - TF-idf ----------------------------------------------------------------------------
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

# Module-level placeholders: an empty list and a zeroed counter.
tfidf_cosine_similarities, execution_count_tfidf = [], 0

def tfidf_preprocess_pyspark(id,df):
    """Score one row's text against every row of the same product with TF-IDF.

    Parameters
    ----------
    id
        Value to look up in ``df['id']``. If several rows carry this id, the
        first one is used. (The name shadows the ``id`` builtin; it is kept
        so existing callers keep working.)
    df : pandas.DataFrame
        Must provide the columns ``'id'``, ``'product'`` and
        ``'short_desc_processed'``.

    Returns
    -------
    pandas.DataFrame
        Columns ``'id'`` and ``'tfidf_score'``, one row for every row of
        ``df`` whose ``'product'`` equals the product of the looked-up row
        (the looked-up row itself included), in the original row order.

    Raises
    ------
    IndexError
        If no row of ``df`` has the requested id.
    """
    # Locate the reference row by *position*: the similarity matrix is indexed
    # by row position, which only coincides with the index label for a default
    # RangeIndex. Doing this first also fails fast, before the expensive fit.
    matches = (df['id'] == id).to_numpy().nonzero()[0]
    if matches.size == 0:
        raise IndexError(f"id {id!r} not found in df['id']")
    index_main = int(matches[0])
    product_main = df['product'].iloc[index_main]

    X_train = df['short_desc_processed']
    print('TF-idf Vectorization and similarity score computation')
    # Vectorization
    vectorizer = TfidfVectorizer()
    tfidf = vectorizer.fit_transform(X_train)
    # Only the reference row's similarities are needed, so compute that single
    # row rather than the full n x n matrix. TfidfVectorizer L2-normalises its
    # rows by default, so the linear kernel is the cosine similarity.
    scores = linear_kernel(tfidf[index_main], tfidf).ravel()
    print('TF-idf preprocess done')

    # Keep only the rows that belong to the same product as the reference row.
    same_product = (df['product'] == product_main).to_numpy()
    tfidf_cosine_similarities_score_df = pd.DataFrame({
        'id': df['id'].to_numpy()[same_product],
        'tfidf_score': scores[same_product],
    })
    return tfidf_cosine_similarities_score_df

In [12]:
from pyspark.sql.functions import udf, struct, col
from pyspark.sql.types import * 
import pyspark.sql.functions as func

In [13]:
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("tfidf_score", IntegerType(), False)
])

In [14]:
tfidf_udf = udf(lambda z: tfidf_preprocess_pyspark(id, df), schema)

In [15]:
df_new = df.select('id', tfidf_udf(2,processed_data_df))
df_new.show()

NameError: name 'df' is not defined