In [None]:
%load_ext autoreload
%autoreload 2


In [1]:
from pathlib import Path
from tqdm.auto import tqdm

import pandas as pd
import gzip
import json

import pickle

import numpy as np
import matplotlib.pyplot as plt

import ray
import dask.dataframe as dd

import text2code_dataset.dataset.gharchive.parser as parser
from toolkit_run.ray.server import LabRayToolkitServer

Matplotlib created a temporary config/cache directory at /tmp/matplotlib-_5widn_3 because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


## Ray cluster management

In [None]:
server = LabRayToolkitServer()

In [None]:
server.dashboard_url

In [None]:
server.scale_cluster(60)

In [None]:
process_archive(files[0])

In [None]:
server.scale_cluster(0)

In [None]:
ray.shutdown()

## Create repo to bucket dict

In [None]:
from text2code_dataset.dataset.licenses import safe_licenses

In [None]:
safe_licenses

In [None]:
df_min_repo_events = pd.read_parquet('/repo_workdir/min_repo_event_datetime_all.parquet')
    
df_min_repo_events = df_min_repo_events.rename(
    columns = {'date': 'min_repo_event_datetime'}
)

In [None]:
len(df_min_repo_events)

In [None]:
df_min_repo_events[df_min_repo_events['repo_name'] == 'LWFlouisa/NVLAIML']

In [None]:
ri = pd.read_parquet(buckets[0]['ri'])

In [None]:
ri = ri.merge(df_min_repo_events, left_on='name', right_on='repo_name', how='left')

In [None]:
with open('/dataset/repositories_2015.pkl', 'rb') as f:
    repos_2015 = pickle.load(f)


In [None]:
len(repos_2015)

In [None]:
with open('/repo_workdir/repositories_2015_v1.pkl', 'rb') as f:
    repos_2015_v1 = pickle.load(f)

In [8]:
with open('/repo_workdir/repositories_2015_min_event_times_v1.pkl', 'rb') as f:
    repos_2015_min_event_times_v1 = pickle.load(f)

In [None]:
repos_2015_min_event_times_v1_set = set(repos_2015_min_event_times_v1.keys())
repos_2015_set = set(repos_2015.keys())

In [None]:
repos_2015_min_event_times_v1_set.symmetric_difference(repos_2015_set)

In [None]:
len(repos_2015_min_event_times_v1)

In [None]:
len(repos_2015_v1)

In [None]:
set(repos_2015.keys()).symmetric_difference(repos_2015_v1.keys())

In [None]:
'LWFlouisa/NVLAIML' in repos_2015.keys()

In [None]:
# Call the method: a bare `.nunique` only displays the bound method, not the count.
df['repo_name'].nunique()

In [11]:

df = pd.read_parquet(buckets[0]['ri'])



In [14]:
%%time
res = df['name'].map(lambda x: repos_2015_min_event_times_v1[x] if x in repos_2015_min_event_times_v1 else None)

CPU times: user 4.62 ms, sys: 96 µs, total: 4.72 ms
Wall time: 4.67 ms


In [None]:
res

0       2021-01-19 07:28:12+00:00
1       2021-01-19 07:31:15+00:00
2       2020-02-08 15:04:36+00:00
3       2018-12-04 07:32:33+00:00
4       2021-05-25 18:38:09+00:00
                  ...            
2039    2021-08-14 10:57:25+00:00
2040    2021-03-07 16:25:54+00:00
2041    2021-08-01 09:33:10+00:00
2042    2020-03-26 18:50:38+00:00
2043    2021-06-30 17:20:47+00:00
Name: name, Length: 2044, dtype: object

In [None]:
df = df.reset_index(drop=False)

In [None]:
df = df[['repo_name', 'date']]

In [None]:
df.to_parquet('/repo_workdir/min_repo_event_datetime_all.parquet')

In [2]:
from text2code_dataset.dataset.filter_dask import DaskDataframesFilterGroupApplyBucketed
buckets = DaskDataframesFilterGroupApplyBucketed.get_folder_buckets([
    Path('/dataset/repositories_zipped2_logs/indexes/repo_info/df_2022-01-24_all_licenses_no_vanity'),
    Path('/dataset/repositories_zipped2_logs/indexes/repo_info/df_github_v2_all_licenses_clean')
])


