In [None]:
import requests
import collections
import pandas as pd
from urllib.error import HTTPError
from urllib.parse import quote_plus
import os

# 0. Input parameters

In [None]:
# Maps each module name to the list of Git repositories that are scanned for it
modules_and_repos = {
    'SA': ['MON', 'POL', 'PLA'],
    'DevOps': ['osmclient'],
    'N2VC': ['N2VC'],
    'NBI': ['NBI'],
    'IM': ['IM'],
    'NG-UI': ['NG-UI'],
    'RO': ['RO', 'common'],
    'LCM': ['LCM']
}

# Template of the repo path; note that the `module` placeholder is filled with the *repo* name
repo_full_name_format = 'osm/{module}.git'
# Template of the gitweb URL that serves one file (`f=`) of a repo (`p=`) from a given branch (`hb=`)
git_web_url_format = 'https://osm.etsi.org/gitweb/?p={repo_full_name};a=blob_plain;f={req_file_name};hb=refs/heads/{branch}'

# Branch whose dependency lists are scanned
default_branch = 'master'
# Options: 'master', 'v10.0', 'v9.0'

In [None]:
# Output spreadsheet with results of online license scan from repositories
output_online_scan_filename = 'OUTPUT - Online License Scan - OSM Dependencies.xlsx'

# Optional
#----------
# Input spreadsheet from static scans.
# Needs to be available (uploaded) in the base folder of the notebook environment.
static_input_filename = 'OSM Dependencies License Compatibility.xlsx'
# Output spreadsheet with results of the static license scan (only written if the input above exists)
output_static_scan_filename = 'OUTPUT - Static License Scan - OSM Dependencies.xlsx'

In [None]:
# Fixes to component licenses retrieved from online sources
# (CSV read with the columns 'Component' and 'Real license')
license_corrections_table_file = 'License_corrections.csv'
# NOTE: still identical to the local file name; a real URL is pending
license_corrections_table_url = 'License_corrections.csv' # FIXME:

# Mappings to normalize license names
# (CSV read with the columns 'License' and 'Mapped License')
license_mappings_file = 'License_mappings.csv'
# NOTE: still identical to the local file name; a real URL is pending
license_mappings_file_url = 'License_mappings.csv' # FIXME

In [None]:
# If there is a local file, it takes it; otherwise, retrieves table from URL
if os.path.isfile(license_corrections_table_file):
  license_corrections_table = license_corrections_table_file
else:
  license_corrections_table = license_corrections_table_url

if os.path.isfile(license_mappings_file):
  license_mappings_table = license_mappings_file
else:
  # The URL of the mappings table is defined as `license_mappings_file_url`
  license_mappings_table = license_mappings_file_url

# 1. Online analysis of merged repos

## Retrieve `requirements.txt` or `package.json` from each relevant repo

In [None]:
# Uncomment for debugging pipelines, if needed
#
# def show_me(df):
#   display(df)
#   return df

In [None]:
def get_module_pip_components(repo, branch='master'):
  """Retrieves the pip requirements of a repo as a table.

  Downloads `requirements.txt` of the repo from gitweb for the given branch,
  discards the comment lines and splits every remaining line into the name of
  the component and its pinned version.

  Args:
    repo: Name of the repository (fills `repo_full_name_format`).
    branch: Branch to read the file from.

  Returns:
    DataFrame with the columns `component` and `version` (the version is
    missing for lines without `==`), or None if the download raised HTTPError.
  """

  repo_full_name = repo_full_name_format.format(module=repo)
  git_web_url = git_web_url_format.format(req_file_name='requirements.txt', repo_full_name=repo_full_name, branch=branch)

  try:
    # The file is read as a table without header, one requirement per row
    df = pd.read_table(git_web_url, header=None)
  except HTTPError:
    print(f'{repo} does not have a standard pip requirement list')
    return None

  return (
      df
      .rename(columns={0: 'component'})
      .assign(component = lambda x: x.component.str.strip())
      .assign(is_comment = lambda x: x.component.str.startswith('#'))
      .query('~ is_comment')
      .drop(columns='is_comment')
      .reset_index(drop=True)
      ['component']
      # Splits at the first '==' only, so that a line never yields more than two columns
      .str.split('==', n=1, expand=True)
      .rename(columns={0: 'component', 1: 'version'})
      # Guarantees both columns, even if no line carries a pinned version
      .reindex(columns=['component', 'version'])
  )

