This notebook seeks to explore the gender diversity of the different apache projects & the process

In [2]:
from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import concat, collect_set, explode, from_json, format_string
from pyspark.sql.types import *

import json
import os
import meetup.api
from copy import copy
import time
import logging

from watson_developer_cloud import ToneAnalyzerV3

API key configuration

In [3]:
meetup_key = os.getenv("MEETUP_APIKEY")
tone_bluemix_user = os.getenv("TONE_BLUEMIX_USERNAME")
tone_bluemix_password = os.getenv("TONE_BLUEMIX_PASSWORD")

Less secret configuration

In [4]:
max_meetup_events = 400

In [5]:
session = SparkSession.builder.appName("whatCanWeLearnFromTheSixties").getOrCreate()
sc = session.sparkContext

The first thing we want to get is the committers and PMC members, this information is stored in LDAP but also available in JSON. Eventually we will want to enrich this with mailing list information

In [6]:
def loadFlatJsonFile(path, explodeKey, schema=None):
    """Load a flat multi-line json file and convert into Spark & explode"""
    rdd = sc.wholeTextFiles(path).values()
    df = (session.read.schema(schema)
            .json(rdd))
    return df.select(explode(explodeKey))

In [7]:
apache_people_schema = StructType([StructField("lastCreateTimestamp", StringType()),
                     StructField("people",
                                 MapType(StringType(), 
                                         StructType([StructField('name', StringType()),
                                                     StructField('key_fingerprints', ArrayType(StringType())),
                                                     StructField('urls', ArrayType(StringType())),
                                                    ]))
                                )])
apache_people_df = loadFlatJsonFile(path="http_data_sources/public_ldap_people.json", # http://people.apache.org/public/public_ldap_people.json
                                 explodeKey="people", schema=apache_people_schema)
apache_people_df = apache_people_df.select(apache_people_df.key.alias("username"), apache_people_df.value.alias("extra"))

In [8]:
# Construct a lazy urllib3 pool
from lazy_helpers import *
    
bcast_pool = sc.broadcast(LazyPool)
bcast_pool.value

lazy_helpers.LazyPool

In [9]:
def project_on_github(project):
    """Returns if a project is on github"""
    import urllib3
    http = bcast_pool.value.get()
    r = http.request('GET', "https://github.com/apache/{0}".format(project))
    return r.status == 200
session.catalog.registerFunction("on_github", project_on_github, BooleanType())
# Except I'm a bad person so....
from pyspark.sql.catalog import UserDefinedFunction
project_on_github_udf = UserDefinedFunction(project_on_github, BooleanType(), "on_github")
session.catalog._jsparkSession.udf().registerPython("on_github", project_on_github_udf._judf)

In [10]:
apache_committees_schema = StructType([StructField("lastCreateTimestamp", StringType()),
                     StructField("committees",
                                 MapType(StringType(), StructType([StructField('roster', ArrayType(StringType())),
                                                                  StructField('modifyTimestamp', StringType()),
                                                                  StructField('createTimestamp', StringType())
                                                                  ])))])
apache_committees_df = loadFlatJsonFile(path="http_data_sources/public_ldap_committees.json", # http://people.apache.org/public/public_ldap_people.json
                                 explodeKey="committees", schema=apache_committees_schema)
apache_committees_on_github_df = apache_committees_df.filter(project_on_github_udf(apache_committees_df.key))
apache_committees_on_github_df.persist(StorageLevel.MEMORY_AND_DISK)
committee_names_df = apache_committees_on_github_df.select(apache_committees_df.key.alias("project"))
committee_names_df.persist(StorageLevel.MEMORY_AND_DISK)
committee_names_df.count()

148

In [11]:
project_to_user_df = apache_committees_on_github_df.select(
    apache_committees_on_github_df.key.alias("project"),
    explode(apache_committees_on_github_df.value.roster).alias("username"))


user_to_project_df = project_to_user_df.groupBy(project_to_user_df.username).agg(
    collect_set(project_to_user_df.project).alias("projects"))
apache_people_df = apache_people_df.join(user_to_project_df, on="username")

In [12]:
apache_people_df.take(1)

