## Scraping Data from SonarQube


In [115]:
import json
import os
import re
import subprocess as sp
from collections import defaultdict
from datetime import datetime

import pandas as pd
from dotenv import load_dotenv

In [116]:
# Load variables from a .env file into the process environment so that the
# os.getenv lookups in the following cell can read the tokens and server URL.
load_dotenv()

True

In [117]:
# Credentials and server location come from the environment (populated by
# load_dotenv above); each value is None when the variable is not set.
_env = os.environ
github_token = _env.get("GITHUB_ACCESS_TOKEN")
sonarqube_token = _env.get("SONARQUBE_ACCESS_TOKEN_VUE")
sonarqube_server_url = _env.get("SONARQUBE_SERVER_URL")

In [118]:
# Project under analysis. demo_repo_name is also used as the SonarQube project
# key and as part of the output file names.
demo_github_slug = "vuejs"
demo_repo_name = "vue"
# Location of the local clone. This is machine-specific, so it can be
# overridden with the DEMO_REPO_PATH environment variable (e.g. in .env); the
# fallback keeps the previously hard-coded location working.
demo_repo_path = os.getenv(
    "DEMO_REPO_PATH", "C:/SWE Class/Github Desktop/TestProjects/vue"
)

In [119]:
# Values sent as cleanCodeAttributeCategories when searching for issues.
clean_code_attribute_categories = ["RESPONSIBLE", "ADAPTABLE", "CONSISTENT", "INTENTIONAL"]

# Impact severity levels that issues are filtered by.
severities = ["LOW", "MEDIUM", "HIGH"]

# Software qualities an issue impact can refer to.
quality_attributes = ["RELIABILITY", "SECURITY", "MAINTAINABILITY"]

# Metric keys requested from the measures endpoint; they also become table columns.
code_metrics = [
    "sqale_debt_ratio", "vulnerabilities", "security_rating", "sqale_rating",
    "complexity", "cognitive_complexity", "ncloc",
]

# Columns that identify a row (one scanned version) in the metrics table.
indexing_fields = ["version", "release_date", "timestamp"]

# Comma-separated form of code_metrics, ready to drop into a query string.
code_metrics_query_string = ",".join(code_metrics)
# code_metrics_query_string

In [120]:
# One empty column per indexing field and per code metric. Insertion order here
# is the column order of the DataFrame built from this dict later on.
metrics_dict = defaultdict(
    list, {column: [] for column in indexing_fields + code_metrics}
)

metrics_dict

defaultdict(list,
            {'version': [],
             'release_date': [],
             'timestamp': [],
             'sqale_debt_ratio': [],
             'vulnerabilities': [],
             'security_rating': [],
             'sqale_rating': [],
             'complexity': [],
             'cognitive_complexity': [],
             'ncloc': []})

In [121]:
# Per-category column store: every clean-code category gets its own
# defaultdict(list), so columns can be appended to without declaring them first.
# The outer defaultdict has no default factory and therefore behaves like a dict.
clean_code_attribute_dict = defaultdict(
    None,
    {category: defaultdict(list) for category in clean_code_attribute_categories},
)

clean_code_attribute_dict

defaultdict(None,
            {'RESPONSIBLE': defaultdict(list, {}),
             'ADAPTABLE': defaultdict(list, {}),
             'CONSISTENT': defaultdict(list, {}),
             'INTENTIONAL': defaultdict(list, {})})

In [122]:
# Sanity-check that the configuration was loaded. Only report whether the token
# is present: echoing the token itself would store the secret in the notebook's
# saved output.
bool(sonarqube_token), sonarqube_server_url

(True, 'http://localhost:9000')

## Git CLI Data Extraction


