## Reading Git Final Project

In [None]:
import os
import subprocess
import datetime
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Show up to 100 rows before pandas truncates a printed DataFrame.
pd.set_option('display.max_rows', 100)
# None removes the limit: print every column and the full contents of each cell.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)

In [None]:
# Turn on eager evaluation in the REPL/notebook for this Spark session.
# `spark` is not created in this notebook; it is expected to be provided by the runtime.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [None]:
# Root GCS location of the project data; every table below is read from a sub-folder of it.
gcs_folder = 'gs://msca-bdp-data-open/final_project_git'

#### Check data size in GCS

In [None]:
# Ask gsutil for the summarized (-s), human-readable (-h) size of the data folder.
# The arguments are passed as a list so the command is not routed through a shell.
cmd = ['gsutil', 'du', '-s', '-h', gcs_folder]

try:
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold error text into the captured output
        universal_newlines=True,
    )
except FileNotFoundError:
    # Without a shell, a missing executable raises instead of printing "not found".
    retval = None
    print('gsutil executable was not found on PATH')
else:
    retval = result.returncode
    if retval == 0:
        for line in result.stdout.splitlines():
            print(f'Total directory size: {line}')
    else:
        # Do not present an error message as if it were a directory size.
        print(f'gsutil exited with status {retval}:\n{result.stdout}')

### Read Git data from GCS

#### Languages
Programming languages by repository as reported by GitHub's https://developer.github.com/v3/repos/#list-languages API

In [None]:
%%time   
    
df_languages = spark.read.parquet(os.path.join(gcs_folder, 'languages'))
print(f'Records read from dataframe *languages*: {df_languages.count():,.0f}')

In [None]:
# Print the column names and types of the languages table.
df_languages.printSchema()

#### Licenses
Open source license SPDX code for each repository as detected by https://developer.github.com/v3/licenses/

In [None]:
%%time   
    
df_licenses = spark.read.parquet(os.path.join(gcs_folder, 'licenses'))
print(f'Records read from dataframe *licenses*: {df_licenses.count():,.0f}')

In [None]:
# Print the column names and types of the licenses table.
df_licenses.printSchema()

#### Commits
Unique Git commits from open source repositories on GitHub, pre-grouped by repositories they appear in.

In [None]:
%%time   
    
df_commits = spark.read.parquet(os.path.join(gcs_folder, 'commits'))
print(f'Records read from dataframe *commits*: {df_commits.count():,.0f}')

In [None]:
# Print the column names and types of the commits table.
df_commits.printSchema()

#### Contents
Unique file contents of text files under 1 MiB on the HEAD branch.  
Can be joined to `files` dataset using the id columns to identify the repository and file path.

In [None]:
%%time   
    
df_contents = spark.read.parquet(os.path.join(gcs_folder, 'contents'))
print(f'Records read from dataframe *commits*: {df_contents.count():,.0f}')

In [None]:
# Print the column names and types of the contents table.
df_contents.printSchema()

#### Files
File metadata for all files at HEAD.  
Join with `contents` dataset on id columns to search text.

In [None]:
%%time   
    
df_files = spark.read.parquet(os.path.join(gcs_folder, 'files'))
print(f'Records read from dataframe *files*: {df_files.count():,.0f}')

In [None]:
# Print the column names and types of the files table.
df_files.printSchema()

## EDA

### What is the timeline of the data?

#### Identify peaks and valleys in commit activity.
#### Detect data gaps and unexpected spikes.
#### Use visualization (e.g., Matplotlib, Seaborn, Plotly, Databricks display function)

In [None]:
import matplotlib.pyplot as plt

# Daily commit volume: truncate each commit timestamp to a calendar date,
# count commits per date, and plot the counts as a time series.
# This starts from df_commits (loaded earlier) and uses F.col, because neither
# `commits_df` nor a bare `col` exists yet at this point in the notebook.
# NOTE(review): the "commit_date" column name is not confirmed by anything in
# this notebook -- check it against the df_commits.printSchema() output.
commits_df = df_commits.withColumn("date", F.col("commit_date").cast("date"))

# groupBy() returns rows in no guaranteed order; sort by date so the line is
# drawn chronologically instead of jumping back and forth across the x-axis.
timeline_df = commits_df.groupBy("date").count().orderBy("date").toPandas()

plt.figure(figsize=(12,6))
plt.plot(timeline_df["date"], timeline_df["count"], label="Commits Over Time")
plt.xlabel("Date")
plt.ylabel("Number of Commits")
plt.title("GitHub Commits Timeline")
plt.legend()
plt.show()

### Most popular programming languages on GitHub

#### Analyze the frequency of different languages from the Languages dataset.

In [None]:
# Rank languages by the total number of bytes recorded for them and show the top 10.
languages_df = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/languages")
bytes_per_language = languages_df.groupBy("language").sum("bytes")
# "sum(bytes)" is the column name Spark generates for the aggregate above.
top_languages_df = bytes_per_language.orderBy("sum(bytes)", ascending=False)
top_languages_df.show(10)

### Trending Programming Languages Over Time

In [None]:
# `sum` is deliberately NOT imported from pyspark.sql.functions: doing so would
# replace Python's builtin sum() for every later cell. The aggregate is reached
# through the `F` alias imported at the top of the notebook instead.
from pyspark.sql.functions import col, year

# Load languages data
languages_df = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/languages")

# Extract the year each row belongs to.
# NOTE(review): a "timestamp" column is assumed here; nothing in this notebook
# confirms it exists -- check it against the df_languages.printSchema() output.
languages_df = languages_df.withColumn("year", year(col("timestamp")))

# Total bytes per (year, language), listed year by year with the largest
# language first within each year.
language_trend_df = (
    languages_df.groupBy("year", "language")
    .agg(F.sum("bytes").alias("total_bytes"))
    .orderBy("year", "total_bytes", ascending=[True, False])
)

