In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = SparkSession.builder.appName("github_commit_analysis").getOrCreate()

# Read the CSV with its first row as the header and let Spark infer column types.
df = spark.read.csv("../data/full.csv", inferSchema=True, header=True)

# printSchema() writes the schema to stdout itself and returns None, so it is
# called directly; wrapping it in print() would emit a stray "None" line.
df.printSchema()

In [None]:
# 1
from pyspark.sql import functions as F

# Rows without a repo value are discarded before any aggregation.
df_non_null = df.where(F.col("repo").isNotNull())

# Ten repositories with the highest number of commits (one row per commit).
df_grouped = (
    df_non_null
    .groupBy("repo")
    .agg(F.count("*").alias("commit_count"))
    .orderBy(F.col("commit_count").desc())
    .limit(10)
)

df_grouped.show(truncate=100)

In [None]:
# 2 - Author with the most commits on the apache/spark repository.
# df_spark is reused by the next cell; calling df_spark.cache() here is an
# option if recomputing the filter turns out to be slow.
df_spark = df_non_null.where(F.col("repo") == 'apache/spark')

# Commit count per author, most active first.
df_spark_grouped = (
    df_spark
    .groupBy("author")
    .agg(F.count("*").alias("commit_count"))
    .orderBy(F.col("commit_count").desc())
)

# Keep only the top contributor.
df_spark_grouped_top = df_spark_grouped.limit(1)

df_spark_grouped_top.show(truncate=100)

In [None]:
from pyspark.sql.functions import regexp_extract,to_timestamp, expr, col, concat_ws, slice, split

# 3 - Commits per author on apache/spark over the last four years.
# Raw dates look like: Mon Apr 19 20:38:03 2021 +0100
date_format = "MMM d HH:mm:ss yyyy Z"

# Drop the first space-separated token (the weekday name in the sample above)
# so the remainder matches date_format: split on spaces, keep everything from
# the 2nd element onwards, then join back with single spaces.
# int(1e9) is only an "until the end of the array" length for slice().
df_spark_extract = df_spark.withColumn(
    "date",
    concat_ws(" ", slice(split(df_spark["date"], " "), 2, int(1e9))),
)
df_spark_extract.show(truncate=False)

# Convert the 'date' column to a timestamp.
df_convert = df_spark_extract.withColumn("date", to_timestamp(col("date"), date_format))

# Keep commits from the last four calendar years. add_months(..., -48) is used
# instead of subtracting 4 * 365 days, which ignores leap days and therefore
# starts the window one day late whenever the span contains a Feb 29.
four_years_ago = expr("add_months(current_date(), -48)")
df_spark_four_year = df_convert.filter(col("date") >= four_years_ago)
df_spark_four_year.orderBy(F.asc("date")).show(truncate=False)

# Commit count per author within that window, most active first.
df_spark_four_year_grouped = (
    df_spark_four_year
    .groupBy("author")
    .agg(F.count("*").alias("commit_count"))
    .orderBy(F.desc("commit_count"))
)

df_spark_four_year_grouped.show(truncate=False)

In [None]:
# 4
from pyspark.sql import functions as F
from pyspark.ml.feature import StopWordsRemover

# Stop-word filter: reads the token array from "words_split" and writes the
# filtered array to "no_stop_words".
stop_word_remover = StopWordsRemover(inputCol="words_split", outputCol="no_stop_words")

# Only rows that have both a repo and a commit message are considered.
# NOTE(review): this re-binds df_non_null (and df_grouped below) from earlier
# cells with a different meaning; re-running those cells afterwards will see
# these values unless they are re-executed in order.
df_non_null = df.where(F.col("repo").isNotNull() & F.col("message").isNotNull())

# Ten most frequent words across commit messages:
#   1. split each message on single spaces,
#   2. remove stop words,
#   3. explode to one row per word and drop empty tokens,
#   4. count occurrences and keep the ten largest counts.
df_grouped = (
    stop_word_remover.transform(
        df_non_null.withColumn("words_split", F.split(F.col("message"), " "))
    )
    .withColumn("word", F.explode(F.col("no_stop_words")))
    .where(F.col("word") != "")
    .groupBy("word")
    .agg(F.count("word").alias("word_count"))
    .orderBy(F.col("word_count").desc())
    .limit(10)
)

df_grouped.show(truncate=100)