In [6]:
def process_ri_clean(bucket):
    """Strip the ``min_repo_event_datetime`` column from a bucket's repo-info file.

    Args:
        bucket: mapping whose ``'ri'`` entry is the path (a ``pathlib.Path``)
            of the repo-info parquet file.

    Returns:
        None. When the column is present the file is rewritten without it and
        with duplicate rows removed; otherwise the file is left untouched.
    """
    ri_path = bucket['ri']
    repo_info = pd.read_parquet(ri_path)
    if 'min_repo_event_datetime' not in repo_info.columns:
        # Nothing to clean in this bucket.
        return

    cleaned = (
        repo_info
        .drop(columns=['min_repo_event_datetime'])
        .drop_duplicates(ignore_index=True)
    )

    # Write next to the target first, then rename over it, so the original
    # file is only replaced once the new one has been fully written.
    staging_path = ri_path.parent / ('___tmp___' + ri_path.name)
    cleaned.to_parquet(staging_path)
    staging_path.rename(ri_path)
    
process_ri_clean_ray = ray.remote(process_ri_clean).options(num_cpus=2, scheduling_strategy="SPREAD")

In [18]:
for bucket in tqdm(buckets):
    process_ri_add_min_repo_event(bucket, repos_2015_min_event_times_v1)

  0%|          | 0/22674 [00:00<?, ?it/s]

In [17]:

def process_ri_add_min_repo_event(bucket, repos_2015_min_event_times_v1):
    """Add a ``min_repo_event_datetime`` column to a bucket's repo-info file.

    Args:
        bucket: mapping whose ``'ri'`` entry is the path (a ``pathlib.Path``)
            of the repo-info parquet file.
        repos_2015_min_event_times_v1: mapping from repository name to the
            value stored in the new column.

    Returns:
        None. Files that already have the column are skipped (and
        ``'processed'`` is printed); otherwise the file is rewritten in place.
    """
    ri_path = bucket['ri']
    ri = pd.read_parquet(ri_path)
    if 'min_repo_event_datetime' in ri.columns:
        print('processed')
        return

    # Look the values up from THIS bucket's own `name` column. The previous
    # code read a notebook-global `df`, which either fails on a fresh kernel
    # or index-aligns the names of an unrelated bucket into this frame.
    # Repositories missing from the mapping get None.
    ri['min_repo_event_datetime'] = ri['name'].map(repos_2015_min_event_times_v1.get)

    # Write to a sibling temp file and rename over the original so a failed
    # write never leaves a truncated repo-info file behind.
    tmp_fn = ri_path.parent / ('___tmp___' + ri_path.name)
    ri.to_parquet(tmp_fn)
    tmp_fn.rename(ri_path)
    
process_ri_add_min_repo_event_ray = ray.remote(process_ri_add_min_repo_event).options(num_cpus=2, scheduling_strategy="SPREAD")

In [None]:
@ray.remote(num_cpus=2, scheduling_strategy="SPREAD")
def process_ri_add_min_repo_event_if_processed(bucket):
    """Report whether a bucket's repo-info file has ``min_repo_event_datetime``.

    Args:
        bucket: mapping whose ``'ri'`` entry is the repo-info parquet path.

    Returns:
        1 when the column is present, 0 otherwise (so results can be summed).
    """
    columns = pd.read_parquet(bucket['ri']).columns
    return int('min_repo_event_datetime' in columns)

