#### Load Data:
- Loads the data with the defined schema.

#### Explode Nested Columns:
- Explodes the KPIs column and flattens it.
- Explodes the lines_per_repo column and flattens it, extracting the keys and values from the map.

#### Clean Data:
- Imputes missing dates with January 1, 2024 as the distribution seems to fit that date better.
- Converts the completed column to Boolean.
- Replaces nulls in engineer with "Unknown".
- Imputes missing num_slack_messages and num_hours with their respective medians after checking summary statitics.
- Handles missing or invalid values in ticket_description, initiative, new_revenue, repo_name, and lines_added.
- Ensures jira_ticket_id is unique.
- Validates num_hours and num_slack_messages to ensure they are non-negative.

In [1]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
def create_spark_session():
    return SparkSession.builder.appName("JiraMetrics").getOrCreate()

In [3]:
def load_data(spark: SparkSession, file_path: str) -> DataFrame:
    # Define the expected schema
    schema = T.StructType([
        T.StructField("jira_ticket_id", T.IntegerType(), True),
        T.StructField("date", T.DateType(), True),
        T.StructField("completed", T.StringType(), True),  # Initially as StringType to handle "yes", "True", "None"
        T.StructField("num_slack_messages", T.IntegerType(), True),
        T.StructField("num_hours", T.FloatType(), True),
        T.StructField("engineer", T.StringType(), True),
        T.StructField("ticket_description", T.StringType(), True),
        T.StructField("KPIs", T.ArrayType(
            T.StructType([
                T.StructField("initiative", T.StringType(), True),
                T.StructField("new_revenue", T.FloatType(), True)
            ])
        ), True),
        T.StructField("lines_per_repo", T.ArrayType(
            T.MapType(T.StringType(), T.IntegerType())
        ), True)
    ])

    # Load the JSON data into a DataFrame
    df = spark.read.schema(schema).json(file_path)

    return df

def analyze_null_distribution(df: DataFrame):
    # Count nulls in each column
    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
    null_counts.show()

In [4]:
spark = create_spark_session()
df = load_data(spark, "../data/data.json")
analyze_null_distribution(df)
df.show(5)

+--------------+----+---------+------------------+---------+--------+------------------+----+--------------+
|jira_ticket_id|date|completed|num_slack_messages|num_hours|engineer|ticket_description|KPIs|lines_per_repo|
+--------------+----+---------+------------------+---------+--------+------------------+----+--------------+
|             0|1970|    14285|                 0|        0|   16315|                 0|   0|             0|
+--------------+----+---------+------------------+---------+--------+------------------+----+--------------+

+--------------+----------+---------+------------------+---------+--------+--------------------+--------------------+--------------------+
|jira_ticket_id|      date|completed|num_slack_messages|num_hours|engineer|  ticket_description|                KPIs|      lines_per_repo|
+--------------+----------+---------+------------------+---------+--------+--------------------+--------------------+--------------------+
|             0|2023-03-18|      yes|

In [5]:
def inspect_nested_columns(df: DataFrame):
    # Explode KPIs column and inspect nested fields
    print("Exploring KPIs column:")
    df.select(F.col("jira_ticket_id"), F.explode("KPIs").alias("KPI")).select("jira_ticket_id", "KPI.*").show()

    # Explode lines_per_repo column and inspect nested fields
    print("Exploring lines_per_repo column:")
    df.select(F.col("jira_ticket_id"), F.explode("lines_per_repo").alias("repo")) \
      .select("jira_ticket_id", F.expr("map_keys(repo)[0]").alias("repo_name"), F.expr("map_values(repo)[0]").alias("lines_added")) \
      .show()

In [6]:
inspect_nested_columns(df)

Exploring KPIs column:
+--------------+------------+-----------+
|jira_ticket_id|  initiative|new_revenue|
+--------------+------------+-----------+
|             0|New Customer|  4349.9053|
|             1|  Efficiency|  5295.6553|
|             1|New Customer|   4360.982|
|             2|     Support|  1034.0549|
|             2|     Support|  6768.2495|
|             3|New Customer|   9062.461|
|             3|     Support|  7093.9927|
|             4|New Customer|  782.30023|
|             4|  Efficiency|   2014.079|
|             5|New Customer|  503.27695|
|             5|New Customer|  2257.7773|
|             6|New Customer|   3002.858|
|             7|  Efficiency|   3005.427|
|             7|     Support|  7779.5337|
|             8|  Efficiency|  5770.9575|
|             8|  Efficiency|  268.91583|
|             9|  Efficiency|  3023.7705|
|            10|New Customer|   5879.525|
|            11|New Customer|  1704.4084|
|            12|  Efficiency|  3425.6223|
+----------

