In [1]:
import dask.bag as bag
import os

In [2]:
# Build a Dask bag from foods.txt with read_text, decoding the bytes as latin1.
# NOTE(review): `raw_text` is not referenced by any later cell visible in this
# notebook -- confirm it is still needed before keeping this cell.
raw_text = bag.read_text("foods.txt",encoding='latin1')

In [3]:
def get_next_buffer_part(file,start_index,span_index=0,blocksize=1000):
    """Locate the end of the record that begins at ``start_index``.

    Records are separated by a blank line, i.e. two consecutive newline
    bytes. The file is read forward from ``start_index`` in ``blocksize``
    chunks until that separator shows up.

    Parameters
    ----------
    file : binary file object
        Seekable handle opened in binary mode (``read`` must return bytes).
    start_index : int
        Absolute byte offset at which the record starts.
    span_index : int, optional
        Extra bytes added to the first read only. Kept for backward
        compatibility with the earlier recursive implementation, which used
        it to carry the growing window size.
    blocksize : int, optional
        Number of bytes fetched per read. Must be at least 1.

    Returns
    -------
    tuple of (int, int)
        ``(start_index, delimeter_position)`` where ``delimeter_position`` is
        the offset of the separator relative to ``start_index`` (equivalently
        the record length in bytes). If the file ends before a separator is
        found, the number of bytes remaining after ``start_index`` is
        returned instead, so a final record without a trailing blank line is
        still reported rather than recursing forever.

    Raises
    ------
    ValueError
        If ``blocksize`` is smaller than 1 (the scan could never advance).

    Notes
    -----
    The handle is left positioned at ``start_index`` on return.
    """
    if blocksize < 1:
        raise ValueError("blocksize must be a positive number of bytes")

    file.seek(start_index)
    buffer = file.read(blocksize + span_index)
    search_from = 0
    while True:
        # Searching the raw bytes gives the same index as searching the
        # latin1-decoded text, because latin1 maps every byte to one character.
        delimeter_position = buffer.find(b"\n\n", search_from)
        if delimeter_position != -1:
            break
        more = file.read(blocksize)
        if not more:
            # End of file without a separator: the record runs to the end.
            delimeter_position = len(buffer)
            break
        # Step back one byte so a separator straddling two reads is not missed.
        search_from = max(len(buffer) - 1, 0)
        buffer += more

    file.seek(start_index)
    return start_index,delimeter_position

In [4]:
# Walk foods.txt once and collect, for every record, the pair
# (absolute start offset, offset of the blank-line separator relative to that start).
with open("foods.txt", "rb") as file_handle:
    # seek() returns the new absolute position, so seeking to the end yields
    # the file length; one byte is subtracted as in the original scan.
    size = file_handle.seek(0, os.SEEK_END) - 1
    output = []
    current_position = 0
    next_position = 0  # despite the name: separator offset relative to current_position
    while current_position < size:
        current_position, next_position = get_next_buffer_part(
            file_handle, current_position, 0
        )
        output.append((current_position, next_position))
        # Jump over the record body and its two-byte separator.
        current_position += next_position + 2

In [5]:
# Display the collected (start_offset, separator_offset) pairs, one per record.
# NOTE(review): this echoes the entire list; a slice such as output[:5] would
# keep the notebook output short.
output

