In [0]:
# create execution parameters
# mode switches: 'true'/'false' dropdowns, all defaulting to 'false'
for _mode_widget in ("monthly", "daily", "hourly"):
    dbutils.widgets.dropdown(_mode_widget, 'false', choices=['true', 'false'])
# range parameters: free-text boxes, empty by default
for _range_widget in ("month", "startingDay", "endingDay", "startingHour", "endingHour"):
    dbutils.widgets.text(_range_widget, "")
# which GH Archive events to keep
dbutils.widgets.dropdown("eventType", 'push', choices=['push', 'issue', 'all'])
# target Azure storage container (blank means the default chosen later)
dbutils.widgets.text("dataContainer", "")

In [0]:
pip install azure.storage.blob

Python interpreter will be restarted.
Collecting azure.storage.blob
  Using cached azure_storage_blob-12.17.0-py3-none-any.whl (388 kB)
Collecting typing-extensions>=4.3.0
  Using cached typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Collecting isodate>=0.6.1
  Using cached isodate-0.6.1-py2.py3-none-any.whl (41 kB)
Collecting azure-core<2.0.0,>=1.28.0
  Using cached azure_core-1.28.0-py3-none-any.whl (185 kB)
Installing collected packages: typing-extensions, isodate, azure-core, azure.storage.blob
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 4.1.1
    Not uninstalling typing-extensions at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-40a979d0-fffe-49eb-a2ba-cfe79ecf801b
    Can't uninstall 'typing-extensions'. No files were found to uninstall.
Successfully installed azure-core-1.28.0 azure.storage.blob-12.17.0 isodate-0.6.1 typing-extensions-4.7.1
Python interpreter will

In [0]:
# imports
import requests
import pandas as pd
import json
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.core.pipeline.transport import HttpResponse
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, coalesce, lit, explode
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType
import gzip
from pathlib import Path
import os
import glob
import time
import datetime
import logging



In [0]:
# Starting logging
# timestamped file name so every run writes its own log file
day_time = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
p_logfile = 'ETL-log-' + day_time + '.log'
print(p_logfile)
# logger named 'log4j'; at INFO level, logger.debug(...) calls elsewhere are filtered out
logger = logging.getLogger('log4j')
logger.setLevel(logging.INFO)
# Re-running this cell must not stack duplicate handlers. Close the old ones before
# removing them so their log files are flushed and their file handles released.
for old_handler in list(logger.handlers):
    old_handler.close()
    logger.removeHandler(old_handler)
# shared format for file and console output
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# file handler appending to p_logfile; no handler-level threshold, so the logger level applies
fh = logging.FileHandler(p_logfile, mode='a')
fh.setFormatter(formatter)
# console handler; its DEBUG threshold is below the logger's INFO level, so INFO still governs
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)

ETL-log-2023-07-14-15-32-43.log


In [0]:
def get_data(date, time, timeout=60):
    """Download one hourly GH Archive file to ``{date}-{time}.json.gz`` in the working directory.

    Args:
        date (str or int): day in YYYY-MM-DD format.
        time (str or int): hour of the day, 0-23 (used unpadded in the URL, e.g. ``2023-06-06-4``).
        timeout (float, optional): seconds to wait on the server before the request fails.

    Returns:
        bool: True if the file was saved, False if the server answered with a non-200 status.
    """
    # build file name and URL from the inputs
    filename = str(date) + '-' + str(time)
    url = "https://data.gharchive.org/" + filename + '.json.gz'
    print(url)
    logger.debug('downloading from URL: ' + url)

    # Stream the body to disk so the archive is never held in memory as a whole.
    # The timeout prevents a stalled connection from hanging the job forever, and the
    # with-block releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print("Error downloading the file.")
            logger.error('Error downloading from URL:' + url)
            return False
        with open(filename + '.json.gz', "wb") as file:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)

    print("File downloaded successfully.")
    logger.debug('downloaded successfully from URL:' + url)
    return True