In [123]:
def get_tag_list(repo_path: str, max_output: int = 20) -> list[str]:
    """Return the tag names of the git repository at ``repo_path``.

    Tags are returned in the order ``git tag -l`` prints them.

    Args:
        repo_path: Directory in which the git commands are run.
        max_output: Maximum number of tags to return, taken from the end of
            the list. ``None`` disables the limit; ``0`` or a negative value
            yields an empty list.

    Returns:
        The tag names, with blank entries removed.

    Raises:
        subprocess.CalledProcessError: If either git command exits with a
            non-zero status.
    """
    # Run a cheap git command first so that a path that is not a git
    # repository fails here (check=True) rather than while listing tags.
    sp.run(
        ["git", "rev-parse", "--git-dir"],
        cwd=repo_path,
        capture_output=True,
        check=True,
    )

    result = sp.run(
        ["git", "tag", "-l"],
        cwd=repo_path,
        capture_output=True,
        check=True,
    )

    tag_list = [tag for tag in result.stdout.decode("utf-8").split("\n") if tag != ""]

    if max_output is not None:
        # Slice from an explicit start index: tag_list[-max_output:] returns the
        # WHOLE list when max_output == 0, because -0 == 0.
        keep = max(max_output, 0)
        tag_list = tag_list[max(len(tag_list) - keep, 0):]

    return tag_list

In [124]:
def get_tag_and_timestamp(repo_path: str) -> list[tuple[str, str, int]]:
    """Return ``(tag, date, timestamp)`` for every tag in the repository.

    For each tag from ``get_tag_list`` (without a limit), ``git log -1`` is
    asked for the committer date in short form (``%cd`` with ``--date=short``)
    and the committer Unix timestamp (``%ct``).

    Raises:
        subprocess.CalledProcessError: If a git command exits non-zero.
    """

    def describe(tag: str) -> tuple[str, str, int]:
        # One git invocation per tag; the output is "<date> <epoch>".
        completed = sp.run(
            ["git", "log", "-1", "--format=%cd %ct", "--date=short", tag],
            cwd=repo_path,
            capture_output=True,
            check=True,
            text=True,
        )
        short_date, epoch = completed.stdout.strip().split()
        return tag, short_date, int(epoch)

    return [describe(tag) for tag in get_tag_list(repo_path, None)]