[Row(username='a_horuzhenko', extra=Row(name='Artyom Horuzhenko', key_fingerprints=None, urls=None), projects=['openmeetings'])]

Attempt to fetch relevant past & present meetups for each project - idea based on the listing at https://www.apache.org/events/meetups.html but different code

We want to do a non-blocking count to materialize the meetup RDD

In [13]:
# Some async helpers, in Scala we would use AsyncRDDActions but its not currently available in Python
# Support is being considered in https://issues.apache.org/jira/browse/SPARK-20347
def non_blocking_rdd_count(rdd):
    import threading
    def count_magic():
        rdd.count()
    thread = threading.Thread(target=count_magic)
    thread.start()

def non_blocking_rdd_save(rdd, target):
    import threading
    def save_panda():
        rdd.saveAsPickleFile(target)
    thread = threading.Thread(target=save_panda)
    thread.start()

def non_blocking_df_save(df, target):
    import threading
    def save_panda():
        df.write.save(target)
    thread = threading.Thread(target=save_panda)
    thread.start()

In [22]:
logger = logging.getLogger()
logger.setLevel("WARN")
def lookup_relevant_meetup(project_name, max_meetup_events=0):
    """Lookup relevant meetups for a specific project."""
    import logging
    import time
    import meetup.api
    logger = logging.getLogger()
    meetup_delay = 30
    meetup_reset_delay = 1800 # 30 minutes
    standard_keys = {"text_format": "plain", "trending": "desc=true", "and_text": "true", "city": "san francisco", "country": "usa", "text": "apache " + project_name, "radius": 10000}
    results = {"upcoming": [], "past": []}
    for status in ["upcoming", "past"]:
        keys = copy(standard_keys)
        keys["status"] = status
        count = 200
        base = 0
        while (count == 200 and (max_meetup_events == 0 or base < max_meetup_events)):
            logging.debug("Fetch {0} meetups for {1} on base {2}".format(status, project_name, base))
            project_name = "spark"
            client = client = meetup.api.Client(meetup_key)
            if base > 0:
                keys["page"] = base
            # Manually sleep for meetup_reset_delay on failure, the meetup-api package retry logic sometimes breaks :(
            response = None
            retry_count = 0
            while response is None and retry_count < 10:
                try:
                    response = client.GetOpenEvents(**keys)
                except:
                    response = None
                    retry_count += 1
                    time.sleep(meetup_reset_delay)
                    try:
                        response = client.GetOpenEvents(**keys)
                    except:
                        response = None
            try:
                count = response.meta['count']
                base = base + count
                results[status].append(response.results)
                time.sleep(meetup_delay)
            except:
                count = 0
    return (project_name, results)

project_meetups_rdd = committee_names_df.repartition(200).rdd.map(lambda x: x.project).map(lambda name: lookup_relevant_meetup(name, max_meetup_events))

In [23]:
project_meetups_rdd.persist(StorageLevel.MEMORY_AND_DISK)
non_blocking_rdd_save(project_meetups_df, "mini_meetup_data")

For the provided projects attempt to lookup their GitHub

In [24]:
def lookup_project_git(project):
    """Returns the project github for a specific project. Assumes project is git hosted"""
    return "git://git.apache.org/{0}".format(project)
    

In [25]:
def fetch_project_github_data(project):
    from perceval.backends.core.github import GitHub as perceval_github
    gh_backend = perceval_github(owner="apache", repository=project)
    # The backend return a generator - which is awesome. However since we want to pull this data into Spark 
    return list(gh_backend.fetch())

In [26]:
def fetch_project_git_data(project):
    project_git = lookup_project_git(project)
    from perceval.backends.core.git import Git as perceval_git
    git_backend = perceval_git(owner="apache", repository=project)
    return list(gh_backend.fetch())

In [27]:
from lazy_helpers import *

bcast_driver = sc.broadcast(LazyDriver)

