In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.sql.types import *
import pyspark.sql.types as spark_types

import utils

# Create (or reuse) the Spark session used by the cells below, pointed at the
# master at spark://vm1:7077.
spark = (
    SparkSession.builder
    .master("spark://vm1:7077")
    .appName("Cluster Code - Zafar")
    .getOrCreate()
)

In [None]:
# Sanity check: load the commits data and print ten rows of it.
commits = utils.read_csv(spark, "hdfs:/commits_new.csv")
commits_preview = commits.limit(10)
commits_preview.show()

### Number of source & fork repositories of users (shouldn't be run again)

In [5]:
def _count_repos_per_owner(projects_df, only_forks, count_column):
    """Count non-deleted repositories per owner.

    Args:
        projects_df: projects DataFrame; its ``deleted``, ``forked_from`` and
            ``owner_id`` columns are read.
        only_forks: True keeps rows whose ``forked_from`` is set (forks);
            False keeps rows whose ``forked_from`` is null (source repos).
        count_column: name given to the per-owner count column.

    Returns:
        DataFrame with the columns ``user_id`` and ``count_column``.
    """
    if only_forks:
        fork_filter = projects_df.forked_from.isNotNull()
    else:
        fork_filter = projects_df.forked_from.isNull()

    per_owner = (
        projects_df
        .where((projects_df.deleted == 0) & fork_filter)
        .groupby("owner_id")
        .count()
    )
    return (
        per_owner
        .withColumnRenamed("count", count_column)
        .withColumnRenamed("owner_id", "user_id")
    )


# NOTE(review): the meaning of the third argument depends on utils.read_csv,
# which is not visible in this notebook — confirm it is intended.
projects = utils.read_csv(spark, "hdfs:/projects.csv", "projects_new.csv")

# This file has now been replaced with a new version
user_more = utils.read_csv(spark, "hdfs:/user_more.csv")

# Source repos (not forked from anything) and forks, counted per user.
df4 = _count_repos_per_owner(projects, only_forks=False, count_column="repos_source")
df6 = _count_repos_per_owner(projects, only_forks=True, count_column="repos_forks")

# Full outer joins: a user missing from either side is kept, with nulls in the
# columns that side would have supplied.
df5 = user_more.join(df4, "user_id", "full")
df5 = df5.join(df6, "user_id", "full")

# Write the combined table out as CSV.
# NOTE(review): the path has no scheme, unlike the "hdfs:/..." paths used
# elsewhere; presumably it resolves against the cluster's default filesystem —
# confirm whether a local directory or HDFS is intended.
df5.write.csv("/user_more_2", mode="overwrite", nullValue="\\N")

### Issue Punchcard

In [5]:
issues = utils.read_csv(spark, "hdfs:/issues.csv")

# Split every non-null creation timestamp into year / month / day / hour and
# count the issues falling into each hour-sized bucket.
# NOTE(review): the recorded output of this cell is a single row whose four
# time parts are all null, so created_at does not appear to be parsed as a
# timestamp here — check the schema utils.read_csv produces for issues.csv.
created_at = F.col("created_at")
issue_time_parts = issues.where(created_at.isNotNull()).select(
    F.year(created_at).alias("year"),
    F.month(created_at).alias("month"),
    F.dayofmonth(created_at).alias("day"),
    F.hour(created_at).alias("hour"),
)
df5 = issue_time_parts.groupBy("year", "month", "day", "hour").count()

df5.limit(10).show()

# Disabled: inspect the plan / persist the result as a single JSON part file.
# df5.explain()
# df5.coalesce(1).write.json("hdfs:/issue_punchcard")

+----+-----+----+----+--------+
|year|month| day|hour|   count|
+----+-----+----+----+--------+
|null| null|null|null|54086297|
+----+-----+----+----+--------+



### Number of Commits of each user

In [15]:
commits = utils.read_csv(spark, "hdfs:/commits_new.csv")

# One row per author_id with the number of commit rows carrying that id,
# exposed as (user_id, commits_authored).
commits_per_author = commits.groupby("author_id").count()
res = commits_per_author.select(
    F.col("author_id").alias("user_id"),
    F.col("count").alias("commits_authored"),
)

# Nulls are written as \N; any existing output at this path is replaced.
res.write.csv("hdfs:/user_commit_count", mode="overwrite", nullValue="\\N")

### Number of Commits of every project

In [16]:
# Relies on `commits` already being defined by an earlier cell; the read is
# kept here, disabled, for reference.
# commits = utils.read_csv(spark, "hdfs:/commits_new.csv")

# One row per project_id with its number of commit rows.
commits_per_project = commits.groupby("project_id").count()
res = commits_per_project.withColumnRenamed("count", "total_commits")

# mode="overwrite" is deliberately left disabled, so the writer uses its
# default save mode (per the Spark docs this errors if the path exists).
res.write.csv("hdfs:/project_commit_count", nullValue="\\N")

### Number of commits on each project not authored by its owner