In [None]:
@ray.remote(scheduling_strategy="SPREAD")
def process_ri(bucket):
    """Join star, fork and issue statistics onto a bucket's repo-info file.

    Args:
        bucket: mapping whose ``'ri'`` entry is the path (a ``pathlib.Path``)
            of the repo-info parquet file.

    Returns:
        None. Files that already contain ``issues_count`` are skipped (and
        ``'processed'`` is printed); otherwise the file is rewritten in place
        with the extra columns left-joined on the repository name.
    """
    ri_path = bucket['ri']
    ri = pd.read_parquet(ri_path)
    if 'issues_count' in ri.columns:
        print('processed')
        return

    # The statistics tables are loaded inside the task, so each call reads
    # all three files again.
    df_forks = pd.read_parquet('/repo_workdir/forks_per_repos_all.parquet')
    df_stars = pd.read_parquet('/repo_workdir/watchers/all.parquet')
    df_issues = pd.read_parquet('/repo_workdir/issues_per_repos_all.parquet')

    df_forks = (
        df_forks
        .reset_index(drop=False)
        .rename(columns={'src_repo': 'repo_name', 'count': 'forks_count', 'event_min_datetime': 'forks_event_min_datetime', 'event_max_datetime': 'forks_event_max_datetime'})
        .sort_values('repo_name')
    )
    # Unlike forks/issues, the stars index is discarded rather than kept.
    df_stars = (
        df_stars
        .reset_index(drop=True)
        .rename(columns={'count': 'stars_count', 'min_date': 'stars_event_min_datetime', 'max_date': 'stars_event_max_datetime'})
        .sort_values('repo_name')
    )
    df_issues = (
        df_issues
        .reset_index(drop=False)
        .rename(columns={'event_repo_name': 'repo_name', 'count': 'issues_count', 'event_min_datetime': 'issues_event_min_datetime', 'event_max_datetime': 'issues_event_max_datetime'})
        [['repo_name', 'issues_count', 'issues_event_min_datetime', 'issues_event_max_datetime']]
        .sort_values('repo_name')
    )

    # Left-join each table on the repository name, dropping the duplicated
    # join key after every merge so the next merge can add it again.
    for stats in (df_stars, df_forks, df_issues):
        ri = ri.merge(stats, left_on='name', right_on='repo_name', how='left')
        ri = ri.drop(columns=['repo_name'])

    # Write to a sibling temp file, then rename over the original.
    tmp_fn = ri_path.parent / ('___tmp___' + ri_path.name)
    ri.to_parquet(tmp_fn)
    tmp_fn.rename(ri_path)

In [None]:
# `process_ri` is a Ray remote function that takes only the bucket (it loads the
# stars/forks/issues tables itself), so it must be submitted with `.remote()`
# and awaited with `ray.get` rather than called directly with extra frames.
ray.get(process_ri.remote(buckets[1]))

In [None]:
tmp_fn

In [None]:
@ray.remote
def process(bucket):
    """Collect names of repos in a bucket that are safely licensed and contain Python files.

    Args:
        bucket: mapping with ``'lic'``, ``'fi'`` and ``'ri'`` entries pointing
            at the license, file-info and repo-info parquet files.

    Returns:
        Set of values from the repo-info ``name`` column for repos whose
        top-confidence license detections are all in ``safe_licenses`` and
        which have at least one file with ``ext_key == 'py'``.
    """
    lic = pd.read_parquet(bucket['lic'])
    # For every (repo, file) pair keep only the detections with the highest
    # confidence. The string alias 'max' is used instead of the builtin `max`,
    # which newer pandas versions warn about when passed to transform.
    top_confidence = lic.groupby(['ri_id', 'file'])['confidence'].transform('max')
    lic = lic[lic['confidence'] == top_confidence]
    # A repo qualifies only if every remaining detection is a safe license.
    lic = lic.groupby('ri_id').filter(lambda group: group['license'].isin(safe_licenses).all())
    safe_ids = set(lic['ri_id'])

    fi = pd.read_parquet(bucket['fi'])
    python_ids = set(fi.loc[fi['ext_key'] == 'py', 'ri_id'])

    wanted_ids = safe_ids.intersection(python_ids)

    ri = pd.read_parquet(bucket['ri'])
    return set(ri.loc[ri['id'].isin(wanted_ids), 'name'])

In [None]:
@ray.remote
def process_lic_hist(bucket):
    """Build a per-license count table for one bucket's license file.

    Args:
        bucket: mapping whose ``'lic'`` entry is the license parquet path.

    Returns:
        DataFrame indexed by ``license`` holding the count of per-repo rows.
    """
    detections = pd.read_parquet(bucket['lic'])[['ri_id', 'confidence', 'license']]
    # NOTE(review): groupby(...).max() reduces each column independently, so the
    # `license` kept per repo is the maximum license string, not necessarily the
    # license of the highest-confidence row — confirm this is intended.
    per_repo = detections.groupby('ri_id').max()
    return per_repo.groupby('license').count()

