In [1]:
import findspark
findspark.init()

import pyspark
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
from itertools import combinations 

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) the Spark session named "jumble" used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("jumble")
    .getOrCreate()
)

In [4]:
def add_row_index(df):
    """Return ``df`` with an extra ``row_idx`` column holding a row number.

    A monotonically increasing id is written to ``row_idx`` first; the column
    is then overwritten with ``row_number()`` computed over a window ordered
    by that id.
    """
    with_ids = df.withColumn("row_idx", monotonically_increasing_id())
    ordered_by_id = W.orderBy("row_idx")
    return with_ids.withColumn("row_idx", row_number().over(ordered_by_id))

In [5]:
def sortWord(word):
    """Return the items of ``word`` (a string or a list of characters)
    joined into one string in ascending order."""
    ordered = list(word)
    ordered.sort()
    return ''.join(ordered)

In [6]:
# This function processes the filtered input dataframe and separates the inputs into two categories.
# The first category: the input word shares its sorted word with other rows whose original word is different.
# The second category: no other row shares the input word's sorted word.
# The logic: group the dataframe by sorted word; if the row count of a group is larger than 1, the word is category 1;
# if the row count of a group is equal to 1, the word is category 2.

def findDuplicateInputs(df):
    """Split the joined input/dictionary rows into two categories.

    Rows are grouped by ``sorted_word``:

    * category 1 - groups with more than one row (several dictionary words
      share the same sorted letters);
    * category 2 - groups with exactly one row.

    For category 1 the keep-rule is applied per group: a group whose largest
    ``freq`` is non-zero keeps only the rows carrying that largest ``freq``,
    while a group whose largest ``freq`` is 0 keeps all of its rows.

    Args:
        df: DataFrame with the columns ``word``, ``freq``, ``sorted_word``,
            ``input_word`` and ``filter_index``.

    Returns:
        Tuple ``(category_1_df, category_2_df)``.  Both carry the columns of
        ``df`` except ``input_word``.  Either DataFrame may be empty.
    """
    group_stats = df.groupBy("sorted_word").agg(
        F.max("freq").alias("max_freq"),
        F.count("word").alias("count_number"),
    )

    # Category 1.  The keep-rule lives in the join condition, so every group is
    # judged on its own max_freq (no driver-side loop), and an empty category
    # still produces an (empty) DataFrame instead of an unassigned variable.
    shared = (
        group_stats.where(F.col("count_number") > 1)
        .select("sorted_word", "max_freq")
        .alias("grp")
    )
    same_group = F.col("grp.sorted_word") == F.col("df.sorted_word")
    keep_row = (F.col("grp.max_freq") == F.col("df.freq")) | (F.col("grp.max_freq") == 0)
    duplicates_df = (
        shared.join(df.alias("df"), same_group & keep_row)
        .drop(F.col("grp.sorted_word"))
        .drop(F.col("input_word"))
        .drop(F.col("grp.max_freq"))
    )

    # Category 2: groups made of a single row.
    single = (
        group_stats.where(F.col("count_number") == 1)
        .select("sorted_word")
        .alias("grp")
    )
    only_one_df = (
        single.join(df.alias("df"), F.col("grp.sorted_word") == F.col("df.sorted_word"))
        .drop(F.col("grp.sorted_word"))
        .drop(F.col("input_word"))
    )

    return duplicates_df, only_one_df

In [7]:
# These functions apply the filter index to a word. Ex: "GLAND" has the filter index [1,3,4]; getCharList
# picks the letters at those positions ('lnd') and returns the picked letters of all words as one sorted string.

# getDuplicateChar applies the filter index to each category 1 word and returns a list of sorted strings, one per word.

def getCharList(lis):
    """Collect the indexed letters of every entry into one sorted string.

    Each entry of ``lis`` is indexed as ``entry[0]`` (the word) and
    ``entry[1]`` (the positions to pick from that word).  The picked letters
    of all entries are pooled and passed through ``sortWord``.
    """
    picked = [entry[0][pos] for entry in lis for pos in entry[1]]
    return sortWord(picked)

