In [258]:
import pandas as pd
import numpy as np
import glob
import time
runflag=1
insertflag=1
NULL_INT=-1
single_record=-10 # index of record
max_num_records=120#00000
not_applieds={}
data_dir="workshop_dataset/workshop_dataset1/";

In [259]:
all_files = glob.glob(data_dir+"*.json")
file=all_files[0]

In [260]:
# Test the data for consistencies and make it importable
if(runflag == 1):
    total_files=0
    df_shapes=[]
    all_records = pd.DataFrame()
    time_taken = 0
    prev_time = time.time()
    for filename in all_files:
        df=pd.read_json(filename,orient='index',convert_dates=False,convert_axes=False,dtype={"tid": int})            
        df_shapes.append(df.shape)
        curr_set = set(df)
        total_files+=1
        if(total_files==1):
            all_records=df
        else:
            all_records = pd.concat([all_records,df])
            if(prev_set != curr_set):
                runflag=1
                print(filename, "New columns found!", curr_set - prev_set)
                break
        runflag=0
        if(all_records.shape[0] > max_num_records):
            break
        prev_set = curr_set
    time_taken= time.time()-prev_time

    # Rename columns that are keywords
    all_records = all_records.rename(columns={'date':'tweet_date','datetime':'tweet_datetime'})
    all_records['verified']=all_records['verified'].astype(bool)
    all_records['tid']=all_records['tid'].astype(int)
    all_records['quoted_source_id']=all_records['quoted_source_id'].fillna(NULL_INT).astype(int)
    all_records['replyto_source_id']=all_records['replyto_source_id'].fillna(NULL_INT).astype(int)
    all_records['retweet_source_id']=all_records['retweet_source_id'].fillna(NULL_INT).astype(int)
    

total_records= sum([ x for x,y in df_shapes])
total_records2= all_records.shape[0]
if(total_records!=total_records2):
    print("Error: Something's wrong with dataset, total counts not matching!",total_records,total_records2)
if(runflag==0):
    print("Done scanning %d file(s) with total of %d records(%.2f records/file), in %.2f seconds(%.2f files/sec)" %
          (total_files, total_records, float(total_records)/total_files,time_taken, float(total_files)/time_taken))


Done scanning 1 file(s) with total of 1000 records(1000.00 records/file), in 0.12 seconds(8.46 files/sec)


In [297]:
labels={
'Nodes':{
    #first col has to be id
    'Direct':{
        'Author':[ 'author_id', 'author', 'author_screen_name', 'author_profile_image' ], 
        'Tweet':[ 'tid', 'type', 'tweet_text', 'quote_count', 'reply_count', 'like_count', 'sentiment',
                 'retweet_count', 'tweet_date', 'tweet_datetime', 'verified'], 
        'Location':[ 'location', ], 
        },
    'Calc':{
        'Media':[ 'media_id', 'media_type', 'display_url', 'media_url', ], 
        'Url':[ 'url', ], 
        'Keyword':[ 'keyword', ], 
        'Hashtag':[ 'hashtag', ],
        }
    },
'Relations':{
    # First is start ID, second the end ID (Second column if from Tweet will be checked for -1)
    'Direct':{
        'Tweet_Location_AT': ['tid', 'location'],    
#         'Tweet_Tweet_QUOTED':['tid','quoted_source_id'],
#         'Tweet_Tweet_RETWEET_OF':['tid','retweet_source_id'],
#         'Tweet_Tweet_REPLY_TO':['tid','replyto_source_id'],
        'Author_Tweet_WROTE': ['author_id','tid'],
        },
    'Calc':{
        # special loops on things    
        'Tweet_Author_HAS_MENTION': ['tid','mention_author_id'],
        'Tweet_Keyword_HAS_KEYWORD':['tid','keyword'],
        'Tweet_Url_HAS_URL':['tid','url'],
        'Tweet_Hashtag_HAS_HASHTAG':['tid','hashtag'],
        'Tweet_Media_HAS_MEDIA':['tid','media_id'],
        }
    } 
}
calc_col_to_list={
    'Keyword':'keywords_processed_list',
    'Media' : 'media_list',
    'Hashtag' : 'hashtags',
    'Url' : 'url_list'
}

In [328]:
table_records={}
def create_or_add(labelName,x,cols):
    if labelName not in table_records:
        table_records[labelName]=pd.DataFrame(columns=cols)
    table_records[labelName]=table_records[labelName].append(x)
    