In [None]:
@ray.remote
def process_lic_hist_old(bucket):
    """Count repos per license, using each repo's highest-confidence detection.

    Args:
        bucket: mapping whose ``'lic'`` entry is the license parquet path.

    Returns:
        DataFrame indexed by ``license`` with a single ``count`` column.
    """
    detections = pd.read_parquet(bucket['lic'], engine='fastparquet')
    # After sorting by confidence, the last row of each repo group is the
    # highest-confidence detection.
    best_per_repo = detections.sort_values('confidence').groupby('ri_id').last()
    license_counts = best_per_repo.groupby('license')['license'].count()
    return license_counts.to_frame('count')

In [None]:

def process_lic_hist_old_vs_new(bucket):
    """Compare the two per-repo license selection strategies for one bucket.

    Args:
        bucket: mapping whose ``'lic'`` entry is the license parquet path.

    Returns:
        Tuple ``(df_diff, dfl)``: ``df_diff`` joins the column-wise-max result
        (``*_x`` columns) with the sort-then-last result (``*_y`` columns) on
        ``ri_id``; ``dfl`` is the raw license frame as read from disk.
    """
    dfl = pd.read_parquet(bucket['lic'], engine='fastparquet')
    # "Old" strategy: last row per repo after sorting by confidence.
    selected_by_last = dfl.sort_values('confidence').groupby('ri_id').last()
    # "New" strategy: independent maximum of each column per repo.
    selected_by_max = dfl[['ri_id', 'confidence', 'license']].groupby('ri_id').max()
    return selected_by_max.merge(selected_by_last, on='ri_id'), dfl
    

In [None]:
lic1 = pd.read_parquet(buckets[0]['lic'])
lic = lic1[['ri_id', 'confidence', 'license']].groupby('ri_id').max()
lic = lic.groupby('license').count()

In [None]:
df_diff, dfl = process_lic_hist_old_vs_new(buckets[0])

In [None]:
df_diff

In [None]:
# Show all license detections for repo 3 (the stray trailing `max` was a syntax error).
dfl[dfl['ri_id'] == 3]

In [None]:
lic1['ri_id'].nunique()

In [None]:
pd.options.display.max_rows = 600

In [None]:
df_stars_f = ray.put(df_stars)
df_forks_f = ray.put(df_forks)
df_issues_f = ray.put(df_issues)

In [None]:
# Drop the references returned by ray.put. The middle line used to delete the
# local DataFrame `df_forks` instead of its handle `df_forks_f`.
del df_stars_f
del df_forks_f
del df_issues_f

In [None]:
res = []
for bucket in  tqdm(buckets):
    res.append(process_ri_add_min_repo_event_if_processed.remote(bucket))
res = ray.get(res)

In [None]:
len(res) - sum(res)

In [None]:
ready, not_ready = ray.wait(res, num_returns=len(res), fetch_local=False, timeout=1)


In [None]:
len(ready)

In [None]:
len(not_ready)

In [None]:
for el in res:
    ray.cancel(el)

In [None]:
res_permissive = set().union(*res)

In [None]:
len(res_permissive)

In [None]:
pickle_save(res_permissive, '/repo_workdir/permissive_repos_v2alfa2_py.pkl')

In [None]:
license_hist_old = pd.concat(res).groupby(['license']).sum().reset_index()

In [None]:
license_hist_old

In [None]:
license_hist = pd.concat(res).groupby(['license']).sum().reset_index()

In [None]:
license_hist_old = license_hist_old.rename(columns={'license': 'TheStack_license_id', 'count': 'TheStack_repo_count'})

In [None]:
license_hist.to_json('/repo_workdir/lic_hist_dataset_v2.json')


In [None]:
permissive_list = pd.read_json('https://blueoakcouncil.org/list.json')
copyleft_list = pd.read_json('https://blueoakcouncil.org/copyleft.json')
copyleft_list = copyleft_list.reset_index(drop=False)

In [None]:
# Inspect the nested `families` column (the dangling `for` was a syntax error).
copyleft_list['families']