def getDuplicateChar(lis):
    """Return one sorted string of picked letters per entry of ``lis``.

    Each entry is indexed as ``entry[0]`` (the word) and ``entry[1]`` (the
    positions to pick); further elements of an entry are ignored.
    """
    return [sortWord([entry[0][pos] for pos in entry[1]]) for entry in lis]

In [8]:
def getCharList2(lis):
    """Return one sorted string per entry, built from two words at once.

    Every entry holds two (word, positions) pairs: the first at indices 0 and
    1, the second at indices 3 and 4.  The letters picked from both words are
    merged and passed through ``sortWord``.
    """
    merged = []
    for pair in lis:
        first_word, first_positions = pair[0], pair[1]
        second_word, second_positions = pair[3], pair[4]

        letters = [first_word[pos] for pos in first_positions]
        letters = letters + [second_word[pos] for pos in second_positions]
        merged.append(sortWord(letters))

    return merged

In [9]:
# This is the situation where multiple words share the same sorted word, but there is only one such group.
# For example: A1, A2, A3 have the same sorted word A; in input_duplicates_df the distinct count of sorted words is 1.

def getFinalInputChar1(lis, only_one_list=None):
    """Build the candidate letter strings when there is one duplicate group.

    Args:
        lis: category 1 entries, each indexed as ``[word, filter_index, ...]``.
        only_one_list: category 2 entries, each ``[word, filter_index]``.
            When omitted, the notebook-level ``input_only_one_list`` is used,
            which keeps existing one-argument calls working.

    Returns:
        A list of sorted letter strings, one per distinct letter pick found in
        ``lis``, each combined with the letters picked from ``only_one_list``.
        The list is in ascending order so the result is reproducible.
    """
    if only_one_list is None:
        # Backward-compatible fallback to the notebook-level variable.
        only_one_list = input_only_one_list

    fixed_chars = getCharList(only_one_list)

    # sorted() pins the order; iterating a set of strings directly can differ
    # from one interpreter run to the next.
    distinct_picks = sorted(set(getDuplicateChar(lis)))
    return [sortWord(pick + fixed_chars) for pick in distinct_picks]

In [10]:
# Ex: In input_duplicates_df, A1, A2, A3 share the sorted word A and B1, B2 share the sorted word B, so the distinct number is 2.
# The possibilities for the inputs are then: A1B1, A1B2, A2B1, A2B2, A3B1, A3B2. Apply the filter index, get rid of duplicates, and then
# add the inputs in input_only_one_list to get the final input string.

def getFinalInputChar2(lis, only_one_list=None):
    """Build the candidate letter strings when several duplicate groups exist.

    Entries of ``lis`` are grouped by their sorted word (``entry[2]``).  One
    letter pick is taken from every group, in every possible combination, and
    combined with the letters picked from ``only_one_list``.  This works for
    any number of groups: with no group at all the single candidate is the
    ``only_one_list`` letters on their own.

    Args:
        lis: category 1 entries, each ``[word, filter_index, sorted_word]``.
        only_one_list: category 2 entries, each ``[word, filter_index]``.
            When omitted, the notebook-level ``input_only_one_list`` is used,
            which keeps existing one-argument calls working.

    Returns:
        The distinct candidate strings (each sorted), in ascending order.
    """
    # Function-scope import: the notebook's import cell only brings in
    # ``combinations`` from itertools.
    from itertools import product

    if only_one_list is None:
        # Backward-compatible fallback to the notebook-level variable.
        only_one_list = input_only_one_list

    # sorted word -> distinct letter picks available for that group
    picks_by_group = {}
    for entry in lis:
        word, filter_index, group_key = entry[0], entry[1], entry[2]
        pick = sortWord([word[pos] for pos in filter_index])
        picks_by_group.setdefault(group_key, set()).add(pick)

    fixed_chars = getCharList(only_one_list)

    # product() of zero groups yields one empty choice, so the no-duplicate
    # case falls out naturally.
    candidates = {
        sortWord(''.join(choice) + fixed_chars)
        for choice in product(*picks_by_group.values())
    }
    return sorted(candidates)