def lookup_crunchbase_info(people_and_projects):
    """Lookup a person a crunch base and see what the gender & company is.
    Filter for at least one mention of their projects."""
    from bs4 import BeautifulSoup
    import re
    driver = bcast_driver.value.get()
    import time
    import random
    for (username, name, projects, urls) in people_and_projects:
        time.sleep(random.randint(0, 15))
        # robots.txt seems to be ok with person for now, double check before re-running this
        url = "https://www.crunchbase.com/person/{0}".format(name.replace(" ", "-"))
        try:
            driver.get(url)
            text = driver.page_source
            lower_text = text.lower()
            if any(project.lower() in lower_text for project in projects) or any(url.lower in lower_text for url in urls):
                soup = BeautifulSoup(text, "html.parser")
                stats = soup.findAll("div", { "class" : "info-card-overview-content"})[0]
                # Hacky but I'm lazy
                result = {}
                result["username"] = username
                try:
                    m = re.search("Gender:\</dt\>\<dd\>(.+?)\<", str(stats))
                    result["gender"] = m.group(1)
                except:
                    # If nothing matches thats ok
                    pass
                try:
                    m = re.search("data-name=\"(.+?)\" data-permalink=\"\/organization", str(stats))
                    result["company"] = m.group(1)
                except:
                    # No match no foul
                    pass
                yield result
        except:
            pass

In [16]:
result = lookup_crunchbase_info([("holden", "holden karau", ["spark"], ["http://www.holdenkarau.com"])])
list(result)

[{'company': 'IBM', 'gender': 'Female', 'username': 'holden'}]

Augment the committer info

In [17]:
# We do this as an RDD transformation since the cost of the transformation dominates
relevant_info = apache_people_df.select(
    apache_people_df.username,
    apache_people_df.extra.getField("name").alias("name"),
    apache_people_df.projects,
    apache_people_df.extra.getField("urls").alias("urls"))
crunchbase_info_rdd = relevant_info.rdd.map(lambda row: (row.username, row.name, row.projects, row.urls)).mapPartitions(lookup_crunchbase_info)
crunchbase_info_rdd.persist(StorageLevel.MEMORY_AND_DISK)
schema = StructType([
    StructField("username", StringType()),
    StructField("gender", StringType()),
    StructField("company", StringType())])
crunchbase_info_df = crunchbase_info_rdd.toDF(schema = schema)

In [28]:
non_blocking_df_save(crunchbase_info_df, "crunchbase_out_1")

In [29]:
apache_people_df.count()

2164

Export to Mechnical turk format

In [30]:
def mini_concat_udf(array_strs):
    """Concat the array of strs"""
    if array_strs == None:
        return ""
    else:
        return ' '.join(array_strs)

# Except I'm a bad person so....
from pyspark.sql.catalog import UserDefinedFunction
mini_concat_udf = UserDefinedFunction(mini_concat_udf, StringType(), "mini_concat_udf")
session.catalog._jsparkSession.udf().registerPython("mini_concat_udf", mini_concat_udf._judf)

apache_people_df.select(
    apache_people_df.username,
    apache_people_df.extra.getField("name").alias("name"),
    mini_concat_udf(apache_people_df.extra.getField("urls")).alias("personal_websites"),
    mini_concat_udf(apache_people_df.projects).alias("projects")
    ).write.csv("./apache_people.csv", header=True)

AnalysisException: 'path file:/home/holden/repos/diversity-analytics/apache_people.csv already exists.;'

In [None]:
crunchbase_info_rdd.collect()

One of the things that is interesting is understanding what the tones of the meetup descriptions & mailing list posts are. We can use https://www.ibm.com/watson/developercloud/tone-analyzer/api/v3/?python#introduction

In [None]:
def lookup_tone(document):
    """Looks up the tone for a specific document. Returns a json blob."""
    from watson_developer_cloud import ToneAnalyzerV3
    tone_analyzer = ToneAnalyzerV3(
        username=tone_bluemix_user,
        password=tone_bluemix_password,
        version='2016-05-19 ')
    return tone_analyzer.tone(text=document)

In [None]:
oh_no_you = lookup_tone("oh no you didn't girl")

In [None]:
oh_no_you

Ok its time to find some mailing list info