In [7]:
def explode_nested_columns(df: DataFrame) -> DataFrame:
    # Explode KPIs column
    df = df.withColumn("KPIs_exploded", F.explode_outer("KPIs")) \
           .select("*", "KPIs_exploded.*").drop("KPIs", "KPIs_exploded")

    # Explode lines_per_repo column
    df = df.withColumn("lines_per_repo_exploded", F.explode_outer("lines_per_repo")) \
           .select("*", F.expr("map_keys(lines_per_repo_exploded)[0]").alias("repo_name"),
                   F.expr("map_values(lines_per_repo_exploded)[0]").alias("lines_added")) \
           .drop("lines_per_repo", "lines_per_repo_exploded")

    return df

In [8]:
exploded_df = explode_nested_columns(df)
exploded_df.show(5)

+--------------+----------+---------+------------------+---------+--------+--------------------+------------+-----------+---------+-----------+
|jira_ticket_id|      date|completed|num_slack_messages|num_hours|engineer|  ticket_description|  initiative|new_revenue|repo_name|lines_added|
+--------------+----------+---------+------------------+---------+--------+--------------------+------------+-----------+---------+-----------+
|             0|2023-03-18|      yes|               237|22.898462|  Sandra|reclusive initiat...|New Customer|  4349.9053|        G|         52|
|             0|2023-03-18|      yes|               237|22.898462|  Sandra|reclusive initiat...|New Customer|  4349.9053|        O|         23|
|             0|2023-03-18|      yes|               237|22.898462|  Sandra|reclusive initiat...|New Customer|  4349.9053|        H|         31|
|             1|2023-03-31|     true|               276|62.134537|    Dale|Taurus both absen...|  Efficiency|  5295.6553|        G|     

In [9]:
# Analyze the date distribution
date_distribution = exploded_df.groupBy(F.year("date").alias("year"), F.month("date").alias("month")).count().orderBy("year", "month")
date_distribution.show()

+----+-----+-----+
|year|month|count|
+----+-----+-----+
|NULL| NULL| 8941|
|2023|    1|35608|
|2023|    2|34066|
|2023|    3|37320|
|2023|    4|36710|
|2023|    5|37308|
|2023|    6|36706|
|2023|    7|36778|
|2023|    8|37064|
|2023|    9|35987|
|2023|   10|37625|
|2023|   11|35667|
|2023|   12|37903|
|2024|    1| 1188|
+----+-----+-----+



In [10]:
import matplotlib.pyplot as plt
import pandas as pd

def get_summary_statistics(df: DataFrame, column: str):
    # Describe the column to get summary statistics
    summary = df.select(column).summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

    # Calculate the median using the approximate quantile method
    median = df.approxQuantile(column, [0.5], 0.01)[0]
    print(f"Median of {column}: {median}")

# Get summary statistics
get_summary_statistics(exploded_df, "num_slack_messages")
get_summary_statistics(exploded_df, "num_hours")

+-------+------------------+
|summary|num_slack_messages|
+-------+------------------+
|  count|            448871|
|   mean| 250.4349044603015|
| stddev|144.68258453512533|
|    min|                 1|
|    25%|               125|
|    50%|               251|
|    75%|               376|
|    max|               500|
+-------+------------------+

Median of num_slack_messages: 255.0
+-------+------------------+
|summary|         num_hours|
+-------+------------------+
|  count|            448871|
|   mean|48.926089710604224|
| stddev|30.569540820445347|
|    min|        -99.794624|
|    25%|          24.27398|
|    50%|         49.350273|
|    75%|          74.60691|
|    max|          99.99668|
+-------+------------------+

Median of num_hours: 48.76472473144531


