In [1]:
import os
import sys
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql import SparkSession, Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, Word2Vec, StopWordsRemover
from pyspark.ml.clustering import KMeans

In [2]:
# Directory holding the pre-extracted, shuffled chapter text files.
# Kept with a trailing slash because code downstream may concatenate it with a file name.
txtpath = 'extractandshuffle/'

# Only regular files are kept (a sub-directory would break reading later), and
# the listing is sorted because os.listdir returns entries in arbitrary order,
# which would make any downstream sampling irreproducible.
filelist = sorted(f for f in os.listdir(txtpath)
                  if os.path.isfile(os.path.join(txtpath, f)))
shufflefilepathlist = [os.path.join(txtpath, f) for f in filelist]

len(filelist)  # number of input documents found

In [3]:
class KSearch:
    """Cluster-based "more like this" recommender for a directory of text files.

    The workflow driven by ``search`` is:

    1. start (or reuse) a Spark session,
    2. load every file in ``filelist`` (read from ``txtpath``) into a DataFrame
       and draw two random samples from it: a document database (``df``) and a
       small pool of simulated user searches (``searchDF``),
    3. fit a tokenizer -> stop-word remover -> Word2Vec -> KMeans pipeline on
       the document database,
    4. label every database document with its KMeans cluster,
    5. pick one search document, predict its cluster and print the database
       documents that share that cluster.

    Parameters
    ----------
    filepathlist : list of str
        Full paths of the input files. Stored as ``self.name`` (original
        attribute name, kept for compatibility) and ``self.filepathlist``.
    txtpath : str
        Directory containing the files named in ``filelist``.
    filelist : list of str
        File names, relative to ``txtpath``, to load.
    k : int, optional
        Number of KMeans clusters (default 1500).
    max_iter : int, optional
        Maximum number of KMeans iterations (default 5).
    db_fraction : float, optional
        Fraction of files sampled into the document database (default 0.1).
    search_fraction : float, optional
        Fraction of files sampled into the search pool (default 0.001).
    seed : int or None, optional
        Seed for the two samples. ``None`` (the default) draws a different
        sample on every run.
    """

    def __init__(self, filepathlist, txtpath, filelist, k=1500, max_iter=5,
                 db_fraction=0.1, search_fraction=0.001, seed=None):
        self.name = filepathlist  # original attribute name, kept for compatibility
        self.filepathlist = filepathlist
        self.txtpath = txtpath
        self.filelist = filelist
        self.saveModelName = "KmeansModelPipeline.model"
        self.k = k
        self.max_iter = max_iter
        self.db_fraction = db_fraction
        self.search_fraction = search_fraction
        self.seed = seed

    def startSpark(self):
        """Start (or reuse) a Spark session and define the schema of the file DataFrame."""
        # Make Spark's Python processes use the same interpreter as this kernel.
        os.environ['PYSPARK_PYTHON'] = sys.executable
        os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
        # getOrCreate reuses a running session instead of failing because a
        # SparkContext already exists (e.g. a second KSearch in the same kernel).
        self.spark = SparkSession.builder.appName("k").getOrCreate()
        self.sc = self.spark.sparkContext
        self.sc.setLogLevel("FATAL")
        self.schema = StructType([
            StructField("filename", StringType(), True),
            StructField("filepath", StringType(), True),
            StructField("contents", StringType(), True)])

    def pullfile(self, file):
        """Return the full text of ``file``, decoded as UTF-8."""
        with open(file, encoding='utf-8') as f:
            return f.read()

    def make_spark_df(self):
        """Read every file in ``self.filelist`` into a Spark DataFrame.

        Returns a DataFrame with string columns ``filename``, ``filepath`` and
        ``contents`` (``self.schema``). ``startSpark`` must have been called.
        """
        records = []
        for file in self.filelist:
            filepath = os.path.join(self.txtpath, file)
            # Plain tuples in schema order: matching against the schema is
            # positional and does not depend on how Row orders keyword fields.
            records.append((file, filepath, self.pullfile(filepath)))
        return self.spark.createDataFrame(records, self.schema)

    def createFrames(self):
        """Sample the document database (``self.df``) and the search pool (``self.searchDF``)."""
        # Read the files once and sample both frames from the same DataFrame
        # (previously every file was read from disk twice).
        all_docs = self.make_spark_df()
        self.df = all_docs.sample(fraction=self.db_fraction, seed=self.seed)
        # Use a different seed for the search pool: with an identical seed the
        # smaller sample could simply be a subset of the document database.
        search_seed = None if self.seed is None else self.seed + 1
        self.searchDF = all_docs.sample(fraction=self.search_fraction, seed=search_seed)

    def fit_pipeline(self):
        """Build the text-clustering pipeline and fit it on the document database.

        Stages: RegexTokenizer (tokens of at least 4 characters) ->
        StopWordsRemover -> Word2Vec (``features``) -> KMeans (``prediction``).
        """
        self.tokenizer = RegexTokenizer(inputCol="contents", outputCol="tokens", minTokenLength=4)
        self.stop = StopWordsRemover(inputCol="tokens", outputCol="filteredtokens")
        self.w2v = Word2Vec(inputCol="filteredtokens", outputCol="features")
        self.kmeans = KMeans(maxIter=self.max_iter, featuresCol='features',
                             predictionCol='prediction').setK(self.k)
        self.stages = [self.tokenizer, self.stop, self.w2v, self.kmeans]
        self.content_processing_pipe = Pipeline(stages=self.stages).fit(self.df)

    def transform_ddb(self):
        """Replace ``self.df`` with its cluster labels (columns filename, filepath, prediction).

        Only these three columns are kept to save memory. Document text is not
        retained; it can be re-read from ``filepath`` if needed.
        """
        self.df = (self.content_processing_pipe.transform(self.df)
                   .select('filename', 'filepath', 'prediction'))

    def writeModel(self, overwrite=True):
        """Save the fitted pipeline to ``self.saveModelName``.

        Best-effort: a failure is reported together with its cause instead of
        being raised, so a failed save does not abort ``search``. With
        ``overwrite=True`` (default) an existing save at that path is replaced;
        otherwise Spark's default writer behavior applies.
        """
        try:
            writer = self.content_processing_pipe.write()
            if overwrite:
                writer = writer.overwrite()
            writer.save(self.saveModelName)
        except Exception as exc:  # report the cause rather than swallowing it silently
            print("save failed: {}".format(exc))

    def make_predictions(self, search=None):
        """Predict cluster labels for ``search``, or for ``self.searchDF`` when omitted.

        ``search`` is presumably a DataFrame with the same columns as
        ``make_spark_df`` produces (at least filename, filepath, contents).
        The result (filename, filepath, prediction) is stored in
        ``self.predictions``.
        """
        source = self.searchDF if search is None else search
        self.predictions = (self.content_processing_pipe.transform(source)
                            .select('filename', 'filepath', 'prediction'))

    def isolatesearch(self):
        """Keep only the last predicted row as the user's search document.

        ``self.userSearch`` is a list holding at most one row. It is empty
        when ``self.predictions`` has no rows, e.g. an empty random sample.
        """
        self.userSearch = self.predictions.tail(1)  # batch searches not implemented

    def search_results(self):
        """Return the database documents in the same cluster as the search document.

        The search document itself is excluded so it is never recommended back
        to the user. Prints the cluster label.

        Raises
        ------
        ValueError
            If no search document has been selected (``userSearch`` is empty).
        """
        if not self.userSearch:
            raise ValueError("no search document selected; run make_predictions and isolatesearch first")
        searched = self.userSearch[0]
        cluster_label = searched['prediction']
        print(cluster_label)
        # Compare with the label's own type (it was previously cast to str).
        return self.df.filter((self.df['prediction'] == cluster_label)
                              & (self.df['filename'] != searched['filename']))

    def reccomend(self):
        """Print the searched document's name followed by the recommended file names.

        If no search document is available, prints a notice and returns.
        """
        if not self.userSearch:
            print("No search document available; nothing to recommend.")
            return
        results = self.search_results().collect()
        print("If you liked: " + self.userSearch[0][0])  # filename of the searched document
        print("Then you may like:")
        for result in results:
            print(result[0])  # filename of each cluster member

    def rt(self):
        """Print the current wall-clock time (used as a simple timing marker)."""
        now = datetime.now()
        print("time = ", now)

    def search(self):
        """Run the full workflow: fit the model, then simulate one user search.

        Starts Spark, samples the frames, fits and applies the pipeline, saves
        the model, and recommends documents for one document from the search
        pool.
        """
        print("Starting Spark")
        self.rt()
        self.startSpark()
        print("creating spark DFs")
        self.createFrames()
        print("Building model")
        self.fit_pipeline()
        print("Creating base dataset")
        self.transform_ddb()
        print("saving model")
        self.writeModel()
        print("searching for similar documents")
        self.make_predictions()
        self.isolatesearch()
        self.reccomend()
        self.rt()

    def newSearch(self, document=None):
        """Recommend documents for ``document``, or for a fresh random pick from the search pool.

        ``document`` is passed to ``make_predictions`` unchanged. When omitted,
        roughly 10 rows are sampled from ``self.searchDF`` and the last one is
        used. Requires ``search`` to have been run first.
        """
        print("searching for similar documents")
        if document is not None:
            sample_df = document
        else:
            count = self.searchDF.count()
            if count == 0:
                print("Search pool is empty; nothing to search.")
                return
            # Aim for ~10 rows; the fraction must stay <= 1 when sampling without replacement.
            sample_df = self.searchDF.sample(fraction=min(1.0, 10 / count))
        self.sampleDF = sample_df
        self.make_predictions(sample_df)
        self.isolatesearch()
        self.reccomend()
        self.rt()

    def load_existing_search(self):
        """Placeholder: load a previously saved KMeans pipeline to run searches. Not implemented.

        Ideas: a subclass could load the saved model and reuse the document
        database for per-user searches, accept user-provided documents, and
        use Spark streaming to distribute user searches at scale.
        """
        pass


