In [1]:
from sparkle import Scope

# Build the sparkle Scope for this notebook, asking for a single executor.
# NOTE(review): the log output captured for this cell warns that
# spark.executor.instances is ignored because it is lower than
# spark.dynamicAllocation.minExecutors -- confirm the executor count is honoured.
scope = Scope(executors=1)
# Restrict every executor to one core.
# NOTE(review): presumably this must be set before the session is first created
# on the next line -- verify against sparkle's Scope behaviour.
scope.conf.set('spark.executor.cores', 1)
# Handles used by the rest of the notebook.
spark = scope.spark
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark WARN [Thread-3] spark.util.Utils - spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
Spark WARN [Thread-3] spark.util.Utils - spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
Spark WARN [Thread-3] apache.spark.ExecutorAllocationManager - Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
# Clone URLs of the repositories to scan. Every entry ends in ".git".
git_repos = [
    "https://github.com/asemchenko/Hotello-Spring.git",
    # Disabled entry (reason not recorded):
    # "https://github.com/maxliaops/Java_Web_Examples.git",
    "https://github.com/eomjinyoung/JavaWebProgramming.git",
    "https://github.com/Tastenkunst/brfv4_javascript_examples.git",
    "https://github.com/cschneider4711/Marathon.git",
    "https://github.com/mikemelon/java-signin.git",
    "https://github.com/sonngotung/JWebMVC.git",
    "https://github.com/tsultana2/EducationalWebSite.git",
    "https://github.com/mikemelon/JavaWebEducation.git",
    "https://github.com/Ocryst/Web3JavascriptEducation.git",
    "https://github.com/mihail-petrov/netit-webdev-java.git",
    "https://github.com/infinity23/family-education-platform.git",
    "https://github.com/vasyukvv42/metabot.git",
    "https://github.com/vasyukvv42/anek-api-rest.git",
    "https://github.com/vasyukvv42/course-work-semester-6.git",
    "https://github.com/vasyukvv42/java-labs-semester-6.git",
]

In [3]:
from pyspark.sql import types as t


# One row per repository URL. Passing a bare StringType as the schema yields a
# single string column named `value`, which later cells reference by that name.
df = spark.createDataFrame(git_repos, t.StringType())

In [4]:
from pyspark.sql import functions as f
from git import Repo
from pathlib import Path
import shutil

# Return type of `load_repo`: an array with one (path, content) struct per file.
schema = t.ArrayType(
    elementType=t.StructType(fields=[
        # File path; always present.
        t.StructField(name='path', dataType=t.StringType(), nullable=False),
        # File text; null when the file could not be read.
        t.StructField(name='content', dataType=t.StringType(), nullable=True),
    ])
)


@f.udf(returnType=schema)
def load_repo(value: str) -> list:
    """Clone a git repository and return the text content of its files.

    The repository is cloned into a fresh temporary directory that is removed
    when the function exits, even if cloning or reading fails. Cloning into a
    private temporary directory (instead of a directory named after the
    repository inside the current working directory) means an unrelated or
    half-cloned directory of the same name is never read and never deleted.

    Args:
        value: Clone URL of the repository, e.g.
            ``https://github.com/owner/name.git``. A trailing ``.git`` suffix
            and trailing slashes are ignored when deriving the repository name.

    Returns:
        A list of ``(path, content)`` tuples, one per regular file outside the
        ``.git`` directory. ``path`` is POSIX-style and starts with the
        repository name (``name/src/Main.java``). ``content`` is the file
        decoded as UTF-8, or ``None`` when the file cannot be opened or is not
        valid UTF-8 text.

    Raises:
        ValueError: If no repository name can be derived from ``value``.
        Exception: Whatever ``Repo.clone_from`` raises when the clone fails.
    """
    # Function-scope import so the UDF brings its own dependency with it.
    import tempfile

    repo_name = value.rstrip('/').rsplit('/', 1)[-1]
    if repo_name.endswith('.git'):
        # Strip the suffix only when it is really there.
        repo_name = repo_name[:-len('.git')]
    if not repo_name:
        raise ValueError(f'Cannot derive a repository name from {value!r}')

    files = []
    with tempfile.TemporaryDirectory() as work_dir:
        root = Path(work_dir)
        clone_dir = root / repo_name
        Repo.clone_from(value, clone_dir)
        for file_path in clone_dir.rglob('*'):
            relative = file_path.relative_to(root)
            # Skip directories and git's own metadata. Comparing whole path
            # components avoids dropping files that merely live under a
            # directory whose name ends in ".git".
            if not file_path.is_file() or '.git' in relative.parts[1:]:
                continue
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
            except (OSError, UnicodeError):
                # Unreadable or binary file: keep the path, drop the content.
                content = None
            files.append((relative.as_posix(), content))
    return files

