## 00 Set up to call API


In [18]:
# lib
import requests
from requests import get
from datetime import datetime
from dateutil.relativedelta import relativedelta
import pandas as pd
from sqlalchemy import create_engine
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')


In [4]:
# setup owner name , access_token, and headers 
access_token='{sealed}' 
headers = {    "Authorization": f"Token {access_token}",
    "Accept": "application/vnd.github+json"}


## 01 Get Data of L6M commits of airflow repo

In [5]:
# main function to call github API
def get_commits_l6m(**kwargs):
    """
    Returns the number of commits to a GitHub repository.
    """
    biz_date = datetime.today() + relativedelta(months=-6)
    biz_date = biz_date.isoformat()
    params = {
        "since":biz_date,
        "per_page": 100,  # Number of results per page
        "page": 1  # Page number to retrieve
    }
    #initial request
    response = requests.get('https://api.github.com/repos/apache/airflow/commits', params)
    results = []
    
    if response.status_code == 200:
        results = response.json()
        #set a marker to see if we retrive all of the results
        done = False

        while not done:
            if 'next' in response.links:
                params["page"] += 1
                response = requests.get('https://api.github.com/repos/apache/airflow/commits', params)
                if response.status_code == 200:
                    results.extend(response.json())
                else:
                    # There was an error with the request, so stop processing
                    done = True
            else: 
                 # There are no more pages of results, so stop processing
                done = True            
    return  [i["commit"]["author"] for i in results]


In [6]:
commit_data = get_commits_l6m()

## 02 Using Pyspark

In [None]:
# set up for pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
print(type(spark))

In [9]:
# create dataframe: 1. have a grass_date to partition the table 2. turn datetime of commit from ISO8601 to datetime type (for mysql)
from pyspark.sql.functions import *

columns = ["datetime", "email", "name"]
df_commit = spark.createDataFrame(data=commit_data, schema = columns)
df_commit = df_commit.withColumn('grass_date', date_format(current_timestamp(), 'yyyy-MM-dd'))
df_commit = df_commit.withColumn("datetime",date_format('datetime',"yyyy-MM-dd HH:mm:ss"))


In [10]:
# the analysis base done
df_commit.show()

+-------------------+--------------------+----------------+----------+
|           datetime|               email|            name|grass_date|
+-------------------+--------------------+----------------+----------+
|2022-12-17 09:40:15|Andrey.Anshin@tar...|   Andrey Anshin|2022-12-18|
|2022-12-17 06:24:33|49878111+TohnJhom...|     John Thomas|2022-12-18|
|2022-12-17 02:04:36| brent@astronomer.io|   Brent Bovenzi|2022-12-18|
|2022-12-16 18:33:56|  uranusjr@gmail.com|  Tzu-ping Chung|2022-12-18|
|2022-12-16 17:37:10|    jarek@potiuk.com|    Jarek Potiuk|2022-12-18|
|2022-12-16 06:59:07|103602455+syedahs...|   Syed Hussaain|2022-12-18|
|2022-12-16 04:29:15|amoghrajesh1999@g...|     Amogh Desai|2022-12-18|
|2022-12-16 04:05:10|45845474+eladkal@...|         eladkal|2022-12-18|
|2022-12-16 00:43:52|  uranusjr@gmail.com|  Tzu-ping Chung|2022-12-18|
|2022-12-16 00:43:17|  uranusjr@gmail.com|  Tzu-ping Chung|2022-12-18|
|2022-12-15 18:29:25|cmachalow@linkedi...|Charles Machalow|2022-12-18|
|2022-

                                                                                

In [104]:
# connect to my mysql local database
conn = mysql.connector.connect(
    host='localhost',
    user='root',
    password='kelsy2022',
    database='edb'
)

# Iterate over the rows of the DataFrame and execute an INSERT statement for each row
for row in df_commit.rdd.toLocalIterator():
    insert_query = f"INSERT INTO commit_l6m_airflow VALUES ({','.join(['%s'] * len(row))})"
    conn.cursor().execute(insert_query, row)

# Commit the transaction
conn.commit()

# Close the connection
conn.close()

In [11]:
# create tempview table for sparksql
df_commit.createOrReplaceTempView("commit_l6m")
##after read mysql local table using pyspark, can also call df.createOrReplaceTempView

In [12]:
# this is used to check if we cover l6m data
sqlDF_0 = spark.sql("SELECT min(datetime), max(datetime) FROM commit_l6m")
sqlDF_0.show()

