### Clone Repository

Run the following command to load data from the `data-eng-interview` repo to your Colab working directory; importing this way will allow you to read the data without having to manually download & mount your personal Google drive.

In [None]:
!git clone https://github.com/JanusHealthInc/data-eng-interview.git

### Install PySpark

Run the below magic to install `pyspark` in the current kernel

In [8]:
%pip install pyspark



### Import Packages

Please use `pyspark` as your main data package for this exercise. Feel free to import any other packages you deem necessary.

In [35]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from genericpath import isfile
from posix import truncate
from pyspark.sql.functions import col

### Read Data

Create dataframes for each of the files in the `data/` directory.

In [13]:
spark = SparkSession.builder.appName("janus").getOrCreate()

directory = "data-eng-interview/data"
files = os.listdir(directory)
dfs = {}

for file_name in files:
  file_path = os.path.join(directory, file_name)
  if os.path.isfile(file_path):
    df = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv(file_path))
    dfs[os.path.splitext(file_name)[0]] = df
print(dfs)


{'medals': DataFrame[Rank: int, Team/NOC: string, Gold: int, Silver: int, Bronze: int, Total: int, Rank by Total: int], 'teams': DataFrame[Name: string, Discipline: string, NOC: string, Event: string], 'coaches': DataFrame[Name: string, NOC: string, Discipline: string, Event: string], 'athletes': DataFrame[Name: string, NOC: string, Discipline: string]}


### EDA & Transformations

- Get comfortable with the data by performing any data exploration you see fit
    - Please come prepared with code samples to discuss your findings
- Do any transformations you see fit to get the data ready for the final report below
- Create a final report that includes the following attributes:
    - Athlete name (in title case)
    - Coach name (in title case)
    - Team
    - Country
    - Number of Gold Medals

In [39]:
for key, value in dfs.items():
  print(key)

  null_records = value.filter(
      sum(col(c).isNull().cast("integer") for c in value.columns) > 0
  )
  null_records.show(null_records.count(), truncate=False)

medals
+----+--------+----+------+------+-----+-------------+
|Rank|Team/NOC|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------+----+------+------+-----+-------------+
+----+--------+----+------+------+-----+-------------+

teams
+----+----------+---+-----+
|Name|Discipline|NOC|Event|
+----+----------+---+-----+
+----+----------+---+-----+

coaches
+-----------------------------+--------------------------+-----------------+-----+
|Name                         |NOC                       |Discipline       |Event|
+-----------------------------+--------------------------+-----------------+-----+
|ABDELMAGID Wael              |Egypt                     |Football         |NULL |
|ABE Junya                    |Japan                     |Volleyball       |NULL |
|ABE Katsuhiko                |Japan                     |Basketball       |NULL |
|ADAMA Cherif                 |Côte d'Ivoire             |Football         |NULL |
|AGEBA Yuya                   |Japan                     |Volley

most records are complete, except event for coaches

In [14]:
for key, value in dfs.items():
  print(key)
  value.show()

medals
+----+--------------------+----+------+------+-----+-------------+
|Rank|            Team/NOC|Gold|Silver|Bronze|Total|Rank by Total|
+----+--------------------+----+------+------+-----+-------------+
|   1|United States of ...|  39|    41|    33|  113|            1|
|   2|People's Republic...|  38|    32|    18|   88|            2|
|   3|               Japan|  27|    14|    17|   58|            5|
|   4|       Great Britain|  22|    21|    22|   65|            4|
|   5|                 ROC|  20|    28|    23|   71|            3|
|   6|           Australia|  17|     7|    22|   46|            6|
|   7|         Netherlands|  10|    12|    14|   36|            9|
|   8|              France|  10|    12|    11|   33|           10|
|   9|             Germany|  10|    11|    16|   37|            8|
|  10|               Italy|  10|    10|    20|   40|            7|
|  11|              Canada|   7|     6|    11|   24|           11|
|  12|              Brazil|   7|     6|     8|   21|   

Keys/Columns
* teams.NOC
  * athletes.NOC
  * coaches.NOC
  * medals.NOC
* teams.discipline
 * athletes.discipline
 * coaches.discipline
* teams.event
  * coaches.event
  

In [17]:
teams_noc_count = dfs['teams'].select(F.countDistinct("NOC").alias("teams_NOC"))
athletes_noc_count = dfs['athletes'].select(F.countDistinct("NOC").alias("athletes_NOC"))
coaches_noc_count = dfs['coaches'].select(F.countDistinct("NOC").alias("coaches_NOC"))
medals_noc_count = dfs['medals'].select(F.countDistinct("Team/NOC").alias("medals_NOC"))

all_counts = (teams_noc_count.crossJoin(athletes_noc_count)
  .crossJoin(coaches_noc_count)
  .crossJoin(medals_noc_count)).show()