In [None]:
# Uncomment for testing
#
# branch = 'master'
# repo = 'RO'
# #repo = 'NG-UI' # Non-Pypi components, should throw errors
# get_module_pip_components(repo, branch)

In [None]:
def get_module_npm_components(repo, branch='master'):
  """Retrieves the npm dependencies of a repo as a table.

  Downloads `package.json` of the repo from gitweb for the given branch and
  lists the entries of its `dependencies` section.

  Args:
    repo: Name of the repository (fills `repo_full_name_format`).
    branch: Branch to read the file from.

  Returns:
    DataFrame with the columns `component` and `version` (empty if the file
    declares no dependencies), or None if the server answered 404.
  """

  repo_full_name = repo_full_name_format.format(module=repo)

  git_web_url = git_web_url_format.format(req_file_name='package.json', repo_full_name=repo_full_name, branch=branch)

  r = requests.get(git_web_url)

  # The status is checked before parsing, since an error page is not guaranteed to be valid JSON
  if r.status_code == 404:
    print(f'{repo} does not have a standard npm requirement list')
    return None

  data = r.json()

  # A missing or empty `dependencies` section yields an empty table instead of an unpacking error
  dependencies = data.get('dependencies') or {}

  return pd.DataFrame(list(dependencies.items()), columns=['component', 'version'])

In [None]:
# Uncomment for testing
#
# branch = 'master'
# repo = 'NG-UI'
# get_module_npm_components(repo, branch)

In [None]:
def get_all_module_components(modules_and_repos, branch='master'):
  """Collects the dependencies of every repo into a single table.

  Each repo is first read as a pip project; if that yields nothing, it is
  retried as an npm project. Repos where both attempts fail contribute no rows.

  Args:
    modules_and_repos: Dict that maps each module name to its list of repos.
    branch: Branch to read the dependency files from.

  Returns:
    DataFrame with the columns `module`, `repo`, `component`, `version` and
    `source_type` ('pip' or 'npm'), with a fresh 0..n-1 index.
  """

  # An empty frame goes first, to fix the columns of the result even if nothing is retrieved
  frames = [pd.DataFrame(columns=['module', 'repo', 'component', 'version', 'source_type'])]

  module_repo_pairs = (
      (module, repo)
      for module, repos in modules_and_repos.items()
      for repo in repos
  )

  for module, repo in module_repo_pairs:
    print(f"\nRetrieving {module}'s repo {repo}...", end='')

    source_type = 'pip'
    components = get_module_pip_components(repo, branch=branch)

    if components is None:
      print('\t retrying as npm package list...', end='')
      source_type = 'npm'
      components = get_module_npm_components(repo, branch=branch)

    if components is None:
      # Neither a pip nor an npm list: the repo adds no rows
      components = pd.DataFrame(columns=['component', 'version', 'source_type'])
    else:
      components['source_type'] = source_type

    components['module'] = module
    components['repo'] = repo

    frames.append(components)

  return pd.concat(frames).reset_index(drop=True)

In [None]:
# Collects the dependency lists of all the configured repos for the selected branch
df_online_scan_input = get_all_module_components(modules_and_repos, default_branch)

In [None]:
# Shows the collected dependencies, subject to the current pandas display limits
display(df_online_scan_input)

# # Removes temporarily the display limits, for convenience
# default_max_rows = pd.options.display.max_rows
# pd.options.display.max_rows = 999

# display(df_online_scan_input.query("repo == 'RO'"))

# # Restores display defaults
# pd.options.display.max_rows = default_max_rows

## Methods to retrieve license information

In [None]:
# Table of components that the license lookups below take as input
df_online_scan_input

### `pip` packages