In [17]:
# Load both tables and expose them to Spark SQL under the names the query uses.
commits = utils.read_csv(spark, "hdfs:/commits_new.csv")
commits.createOrReplaceTempView("commits")

projects = utils.read_csv(spark, "hdfs:/projects_new.csv")
projects.createOrReplaceTempView("projects")

# Per project: count the commits whose author is not the project's owner.
q = """
    SELECT C.project_id as project_id, COUNT(*) as total_commits_by_others
    FROM commits as C, projects as P
    WHERE C.project_id = P.id
    AND C.author_id <> P.owner_id
    GROUP BY C.project_id
"""

res = spark.sql(q)

# mode="overwrite" is deliberately left disabled, so the writer uses its
# default save mode (per the Spark docs this errors if the path exists).
res.write.csv("hdfs:/project_commit_others_count", nullValue="\\N")

### Number of commits each user made on repositories they do not own

In [18]:
# Per author: count the commits made to projects owned by someone else.
# The query reads the `commits` and `projects` temp views, which must already
# be registered in this Spark session before the cell runs.
q = """
    SELECT C.author_id as user_id, 
           COUNT(*) as total_commits_on_other_repos

    FROM commits as C, projects as P

    WHERE C.project_id = P.id
    AND C.author_id <> P.owner_id

    GROUP BY C.author_id
"""

res = spark.sql(q)

# mode="overwrite" is deliberately left disabled, so the writer uses its
# default save mode (per the Spark docs this errors if the path exists).
res.write.csv("hdfs:/user_commit_others_count", nullValue="\\N")

### "Top" Users

In [25]:
user_more = utils.read_csv(spark, "hdfs:/user_more.csv")
users = utils.read_csv(spark, "hdfs:/users.csv")

# Keep only the identifying user columns, keyed as user_id so they line up
# with user_more.
users = users.withColumnRenamed("id", "user_id")
users = users.select("user_id", "login", "company", "type", "fake", "deleted")

# Left join: every user is kept even without a matching user_more row.
user_new = users.join(user_more, "user_id", "left")

# Rank "USR"-type accounts that have a follower count, most followed first
# (torvalds comes out on top in the recorded output).
# A further filter on `starred` being non-null is currently disabled:
#   & (user_new.starred.isNotNull())
is_usr_account = user_new.type == "USR"
has_follower_count = user_new.followers.isNotNull()
top = (
    user_new
    .where(is_usr_account & has_follower_count)
    .orderBy(F.col("followers").desc())
)

top.show(50)

+-------+----------------+--------------------+----+----+-------+---------+---------+-------+------------+-----------+
|user_id|           login|             company|type|fake|deleted|following|followers|starred|repos_source|repos_forks|
+-------+----------------+--------------------+----+----+-------+---------+---------+-------+------------+-----------+
|   5203|        torvalds|    Linux Foundation| USR|   0|      0|     null|    52722|      1|           5|          2|
|    896|     JakeWharton|        Square, Inc.| USR|   0|      0|       24|    30161|    261|          86|         24|
| 376498|              Tj|                Apex| USR|   0|      0|      175|    25827|    966|         238|         81|
|   6240|      addyosmani|              Google| USR|   0|      0|      243|    24604|    522|         147|        154|
|   1779|       paulirish|   Google Chrome, ♥z| USR|   0|      0|      253|    24510|    445|          56|        242|
|   9236|         mojombo|                null| 

### Commits Punchcard for top users

In [7]:
commits = utils.read_csv(spark, "hdfs:/commits_new.csv")
# Loaded here but not referenced again within this cell.
projects = utils.read_csv(spark, "hdfs:/projects_new.csv")

# Hard-coded user ids; the leading entries match the user_ids at the head of
# the "Top" Users output.
top_users = [
    5203, 896, 376498, 6240, 1779,
    9236, 1570, 3871, 1736, 13009,
    24452, 616741, 2468643, 2427, 81423,
    796, 10005, 417948, 2016667, 1954,
]

# For each selected author, count commits per (day-of-week, hour) bucket.
created_at = F.col("created_at")
by_selected_author = created_at.isNotNull() & F.col("author_id").isin(top_users)
commit_slots = commits.where(by_selected_author).select(
    "author_id",
    F.date_format(created_at, "E").alias("day"),
    F.hour(created_at).alias("hour"),
)
res = (
    commit_slots
    .groupBy("author_id", "day", "hour")
    .agg(F.count("*").alias("commits"))
)

# No save mode is given, so the writer's default applies.
res.write.json("hdfs:/top_users_commit_punchcard")

### Commits Punchcard for EVERYONE!

In [8]:
# Count all commits per (day-of-week, hour) bucket, across every author.
# Relies on `commits` already being defined by an earlier cell.
created_at = F.col("created_at")
commit_slots = commits.where(created_at.isNotNull()).select(
    F.date_format(created_at, "E").alias("day"),
    F.hour(created_at).alias("hour"),
)
res = commit_slots.groupBy("day", "hour").agg(F.count("*").alias("commits"))

# No save mode is given, so the writer's default applies.
res.write.json("hdfs:/commit_punchcard")