In [0]:
def unzip_jsongz(filename, folder_path='/databricks/driver/'):
    """Decompress ``{folder_path}/{filename}.json.gz`` into ``{folder_path}/{filename}.json``.

    Args:
        filename (str): base name of the archive, without the ``.json.gz`` suffix.
        folder_path (str, optional): directory containing the archive; the output is
            written next to it. Defaults to the Databricks driver directory.

    Returns:
        None
    """
    import shutil

    input_file = os.path.join(folder_path, filename + '.json.gz')
    output_file = os.path.join(folder_path, filename + '.json')
    # copy in chunks rather than gz_file.read(), which would load the whole
    # decompressed file into driver memory at once
    with gzip.open(input_file, 'rb') as gz_file, open(output_file, 'wb') as out_file:
        shutil.copyfileobj(gz_file, out_file)
    logger.debug('Successfully unpacked ' + filename + '.json.gz')

In [0]:
def flatten_df(df):
    """Flatten every event's payload into top-level string columns.

    Keeps ``type``, ``public`` and ``created_at`` unchanged. Then, for each payload field
    listed below, adds a column cast to string whose name is the dotted path itself
    (a column literally named e.g. ``payload.action``).

    Args:
        df (spark dataframe): raw GH Archive events.

    Returns:
        spark dataframe: the three base columns followed by one string column per payload field.
    """
    payload_fields = (
        "action", "before", "comment", "commits", "description", "distinct_size",
        "forkee", "head", "issue", "master_branch", "member", "number", "release",
        "repository_id", "review", "pages", "pull_request", "push_id", "pusher_type",
        "ref", "size", "ref_type",
    )
    base_columns = [col(name) for name in ("type", "public", "created_at")]
    # nested objects/arrays are serialized by the string cast
    payload_columns = [
        col("payload." + field).cast("string").alias("payload." + field)
        for field in payload_fields
    ]
    return df.select(base_columns + payload_columns)

In [0]:
def categorize_issue_label(labels, title):
    """Categorize an issue as "bug", "request", "enhancement" or "other".

    The label name is searched first (case-insensitive substring match). The title is
    consulted only when the label matches nothing. Within each text the groups are
    tried in the order bug, request, enhancement.

    Args:
        labels (str or None): label name of the issue event; None (a Spark null) is treated as "".
        title (str or None): title of the issue event; None is treated as "".

    Returns:
        str: the category of the issue.
    """
    # (category, keywords) in priority order; the first group with a matching keyword wins
    keyword_groups = (
        ("bug", ("bug", "issue")),
        ("request", ("request", "question")),
        ("enhancement", ("enhancement", "feature", "improvement")),
    )
    for text in (labels, title):
        # null columns reach the UDF as None; treat them as empty text instead of crashing
        lowered = (text or "").lower()
        for category, keywords in keyword_groups:
            if any(keyword in lowered for keyword in keywords):
                return category
    return "other"

In [0]:
def flatten_issues_df(issues_df):
    """Select the IssuesEvent fields used downstream and add a derived ``target`` category.

    ``payload.issue.labels`` is exploded, so one output row is produced per (issue, label)
    pair. Spark's ``explode`` emits no rows for a null or empty array, so unlabeled issues
    are dropped.

    Args:
        issues_df (spark dataframe): GH Archive events, expected to be filtered to IssuesEvent.

    Returns:
        spark dataframe: columns repo_name, html_url, title, body, labels (one element of
        payload.issue.labels) and target ("bug", "request", "enhancement" or "other").
    """
    # `udf` is not among the file-level pyspark imports, so bring it into scope here
    from pyspark.sql.functions import udf

    # pick the issue-related payload objects into their own columns
    issues_flat_df = issues_df.select(
        col("repo.name").alias("repo_name"),
        col("payload.issue.html_url").alias("html_url"),
        col("payload.issue.title").alias("title"),
        col("payload.issue.body").alias("body"),
        explode(col("payload.issue.labels")).alias("labels"),
    )
    # categorize from the label name; categorize_issue_label falls back to the title
    categorize_udf = udf(categorize_issue_label, StringType())
    return issues_flat_df.withColumn(
        "target", categorize_udf(col("labels.name"), col("title"))
    )