[(0, 471),
 (473, 390),
 (865, 737),
 (1604, 414),
 (2020, 357),
 (2379, 616),
 (2997, 543),
 (3542, 357),
 (3901, 326),
 (4229, 343),
 (4574, 999),
 (5575, 587),
 (6164, 613),
 (6779, 306),
 (7087, 337),
 (7426, 384),
 (7812, 396),
 (8210, 330),
 (8542, 948),
 (9492, 349),
 (9843, 459),
 (10304, 522),
 (10828, 354),
 (11184, 283),
 (11469, 507),
 (11978, 379),
 (12359, 286),
 (12647, 411),
 (13060, 695),
 (13757, 999),
 (14758, 700),
 (15460, 323),
 (15785, 1314),
 (17101, 696),
 (17799, 765),
 (18566, 614),
 (19182, 473),
 (19657, 483),
 (20142, 715),
 (20859, 551),
 (21412, 892),
 (22306, 1450),
 (23758, 367),
 (24127, 399),
 (24528, 458),
 (24988, 415),
 (25405, 437),
 (25844, 313),
 (26159, 513),
 (26674, 379),
 (27055, 288),
 (27345, 596),
 (27943, 1487),
 (29432, 371),
 (29805, 606),
 (30413, 530),
 (30945, 410),
 (31357, 380),
 (31739, 412),
 (32153, 311),
 (32466, 382),
 (32850, 365),
 (33217, 275),
 (33494, 1238),
 (34734, 542),
 (35278, 386),
 (35666, 1001),
 (36669, 687),
 

In [6]:
def get_dict_item(filename,start_index,delimeter_position,encoding='latin1'):
    """Read one record from ``filename`` and parse it into a dict.

    Parameters
    ----------
    filename : str
        Path of the file to read; it is opened in binary mode.
    start_index : int
        Absolute byte offset at which the record starts.
    delimeter_position : int
        Number of bytes to read from ``start_index``.
    encoding : str, optional
        Encoding used to decode the bytes that were read.

    Returns
    -------
    dict
        One entry per line of the record. A line is split at its FIRST
        ``": "`` into key and value, so values that themselves contain
        ``": "`` are kept whole. A line without ``": "`` is stored under the
        key ``"unknown"``; if several such lines (or repeated keys) occur,
        the last one wins.
    """
    with open(filename,"rb") as file_handle:
        file_handle.seek(start_index)
        single_review = file_handle.read(delimeter_position).decode(encoding)

    parsed = {}
    for element in single_review.strip().split("\n"):
        # partition splits once, at the first separator only, so the rest of
        # the line (including any further ": ") stays in the value.
        key, separator, value = element.partition(": ")
        if separator:
            parsed[key] = value
        else:
            parsed["unknown"] = element
    return parsed

In [7]:
# Turn the list of (start_offset, separator_offset) pairs into a Dask bag so
# the per-record parsing below can be mapped over it.
reviews = bag.from_sequence(output)

In [8]:
# Map each (start_offset, separator_offset) pair to the parsed review dict;
# get_dict_item opens foods.txt itself for every element.
reviews = reviews.map(lambda x: get_dict_item("foods.txt",x[0],x[1]))

In [9]:
from dask.diagnostics import ProgressBar

In [10]:
# Count the parsed reviews, showing a progress bar while the bag is computed.
with ProgressBar():
    count = reviews.count().compute()

[########################################] | 100% Completed | 26.2s


In [11]:
# Show the number of reviews counted in the previous cell.
count

568454

In [12]:
# Numeric review score -> label used by tag_reviews. Built once rather than
# on every call.
_SCORE_LABELS = {
    1.0: "Not Good",
    2.0: "Average",
    3.0: "Good",
    4.0: "Very Good",
    5.0: "Excellent",
}


def tag_reviews(single_review):
    """Return a copy of ``single_review`` with its score replaced by a label.

    Parameters
    ----------
    single_review : dict
        Parsed review; must contain the key ``'review/score'`` holding a
        value that ``float()`` can convert to one of 1.0 ... 5.0.

    Returns
    -------
    dict
        A new dict with the same keys in the same order, where
        ``'review/score'`` holds the matching label ("Not Good", "Average",
        "Good", "Very Good" or "Excellent"). The input dict is left
        untouched, so applying the function again to the same input is safe.

    Raises
    ------
    KeyError
        If ``'review/score'`` is missing or its numeric value has no label.
    ValueError
        If the score cannot be converted with ``float()``.
    """
    label = _SCORE_LABELS[float(single_review['review/score'])]
    # Build a fresh dict instead of overwriting the caller's entry in place.
    return {**single_review, 'review/score': label}

In [13]:
# Replace each review's numeric 'review/score' with its text label.
reviews = reviews.map(tag_reviews)

In [17]:
# Show the bag's repr (name of the last mapped function and partition count).
reviews

dask.bag<tag_reviews, npartitions=101>

In [18]:
# Pull one tagged review to spot-check the parsed fields and the score label.
reviews.take(1)

({'product/productId': 'B001E4KFG0',
  'review/userId': 'A3SGXH7AUHU8GW',
  'review/profileName': 'delmartian',
  'review/helpfulness': '1/1',
  'review/score': 'Excellent',
  'review/time': '1303862400',
  'review/summary': 'Good Quality Dog Food',
  'review/text': 'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.'},)