+-------------------+-------------------+
|      min(datetime)|      max(datetime)|
+-------------------+-------------------+
|2022-06-19 16:53:15|2022-12-17 09:40:15|
+-------------------+-------------------+



## 03 Query 1

In [133]:
#query 1: For the ingested commits, determine the top 5 committers by count of commits and the number  of commits. 

sqlDF_1 = spark.sql('''
SELECT name, commit_cnt 
FROM 
  (
    SELECT distinct name, email, count(*) as commit_cnt 
    FROM commit_l6m 
    WHERE grass_date = current_date
    GROUP BY name, email
  ) 
Order By commit_cnt desc 
LIMIT 5
''')
sqlDF_1.show()

+---------------+----------+
|           name|commit_cnt|
+---------------+----------+
|   Jarek Potiuk|       365|
|Daniel Standish|       108|
| Tzu-ping Chung|        93|
| Jed Cunningham|        77|
|  Andrey Anshin|        75|
+---------------+----------+



## 04 Query 2

In [21]:
#query 2: For the ingested commits, determine the committer with the longest commit streak.  

sqlDF_2 = spark.sql('''

WITH streak AS (
	SELECT *
	FROM(
        SELECT name, email, datetime, lag(name) OVER (PARTITION BY NULL ORDER BY datetime) as prev_committer
        FROM commit_l6m
		WHERE grass_date = current_date
		)
	WHERE name = prev_committer
)
select name, count(*) as record
from streak
group by name
order by count(name) desc
LIMIT 1
''')
sqlDF_2.show()

+------------+------+
|        name|record|
+------------+------+
|Jarek Potiuk|   139|
+------------+------+



## 05 Query 3

In [95]:
#query 3: For the ingested commits, generate a heatmap of number of commits count by all users by day  of the week and by 3 hour blocks.  

##new date_format doc: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
##one thing needs to discuss/align: the original time is recorded in UTC time, here I use SG Local Time to do the heatmap
sqlDF_3_1 = spark.sql('''
SELECT name, email, datetime, date_format(datetime, 'E') as dw,
	case
		when date_format(datetime, 'H') in ('0', '1', '2') then '00-03'
		when date_format(datetime, 'H') in ('3', '4', '5') then '03-06'
		when date_format(datetime, 'H') in ('6', '7', '8') then '06-09'
		when date_format(datetime, 'H') in ('9', '10', '11') then '09-12'
		when date_format(datetime, 'H') in ('12', '13', '14') then '12-15'
		when date_format(datetime, 'H') in ('15', '16', '17') then '15-18'
		when date_format(datetime, 'H') in ('18', '19', '20') then '18-21'
		else '21-00'
	end as hour_itv
FROM commit_l6m
WHERE grass_date = current_date
''')
sqlDF_3_1.show()

+----------------+--------------------+--------------------+---+--------+
|            name|               email|            datetime| dw|hour_itv|
+----------------+--------------------+--------------------+---+--------+
|  Tzu-ping Chung|  uranusjr@gmail.com|2022-12-16T10:33:56Z|Fri|   18-21|
|    Jarek Potiuk|    jarek@potiuk.com|2022-12-16T09:37:10Z|Fri|   15-18|
|   Syed Hussaain|103602455+syedahs...|2022-12-15T22:59:07Z|Fri|   06-09|
|     Amogh Desai|amoghrajesh1999@g...|2022-12-15T20:29:15Z|Fri|   03-06|
|         eladkal|45845474+eladkal@...|2022-12-15T20:05:10Z|Fri|   03-06|
|  Tzu-ping Chung|  uranusjr@gmail.com|2022-12-15T16:43:52Z|Fri|   00-03|
|  Tzu-ping Chung|  uranusjr@gmail.com|2022-12-15T16:43:17Z|Fri|   00-03|
|Charles Machalow|cmachalow@linkedi...|2022-12-15T10:29:25Z|Thu|   18-21|
|  Tzu-ping Chung|  uranusjr@gmail.com|2022-12-15T09:20:25Z|Thu|   15-18|
|Ephraim Anierobi|splendidzigy24@gm...|2022-12-15T07:28:09Z|Thu|   15-18|
|     D. Ferruzzi| ferruzzi@amazon.com

