In [2]:
from odps import ODPS
from odps import options
import oss2
from odps.df import DataFrame
import datetime
import pandas as pd
import numpy as np
import threading
import queue
import time

# Connection settings (credentials, project and endpoint) loaded from the local ACCESS module
from ACCESS import ACCESS_ID
from ACCESS import SECRET_ACCESS_KEY
from ACCESS import ODPS_PROJECT
from ACCESS import ODPS_ENDPOINT

# Public (internet-facing) OSS endpoint.
# HTTPS is used so that traffic to the bucket is encrypted in transit; the
# previous value was the plain-HTTP form of the same host.
OSS_ENDPOINT = 'https://oss-cn-shanghai.aliyuncs.com'
OSS_BUCKET = 'hypertrons'

# OSS client objects: signed auth plus a handle on the bucket above.
auth = oss2.Auth(ACCESS_ID, SECRET_ACCESS_KEY)
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET)

# ODPS (MaxCompute) entry object used by every query cell in this notebook.
o = ODPS(ACCESS_ID, SECRET_ACCESS_KEY, project=ODPS_PROJECT, endpoint=ODPS_ENDPOINT)
# Turn off the instance-tunnel limit so open_reader(tunnel=True) is not capped
# (see the PyODPS options documentation for the exact limit that is lifted).
options.tunnel.limit_instance_tunnel = False
# Optional knob, left disabled: raise the read timeout for very long queries.
# options.read_timeout = 3600000

# SQL hints shared by the execute_sql calls below: permit full table scans and
# submit in script mode, which the multi-statement DROP/CREATE strings rely on.
hints = {'odps.sql.allow.fullscan': 'true', 'odps.sql.submit.mode': 'script'}

In [34]:
# Create the Yue_ORG table. SELECT DISTINCT removes repeated records of the
# same actor joining the same organization.
# Meaning of one row (per the author's note): the user belongs to the organization.
org_table_name = 'Yue_ORG'
sql_params = {'TEMP_TABLE_NAME': org_table_name}

# Drop and recreate, so every run starts from an empty table.
drop_sql = '''
DROP TABLE IF EXISTS {TEMP_TABLE_NAME};
CREATE TABLE IF NOT EXISTS {TEMP_TABLE_NAME}
(
    actor_id STRING,
    actor_org_id STRING,
    actor_org_login STRING
);
'''.format_map(sql_params)
o.execute_sql(drop_sql, hints=hints)

# Populate it from the MemberEvent rows of ods_github_log.
select_sql = '''
    INSERT INTO {TEMP_TABLE_NAME}
    SELECT DISTINCT actor_id, org_id, org_login
    FROM ods_github_log
    WHERE type='MemberEvent'
    '''.format_map(sql_params)
o.execute_sql(select_sql, hints=hints)

# The row count of the rebuilt table is the cell's displayed value.
tmp = DataFrame(o.get_table(org_table_name))
tmp.count()

In [3]:
# Create the Yue_PR table. SELECT DISTINCT keeps a single row per
# (actor_id, repo_id, org_id, org_login) combination, so repeated pull
# requests by the same actor to the same place collapse into one record.
# Meaning of one row (per the author's note): the user opened a PR against the repository.
pr_table_name = 'Yue_PR'
sql_params = {'TEMP_TABLE_NAME': pr_table_name}

# Drop and recreate, so every run starts from an empty table.
drop_sql = '''
DROP TABLE IF EXISTS {TEMP_TABLE_NAME};
CREATE TABLE IF NOT EXISTS {TEMP_TABLE_NAME}
(
    actor_id STRING,
    repo_id STRING,
    repo_org_id STRING,
    repo_org_login STRING
);
'''.format_map(sql_params)
o.execute_sql(drop_sql, hints=hints)

# Populate it from PullRequestEvent rows that carry organization information.
select_sql = '''
    INSERT INTO {TEMP_TABLE_NAME}
    SELECT DISTINCT actor_id, repo_id, org_id, org_login
    FROM ods_github_log
    WHERE type='PullRequestEvent' AND org_id IS NOT NULL AND org_login IS NOT NULL
    '''.format_map(sql_params)
o.execute_sql(select_sql, hints=hints)

# The row count of the rebuilt table is the cell's displayed value.
tmp = DataFrame(o.get_table(pr_table_name))
tmp.count()

In [4]:
# Create the Yue_ORG_To_ORG table.
# An organization "sends a PR" to another organization when one of its actors
# (Yue_ORG) has a PR record (Yue_PR) against that organization; each distinct
# pair is one directed edge from actor_org to repo_org.
# NOTE(review): edges where actor_org equals repo_org, and edges whose
# actor_org_id is '0', are not filtered here - confirm that is intended.
edge_table_name = 'Yue_ORG_To_ORG'
sql_params = {'TEMP_TABLE_NAME': edge_table_name}

