# Document to Graph database processing

This project processes data from twitter stream to a graph.
The goal of this project is to analyze the interaction between people on Twitter by analyzing their discussion.

This project takes data from a document MongoDB database and process it into a graph Neo4J database.
Each nodes of the graph contains one user and each relation are one sided.
There is only one type of relation : (user1) --- speaksOf ---> (user2)
Each user has a weight which is the number of time one user spoke about him
Each relation has a weight that represent how many time user1 spoke about user2

Imports pymongo and neo4j

In [19]:
import pymongo
from neo4j import GraphDatabase, AsyncGraphDatabase
import re

MongoDB configuration (change with your own config)

In [20]:
client = pymongo.MongoClient("mongodb://localhost:27017")

Neo4j configuration (change with your own config)

In [None]:
URI = "bolt://localhost:7689"
AUTH = ("neo4j", "12341234")

MongoDB Variables (change with your own config)

In [None]:
# Data collected from the nodejs project that read the twitter stream
# I used the 1% stream (which is free) to get the data through the Twitter Developer Portal

# database (change with database name)
db = client["twitter"]
# data of tweets collected
# Require at least the "in_reply_to_user_id" and the "text" field
# Optionally the "lang" field
tweets = db["tweets"]
# data of users
# Are obtainable with specific argument while reading the Twitter Stream
users = db["users"]
# Not used but I collected in relation tweets of the one I collected
# subs = db["tweetsSub"]

MongoDB functions

In [21]:
# Fetches the username from the user Id
def get_username_from_id(id):
    return users.find_one({"id" : id})["username"]

# Fetches the username mentionned in the text field of a tweet
def get_usernames_from_text(text):
    result = re.findall("@[a-zA-Z]*", text)
    rep = lambda e : e.replace("@", "")
    return map(rep, result)

# Fetches the username of the user that sent the tweet
def get_from_user (tweet):
    return get_username_from_id(tweet["author_id"])

# Fetches the usernames of the users that were mentioned in the tweet
def get_to_users(tweet) :
    return get_usernames_from_text(tweet["text"])

Neo4J functions

In [23]:
# Returns the weight of an user
def get_weight(tx, username):
    result = tx.run("MATCH (a:User) WHERE a.name = $name RETURN a.weight", name=username)
    single = result.single()
    if single is None : return None
    return single[0]

# Checks if an user already exists in the database
def exists(tx, username):
    return get_weight(tx, username) is not None

# Adds an user if it doesn't exist
# Updates if it already exists
def add_user(tx, username, lang = None):
    result =  tx.run(
        "MERGE (a:User {name: $name}) " +
        "ON CREATE " +
        "   SET a = {name: $name, weight : 0, lang : %s}" % ("null" if lang is None else "$lang")  +
        "ON MATCH " +
        "   SET a = {name: $name, weight : a.weight, lang : %s}" % ("a.lang" if lang is None else "$lang")  +
        "RETURN a.name, a.weight, a.lang",
        name = username,
        lang = lang
    )
    return result

# Adds a relation between to users
# Updates it if the realation already exists
def add_relation(tx, fr, to):
    result = tx.run(
        "MATCH (a:User), (b:User) " +
        "WHERE a.name = $a AND b.name = $b " +
        "MERGE (a)-[r:speaksOf]->(b) " +
        "   ON CREATE " +
        "       SET r.weight = 1, b.weight = b.weight + 1 " +
        "   ON MATCH " +
        "       SET r.weight = r.weight + 1, b.weight = b.weight + 1 " +
        "RETURN type(r) ",
        a = fr,
        b = to
    )
    return result

# Process a tweet into the Neo4J Database following the popularity algorithm
# In this function, we create users with a weight of 0 and each time they are mentionned, their weight increase by 1.
# Same thing happens for the relation. They are created with a weight of one and each time an user speaks to an user, this relations increase by 1.
async def process_user_popularity(tweet):
    fr = get_from_user(tweet)
    lang = tweet["lang"]
    to = get_to_users(tweet)
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        async with driver.session() as session :
            await session.execute_write(add_user, fr, lang)
            for t in to :
                await session.execute_write(add_user, t)
                await session.execute_write(add_relation, fr, t)

Fetches the tweets which are interacting directly with one or many other users (can be with himself too. I did not make an exception on it).


In [24]:
tweetsToProcess = tweets.find({
    "$or" : [
        {"text" : { "$regex" : "^RT @"}},
        {"in_reply_to_user_id" : {"$exists" : True}}
    ]
})
maxCount = tweetsToProcess.count()

  maxCount = tweetsToProcess.count()


Creates the Graph database.

In [25]:
count = 0
last_count = 158592
# Process fails after a certain period of time (I don't know why)
# @TODO: Resuming automatically this process could be done...
for t in tweetsToProcess :
    if count > 0 : # Change the value to resume the process to the desired value
        await process_user_popularity(t)
    count += 1
    # Prints progress every 100 tweets processed
    if count % 100 == 0 :
        percent = count / maxCount * 100
        print (f"Processed tweet #{count} on {maxCount} ~ {percent:.2f}% done ")

Processed tweet #100 on 7334404 ~ 0.00% done 
Processed tweet #200 on 7334404 ~ 0.00% done 
Processed tweet #300 on 7334404 ~ 0.00% done 
Processed tweet #400 on 7334404 ~ 0.01% done 
Processed tweet #500 on 7334404 ~ 0.01% done 
Processed tweet #600 on 7334404 ~ 0.01% done 
Processed tweet #700 on 7334404 ~ 0.01% done 
Processed tweet #800 on 7334404 ~ 0.01% done 
Processed tweet #900 on 7334404 ~ 0.01% done 
Processed tweet #1000 on 7334404 ~ 0.01% done 
Processed tweet #1100 on 7334404 ~ 0.01% done 
Processed tweet #1200 on 7334404 ~ 0.02% done 
Processed tweet #1300 on 7334404 ~ 0.02% done 
Processed tweet #1400 on 7334404 ~ 0.02% done 
Processed tweet #1500 on 7334404 ~ 0.02% done 
Processed tweet #1600 on 7334404 ~ 0.02% done 
Processed tweet #1700 on 7334404 ~ 0.02% done 
Processed tweet #1800 on 7334404 ~ 0.02% done 
Processed tweet #1900 on 7334404 ~ 0.03% done 
Processed tweet #2000 on 7334404 ~ 0.03% done 
Processed tweet #2100 on 7334404 ~ 0.03% done 
Processed tweet #2200 

CursorNotFound: cursor id 3281865858233297188 not found, full error: {'ok': 0.0, 'errmsg': 'cursor id 3281865858233297188 not found', 'code': 43, 'codeName': 'CursorNotFound'}

In [26]:
count


158592