In [125]:
def get_version_tags_by_year(
    data: list[tuple[str, str, int]], max_per_year=5, chronological: bool = False
):
    """Keep at most ``max_per_year`` versions for each release year.

    Args:
        data: ``(version, date, timestamp)`` tuples, with ``date`` formatted as
            ``YYYY-MM-DD``.
        max_per_year: Number of versions kept per year.
        chronological: When ``False`` (the default), the versions of a year are
            taken in the order they appear in ``data``. When ``True``, they are
            first sorted by ``timestamp``, so the earliest releases of each
            year are kept and the result is in release order.

    Returns:
        The selected ``(version, date, timestamp)`` tuples, grouped by
        ascending year.

    Raises:
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    # NOTE(review): `git tag -l` lists tags in name order, so with the default
    # chronological=False the "first N of a year" are the first N by tag name,
    # not by release date. Confirm which selection the analysis intends.
    versions_by_year = defaultdict(list)
    for version, date, timestamp in data:
        release_date = datetime.strptime(date, "%Y-%m-%d")
        versions_by_year[release_date.year].append((version, release_date, timestamp))

    selected_versions = []
    for year in sorted(versions_by_year):
        versions_of_this_year = versions_by_year[year]
        if chronological:
            # sorted() is stable, so equal timestamps keep their input order.
            versions_of_this_year = sorted(
                versions_of_this_year, key=lambda entry: entry[2]
            )
        # Slicing already clamps to the list length; no min() needed.
        selected_versions.extend(versions_of_this_year[:max_per_year])

    return [
        (version, release_date.strftime("%Y-%m-%d"), timestamp)
        for version, release_date, timestamp in selected_versions
    ]

In [126]:
# Resolve every tag to its commit date first, then thin the list down to a few
# versions per year.
all_tags_with_dates = get_tag_and_timestamp(demo_repo_path)
tags_and_timestamps = get_version_tags_by_year(all_tags_with_dates)
tags_and_timestamps

[('0.6.0', '2013-12-07', 1386462737),
 ('v0.7.0', '2013-12-23', 1387855865),
 ('v0.7.1', '2013-12-24', 1387922313),
 ('v0.7.2', '2013-12-28', 1388209420),
 ('0.10.0-rc', '2014-03-19', 1395263823),
 ('0.11.0', '2014-11-06', 1415325162),
 ('0.11.0-rc', '2014-09-26', 1411781027),
 ('0.11.0-rc2', '2014-10-07', 1412701646),
 ('0.11.0-rc3', '2014-10-24', 1414123890),
 ('0.11.10', '2015-05-07', 1431025236),
 ('0.11.5', '2015-02-05', 1423171588),
 ('0.11.6', '2015-04-18', 1429341079),
 ('0.11.7', '2015-04-20', 1429586386),
 ('0.11.8', '2015-04-21', 1429647040),
 ('v1.0.14', '2016-01-11', 1452543146),
 ('v1.0.14-csp', '2016-01-11', 1452544748),
 ('v1.0.15', '2016-01-18', 1453146224),
 ('v1.0.15-csp', '2016-01-18', 1453146702),
 ('v1.0.16', '2016-01-30', 1454146899),
 ('v2.1.10', '2017-01-17', 1484673432),
 ('v2.1.9', '2017-01-16', 1484610487),
 ('v2.2.0', '2017-02-25', 1488083295),
 ('v2.2.0-beta.1', '2017-02-23', 1487910141),
 ('v2.2.0-beta.2', '2017-02-24', 1487980870),
 ('v2.5.14', '2018-03-

In [127]:
def checkout_to_tag(repo_path: str, tag: str):
    """Check out ``tag`` in the repository at ``repo_path`` and print git's report.

    Args:
        repo_path: Directory in which ``git checkout`` is run.
        tag: Tag (or any other ref) to check out.

    Raises:
        subprocess.CalledProcessError: If ``git checkout`` exits non-zero.
    """
    result = sp.run(
        ["git", "checkout", tag],
        cwd=repo_path,
        capture_output=True,
        check=True,
        text=True,
    )
    # git writes the checkout report ("HEAD is now at ...") to stderr, so
    # printing stdout alone shows an empty line. Show both streams.
    report = "\n".join(
        part for part in (result.stdout.strip(), result.stderr.strip()) if part
    )
    print(report)

In [128]:
def sonar_scan(repo_path: str, tag: str):
    """Run sonar-scanner on the working tree at ``repo_path`` as version ``tag``.

    This function does not check anything out itself; it scans whatever is
    currently in the working tree and labels the analysis with ``tag``.

    Args:
        repo_path: Project directory to scan; used both as the working
            directory and as ``sonar.projectBaseDir``.
        tag: Value reported to SonarQube as ``sonar.projectVersion``.

    Raises:
        subprocess.CalledProcessError: If sonar-scanner exits non-zero.
    """
    # NOTE(review): the token is passed on the command line via sonar.login.
    # Newer SonarQube releases prefer sonar.token; confirm against the server
    # version in use.
    sp.run(
        [
            "sonar-scanner",
            "-D",
            # Use the repo_path argument rather than the demo_repo_path global,
            # so scanning a different checkout actually scans that checkout.
            f"sonar.projectBaseDir={repo_path}",
            "-D",
            "sonar.projectVersion=" + tag,
            "-D",
            f"sonar.projectKey={demo_repo_name}",
            "-D",
            "sonar.sources=.",
            "-D",
            f"sonar.host.url={sonarqube_server_url}",
            "-D",
            "sonar.login=" + sonarqube_token,
        ],
        cwd=repo_path,
        capture_output=True,
        check=True,
        text=True,
        # Windows needs the shell to launch the sonar-scanner batch wrapper.
        # On POSIX, shell=True with a list would run only "sonar-scanner" and
        # hand the remaining items to the shell itself, dropping every -D
        # option, so the shell is used on Windows only.
        shell=os.name == "nt",
    )

In [129]:
def get_metrics() -> dict:
    """Fetch the configured code metrics of the demo project from SonarQube.

    Calls ``api/measures/component`` for the component ``demo_repo_name`` with
    the metric keys in ``code_metrics_query_string``.

    Returns:
        The decoded JSON response.

    Raises:
        subprocess.CalledProcessError: If curl exits non-zero, which with
            ``--fail`` includes HTTP error responses from the server.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    result = sp.run(
        [
            # "curl" resolves to curl.exe on Windows and also works on
            # Linux/macOS, where no "curl.exe" exists.
            "curl",
            # Without --fail curl exits 0 on HTTP 4xx/5xx and the error body
            # would be parsed as if it contained metrics.
            "--fail",
            f"{sonarqube_server_url}/api/measures/component?component={demo_repo_name}&metricKeys={code_metrics_query_string}",
        ],
        capture_output=True,
        check=True,
        text=True,
    )
    return json.loads(result.stdout)