# Drop and recreate, so every run starts from an empty table.
drop_sql = '''
DROP TABLE IF EXISTS {TEMP_TABLE_NAME};
CREATE TABLE IF NOT EXISTS {TEMP_TABLE_NAME}
(
    actor_org_id STRING,
    actor_org_login STRING,
    repo_org_id STRING,
    repo_org_login STRING
);
'''.format_map(sql_params)
o.execute_sql(drop_sql, hints=hints)

# Join PR records to memberships on actor_id; rows whose target organization
# id is '0' are excluded.
select_sql = '''
    INSERT INTO {TEMP_TABLE_NAME}
    SELECT DISTINCT 
            Yue_ORG.actor_org_id, 
            Yue_ORG.actor_org_login, 
            Yue_PR.repo_org_id, 
            Yue_PR.repo_org_login
    FROM Yue_PR INNER JOIN Yue_ORG ON Yue_PR.actor_id = Yue_ORG.actor_id
    WHERE Yue_PR.repo_org_id != '0'
    '''.format_map(sql_params)
o.execute_sql(select_sql, hints=hints)

# The number of edges is the cell's displayed value.
tmp = DataFrame(o.get_table(edge_table_name))
tmp.count()

In [64]:
# For every target organization, count the incoming edges in Yue_ORG_To_ORG
# (its in-degree: how many organizations sent it PRs) and print the largest.
# NOTE(review): the grouping key includes repo_org_login, so one organization
# id seen under two logins is reported as two rows (the saved output lists
# 6154722 as both 'microsoft' and 'Microsoft') - confirm whether these should
# be merged.

# Number of rows to show. The earlier manual counter stopped after 11 records,
# so 11 keeps the displayed result unchanged.
IN_DEGREE_TOP_N = 11

# ORDER BY ... LIMIT lets the server return only the rows that are displayed,
# instead of disabling the ORDER BY/LIMIT validation and sorting everything.
select_sql = '''
    SELECT repo_org_id, repo_org_login, count(actor_org_id) AS count
    FROM Yue_ORG_To_ORG
    GROUP BY repo_org_id, repo_org_login
    ORDER BY count DESC
    LIMIT {TOP_N}
    '''.format(
        TOP_N=IN_DEGREE_TOP_N
    )
# Uses the notebook-wide hints, as the other query cells do.
table = o.execute_sql(select_sql, hints=hints)

with table.open_reader(tunnel=True) as reader:
    for record in reader:
        print(record.repo_org_id, record.repo_org_login, record.count)

1342004 google 6817
47359 apache 6311
69631 facebook 6127
1503512 Homebrew 5445
6154722 microsoft 5107
3637556 DefinitelyTyped 4816
9919 github 4772
6154722 Microsoft 4622
13629408 kubernetes 4099
761456 hashicorp 3067
5429470 docker 2823


In [65]:
# Initialise the organization table Yue_org_degree and compute each
# organization's out-degree (number of outgoing edges in Yue_ORG_To_ORG),
# then print the organizations with the highest out-degree.
# NOTE(review): the grouping key includes actor_org_login, so one organization
# id seen under two logins yields two rows (the saved output lists 6154722 as
# both 'microsoft' and 'Microsoft') - confirm whether these should be merged.

# Drop and recreate, so every run starts from an empty table.
drop_sql = '''
DROP TABLE IF EXISTS Yue_org_degree;
CREATE TABLE IF NOT EXISTS Yue_org_degree
(
    org_id STRING,
    org_login STRING,
    degree BIGINT
);
'''

o.execute_sql(drop_sql, hints=hints)

# The '0' organization id is excluded. The filter is on a grouping column, so
# it sits in WHERE (applied before aggregation) rather than HAVING; the
# resulting rows are the same.
select_sql = '''
    INSERT INTO Yue_org_degree
    SELECT actor_org_id, actor_org_login, count(repo_org_id)
    FROM Yue_ORG_To_ORG
    WHERE actor_org_id != '0'
    GROUP BY actor_org_id, actor_org_login
    '''
o.execute_sql(select_sql, hints=hints)
tmp = DataFrame(o.get_table('Yue_org_degree'))
print('组织数：',tmp.count())

# Number of rows to show. The earlier manual counter stopped after 11 records,
# so 11 keeps the displayed result unchanged.
OUT_DEGREE_TOP_N = 11

# ORDER BY ... LIMIT lets the server return only the rows that are displayed,
# instead of disabling the ORDER BY/LIMIT validation and sorting everything.
select_sql = '''
    SELECT org_id, org_login, degree
    FROM Yue_org_degree
    ORDER BY degree DESC
    LIMIT {TOP_N}
    '''.format(
        TOP_N=OUT_DEGREE_TOP_N
    )
table = o.execute_sql(select_sql, hints=hints)

with table.open_reader(tunnel=True) as reader:
    for record in reader:
        print(record.org_id, record.org_login, record.degree)

组织数： 224910
1342004 google 2910
131524 mozilla 1953
6154722 microsoft 1831
107424 jenkinsci 1332
6154722 Microsoft 1226
2810941 GoogleCloudPlatform 1219
1459110 IBM 1210
9919 github 1177
25064361 alchemycodelab 1113
66423638 IIM-Creative-Technology 1107
6844498 Azure 1087
