In [24]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# git commits
gitCommitsdf = spark.read.csv('git-log-all.csv', header=True)
#gitCommitsdf.take(1)


In [25]:
# keep only required columns. i.e commit,author,date,insertions,deletions
unique_git_commit_with_required_cols_df = gitCommitsdf.select("commit","author","date","insertions","deletions")
#git_commit_with_required_cols_df.take(5)

In [26]:
# drop duplicates 
# quesiton : does resuffle happens internally? I guess it does not happen in just one partition.[inefficient]
#unique_git_commit_with_required_cols_df = git_commit_with_required_cols_df.dropDuplicates(subset = ['commit'])
#print('Original record count = ', git_commit_with_required_cols_df.count())
#print('Dropped record count = ', unique_git_commit_with_required_cols_df.count())
#unique_git_commit_with_required_cols_df.filter(unique_git_commit_with_required_cols_df.author=="Mohammed@talentica-all.com").show(truncate=False)

In [27]:
#convert date to bit integer
#Fri Mar 12 00:00:43 2021 +0530
def binary_rep(datetimestamp):
    loc = int(datetimestamp.split(" ")[2])-1
    return 1<<loc
#print(binary_rep("Fri Mar 12 00:00:43 2021 +0530"))

In [28]:
# month field funtion
#Fri Mar 12 00:00:43 2021 +0530
def get_month_year(datetimestamp):
    day = 1
    datetimestampArray = datetimestamp.split(" ")
    month = datetimestampArray[1]
    year = datetimestampArray[4]
    return "0{fday}-{fmonth}-{fyear}".format(fday = day, fmonth = month,fyear=year)
#print(get_month_year("Fri Mar 12 00:00:43 2021 +0530"))


In [29]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import IntegerType,StringType
binary_rep_udf = udf(lambda d:binary_rep(d),IntegerType())
get_month_year_udf = udf(lambda d:get_month_year(d),StringType())
# transformation to be done on the date column
#unique_git_commit_with_required_cols_df.take(5)
trans_unique_git_commit_with_required_cols_df = unique_git_commit_with_required_cols_df.select(col("author"),\
                                                                                               col("insertions"),\
                                                                                               col("deletions"),\
                                                                                               binary_rep_udf(col("date")).alias("bday"),\
                                                                                               get_month_year_udf(col("date")).alias("month"))                                                                                                                                                                                      
#trans_unique_git_commit_with_required_cols_df.show(truncate=False)

In [30]:
# read email aliases
email_alias_df = spark.read.json("emailAliases.json")
required_email_alias_df = email_alias_df.select("alias","email")
#print(emailAliasdf)
#required_email_alias_df.show(truncate=False)
#remove duplicates
#unique_required_email_alias_df = required_email_alias_df.drop_duplicates(subset=['email'])
#print(unique_required_email_alias_df.count() == required_email_alias_df.count())

In [32]:
joined_commit_df = trans_unique_git_commit_with_required_cols_df.join(required_email_alias_df,\
                                                  trans_unique_git_commit_with_required_cols_df.author == required_email_alias_df.email,\
                                                  'left')
final_df = joined_commit_df.select("alias","insertions","bday","deletions","month")
final_df.show()
#print(final_df.count())

+--------------------+----------+--------+---------+-----------+
|               alias|insertions|    bday|deletions|      month|
+--------------------+----------+--------+---------+-----------+
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|rakeshp@talentica...|        63|    2048|        2|01-Mar-2021|
|Sagar.rajak@talen...|         2|     256|        2|01-Mar-2021|
|Sagar.rajak@talen...|         2|     256|        3|01-Mar-2021|
|Sagar.rajak@talen...|       174|     128|      146|01-Mar-2021|
|Sagar.rajak@talen...|       174|     128|      146|01-Mar-2021|
|Sagar.rajak@talen...|       174|     128|      146|01-Mar-2021|
|rakeshp@talentica...|   

In [44]:
""" 
1.Create a user defined aggregate function. The problem is that you will need to write the user defined aggregate function in scala and wrap it to use in python.
2.(inefficient)You can use the collect_list function to collect all values to a list and then write a UDF to combine them.
3.(inefficient)You can move to RDD and use aggregate or aggregate by key.
sum("insertions","deletions").show()
"""
from pyspark.sql.functions import sum,avg,collect_list,collect_set
df = final_df.withColumn("insertions",final_df.insertions.cast('int')).\
    withColumn("deletions",final_df.deletions.cast('int')).\
    groupBy("alias","month").agg(sum("deletions").alias("deletions"),\
                                 sum("insertions").alias("insertions"),\
                                 collect_set("bday").alias("days"))


In [45]:
# binary days merge function
from datetime import datetime
def calculate_work_day(binary_rep_day,month):
    dayOfWeek = datetime.strptime(month,"%d-%b-%Y").weekday()
    dist=-1
    total_days=0
    for shift in range(0,31):
        pos_b = 1 << shift
        currdayOfWeek = (dayOfWeek + shift) % 7
        if(currdayOfWeek == 6 or currdayOfWeek == 7 ):
            continue
        if dist != -1:
            dist = dist+1
        if(binary_rep_day & pos_b):
            if dist == -1:
                dist = 0
            else:
                if(dist <= 3):
                    total_days = total_days + dist
                dist = 0
    return total_days
#print(calculate_work_day([32,16,2,64],"01-Oct-2020"))
def merge(daylist):
    result = 0
    for day in daylist:
        result = result | day
    return result

In [46]:
merge_udf = udf(lambda d:merge(d),IntegerType())
intermediate_result_df = df.select(col("alias"),\
                                  col("month"),\
                                  merge_udf("days").alias("days"),\
                                  col("insertions"),\
                                  col("deletions"))
#intermediate_result_df.take(5)

In [47]:
calculate_work_day_udf = udf(lambda d,m:calculate_work_day(d,m),IntegerType())

In [49]:
result_group_by_author_month_df = intermediate_result_df.select(col("alias"),\
                                  col("month"),\
                                  calculate_work_day_udf("days","month").alias("days"),\
                                  col("insertions"),\
                                  col("deletions"))
result_group_by_author_month_df.show(truncate=False)
#result_df.write.format("json").mode("overwrite").save("results.json")
#print(result_group_by_author_month_df.count())

+------------------------------+-----------+----+----------+---------+
|alias                         |month      |days|insertions|deletions|
+------------------------------+-----------+----+----------+---------+
|ratneshp@talentica.com        |01-Sep-2020|0   |335       |57       |
|rakeshp@talentica.com         |01-Oct-2020|8   |6271      |3390     |
|mohammed@talentica-all.com    |01-Sep-2020|1   |16617     |0        |
|ratneshp@talentica.com        |01-Jan-2021|15  |30171     |22364    |
|Sagar.rajak@talentica.com     |01-Dec-2020|16  |9008      |3024     |
|shravan.kumar@talentica.com   |01-Jan-2021|10  |1568      |623      |
|Shubham.Patidar@talentica.com |01-Mar-2021|4   |264485    |21639    |
|rakeshp@talentica.com         |01-Jun-2020|1   |424       |247      |
|ratneshp@talentica.com        |01-May-2020|1   |721       |32       |
|Sagar.rajak@talentica.com     |01-Feb-2021|18  |41767     |88362    |
|ratneshp@talentica.com        |01-Mar-2021|4   |89        |86       |
|shrav