In [130]:
def get_metrics_from_version_tags(
    tag: str, date: str, timestamp: int, output_dict=None
):
    """Append one row of project metrics for a version to ``output_dict``.

    Args:
        tag: Version label stored in the ``version`` column.
        date: Release date stored in the ``release_date`` column.
        timestamp: Unix timestamp stored in the ``timestamp`` column.
        output_dict: Mapping of column name to list of values. Defaults to the
            module-level ``metrics_dict`` as it exists at call time.

    Returns:
        The mapping that was appended to.
    """
    if output_dict is None:
        # Look the global up at call time. A default of `metrics_dict` in the
        # signature is bound once at definition time and keeps pointing at the
        # old dict after the initialisation cell is re-run.
        output_dict = metrics_dict

    metrics_result = get_metrics()
    measured = {
        measure["metric"]: measure.get("value")
        for measure in metrics_result["component"]["measures"]
    }

    output_dict.setdefault("version", []).append(tag)
    output_dict.setdefault("release_date", []).append(date)
    output_dict.setdefault("timestamp", []).append(timestamp)

    # Append a value for EVERY requested metric, using None for those missing
    # from the response. Otherwise the columns end up with different lengths
    # and building a DataFrame from them fails.
    for metric in code_metrics:
        output_dict.setdefault(metric, []).append(measured.pop(metric, None))

    # Keep any metric the server returned beyond the requested ones.
    for metric, value in measured.items():
        output_dict.setdefault(metric, []).append(value)

    return output_dict

In [131]:
# Smoke test: confirm that sonar-scanner is installed and reachable on PATH.
sp.run(
    ["sonar-scanner", "-h"],
    capture_output=True,
    check=True,
    text=True,
    # Windows needs the shell to launch the sonar-scanner batch wrapper. On
    # POSIX, shell=True with a list would drop "-h" and start the scanner
    # without arguments, so the shell is used on Windows only.
    shell=os.name == "nt",
)

CompletedProcess(args=['sonar-scanner', '-h'], returncode=0, stdout='INFO: \nINFO: usage: sonar-scanner [options]\nINFO: \nINFO: Options:\nINFO:  -D,--define <arg>     Define property\nINFO:  -h,--help             Display help information\nINFO:  -v,--version          Display version information\nINFO:  -X,--debug            Produce execution debug output\n', stderr='')

## Narrowing Down Issues from SonarQube


In [132]:
def time_str_to_minutes(time: str, hours_per_day: int = 24) -> int:
    """Convert a duration string such as ``"1d2h3min"`` into minutes.

    The string may contain days (``d``), hours (``h``) and minutes (``min``),
    in that order; any of the three parts may be missing. A bare number is
    read as minutes.

    Args:
        time: Duration string. Empty or whitespace-only input gives ``0``.
        hours_per_day: Number of hours one ``d`` stands for.

    Returns:
        The duration in minutes.

    Raises:
        ValueError: If ``time`` is not in the described format.
    """
    # NOTE(review): SonarQube is documented as expressing effort in 8-hour
    # working days. If the debt strings parsed here follow that convention,
    # callers should pass hours_per_day=8. Confirm before changing the default.
    text = time.strip()
    if text == "":
        return 0
    if text.isdigit():
        return int(text)

    # Match each unit by its suffix. Assigning units by position is wrong when
    # a part is missing: "2h" would count as 2 minutes and "1d5min" as 1h5min.
    match = re.fullmatch(r"(?:(\d+)d)?\s*(?:(\d+)h)?\s*(?:(\d+)min)?", text)
    if match is None:
        raise ValueError(f"Invalid time string: {time!r}")

    days, hours, minutes = (int(part) if part else 0 for part in match.groups())
    return (days * hours_per_day + hours) * 60 + minutes

