## Reading Git Final Project

In [1]:
import os
import subprocess
import datetime
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import functions as F
from pyspark.sql.types import *

pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)

In [2]:
# import pyspark functions and data types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col, element_at, count, countDistinct, avg, split, array_contains, isnan, when

In [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [4]:
gcs_folder = 'gs://msca-bdp-data-open/final_project_git'

#### Check data size in GCS

In [5]:
cmd = 'gsutil du -s -h ' + gcs_folder

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
for line in p.stdout.readlines():
    print (f'Total directory size: {line}')

retval = p.wait() # Wait for the child process to terminate.

Total directory size: 1.36 TiB     gs://msca-bdp-data-open/final_project_git



### Read Git data from GCS

#### Languages
Programming languages by repository as reported by GitHub's https://developer.github.com/v3/repos/#list-languages API

In [6]:
%%time

df_languages = spark.read.parquet(os.path.join(gcs_folder, 'languages'))
print(f'Records read from dataframe *languages*: {df_languages.count():,.0f}')

[Stage 1:>                                                          (0 + 2) / 2]

Records read from dataframe *languages*: 3,325,634
CPU times: user 6.39 ms, sys: 5.14 ms, total: 11.5 ms
Wall time: 9.82 s


                                                                                

In [7]:
df_languages.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- language: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- bytes: long (nullable = true)



In [8]:
df_languages.take(10)

                                                                                

[Row(repo_name='lemi136/puntovent', language=[Row(name='C', bytes=80)]),
 Row(repo_name='taxigps/nctool', language=[Row(name='C', bytes=4461)]),
 Row(repo_name='ahy1/strbuf', language=[Row(name='C', bytes=5573)]),
 Row(repo_name='nleiten/mod_rpaf-ng', language=[Row(name='C', bytes=30330)]),
 Row(repo_name='kmcallister/alameda', language=[Row(name='C', bytes=17077)]),
 Row(repo_name='TianYJ1/Snake', language=[Row(name='C', bytes=10381)]),
 Row(repo_name='doctorfree/Lds', language=[Row(name='C', bytes=56413)]),
 Row(repo_name='SumiMakito/ReinaJNIDemo', language=[Row(name='C', bytes=7444)]),
 Row(repo_name='serprex/rainbeam', language=[Row(name='C', bytes=1964)]),
 Row(repo_name='pdigiglio/oracolo', language=[Row(name='C', bytes=1966)])]

In [9]:
df_languages.select([count(when(col(c).isNull(), c)).alias(c) for c in df_languages.columns]).show()



+---------+--------+
|repo_name|language|
+---------+--------+
|        0|       0|
+---------+--------+



                                                                                

#### Licenses
Open source license SPDX code for each repository as detected by https://developer.github.com/v3/licenses/

In [10]:
%%time

df_licenses = spark.read.parquet(os.path.join(gcs_folder, 'licenses'))
print(f'Records read from dataframe *licenses*: {df_licenses.count():,.0f}')

Records read from dataframe *licenses*: 3,325,634
CPU times: user 4.22 ms, sys: 69 µs, total: 4.29 ms
Wall time: 846 ms


In [11]:
df_licenses.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- license: string (nullable = true)



In [12]:
df_licenses.show(10)

+--------------------+------------+
|           repo_name|     license|
+--------------------+------------+
|autarch/Dist-Zill...|artistic-2.0|
|thundergnat/Prime...|artistic-2.0|
|kusha-b-k/Turabia...|artistic-2.0|
|onlinepremiumoutl...|artistic-2.0|
|huangyuanlove/Lia...|artistic-2.0|
|gitpan/Mojo-Serve...|artistic-2.0|
|justincampbell/la...|artistic-2.0|
|b4ldr/atlas-trace...|artistic-2.0|
|             MBAOS/A|artistic-2.0|
|   elbehosg/sg_test1|artistic-2.0|
+--------------------+------------+
only showing top 10 rows



#### Commits
Unique Git commits from open source repositories on GitHub, pre-grouped by repositories they appear in.

In [13]:
%%time

df_commits = spark.read.parquet(os.path.join(gcs_folder, 'commits'))
print(f'Records read from dataframe *commits*: {df_commits.count():,.0f}')



Records read from dataframe *commits*: 265,419,190
CPU times: user 893 ms, sys: 134 ms, total: 1.03 s
Wall time: 4min 9s


                                                                                

In [14]:
df_commits.printSchema()

root
 |-- commit: string (nullable = true)
 |-- tree: string (nullable = true)
 |-- parent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- author: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- time_sec: long (nullable = true)
 |    |-- tz_offset: long (nullable = true)
 |    |-- date: struct (nullable = true)
 |    |    |-- seconds: long (nullable = true)
 |    |    |-- nanos: long (nullable = true)
 |-- committer: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- time_sec: long (nullable = true)
 |    |-- tz_offset: long (nullable = true)
 |    |-- date: struct (nullable = true)
 |    |    |-- seconds: long (nullable = true)
 |    |    |-- nanos: long (nullable = true)
 |-- subject: string (nullable = true)
 |-- message: string (nullable = true)
 |-- trailer: array (nullable = true)
 |    |-- element: struct (contains

In [15]:
df_commits.take(1)

                                                                                

[Row(commit='aa358905a1b12c6fa43b6e877e907fc9d36ff0b9', tree='df3f8bf61bf1cb0dff3a86ebe18671792c2d4f27', parent=['ea230a45a0e97e4d95b5f7fae9ce7ef659b60291'], author=Row(name='conda-forge-coordinator', email='e5ec32c8593b6ea3202de4b38c1a852e362e575b@gmail.com', time_sec=1647357073, tz_offset=0, date=Row(seconds=1647357073, nanos=None)), committer=Row(name='conda-forge-coordinator', email='e5ec32c8593b6ea3202de4b38c1a852e362e575b@gmail.com', time_sec=1647357073, tz_offset=0, date=Row(seconds=1647357073, nanos=None)), subject='Updated the qcengine feedstock.', message='Updated the qcengine feedstock.', trailer=[], difference=[], difference_truncated=True, repo_name=['conda-forge/feedstocks'], encoding=None)]

#### Contents
Unique file contents of text files under 1 MiB on the HEAD branch.  
Can be joined to `files` dataset using the id columns to identify the repository and file path.

In [None]:
%%time

df_contents = spark.read.parquet(os.path.join(gcs_folder, 'contents'))
print(f'Records read from dataframe *commits*: {df_contents.count():,.0f}')



Records read from dataframe *commits*: 281,191,977
CPU times: user 1.76 s, sys: 354 ms, total: 2.11 s
Wall time: 8min 49s


                                                                                

In [None]:
df_contents.printSchema()

root
 |-- id: string (nullable = true)
 |-- size: long (nullable = true)
 |-- content: string (nullable = true)
 |-- binary: boolean (nullable = true)
 |-- copies: long (nullable = true)



In [None]:
df_contents.take(10)

                                                                                

[Row(id='d5b1049fdaa182fa5f47e6f9c45bd46a478118eb', size=1570, content='{"version":3,"sources":["angular-locale_az-cyrl-az.js"],"names":["angular","module","$provide","PLURAL_CATEGORY","ZERO","ONE","TWO","FEW","MANY","OTHER","value","DATETIME_FORMATS","AMPMS","DAY","ERANAMES","ERAS","FIRSTDAYOFWEEK","MONTH","SHORTDAY","SHORTMONTH","STANDALONEMONTH","WEEKENDRANGE","fullDate","longDate","medium","mediumDate","mediumTime","short","shortDate","shortTime","NUMBER_FORMATS","CURRENCY_SYM","DECIMAL_SEP","GROUP_SEP","PATTERNS","gSize","lgSize","maxFrac","minFrac","minInt","negPre","negSuf","posPre","posSuf","id","localeID","pluralCat","n","opt_precision"],"mappings":"AAAA,YACAA,SAAQC,OAAO,eAAiB,WAAY,SAASC,GACrD,GAAIC,IAAmBC,KAAM,OAAQC,IAAK,MAAOC,IAAK,MAAOC,IAAK,MAAOC,KAAM,OAAQC,MAAO,QAC9FP,GAASQ,MAAM,WACbC,kBACEC,OACE,KACA,MAEFC,KACE,QACA,eACA,kBACA,WACA,cACA,OACA,SAEFC,UACE,MACA,MAEFC,MACE,MACA,MAEFC,eAAkB,EAClBC,OACE,SACA,SACA,OACA,QACA,MACA,OACA,OACA,SACA,WACA,UACA,SACA,UAEFC,UACE,QACA,eACA,

#### Files
File metadata for all files at HEAD.  
Join with `contents` dataset on id columns to search text.

In [None]:
%%time

df_files = spark.read.parquet(os.path.join(gcs_folder, 'files'))
print(f'Records read from dataframe *files*: {df_files.count():,.0f}')



Records read from dataframe *files*: 2,309,424,945
CPU times: user 251 ms, sys: 62.9 ms, total: 314 ms
Wall time: 1min 16s


                                                                                

In [None]:
df_files.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- ref: string (nullable = true)
 |-- path: string (nullable = true)
 |-- mode: long (nullable = true)
 |-- id: string (nullable = true)
 |-- symlink_target: string (nullable = true)



In [None]:
df_files.take(1)

                                                                                

[Row(repo_name='enzbang/diouzhtu', ref='refs/heads/master', path='gwiad_wiki_service/scripts/do-install.sh', mode=33261, id='49365044eed28769152726537f00a93a68988b07', symlink_target=None)]

#### 1. Discard irrelevant or obviously erroneous data.

In [None]:
# get the language from the language column by taking 'name' attribute of a Row object
df_languages1 = df_languages.withColumn('language_new', element_at(col('language'), 1)['name'])
df_languages1 = df_languages1.select(col("repo_name"), col("language_new"))
df_languages1.show(5)

[Stage 28:>                                                         (0 + 1) / 1]

+-------------------+------------+
|          repo_name|language_new|
+-------------------+------------+
|  lemi136/puntovent|           C|
|     taxigps/nctool|           C|
|        ahy1/strbuf|           C|
|nleiten/mod_rpaf-ng|           C|
|kmcallister/alameda|           C|
+-------------------+------------+
only showing top 5 rows



                                                                                

In [None]:
# filter columns in commits dataset
df_commits1 = df_commits.withColumn('commit_author', col('committer')['name'])
df_commits1 = df_commits1.withColumn('commit_time', col('committer')['time_sec'])
df_commits1 = df_commits1.withColumn('repo_name', element_at(col('repo_name'), 1))
df_commits1 = df_commits1.select(col("commit_author"), col('commit_time'), col("repo_name"), col("subject"), col("message"))
df_commits1.show(10)

+--------------------+-----------+--------------------+--------------------+--------------------+
|       commit_author|commit_time|           repo_name|             subject|             message|
+--------------------+-----------+--------------------+--------------------+--------------------+
|conda-forge-coord...| 1647357073|conda-forge/feeds...|Updated the qceng...|Updated the qceng...|
|           Rob Allen| 1323443117|MadCat34/zend-esc...|Merge remote-trac...|Merge remote-trac...|
|        Zhihui Zhang| 1303924643|pscedu/slash2-stable|provide hook to o...|provide hook to o...|
|conda-forge-coord...| 1643200796|conda-forge/feeds...|Updated the mailc...|Updated the mailc...|
|        armaneshaghi| 1394080200|armaneshaghi/prof...|    2014-03-06T04:30|   2014-03-06T04:30
|
| Frank Clay Anderson| 1181587417|opensim-org/opens...|Now producing the...|Now producing the...|
|conda-forge-coord...| 1542830971|conda-forge/feeds...|Updated the kiwip...|Updated the kiwip...|
|Android Git Autom..

                                                                                

In [None]:
# filter columns in files dataset
df_files1 = df_files.select(col("repo_name"), col('path'))
df_files1.show(10)

+--------------------+--------------------+
|           repo_name|                path|
+--------------------+--------------------+
|    enzbang/diouzhtu|gwiad_wiki_servic...|
|TheMrNomis/Latex-...|             LFM.php|
|TheMrNomis/Latex-...|PHP/LatexFlavored...|
|    xurigan/uexJPush|EUExJPush/EUExJPu...|
|    xurigan/uexJPush|EUExJPush/uexJPus...|
|    xurigan/uexJPush|EUExJPush/EUExJPu...|
|    xurigan/uexJPush|EUExJPush/EUExJPu...|
|    xurigan/uexJPush|EUExJPush/EUExJPu...|
|    xurigan/uexJPush|EUExJPush/EUExJPu...|
|    xurigan/uexJPush|EUExJPush/EUExJPu...|
+--------------------+--------------------+
only showing top 10 rows



                                                                                

#### 2. Complete thorough EDA to identify which variables you can use to complete your analysis

Since I made sure to exclude duplicate variables in the filtering part, I will check for poorly populated variables in this part. All the selected variables are included in the analysis part, so I will try to leave as many of them as possible.

In [None]:
#df_languages1s = df_languages1.sample(fraction=0.02, seed=42)
#df_commits1s = df_commits1.sample(fraction=0.02, seed=42)
#df_files1s = df_files1.sample(fraction=0.02, seed=42)
#df_licenses1s = df_licenses.sample(fraction=0.02, seed=42)

In [None]:
df_languages1s = df_languages1.sample(fraction=0.2, seed=42)
df_commits1s = df_commits1.sample(fraction=0.2, seed=42)
df_files1s = df_files1.sample(fraction=0.2, seed=42)
df_licenses1s = df_licenses.sample(fraction=0.2, seed=42)

In [None]:
c1 = df_languages1s.count()
c2 = df_commits1s.count()
c3 = df_files1s.count()
c4 = df_licenses1s.count()
print(f"df_languages1s:{c1}")
print(f"df_commits1s:{c2}")
print(f"df_files1s:{c3}")
print(f"df_licenses1s:{c4}")

24/03/08 22:42:41 WARN org.apache.spark.deploy.yarn.YarnAllocator: Cannot find executorId for container: container_1709851323677_0026_01_000002
                                                                                

df_languages1s:665942
df_commits1s:53082478
df_files1s:461885426
df_licenses1s:665957


In [None]:
joined_df = df_commits1s.join(
    df_languages1s, df_commits1s["repo_name"] == df_languages1s["repo_name"]).join(
    df_licenses1s, df_commits1s["repo_name"] == df_licenses1s["repo_name"]).drop(
    df_languages1s['repo_name']).drop(df_licenses1s['repo_name'])
joined_df.show(5)



In [None]:
#gs_write_path = 'gs://msca-bdp-students-bucket/shared_data/esmyslovskikh/sample_df.parquet'
#joined_df.write.parquet(gs_write_path)

In [None]:
gs_write_path_whole = 'gs://msca-bdp-students-bucket/shared_data/esmyslovskikh/whole_df.parquet'
joined_df.write.parquet(gs_write_path_whole)

In [None]:
import datetime
import pytz

datetime.datetime.now(pytz.timezone('US/Central')).strftime("%a, %d %B %Y %H:%M:%S")