In [3]:
import findspark
findspark.init()  # Auto-detects SPARK_HOME

from typing import List
from pprint import pprint
import json

from pyspark.sql import SparkSession
from pyspark import SparkContext

In [14]:
# Start (or reuse) a local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark DF Assignment")
    .getOrCreate()
)

# The input is multi-line JSON, so the reader needs the "multiline" option
# instead of its default one-record-per-line parsing.
commits_df = (
    spark.read
    .option("multiline", "true")
    .json("../data/data_raw.json")
)

# Register the commits under the name "commits" for the SQL-based assignments.
commits_df.createOrReplaceTempView("commits")

# Print the inferred schema for reference.
commits_df.printSchema()

[Stage 5:>                                                          (0 + 1) / 1]

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- author: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- site_admin: boolean (nullable = true)
 |    |-- type: string (nullable = true)
 |-- commit: struct (nullable = true)
 |    |-- author: struct (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- comment_count: long (nullable = true)
 |    |-- committer: struct (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- tree: struct (nullable = true)
 |    |    |-- sha: string (nullable = true)
 |    |-- verification: struct (nullable = true)
 |    |    |-- payload: string (nu

                                                                                

### Assignment 1
In this exercise we want to know all the commit SHAs for a given list of committers. The results must be ordered by timestamp.

| committer      | sha                                      | timestamp            |
|----------------|------------------------------------------|----------------------|
| Harbar-Inbound | 1d8e15a834a2157fe7af04421c42a893e8a1f23a | 2019-03-10T15:24:16Z |
| ...            | ...                                      | ...                  |

Hint: try to work out the individual stages of the exercises, which makes it easier to track bugs and to figure out how Spark DataFrames and their operations work. You can also use the `printSchema()` and `show()` functions to take a look at the structure and contents of the DataFrames.

@param commits Commit Dataframe, created from the data_raw.json file.

@param authors Sequence of strings representing the authors whose commit SHAs we want to know.

@return DataFrame of commits from the requested authors, including the commit SHA and the corresponding timestamp.

In [61]:
def assignment_1(commits_df, authors: List[str]):
    """Return the commits of the given committers, ordered by commit time.

    Works purely on the DataFrame that is passed in: no temp view is
    registered and no notebook-level Spark session is needed, so calling the
    function leaves the session catalog untouched.

    Args:
        commits_df: Commit DataFrame with a top-level ``sha`` column and the
            nested fields ``commit.committer.name`` and
            ``commit.committer.date``.
        authors: Committer names to keep (matched against
            ``commit.committer.name``).

    Returns:
        DataFrame with the columns ``name``, ``sha`` and ``time``, sorted
        ascending by ``time``.
    """
    committer_name = commits_df["commit.committer.name"]
    committer_date = commits_df["commit.committer.date"]

    return (
        commits_df
        # keep only commits whose committer is one of the requested names
        .filter(committer_name.isin(*authors))
        # project the three output columns under their public names
        .select(committer_name.alias("name"), "sha", committer_date.alias("time"))
        .orderBy("time")
    )

In [62]:
# The first two commits by committer "GitHub" as (name, sha, time) tuples.
expected_top2 = [("GitHub", "65a88dce81af6d4dc6751deab202fa8eeadaf9c0", "2019-01-27T07:09:13.000Z"),
                 ("GitHub", "54fc3ef7d48b1c25910485b8f369023077d0f995", "2019-03-04T15:21:52.000Z")]

# take(2) fetches only the rows under test instead of collecting every match
# to the driver; comparing whole lists also fails when fewer than two rows
# come back.
top2_rows = [tuple(row) for row in assignment_1(commits_df, ["GitHub"]).take(2)]
assert top2_rows == expected_top2, f"unexpected first commits: {top2_rows}"

### Assignment 2

In order to generate weekly dashboards for all projects, we need the data to be partitioned by weeks. As projects can span multiple years in the data set, care must be taken to partition by not only weeks but also by years.

The returned DataFrame that is expected is in the following format:

| repository | week             | year | count   |
|------------|------------------|------|---------|
| Maven      | 41               | 2019 | 21      |
| .....      | ..               | .... | ..      |

@param commits Commit Dataframe, created from the data_raw.json file.

@return DataFrame containing 4 columns: repository name, week number, year and the number of commits for that week.

In [85]:
def assignment_2():
    """Count commits per repository and ISO week.

    Queries the ``commits`` temp view through the notebook-level ``spark``
    session.

    ``WEEKOFYEAR`` returns the ISO-8601 week number, so it has to be paired
    with the ISO week-numbering year rather than the calendar year. With the
    calendar year, a commit on e.g. 2018-12-31 (ISO week 1 of 2019) would be
    counted as week 1 of 2018.

    Returns:
        DataFrame with the columns ``repo`` (sixth ``/``-separated part of
        ``url``), ``week``, ``year`` and ``count``.
    """
    query = """
        SELECT
            SPLIT_PART(url, '/', 6) AS repo,
            WEEKOFYEAR(commit.committer.date) AS week,
            EXTRACT(YEAROFWEEK FROM CAST(commit.committer.date AS DATE)) AS year,
            COUNT(*) AS count
        FROM commits
        GROUP BY repo, week, year
    """
    return spark.sql(query)
    

In [91]:
# Spot check: both known (repo, week, year, count) rows must be in the output.
result = assignment_2()
result_set = set(result.collect())
expected_set = {
    ("DrTests", 21, 2019, 1),
    ("ExoGit", 21, 2019, 6),
}

assert expected_set <= result_set

                                                                                

### Assignment 3

A developer is interested in the age of commits in seconds. Although this can always be calculated during runtime, it would require passing a `Timestamp` (in milliseconds) along with the computation. Therefore, we require you to append the inputted DataFrame with an `age` column representing the age of each commit in seconds.

##### Expected DataFrame Example:

| age  |
|------|
| 1231 |
| 20   |
| ...  |

#### Parameters:
- **commits**: Commit DataFrame, created from the `data_raw.json` file.

#### Returns:
- The inputted DataFrame appended with an `age` column.


In [127]:
def assignment_3(timestamp_ms):
    """Append an ``age`` column: seconds from each commit to a reference time.

    Queries the ``commits`` temp view through the notebook-level ``spark``
    session.

    The reference instant is built with ``TIMESTAMP_MILLIS`` so it never
    passes through a session-time-zone string. A ``FROM_UNIXTIME`` string
    would have to be parsed back and is ambiguous around DST changes.

    Args:
        timestamp_ms: Reference time as Unix epoch milliseconds.

    Returns:
        All columns of ``commits`` plus ``age``, the whole number of seconds
        between ``commit.committer.date`` and the reference time.

    Raises:
        ValueError, TypeError: If ``timestamp_ms`` cannot be converted to an
            integer.
    """
    # int() guarantees that only a numeric literal is spliced into the SQL text.
    reference_ms = int(timestamp_ms)
    query = f"""
        SELECT
            *,
            DATEDIFF(SECOND, commit.committer.date, TIMESTAMP_MILLIS({reference_ms})) AS age
        FROM commits
    """
    return spark.sql(query)

In [128]:
# Expected ages (seconds) of the first four rows for the reference timestamp.
expected = [12093031, 12093019, 12093032, 12093030]

result = assignment_3(timestamp_ms=1570707029000)
first_4_results = result.take(4)

# Compare complete lists: a zip() loop stops at the shorter sequence and would
# pass vacuously if fewer than four rows were returned.
actual_ages = [row["age"] for row in first_4_results]
assert actual_ages == expected, f"unexpected ages: {actual_ages}"

                                                                                

### Assignment 4

To perform analysis on commit behavior, the intermediate time between commits is needed. We require that the DataFrame input be appended with an extra column that expresses the number of days between the current commit and the previous commit of the user, independent of the branch or repository. If no commit exists before a commit, regard the time difference in days as zero. Make sure to return the commits in chronological order.

**Hint:** Look into Spark SQL's `Window` to have more expressive power in custom aggregations.

### Expected DataFrame Example:

| $oid                      | name   | date                       | time_diff |
|---------------------------|--------|----------------------------|-----------|
| 5ce6929e6480fd0d91d3106a   | GitHub | 2019-01-27T07:09:13.000Z   | 0         |
| 5ce693156480fd0d5edbd708   | GitHub | 2019-03-04T15:21:52.000Z   | 36        |
| 5ce691b06480fd0fe0972350   | GitHub | 2019-03-06T13:55:25.000Z   | 2         |
| ...                       | ...    | ...                        | ...       |

#### Parameters:
- **commits**: Commit DataFrame (see `commit.json` and `data_raw.json` for the structure of the file, or run `commits_df.printSchema()`).
- **authorName**: Name of the author for which the result must be generated.

#### Returns:
- DataFrame with a column expressing the days since the last commit.


In [193]:
def assignment_4(author_name):
    """Days between each commit of a committer and that committer's previous one.

    Reads the ``commits`` temp view through the notebook-level ``spark``
    session.

    Args:
        author_name: Committer name, matched against
            ``commit.committer.name``.

    Returns:
        DataFrame with the columns ``id`` (the ``_id`` struct), ``name``,
        ``date`` (commit day) and ``time_diff`` (whole days since the previous
        commit, 0 for the first commit), in chronological order.
    """
    commits = spark.table("commits")

    # Compare against the Python value instead of splicing it into SQL text,
    # so names containing quotes cannot break or alter the query.
    author_commits = commits.filter(commits["commit.committer.name"] == author_name)

    with_previous = author_commits.selectExpr(
        "_id AS id",
        "commit.committer.name AS name",
        "CAST(commit.committer.date AS date) AS date",
        # full timestamp, kept only to sort commits made on the same day
        "commit.committer.date AS commit_ts",
        """LAG(commit.committer.date) OVER (
               PARTITION BY commit.committer.name
               ORDER BY commit.committer.date
           ) AS prev_commit_ts""",
    )

    return (
        with_previous
        .selectExpr(
            "id",
            "name",
            "date",
            "commit_ts",
            # Date-to-date difference: 0 for same-day commits and independent
            # of the session time zone. COALESCE covers the first commit,
            # which has no predecessor. BIGINT keeps the original column type.
            "CAST(COALESCE(DATEDIFF(date, CAST(prev_commit_ts AS date)), 0) AS BIGINT)"
            " AS time_diff",
        )
        .orderBy("commit_ts")
        .drop("commit_ts")
    )

In [194]:
# Expected summary statistics of time_diff for committer "GitHub".
expected = {"count": 2532,
            "stddev": 0.8288280854699502,
            "max": 36,
            "min": 0,
            "mean": 0.045813586097946286}

result = assignment_4("GitHub")

# describe() returns every statistic as a string; convert to numbers so that
# floating-point values are compared with a tolerance rather than by their
# exact textual representation.
time_diff_stats = {
    row["summary"]: float(row["time_diff"])
    for row in result.describe("time_diff").collect()
}

# Every expected statistic must be present, otherwise the loop below could
# pass without checking anything.
assert set(time_diff_stats) == set(expected), f"unexpected statistics: {sorted(time_diff_stats)}"

for stat_name, expected_value in expected.items():
    actual_value = time_diff_stats[stat_name]
    assert abs(actual_value - expected_value) <= 1e-9, (
        f"{stat_name}: expected {expected_value}, got {actual_value}"
    )

### Assignment 5

**Function Description**

To gain some insight into Spark SQL and its aggregation functions, you are required to implement a function that returns a DataFrame containing two columns:

- `day` (int): The day of the week based on the commit date, where Sunday is 1, Monday is 2, etc.
- `commits_per_day`: The total number of commits made on that particular day of the week.

### Expected DataFrame Example:

| day | commits_per_day |
|-----|-----------------|
| 1   | 32              |
| ... | ...             |

### Parameters:

- **commits**: A DataFrame containing the commit data. You can refer to `commit.json` and `data_raw.json` for the structure of the file, or run `commits_df.printSchema()` to inspect the schema.

### Return:

- A DataFrame containing:
  - `day`: The day of the week as an integer.
  - `commits_per_day`: The total number of commits made on that day of the week.

In [217]:
def assignment_5():
    """Count the commits made on each day of the week.

    Runs against the ``commits`` temp view using the notebook-level ``spark``
    session.

    Returns:
        DataFrame with the columns ``day`` (result of ``dayofweek`` on the
        committer date) and ``commits_per_day`` (number of commits on that
        weekday).
    """
    return spark.sql("""
        SELECT 
            dayofweek(commit.committer.date) as day,
            COUNT(*) as commits_per_day
        FROM commits
        GROUP BY day
    """)

In [219]:
result = assignment_5().collect()

# Expected number of commits per weekday number.
expected = {7:60, 4:373, 3:151, 1:70, 2:187, 5:9021, 6:138}

# Compare complete mappings: looping over the returned rows alone would pass
# vacuously if a weekday were missing or the result were empty.
commits_by_day = {row["day"]: row["commits_per_day"] for row in result}
assert commits_by_day == expected, f"unexpected counts: {commits_by_day}"



                                                                                