-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.py
139 lines (118 loc) · 4.46 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright (C) 2024 Beksultan Artykbaev - All Rights Reserved
import os
from collections import Counter
from typing import List
import spacy
from path_utils import resource_path
from download_lemmatizers import models
def filter_by_frequency(graph: List[List[str]], num: int) -> List[List[str]]:
    '''Keep only the num most frequent keywords of a graph.

    A keyword's frequency is the total number of times it appears across
    all lines. Keywords with equal frequency are ranked in the order they
    were first encountered, which is the order Counter.most_common() yields.

    Args:
        graph: Lines of keywords.
        num: How many of the most frequent keywords to keep. A value larger
            than the number of distinct keywords keeps all of them; a
            negative value keeps none.

    Returns:
        A new graph in which every keyword outside the top num has been
        removed and lines left without any keyword have been dropped.
        When num is 0 the function returns None instead of a graph; this
        existing contract is preserved so current callers keep working.
    '''
    if num == 0:
        return None

    frequency = Counter(keyword for line in graph for keyword in line)
    # max() guards against a negative num slicing from the end of the
    # ranking; a negative num has always selected nothing.
    top_ranked = frequency.most_common()[:max(num, 0)]
    # A set makes the per-keyword membership test below O(1).
    kept = {keyword for keyword, _ in top_ranked}

    new_graph = []
    for line in graph:
        keywords = [keyword for keyword in line if keyword in kept]
        if keywords:
            new_graph.append(keywords)
    return new_graph
def generate_co_occurrence_matrix(graph: List[List[str]], binary: bool = False):
    '''Generate a labelled co-occurrence matrix based on an undirected graph.

    Two keywords co-occur once for every line of graph that contains both
    of them; repeating a keyword inside one line does not add to the count.

    Args:
        graph: Lines of keywords.
        binary: When True, each cell is 1 if the two keywords ever share a
            line and 0 otherwise, instead of the number of shared lines.

    Returns:
        A square list of lists of size (n + 1) x (n + 1) for n distinct
        keywords. The first row is [None, kw_1, ..., kw_n] and every
        following row is [kw_i, count_i1, ..., count_in]. Keywords are
        ordered by descending total frequency. The matrix is symmetric and
        its diagonal is 0. An empty graph yields [[None]].
    '''
    # Imported here so the function does not rely on a module-level import.
    from itertools import combinations

    frequency = Counter(keyword for line in graph for keyword in line)
    ordered = [keyword for keyword, _ in frequency.most_common()]
    position = {keyword: index for index, keyword in enumerate(ordered)}

    size = len(ordered)
    counts = [[0] * size for _ in range(size)]
    for line in graph:
        # De-duplicate so a keyword repeated within a line is counted once
        # for that line and is never paired with itself.
        present = sorted({position[keyword] for keyword in line})
        for first, second in combinations(present, 2):
            if binary:
                counts[first][second] = counts[second][first] = 1
            else:
                # Update both halves so the matrix stays symmetric.
                counts[first][second] += 1
                counts[second][first] += 1

    # Header row, then one labelled row per keyword.
    matrix = [[None] + ordered]
    matrix.extend([keyword] + row for keyword, row in zip(ordered, counts))
    return matrix
def homogenize(graph: List[List[str]]) -> List[List[str]]:
    '''Return a new graph with every keyword converted to lower case.

    Args:
        graph: Lines of keywords. Each line is normally a list of strings.
            A line whose first element is itself a sequence of strings
            (e.g. [["A", "B"]]) is also accepted, in which case that
            nested sequence provides the keywords, as the previous
            implementation did.

    Returns:
        A list with one list of lower-cased keywords per input line. An
        empty line produces an empty list.
    '''
    homogenized_graph = []
    for line in graph:
        # The previous implementation always iterated line[0]. For a plain
        # list of strings that walked the characters of the first keyword
        # and raised IndexError on an empty line. Only descend into line[0]
        # when it is a nested container rather than a keyword.
        if line and not isinstance(line[0], str):
            keywords = line[0]
        else:
            keywords = line
        homogenized_graph.append([text.lower() for text in keywords])
    return homogenized_graph
def lemmatize(graph: List[List[str]], language: str = "english") -> List[List[str]]:
    '''Return a new graph with every string replaced by its lemmatized form.

    The lower-cased language name is looked up in models, and the
    resulting entry is joined onto resource_path("models") to form the
    path handed to spacy.load. The model is loaded on every call.

    Args:
        graph: Lines of strings to lemmatize.
        language: Key into models (case-insensitive). An unknown language
            raises KeyError from the lookup.

    Returns:
        A list with one list per input line. Each string is replaced by
        the lemmas of its tokens joined with single spaces.
    '''
    model_name = models[language.lower()]
    model_path = os.path.join(resource_path("models"), model_name)
    pipeline = spacy.load(model_path)

    def _lemmatize_text(text: str) -> str:
        # Run the pipeline on one string and stitch the token lemmas together.
        return " ".join(token.lemma_ for token in pipeline(text))

    return [[_lemmatize_text(text) for text in line] for line in graph]
def exclude_keywords_from_graph(graph: List[List[str]], exclude_keywords: List[str]) -> List[List[str]]:
    '''Return the graph without any line that contains an excluded keyword.

    A line is dropped as a whole, so keywords that shared a line with an
    excluded keyword are removed along with it.

    Args:
        graph: Lines of keywords. Keywords are compared exactly as given;
            they are not lower-cased or stripped here.
        exclude_keywords: Keywords to exclude. Each one is lower-cased and
            stripped of surrounding whitespace before comparison. None
            means nothing is excluded.

    Returns:
        The original graph object when exclude_keywords is None; otherwise
        a new list holding the lines that contain no excluded keyword.
    '''
    if exclude_keywords is None:
        return graph

    # Normalise once, outside the loop, instead of rebuilding the set per line.
    excluded = {word.lower().strip() for word in exclude_keywords}
    return [line for line in graph if excluded.isdisjoint(line)]