# Show trends
language_trend_df.show(20)


### Most Popular Repositories (Commit Volume, Stars, Forks)

In [None]:
from pyspark.sql.functions import count

# Read the commits table; later cells reuse `commits_df`.
commits_path = "gs://msca-bdp-data-open/final_project_git/commits"
commits_df = spark.read.parquet(commits_path)

# One row per distinct repo_name value with the number of commit rows it has,
# ordered from most to fewest commits.
commits_per_repo = commits_df.groupBy("repo_name").agg(count("*").alias("commit_count"))
repo_popularity_df = commits_per_repo.orderBy(col("commit_count").desc())

# Display the ten largest entries.
repo_popularity_df.show(10)

### Identifying AI & Data Science Repositories

In [None]:
import re

from pyspark.sql.functions import col, lower

# Define AI-related keywords (all lowercase; the text is lowercased before matching)
ai_keywords = ["machine learning", "deep learning", "pytorch", "tensorflow", "scikit-learn", "ml", "ai"]

# Build one regex that matches any keyword as a whole token.
# The lookarounds require that a keyword is not glued to another letter or
# digit, so "ml" and "ai" no longer match inside words such as "html",
# "email" or "main". Keywords are escaped so their characters are matched literally.
keyword_alternatives = "|".join(re.escape(keyword) for keyword in ai_keywords)
ai_pattern = r"(?<![a-z0-9])(?:" + keyword_alternatives + r")(?![a-z0-9])"

# Keep commits whose repository name or commit message mentions a keyword.
# Both columns are lowercased first so the match is case-insensitive.
ai_repos_df = commits_df.filter(
    lower(col("repo_name")).rlike(ai_pattern) | lower(col("message")).rlike(ai_pattern)
)

# Show AI-related repositories
ai_repos_df.select("repo_name", "message").show(20, truncate=False)


### Most Common Commit Reasons (NLP Analysis)

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# --- Stage definitions -------------------------------------------------------
# message -> words -> filtered_words -> raw_features (1000 hash buckets) -> features
tokenizer = Tokenizer(inputCol="message", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="features")

# --- Apply the stages in order -----------------------------------------------
# Only rows that actually have a commit message are processed.
commit_msgs_df = commits_df.select("message").dropna()
words_data = tokenizer.transform(commit_msgs_df)
filtered_words = remover.transform(words_data)
featurized_data = hashing_tf.transform(filtered_words)

# IDF is the only stage that has to be fitted on the data before it can transform.
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

# Display the filtered tokens next to their weighted feature vectors.
rescaled_data.select("filtered_words", "features").show(10, truncate=False)

### Most Influential Committers (Top Contributors & Commit Distribution)

In [None]:
from pyspark.sql.functions import count

# Number of commit rows per author_name, largest first.
commits_per_author = commits_df.groupBy("author_name").agg(count("*").alias("commit_count"))
top_committers_df = commits_per_author.orderBy(col("commit_count").desc())

# Display the ten most active authors.
top_committers_df.show(10)

In [None]:
import matplotlib.pyplot as plt

# Bring only the first 20 rows of the ranked committers to the driver for plotting.
top_committers_pd = top_committers_df.limit(20).toPandas()

# Bar chart: one bar per committer, bar height = commit count.
fig, ax = plt.subplots(figsize=(12,6))
ax.bar(top_committers_pd["author_name"], top_committers_pd["commit_count"])
ax.set_xlabel("Committer")
ax.set_ylabel("Number of Commits")
# Tilt the committer names on the x-axis by 45 degrees.
plt.setp(ax.get_xticklabels(), rotation=45)
ax.set_title("Top 20 Most Prolific Committers")
plt.show()

### Commit Message Uniqueness Analysis (LSH for Similarity Detection)

In [None]:
from pyspark.ml.feature import CountVectorizer, MinHashLSH, Tokenizer
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# Candidate texts: non-null commit messages, with exact repeats collapsed so
# that every reported pair consists of two different message texts.
messages_df = commits_df.select("message").dropna().distinct()

# Tokenize words from commit messages
tokenizer = Tokenizer(inputCol="message", outputCol="words")
words_data = tokenizer.transform(messages_df)

# Convert words into feature vectors (vocabulary capped at 5000 terms, each
# appearing in at least 5 messages)
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=5000, minDF=5)
cv_model = cv.fit(words_data)
vectorized_df = cv_model.transform(words_data)

# MinHash rejects vectors with no non-zero entry. A message whose words all
# fall outside the vocabulary produces such a vector, so drop those rows
# before hashing.
has_vocab_terms = F.udf(lambda vector: vector.numNonzeros() > 0, BooleanType())
vectorized_df = vectorized_df.filter(has_vocab_terms(F.col("features")))

# Apply LSH to detect similarity
lsh = MinHashLSH(inputCol="features", outputCol="hashValues", numHashTables=5)
lsh_model = lsh.fit(vectorized_df)

# The join threshold is a maximum Jaccard *distance* (1 - similarity), so 0.8
# admits pairs that share as little as 20% of their vocabulary terms.
# NOTE(review): lower this value if only near-duplicates are wanted.
max_jaccard_distance = 0.8
duplicates_df = lsh_model.approxSimilarityJoin(
    vectorized_df, vectorized_df, max_jaccard_distance, distCol="JaccardDistance"
)

# A self-join pairs every row with itself and returns each pair in both
# orders; keeping only A < B leaves each distinct pair exactly once.
duplicates_df = duplicates_df.filter(F.col("datasetA.message") < F.col("datasetB.message"))

duplicates_df.select("datasetA.message", "datasetB.message", "JaccardDistance").show(10, truncate=False)