In [None]:
!pip install -U jk-pypiorgapi jk-json clean-text

In [None]:
!mkdir smart-repo_output

In [10]:
# Write the versions of all installed packages to requirements.txt.
!pip freeze > requirements.txt

In [None]:
# List the working directory, hidden entries included.
!ls -a

.  ..  .config	requirements.txt  sample_data  smart-repo_output


In [None]:
# Run this once before the first fetch: it creates the output folder (if it is
# missing) and an empty JSON file for every index character.
import os.path
import string

# One index per leading character: "0"-"9" followed by "a"-"z".
INDEXES = list(string.digits + string.ascii_lowercase)

# Do not depend on the separate `mkdir` cell having been run successfully.
os.makedirs("smart-repo_output", exist_ok=True)

for i in INDEXES:
  # Append mode creates a missing file without truncating an existing one.
  with open(os.path.join("smart-repo_output", f"pypi_api.data.{i}.json"), "a"):
    pass

In [None]:
import jk_pypiorgapi
import jk_json
import json
import logging
import cleantext.clean
import os

# Folder and file-name prefix of the per-index JSON dumps.
OUTPUT_PATH = "smart-repo_output"
BASE_FILE_NAME = "pypi_api.data."

# Leading characters used to split the package list into one file per index.
INDEXES = [
           "0","1","2","3","4","5",
           "6","7","8","9",
           "a","b","c","d","e","f",
           "g","h","i","j","k","l",
           "m","n","o","p","q","r",
           "s","t","u","v","w","x",
           "y","z"
]