all_records_tail = all_records.tail(max_num_records)
all_columns = all_records_tail.columns.values # or list(df)

for idx,row in all_records_tail.iterrows():
    for t,t_dict in labels['Nodes'].items():
        for label,cols in t_dict.items():
            if(t=='Direct'):
                if row[cols[0]] is not None:
                    create_or_add(label,row[cols],cols)
            else:
                list_entry =row[calc_col_to_list[label]]
                if list_entry is not None:
                    if(label=="Media"):
                            for m_item in list_entry.values():
                                m_item_list = [ m_item[c] for c in cols]
                                create_or_add(label,pd.DataFrame([m_item_list],columns=cols),cols)
                    else:
                        for l_item in list_entry:
                            #merge
                            create_or_add(label,pd.DataFrame([l_item],columns=cols),cols)

                
    for t,t_dict in labels['Relations'].items():
        for label,cols in t_dict.items():
            table1,table2,relname=label.split('_',maxsplit=2)
            if not (table2=='Tweet' and row[cols[1]]== NULL_INT):
                if(t=='Direct'):
                    create_or_add(relname,row[cols],cols)
# table_records                            
# insert_records = all_records_tail.as_matrix()


In [329]:
def clean(x):
    #neo4j-import doesn't support: multiline (coming soon), quotes next to each other and escape quotes with '\""'
    return x.replace('\n','').replace('\r','').replace('\\','').replace('"','')


In [330]:
for t,t_dict in labels['Nodes'].items():
    for label,cols in t_dict.items():
        if label in table_records:
            table_records[label]=table_records[label].drop_duplicates()
            
            if(label=="Tweet"):
                table_records[label]['tweet_text']=table_records[label]['tweet_text'].apply(clean)
            if(label=="Author"):
                table_records[label]['author']=table_records[label]['author'].apply(clean)
                table_records[label]['author_screen_name']=table_records[label]['author_screen_name'].apply(clean)
            #Add :INT to cols
#             table_records[label].to_csv('csvs/'+label+'.csv',index=False,header=[cols[0]+":ID("+label+")"]+cols[1:])
# Label names will be given in the shell command

In [333]:
# for t,t_dict in labels['Relations'].items():
#     for label,cols in t_dict.items():
#         table1,table2,relname=label.split('_',maxsplit=2)
#         if relname in table_records:
#             table_records[relname].to_csv('csvs/'+relname+'.csv',index=False,header=[":START_ID("+table1+")",":END_ID("+table2+")"]+cols[2:])


In [346]:
from py2neo import Graph, Node, Relationship
g = Graph()
#clean the graph
insert_nodes=1
insert_rels=1

{}

In [351]:
import time
prev_time= time.time()
g.delete_all()
tx=graph.begin()
media_cols=[ 'media_id', 'media_type', 'display_url', 'media_url', ]
for index, row in all_records.iterrows():
    params=row.to_dict()
    
    m_item_list=[]
    if(params['media_list'] is not None):
        for r in params['media_list'].values():
            m_item_list.append([ m_item[c] for c in media_cols])
    params['list_media_items']=m_item_list
    print(m_item_list)
    statement="""
      MERGE (p:Author {author_id: {author_id}})
          REMOVE p:Shadow
          SET p.author = {author},
              p.author_screen_name = {author_screen_name},
              p.author_profile_image = {author_profile_image}
          
      MERGE (t:Tweet {tid: {tid}})
          REMOVE p:Shadow
          SET t.type = {type},
              t.tweet_date = {tweet_date},
              t.tweet_datetime = {tweet_datetime},
              t.tweet_text = {tweet_text},
              t.quote_count = {quote_count},
              t.reply_count = {reply_count},
              t.like_count = {like_count},
              t.sentiment = {sentiment},
              t.retweet_count = {retweet_count},
              t.verified = {verified},
              t.lang = {lang},
              t.like_count = {like_count}
      MERGE (p)-[:WROTE]->(t)
       
      FOREACH(locn in CASE WHEN NOT {location} is null THEN[1] ELSE [] END |        
        MERGE (loc:Location {location: {location}})
        MERGE (t)-[:AT]->(loc)
      )            

      FOREACH( htag IN {hashtags} |
        MERGE (htg:Hashtag {hashtag: htag})
        ON CREATE SET htg:Shadow
        MERGE (t)-[:HAS_HASHTAG]->(htg)
      ) 
      
      FOREACH( mention IN {mentions} |
        MERGE (mnt_node:Mention {mention: mention})
        ON CREATE SET mnt_node:Shadow
        MERGE (t)-[:HAS_MENTION]->(mnt_node)
      )

      FOREACH( u IN CASE WHEN NOT {url_list} is null THEN [1] ELSE [] END |
        MERGE (url_node:Url {url: u})
        ON CREATE SET url_node:Shadow
        MERGE (t)-[:HAS_URL]->(url_node)
      )

      FOREACH( k IN CASE WHEN NOT {keywords_processed_list} is null THEN [1] ELSE [] END |
        MERGE (keyword_node:Keyword {keyword: k})
        ON CREATE SET keyword_node:Shadow
        MERGE (t)-[:HAS_KEYWORD]->(keyword_node)
      )

    """
    tx.append(statement, params)
    tx.process()