In [None]:
def retrieve_pip_license(component_name):
  """Looks up the license of a package in PyPI.

  Queries the JSON API of PyPI and takes the license from the first classifier
  that mentions a license. If there is none, it falls back to the free-text
  `license` field of the package metadata.

  Args:
    component_name: Name of the package in PyPI.

  Returns:
    OrderedDict with the keys `component`, `osi_approved` and `license`.
    `license` is None if it could not be determined; `osi_approved` is None
    unless a license classifier was found.

  Raises:
    requests.HTTPError: If PyPI answers with an error status other than 404.
  """

  # Unknown by default; the key order defines the column order downstream
  license_info = collections.OrderedDict()
  license_info['component'] = component_name
  license_info['osi_approved'] = None
  license_info['license'] = None

  url = f"https://pypi.org/pypi/{component_name}/json"
  r = requests.get(url)

  # If it does not exist in PyPi, returns unknown
  if r.status_code == 404:
    return license_info

  # Any other error status is reported as such, instead of failing later while parsing the body
  r.raise_for_status()

  info = r.json()['info']
  license_classifiers = [c for c in (info.get('classifiers') or []) if 'license' in c.lower()]

  if license_classifiers:  # Commonest format
    fields = license_classifiers[0].split(' :: ')
    license_info['osi_approved'] = ('OSI Approved' in fields)
    license_info['license'] = fields[-1]
  else:     # Alternative format
    # An empty string is normalized to None, so that it is reported as unknown downstream
    license_info['license'] = info.get('license') or None

  return license_info

In [None]:
# Uncomment for testing the 3 known cases
#
# retrieve_pip_license('Jinja2')
# retrieve_pip_license('vcd-api-schemas-type')
# retrieve_pip_license('emitter-component')

### `npm` packages

In [None]:
def retrieve_npm_license(component_name):
  """Looks up the license of an npm package in the npms.io API.

  Args:
    component_name: Name of the package, possibly scoped (e.g. '@angular/flex-layout').

  Returns:
    OrderedDict with the keys `component`, `osi_approved` (always None, since
    NPMS does not provide this info) and `license` (None if the package is
    unknown to the API or declares no license).
  """

  # The name is URL-encoded, since scoped packages contain '@' and '/'
  response = requests.get(f"https://api.npms.io/v2/package/{quote_plus(component_name)}")

  if response.status_code == 404:
    # If it does not exist in npm, returns unknown
    reported_license = None
  else:
    reported_license = response.json()['collected']['metadata'].get('license', None)

  return collections.OrderedDict([
      ('component', component_name),
      ('osi_approved', None),
      ('license', reported_license),
  ])

In [None]:
# Uncomment for testing
#
# #component_name = 'angular-notifier'
# component_name = '@angular/flex-layout'

# retrieve_npm_license(component_name)

### Common procedures for license retrieval

In [None]:
def retrieve_license(component_name, source_type='pip'):
  """Dispatches the license lookup to the retriever of the given source type.

  Args:
    component_name: Name of the component to look up.
    source_type: 'pip' for PyPI; any other value is looked up as an npm package.

  Returns:
    The license record (OrderedDict) built by the selected retriever.
  """
  # Anything that is not pip is tried as npm, which reports unknown packages with a None license
  retriever = retrieve_pip_license if source_type == 'pip' else retrieve_npm_license
  return retriever(component_name)

In [None]:
# Uncomment for testing all known cases
#
# retrieve_license('Jinja2')
# retrieve_license('vcd-api-schemas-type')
# retrieve_license('emitter-component')
# retrieve_license('@angular/flex-layout', 'npm')

In [None]:
def add_license_details(row):
  """Extends a row of the components table with its license information.

  Meant to be used with `DataFrame.apply(..., axis=1)`.

  Args:
    row: Series with, at least, the fields `component` and `source_type`.

  Returns:
    Series with the fields of the row plus those of the license record
    retrieved for its component.
  """

  # The row becomes a one-row frame, so that it can be merged on `component`
  row_as_frame = row.to_frame().T

  license_record = retrieve_license(row.component, row.source_type)
  license_as_frame = pd.DataFrame(license_record, index=[0])

  merged = row_as_frame.merge(
      license_as_frame,
      how = 'left',
      left_on = 'component',
      right_on = 'component'
  )

  return merged.iloc[0]

## Results of online scan


### Populate and display results of online scan

