In [39]:
from typing import Optional

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [2]:
# Reuse the active Spark session if one exists, otherwise start a new one
# registered under this application name.
spark = (
    SparkSession.builder
    .appName('Telecom CaseStudy')
    .getOrCreate()
)

In [8]:
# Helpers to read a JSON file and a delimited text file (header row, custom separator)

def read_json(spark: SparkSession, file_name: str) -> DataFrame:
    """
    Load a JSON file into a DataFrame.

    :param spark: session whose reader is used to load the file
    :param file_name: path of the JSON file to read
    :return: the DataFrame produced by the session's JSON reader
    """
    reader = spark.read
    return reader.json(file_name)

def read_csv(spark: SparkSession, file_name: str, sep: str) -> DataFrame:
    """
    Load a delimited text file into a DataFrame.

    The first line is treated as the header and column types are inferred.

    :param spark: session whose reader is used to load the file
    :param file_name: path of the delimited file to read
    :param sep: field separator used in the file
    :return: the DataFrame produced by the session's CSV reader
    """
    reader_options = {'header': True, 'inferSchema': True, 'sep': sep}
    return spark.read.csv(file_name, **reader_options)

In [45]:
# Load the three raw inputs: alarms (JSON), tickets (pipe-separated)
# and teams (comma-separated).
alarms = read_json(spark, file_name='data/alarms.json')
tickets = read_csv(spark, file_name='data/tickets.dat', sep='|')
teams = read_csv(spark, file_name='data/team.csv', sep=',')

In [46]:
def clean_teamid(team_id: Optional[str]) -> Optional[str]:
    """
    Normalise a raw team identifier to ``'team'`` plus its last two characters.

    :param team_id: raw identifier; ``None`` (a null cell) is accepted
    :return: ``'team'`` followed by the last two characters of ``team_id``,
             or ``None`` when ``team_id`` is ``None``
    """
    # This function is wrapped in a Spark UDF, where a null cell arrives as
    # None. Slicing None raises TypeError and fails the whole job, so the
    # null is passed through unchanged instead.
    if team_id is None:
        return None
    return 'team' + team_id[-2:]

# Expose clean_teamid as a string-returning Spark UDF.
udf_clean_teamid = f.udf(clean_teamid, returnType=t.StringType())

# Overwrite team_id with its normalised form and preview the result.
teams = teams.withColumn('team_id', udf_clean_teamid(f.col('team_id')))
teams.show(n=5)

+-------+---------+--------------------+
|team_id|team_size|avg_experience_years|
+-------+---------+--------------------+
| team40|        4|                   5|
| team33|        6|                   7|
| team90|        5|                   6|
| team56|        5|                   8|
| team48|        6|                   6|
+-------+---------+--------------------+
only showing top 5 rows



In [15]:
# Ticket priorities that each get their own count column in the summary.
ticket_priorities = [1, 2, 3]

# Count tickets per team, one column per priority. Listing the pivot values
# explicitly spares Spark the extra pass it otherwise runs to discover them,
# and guarantees that columns `1`, `2` and `3` exist even when a priority has
# no tickets at all (its count is then null instead of the select failing).
tk_solved = (
    tickets
    .groupBy('solved_by')
    .pivot('priority', ticket_priorities)
    .agg(f.count(f.lit(1)))
)

# Give the pivoted columns descriptive names.
tk_solved = tk_solved.selectExpr(
    'solved_by as team',
    *['`{0}` as tickets_priority_{0}_count'.format(p) for p in ticket_priorities]
)
tk_solved.show()

+------+------------------------+------------------------+------------------------+
|  team|tickets_priority_1_count|tickets_priority_2_count|tickets_priority_3_count|
+------+------------------------+------------------------+------------------------+
|team68|                     317|                    1172|                    1865|
|team56|                     356|                    1139|                    1859|
|team48|                     405|                    1093|                    1898|
|team30|                     354|                    1133|                    1863|
|team78|                     400|                    1114|                    1893|
|team28|                     434|                    1071|                    1800|
|team90|                     360|                    1111|                    1842|
|team33|                     364|                    1152|                    1787|
|team34|                     405|                    1064|                  