In [11]:
def getOutputWordPara(input_df):
    """Read the answer layout from the ``RESULT_LENGTH`` row of ``input_df``.

    Returns:
        ``(filter_index, count)`` where ``filter_index`` is that row's
        ``filter_index`` value and ``count`` is its length.
    """
    result_rows = input_df.where(col("input_word") == "RESULT_LENGTH").collect()
    lengths = result_rows[0].filter_index
    return lengths, len(lengths)

In [12]:
# Filter the freq_dict list by the output word lengths. For ex: if the output index is [3,4,5], keep the freq_dict entries
# with length = 3, length = 4 or length = 5. After filtering, the result only has the entries whose word length is one the output needs.

def getFilteredList(freq_dict, output_word_index, output_word_number):
    """Keep the dictionary entries whose word length is one of the requested lengths.

    Args:
        freq_dict: object whose ``collect()`` returns rows exposing ``word``
            and ``freq`` attributes (the dictionary DataFrame).
        output_word_index: sequence of requested word lengths.
        output_word_number: how many leading items of ``output_word_index``
            to use.

    Returns:
        A list of ``[word, freq]`` entries without repeats, ordered by the
        position of their length in ``output_word_index`` and, within one
        length, by their order in ``freq_dict``.
    """
    entries = [[row.word, row.freq] for row in freq_dict.collect()]

    # Bucket once by length so the dictionary is scanned a single time instead
    # of once per requested length.
    by_length = {}
    for entry in entries:
        by_length.setdefault(len(entry[0]), []).append(entry)

    # A set of (word, freq) keys replaces the quadratic `x not in list` check.
    seen = set()
    res_list = []
    for i in range(0, output_word_number):
        for entry in by_length.get(output_word_index[i], ()):
            key = (entry[0], entry[1])
            if key not in seen:
                seen.add(key)
                res_list.append(entry)

    return res_list

In [32]:
# for output word number = 1
def getFinalResult1(final_inut_char_list, candidates=None, max_freq=1200):
    """Find and print the one-word answers for the candidate letter strings.

    An entry ``[word, freq]`` matches when the sorted letters of ``word``
    equal one of the candidate strings, ``freq`` is not 0 and ``freq`` is at
    most ``max_freq``.

    Args:
        final_inut_char_list: candidate sorted-letter strings.
        candidates: iterable of ``[word, freq]`` entries.  When omitted, the
            notebook-level ``res_list`` is used, which keeps existing
            one-argument calls working.
        max_freq: upper bound accepted for ``freq`` (default 1200).

    Returns:
        The last matching entry that was printed, or ``None`` when nothing
        matched (including when either input is empty).
    """
    if candidates is None:
        # Backward-compatible fallback to the notebook-level variable.
        candidates = res_list

    wanted = set(final_inut_char_list)
    last_match = None
    for entry in candidates:
        # Cheap frequency checks first; the word is only sorted when they pass.
        if entry[1] != 0 and entry[1] <= max_freq and sortWord(entry[0]) in wanted:
            print(entry)
            last_match = entry

    return last_match