In [22]:
sqlDF_3_1 = spark.sql('''
SELECT *
FROM
  (WITH raw AS
    (
      SELECT name, email, datetime,
      date_format(datetime, 'E') AS dw,
      CASE
        WHEN date_format(datetime, 'H') IN ('0','1','2') THEN '00-03'
        WHEN date_format(datetime, 'H') IN ('3','4','5') THEN '03-06'
        WHEN date_format(datetime, 'H') IN ('6','7','8') THEN '06-09'
        WHEN date_format(datetime, 'H') IN ('9','10','11') THEN '09-12'
        WHEN date_format(datetime, 'H') IN ('12','13','14') THEN '12-15'
        WHEN date_format(datetime, 'H') IN ('15','16','17') THEN '15-18'
        WHEN date_format(datetime, 'H') IN ('18','19','20') THEN '18-21'
        ELSE '21-00'
      END AS hour_itv
      FROM commit_l6m
      WHERE grass_date = current_date
    )
    SELECT dw, hour_itv, count(*) AS total_commits
    FROM raw
    GROUP BY dw, hour_itv
  ) AS data
PIVOT(SUM(total_commits) FOR hour_itv IN ('00-03','03-06','06-09','09-12','12-15','15-18','18-21','21-00'))
ORDER BY
  CASE dw
    WHEN 'Mon' THEN 1
    WHEN 'Tuesday' THEN 2
    WHEN 'Wednesday' THEN 3
    WHEN 'Thursday' THEN 4
    WHEN 'Friday' THEN 5
    WHEN 'Saturday' THEN 6
    ELSE 7
  END
''')
sqlDF_3_1.show()

+---+-----+-----+-----+-----+-----+-----+-----+-----+
| dw|00-03|03-06|06-09|09-12|12-15|15-18|18-21|21-00|
+---+-----+-----+-----+-----+-----+-----+-----+-----+
|Mon|   19|   28|   30|   38|   39|   37|   31|   37|
|Sun|   16|   28|   12|    7|    7|   16|   17|   11|
|Sat|   50|   50|   32|   18|    8|   15|   11|   19|
|Thu|   37|   60|   47|    7|   19|   40|   41|   50|
|Wed|   47|   63|   33|   24|   28|   34|   53|   47|
|Fri|   57|   55|   37|   23|    7|   30|   25|   65|
|Tue|   56|   46|   33|    7|   15|   36|   60|   65|
+---+-----+-----+-----+-----+-----+-----+-----+-----+



## 06 Query from mysql table
The above workflow is based on df_commit, which I fetch directly from API. I did insert the table into table commit_l6m_airflow in local mysql database.
Hence, the above query can change the source to mysql table.
Folllowings are the way to get data from mysql table: df.

In [125]:
# Create a cursor
conn = mysql.connector.connect(
    host='localhost',
    user='root',
    password='kelsy2022',
    database='edb'
)

cursor = conn.cursor()


# Select rows from the table
query = 'SELECT * FROM commit_l6m_airflow'
cursor.execute(query)
'''
# Get the column names
column_names = [column[0] for column in cursor.description]

# Store the results in a DataFrame
df = pd.DataFrame(cursor.fetchall(), columns=column_names)
'''

# Get the column names
column_names = [column[0] for column in cursor.description]

# Store the results in a DataFrame
df = spark.createDataFrame(cursor.fetchall(), column_names)

# Show the DataFrame
df.show()


# Close the cursor and connection
cursor.close()
conn.close()

+-------------------+--------------------+----------------+----------+
|           datetime|               email|            name|grass_date|
+-------------------+--------------------+----------------+----------+
|2022-12-17 09:40:15|Andrey.Anshin@tar...|   Andrey Anshin|2022-12-18|
|2022-12-17 06:24:33|49878111+TohnJhom...|     John Thomas|2022-12-18|
|2022-12-17 02:04:36| brent@astronomer.io|   Brent Bovenzi|2022-12-18|
|2022-12-16 18:33:56|  uranusjr@gmail.com|  Tzu-ping Chung|2022-12-18|
|2022-12-16 17:37:10|    jarek@potiuk.com|    Jarek Potiuk|2022-12-18|
|2022-12-16 06:59:07|103602455+syedahs...|   Syed Hussaain|2022-12-18|
|2022-12-16 04:29:15|amoghrajesh1999@g...|     Amogh Desai|2022-12-18|
|2022-12-16 04:05:10|45845474+eladkal@...|         eladkal|2022-12-18|
|2022-12-16 00:43:52|  uranusjr@gmail.com|  Tzu-ping Chung|2022-12-18|
|2022-12-16 00:43:17|  uranusjr@gmail.com|  Tzu-ping Chung|2022-12-18|
|2022-12-15 18:29:25|cmachalow@linkedi...|Charles Machalow|2022-12-18|
|2022-

In [127]:
df
#type = DataFrame. Equals to df_commit

DataFrame[datetime: timestamp, email: string, name: string, grass_date: date]