# Commands

In [None]:
# cd ~/twinews-logs ; jupython --venv st-venv ~/notebooks/twinews/indexing.ipynb

# Init

In [None]:
# True when `__file__` is not defined in the current namespace; the rest of
# this file uses that as the "running inside a notebook" switch (logger path,
# `initialDatasetVersion`).
isNotebook = '__file__' not in locals()

In [None]:
from systemtools.hayj import *
from systemtools.location import *
from systemtools.basics import *
from systemtools.file import *
from systemtools.printer import *
from databasetools.mongo import *
from newstools.goodarticle.utils import *
from nlptools.preprocessing import *
from nlptools.news import parser as newsParser
from machinelearning.iterator import *
from twinews.utils import *

In [None]:
# Log file: under `tmpDir('logs')` in a notebook, the relative path
# "indexing.log" otherwise.
logPath = tmpDir('logs') + "/twinews-indexing.log" if isNotebook else "indexing.log"
logger = Logger(logPath)
# Timer shared by every stage of this file; started here.
tt = TicToc(logger=logger)
tt.tic()

In [None]:
# When True, only a sample is processed: the first 10 news files and at most
# 30 users per users file (see where TEST is read further down).
TEST = False
# Numeric suffix of the source folder ("twinews<N>"): 1 in a notebook, 3 otherwise.
initialDatasetVersion = 1 if isNotebook else 3
# NOTE(review): not read anywhere else in the visible part of this file.
datasetVersion = "1.0"

In [None]:
# Root folder of the raw dump: <dataDir()>/Twinews/twinews<initialDatasetVersion>
dataRootPath = "/".join([dataDir(), "Twinews", "twinews" + str(initialDatasetVersion)])

In [None]:
# Every compressed news dump found under the dataset root.
newsFiles = sortedGlob(dataRootPath + "/news/*.bz2")
bp(newsFiles, logger)
# Peek at the first row of one randomly chosen file to show the available
# fields. The file is decoded once and the same row feeds both printouts, so
# the top-level keys and the 'scrap' keys describe the same record.
sampleNewsRow = list(NDJson(random.choice(newsFiles)))[0]
bp(sampleNewsRow.keys(), 5, logger)
bp(sampleNewsRow['scrap'].keys(), 5, logger)
# In TEST mode only the first 10 files are indexed.
if TEST:
    newsFiles = newsFiles[:10]

In [None]:
# Every compressed users dump found under the dataset root.
usersFiles = sortedGlob(dataRootPath + "/users/*.bz2")
bp(usersFiles, logger)
# Show the fields of the first row of one randomly chosen users file.
sampleUsersFile = random.choice(usersFiles)
firstUserRow = list(NDJson(sampleUsersFile))[0]
bp(firstUserRow.keys(), 5, logger)

In [None]:
# Credentials returned by `getMongoAuth` for the 'hayj' account; they are passed
# below as user / password / host when opening the collections.
(hjuser, hjpass, hjhost) = getMongoAuth(user='hayj')

In [None]:
# Same credentials for both collections.
mongoAuth = {"user": hjuser, "password": hjpass, "host": hjhost}
newsCollection = getNewsCollection(logger=logger, **mongoAuth)
usersCollection = getUsersCollection(logger=logger, **mongoAuth)

In [None]:
# NOTE(review): unconditional stop. When the file is run top to bottom, nothing
# below this call executes unless it is removed or skipped. Presumably a manual
# safety guard placed before the cells that reset and fill the database — confirm.
exit()

In [None]:
# !!!!!! DELETION OF THE DATABASE !!!!!!
# Disabled on purpose (`if False`): the condition has to be changed by hand to
# reset both collections before a fresh indexing run.
# NOTE(review): `security=False` presumably bypasses a confirmation step — confirm.
if False:
    newsCollection.resetCollection(security=False)
    usersCollection.resetCollection(security=False)

In [None]:
# Indexing must start from two empty collections.
for emptyExpected in (newsCollection, usersCollection):
    assert len(emptyExpected) == 0

In [None]:
# Timer checkpoint: end of the initialisation stage.
tt.tic("Init done")

# News

