In [72]:
from pyspark.sql.functions import to_timestamp, date_diff,col, explode, ceiling,avg, count, date_format, lag, when, months_between, desc
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window


# Creating SparkSession

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyPySparkApp") \
    .getOrCreate()
spark.sparkContext.setLogLevel("OFF")

# Reading csv file using pyspark and creating the dataframe and TempView.

In [2]:
spark_df = spark.read.csv("gh_issues.csv", header=True, inferSchema=True)
spark_df.createOrReplaceTempView("issues")

In [12]:
# # df_result = spark_df.withColumn("duration_months", months_between(col("closed_at"), col("created_at")).cast("int"))
# # print(df_result.where(col("duration_months")>1).show())
# df_result = spark_df.withColumn("duration_months_new",
#                           (year(col("closed_at")) - year(col("created_at"))) * 12 + \
#                           (month(col("closed_at")) - month(col("created_at")))
#                           )
# average_months_new = df_result.select(avg(col("duration_months_new")).alias("average_duration_months_new"))
# df_result.show()

+---+--------+--------------------+----------------+-----------+-------------------+-------------------+-----------+-------------------+
| id|issue_no|         issue_title|issue_created_by|issue_state|         created_at|          closed_at| commenters|duration_months_new|
+---+--------+--------------------+----------------+-----------+-------------------+-------------------+-----------+-------------------+
|  0|    1050|The reward criter...|       XXXJCSAMA|       open|2025-07-21 09:30:37|               NULL|         []|               NULL|
|  1|    1049|   Pool swapreferrer|         yurivin|     closed|2025-07-20 18:43:52|2025-07-20 18:54:56|         []|                  0|
|  2|    1048|                 Jub|         jovxx44|       open|2025-07-18 14:58:57|               NULL|         []|               NULL|
|  3|    1047|fix comment of `c...|      zhiqiangxu|       open|2025-07-17 23:53:47|               NULL|         []|               NULL|
|  4|    1046|               Juben|      

# Creating new columns from spark dataframe for Analysis

In [30]:
spark_df = (spark_df
            .withColumn("created_at", to_timestamp("created_at", "yyyy-MM-dd HH:mm:ss"))
            .withColumn("closed_at", to_timestamp("closed_at", "yyyy-MM-dd HH:mm:ss"))
            .withColumn("resolution_days",date_diff("closed_at","created_at"))
            .withColumn("resolution_hours", (col("closed_at").cast("long") - col("created_at").cast("long")) / 3600)
            .withColumn("created_month", date_format("created_at", "yyyy-MM"))
            .withColumn("closed_month", date_format("closed_at", "yyyy-MM"))
            .withColumn("duration_months", months_between(col("closed_at"), col("created_at")).cast("int")))
spark_df.show()

+---+--------+--------------------+----------------+-----------+-------------------+-------------------+-----------+---------------+--------------------+-------------+------------+---------------+
| id|issue_no|         issue_title|issue_created_by|issue_state|         created_at|          closed_at| commenters|resolution_days|    resolution_hours|created_month|closed_month|duration_months|
+---+--------+--------------------+----------------+-----------+-------------------+-------------------+-----------+---------------+--------------------+-------------+------------+---------------+
|  0|    1050|The reward criter...|       XXXJCSAMA|       open|2025-07-21 09:30:37|               NULL|         []|           NULL|                NULL|      2025-07|        NULL|           NULL|
|  1|    1049|   Pool swapreferrer|         yurivin|     closed|2025-07-20 18:43:52|2025-07-20 18:54:56|         []|              0| 0.18444444444444444|      2025-07|     2025-07|              0|
|  2|    1048| 

# Getting top n commenters

In [None]:
top_n_commentators = 10
array_schema = ArrayType(StringType())
spark_df = spark_df.withColumn("evaluated_array", from_json(spark_df["commenters"], array_schema))
top_ten_names = spark_df.select(explode("evaluated_array")).groupBy("col").count().orderBy(desc("count")).limit(
    top_n_commentators)
top_ten_names.show()

# calculating the Average issues resolution time month wise

In [34]:
avg_resol_time = spark_df.where(col("closed_at").isNotNull()).groupBy("closed_month").agg(ceiling(avg("resolution_days")).alias("avg_resolution_days")).orderBy("closed_month")
avg_resol_time.show()

+------------+-------------------+
|closed_month|avg_resolution_days|
+------------+-------------------+
|     2020-06|                 11|
|     2020-07|                 23|
|     2020-08|                 25|
|     2020-09|                 10|
|     2020-10|                 38|
|     2020-11|                  5|
|     2020-12|                  8|
|     2021-01|                  6|
|     2021-02|                  6|
|     2021-03|                  2|
|     2021-04|                  6|
|     2021-05|                  1|
|     2021-06|                 21|
|     2021-08|                 20|
|     2021-09|                  0|
|     2021-10|                  0|
|     2021-11|                  0|
|     2021-12|                  1|
|     2022-01|                 17|
|     2022-03|                 14|
+------------+-------------------+
only showing top 20 rows


# Calculation issue month over month change percentage
## 3 steps:

      * Count the number of issues in a month.
      * Prepare for looking at the "previous" row (create previous month issue using current month issue)
      * Finally Calculating the percentage of issue by month over month.


In [36]:
df_monthly_issues = spark_df.groupBy("created_month") \
        .agg(count(col("issue_no")).alias("current_month_issues")) \
        .orderBy("created_month")
print("Issues Count month-wise:\n")
print(df_monthly_issues.show())


Issues Count month-wise:

+-------------+--------------------+
|created_month|current_month_issues|
+-------------+--------------------+
|      2020-05|                   1|
|      2020-06|                  28|
|      2020-07|                  13|
|      2020-08|                  21|
|      2020-09|                   8|
|      2020-10|                  40|
|      2020-11|                  60|
|      2020-12|                  75|
|      2021-01|                 124|
|      2021-02|                  57|
|      2021-03|                  23|
|      2021-04|                  13|
|      2021-05|                   6|
|      2021-06|                   1|
|      2021-07|                   1|
|      2021-09|                   1|
|      2021-10|                   1|
|      2021-11|                   5|
|      2021-12|                   5|
|      2022-01|                   6|
+-------------+--------------------+
only showing top 20 rows
None


In [38]:
# 2. Prepare for looking at the "previous" row
window_definition = Window.orderBy("created_month")

df_with_previous_count = df_monthly_issues.withColumn(
    "previous_month_issues",
    lag(col("current_month_issues"), 1).over(window_definition)
)
print("\nStep 2: Monthly Counts with Previous Month's Count:")
print(df_with_previous_count.show())





Step 2: Monthly Counts with Previous Month's Count:
+-------------+--------------------+---------------------+
|created_month|current_month_issues|previous_month_issues|
+-------------+--------------------+---------------------+
|      2020-05|                   1|                 NULL|
|      2020-06|                  28|                    1|
|      2020-07|                  13|                   28|
|      2020-08|                  21|                   13|
|      2020-09|                   8|                   21|
|      2020-10|                  40|                    8|
|      2020-11|                  60|                   40|
|      2020-12|                  75|                   60|
|      2021-01|                 124|                   75|
|      2021-02|                  57|                  124|
|      2021-03|                  23|                   57|
|      2021-04|                  13|                   23|
|      2021-05|                   6|                   13|
|  

In [39]:
# 3. Get the "Previous Month Issue Count"

df_mom_pct_change = df_with_previous_count.withColumn(
    "mom_percentage_change",
    when(col("previous_month_issues").isNull(), None)
    .when(col("previous_month_issues") == 0, None)
    .otherwise(
        ((col("current_month_issues") - col("previous_month_issues")) / col("previous_month_issues")) * 100
    )
)

print("\nStep 3: Monthly issues with month over month Percentage Change:")
print(df_mom_pct_change.show())


Step 3: Monthly issues with month over month Percentage Change:
+-------------+--------------------+---------------------+---------------------+
|created_month|current_month_issues|previous_month_issues|mom_percentage_change|
+-------------+--------------------+---------------------+---------------------+
|      2020-05|                   1|                 NULL|                 NULL|
|      2020-06|                  28|                    1|               2700.0|
|      2020-07|                  13|                   28|   -53.57142857142857|
|      2020-08|                  21|                   13|    61.53846153846154|
|      2020-09|                   8|                   21|  -61.904761904761905|
|      2020-10|                  40|                    8|                400.0|
|      2020-11|                  60|                   40|                 50.0|
|      2020-12|                  75|                   60|                 25.0|
|      2021-01|                 124|        

# Calculation of Issue duration time using sql query.

In [4]:
query="""
    SELECT
        DATE_FORMAT(closed_at, 'yyyy-MM') AS year_month,
        CEIL(AVG(
            (UNIX_TIMESTAMP(closed_at) - UNIX_TIMESTAMP(created_at)) / 3600
        )) AS avg_closing_hours,
        CEIL(AVG(DATEDIFF(closed_at, created_at))) AS avg_closing_days
    FROM issues WHERE closed_at IS NOT NULL
    GROUP BY
        year_month
    ORDER BY
        year_month

"""
spark.sql(query).show()
spark.stop()

+----------+-----------------+----------------+
|year_month|avg_closing_hours|avg_closing_days|
+----------+-----------------+----------------+
|   2020-06|              246|              11|
|   2020-07|              548|              23|
|   2020-08|              588|              25|
|   2020-09|              240|              10|
|   2020-10|              889|              38|
|   2020-11|               97|               5|
|   2020-12|              174|               8|
|   2021-01|              138|               6|
|   2021-02|              126|               6|
|   2021-03|               31|               2|
|   2021-04|              131|               6|
|   2021-05|               11|               1|
|   2021-06|              512|              21|
|   2021-08|              498|              20|
|   2021-09|                1|               0|
|   2021-10|                1|               0|
|   2021-11|                4|               0|
|   2021-12|               22|          