In [14]:
#for output word number = 2
def getFinalResult2(final_inut_char_list, combos=None, word_lengths=None, max_freq=1200):
    """Find and print the two-word answers for the candidate letter strings.

    A combination ``(first, second)`` of ``[word, freq]`` entries matches when
    the word lengths equal ``word_lengths[0]`` and ``word_lengths[1]``, the
    two frequencies do not sum to 0, the second frequency is neither 0 nor
    above ``max_freq``, and the sorted letters of both words together equal
    one of the candidate strings.

    Args:
        final_inut_char_list: candidate sorted-letter strings.
        combos: iterable of 2-tuples of ``[word, freq]`` entries.  When
            omitted, the notebook-level ``comb`` is used.
        word_lengths: required length of each answer word.  When omitted, the
            notebook-level ``output_word_index`` is used.
        max_freq: upper bound accepted for the second word's frequency.

    Returns:
        The last matching combination that was printed, or ``None`` when
        nothing matched.
    """
    if combos is None:
        combos = comb
    if word_lengths is None:
        word_lengths = output_word_index

    # `combos` may be a one-shot iterator (itertools.combinations), so it is
    # walked exactly once and every candidate string is tested via a set.
    wanted = set(final_inut_char_list)
    last_match = None
    for pair in combos:
        first, second = pair[0], pair[1]
        if len(first[0]) != word_lengths[0] or len(second[0]) != word_lengths[1]:
            continue
        # NOTE(review): only the second word's frequency is range-checked, as
        # in the original condition - confirm this asymmetry is intended.
        if first[1] + second[1] == 0 or second[1] == 0 or second[1] > max_freq:
            continue
        if sortWord(first[0] + second[0]) in wanted:
            print(pair)
            last_match = pair

    return last_match

In [15]:
#for output word number =3
def getFinalResult3(final_inut_char_list, combos=None, word_lengths=None, max_freq=1200):
    """Find and print the three-word answers for the candidate letter strings.

    A combination ``(first, second, third)`` of ``[word, freq]`` entries
    matches when the word lengths equal ``word_lengths[0..2]``, the three
    frequencies do not sum to 0, the third frequency is neither 0 nor above
    ``max_freq``, and the sorted letters of the three words together equal
    one of the candidate strings.

    Args:
        final_inut_char_list: candidate sorted-letter strings.
        combos: iterable of 3-tuples of ``[word, freq]`` entries.  When
            omitted, the notebook-level ``comb`` is used.
        word_lengths: required length of each answer word.  When omitted, the
            notebook-level ``output_word_index`` is used.
        max_freq: upper bound accepted for the third word's frequency.

    Returns:
        The last matching combination that was printed, or ``None`` when
        nothing matched.
    """
    if combos is None:
        combos = comb
    if word_lengths is None:
        word_lengths = output_word_index

    # `combos` may be a one-shot iterator (itertools.combinations), so it is
    # walked exactly once and every candidate string is tested via a set.
    wanted = set(final_inut_char_list)
    last_match = None
    for triple in combos:
        first, second, third = triple[0], triple[1], triple[2]
        if (len(first[0]) != word_lengths[0]
                or len(second[0]) != word_lengths[1]
                or len(third[0]) != word_lengths[2]):
            continue
        # NOTE(review): only the third word's frequency is range-checked, as
        # in the original condition - confirm this asymmetry is intended.
        if first[1] + second[1] + third[1] == 0 or third[1] == 0 or third[1] > max_freq:
            continue
        if sortWord(first[0] + second[0] + third[0]) in wanted:
            print(triple)
            last_match = triple

    return last_match

In [16]:
# Load the freq_dict file and create a dataframe.
# The file is read inside a `with` block so the handle is closed right away.
with open("freq_dict.json") as freq_file:
    input_file = freq_file.read()
input_data = json.loads(input_file)

schema = StructType([StructField('word', StringType(), False),
                     StructField('freq', IntegerType(), False)])

freq_dict_df = spark.createDataFrame(input_data.items(),schema)

# UDF that lower-cases a word and sorts its letters
sort_word = udf(lambda x:sortWord(x.lower()))


# add an index column and the sorted-letters column
freq_dict = add_row_index(freq_dict_df)
freq_dict = freq_dict.withColumn("sorted_word", sort_word(freq_dict.word))

