In [1]:
import bz2
import json
import re
import random
import sys
import os
import bz2
import time

In [2]:
PATTERN_INPUT = "../quotebank/quotes-{}.json.bz2"

In [3]:
def write_json_to_file(name, obj):
    # Use current timestamp to make the name of the file unique
    millis = round(time.time() * 1000)
    name = f'{name}_{millis}.json'
    with open(name, 'wb') as f:
        output = json.dumps(obj)
        f.write(output.encode('utf-8'))
    return name

# General

In [4]:
"""
Process a chunk of the input stream.
"""
def proc(input, evaluate_quote, max_length=20):
    # Ugly global variable usage :(
    global index
    global invalid_json_count
    global invalid_chunk_count
    global chunk_stitching
    global stitch_length
    global scrap_next
    global quote_is_open
    global quote_part
    global dat_part
    global euro_error
    global euro_count
    
    global totin
    global totout
    global prev
    global dec
    global start
    """Decompress and process a piece of a compressed stream"""
    dat = dec.decompress(input)
    got = len(dat)
    if got != 0:    # 0 is common -- waiting for a bzip2 block
        try:
            if (euro_error):
                # If the previous chunk ended unexpectedly and could not be decoded, try to combine it with this chunk
                s = (dat_part + dat).decode('utf-8')
                euro_error = False
            else:
                # Decode the current chunk
                s = dat.decode('utf-8')
                
            # List elements in the quote files are separated by new lines (\n)
            lines = s.split('\n')

            for line in lines:
                try:
                    if (scrap_next):
                        # If the object spans too many chunks we decide to scrap it, and keep scraping until JSON can parse the line (chunk)
                        ob = json.loads(line)
                        scrap_next = False
                        quote_is_open = False
                        chunk_stitching -= stitch_length
                    else:
                        if (quote_is_open):
                            # If previous chunk ended in the middle of a JSON object we merge that content with the current line
                            ob = json.loads(quote_part + line)
                            quote_is_open = False
                        else:
                            # Parse the current line
                            ob = json.loads(line)

                    # Parametrization - do work on a single quote JSON object
                    evaluate_quote({}, ob)
                except ValueError:
                    """
                    Error occurs when the line does not contain the whole JSON object, which happens for the last line in almost every chunk of input stream.
                    We solve this by remembering the partial object, and then merging it with the rest of the object when we load the next chunk.
                    JSON object might span more than 2 chunks, and in that case we keep merging until we reach max_length chunks, when we just throw away the object
                    and count it as invalid using invalid_json_count.
                    """
                    if (scrap_next):
                        pass
                    else:
                        if (quote_is_open):
                            chunk_stitching += 1
                            quote_part = quote_part + line
                            stitch_length += 1

                            if (stitch_length > max_length):
                                invalid_json_count += 1
                                scrap_next = True
                        else:
                            quote_is_open = True
                            quote_part = line
                            stitch_length = 0
        except UnicodeDecodeError as e:
            # Error occurs when input stream is split in the middle of a character which is encoded with multiple bytes, for example the euro symbol
            if (euro_error):
                dat_part = dat_part + dat
            else:
                euro_error = True
                dat_part = dat
            
            euro_count += 1
        
        index += 1
    return got