+---------+------------+-----------+----------+
|teams_NOC|athletes_NOC|coaches_NOC|medals_NOC|
+---------+------------+-----------+----------+
|       84|         206|         61|        93|
+---------+------------+-----------+----------+



Initially would have thought these would have been more similar
* teams_NOC > coaches_NOC - some teams are missing coaches data (or don't have coaches
* medeals_NOC > teams_NOC - some NOCs don't have team data (or perhaps some events are not considered to be a team event)
  * perhaps there's missing individual ahtletes data?
* athletes_NOC > medals_NOC - many countries do not appear on the medals table, perhaps only those that have won a medal. There's not an explicit/canonical list of NOCS


In [25]:
teams_wo_coach =(dfs['teams'].select("NOC", "Discipline", "Event")
  .join(dfs['coaches'],
    (dfs['teams']["NOC"] == dfs['coaches']["NOC"]) &
    (dfs['teams']["Discipline"] == dfs['coaches']["Discipline"]) &
    (dfs['teams']["Event"] == dfs['coaches']["Event"]),
    "left_anti")
  .groupBy("Discipline", "Event")
  .count()
  .orderBy(F.asc("count")))
teams_wo_coach.show(teams_wo_coach.count(), truncate=False)

disc_wo_coach = (teams_wo_coach.select("Discipline")
  .distinct()
  .join(dfs['coaches'],
    teams_wo_coach["Discipline"] == dfs['coaches']["Discipline"],
    "left")
  .groupBy(teams_wo_coach["Discipline"])
  .count())
disc_wo_coach.show(disc_wo_coach.count(), truncate=False)

+-------------------+--------------------------------+-----+
|Discipline         |Event                           |count|
+-------------------+--------------------------------+-----+
|Artistic Swimming  |Team                            |1    |
|Artistic Swimming  |Duet                            |2    |
|Fencing            |Women's Épée Team               |8    |
|Cycling Track      |Men's Team Pursuit              |8    |
|3x3 Basketball     |Men                             |8    |
|Cycling Track      |Women's Team Pursuit            |8    |
|3x3 Basketball     |Women                           |8    |
|Fencing            |Women's Foil Team               |8    |
|Cycling Track      |Women's Team Sprint             |8    |
|Cycling Track      |Men's Team Sprint               |8    |
|Fencing            |Men's Épée Team                 |9    |
|Fencing            |Women's Sabre Team              |9    |
|Fencing            |Men's Sabre Team                |9    |
|Fencing            |Men

Looks like there are some teams without coaches that are valid, and the artistic swimming example might be missing data

In [26]:
coach_wo_team = (dfs['coaches'].select("Name", "NOC", "Discipline", "Event")
  .join(dfs['teams'],
    (dfs['teams']["NOC"] == dfs['coaches']["NOC"]) &
    (dfs['teams']["Discipline"] == dfs['coaches']["Discipline"]) &
    (dfs['teams']["Event"] == dfs['coaches']["Event"]),
    "left_anti")
  .orderBy("NOC", "Discipline"))
coach_wo_team.show(coach_wo_team.count(), truncate=False)

+-----------------------------+--------------------------+-----------------+-----+
|Name                         |NOC                       |Discipline       |Event|
+-----------------------------+--------------------------+-----------------+-----+
|LOPEZ Monica Susana          |Argentina                 |Artistic Swimming|NULL |
|RODRIGUEZ Eduardo Rafael     |Argentina                 |Volleyball       |NULL |
|WOELFLIN Roberto Osvaldo     |Argentina                 |Volleyball       |NULL |
|CHAMBERS Cheryl              |Australia                 |Basketball       |NULL |
|DYKSTRA Trisha               |Australia                 |Basketball       |NULL |
|RILLIE John                  |Australia                 |Basketball       |NULL |
|JENNINGS Dee                 |Australia                 |Hockey           |NULL |
|STANIFORTH Dave              |Australia                 |Hockey           |NULL |
|BRUN Aristide                |Austria                   |Artistic Swimming|NULL |
|SHA

There are many examples where the event is missing from the coach record  

In [29]:
teams_disc_count = dfs['teams'].select(F.countDistinct("Discipline").alias("teams_disc"))
athletes_disc_count = dfs['athletes'].select(F.countDistinct("Discipline").alias("athletes_disc"))
coaches_disc_count = dfs['coaches'].select(F.countDistinct("Discipline").alias("coaches_disc"))

all_counts = (teams_disc_count.crossJoin(athletes_disc_count).crossJoin(coaches_disc_count)).show()

+----------+-------------+------------+
|teams_disc|athletes_disc|coaches_disc|
+----------+-------------+------------+
|        20|           46|           9|
+----------+-------------+------------+



* there are likely 26 individual disciplines
* 11 disciplines don't have coaches which matches above
* 46 total disciplines