tx.commit()
time_nodes=time.time()-prev_time
print("Time",time_nodes)

[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg'], ['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe'




[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg'], ['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg'], ['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg'], ['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[['956226637414223872', 'photo', 'pic.twitter.com/9Nnx36fZqe', 'https://pbs.twimg.com/media/DUUzD3fXUAA3KDJ.jpg']

In [342]:
# import time
# prev_time= time.time()
# insert_nodes=0

# if(insert_nodes==1):
#     insert_nodes=0
#     for t,t_dict in labels['Nodes'].items():
#         for label,cols in t_dict.items():
#             if label in table_records:
#                 node_data=""
#                 for i,c in enumerate(cols):
#                     if(i>0):
#                         node_data += ","
#                     node_data += c+":{"+c+"}"
#                 for idx, row in table_records[label].iterrows():
#                     g.run("MERGE (:"+label+" { "+node_data+" } )",row.to_dict())

#     time_nodes=time.time()-prev_time
# print("Time for creating Nodes",time_nodes)

# insert_rels=1

# prev_time= time.time()
# if(insert_rels==1):
#     insert_rels=0
#     for t,t_dict in labels['Relations'].items():
#         for idx,row in table_records['WROTE'].iterrows():
#             print(row)
#             break
#     time_rels=time.time()-prev_time

# print("Time for creating Relations",time_rels)

# # ab = Relationship(a, "KNOWS", b)
# # tx.merge(ab)
# # tx.commit()
# # g.exists(ab)

author_id             232532394
tid          956677973742051329
Name: 956677973742051329, dtype: object


In [28]:
uniq_constraints={
    'Author':["author_id","author_screen_name","author_profile_image"],
    'Tweet': ["tid",],
    'Media':[ 'media_id',],
    'Url':[ 'url', ], 
    'Keyword':[ 'keyword', ], 
    'Location':[ 'location', ], 
    'Hashtag':[ 'hashtag', ] 
}
for k,v in uniq_constraints.items():
    for attr in v:
        print("CREATE CONSTRAINT ON ({0}:{0}) ASSERT {0}.{1} IS UNIQUE".format(k,attr))
        

## Create CSVs:


## Insert data:        


# Adding indices may take time. And make inserts slow. Hence do after inserts    
indices={
    #Wherever we gonna search
    'Author':['author_screen_name'],
    'Tweet' : ['quote_count', 'reply_count', 'like_count','retweet_count','date','datetime',],
    'Location':[ 'location', ], 
    'Hashtag':[ 'hashtag', ] ,
    # 'Url':[ 'url', ], 
    # 'Keyword':[ 'keyword', ], 
}
for k,v in indices.items():
    for attr in v:
        print("CREATE INDEX ON :{0}({1});".format(k,attr))
        

CREATE CONSTRAINT ON (Author:Author) ASSERT Author.author_id IS UNIQUE
CREATE CONSTRAINT ON (Author:Author) ASSERT Author.author_screen_name IS UNIQUE
CREATE CONSTRAINT ON (Author:Author) ASSERT Author.author_profile_image IS UNIQUE
CREATE CONSTRAINT ON (Tweet:Tweet) ASSERT Tweet.tid IS UNIQUE
CREATE CONSTRAINT ON (Media:Media) ASSERT Media.media_id IS UNIQUE
CREATE CONSTRAINT ON (Url:Url) ASSERT Url.url IS UNIQUE
CREATE CONSTRAINT ON (Keyword:Keyword) ASSERT Keyword.keyword IS UNIQUE
CREATE CONSTRAINT ON (Location:Location) ASSERT Location.location IS UNIQUE
CREATE CONSTRAINT ON (Hashtag:Hashtag) ASSERT Hashtag.hashtag IS UNIQUE
CREATE INDEX ON :Author(author_screen_name);
CREATE INDEX ON :Tweet(quote_count);
CREATE INDEX ON :Tweet(reply_count);
CREATE INDEX ON :Tweet(like_count);
CREATE INDEX ON :Tweet(retweet_count);
CREATE INDEX ON :Tweet(date);
CREATE INDEX ON :Tweet(datetime);
CREATE INDEX ON :Location(location);
CREATE INDEX ON :Hashtag(hashtag);


In [None]:
"""
https://neo4j.com/docs/operations-manual/current/tools/import/file-header-format/#import-tool-id-spaces
'Post' is an ID-space here-

posts.writerow(['postId:ID(Post)', 'title', 'body','score','views','comments'])
posts_rel.writerow([':START_ID(Post)', ':END_ID(Post)'])
tags_posts_rel.writerow([':START_ID(Post)', ':END_ID(Tag)'])


"""

In [None]:
"""
tweets = json.loads(file.read())
# Convert timestamps 
# 

    for tweet in tweets:
        created_at = calendar.timegm(parser.parse(tweet["created_at"]).timetuple())

        params = {
            "tweetId": tweet["id"],
            "createdAt": created_at,
            "text": tweet["text"],
            "userId": tweet["user"]["id"],
            "inReplyToTweetId": tweet["in_reply_to_status_id"],
            "userMentions": [user for user in tweet["entities"]["user_mentions"]],
            "urls": [url for url in tweet["entities"]["urls"]]
        }

        statement = 
                    MERGE (tweet:Tweet {id: {tweetId}})
                    SET tweet.text = {text}, tweet.timestamp = {createdAt}
                    REMOVE tweet:Shadow
                    WITH tweet
                    MATCH (person:Person {twitterId: {userId}})
                    MERGE (person)-[:TWEETED]->(tweet)
                    WITH tweet

                    FOREACH(user in {userMentions} |
                        MERGE (mentionedUser:Person {twitterId: user.id})
                        SET mentionedUser.screenName = user.screen_name
                        MERGE (tweet)-[:MENTIONED_USER]->(mentionedUser)
                    )

                    FOREACH(url in {urls} |
                        MERGE (u:URL {value: url.expanded_url})
                        MERGE (tweet)-[:MENTIONED_URL]->(u)
                    )

                    FOREACH(ignoreMe in CASE WHEN NOT {inReplyToTweetId} is null THEN [1] ELSE [] END |
                        MERGE (inReplyToTweet:Tweet {id: {inReplyToTweetId}})
                        ON CREATE SET inReplyToTweet:Shadow
                        MERGE (tweet)-[:IN_REPLY_TO_TWEET]->(inReplyToTweet)
                    )
                    """
#         tx.append(statement, params)
#         tx.process()
# tx.commit()

In [None]:

    query_string = """
        MERGE (u:User {id_str:{id_str}}) 
        ON CREATE SET
"""+   (('u:'+',u:'.join(labels)+",") if labels else '') +"""
            u.name={name},
            u.screen_name={screen_name},
            u.description={description},
            u.url={url},
            u.followers_count={followers_count},
            u.friends_count={friends_count},
            u.listed_count={listed_count},
            u.statuses_count={statuses_count},
            u.favourites_count={favourites_count},
            u.location={location},
            u.time_zone={time_zone},
            u.utc_offset={utc_offset},
            u.lang={lang},
            u.profile_image_url={profile_image_url},
            u.geo_enabled={geo_enabled},
            u.verified={verified},
            u.notifications={notifications}
""" +   (("ON MATCH SET\n  u:"+',u:'.join(labels)) if labels else '') +"""
        RETURN u
    """
n=neo4j.CypherQuery(graph_db,query_string).execute_one(**data)
"""

In [None]:

# create index on :Post(title);
# create index on :Post(score);
# create index on :Post(views);
# create index on :Post(favorites);
# create index on :Post(answers);
# create index on :Post(score);

# create index on :User(name);
# create index on :User(createdAt);
# create index on :User(reputation);
# create index on :User(age);



In [None]:

    neo4j.CypherQuery(graph_db,"""
        CREATE CONSTRAINT ON (u:User) 
        ASSERT u.id_str IS UNIQUE
    """).execute()

In [253]:
from py2neo import Node, Relationship
a = Node("Person", name="Alice")
b = Node("Person", name="Bob")
ab = Relationship(a, "KNOWS", b)
ab

(alice)-[:KNOWS]->(bob)