In [0]:
def flatten_push_df(push_df):
    """Select the columns relevant to PushEvent, lifting payload fields to the top level.

    Keeps ``type``, ``public``, ``created_at``, ``org``, ``actor`` and ``repo`` unchanged.
    Each listed payload field becomes its own column (no cast), named by its dotted path
    (e.g. ``payload.ref``).

    Args:
        push_df (spark dataframe): GH Archive events, expected to be filtered to PushEvent.

    Returns:
        spark dataframe: the base columns followed by one column per payload field.
    """
    base_columns = [
        col(name) for name in ("type", "public", "created_at", "org", "actor", "repo")
    ]
    payload_fields = (
        "action", "before", "commits", "repository_id", "review", "pages",
        "push_id", "pusher_type", "ref", "size", "ref_type",
    )
    payload_columns = [
        col("payload." + field).alias("payload." + field) for field in payload_fields
    ]
    return push_df.select(base_columns + payload_columns)

In [0]:
def upload_to_blob_storage(connection_string, container_name, file_path, blob_name, overwrite=False):
    """Upload a local file to Azure Blob Storage, creating the container if needed.

    Args:
        connection_string (str): Azure blob connection string.
        container_name (str): Azure blob container name; created when it does not exist.
        file_path (str or path-like): local file to upload.
        blob_name (str): name of the blob to write inside the container.
        overwrite (bool, optional): replace an existing blob of the same name. When False
            (the default) an existing blob makes the upload fail.

    Returns:
        None
    """
    # the with-block closes the client's HTTP transport when done
    with BlobServiceClient.from_connection_string(connection_string) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)
        # create the storage container on first use
        if not container_client.exists():
            container_client.create_container()
        blob_client = container_client.get_blob_client(blob_name)
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=overwrite)

In [0]:
def clear_container_data(connection_string, container_name):
    """Delete every blob in an Azure Blob Storage container, if the container exists.

    Args:
        connection_string (str): Azure Blob Storage connection string.
        container_name (str): Azure Blob Storage container name.

    Returns:
        None
    """
    # the with-block closes the client's HTTP transport when done
    with BlobServiceClient.from_connection_string(connection_string) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)
        # On a fresh storage account the container does not exist yet. There is nothing
        # to clear, and listing a missing container fails.
        if not container_client.exists():
            return
        for blob in container_client.list_blobs():
            container_client.delete_blob(blob.name)

In [0]:
def _etl_ranges(monthly, hourly, daily, month, startingDay, endingDay, startingHour, endingHour):
    """Return (day_range, hour_range) for the selected mode; monthly beats daily beats hourly."""
    import calendar

    if monthly:
        year, month_number = (int(part) for part in month.split("-")[:2])
        # monthrange()[1] is the real number of days, so 28/29/30/31-day months are all exact
        day_range = range(1, calendar.monthrange(year, month_number)[1] + 1)
        hour_range = range(0, 24)
    elif daily:
        day_range = range(int(startingDay), int(endingDay) + 1)
        hour_range = range(0, 24)
    elif hourly:
        day_range = range(int(startingDay), int(startingDay) + 1)
        hour_range = range(int(startingHour), int(endingHour) + 1)
    else:
        raise ValueError("one of monthly, daily or hourly must be true")
    if not day_range or not hour_range:
        raise ValueError("empty day or hour range; check the starting/ending parameters")
    return day_range, hour_range