In [22]:
def clean_data(df: DataFrame) -> DataFrame:
    # Impute missing dates with January 1, 2024
    df = df.withColumn("date", F.when(F.col("date").isNull(), F.lit("2024-01-01")).otherwise(F.col("date")))

    # Convert 'completed' field to BooleanType
    df = df.withColumn("completed", F.when(F.col("completed") == "yes", True)
                                      .when(F.col("completed") == "True", True)
                                      .when(F.col("completed") == "None", False)
                                      .otherwise(F.col("completed").cast(T.BooleanType())))

    # Replace nulls in 'engineer' with 'Unknown'
    df = df.withColumn("engineer", F.when(F.col("engineer").isNull(), "Unknown").otherwise(F.col("engineer")))

    # Impute missing 'num_slack_messages' with median
    median_slack_messages = df.approxQuantile("num_slack_messages", [0.5], 0.01)[0]
    df = df.withColumn("num_slack_messages", F.when(F.col("num_slack_messages").isNull(), median_slack_messages).otherwise(F.col("num_slack_messages")))

    # Impute missing 'num_hours' with median and correct negative values
    median_num_hours = df.approxQuantile("num_hours", [0.5], 0.01)[0]
    df = df.withColumn("num_hours", F.when((F.col("num_hours").isNull()) | (F.col("num_hours") < 0), median_num_hours).otherwise(F.col("num_hours")))

    # Handling missing or invalid 'ticket_description'
    df = df.withColumn("ticket_description", F.when(F.col("ticket_description").isNull(), "No Description").otherwise(F.col("ticket_description")))

    # Handling missing or invalid 'initiative' and 'new_revenue' in exploded KPIs
    df = df.withColumn("initiative", F.when(F.col("initiative").isNull(), "No Initiative").otherwise(F.col("initiative")))
    df = df.withColumn("new_revenue", F.when(F.col("new_revenue").isNull(), 0).otherwise(F.col("new_revenue")))

    # Handling missing or invalid 'repo_name' and 'lines_added' in exploded lines_per_repo
    df = df.withColumn("repo_name", F.when(F.col("repo_name").isNull(), "Unknown").otherwise(F.col("repo_name")))
    df = df.withColumn("lines_added", F.when(F.col("lines_added").isNull(), 0).otherwise(F.col("lines_added")))

    # Ensure unique jira_ticket_id
    df = df.dropDuplicates(["jira_ticket_id"])

    return df


In [23]:
cleaned_df = clean_data(exploded_df)
cleaned_df.show(5)

+--------------+----------+---------+------------------+-----------------+--------+--------------------+------------+-----------+---------+-----------+
|jira_ticket_id|      date|completed|num_slack_messages|        num_hours|engineer|  ticket_description|  initiative|new_revenue|repo_name|lines_added|
+--------------+----------+---------+------------------+-----------------+--------+--------------------+------------+-----------+---------+-----------+
|            26|2023-03-08|     true|             366.0|65.34468841552734|    Alex|herself sip incep...|New Customer|  7167.0356|        M|         12|
|            27|2023-12-14|     true|             388.0|85.50627899169922|    Alex|lute O'Brien coll...|     Support|  6864.0854|        Z|         90|
|            28|2023-06-16|    false|             194.0|3.239738941192627|    Dale|spheroid wobble c...|     Support|  5590.8164|        S|         18|
|            31|2023-06-06|    false|             112.0|37.53969192504883|  Sandra|tenur

In [24]:
#get the null distribution of the cleaned data
analyze_null_distribution(cleaned_df)

+--------------+----+---------+------------------+---------+--------+------------------+----------+-----------+---------+-----------+
|jira_ticket_id|date|completed|num_slack_messages|num_hours|engineer|ticket_description|initiative|new_revenue|repo_name|lines_added|
+--------------+----+---------+------------------+---------+--------+------------------+----------+-----------+---------+-----------+
|             0|   0|    14285|                 0|        0|       0|                 0|         0|          0|        0|          0|
+--------------+----+---------+------------------+---------+--------+------------------+----------+-----------+---------+-----------+



In [25]:
cleaned_df.count()

100000

In [16]:
#See the unique value count in the completed column
cleaned_df.groupBy("completed").count().show()

+---------+-----+
|completed|count|
+---------+-----+
|     NULL|14285|
|     true|43052|
|    false|42663|
+---------+-----+



In [10]:
#save the data to parquet in the data folder
cleaned_df.write.mode("overwrite").parquet("../data/clean_data.parquet")