# Debate a base
#### Update script (Parlamint 4.1)


### Functions
- Uploads all Parlamint data to ElasticSearch for the `ngrams` page
- BEWARE THIS WILL TAKE DAYS

### Don't forget to:
- Make sure there is nothing inside the `data/original/EU` and `data/original/EN` folders besides what is mentioned in the step-by-step guide
- Fill in `es_host`, `es_user` and `es_password` so you can connect with your ElasticSearch instance

In [None]:
# Fill in credentials over here!
# Each value can also be supplied through an environment variable
# (ES_HOST, ES_USER, ES_PASSWORD) so real credentials do not have to be
# written into the notebook file; the literals below are only fallbacks.
import os

es_host = os.environ.get("ES_HOST", "https://localhost:9200")
es_user = os.environ.get("ES_USER", "CHANGEME")
es_password = os.environ.get("ES_PASSWORD", "CHANGEME")

In [1]:
# imports
import os
import time
import datetime
import csv
import re

import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd

from elasticsearch import Elasticsearch, helpers
from sklearn.feature_extraction.text import CountVectorizer
from joblib import Parallel, delayed

In [2]:
# Open the Elasticsearch client using the credentials from the first cell.
# Certificate verification is switched off (verify_certs=False), which suits a
# local instance with a self-signed certificate. To verify against a CA
# instead, set verify_certs=True and pass ca_certs="./ca.crt".
es = Elasticsearch(
    hosts=[es_host],
    http_auth=(es_user, es_password),
    verify_certs=False,
    # tolerate a busy cluster: request timeout of 30, up to 10 retries,
    # and retry requests that timed out
    timeout=30,
    max_retries=10,
    retry_on_timeout=True,
)

# Root folder holding one sub-folder of preprocessed CSV files per country.
translated_csv_dir = "../data/preprocessed"

In [3]:
# it is VERY dangerous to uncomment this
# es.indices.delete(index='ngrams1')
# es.indices.delete(index='ngrams2')
# es.indices.delete(index='ngrams3')
# es.indices.delete(index='ngrams4')
# es.indices.delete(index='ngrams5')

In [20]:
# Collect the file names stored in one country's folder.
def get_csv_files(country_selection, base_dir=None):
    """List the files inside the folder of one country.

    Args:
        country_selection: name of a folder directly inside the data
            directory (e.g. ``"NL"``).
        base_dir: directory holding the per-country folders; defaults to the
            module-level ``translated_csv_dir``.

    Returns:
        An empty list when ``base_dir`` has no entry named
        ``country_selection``; otherwise a list with a single dict. That dict
        maps the country folder (key ``country_selection``) and every
        sub-folder (key = its path relative to ``base_dir``) to the list of
        file names found directly in it.
    """
    if base_dir is None:
        base_dir = translated_csv_dir

    country_return_list = []

    if country_selection in os.listdir(base_dir):
        paths_dict = {}

        # walk every (sub)folder of the selected country
        for root, _dirs, files in os.walk(os.path.join(base_dir, country_selection)):
            # Key by the path relative to base_dir. Splitting on "\\" only
            # worked on Windows (IndexError elsewhere) and made sub-folders
            # overwrite the country's own file list.
            paths_dict[os.path.relpath(root, base_dir)] = list(files)

        country_return_list.append(paths_dict)

    return country_return_list

In [5]:
# get the date (year-month-day) from a file name
def extract_file_date(file_name):
    """Return the first ``YYYY-MM-DD`` date found in ``file_name``.

    Raises:
        ValueError: if the name contains no such date (for example a stray
            non-data file in the country folder).
    """
    match = re.search(r"\d{4}-\d{2}-\d{2}", file_name)
    if match is None:
        raise ValueError(f"No YYYY-MM-DD date found in file name {file_name!r}")

    return match[0]

### Shingles from CSV

In [18]:
# returns a dict with dates as keys and file-name lists as values
def get_files_per_date(country, after=None):
    """Group a country's files by the date embedded in their file names.

    Args:
        country: name of the country folder inside ``translated_csv_dir``.
        after: optional ``"YYYY-MM-DD"`` string; files dated on or before
            this day are left out.

    Returns:
        dict mapping each ``"YYYY-MM-DD"`` date string to the list of file
        names carrying that date.
    """
    files = get_csv_files(country)[0][country]

    # Compare calendar dates directly. Converting through time.mktime depended
    # on the machine's local timezone and can overflow for pre-1970 dates on
    # some platforms.
    cutoff = None
    if after is not None:
        cutoff = datetime.datetime.strptime(after, "%Y-%m-%d").date()

    dates_dict = {}

    for file in files:
        date = extract_file_date(file)

        # drop dates that are not after the cutoff (already uploaded)
        if cutoff is not None and datetime.datetime.strptime(date, "%Y-%m-%d").date() <= cutoff:
            continue

        dates_dict.setdefault(date, []).append(file)

    return dates_dict

