item_info_extractor.py
"""
imports
"""
import os
import sys
from datetime import datetime
import gzip
import logging
import xml.etree.ElementTree as ET
from decimal import Decimal
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from bs4 import BeautifulSoup # pylint: disable=import-error
import requests # pylint: disable=import-error
# this allows the automation system to find the relevant scripts in the repository
add_to_python_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(add_to_python_path)
from app import supermarket_info_dictionary  # pylint: disable=import-error,wrong-import-position
from models import Product, BranchPrice  # pylint: disable=import-error,wrong-import-position
logging.basicConfig(filename='info-extractor.log', level=logging.INFO,
format='%(asctime)s: %(funcName)s: %(levelname)s: %(message)s')
class InfoExtractor:
"""
This class executes the following task:
1. web scrapes the url's from the url list and retrieves the relevant gzip file links
2. extracts the xml file from the gzip file and parses it into an xml tree
3. the relevant information is then extracted from the parsed xml tree
4. the information is placed in the relevant table in the db.
- this script only updates the product information (db table name = products)
- and branch price information (db table name = branch_price)
Currently this class can only handle the supermarkets: shufersal, mega and victory
XML files are found @https://www.consumers.org.il/item/transparency_price
"""
def __init__(self):
self.current_super = ''
        # list of unwanted item names to be filtered out
self.exclude_names = ['משלוחים', 'ריק', 'פיקדון', 'תיבה', 'משלוח']
self.item_id_set = set()
self.branch_price_unique_constraint_set = set()
engine = create_engine('mysql+pymysql://Super_User:SuperX1234'
'@mysql-13101-0.cloudclusters.net:13101/SuperX',
echo=False)
self.session = Session(bind=engine)
def run_info_extractor(self):
"""
This method is incharge of the whole extraction process.
It works in the following way:
1. retreives all zip file links from the relevant website
2. retreives all branch id's and child node objects that hold the wanted information
3. creates 2 lists one containing all Product info and the other all BranchPrice info
4. commits to the db
"""
self.create_branch_price_set()
self.create_product_set()
for key in supermarket_info_dictionary:
self.current_super = supermarket_info_dictionary[key]
url_list = [self.current_super['url']]
if self.current_super['multiple_pages']:
try:
url_list = self.get_all_super_links()
except ConnectionError as c_e:
logging.error(str(c_e))
zip_links = self.get_zip_file_links(url_list)
node_info_list = self.extract_xml_from_zip_and_parse(zip_links)
            for info_child_node, branch_id in node_info_list:
                # skip this branch before doing any extraction work
                if branch_id == '86' and self.current_super['store_name'] == 'victory':
                    continue
                xml_info_list = self.extract_information_from_parsed_xml(info_child_node)
                product_info_list, branch_price_list = self.fill_product_and_branch_price_tables(
                    xml_info_list, branch_id)
                if product_info_list:
                    self.session.bulk_save_objects(product_info_list)
                if branch_price_list:
                    self.session.bulk_save_objects(branch_price_list)
self.session.flush()
self.session.commit()
self.session.close()
def get_zip_file_links(self, url_list):
"""
This method web scrapes the urls in url_list and creates a set of the gzip file links.
The method then sends the set to parsing
If connection to the url failed, moves on to next url
:param url_list: list of urls to extract zip files from
:return: a set of zip file links
"""
        zip_links = set()
        for url in url_list:
            try:
                page = requests.get(url)
                web_scrapper = BeautifulSoup(page.content, 'html.parser')
            except requests.ConnectionError:
                logging.error('Unable to connect to url:\n %s', url)
            else:
                links_list = web_scrapper.find_all('a')
                for link in links_list:
                    if link.has_attr('href'):
                        href = str(link.attrs['href'])
                        if self.current_super['price_full'] in href:
                            zip_links.add(href)
        return zip_links
def extract_xml_from_zip_and_parse(self, zip_links):
"""
This method retrieves the xml file in the gzip file and parses it into an xml tree
The child node containg the item information is found
and The branch_id is retrieved from the xml file.
This information is packed into a tuple and placed in a list
If connection failed, moves on to next link
:param zip_links: list of zip file links from the website
:return: a list of tuples containing all branch id's and child node object's
"""
node_info_list = []
for zip_link in zip_links:
            # fix the zip link url if necessary
            if self.current_super['zip_link_prefix'] is not None:
                zip_link = self.current_super['zip_link_prefix'] + zip_link
try:
request = requests.get(zip_link)
content = request.content
except requests.ConnectionError:
logging.error('Unable to extract from zip file with url: %s', zip_link)
else:
xml_file = gzip.decompress(content).decode('utf-8')
store_id = 'StoreId'
if self.current_super['store_name'] == 'victory':
store_id = 'StoreID'
# parses the xml document into a tree
tree = ET.fromstring(xml_file)
branch_id = tree.find(store_id).text.lstrip('0')
                # gets the last child, which contains the item information;
                # list(tree) replaces the deprecated Element.getchildren()
                info_child_node = list(tree)[-1]
node_info_list.append((info_child_node, branch_id))
return node_info_list
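    # extract_xml_from_zip_and_parse returns a list of the shape
    # [(info_child_node, branch_id), ...], where branch_id is a string
    # with its leading zeros stripped, e.g. '5'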
def extract_information_from_parsed_xml(self, xml_info_child_node):
"""
This method iterates over all items in the supermarket and extracts the relevant data
The data is then committed packed into a tuple and placed in a list of all the info tuples
:param xml_info_child_node: The child of the parsed xml tree containing all item info
:return: a list of tuples containing the information
"""
item_attr_name = self.current_super['item_attr_name']
is_weighted_attr = self.current_super['is_weighted_attr_name']
xml_info_list = []
for item in xml_info_child_node.findall(item_attr_name):
item_code = int(item.find('ItemCode').text)
if item_code == 0:
continue
item_name = item.find('ItemName').text
            # exclude unwanted names from the DB; a bare continue inside an inner
            # loop over exclude_names would only skip to the next excluded name,
            # not to the next item
            if any(name in item_name for name in self.exclude_names):
                continue
quantity = Decimal(item.find('Quantity').text)
price = Decimal(item.find('ItemPrice').text)
update_date = self.standardize_date(item.find('PriceUpdateDate').text)
            is_weighted = item.find(is_weighted_attr).text == '1'
unit_of_measure = 'יחידה'
if is_weighted:
unit_of_measure = self.standardize_weight_name(item.find('UnitQty').text.strip())
xml_info_list.append(
(item_code, item_name, quantity, is_weighted, unit_of_measure, price, update_date))
return xml_info_list
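    # each tuple yielded by extract_information_from_parsed_xml has the shape:
    # (item_code, item_name, quantity, is_weighted, unit_of_measure, price, update_date)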
def fill_product_and_branch_price_tables(self, information_list, branch_id): # pylint: disable=too-many-locals
"""
This method receives a list containing a tuple of all the xml info
and places it into the correct table
:param information_list: list of tuples containing all the xml info
:param branch_id: the id of the current branch
:return: two lists, one containing Product obj the other BranchPrice objects
"""
branch_price_list = []
product_info_list = []
for item_code, item_name, quantity, is_weighted, unit_of_measure, price, update_date in information_list: # pylint: disable=line-too-long
            # if the item is not yet in the db, add it
if item_code not in self.item_id_set:
product_info_list.append(Product(id=item_code,
name=item_name,
quantity=quantity,
is_weighted=is_weighted,
unit_of_measure=unit_of_measure))
self.item_id_set.add(item_code)
            # if the branch price row is not yet in the db, add it
if (self.current_super['chain_id'], item_code, int(branch_id)) not in \
self.branch_price_unique_constraint_set:
branch_price_list.append(BranchPrice(chain_id=self.current_super['chain_id'],
branch_id=branch_id,
item_code=item_code,
price=price,
update_date=update_date))
self.branch_price_unique_constraint_set.add((self.current_super['chain_id'],
item_code,
int(branch_id)))
else:
current_branch_list = self.session.query(BranchPrice).filter_by(
chain_id=self.current_super['chain_id'],
item_code=item_code,
branch_id=branch_id).all()
if current_branch_list:
current_branch = current_branch_list[0]
old_price = current_branch.price
# update the price if it has changed
if old_price != price:
current_branch.price = price
self.session.flush()
return product_info_list, branch_price_list
def standardize_weight_name(self, unit_in_hebrew): # pylint: disable=no-self-use
"""
This method standardizes the unit of measurement
if the unit of measurement is not know, returns unknown
:param unit_in_hebrew: unit of measurement in hebrew
:return: standardized version of the unit or unknown if the unit is not known
"""
        unit_dict = {
            'ק"ג': ['קילו', 'ק"ג', 'קילוגרמים', '1 ק"ג'],
            'גרם': ['גרם', 'גרמים'],
            'ליטר': ['ליטר', 'ליטרים', 'ליטר '],
            'מ"ל': ['מיליליטרים', 'מ"ל', 'מיליליטר'],
            'יחידה': ['יחידה', 'לא ידוע', "יח'", "'יח", "יח`", "מטרים", "מארז", "קרטון"]
        }
        for unit, variants in unit_dict.items():
            if unit_in_hebrew in variants:
                return unit
if "יח'" in unit_in_hebrew:
return 'יחידה'
        # as a default, return the original unit and log it
        logging.info('Unrecognized unit of measure (UTF-8 encoded): %s',
                     unit_in_hebrew.encode("UTF-8"))
return unit_in_hebrew
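    # e.g. standardize_weight_name('קילוגרמים') returns 'ק"ג', and a value
    # containing "יח'" that is not in the dict, e.g. "2 יח'", returns 'יחידה'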
def standardize_date(self, date):
"""
This method standardizes the update date of the item
:param date: string representation of the date from the xml file
:return: standardized date as a string
"""
# remove time from date
date = date[:10]
date_format = self.current_super['item_date_format']
        new_date = datetime.strptime(date, date_format).date()
        return str(new_date)
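    # a minimal illustration, assuming (hypothetically) an item_date_format of
    # '%d/%m/%Y': standardize_date('05/03/2021 09:00:00') returns '2021-03-05'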
def get_all_super_links(self):
"""
builds a list of urls for the supermarket
:return: a list of links according to th amount of pages
"""
num_of_pages = self.get_num_of_pages()
if num_of_pages == -1:
raise ConnectionError(f'Unable to connect to url to find number of pages for '
f'{self.current_super["store_name"]}')
        general_url = self.current_super['url']
        # drop the trailing page number from the base url
        general_url = general_url[:-1]
        return [general_url + str(i) for i in range(1, num_of_pages + 1)]
def get_num_of_pages(self):
"""
gets the number of pages for a certain supermarket
In case the number of pages increases or decreases
the code will always check if it is a single, double, triple digit
:return: number of pages
"""
        num_of_pages = 1
try:
page = requests.get(self.current_super['url'])
web_scrapper = BeautifulSoup(page.content, 'html.parser')
except requests.ConnectionError:
num_of_pages = -1
else:
if self.current_super['store_name'] == 'shufersal':
links = web_scrapper.find_all(name='a', text='>>')
wanted_link = links[0]
                # probe the last one to three characters of the href for the page
                # number; stop at the first slice that is not a valid integer
                for i in range(1, 4):
                    try:
                        num_of_pages = int(wanted_link.attrs['href'][-i:])
                    except ValueError:
                        break
return num_of_pages
def create_branch_price_set(self):
"""
queries the DB for all rows and saves the unique contraints in a set
"""
all_rows = BranchPrice.query.all()
for row in all_rows:
unique_constraints = (row.chain_id,
row.item_code,
row.branch_id)
self.branch_price_unique_constraint_set.add(unique_constraints)
def create_product_set(self):
"""
queries the DB for all rows and saves the unique contraints in a set
"""
all_rows = Product.query.all()
for row in all_rows:
self.item_id_set.add(row.id)
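

# a minimal usage sketch: constructing the extractor opens the DB session, and
# run_info_extractor executes the full scrape-parse-load pipeline
if __name__ == '__main__':
    extractor = InfoExtractor()
    extractor.run_info_extractor()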