In [5]:
# Step 1: clone every repository and explode the returned array so that each
# file becomes its own row (struct column `repo_content`).
_files_per_repo = df.withColumn('repo_content', f.explode(load_repo('value')))

# Step 2: flatten the struct into top-level columns and rename the URL column.
_flat_files = _files_per_repo.select(f.col('value').alias('repo'), 'repo_content.*')

# Step 3: spread the rows over 200 partitions and cache the result.
df_with_repo_content = _flat_files.repartition(200).cache()

In [None]:
%%time

# Action that triggers the lazy pipeline above (repositories are cloned here)
# and collects every row, including full file contents, to the driver for display.
df_with_repo_content.toPandas()

[Stage 0:===>                                                    (12 + 1) / 192]

In [None]:
import json
import re

def _load_checkers(checkers_path):
    """Load a JSON list of checker definitions and compile their patterns.

    Each entry must carry a ``pattern`` string, which is replaced in place by
    the compiled regular expression. The ``id`` field is discarded; an entry
    without one is accepted as well.

    Args:
        checkers_path: Path of the JSON file to read.

    Returns:
        The list of checker dicts, with ``pattern`` compiled and ``id`` removed.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If an entry has no ``pattern`` field.
        re.error: If a pattern is not a valid regular expression.
    """
    with open(checkers_path, encoding='utf-8') as file:
        checkers = json.load(file)
    for checker in checkers:
        # `id` is not used downstream; tolerate entries that omit it.
        checker.pop('id', None)
        checker['pattern'] = re.compile(checker['pattern'])
    return checkers


# Checkers matched against file paths and against individual lines of content.
path_checkers = _load_checkers('pathCheckers.json')
content_checkers = _load_checkers('contentCheckers.json')

In [None]:
# Return type of `parse_issues`: an array with one struct per detected issue.
issue_schema = t.ArrayType(
    elementType=t.StructType(fields=[
        t.StructField(name='issueType', dataType=t.StringType(), nullable=False),
        t.StructField(name='issueDescription', dataType=t.StringType(), nullable=True),
        # Nullable: issues found from the file path alone carry no line number.
        t.StructField(name='lineNumber', dataType=t.IntegerType(), nullable=True),
    ])
)


@f.udf(returnType=issue_schema)
def parse_issues(path: str, content: str):
    """Collect the issues reported by the path and content checkers for a file.

    Args:
        path: File path, tested against every entry of ``path_checkers``.
        content: File text, or a falsy value (``None`` / empty string) when
            there is nothing to scan line by line.

    Returns:
        A list of dicts with ``issueType`` and ``issueDescription``. Issues
        found by a content checker also carry the 1-based ``lineNumber`` of
        the matching line; path issues have no ``lineNumber`` key.
    """
    found = []

    # Path-level checks. `match` anchors each pattern at the start of the path.
    for checker in path_checkers:
        if checker['pattern'].match(path):
            details = checker['issue']
            found.append({
                'issueType': details['issueType'],
                'issueDescription': details['issueDescription'],
            })

    # Without content, only the path-level issues can be reported.
    if not content:
        return found

    # Line-level checks: every checker is tried on every line.
    for line_number, line in enumerate(content.splitlines(), start=1):
        for checker in content_checkers:
            if not checker['pattern'].match(line):
                continue
            details = checker['issue']
            found.append({
                'issueType': details['issueType'],
                'issueDescription': details['issueDescription'],
                'lineNumber': line_number,
            })
    return found

In [None]:
# Run the checkers on every file; explode yields one row per issue found.
_issues_per_file = df_with_repo_content.withColumn(
    'issues', f.explode(parse_issues('path', 'content'))
)

# Keep the file identity and flatten the issue struct into top-level columns.
report_df = _issues_per_file.select('repo', 'path', 'issues.*').cache()

In [None]:
%%time

# Action that runs the issue detection and collects the full report to the driver.
pd_report = report_df.toPandas()
# Bare last expression so the notebook renders the report.
pd_report

In [None]:
# Persist the report to the working directory; orient='records' writes a JSON
# array with one object per issue row.
pd_report.to_json('report.json', orient='records')