In [None]:
import os, sys

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import min, lit, lower, trim, split, sum, col, desc, when, date_format, log, log10, element_at, regexp_replace, collect_list
from pyspark.sql import SparkSession

import matplotlib.pyplot as plt

In [None]:
# Spark session shared by every cell below: 8 GB for executors and for the
# driver, with the web UI pinned to port 4040. getOrCreate() hands back an
# already-running session if there is one.
SPARK_SETTINGS = {
    "spark.executor.memory": "8g",
    "spark.driver.memory": "8g",
    "spark.ui.port": "4040",
}

spark_builder = SparkSession.builder
for setting_key, setting_value in SPARK_SETTINGS.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()

In [None]:
# Alias lookup table read from emails/aliases.csv: two nullable string columns,
# `email` and `alias`. Later cells join commit e-mails against `email` to
# obtain a person alias for committers and authors.
schema_aliases = StructType(
    [StructField(column_name, StringType(), True) for column_name in ('email', 'alias')]
)

aliases_df = (
    spark.read
    .schema(schema_aliases)
    .csv("emails/aliases.csv", header=True)
)

In [None]:
# Commit exports to load, one file per repository (the file name becomes the
# `source` column below). Hidden entries such as .DS_Store and sub-directories
# are skipped so they are never parsed as commit data. The list is sorted
# because os.listdir() order is arbitrary and OS-dependent; sorting makes the
# union order below reproducible.
files = sorted(
    entry for entry in os.listdir('./commits/')
    if not entry.startswith('.') and os.path.isfile(os.path.join('./commits/', entry))
)

In [None]:
# Explicit schema for the commit CSVs in ./commits/. Every column is nullable.
# The two *-datetime columns are parsed as timestamps and the per-file
# insertion/deletion counts as doubles.
# NOTE(review): each row appears to describe one file changed by one commit
# (it carries both commit-hash and file-name) — confirm against the exporter.
_commit_columns = [
    ('commit-hash', StringType()),
    ('commit-authored-datetime', TimestampType()),
    ('commit-author-email', StringType()),
    ('commit-committed-datetime', TimestampType()),
    ('commit-committer-email', StringType()),
    ('file-name', StringType()),
    ('file-insertions', DoubleType()),
    ('file-deletions', DoubleType()),
]

schema = StructType([StructField(name, dtype, True) for name, dtype in _commit_columns])

In [None]:
# Read every commit export and stack them into one DataFrame. The file name
# without its extension is stored in `source`, so later cells can select a
# single repository.
combined_df = None

for file in files:
    df = spark.read.schema(schema).csv(os.path.join("commits", file), header=True)
    # splitext strips only the final extension, so dotted repository names stay
    # intact ("socket.io.csv" -> "socket.io"); split('.')[0] would give "socket".
    df = df.withColumn("source", lit(os.path.splitext(file)[0]))

    combined_df = df if combined_df is None else combined_df.union(df)

# Fail with a clear message instead of an AttributeError on None.show().
if combined_df is None:
    raise FileNotFoundError("No commit exports found in ./commits/")

combined_df.show()

In [None]:
# Attach committer and author aliases to every commit row, and keep only rows
# where both have an alias.
#
# The commit e-mails are trimmed and lower-cased before matching, so the alias
# table gets the same normalisation. Otherwise an address stored with different
# case or surrounding whitespace in aliases.csv never matches, and its commits
# are silently dropped by the not-null filters. Exact duplicate (email, alias)
# rows are removed so a repeated entry cannot duplicate the commit rows it joins to.
aliases_norm_df = aliases_df \
    .select(trim(lower(col("email"))).alias("email"), col("alias")) \
    .distinct()

consolidated_df = combined_df \
    .join(aliases_norm_df.alias("a1"), trim(lower(col("commit-committer-email"))) == col("a1.email"), "left") \
    .join(aliases_norm_df.alias("a2"), trim(lower(col("commit-author-email"))) == col("a2.email"), "left") \
    .select(combined_df["*"], col("a1.alias").alias("committer"), col("a2.alias").alias("author")) \
    .drop(col("commit-committer-email")) \
    .drop(col("commit-author-email")) \
    .filter(col("committer").isNotNull()) \
    .filter(col("author").isNotNull())

# Use case 0
## Commit size

In [None]:
# Per-file insertions vs. deletions. Changes with 100k+ lines on either side are
# excluded; the `df_human` name suggests they are presumed machine-generated.
# Rows with a null count are dropped too, because the comparison yields null.
df_human = combined_df.filter((col('file-deletions') < 100000) & (col('file-insertions') < 100000))

# Collect both columns in ONE Spark job. Two separate toPandas() calls run two
# independent jobs whose row order is not guaranteed to match, which could pair
# one row's insertions with another row's deletions in the scatter.
human_changes = df_human.select(col('file-insertions'), col('file-deletions')).toPandas()
insertions = human_changes[['file-insertions']]
deletions = human_changes[['file-deletions']]

fig, ax = plt.subplots()
ax.scatter(human_changes['file-insertions'], human_changes['file-deletions'])
ax.set(xlabel='lines inserted (per file change)',
       ylabel='lines deleted (per file change)',
       title='Insertions vs. deletions per file change');

In [None]:
# Distribution of total lines touched per file change (insertions + deletions),
# with a log-scaled count axis. Changes of 100k+ lines are excluded as outliers,
# and rows with a null count are dropped because the comparison yields null.
changes = combined_df \
    .withColumn('file-changes', col('file-insertions') + col('file-deletions')) \
    .select(col('file-changes')) \
    .filter(col('file-changes') < 100000) \
    .toPandas()

fig, ax = plt.subplots()
ax.hist(changes['file-changes'], bins=100, color='skyblue', edgecolor='black', log=True)
ax.set(xlabel='lines changed per file (insertions + deletions)',
       ylabel='number of file changes (log scale)',
       title='Size of per-file changes')

changes

In [None]:
# Plotly setup for the interactive charts below. Notebook mode is initialised
# with its default arguments, i.e. offline (no connection to a plotly CDN).
from plotly import offline as pyo
from plotly import graph_objs as go

pyo.init_notebook_mode()

In [None]:
# Histogram of log10(lines changed per file change).
# The 100k outlier cut-off is applied to the raw line count BEFORE taking the
# log. Applied afterwards, it compares log10 values against 100000 and filters
# nothing. Zero-line changes are excluded explicitly, because log10(0) is
# undefined (Spark returns null for non-positive input).
combined_df \
    .withColumn('file-changes', col('file-insertions') + col('file-deletions')) \
    .filter((col('file-changes') > 0) & (col('file-changes') < 100000)) \
    .select(log10(col('file-changes')).alias('file-changes')) \
    .plot.hist(bins=30)

# Use case 1
## Commits per month

In [None]:
# Number of distinct commits per calendar month of the *committed* timestamp.
# combined_df has one row per file touched by a commit (note its file-name /
# file-insertions columns), so counting its rows directly would count file
# changes, not commits. Deduplicating (commit-hash, timestamp) first makes each
# commit count once, even if the same hash appears in several repositories.
commits_per_month_df = combined_df \
    .select(col("commit-hash"), col("commit-committed-datetime")) \
    .distinct() \
    .withColumn("commit-committed-month", date_format(col("commit-committed-datetime"), "yyyy-MM")) \
    .groupBy(col("commit-committed-month")) \
    .count() \
    .orderBy(col("commit-committed-month"))

In [None]:
# Bar chart of commits per month. Plotly is already imported and set to
# notebook mode in the "Commit size" section, so it is not re-imported here.
commits_per_month_df.plot.bar(y="count", x="commit-committed-month")

# Use case 2
## Most productive committers (all time / per year)

In [None]:
# Committers ranked by the number of distinct commits over the whole history.
# consolidated_df has one row per file touched by a commit, so the
# (commit-hash, committer) pairs are deduplicated first. Counting raw rows would
# rank people by how many files they touch, not by how many commits they make.
consolidated_df \
    .select(col("commit-hash"), col("committer")) \
    .distinct() \
    .groupBy(col("committer")) \
    .count() \
    .orderBy(col("count").desc()) \
    .plot.bar(y="count", x="committer")

In [None]:
# Committers ranked by distinct commits committed in one calendar year.
# As above, rows are per changed file, so (commit-hash, committer) pairs are
# deduplicated before counting.
YEAR = "2025"

consolidated_df \
    .filter(date_format(col("commit-committed-datetime"), "yyyy") == YEAR) \
    .select(col("commit-hash"), col("committer")) \
    .distinct() \
    .groupBy(col("committer")) \
    .count() \
    .orderBy(col("count").desc()) \
    .plot.bar(y="count", x="committer")

# Use case 3
## Lines of code per language per repo

In [None]:
from pyspark.sql.functions import count as count_rows

# Net lines of code (insertions - deletions, summed) and the number of file
# changes per "language" for one repository. The language is approximated by
# the lower-cased text after the last '.' in the file name, with non-word
# characters stripped. Files without a dot are grouped under their whole
# lower-cased name. Null line counts are ignored by sum() but still counted.
source = "<repo>"

consolidated_df \
    .select(col("source"), col("file-name"), col("file-insertions"), col("file-deletions")) \
    .filter(col("source") == source) \
    .withColumn("file-lines", col("file-insertions") - col("file-deletions")) \
    .withColumn("lang-arr", split(col("file-name"), "\\.")) \
    .withColumn("lang", regexp_replace(lower(element_at(col("lang-arr"), -1)), "[^\\w]", "")) \
    .groupBy(col("lang")) \
    .agg(sum(col("file-lines")).alias("lines"), count_rows(lit(1)).alias("count")) \
    .orderBy(col("lines").desc()) \
    .show()

# Use case 4
## ML

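Can this be framed as a classification problem?

Given the files changed in a commit (and possibly the amount of change in each), try to predict the most likely author alias. Note: the model below uses only the list of changed files as features and the *committer* alias as the label.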

In [None]:
# How many distinct file-name values appear in the chosen repository?
# Only commits whose committer and author both resolved to an alias are
# considered, since consolidated_df is already filtered that way.
source = "<repo>"

(
    consolidated_df
    .where(col("source") == source)
    .select("file-name")
    .distinct()
    .count()
)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import VectorAssembler

In [None]:
# One row per commit of the selected repository: `files` is the list of
# file-name values the commit touched (model input), and `committer` is the
# label. min() collapses the per-file committer values into one; presumably
# they are identical within a commit, since they come from the same commit e-mail.
data = (
    consolidated_df
    .where(col("source") == source)
    .select("commit-hash", "file-name", "committer")
    .groupBy("commit-hash")
    .agg(
        collect_list("file-name").alias("files"),
        min("committer").alias("committer"),
    )
)

In [None]:
# Predict a commit's committer from the set of files it touched.
#
# Pipeline stages:
#   CountVectorizer   file list       -> sparse bag-of-files vector ("features")
#   StringIndexer     committer alias -> numeric label ("committer_i")
#   RandomForest      features        -> numeric prediction
#   IndexToString     prediction      -> committer alias ("predicted")
SEED = 42

result = data.select(col("commit-hash"), col("files"), col("committer"))

# The label indexer is fitted on ALL commits, so every alias gets an index,
# including aliases that only occur in the test split. StringIndexer would
# otherwise fail on unseen labels at transform time.
si = StringIndexer(inputCol="committer", outputCol="committer_i").fit(result)
its = IndexToString(inputCol="prediction", outputCol="predicted", labels=si.labels)

trainDF, testDF = result.randomSplit([.8, .2], seed=SEED)

# The file vocabulary is learned inside the pipeline, i.e. from the training
# split only, so the features carry no information about test-only files.
cv = CountVectorizer(inputCol="files", outputCol="features")

# Seeded so that repeated runs build the same forest.
rf = RandomForestClassifier(featuresCol='features', labelCol='committer_i', seed=SEED)

pipeline = Pipeline(stages=[cv, si, rf, its])

pipelineModel = pipeline.fit(trainDF)
# Fitted CountVectorizerModel, kept under its previous name for inspecting the vocabulary.
model = pipelineModel.stages[0]

predDF = pipelineModel.transform(testDF)
predDF.select("features", "committer", "predicted").show(10)

In [None]:
# Test-set metrics for the committer classifier. MulticlassClassificationEvaluator
# is already imported in the ML imports cell, so it is not re-imported here.
# Both metrics compare the numeric label "committer_i" with "prediction".
# logLoss additionally reads the evaluator's default probabilityCol
# ("probability"), which the random forest populates.
evaluator = MulticlassClassificationEvaluator(labelCol="committer_i", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predDF)
print(f"ACCURACY: {accuracy}")

evaluator.setMetricName("logLoss")
logLoss = evaluator.evaluate(predDF)
print(f"LOGLOSS: {logLoss}")