# create a view
freq_dict.createOrReplaceTempView("freq_dict")

#load the json inputs

In [17]:
# Load the user's puzzle input.
# The file is read inside a `with` block so the handle is closed right away.
with open("input4.json") as user_file:
    user_input = user_file.read()
user_data = json.loads(user_input)

input_shcema = StructType([StructField('input_word', StringType(), False),
                           StructField('filter_index', ArrayType(IntegerType()), False)])

input_df = spark.createDataFrame(user_data.items(),input_shcema)

# add an index column and a sorted word column
input_df = add_row_index(input_df)
input_df = input_df.withColumn("sorted_word", sort_word(input_df.input_word))

In [18]:
# Every input row except the RESULT_LENGTH one is a scrambled word.
input_word_df = input_df.where(col("input_word") != "RESULT_LENGTH")

# Join input_word_df with freq_dict on the sorted letters, which filters
# freq_dict down to the words matching an input; keep only the needed columns.
join_df = (
    freq_dict.alias("a")
    .join(input_word_df.alias("b"), col("a.sorted_word") == col("b.sorted_word"))
    .select("word", "freq", "a.sorted_word", "input_word", "filter_index")
)

In [19]:
# Separate the inputs into category 1 and category 2.
# input_duplicates_df is category 1 (multiple rows share a sorted word).
# input_only_one_df is category 2 (a sorted word with a single row).

input_duplicates_df, input_only_one_df = findDuplicateInputs(join_df)

In [20]:
# sorted_number is the count of distinct sorted words in category 1; it decides
# how category 1 and category 2 inputs are combined into the final input string.

sorted_number_df = input_duplicates_df.agg(
    F.countDistinct(F.col("sorted_word")).alias("sorted_number")
)
sorted_number = sorted_number_df.collect()[0]["sorted_number"]

In [21]:
# Bring both categories to the driver as plain Python lists.
input_only_one_list = [
    [row.word, row.filter_index]
    for row in input_only_one_df.collect()
]
input_duplicate_list = [
    [row.word, row.filter_index, row.sorted_word]
    for row in input_duplicates_df.collect()
]

In [22]:
input_only_one_list


[['dinky', [0, 1]], ['encore', [1, 3]], ['devout', [0, 5]]]

In [23]:
# Build the candidate letter strings for the final answer.
if sorted_number == 0:

    # No category 1 input: the letters picked from the category 2 words are
    # the only candidate (the multi-group helper would yield nothing here).
    final_inut_char_list = [getCharList(input_only_one_list)]

elif sorted_number == 1:

    final_inut_char_list = getFinalInputChar1(input_duplicate_list)

else:

    final_inut_char_list = getFinalInputChar2(input_duplicate_list)
    

In [24]:
final_inut_char_list

['ddgilnot', 'addiinot']

#Filter the freq_dict_list based on user inputs

In [25]:
# output_word_index holds the filter_index of the RESULT_LENGTH row (the length of each output word)
# output_word_number is how many words the output has

output_word_index, output_word_number = getOutputWordPara(input_df)

In [26]:
output_word_number

1

In [27]:
# Dictionary entries ([word, freq]) whose word length is one of the requested output lengths.
res_list = getFilteredList(freq_dict,output_word_index, output_word_number)

#Result

In [29]:
# Get the combinations of output_word_number entries from the filtered list.
# NOTE: combinations() returns a one-shot iterator; re-run this cell before
# re-running any cell that consumes `comb`.

comb = combinations(res_list, output_word_number) 

In [33]:
# Run the search that matches the number of words in the answer.
if output_word_number == 1:
    result_1 = getFinalResult1(final_inut_char_list)
elif output_word_number == 2:
    result_2 = getFinalResult2(final_inut_char_list)
elif output_word_number == 3:
    result_3 = getFinalResult3(final_inut_char_list)

['addition', 1092]