def _etl_event_handler(eventType):
    """Map the eventType parameter to (GH Archive type filter, flatten function)."""
    handlers = {
        'push': ('PushEvent', flatten_push_df),
        'issue': ('IssuesEvent', flatten_issues_df),
        'all': ('all', flatten_df),
    }
    if eventType not in handlers:
        raise ValueError("unsupported eventType: {!r}".format(eventType))
    return handlers[eventType]


def _etl_write_single_parquet(df, cache_folder_path, cache_folder_path_spark):
    """Write df as a single parquet part file in the cache folder and return its local path."""
    import shutil

    # Spark will not write into an existing directory, so remove the previous day's output
    if os.path.exists(cache_folder_path):
        shutil.rmtree(cache_folder_path)
        print("cache cleared")
        logger.debug('databricks cache cleared')
    # coalesce(1) makes Spark emit one part-*.parquet file
    df.coalesce(1).write.parquet(cache_folder_path_spark)
    parquet_files = sorted(Path(cache_folder_path).glob("*.parquet"))
    if not parquet_files:
        raise FileNotFoundError("no parquet file written to " + cache_folder_path)
    parquet_filename = parquet_files[-1]
    print(parquet_filename)
    return parquet_filename


def etl(monthly,hourly,daily,month,startingDay,endingDay,startingHour,endingHour,eventType = 'push', container_name = 'data', connection_string = None):
    """Download GH Archive hours, keep the requested events, and upload one parquet file per day.

    The target container is emptied first. For every selected day, each selected hour is
    downloaded, decompressed, read with Spark, filtered and flattened. The hours are unioned,
    written as a single parquet file, and uploaded as ``{month}-{DD}.snappy.parquet``.

    Args:
        monthly (bool): fetch every day of ``month``.
        hourly (bool): fetch hours ``startingHour``..``endingHour`` of ``startingDay``.
        daily (bool): fetch all hours of days ``startingDay``..``endingDay``.
            When several flags are true, monthly wins over daily, and daily over hourly.
        month (str): month to fetch, YYYY-MM.
        startingDay (str): first day of the month (inclusive); used in daily and hourly mode.
        endingDay (str): last day of the month (inclusive); used in daily mode.
        startingHour (str): first hour, 0-23 (inclusive); used in hourly mode.
        endingHour (str): last hour, 0-23 (inclusive); used in hourly mode.
        eventType (str, optional): 'push', 'issue' or 'all'.
        container_name (str, optional): Azure storage container to store the data in.
        connection_string (str, optional): Azure storage connection string. When omitted it
            is read from the AZURE_STORAGE_CONNECTION_STRING environment variable.

    Returns:
        None

    Raises:
        ValueError: no mode flag set, an empty day/hour range, an unknown eventType,
            or no connection string available.
    """
    # Validate everything before clearing the container, so a bad call cannot wipe existing data.
    day_range, hour_range = _etl_ranges(monthly, hourly, daily, month, startingDay, endingDay, startingHour, endingHour)
    event_type, flatten = _etl_event_handler(eventType)
    if connection_string is None:
        # Storage keys must not be hard-coded in the notebook. Supply them via the environment
        # (on Databricks, e.g. a cluster env var that references a secret scope).
        connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
    if not connection_string:
        raise ValueError("no Azure connection string: pass connection_string or set AZURE_STORAGE_CONNECTION_STRING")

    spark = SparkSession.builder.getOrCreate()
    cache_folder_path_spark = "file:///databricks/driver/cache"
    cache_folder_path = "/databricks/driver/cache"
    folder_path = "/databricks/driver/"

    clear_container_data(connection_string, container_name)
    logger.debug('Cleared container data')

    total_download_size = 0
    for day in day_range:
        # format day as DD, as required by GHArchive
        day_str = "{:02}".format(day)
        # accumulates at most one day's data
        main_df = None
        for hour in hour_range:
            filename = month + "-" + day_str + "-" + str(hour)
            get_data(month + "-" + day_str, hour)
            unzip_jsongz(filename)
            df = spark.read.json("file:///databricks/driver/" + filename + ".json")
            if event_type != 'all':
                df = df.filter(df.type == event_type)
            flat_df = flatten(df)
            # union matches columns by position; every hour uses the same flatten, so the order lines up
            main_df = flat_df if main_df is None else main_df.union(flat_df)

        parquet_filename = _etl_write_single_parquet(main_df, cache_folder_path, cache_folder_path_spark)
        blob_name = month + "-" + day_str + ".snappy.parquet"
        upload_to_blob_storage(connection_string, container_name, parquet_filename, blob_name)
        print("uploaded {} to blob storage".format(blob_name))
        file_size = os.path.getsize(parquet_filename)
        print("File size: ", int(file_size), " bytes")
        logger.info(blob_name + ' of size ' + str(file_size) + ' successfully uploaded to blob storage')
        total_download_size += int(file_size)
        logger.info('Current total download size: ' + str(total_download_size))

        # The day's parquet file is written, so remove the local .json/.json.gz inputs to free disk space.
        for file_path in glob.glob(os.path.join(folder_path, "*.json*")):
            os.remove(file_path)
            print("File '{}' removed.".format(file_path))
            logger.debug("File '{}' removed.".format(file_path))
    