In [None]:
def preprocessNews(row, logger=None, verbose=True):
    """
    Clean and parse the text of one news item.

    `row` is either the text itself (a string) or a dict. For a dict the text
    is read from `row['scrap']['text']` when a "scrap" entry is present and
    from `row['text']` otherwise.

    Returns `(rawText, text, tokens, isGood)` where `rawText` is the output of
    `newsPreclean`, `(text, tokens)` is the output of `newsParser.parseNews`
    applied to `rawText`, and `isGood` is the result of `isGoodArticle` on
    `rawText`.

    Any exception (including the "No text found" one raised here) is logged
    through `logException`; the function then returns None.
    """
    try:
        if isinstance(row, str):
            source = row
        else:
            # The text lives either directly in the row or in its "scrap" sub-dict.
            container = row['scrap'] if dictContains(row, "scrap") else row
            if not dictContains(container, "text"):
                raise Exception("No text found in " + b(container))
            source = container['text']
        rawText = newsPreclean(source)
        isGood = isGoodArticle(rawText)
        (text, tokens) = newsParser.parseNews(rawText, logger=logger, verbose=verbose)
        return (rawText, text, tokens, isGood)
    except Exception as e:
        logException(e, logger, verbose=verbose)

In [None]:
def newsGenFunct(containers, logger=None, verbose=True):
    """
    Generator over the news rows stored in one or several NDJson files.

    `containers` is a single file or a list of files. Exactly one item is
    yielded per row read:

    - a dict holding the selected fields ('domain', 'lastUrlDomain', 'url',
      'title', 'redirected', 'lastUrl') with a cleaned title, plus 'rawText',
      'text' and 'sentences' produced by `preprocessNews`, when the article is
      flagged good and has a non-empty parse;
    - None otherwise, including when `preprocessNews` failed on the row.

    `logger` and `verbose` are forwarded to `NDJson` and `preprocessNews`
    (`logger` also to `preprocess`).
    """
    if not isinstance(containers, list):
        containers = [containers]
    for container in containers:
        for row in NDJson(container, logger=logger, verbose=verbose):
            current = dictSelect(row, {'domain', 'lastUrlDomain', 'url', 'title', 'redirected', 'lastUrl'})
            scrap = row['scrap']
            current["title"] = preprocess(current["title"], doRemoveUrls=True, unescapeHtml=True,
                                  removeHtml=True, doQuoteNormalization=True,
                                  doReduceBlank=True, keepNewLines=False, logger=logger)
            parsed = preprocessNews(scrap, logger=logger, verbose=verbose)
            if parsed is None:
                # preprocessNews logs its own exception and returns None; unpacking
                # that would raise and stop the generator for the remaining rows,
                # so the row is counted as rejected instead.
                yield None
                continue
            (rawText, text, sentences, isGood) = parsed
            current["rawText"] = rawText
            current["text"] = text
            current["sentences"] = sentences
            if isGood and text is not None and rawText is not None and sentences is not None and len(sentences) > 0:
                yield current
            else:
                yield None