In [None]:
# Fix licenses not reported to main sources, based on human-provided mapping table
def fix_not_reported_modules(df):
  """Overrides licenses with the human-provided corrections table.

  Reads the CSV referenced by `license_corrections_table` (columns 'Component'
  and 'Real license') and applies it to the components that appear in it.

  Args:
    df: DataFrame with the columns `component` and `license`.

  Returns:
    Copy of `df` where `license` holds the corrected value where there is one,
    and the new column `license_reported_by_source` keeps the original value.
  """
  corrections_csv = pd.read_csv(license_corrections_table)
  real_license_by_component = (
      corrections_csv[['Component', 'Real license']]
      .set_index('Component')['Real license']
      .to_dict()
  )

  fixed = df.copy()

  # The original assessment is kept in its own column, for traceability
  fixed['license_reported_by_source'] = fixed['license']

  # Components without a correction keep the license reported by the source
  corrected_license = fixed['component'].map(real_license_by_component)
  fixed['license'] = corrected_license.fillna(fixed['license'])

  return fixed

In [None]:
def normalize_licenses(df):
  """Replaces license names by their normalized equivalents.

  Reads the CSV referenced by `license_mappings_table` (columns 'License' and
  'Mapped License') and applies it to the `license` column.

  Args:
    df: DataFrame with the column `license`.

  Returns:
    Copy of `df` with normalized values in `license`, and the new column
    `license_before_normalization` with the values prior to the mapping.
  """
  mappings_csv = pd.read_csv(license_mappings_table)
  mapped_license_by_name = mappings_csv.set_index('License')['Mapped License'].to_dict()

  return df.assign(
      # Raw license is kept in its own column, for traceability
      license_before_normalization = df['license'],
      # Licenses that are not in the mapping stay untouched
      license = df['license'].replace(mapped_license_by_name),
  )

In [None]:
# Builds the results of the online scan: one row per component, with its license
df_output_with_licenses = (
    df_online_scan_input
    # Looks up the license of each component (one lookup per row)
    .apply(add_license_details, axis=1)

    # Fix module licenses not reported to sources:
    .pipe(fix_not_reported_modules)

    # Normalize license names
    .pipe(normalize_licenses)
)

In [None]:
# Raises the display limit only within this block, for convenience.
# The previous limit is restored on exit, even if the display fails.
with pd.option_context('display.max_rows', 999):
  display(df_output_with_licenses)

Components whose license is unknown to PyPI and npm and is not covered by the corrections table:

In [None]:
# Rows whose license is still missing after the corrections and the normalization
df_output_with_licenses[df_output_with_licenses.license.isna()]

All detected licenses:

In [None]:
# Number of components per license (`value_counts` leaves out missing licenses by default)
df_output_with_licenses.license.value_counts()

### Save results of online scan to spreadsheet

In [None]:
#df_output_with_licenses.set_index(['Module', 'Repo']).to_excel(output_filename_name)
# Writes one row per component, without the DataFrame index
df_output_with_licenses.to_excel(output_online_scan_filename, index=False)

# 2. Process static input spreadsheet for license scan

## Import and clean input spreadsheet

In [None]:
def load_input_table(filename):
  """Loads the spreadsheet of the static scan.

  Args:
    filename: Path of the Excel file, with the columns `Module` and `Repo`.

  Returns:
    DataFrame with the contents of the sheet, where the blank cells of
    `Module` and `Repo` are filled with the closest value above, and without
    the column 'Unnamed: 6' (if the sheet has it).
  """

  return (
      pd.read_excel(filename)
      # Blank cells inherit the value above (presumably merged cells in the sheet)
      .assign(Module = lambda x: x.Module.ffill())
      .assign(Repo = lambda x: x.Repo.ffill())
      # Column 'Unnamed: 6' is discarded when present; its absence is not an error
      .drop(columns = 'Unnamed: 6', errors = 'ignore')
  )