In [None]:
# Flatten the permissive ratings list into one record per license. Each record
# carries the license fields (prefixed with `license_`) plus the rating it
# belongs to.
data = []
for row_label, row in permissive_list.iterrows():
    ratings = row['ratings']
    version = row['version']
    name = ratings['name']
    notes = ratings['notes']
    for lic in ratings['licenses']:
        data.append({
            **{'license_' + k: v for k, v in lic.items()},
            'license_type': 'permissive',
            'ratings_version': version,
            'ratings_name': name,
            'ratings_notes': notes,
        })
    

In [None]:
# Append one record per copyleft license version to `data`, tagging each with
# its rating (the row's `index` value), ratings version and family name.
for row_label, row in copyleft_list.iterrows():
    rating_name = row['index']
    ratings_version = row['version']
    for family in row['families']:
        family_name = family['name']
        for version in family['versions']:
            data.append({
                **{'license_' + k: v for k, v in version.items()},
                'license_type': 'copyleft',
                'ratings_version': ratings_version,
                'ratings_name': rating_name,
                'license_family_name': family_name,
            })

In [None]:
licenses_solicitors = pd.DataFrame(data)

In [None]:
len(data)

In [None]:
licenses_solicitors[licenses_solicitors['license_id'] =='GPL-3.0-or-later']

In [None]:
licenses_x = licenses_solicitors.merge(license_hist_old, left_on='license_id', right_on='TheStack_license_id', how='outer')


In [None]:
len(licenses_x.columns)

In [None]:
#license_identifier, count, type, rating name, license_url, ratings_version, ratings_notes
licenses_x = licenses_x[[
        'license_id', 'TheStack_license_id', 'TheStack_repo_count', 'license_type', 'ratings_name', 'license_url', 'ratings_version', 'ratings_notes',
        'license_name', 'license_family_name'
    ]]

In [None]:
licenses_x = licenses_x.sort_values('TheStack_repo_count', ascending=False)

In [None]:
licenses_x.to_csv('/repo_workdir/license_x.csv')

In [None]:
licenses_x

In [None]:
from toolkit_run.util.io import pickle_save

In [None]:
lic = lic.sort_values('confidence',ascending=False)
lic = lic.groupby('ri_id').filter(lambda x: all(x['license'].isin(safe_licenses))) 

In [None]:
lic['license_file_conf'] = lic[['license', 'file', 'confidence']].values.tolist()

In [None]:
lic = lic[['ri_id', 'license_file_conf']]

In [None]:
lic['ri_index'] = lic.groupby('ri_id').cumcount()

In [None]:
max_index = lic['ri_index'].max()

In [None]:
lic['ri_index'] = lic['ri_index'].astype(str)

In [None]:
# Pass `index` by keyword: pandas 2.0 made DataFrame.pivot arguments keyword-only.
lic = lic.pivot(index='ri_id', columns='ri_index')

In [None]:
lic.columns = lic.columns.map('_'.join)

In [None]:
lic

In [None]:
lic['size'] = lic.groupby('ri_id').size()

In [None]:
tuple(lic.groupby('ri_id').size().agg(['idxmax','max']))

In [None]:
ri =  pd.read_parquet(buckets[0]['ri'])

In [None]:
ri[ri['id'] == 1821]

In [None]:
pd.set_option('display.max_rows', 500)

In [None]:
lic[lic['ri_id'] == 1821]

In [None]:
lic.pivot(index='ri_id',columns=['license'], values=['file', 'confidence'])

In [None]:
lic1.reset_index().unstack().to_frame().sort_index(level=1)

In [None]:
lic1.reset_index().unstack().to_frame().sort_index(level=1).T

In [None]:
lic.groupby('ri_id').filter(lambda x: all(x['license'].isin(safe_licenses)))

In [None]:
@ray.remote
def process(bucket, dst):
    """Unfinished stub for processing ``bucket`` into ``dst``.

    The cell had no body, which is a syntax error. Raising keeps the cell
    runnable and makes any accidental use fail loudly.
    NOTE(review): this redefines the earlier ``process(bucket)`` task — confirm
    whether the stub should be removed or renamed.
    """
    raise NotImplementedError('process(bucket, dst) was never implemented')

In [None]:
files = list(Path('/dataset/repositories_zipped2_logs/indexes/repo_info/df_2022-01-24_all_licenses_no_vanity/ri').glob('data*.parquet'))
files += list(Path('/dataset/repositories_zipped2_logs/indexes/repo_info/df_github_v2_all_licenses_clean/ri').glob('data*.parquet'))

