In [1]:
from optimus import Optimus
from IPython.display import display
import tweepy
import nest_asyncio
import sqlite3
import time 
from pyspark.sql import SparkSession
import ipyparallel as ipp
nest_asyncio.apply()

In [2]:
spark = SparkSession.builder.appName("jaccard").getOrCreate()
dbname = "onpoli_graph.db"
conn = sqlite3.connect(dbname)

In [10]:
#   ****JACARD INDEX FUNCTION****
#
#   The commented out lines finds union 
#   directly from the Spark DataFrames. It doesn't work because 
#   data corruption occurs when loading the SQLite db into Spark
#   via JDBC--some ids are negative and unsearchable. It could 
#   have to do with the fact that when populating the sqlite db 
#   the 'bio' data was not cleaned (removal of emojis etc.) and 
#   this caused some unforseen bugs. 
#
#   To compensate there is code that fetches the union data straight 
#   from the SQLite db. This ISN'T expensive because the union
#   of followers between two pillar members are simply the total
#   sum of followers between them minus the intersect (which is 
#   expensive to calculate hence Spark). However the sum is already 
#   partially stored in the pillars TABLE, granted they
#   still need to be added. 
#
#   To further imporve speeds I tried using multiprocessing
#   and ipyparallel to call this function but unfortunately I ran 
#   into too many problems.

def jaccard(id_1, id_2, df_mega, f_count):
    start = time.time()
    intersect = df_mega.filter(df_mega.following == id_1)\
        .select("id")\
        .intersect(df_mega.filter(df_mega.following == id_2)\
        .select("id"))\
        .count()
    union = f_count.get(id_1) + f_count.get(id_2) - intersect
    print(time.time()-start)
    
    return intersect/union, intersect

    # f_count1 = df_pillars.filter(df_pillars.id == id_1)\
    #     .select("followers_count").collect()[0][0]
    # f_count2 = df_pillars.filter(df_pillars.id == id_2)\
    #     .select("followers_count").collect()[0][0]
    # union = f_count1 + f_count2 - intersect

In [11]:
#   Loads Dataframe into spark

df_mega = spark.read.format('jdbc') \
        .options(driver='org.sqlite.JDBC',
                 dbtable='mega_ids',
                 url="jdbc:sqlite:C:/Users/Daniel/ProjTwit/onpoli_graph.db",
                 numPartitions = 1,
                 )\
        .load()

#   Don't need this df, but it's nice to have if you want to see the data 
#   Also usable if the dataframe's data wasn't corrupt

# df_pillars = spark.read.format('jdbc') \
#         .options(driver='org.sqlite.JDBC', dbtable='pillars',
#                  url="jdbc:sqlite:C:/Users/Daniel/ProjTwit/onpoli_graph.db",
#                  numPartitions = 1)\
#         .load()

In [12]:
with conn:
#   Code fetches union data directly from SQLite table
    cursor = conn.cursor()
    query = "SELECT id FROM pillars"
    cursor.execute(query)
    keys = [x[0] for x in cursor.fetchall()]
    query = "SELECT followers_count FROM pillars"
    cursor.execute(query)
    vals = [x[0] for x in cursor.fetchall()]
    followers_count = dict(zip(keys, vals))

#   This is the only way to sequentially calculate Jaccard
#   indexes between each and every node. 
#   It runs in (O(n)^2-O(n))/2 time--which is clear
#   by the nature of the forloops
    count=0
    start_big = time.time()
    for i in range(len(keys)):
        node = keys[i]
        for k in range(i+1, len(keys)):
            count += 1
            node2 = keys[k]
            start = time.time()
            jaccard_i, intersect = jaccard(
                node,
                node2, 
                df_mega, 
                followers_count
            )
            query = "INSERT OR IGNORE INTO jaccard_edge VALUES(?,?,?)"
            entry = (node, node2, jaccard_i)
            cursor.execute(query, entry)
            conn.commit()
            query = "INSERT OR IGNORE INTO follower_intersect VALUES(?,?,?)"
            entry = (node, node2, intersect)
            cursor.execute(query, entry)
            conn.commit()
            duration = time.time()- start
            print(f"{count}---{node}:{node2} took {duration}")
    
    dur = time.time() - start_big
    print(f"total time {dur}")
    


3.7074451446533203
1---14216661:14260108 took 3.7084436416625977


TypeError: 'float' object is not callable