In [7]:
# build the full paths of all files that belong to one date
def get_date_paths(root, files):
    """Join every file name onto ``translated_csv_dir``/``root``.

    Args:
        root: sub-folder of ``translated_csv_dir`` (the country folder).
        files: iterable of file names inside that folder.

    Returns:
        list of paths, in the same order as ``files``.
    """
    return [os.path.join(translated_csv_dir, root, name) for name in files]

In [8]:
# collect the n-grams of all text in a set of CSV files
def get_date_vocabulary(paths):
    """Return the sorted unique 1- to 5-word n-grams found in the given CSVs.

    Each CSV is read with ``pd.read_csv`` and its ``value`` column is used as
    the text. Files without rows, missing values, and texts that yield no
    tokens are skipped.

    Args:
        paths: iterable of CSV file paths (all files for one country/date).

    Returns:
        numpy object array of unique n-gram strings in sorted order
        (empty when no file contributes any n-gram).
    """
    ngrams = set()

    for path in paths:
        df = pd.read_csv(path)

        # skip badly translated files that contain only an index
        if len(df.index) == 0:
            continue

        # drop missing cells and make sure every remaining cell is a string
        lines = df["value"].dropna().astype(str).tolist()
        if not lines:
            continue

        vectorizer = CountVectorizer(ngram_range=(1, 5), token_pattern=r"(?u)\b\w+\b")
        try:
            # fit() learns the vocabulary; the count matrix is not needed here
            vectorizer.fit(lines)
        except ValueError:
            # CountVectorizer raises when the text has no tokens at all
            # (e.g. punctuation only); such a file adds nothing
            continue

        ngrams.update(vectorizer.get_feature_names_out())

    # accumulate in a set and sort once instead of re-running np.unique on a
    # growing array per file; both order strings by code point
    return np.array(sorted(ngrams), dtype=object)

In [9]:
def get_date_counts(paths, vocabulary):
    """Count how often each n-gram of ``vocabulary`` occurs in the given CSVs.

    Args:
        paths: iterable of CSV file paths with a ``value`` text column.
        vocabulary: sequence of unique n-gram strings, e.g. the output of
            ``get_date_vocabulary``.

    Returns:
        1-D integer numpy array; element ``i`` is the total count of
        ``vocabulary[i]`` over all files.
    """
    counts = np.zeros(shape=(len(vocabulary)), dtype=int)

    # CountVectorizer rejects an empty vocabulary; there is nothing to count
    if len(vocabulary) == 0:
        return counts

    # Build the vectorizer once with a fixed vocabulary so column i is
    # vocabulary[i] by construction, instead of refitting it on every file
    # and relying on both sides sorting the n-grams identically.
    vectorizer = CountVectorizer(
        ngram_range=(1, 5),
        token_pattern=r"(?u)\b\w+\b",
        vocabulary=list(vocabulary),
    )

    for path in paths:
        df = pd.read_csv(path)

        # skip badly translated files that contain only an index
        if len(df.index) == 0:
            continue

        lines = df["value"].dropna().astype(str).tolist()
        if not lines:
            continue

        # sum the sparse document-term matrix directly; .toarray() would
        # allocate a dense (lines x vocabulary) matrix
        counts = counts + np.asarray(vectorizer.transform(lines).sum(axis=0)).ravel()

    return counts

In [10]:
def get_es_date_dict(vocabulary, country, date, counts, index="ngrams"):
    """Build Elasticsearch bulk actions (one per n-gram) for one country/date.

    Each n-gram is routed to the index ``f"{index}{n}"``, where ``n`` is the
    number of space-separated words in it.

    Args:
        vocabulary: sequence of n-gram strings.
        country: country code stored on every document.
        date: string containing a ``YYYY-MM-DD`` date.
        counts: sequence of counts aligned with ``vocabulary``.
        index: index-name prefix.

    Returns:
        list of ``{"_index": ..., "_source": {...}}`` dicts in vocabulary
        order. The year, month and day are stored as strings.

    Raises:
        ValueError: if ``date`` contains no ``YYYY-MM-DD`` date.
    """
    match = re.search(r"\d{4}-\d{2}-\d{2}", date)
    if match is None:
        raise ValueError(f"No YYYY-MM-DD date found in {date!r}")
    year, month, day = match[0].split("-")

    bulk = []

    for ngram, count in zip(vocabulary, counts):
        # cast numpy scalars (np.str_, np.int64) to plain Python types so the
        # document is plain JSON regardless of the serializer in use
        ngram = str(ngram)

        bulk.append({
            # technical entry fields
            "_index": f"{index}{len(ngram.split(' '))}",
            # document content
            "_source": {
                "ngram": ngram,
                "country": country,
                "year": year,
                "month": month,
                "day": day,
                "count": int(count),
            },
        })

    return bulk

In [11]:
# # functie testruimte
# t_folder = "BE"
# t_date = '2014-11-18'