# Log to app.log, overwriting it on every run. The level is set explicitly:
# the root logger defaults to WARNING, which would silently drop the
# logging.info() call below.
logging.basicConfig(
    filename='app.log',
    filemode='w',
    format='%(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Cleaned descriptions equal to one of these placeholders are treated as useless.
COMMON_USELESS_INFO:list = ["UNKNOWN", "", "..."]
# Length thresholds a cleaned description / summary has to exceed to be kept.
MINIMUM_DOC_LENGTH:int = 24
MINIMUM_SUMMARY_LENGTH:int = 16

# Client used for every PyPI request made by this notebook.
api:object = jk_pypiorgapi.PyPiOrgAPI()

# Complete package listing, fetched once up front; element [1] of each entry
# is used as the package name by the code below.
PACKAGES:list = api.listAllPackages()
logging.info(f'Collected {len(PACKAGES)} packages from PyPi')

def get_file_name(char:str) -> str:
  """Return the JSON file name for index `char`: BASE_FILE_NAME, `char`, then ".json"."""
  return "".join((BASE_FILE_NAME, char, ".json"))

def clean_data(text:str) -> str:
  """Run `text` through `cleantext.clean` and return the result.

  Only `fix_unicode` and `to_ascii` are switched on; case, line breaks, URLs,
  e-mails, phone numbers, numbers, digits, currency symbols and punctuation
  are all left as they are (`lower` and every `no_*` flag are False).
  """
  options = dict(
      fix_unicode=True,
      to_ascii=True,
      lower=False,
      no_line_breaks=False,
      no_urls=False,
      no_emails=False,
      no_phone_numbers=False,
      no_numbers=False,
      no_digits=False,
      no_currency_symbols=False,
      no_punct=False,
      lang="en",
  )
  return cleantext.clean(text, **options)

def fetch_data(packages:list, max_packages:int=-1) -> tuple:
  """Download the PyPI metadata of `packages` and keep the documented ones.

  A package is kept when its cleaned description is longer than
  MINIMUM_DOC_LENGTH, or when its cleaned summary is longer than
  MINIMUM_SUMMARY_LENGTH and the description is not one of
  COMMON_USELESS_INFO.

  Args:
    packages: iterable of package entries; element [1] of each entry is used
      as the package name.
    max_packages: stop as soon as this many packages have been kept. Any
      value <= 0 (the default) means "no limit".

  Returns:
    A 3-tuple `(packages_info, num_of_fetched, num_of_no_fetched)`:
    the kept metadata keyed by package name, the number of packages kept and
    the number of packages whose metadata could not be read. Packages that
    were read but rejected by the length filter are counted in neither.
  """
  num_of_fetched:int = 0
  num_of_no_fetched:int = 0
  packages_info:dict = {}

  for package in packages:
    package_name:str = package[1]
    package_data:dict = api.getPackageInfoJSON(package_name)

    try:
      package_info:dict = package_data["info"]
      description:str = clean_data(package_info["description"])
      summary:str = clean_data(package_info["summary"])

      # Parentheses spell out the precedence Python already applies
      # (`and` binds tighter than `or`).
      is_documented:bool = (
          len(description) > MINIMUM_DOC_LENGTH
          or (
              len(summary) > MINIMUM_SUMMARY_LENGTH
              and description not in COMMON_USELESS_INFO
          )
      )
      if not is_documented:
        continue

      packages_info[package_name] = {
          "author":package_info["author"],
          "classifiers":package_info["classifiers"],
          "description":description,
          "description_content_type":package_info["description_content_type"],
          "docs_url":package_info["docs_url"],
          "home_page":package_info["home_page"],
          "keywords":package_info["keywords"],
          "license":package_info["license"],
          "summary":summary,
          "requires_python":package_info["requires_python"],
          "version":package_info["version"],
          "vulnerabilities":package_data["vulnerabilities"]
      }
    except (TypeError, KeyError) as error:
      # TypeError: the response (or one of its parts) is not subscriptable.
      # KeyError: an expected field is missing. Either way the package is
      # counted and logged instead of aborting the whole run.
      num_of_no_fetched += 1
      logging.warning(f'No usable metadata for package {package_name!r}: {error!r}')
      continue

    num_of_fetched += 1
    if 0 < max_packages <= num_of_fetched:
      break

  return packages_info, num_of_fetched, num_of_no_fetched


def save_data(data:dict, filename:str) -> None:
  """Write `data` as JSON to `filename` inside OUTPUT_PATH, replacing any existing file."""
  target = os.path.join(OUTPUT_PATH, filename)
  with open(target, "w") as out_file:
    json.dump(data, out_file)

def show_data(filename:str) -> None:
  """Load the JSON file `filename` from OUTPUT_PATH and pretty-print it."""
  source = os.path.join(OUTPUT_PATH, filename)
  with open(source, "r") as in_file:
    content = json.load(in_file)
  jk_json.prettyPrint(content)

def main() -> None:
  """Interactively fetch and save package metadata, one index at a time.

  For every entry of INDEXES the user is prompted: "n" ends the loop, "s"
  skips the index, and any other answer fetches the packages whose name
  starts with that index and saves them to the matching JSON file. Once the
  loop ends, the totals of packages with and without info are logged.
  """
  total_with_info:int = 0
  total_without_info:int = 0

  for idx in INDEXES:
    answer = input(f"Fetch with: {idx}?\n(Y) to do (S) to skip (N) to quit\n:[] ").lower()

    if answer == "n":
      break
    if answer == "s":
      continue

    # Lazily select the entries whose name (element [1]) starts with this index.
    selected = (entry for entry in PACKAGES if entry[1].startswith(idx))

    data, kept, failed = fetch_data(selected)
    total_with_info += kept
    total_without_info += failed

    save_data(data, get_file_name(idx))
    print(f"done with {idx}")

  logging.warning(f'Were found {total_without_info} packages without info')
  logging.warning(f'Were found {total_with_info} packages with info')

# Start the interactive fetch only when this code is executed directly,
# not when it is imported.
if __name__ == "__main__":
  main()

In [None]:
import jk_json
import json
import os

# Folder the JSON dump shown below is read from.
OUTPUT_PATH = "smart-repo_output"

def show_data(filename:str) -> None:
  """Pretty-print the JSON document stored at `filename`.

  `filename` is opened exactly as given; unlike the `show_data` defined in
  the fetch cell of this notebook, it is not joined with OUTPUT_PATH.
  """
  with open(filename, "r") as in_file:
    content = json.load(in_file)
  jk_json.prettyPrint(content)

# Display the dump written for the index "x".
show_data(os.path.join(OUTPUT_PATH, "pypi_api.data.x.json"))