In [20]:
# Derive the calendar year and month of each ticket from its start timestamp.
started_ts = f.col('started_ts')
tickets_df = (
    tickets
    .withColumn('year', f.year(started_ts))
    .withColumn('month', f.month(started_ts))
)
tickets_df.show(n=5)

+---------+--------+-------------------+-------------------+---------+----------+----------+--------+----+-----+
|ticket_id|   alarm|         started_ts|           ended_ts|solved_by|src_system|site_visit|priority|year|month|
+---------+--------+-------------------+-------------------+---------+----------+----------+--------+----+-----+
|T_0000001|50025222|2018-09-14 03:29:10|14-09-2018 04.06.13|   team78|       OP2|         1|       3|2018|    9|
|T_0000002|50021238|2018-10-29 03:39:49|29/10/2018 04:25:25|   team30|       OP1|         0|       2|2018|   10|
|T_0000003|50034089|2018-03-31 07:25:48|31/03/2018 07:58:45|   team68|       OP1|         0|       3|2018|    3|
|T_0000004|50021918|2018-08-08 19:57:56|08/08/2018 20:42:40|   team56|       OP1|         0|       2|2018|    8|
|T_0000005|50046096|2018-10-10 17:47:00|10/10/2018 18:31:05|   team34|       OP1|         0|       3|2018|   10|
+---------+--------+-------------------+-------------------+---------+----------+----------+----

In [21]:
# Number of tickets each team solved in each year.
yearly_count = f.count(f.lit(1)).alias('yearly_tickets_solved')
year_df = (
    tickets_df
    .groupBy('solved_by', 'year')
    .agg(yearly_count)
)
year_df.show()

+---------+----+---------------------+
|solved_by|year|yearly_tickets_solved|
+---------+----+---------------------+
|   team48|2018|                 3396|
|   team30|2018|                 3349|
|   team33|2018|                 3303|
|   team28|2019|                    1|
|   team78|2019|                    1|
|   team78|2018|                 3406|
|   team56|2018|                 3354|
|   team68|2018|                 3354|
|   team34|2018|                 3218|
|   team28|2018|                 3304|
|   team30|2019|                    1|
|   team90|2018|                 3313|
+---------+----+---------------------+



In [26]:
# Per team, order the yearly rows chronologically so lag() sees the row before.
window_spec = Window.partitionBy('solved_by').orderBy('year')

# lag() returns the previous *row*, which is only the previous calendar year
# when the team has no gap in its years. Check the lagged year explicitly so a
# gap reports 0 rather than the count of some older year.
previous_row_year = f.lag('year').over(window_spec)
previous_row_count = f.lag('yearly_tickets_solved').over(window_spec)

# otherwise(0) also covers a team's first year, so no blanket fillna(0) is
# needed; that call would have rewritten nulls in every numeric column
# (e.g. `year`), not just this one.
year_df = year_df.withColumn(
    'tickets_solved_previous_year',
    f.when(previous_row_year == f.col('year') - 1, previous_row_count).otherwise(0)
)

In [34]:
# Number of tickets each team solved in each (year, month).
monthly_count = f.count(f.lit(1)).alias('monthly_tickets_solved')
month_df = (
    tickets_df
    .groupBy('solved_by', 'year', 'month')
    .agg(monthly_count)
)

In [35]:
# Preview the first five monthly rows.
month_df.show(n=5)

+---------+----+-----+----------------------+
|solved_by|year|month|monthly_tickets_solved|
+---------+----+-----+----------------------+
|   team68|2018|    6|                   296|
|   team28|2018|    3|                   291|
|   team28|2018|   10|                   277|
|   team33|2018|    7|                   306|
|   team30|2018|    8|                   310|
+---------+----+-----+----------------------+
only showing top 5 rows



In [51]:
# Attach each team's yearly figures to its monthly rows (inner join on team and year).
df = year_df.join(month_df, on=['solved_by', 'year'])

# Add the team attributes, then keep only the reporting columns.
report_columns = [
    'team_id',
    'team_size',
    'year',
    'month',
    'monthly_tickets_solved',
    'tickets_solved_previous_year',
]
final_df = (
    df
    .join(teams, teams.team_id == df.solved_by)
    .select(*report_columns)
)