# test_dates_dict = get_files_per_date(t_folder)
# print(test_dates_dict)
# test_paths = get_date_paths(t_folder, test_dates_dict[t_date ])
# print(test_paths)
# test_vocabulary = get_date_vocabulary(test_paths)
# print(test_vocabulary)
# test_counts = get_date_counts(test_paths, test_vocabulary)
# print(test_counts)
# test_date_es_dict = get_es_date_dict(test_vocabulary, t_folder, t_date, test_counts, 0)
# print(test_date_es_dict)

### Error management

In [12]:
import threading

log_loc = os.path.join("../data", "uploaded_ngram_dates.csv")

# Serialises writes to the log: upload_ngrams calls log_date_upload from many
# worker threads at the same time.
_log_lock = threading.Lock()


# record which country/date pairs were uploaded during the threaded run
def log_date_upload(country, date, path=None):
    """Append one ``country,date`` row to the upload log.

    When the log file does not exist yet (or is empty), a ``key,value``
    header is written first, because get_remaining_dates reads the log
    through its ``key`` and ``value`` columns.

    Args:
        country: country code to record.
        date: ``"YYYY-MM-DD"`` date string to record.
        path: log file; defaults to the module-level ``log_loc``.
    """
    if path is None:
        path = log_loc

    with _log_lock:
        write_header = not os.path.exists(path) or os.path.getsize(path) == 0
        with open(path, 'a', newline='', encoding="utf-8") as f:
            w = csv.writer(f)
            if write_header:
                w.writerow(["key", "value"])
            w.writerow([country, date])

In [13]:
# filter out the dates that have already been indexed
def get_remaining_dates(country, dates, path=None):
    """Remove the dates that the upload log already records for ``country``.

    Args:
        country: country code, matched against the log's ``key`` column.
        dates: dict keyed by ``"YYYY-MM-DD"`` strings; modified in place.
        path: log CSV with ``key`` (country) and ``value`` (date) columns;
            defaults to the module-level ``log_loc``.

    Returns:
        ``dates`` itself, minus already-uploaded dates. It is returned
        unchanged when the log file is missing or empty (nothing uploaded).
    """
    if path is None:
        path = log_loc

    # On a first run there is no log yet; reading it used to raise.
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        return dates

    # dtype=str keeps country codes and dates as plain strings
    df_processed_dates = pd.read_csv(path, dtype=str)
    uploaded = df_processed_dates.loc[df_processed_dates["key"] == country, "value"]

    for disposable_date in uploaded:
        dates.pop(disposable_date, None)

    return dates

In [14]:
### CountVectorizer 1.3
# Elastic (indices ngrams1 .. ngrams5, one per n-gram length):
# - | Shingle | Country | Year | Month | Day | Count |
#  - build shingles (n-grams) for one date, for one country
#  - upload them to ES
# NOTE(review): the original plan also listed a "Percentage" column; it is
# not computed by get_es_date_dict.

def upload_ngrams(countries, n_jobs=15):
    """Generate and upload the n-gram counts of every date for each country.

    Dates already present in the upload log are skipped, so an interrupted
    run can be restarted.

    Args:
        countries: iterable of country folder names to process.
        n_jobs: number of dates processed concurrently (joblib threads).

    Returns:
        True once every country has been processed.
    """
    for country in countries:

        # dates that still have to be processed
        dates = get_remaining_dates(country, get_files_per_date(country))
        if not dates:
            continue

        # start time for the elapsed-minutes progress prints
        timer = time.time()

        def multithread_dates(date):
            paths = get_date_paths(country, dates[date])

            vocabulary = get_date_vocabulary(paths)
            counts = get_date_counts(paths, vocabulary)

            elastic_dict = get_es_date_dict(vocabulary, country, date, counts)

            # upload to Elasticsearch, then log the date so a restart skips it
            helpers.bulk(es, elastic_dict)
            log_date_upload(country, date)
            print(f"[Info]: {round((time.time() - timer) / 60, 2)}m, Uploaded: {date} {country}")

        # process several dates at the same time; Parallel returns before the
        # next country starts, so the closure always sees this country
        Parallel(n_jobs=n_jobs, prefer="threads")(delayed(multithread_dates)(date) for date in dates)

        print(f"[Info]: {country} finished")

    return True

### Progress
- If everything is working, you should see a print statement for each date and country whose n-grams were uploaded to Elasticsearch
- This will take about 3 days with `n_jobs=15`, which means 15 dates are being processed at the same time
- Change `n_jobs=15` in the cell above if you want more or less processing power dedicated to the Ngram generation task
- When everything is done, the cell below will print: `True`

In [None]:
# countries whose data is known to process correctly
todo = [
    'AT', 'BA', 'BE', 'BG', 'CZ', 'DK', 'EE', 'ES', 'ES-CT', 'ES-GA', 'ES-PV',
    'FI', 'FR', 'GR', 'HR', 'HU', 'IS', 'IT', 'LV', 'NL', 'NO', 'PL', 'PT',
    'RS', 'SE', 'SI', 'TR', 'UA', 'GB',
]

# bookkeeping list for finished countries (not used in this cell)
done = []

upload = upload_ngrams(countries=todo)
print(upload)

# Reference run: 4245 minutes for all countries listed in `todo`.