In [133]:
def get_quality_issue_values_from_api(clean_code_category: str, severity: str):
    """Count the demo project's code-smell issues for one category and severity.

    Queries ``api/issues/search`` for the issues of ``demo_repo_name`` in
    ``clean_code_category``. It keeps those whose ``type`` is ``CODE_SMELL``
    and whose first impact has the given ``severity``, and tallies them by the
    software quality of that first impact.

    Args:
        clean_code_category: Value sent as ``cleanCodeAttributeCategories``.
        severity: Impact severity an issue's first impact must have.

    Returns:
        A tuple ``(total_security_issues, total_reliability_issues,
        total_maintainability_issues, total_debt)``. ``total_debt`` is the sum
        of ``time_str_to_minutes`` over the debt of all kept issues.

    Raises:
        subprocess.CalledProcessError: If curl exits non-zero, which with
            ``--fail`` includes HTTP error responses from the server.
    """
    # The endpoint is paginated: per the SonarQube Web API documentation a page
    # holds 100 issues by default and at most 500, and results beyond the first
    # 10,000 cannot be requested. Reading only the default first page silently
    # caps every count at 100 issues.
    page_size = 500
    max_results = 10_000

    issues_list = []
    for page in range(1, max_results // page_size + 1):
        result = sp.run(
            [
                # "curl" resolves to curl.exe on Windows and also works on
                # Linux/macOS.
                "curl",
                # Turn HTTP error responses into a non-zero exit status.
                "--fail",
                f"{sonarqube_server_url}/api/issues/search"
                # Restrict the search to this project; without it the server
                # returns issues of every project it hosts.
                f"?componentKeys={demo_repo_name}"
                f"&cleanCodeAttributeCategories={clean_code_category}"
                f"&ps={page_size}&p={page}",
            ],
            capture_output=True,
            check=True,
            text=True,
        )
        page_issues = json.loads(result.stdout)["issues"]
        issues_list.extend(page_issues)
        if len(page_issues) < page_size:
            # A short page is the last one.
            break

    issue_counts = {"SECURITY": 0, "RELIABILITY": 0, "MAINTAINABILITY": 0}
    total_debt = 0

    for issue in issues_list:
        impacts = issue.get("impacts") or []
        if issue.get("type") != "CODE_SMELL" or not impacts:
            continue
        # Only the first impact of an issue is considered.
        first_impact = impacts[0]
        if first_impact["severity"] != severity:
            continue

        # Treat a missing "debt" field as zero minutes instead of failing.
        total_debt += time_str_to_minutes(issue.get("debt", ""))

        software_quality = first_impact["softwareQuality"]
        if software_quality in issue_counts:
            issue_counts[software_quality] += 1

    return (
        issue_counts["SECURITY"],
        issue_counts["RELIABILITY"],
        issue_counts["MAINTAINABILITY"],
        total_debt,
    )

In [134]:
def extract_clean_code_data(tag: str, date: str, timestamp: int):
    """Append one row of issue statistics per clean-code category.

    For every category in ``clean_code_attribute_categories`` the version
    identifiers are appended to that category's columns in
    ``clean_code_attribute_dict``. Then, for every severity in ``severities``,
    the issue counts and total debt returned by
    ``get_quality_issue_values_from_api`` are appended to the columns suffixed
    with the lower-cased severity.
    """
    for clean_code_category in clean_code_attribute_categories:
        columns = clean_code_attribute_dict[clean_code_category]

        # Identify the row.
        columns["version"].append(tag)
        columns["date"].append(date)
        columns["timestamp"].append(timestamp)

        for severity in severities:
            security, reliability, maintainability, debt = (
                get_quality_issue_values_from_api(clean_code_category, severity)
            )

            columns[f"security_issues_{severity.lower()}"].append(security)
            columns[f"reliability_issues_{severity.lower()}"].append(reliability)
            columns[f"maintainability_issues_{severity.lower()}"].append(
                maintainability
            )
            columns[f"total_debt_{severity.lower()}"].append(debt)

In [135]:
# Process only the first selected version (the [:1] slice). The order of the
# steps matters: check out the tag, scan the working tree with sonar-scanner,
# then query SonarQube for the metrics and the issue statistics.
for tag_and_timestamp in tags_and_timestamps[:1]:
    # Only `tag` is used directly below; the full tuple is forwarded with *.
    tag, date, _ = tag_and_timestamp
    checkout_to_tag(demo_repo_path, tag)
    sonar_scan(demo_repo_path, tag)
    get_metrics_from_version_tags(*tag_and_timestamp)
    extract_clean_code_data(*tag_and_timestamp)




In [136]:
# Turn the collected metric columns into a table; each scanned version is a row.
metrics_dataset = pd.DataFrame(data=metrics_dict)
metrics_dataset

Unnamed: 0,version,release_date,timestamp,sqale_debt_ratio,vulnerabilities,security_rating,sqale_rating,complexity,cognitive_complexity,ncloc
0,0.6.0,2013-12-07,1386462737,1.6,0,1.0,1.0,1274,672,6820


In [None]:
# Save the metrics table for RQ1.
# to_csv does not create missing directories, so make sure the target exists.
os.makedirs("../Datasets/RQ1", exist_ok=True)
# index=False keeps the row index out of the file; otherwise it comes back as
# an "Unnamed: 0" column when the CSV is read again. This also matches the
# other CSV exports in this notebook.
metrics_dataset.to_csv(f"../Datasets/RQ1/{demo_repo_name}_metrics.csv", index=False)

In [137]:
# Save one CSV per clean-code category for RQ2.
# to_csv does not create missing directories, so make sure the per-repository
# output folder exists before writing into it.
os.makedirs(f"../Datasets/RQ2/{demo_repo_name}", exist_ok=True)

for clean_code_attribute, attribute_dict in clean_code_attribute_dict.items():
    dataframe = pd.DataFrame(attribute_dict, index=None)
    print(dataframe)
    dataframe.to_csv(
        f"../Datasets/RQ2/{demo_repo_name}/{demo_repo_name}_{clean_code_attribute.lower()}_issues.csv",
        index=False,
    )

  version        date   timestamp  security_issues_low  \
0   0.6.0  2013-12-07  1386462737                    0   

   reliability_issues_low  maintainability_issues_low  total_debt_low  \
0                       0                           0               0   

   security_issues_medium  reliability_issues_medium  \
0                       0                          0   

   maintainability_issues_medium  total_debt_medium  security_issues_high  \
0                              0                  0                     0   

   reliability_issues_high  maintainability_issues_high  total_debt_high  
0                        0                            0                0  
  version        date   timestamp  security_issues_low  \
0   0.6.0  2013-12-07  1386462737                    0   

   reliability_issues_low  maintainability_issues_low  total_debt_low  \
0                       0                           0               0   

   security_issues_medium  reliability_issues_medium  

In [138]:
# metrics_dataset.to_csv(f"../Datasets/{demo_repo_name}_metrics.csv", index=False)

In [139]:
# for clean_code_attribute, attribute_dict in clean_code_attribute_dict.items():
#     dataframe = pd.DataFrame(attribute_dict, index=None)
#     dataframe.to_csv(
#         f"../Datasets/{demo_repo_name}/{demo_repo_name}_{clean_code_attribute.lower()}_issues.csv",
#         index=False,
#     )

In [140]:
# # For each version, get the total security, reliability and maintainability issue counts and the total debt, and store them in the corresponding dictionary based on the clean code category

# for tag_and_timestamp in tags_and_timestamps:
#     tag, date, timestamp = tag_and_timestamp
#     checkout_to_tag(demo_repo_path, tag)
#     sonar_scan(demo_repo_path, tag)

#     for clean_code_category in clean_code_attribute_categories:
#         clean_code_attribute_dict[clean_code_category]["version"].append(tag)
#         clean_code_attribute_dict[clean_code_category]["date"].append(date)
#         clean_code_attribute_dict[clean_code_category]["timestamp"].append(timestamp)

#         for severity in severities:
#             (
#                 total_security_issues,
#                 total_reliability_issues,
#                 total_maintainability_issues,
#                 total_debt,
#             ) = get_quality_issue_values_from_api(clean_code_category, severity)

#             clean_code_attribute_dict[clean_code_category][
#                 f"security_issues_{severity.lower()}"
#             ].append(total_security_issues)

#             clean_code_attribute_dict[clean_code_category][
#                 f"reliability_issues_{severity.lower()}"
#             ].append(total_reliability_issues)

#             clean_code_attribute_dict[clean_code_category][
#                 f"maintainability_issues_{severity.lower()}"
#             ].append(total_maintainability_issues)

#             clean_code_attribute_dict[clean_code_category][
#                 f"total_debt_{severity.lower()}"
#             ].append(total_debt)