In [31]:
def fetch_mbox_ids(project_name):
    """Return the mbox ids"""
    import itertools

    def fetch_mbox_ids_apache_site(box_type):
        """Fetches all of the mbox ids from a given apache project and box type (dev or user)"""
        root_url = "http://mail-archives.apache.org/mod_mbox/{0}-{1}".format(project_name, box_type)
        
        # Fetch the page to parse
        pool = bcast_pool.value.get()
        result = pool.request('GET', root_url)
        
        
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(result.data, "html.parser")
        mbox_ids = set(map(lambda tag: tag.get('id'), soup.findAll("span", { "class" : "links"})))
        return map(lambda box_id: (project_name, box_type, box_id), mbox_ids)

    return list(itertools.chain.from_iterable(map(fetch_mbox_ids_apache_site, ["dev", "user"])))
        
        
def fetch_and_process_mbox_records(project_name, box_type, mbox_id):
        import tempfile
        import shutil
        from perceval.backends.core.mbox import MBox as perceval_mbox

        def process_mbox_directory(base_url, dir_path):
            mbox_backend = perceval_mbox(base_url, dir_path)
            return mbox_backend.fetch()
        
        def append_project_info(result):
            """Add the project information to the return from perceval"""
            result["project_name"] = project_name
            result["box_type"] = box_type
            result["mbox_id"] = mbox_id

        # Make a temp directory to hold the mbox files
        tempdir = tempfile.mkdtemp()

        try:
            root_url = "http://mail-archives.apache.org/mod_mbox/{0}-{1}".format(project_name, box_type)
            mbox_url = "{0}/{1}.mbox".format(root_url, mbox_id)
            filename = "{0}/{1}.mbox".format(tempdir, mbox_id)
        
            print("fetching {0}".format(mbox_url))

            pool = bcast_pool.value.get()
            with pool.request('GET', mbox_url, preload_content=False) as r, open(filename, 'wb') as out_file:       
                shutil.copyfileobj(r, out_file)
                return list(map(append_project_info, process_mbox_directory(root_url, tempdir)))
        finally:
            shutil.rmtree(tempdir)

In [None]:
fetched_mbox_ids = fetch_mbox_ids("spark")
list(fetched_mbox_ids)[0]
fetched_mbox_data = fetch_and_process_mbox_records('spark', 'dev', '201308')

In [None]:
#fetch_and_process_mbox_records

In [32]:
def random_key(x):
    import random
    return (random.randint(0, 40000), x)

def de_key(x):
    return x[1]

mailing_list_posts_mbox_ids = committee_names_df.repartition(400).rdd.flatMap(lambda row: fetch_mbox_ids(row.project))
# mbox's can be big, so break up how many partitions we have
mailing_list_posts_mbox_ids = mailing_list_posts_mbox_ids.map(random_key).repartition(2000).map(de_key)
mailing_list_posts_rdd = mailing_list_posts_mbox_ids.flatMap(lambda args: fetch_and_process_mbox_records(*args))
mailing_list_posts_rdd.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[91] at RDD at PythonRDD.scala:48

In [35]:
schema = StructType([
    StructField("project_name",StringType()),
    StructField("box_type",StringType()), # dev or user
    StructField("mbox_id",StringType()),
    StructField("backend_name",StringType()),
    StructField("backend_version",StringType()),
    StructField("category",StringType()),
    StructField("data", MapType(StringType(),StringType())), # The "important" bits
    StructField("origin",StringType()),
    StructField("perceval_version",StringType()),
    StructField("tag",StringType()),
    StructField("timestamp",DoubleType()),
    StructField("updated_on",DoubleType()),
    StructField("uuid",StringType())])
mailing_list_posts_mbox_df_raw = mailing_list_posts_rdd.toDF(schema=schema)
mailing_list_posts_mbox_df_raw.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[project_name: string, box_type: string, mbox_id: string, backend_name: string, backend_version: string, category: string, data: map<string,string>, origin: string, perceval_version: string, tag: string, timestamp: double, updated_on: double, uuid: string]

In [36]:
non_blocking_df_save(mailing_list_posts_mbox_df_raw, "mailing_list_info")

In [None]:
mailing_list_posts_mbox_df = mailing_list_posts_mbox_df_raw.select("*",
                                                               mailing_list_posts_mbox_df_raw.data.getField("From").alias("from"),
                                                               mailing_list_posts_mbox_df_raw.data.getField("body").alias("body")
                                                              )

In [None]:
df.schema