In [0]:
# fetch execution parameters
def _widget_flag(name):
    """Read a 'true'/'false' dropdown widget as a bool: 'false' and empty values are False."""
    raw_value = dbutils.widgets.get(name)
    return raw_value != 'false' and bool(raw_value)

monthly = _widget_flag("monthly")
daily = _widget_flag("daily")
hourly = _widget_flag("hourly")
month, startingDay, endingDay, startingHour, endingHour = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("month", "startingDay", "endingDay", "startingHour", "endingHour")
)
eventType = dbutils.widgets.get("eventType")
container_name = dbutils.widgets.get("dataContainer")
# fall back to the 'data' container when the widget is null or blank
if not (container_name and container_name.strip()):
    container_name = 'data'

In [0]:
# Executing ETL with the widget-derived parameters
etl(
    monthly=bool(monthly),
    hourly=bool(hourly),
    daily=bool(daily),
    month=month,
    startingDay=startingDay,
    endingDay=endingDay,
    startingHour=startingHour,
    endingHour=endingHour,
    eventType=eventType,
    container_name=container_name,
)

https://data.gharchive.org/2023-06-06-4.json.gz
File downloaded successfully.
https://data.gharchive.org/2023-06-06-5.json.gz
File downloaded successfully.


2023-07-14 15:34:21,024 - log4j - INFO - 2023-06-06.snappy.parquet of size 1541085 successfully uploaded to blob storage
2023-07-14 15:34:21,025 - log4j - INFO - Current total download size: 1541085


/databricks/driver/cache/part-00000-tid-8000346145254716032-e721d24f-dd76-447e-b49d-991c967ad790-25-1-c000.snappy.parquet
uploaded 2023-06-06.snappy.parquet to blob storage
File size:  1541085  bytes
File '/databricks/driver/2023-06-06-4.json' removed.
File '/databricks/driver/2023-06-06-5.json' removed.
File '/databricks/driver/2023-06-06-5.json.gz' removed.
File '/databricks/driver/2023-06-06-4.json.gz' removed.


In [0]:
# Flush and close all logging handlers so the log file is complete on disk before it is uploaded
logging.shutdown()

In [0]:
# Storing logs in blob storage
# The storage account key must not be committed with the notebook. Read the connection
# string from the environment instead (on Databricks, e.g. a cluster environment variable
# that references a secret scope).
connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is not set; cannot upload " + p_logfile)
container_name = "logs"
file_path = p_logfile
blob_name = p_logfile
upload_to_blob_storage(connection_string, container_name, file_path, blob_name)

In [0]:
# remove all widgets created at the top of the notebook
dbutils.widgets.removeAll()