In [None]:
def tidy_input_table(df_input):
  """Reshapes the table of the static scan into one row per component.

  Each cell of `Component` may list several components, separated by
  newlines, commas or the word "and". Each piece is expected to look like
  `name (version)`.

  Args:
    df_input: DataFrame with the columns `Module`, `Repo` and `Component`.

  Returns:
    DataFrame with the columns `Module`, `Repo`, `component` and `version`
    (the latter without the surrounding parentheses).
  """
  return(
      df_input
      .set_index(['Module', 'Repo'])
      .Component
      # .str.split('\n|,|(and)|/', expand=True)
      # "and" only separates as a whole word, so that names that merely contain
      # it (e.g. "pandas") stay in one piece
      .str.split(r'\n|,|\band\b', expand=True)
      .stack()
      # Discards the padding of the cells with fewer pieces, however `stack` treats missing values
      .dropna()
      .reset_index()
      .drop(columns='level_2')
      .rename(columns={0: 'module_and_version'})
      .set_index(['Module', 'Repo'])
      .module_and_version
      .str.strip()
      .reset_index()
      # NOTE(review): discards every piece that contains "more" (presumably
      # "... and N more" notes); it would also discard a component whose name
      # contains "more" - confirm against the actual sheet
      .assign(valid = lambda x: ~ x.module_and_version.str.contains('more', na=False))
      .query('valid')
      .drop(columns='valid')
      .set_index(['Module', 'Repo'])
      .module_and_version
      # Splits "name (version)" at the whitespace
      .str.split(expand=True)
      .rename(columns={0: 'component', 1: 'version'})
      .dropna(subset=['component'])
      .query('component != "and"')
      # .assign(component = lambda x: x.component.str.strip(to_strip='@'))
      .assign(version = lambda x: x.version.str.strip(to_strip='()'))
      .reset_index()
  )

In [None]:
# Loads input XLSX table (if available)
# df_input = pd.DataFrame(columns=['Module', 'Repo', 'Component', 'version', 'source_type'])
# there_is_input_file = os.path.isfile(static_input_filename)
# if there_is_input_file:
#   df_input = load_input_table(static_input_filename)

# The input spreadsheet is optional: when it is missing, `df_input` is None
try:
  df_input = load_input_table(static_input_filename)
except FileNotFoundError:
  df_input = None

In [None]:
# Static-scan table as loaded (None if the spreadsheet was not found)
df_input

In [None]:
# Tidies imported XLSX table.
# Without input spreadsheet there is nothing to tidy; the check is explicit,
# so that a genuine error while tidying is not mistaken for a missing file.
if df_input is None:
  df_tidy_input = None
else:
  df_tidy_input = tidy_input_table(df_input)

In [None]:
# Static-scan table with one row per component (None if there was no input spreadsheet)
df_tidy_input

## Results of static scan

### Populate and display results of static scan

In [None]:
# First guess: Assume all come from pip by default, except NG-UI, from npm
def assign_1st_guess_sources(df):
  """Adds the column `source_type` with the most likely source of each row.

  Args:
    df: DataFrame with the column `Module`.

  Returns:
    Copy of `df` with `source_type` set to 'npm' for the rows of module
    'NG-UI', and to 'pip' for all the others.
  """
  is_ng_ui = df.Module == 'NG-UI'

  # Everything is 'pip', except where the mask holds
  guessed_source = pd.Series('pip', index=df.index).mask(is_ng_ui, 'npm')

  return df.assign(source_type=guessed_source)

In [None]:
# NOTE: the result reuses the name `df_output_with_licenses`, hence it replaces
# the results of the online scan.
# The check for a missing input is explicit, so that a genuine error in the
# pipeline is not mistaken for a missing file.
if df_tidy_input is None:
  df_output_with_licenses = None
else:
  df_output_with_licenses = (
      df_tidy_input

      # Assumes the most likely sources:
      .pipe(assign_1st_guess_sources)
      .apply(add_license_details, axis=1)

      # Fix known packages not reported to npm
      .pipe(fix_not_reported_modules)

      # Normalize license names
      .pipe(normalize_licenses)
  )

In [None]:
# Raises the display limit only within this block, for convenience.
# The previous limit is restored on exit, even if the display fails.
with pd.option_context('display.max_rows', 999):
  display(df_output_with_licenses)

Unknown licenses:

In [None]:
# The table is displayed explicitly: a bare expression nested in a block
# (as opposed to the last top-level expression of a cell) is not rendered.
if df_output_with_licenses is None:
  display(None)
else:
  display(df_output_with_licenses[df_output_with_licenses.license.isna()])

### Save results of static scan to spreadsheet

In [None]:
# Only a missing input is reported as such; any error while saving is raised as usual
if df_output_with_licenses is None:
  print('Nothing to save: No input file')
else:
  #df_output_with_licenses.set_index(['Module', 'Repo']).to_excel(output_static_scan_filename)
  df_output_with_licenses.to_excel(output_static_scan_filename, index=False)