In [5]:
def run_through_quotes(init, evaluate_quote, year, target_dict_name, path_to_input, name='test', chunk_size=16384):
    global index
    global invalid_json_count
    global invalid_chunk_count
    global chunk_stitching
    global stitch_length
    global scrap_next
    global quote_is_open
    global quote_part
    global dat_part
    global euro_error
    global euro_count
    
    global totin
    global totout
    global prev
    global dec
    global start
    
    size = os.path.getsize(path_to_input)
    invalid_json_count = 0
    invalid_chunk_count = 0
    chunk_stitching = 0
    stitch_length = 0
    scrap_next = False
    quote_is_open = False
    quote_part = ''
    dat_part = 0
    euro_error = False
    euro_count = 0
    
    totin = 0
    totout = 0
    prev = -1
    dec = bz2.BZ2Decompressor()
    start = time.time()
    
    init({})
    
    target_dict = poli_quotes if target_dict_name == "poli_quotes" else signi_quote_dict
    index = 0
    with open(path_to_input, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            # feed chunk to decompressor
            got = proc(chunk, evaluate_quote)

            # handle case of concatenated bz2 streams
            if dec.eof:
                rem = dec.unused_data
                dec = bz2.BZ2Decompressor()
                got += proc(rem, evaluate_quote)

            # show progress
            totin += len(chunk)
            totout += got
            if got != 0:    # only if a bzip2 block emitted
                frac = round(1000 * totin / size)
                if frac != prev:
                    left = (size / totin - 1) * (time.time() - start)
                    print(f'\r{frac / 10:.1f}% (~{left:.1f}s left)\tyear: {year}\tnumber of speakers: {len(target_dict)}\tstitching: {chunk_stitching}\teuro count: {euro_count}\tinvalid json count: {invalid_json_count}\tinvalid chunk count: {invalid_chunk_count}', end='')
                    prev = frac

    # Show the resulting size.
    print(end='\r')
    print(totout, 'uncompressed bytes')

    output_name = write_json_to_file(f'{name}-{year}', target_dict)
    return output_name

### General fix

In [20]:
CHUNK_SIZE = 1_048_576

def process_compressed_json_file(input_file_name, output_name, year, process_json_object):
    # Decompression variables
    decompressor = bz2.BZ2Decompressor()
    
    # Decoding variables
    decoding_buffer = bytearray([])
    decoding_error_counter = 0
    
    # Parsing variables
    parsing_buffer = ''
    parsing_error_counter = 0
    
    # Progress variables - used to provide feedback to the dev
    input_size = os.path.getsize(input_file_name)
    start_time = time.time()
    total_in = 0
    total_out = 0
    previous_value = -1
    
    # Result of processing
    out_json = dict()
    
    # Iterate through the file
    with open(input_file_name, 'rb') as input_file:
        for chunk in iter(lambda: input_file.read(CHUNK_SIZE), b''):
            # Feed chunk to decompressor
            decompressed_chunk = decompressor.decompress(chunk)
            dec_chunk_length = len(decompressed_chunk)
            
            # Check the length of the decompressed data - 0 is common -- waiting for a bzip2 block
            if (dec_chunk_length == 0):
                continue
            
            # Try to decode byte array
            decoding_buffer += decompressed_chunk
            try:
                chunk_string = decoding_buffer.decode('utf-8')
                
                # Clear buffer
                decoding_buffer = bytearray([])
                
                decoding_successful = True
            except UnicodeDecodeError:
                # Error occurs when input stream is split in the middle of a character which is encoded with multiple bytes
                decoding_error_counter += 1
                decoding_successful = False
            
            # Try to parse the decoded string
            if decoding_successful:
                # Elements of the JSON array are split by '\n'
                array_elements = chunk_string.split('\n')
                
                # Iterate through the JSON array in the current chunk
                for json_candidate in array_elements:
                    # Try to parse the JSON object, might fail if the object was divided in parts because of the chunk separation
                    parsing_buffer += json_candidate
                    try:
                        json_obj = json.loads(parsing_buffer)
                        
                        # Clear buffer
                        parsing_buffer = ''
                        
                        parsing_successful = True
                    except ValueError:
                        """
                        Error occurs when the line does not contain the whole JSON object, which happens for the last array element in almost every chunk of input stream.
                        We solve this by remembering the prevous partial objects in parsing_buffer, and then merging it with the rest of the object when we load the next chunk.
                        """
                        parsing_error_counter += 1
                        parsing_successful = False
                    
                    # Perform JSON object processing
                    if parsing_successful:
                        process_json_object(json_obj, out_json)
            
            # Show progress
            total_in += len(chunk)
            total_out += dec_chunk_length
            if dec_chunk_length != 0:    # only if a bzip2 block emitted
                processed_fraction = round(1000 * total_in / input_size)
                if processed_fraction != previous_value:
                    left = (input_size / total_in - 1) * (time.time() - start_time)
                    print(f'\r{processed_fraction / 10:.1f}% (~{left:.1f}s left)\tyear: {year}\tnumber of entries: {len(out_json)}\tdecoding errors: {decoding_error_counter}\tparsing errors: {parsing_error_counter}', end='      ')
                    previous_value = processed_fraction
    
    # Save result to file
    output_full_name = write_json_to_file(f'{output_name}-{year}', out_json)
    
    # Report ending
    print()
    total_time = time.time() - start_time
    print(f'File {input_file_name} processed in {total_time:.1f}s', end='\n\n')
    
    return output_full_name

# Counting

In [7]:
# The signature remains from an older version of the code, parameter out_file could be removed, but then has to be removed in other places in the code as well.
def initialize(out_file):
    global signi_count
    global signi_quote_dict
    signi_count = 0
    signi_quote_dict = {}

In [8]:
signi_count = 0
signi_quote_dict = {}

In [9]:
# The signature remains from an older version of the code, parameter out_file could be removed, but then has to be removed in other places in the code as well.
def count_significant_quotes(out_file, row):
    global signi_count
    global signi_quote_dict
    
    probas = row['probas']
    qids = row['qids']
    
    if (len(probas) == 0 or len(qids) == 0):
        return
    
    if (probas[0][0] == 'None'):
        return
    
    p = float(probas[0][1])
    if (p < 0.8):
        return
    
    qid = qids[0]
    
    signi_count = signi_count + 1
    signi_quote_dict[qid] = signi_quote_dict.get(qid, 0) + 1

### Counting fix

In [10]:
# NOT_SIGNIFICANT_QUOTE = False, 0, 0

def check_if_significant_quote(row, significant_quote_counters):
    probabilities = row['probas']
    qids = row['qids']
    
    # Check if the probas and qids values exist
    if (len(probabilities) == 0 or len(qids) == 0):
        return
    
    # Check if the speaker is not 'Unknown'
    if (probabilities[0][0] == 'None'):
        return
    
    # Check if the probability is over 80%
    prob = float(probabilities[0][1])
    if (prob < 0.8):
        return
    
    # Increment count
    qid = qids[0]
    significant_quote_counters[qid] = significant_quote_counters.get(qid, 0) + 1

# Quote selection

In [11]:
poli_quotes = {}
poli_people = set()

In [12]:
def poli_initialize(out_file):
    global poli_quotes
    global poli_people
    global dem_list
    global rep_list
    
    poli_quotes = {}
    poli_people = set()
    
    for v in dem_list:
        poli_people.add(v['item'])
    for v in rep_list:
        poli_people.add(v['item'])

In [13]:
"""
Remember the quote, only if it belongs to one of the politicians in the set poli_people, and if the probability is over 80%.
"""
def save_politician_quotes(out_file, row):
    global poli_quotes
    global poli_people
    
    probas = row['probas']
    qids = row['qids']
    
    # Check if the probability field exists
    if (len(probas) == 0 or len(qids) == 0):
        return
    
    if (probas[0][0] == 'None'):
        return
    
    # Check if the probability is over 80%
    p = float(probas[0][1])
    if (p < 0.8):
        return
    
    # Check if the speaker is one of the 100 party members
    qid = qids[0]
    if qid not in poli_people:
        return
    
    # Remember only the quote and the probability
    data = {}
    data['quotation'] = row['quotation']
    data['proba'] = row['probas'][0][1]
    
    # Append the quote
    arr = poli_quotes.get(qid, [])
    arr.append(data)
    poli_quotes[qid] = arr

### Quote selection fix

In [14]:
# NOT_PARTY_MEMBER_QUOTE = False, 0, 0

def check_if_party_member_quote(row, party_member_quotes, party_list):
    probabilities = row['probas']
    qids = row['qids']
    
    # Check if the probas and qids values exist
    if (len(probabilities) == 0 or len(qids) == 0):
        return
    
    # Check if the speaker is not 'Unknown'
    if (probabilities[0][0] == 'None'):
        return
    
    # Check if the probability is over 80%
    p = float(probabilities[0][1])
    if (p < 0.8):
        return
    
    # Check if the speaker is on the party list
    qid = qids[0]
    if qid not in party_list:
        return
    
    # Remember only the quote and the probability
    data = {}
    data['quotation'] = row['quotation']
    data['proba'] = row['probas'][0][1]
    
    # Append the quote
    arr = party_member_quotes.get(qid, [])
    arr.append(data)
    party_member_quotes[qid] = arr

# Test

In [15]:
do_counting = False

In [16]:
# years = [2015, 2016, 2017, 2018, 2019, 2020]
years = [2020]
for year in years:
    path_to_input = PATTERN_INPUT.format(year)
    
    if do_counting:
        # Counting
#         run_through_quotes(
#             initialize, count_significant_quotes, year, "signi_quote_dict", path_to_input, name='signi-quote-count', chunk_size=1_048_576)
        run_through_quotes(
            initialize, check_if_significant_quote, year, "signi_quote_dict", path_to_input, name='signi-quote-count', chunk_size=1_048_576)
        print('')
        print(f'Finished counting quotes for the year {year}')
    else:
        # Quote selection
        run_through_quotes(
            poli_initialize, save_politician_quotes, year, "poli_quotes", path_to_input, name='politician-quotes', chunk_size=1_048_576)
        print('')
        print(f'Finished compiling quotes for the year {year}')

NameError: name 'dem_list' is not defined

### Fix testing

In [23]:
test_counting_fix = True

In [21]:
from functools import partial

# Get dem_list and rep_list
with open('../quotebank/top100_politicians_by_party.json', 'r') as f:
    party_lists = json.load(f)

dem_list = party_lists['dem']
rep_list = party_lists['rep']
    
# Join both party lists
dem_and_rep_set = set()    
for v in dem_list:
    dem_and_rep_set.add(v['item'])
for v in rep_list:
    dem_and_rep_set.add(v['item'])

# Define partial function check_if_dem_or_rep_quote using function check_if_party_member_quote
check_if_dem_or_rep_quote = partial(check_if_party_member_quote, party_list=dem_and_rep_set)

In [24]:
# years = [2015, 2016, 2017, 2018, 2019, 2020]
years = [2020]

output_list = []

print('Counting quotes' if test_counting_fix else 'Saving politician quotes')
for year in years:
    path_to_input = PATTERN_INPUT.format(year)
    
    # Process quote files
    if test_counting_fix:
        short_name = 'data/signi-quote-count'
        process_quote = check_if_significant_quote
        
        # Counting fix
#         output_name = process_compressed_json_file(path_to_input, 'data/signi-quote-count', year, check_if_significant_quote)
#         output_list.append(output_name)
    else:
        short_name = 'data/politician-quotes'
        process_quote = check_if_dem_or_rep_quote
        
        # Quote selection fix
#         output_name = process_compressed_json_file(path_to_input, '', year, check_if_dem_or_rep_quote)
#         output_list.append(output_name)

    output_name = process_compressed_json_file(path_to_input, short_name, year, process_quote)
    output_list.append(output_name)

print('\n\nOutput file names:')
for file_name in output_list:
    print(file_name)

Counting quotes
100.0% (~0.1s left)	year: 2020	number of entries: 171497	decoding errors: 0	parsing errors: 792       
File ../quotebank/quotes-2020.json.bz2 processed in 235.1s



Output file names:
data/signi-quote-count-2020_1636656766089.json