In [None]:
len(files)

In [None]:
files[0]

In [None]:
df = pd.read_parquet(files[0])

In [None]:
# Build one small frame per repo-info file mapping repository name -> bucket name.
data = []
for file in tqdm(files):
    # Bucket id is "<dataset dir>/<ri dir>/<file stem>".
    bucket_name =  f'{file.parent.parent.stem}/{file.parent.stem}/{file.stem}'
    # Read only the column that is needed, and add `bucket_name` via assign
    # rather than assigning into a slice (which triggers SettingWithCopyWarning).
    df = (
        pd.read_parquet(file, columns=['name'])
        .assign(bucket_name=bucket_name)
        .set_index('name')
    )
    data.append(df)

In [None]:
bucket_name

In [None]:
df = pd.concat(data)

In [None]:
repo_to_bucket_dict = df['bucket_name'].to_dict()

In [None]:
len(repo_to_bucket_dict)

In [None]:
%%timeit
repo_to_bucket_dict['mustakim150/work-space']

In [None]:
with open('/repo_workdir/dataset_v2_clean_repo_to_bucket_dict.pkl', 'wb') as f:
    pickle.dump(repo_to_bucket_dict, f)

## Split database files repos to buckets

In [None]:
path = Path('/data/hf_repos/multi_safe_license_raw/data')
files = list(path.glob('*/data*.jsonl'))

In [None]:
dest_path = Path('/repo_workdir/dataset_v2_add_stars_hashes')
dest_path.mkdir(parents=True, exist_ok=True)

In [None]:
from collections import defaultdict
from text2code_dataset.dataset.postprocessing.near_dedup.util import enum_json_lines
import hashlib
def process(files, index):
    """Group the records of JSONL dataset files by the bucket of their repository.

    Args:
        files: iterable of ``pathlib.Path`` objects pointing at JSONL files.
        index: currently unused. NOTE(review): presumably a shard/worker index
            meant for naming the output — confirm with the caller.

    Returns:
        ``defaultdict(list)`` mapping bucket name to a list of
        ``(repo, file_path, dataset_path, sha256_hex)`` tuples. Previously the
        mapping was built and then discarded because nothing was returned.

    Raises:
        KeyError: if a record's repository is not in the repo-to-bucket mapping.
    """
    # NOTE: pickle.load executes arbitrary code; only use with trusted files.
    with open('/repo_workdir/dataset_v2_clean_repo_to_bucket_dict.pkl', 'rb') as f:
        repo_to_bucket_dict = pickle.load(f)
    split_data = defaultdict(list)
    for file in tqdm(files):
        # Same for every record of the file, so compute it once per file.
        dataset_path = f'{file.parent.stem}/{file.stem}'
        for data, line in enum_json_lines(file):
            repo = data['repository_name']
            file_path = data['path']

            # The content hash identifies the exact file version.
            hex_dig = hashlib.sha256(data['content'].encode()).hexdigest()
            split_data[repo_to_bucket_dict[repo]].append((repo, file_path, dataset_path, hex_dig))
    return split_data
        

In [None]:
with open('/repo_workdir/dataset_v2_clean_repo_to_bucket_dict.pkl', 'rb') as f:
    repo_to_bucket_dict = pickle.load(f)


In [None]:
split_data = defaultdict(list)
for file in tqdm(files):
    for data, line in enum_json_lines(file):
        dataset_path = f'{file.parent.stem}/{file.stem}' 
        repo = data['repository_name']
        file_path = data['path']

        hash_object = hashlib.sha256(data['content'].encode())
        hex_dig = hash_object.hexdigest()
        split_data[repo_to_bucket_dict[repo]].append((repo, file_path, dataset_path, hex_dig))

In [None]:
res = []
for file in files:
    res.append(process_archive_ray.remote(file))
ray.get(res)

In [None]:
import  random

In [None]:
random.shuffle(files)

In [None]:
# Total on-disk size of every consecutive batch of `n` files.
sizes = []
n = 70
for start in range(0, len(files), n):
    batch = files[start: start + n]
    sizes.append(sum(path.stat().st_size for path in batch))

In [None]:
plt.plot(sizes)