In [None]:
# Shuffle first and chunk the shuffled list itself, so the chunks are built
# from the shuffled order whether `shuffle` works in place or returns a copy.
shuffledNewsFiles = shuffle(newsFiles)
# Same formula as before (file count / (cpuCount() * 16)), floored at 1 so a
# short file list (e.g. TEST mode) never passes 0 to `chunks`.
newsChunkParam = max(1, len(shuffledNewsFiles) // (cpuCount() * 16))
newsFilesChunks = chunks(shuffledNewsFiles, newsChunkParam)
bp(newsFilesChunks, logger)

In [None]:
# We add all news (22 mins for the v1 dataset):
mli = MLIterator(newsFilesChunks, newsGenFunct, logger=logger, parallelProcesses=cpuCount(), printRatio=0.001)
notGoodCount = 0
totalCount = 0
duplicatesCount = 0
# Hashes of the texts already seen, used only to count duplicates here;
# duplicates are still inserted and are removed in a later step.
hashes = set()
for row in mli:
    totalCount += 1
    if row is None:
        # The generator yields None for every rejected article.
        notGoodCount += 1
        continue
    h = objectToHash(row["text"])
    if h in hashes:
        duplicatesCount += 1
    else:
        hashes.add(h)
    newsCollection.insert(row)
# All counters are 0 when nothing was read; dividing by 1 then reports 0 %
# instead of raising ZeroDivisionError.
denominator = max(totalCount, 1)
log("We removed " + str(int(notGoodCount / denominator * 100)) + "% of news.", logger)
log("Count of duplicates: " + str(duplicatesCount), logger)
log("% of duplicates: " + str(duplicatesCount / denominator * 100), logger)

In [None]:
# Structure: hash of the text --> url. When several rows share a hash, the
# last one iterated is the one kept.
newsToKeepByHash = {objectToHash(row["text"]): row['url'] for _id, row in newsCollection.items()}
newsToKeep = set(newsToKeepByHash.values())
bp(newsToKeepByHash, logger)
# Every kept url must belong to exactly one hash.
assert len(newsToKeepByHash) == len(newsToKeep)
# Structure: url of a duplicate --> url of the reference news that replaces it.
duplicates = {}
for _id, row in newsCollection.items():
    referenceUrl = newsToKeepByHash[objectToHash(row["text"])]
    if row['url'] != referenceUrl:
        duplicates[row['url']] = referenceUrl
bp(duplicates, logger)
# The duplicates are removed from the news collection:
for duplicateUrl in duplicates:
    newsCollection.delete({"url": duplicateUrl})

In [None]:
# Timer checkpoint: end of the news indexing stage.
tt.tic("News indexed")

# Users

In [None]:
def usersGenFunct(containers, maxUsersPerContainer=None, logger=None, verbose=True):
    """
    Generator over the user rows stored in one or several NDJson files.

    `containers` is a single file or a list of files. For every row that has
    a "tweets" entry the row is completed in place and yielded:

    - row["news"] / row["timestamps"]: the urls shared in the user's tweets
      and the matching tweet timestamps. Urls listed in the module-level
      `duplicates` dict are replaced by their reference url, urls absent from
      the module-level `newsToKeep` set are dropped, and the pairs are
      ordered with `sortBy(..., index=1)` (the rest of this file reads the
      first / last timestamp as min / max).
    - the "text" of each tweet is normalised with `preprocess`.

    At most `maxUsersPerContainer` rows are yielded per file (no limit when
    None). A row raising any exception is logged through `logException` and
    skipped.
    """
    # Options given to `preprocess` for every tweet text.
    tweetCleaningOptions = dict(
        doQuoteNormalization=True,
        doReduceBlank=True,
        keepNewLines=True,
        stripAccents=True,
        doRemoveUrls=True,
        doLower=False,
        doBadlyEncoded=True,
        doReduceCharSequences=True,
        charSequencesMaxLength=3,
        replaceUnknownChars=True,
        unknownReplacer=" ",
        doSpecialMap=True,
        doNormalizeEmojis=True,
        doTokenizingHelp=True,
    )
    if not isinstance(containers, list):
        containers = [containers]
    for container in containers:
        yieldedCount = 0
        for row in NDJson(container, logger=logger, verbose=verbose):
            try:
                assert dictContains(row, "tweets")
                tweets = row["tweets"]
                # url --> timestamp of the last tweet (in list order) sharing it
                sharedAt = dict()
                for tweet in tweets:
                    for sharedUrl in tweet['news']:
                        sharedAt[sharedUrl] = tweet['timestamp']
                # Duplicates are redirected to their reference url, then only
                # the urls still present in the news collection are kept.
                keptNews = dict()
                for sharedUrl, ts in sharedAt.items():
                    sharedUrl = duplicates.get(sharedUrl, sharedUrl)
                    if sharedUrl in newsToKeep:
                        keptNews[sharedUrl] = ts
                orderedNews = sortBy(keptNews, index=1)
                row["news"] = [pair[0] for pair in orderedNews]
                row["timestamps"] = [pair[1] for pair in orderedNews]
                # Tweet texts are normalised in place:
                for tweet in tweets:
                    if dictContains(tweet, "text"):
                        tweet["text"] = preprocess(tweet["text"], logger=logger, **tweetCleaningOptions)
                yield row
                yieldedCount += 1
                if maxUsersPerContainer is not None and yieldedCount >= maxUsersPerContainer:
                    break
            except Exception as e:
                logException(e, logger)

In [None]:
# Users are read in parallel (one users file per container) and inserted as-is.
# In TEST mode each file contributes at most 30 users.
usersGenKwargs = {"maxUsersPerContainer": 30 if TEST else None}
mli = MLIterator(
    usersFiles,
    usersGenFunct,
    genKwargs=usersGenKwargs,
    logger=logger,
    parallelProcesses=cpuCount(),
    printRatio=0.001,
)
for userRow in mli:
    usersCollection.insert(userRow)

In [None]:
# Timer checkpoint: end of the users indexing stage.
tt.tic("Users indexed")

# Adding users references in news

In [None]:
# We iterate all users to make the newsUsersMapping:
newsUsersMapping = dict()
ids = usersCollection.distinct("user_id")
for userId in pb(ids, logger=logger, message="Finding all users by news"):
    data = usersCollection.findOne({"user_id": userId})
    for i in range(len(data["news"])):
        news = data["news"][i]
        ts = data["timestamps"][i]
        if news not in newsUsersMapping:
            newsUsersMapping[news] = dict()
        newsUsersMapping[news][userId] = ts
bp(newsUsersMapping, logger)
tt.tic("Users by news collected")

In [None]:
# Number of news shared by more than one user.
countOfGreaterThan1 = sum(1 for sharers in newsUsersMapping.values() if len(sharers) > 1)
# Total number of (news, user) pairs.
total = sum(len(sharers) for sharers in newsUsersMapping.values())

In [None]:
# Sharing statistics computed from newsUsersMapping.
log("total: {}".format(total), logger)
log("countOfGreaterThan1: {}".format(countOfGreaterThan1), logger)
meanUsersPerNews = total / len(newsUsersMapping)
log("Mean users amount per news: {}".format(meanUsersPerNews), logger)

In [None]:
# Each news row receives two parallel lists, "users" and "timestamps", in the
# order given by sortBy(..., index=1).
for url, users in pb(newsUsersMapping.items(), logger=logger, message="Inserting user list in all news rows"):
    orderedSharers = sortBy(users, index=1)
    sharerIds = [pair[0] for pair in orderedSharers]
    shareTimestamps = [pair[1] for pair in orderedSharers]
    newsCollection.updateOne(
        {"url": url},
        {"$set": {"users": sharerIds, "timestamps": shareTimestamps}},
    )
tt.tic("newsUsersMapping inserted")

# Deleting news having no users

In [None]:
# Urls of the news rows with a missing or empty 'users' list:
toDeleteNews = [
    row['url']
    for _id, row in newsCollection.items()
    if not (dictContains(row, 'users') and len(row['users']) > 0)
]
bp(toDeleteNews, logger)
toDeleteNewsPercent = len(toDeleteNews) / len(newsCollection) * 100
log(str(toDeleteNewsPercent) + " % of news to delete because no user shared it.", logger)

In [None]:
# Deleting news:
for url in toDeleteNews:
    newsCollection.delete({'url': url})

In [None]:
# Timer checkpoint: news without users have been removed.
tt.tic("News having no users deleted")

# Adding minTimestamp and maxTimestamp

In [None]:
# url --> first / last entry of the row's 'timestamps' list.
newsMinTimestamp = dict()
newsMaxTimestamp = dict()
newsCursor = newsCollection.find({}, projection={'url': True, 'timestamps': True})
for newsRow in newsCursor:
    newsUrl = newsRow['url']
    shareTimes = newsRow['timestamps']
    firstShare, lastShare = shareTimes[0], shareTimes[-1]
    newsMinTimestamp[newsUrl] = firstShare
    newsMaxTimestamp[newsUrl] = lastShare

In [None]:
# The two bounds are stored on each news row.
for url, minTimestamp in newsMinTimestamp.items():
    timestampBounds = {'minTimestamp': minTimestamp, 'maxTimestamp': newsMaxTimestamp[url]}
    newsCollection.updateOne({'url': url}, {'$set': timestampBounds})

In [None]:
# user_id --> first / last entry of the user's 'timestamps' list. Users with
# a missing or empty list are left out.
usersMinTimestamp = dict()
usersMaxTimestamp = dict()
for user in usersCollection.find({}, projection={'user_id': True, 'timestamps': True}):
    if not dictContains(user, 'timestamps') or len(user['timestamps']) == 0:
        continue
    currentUserId = user['user_id']
    usersMinTimestamp[currentUserId] = user['timestamps'][0]
    usersMaxTimestamp[currentUserId] = user['timestamps'][-1]

In [None]:
# Preview of the user_id --> first timestamp mapping.
# NOTE(review): every other `bp` call in this file passes `logger`; this one
# does not — confirm whether that is intended.
bp(usersMinTimestamp)

In [None]:
# The two bounds are stored on each user row that has timestamps.
for userId in pb(list(usersMinTimestamp), logger=logger):
    timestampBounds = {
        'minTimestamp': usersMinTimestamp[userId],
        'maxTimestamp': usersMaxTimestamp[userId],
    }
    usersCollection.updateOne({'user_id': userId}, {'$set': timestampBounds})

In [None]:
# Timer checkpoint: timestamp bounds written on news and users.
tt.tic("minTimestamp and maxTimestamp added")

# Deleting users having no news

In [None]:
# Ids of the users with a missing or empty 'news' list:
toDeleteUsers = [
    user['user_id']
    for user in usersCollection.find({}, projection={'user_id': True, 'news': True})
    if not (dictContains(user, 'news') and len(user['news']) > 0)
]
bp(toDeleteUsers, logger)
toDeleteUsersPercent = len(toDeleteUsers) / len(usersCollection) * 100
log(str(toDeleteUsersPercent) + " % of users to delete because no news in.", logger)

In [None]:
# Deleting users:
# every user id collected in `toDeleteUsers` is removed from the users collection.
for userId in toDeleteUsers:
    usersCollection.delete({'user_id': userId})

In [None]:
# Timer checkpoint for the stage that deletes users without news. The label
# used to repeat the one from the earlier news-deletion stage.
tt.tic("Users having no news deleted")

# End

In [None]:
# Closes the timer started at the top of the file.
tt.toc()