In [4]:
s = KSearch(filepathlist=shufflefilepathlist, txtpath=txtpath, filelist=filelist)
s.search()  # end-to-end run: start Spark, fit the pipeline, then simulate one search

Starting Spark
time =  2022-03-04 13:18:10.236066


Picked up _JAVA_OPTIONS: -Xmx30G
Picked up _JAVA_OPTIONS: -Xmx30G
22/03/04 13:18:11 WARN Utils: Your hostname, matthew-MS-7A34 resolves to a loopback address: 127.0.1.1; using 192.168.0.232 instead (on interface wlp40s0)
22/03/04 13:18:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/04 13:18:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


creating spark DFs
Building model


                                                                                

Creating base dataset
saving model
save failed
searching for similar documents
931




If you liked: _book_138_chap_00935.txt
Then you may like:
_book_138_chap_00295.txt
_book_138_chap_00695.txt
_book_138_chap_00468.txt
_book_138_chap_00595.txt
_book_138_chap_01104.txt
_book_138_chap_01063.txt
_book_138_chap_01216.txt
_book_138_chap_00843.txt
_book_138_chap_00939.txt
_book_138_chap_01107.txt
_book_138_chap_00764.txt
_book_138_chap_00680.txt
_book_138_chap_00354.txt
_book_138_chap_00912.txt
_book_138_chap_00911.txt
_book_138_chap_00080.txt
_book_138_chap_01173.txt
_book_138_chap_01060.txt
_book_138_chap_01168.txt
time =  2022-03-04 13:24:34.544157


                                                                                

In [5]:
s.newSearch(document=None)  # re-sample the search pool and recommend for a new document

searching for similar documents


                                                                                

1035


[Stage 38:>                                                       (0 + 16) / 16]

If you liked: _book_54_chap_00515.txt
Then you may like:
_book_54_chap_00187.txt
_book_54_chap_00723.txt
_book_54_chap_00013.txt
_book_54_chap_00105.txt
_book_54_chap_00088.txt
_book_54_chap_00150.txt
_book_54_chap_00673.txt
_book_54_chap_00761.txt
_book_54_chap_00205.txt
_book_54_chap_00108.txt
_book_54_chap_00820.txt
_book_54_chap_00771.txt
_book_54_chap_00243.txt
_book_54_chap_00347.txt
_book_54_chap_00482.txt
_book_54_chap_00833.txt
time =  2022-03-04 13:24:43.428874


                                                                                