In [None]:
from typing import Optional
import requests
import datetime
from requests.adapters import HTTPAdapter, Retry


class Download():
    """Small HTTP GET helper built on a retrying ``requests`` session.

    Args:
        user_agent: value sent in the ``User-Agent`` header.
        num_retry: total number of retries for the status codes
            429, 502, 503 and 504 (with backoff).
        proxies: optional list of proxy URLs; one is chosen at random for each
            request. A ready-made ``{'http': ..., 'https': ...}`` mapping is
            also accepted and passed through unchanged.
        timeout: seconds to wait for the server before giving up, so a stalled
            connection cannot hang the caller forever.
    """

    def __init__(self, user_agent: str = 'big_code_bot_v01', num_retry: int = 3, proxies: Optional[list[str]] = None, timeout: float = 30.0):
        self.s = requests.Session()
        self.retries = Retry(total=num_retry, backoff_factor=1, status_forcelist=[429, 502, 503, 504])
        self.s.mount('http://', HTTPAdapter(max_retries=self.retries))
        self.s.mount('https://', HTTPAdapter(max_retries=self.retries))
        self.user_agent = user_agent
        self.proxies = proxies
        self.timeout = timeout

    def _request_proxies(self):
        """Return the ``proxies`` mapping for one request, or None for a direct connection."""
        if not self.proxies:
            return None
        if isinstance(self.proxies, dict):
            return self.proxies
        import random
        # requests expects a scheme -> proxy URL mapping, not a list of URLs.
        proxy = random.choice(self.proxies)
        return {'http': proxy, 'https': proxy}

    def get(self, url: str) -> Optional[str]:
        """Fetch ``url`` and return the response body.

        Returns:
            The response text, or None when the request fails or the server
            answers with an HTTP error status (>= 400). Failures are printed.
        """
        headers = {
            'User-Agent': self.user_agent
        }
        try:
            resp = self.s.get(url, headers=headers, proxies=self._request_proxies(), timeout=self.timeout)
        except requests.exceptions.RequestException as e:
            print(e)
            return None
        if resp.status_code >= 400:
            print('status code: ', resp.status_code)
            return None
        return resp.text

def get_a_text_w_text(html_obj, key):
    """Return the first whitespace-separated token of the first matching link.

    Args:
        html_obj: element exposing ``xpath``; the search is limited to its
            descendants.
        key: text that the ``<a>`` element must contain.

    Returns:
        First token of the link's text content (e.g. ``'1.2k'``).

    Raises:
        IndexError: if no link matches or the matched link has no text.
    """
    # `.//a` is relative to html_obj. The former absolute `//a` searched the
    # whole document even when html_obj was a sub-element.
    anchors = html_obj.xpath(f".//a[contains(.,'{key}')]")
    return anchors[0].text_content().split()[0]

def to_number(data):
    """Convert a human-formatted count such as ``'1.2k'`` or ``'3M'`` to a float.

    Args:
        data: string with an optional ``k``/``K`` (thousand) or ``m``/``M``
            (million) suffix. Surrounding whitespace and thousands separators
            (``,``) are ignored.

    Returns:
        The numeric value as a float.

    Raises:
        ValueError: if the string is empty or not a number.
    """
    text = data.strip().replace(',', '')
    if not text:
        # Previously an empty string failed with an opaque IndexError.
        raise ValueError('cannot convert an empty string to a number')
    multipliers = {'k': 1000, 'm': 1000000}
    suffix = text[-1].lower()
    if suffix in multipliers:
        return float(text[:-1]) * multipliers[suffix]
    return float(text)

def parse_github_repo_home(html):
    """Extract star, watcher and fork counts from a repository page.

    Args:
        html: page source as a string.

    Returns:
        Dict with float values under ``'stars'``, ``'watchers'`` and ``'forks'``.

    Raises:
        IndexError: if the ``div.Layout-sidebar`` element or a link is missing.
    """
    sidebar = fromstring(html).cssselect('div.Layout-sidebar')[0]
    # Result key -> text that the sidebar link must contain.
    link_labels = {'stars': 'stars', 'watchers': 'watching', 'forks': 'forks'}
    # Look up all link texts before converting any of them.
    raw_counts = {field: get_a_text_w_text(sidebar, label) for field, label in link_labels.items()}
    return {field: to_number(text) for field, text in raw_counts.items()}

def get_repo_data(repo_name):
    """Download a repository's GitHub page and extract its counters.

    Args:
        repo_name: repository in ``owner/name`` form.

    Returns:
        Dict with ``'datetime'`` (UTC timestamp string taken before the
        download) and ``'repo_name'``, plus either the parsed counters or an
        ``'error'`` entry (``'download_error'`` / ``'parse_error'``).
    """
    # NOTE(review): `download` is not defined in this cell — presumably a
    # callable such as `Download().get`; confirm where it comes from.
    url = 'https://github.com/' + repo_name
    fetched_at = datetime.datetime.now(datetime.timezone.utc)
    page = download(url)
    record = {
        'datetime': str(fetched_at),
        'repo_name': repo_name,
    }
    if page is None:
        record['error'] = 'download_error'
        return record
    try:
        record.update(parse_github_repo_home(page))
    except Exception:
        # Any parsing failure is recorded rather than raised.
        record['error'] = 'parse_error'
    return record

In [None]:
html = download(
    'https://github.com/aaronr/data_display', 
)

In [None]:
from lxml.html import fromstring

In [None]:
! pip install  --user lxml cssselect

In [None]:
html_obj = fromstring(html)

In [None]:
html_obj.cssselect('div.Layout-sidebar')

In [None]:
res = get_repo_data('facebook/react')

In [None]:
res

In [None]:
html_obj.xpath("//a[contains(.,'stars')]")[0].text_content()

In [None]:
html_obj.xpath("//a[contains(.,'watching')]")[0].text_content()

In [None]:
 get_a_text_w_text(html_obj, 'stars')

In [None]:
def get_a_text_w_text(html_obj, key):
    """Return the first whitespace-separated token of the first matching link.

    Args:
        html_obj: element exposing ``xpath``; the search is limited to its
            descendants.
        key: text that the ``<a>`` element must contain.

    Returns:
        First token of the link's text content (e.g. ``'1.2k'``).

    Raises:
        IndexError: if no link matches or the matched link has no text.
    """
    # `.//a` is relative to html_obj. The former absolute `//a` searched the
    # whole document even when html_obj was a sub-element.
    anchors = html_obj.xpath(f".//a[contains(.,'{key}')]")
    return anchors[0].text_content().split()[0]

def to_number(data):
    """Convert a human-formatted count such as ``'1.2k'`` or ``'3M'`` to a float.

    Args:
        data: string with an optional ``k``/``K`` (thousand) or ``m``/``M``
            (million) suffix. Surrounding whitespace and thousands separators
            (``,``) are ignored.

    Returns:
        The numeric value as a float.

    Raises:
        ValueError: if the string is empty or not a number.
    """
    text = data.strip().replace(',', '')
    if not text:
        # Previously an empty string failed with an opaque IndexError.
        raise ValueError('cannot convert an empty string to a number')
    multipliers = {'k': 1000, 'm': 1000000}
    suffix = text[-1].lower()
    if suffix in multipliers:
        return float(text[:-1]) * multipliers[suffix]
    return float(text)

def parse_github_repo_home(html):
    """Extract star, watcher and fork counts from a repository page.

    Args:
        html: page source as a string.

    Returns:
        Dict with float values under ``'stars'``, ``'watchers'`` and ``'forks'``.

    Raises:
        IndexError: if the ``div.Layout-sidebar`` element or a link is missing.
    """
    sidebar = fromstring(html).cssselect('div.Layout-sidebar')[0]
    # Result key -> text that the sidebar link must contain.
    link_labels = {'stars': 'stars', 'watchers': 'watching', 'forks': 'forks'}
    # Look up all link texts before converting any of them.
    raw_counts = {field: get_a_text_w_text(sidebar, label) for field, label in link_labels.items()}
    return {field: to_number(text) for field, text in raw_counts.items()}

In [None]:
parse_github_repo_home(html)

In [None]:
from toolkit_run.util.io import pickle_load
repos_2015 = pickle_load(Path('/dataset/repositories_list_2015.pkl'))

In [None]:
data = []
for repo in tqdm(repos_2015):
    data.append(get_repo_data(repo))
    

In [None]:
data

In [None]:
float('0')