# Establish Connection to Azure Blob Storage

## Install and Import Necessary Libraries

In [1]:
# Install necessary libraries
!pip install pyspark
!pip install python-dotenv



In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
from pyspark.sql.functions import explode, col, to_timestamp, substring
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, ArrayType

## Load Azure Connection Information

In [3]:
# Load environment variables for Azure access information.
# The account name and key come from a local .env file so that the
# credentials never appear in the notebook itself.
load_dotenv("azure_connection.env")

storage_account_name = os.getenv("AZURE_ACCOUNT_NAME")
storage_account_key = os.getenv("AZURE_STORAGE_KEY")

# Fail fast when a setting is missing: os.getenv returns None for unset
# variables, which would otherwise be interpolated into the Spark
# configuration key as the literal text "None".
missing_settings = [
    env_name
    for env_name, env_value in (
        ("AZURE_ACCOUNT_NAME", storage_account_name),
        ("AZURE_STORAGE_KEY", storage_account_key),
    )
    if not env_value
]
if missing_settings:
    raise RuntimeError(
        "Missing Azure settings (expected in azure_connection.env or the environment): "
        + ", ".join(missing_settings)
    )

# Container and blob that hold the raw dataset
storage_container_name = "kaggle-datasets"
parquet_blob_name = "github-dataset-full.parquet"

## Create and Configure Spark Connection with Azure

In [4]:
# Build (or reuse) the Spark session used for all reads in this notebook:
#   - the storage account key is registered under the `spark.hadoop.` prefix,
#   - the hadoop-azure / azure-storage jars are requested via `spark.jars.packages`,
#   - the driver is given 8g of memory.
spark = (
    SparkSession.builder
    .appName("Read Parquet from Azure Blob Storage")
    .config(
        f"spark.hadoop.fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
        storage_account_key,
    )
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-azure:3.3.2,com.microsoft.azure:azure-storage:8.6.6",
    )
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Set the log level to ERROR to keep lower-severity log noise out of cell output
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/opt/anaconda3/envs/naturalistvenv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthewleffler/.ivy2/cache
The jars for the packages stored in: /Users/matthewleffler/.ivy2/jars
org.apache.hadoop#hadoop-azure added as a dependency
com.microsoft.azure#azure-storage added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7793224b-51a3-411c-9ce2-790dfce1ba2f;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-azure;3.3.2 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in local-m2-cache
	found org.apache.httpcomponents#httpcore;4.4.13 in local-m2-cache
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.11 in local-m2-cache
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found org.eclipse.jetty#jetty-util-ajax;9.4.43.v20210629 in central
	found org.eclipse.jetty#jetty-util;9.4.43.v20210629 in central
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in centr

In [5]:
# Set authentication for Spark to connect to Azure: register the storage
# account key on the session's runtime configuration.
# NOTE(review): the same key is also supplied at session creation under the
# `spark.hadoop.` prefix -- confirm whether both settings are required.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

In [6]:
# wasbs:// URI of the raw parquet file: <container>@<account>.blob.core.windows.net/<blob>
parquet_path = (
    f"wasbs://{storage_container_name}@{storage_account_name}"
    f".blob.core.windows.net/{parquet_blob_name}"
)

# Load Data from Azure to Spark DataFrame

In [7]:
# Define schema for raw data.
# One row per GitHub user. Every field is declared nullable. Timestamp-like
# fields (`*_at`) are declared as strings here and converted to timestamps
# in the per-table sections further down.
schema = StructType([
    StructField("bio", StringType(), True),
    StructField("blog", StringType(), True),
    # Nested list of commit records (array of structs)
    StructField("commit_list", ArrayType(
        StructType([
            StructField("author_id", LongType(), True),
            StructField("commit_at", StringType(), True),
            StructField("committer_id", LongType(), True),
            StructField("generate_at", StringType(), True),
            StructField("message", StringType(), True),
            StructField("repo_description", StringType(), True),
            StructField("repo_id", LongType(), True),
            StructField("repo_name", StringType(), True),
            StructField("repo_owner_id", LongType(), True)
        ])
    ), True),
    StructField("commits", LongType(), True),
    StructField("company", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("email", StringType(), True),
    # Arrays of long values, exploded into one row per element later on
    StructField("follower_list", ArrayType(LongType(), True), True),
    StructField("followers", LongType(), True),
    StructField("following", LongType(), True),
    StructField("following_list", ArrayType(LongType(), True), True),
    StructField("hirable", BooleanType(), True),
    StructField("id", LongType(), True),
    StructField("is_suspicious", BooleanType(), True),
    StructField("location", StringType(), True),
    StructField("login", StringType(), True),
    StructField("name", StringType(), True),
    StructField("public_gists", LongType(), True),
    StructField("public_repos", LongType(), True),
    # Nested list of repository records (array of structs)
    StructField("repo_list", ArrayType(
        StructType([
            StructField("created_at", StringType(), True),
            StructField("default_branch", StringType(), True),
            StructField("description", StringType(), True),
            StructField("fork", BooleanType(), True),
            StructField("forks_count", LongType(), True),
            StructField("full_name", StringType(), True),
            StructField("has_wiki", BooleanType(), True),
            StructField("id", LongType(), True),
            StructField("language", StringType(), True),
            StructField("license", StringType(), True),
            StructField("open_issues", LongType(), True),
            StructField("owner_id", LongType(), True),
            StructField("pushed_at", StringType(), True),
            StructField("size", LongType(), True),
            StructField("stargazers_count", LongType(), True),
            StructField("updated_at", StringType(), True)
        ])
    ), True),
    StructField("type", StringType(), True),
    StructField("updated_at", StringType(), True)
])

In [8]:
# Read the raw parquet file from Azure into a Spark DataFrame, applying the
# explicit schema defined above
df = spark.read.schema(schema).parquet(parquet_path)

In [9]:
# Show the first 10 rows, without truncating cell contents, to confirm the
# data was properly loaded
df.show(10, truncate=False)

                                                                                

+----+----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Convert Types and Save Data to Azure

## repo_list

In [10]:
# Build one row per (user, repository):
#   1. explode `repo_list` so each repository struct gets its own row,
#   2. flatten every struct field into its own `repo_`-prefixed column,
#   3. keep the first 19 characters of each timestamp string (the
#      `yyyy-MM-dd HH:mm:ss`-length prefix, dropping any trailing suffix
#      such as a UTC offset) and parse the result into a timestamp type.
# NOTE(review): dropping the suffix does not shift the clock value; the parsed
# timestamps are presumably interpreted in the Spark session time zone -- confirm
# that this is the intended semantics.
repo_list_df = (
    df.select("id", "login", explode("repo_list").alias("repo"))
    .select(
        col("id").alias("user_id"),
        col("login").alias("user_login"),
        *[
            col(f"repo.{repo_field}").alias(f"repo_{repo_field}")
            for repo_field in (
                "created_at",
                "default_branch",
                "description",
                "fork",
                "forks_count",
                "full_name",
                "has_wiki",
                "id",
                "language",
                "license",
                "open_issues",
                "owner_id",
                "pushed_at",
                "size",
                "stargazers_count",
                "updated_at",
            )
        ],
    )
    .withColumn("repo_created_at", to_timestamp(substring("repo_created_at", 1, 19)))
    .withColumn("repo_pushed_at", to_timestamp(substring("repo_pushed_at", 1, 19)))
    .withColumn("repo_updated_at", to_timestamp(substring("repo_updated_at", 1, 19)))
)

In [11]:
# Print the schema to confirm the three repo_*_at columns are now timestamps
repo_list_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- user_login: string (nullable = true)
 |-- repo_created_at: timestamp (nullable = true)
 |-- repo_default_branch: string (nullable = true)
 |-- repo_description: string (nullable = true)
 |-- repo_fork: boolean (nullable = true)
 |-- repo_forks_count: long (nullable = true)
 |-- repo_full_name: string (nullable = true)
 |-- repo_has_wiki: boolean (nullable = true)
 |-- repo_id: long (nullable = true)
 |-- repo_language: string (nullable = true)
 |-- repo_license: string (nullable = true)
 |-- repo_open_issues: long (nullable = true)
 |-- repo_owner_id: long (nullable = true)
 |-- repo_pushed_at: timestamp (nullable = true)
 |-- repo_size: long (nullable = true)
 |-- repo_stargazers_count: long (nullable = true)
 |-- repo_updated_at: timestamp (nullable = true)



In [12]:
# Show the first 10 rows (untruncated) to check the data is correctly formatted
repo_list_df.show(10, truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+--------+----------+-------------------+-------------------+--------------------+---------+----------------+---------------------------------+-------------+--------+-------------+------------+----------------+-------------+-------------------+---------+---------------------+-------------------+
|user_id |user_login|repo_created_at    |repo_default_branch|repo_description    |repo_fork|repo_forks_count|repo_full_name                   |repo_has_wiki|repo_id |repo_language|repo_license|repo_open_issues|repo_owner_id|repo_pushed_at     |repo_size|repo_stargazers_count|repo_updated_at    |
+--------+----------+-------------------+-------------------+--------------------+---------+----------------+---------------------------------+-------------+--------+-------------+------------+----------------+-------------+-------------------+---------+---------------------+-------------------+
|16860856|chiaotzu  |2016-01-24 06:10:37|master             |my world            |false    |0               |

                                                                                

In [None]:
# NOTE(review): this write is disabled. The read-back cell that follows only
# succeeds if the dataset already exists at this path from an earlier run;
# uncomment to (re)generate it.
# # Write data to Azure container
# repo_list_df.write.mode("overwrite").parquet(
#     "wasbs://kaggle-datasets@matthewleffler1.blob.core.windows.net/clean_data/repo_list_data"
# )

In [16]:
# Read the stored dataset back to ensure it was properly saved.
# The path is built from the configured container and account instead of a
# hard-coded account name, so it always targets the account whose key was
# registered with Spark.
# Note: this rebinds `repo_list_df` to the stored copy.
repo_list_df = spark.read.parquet(
    f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/clean_data/repo_list_data"
)

In [None]:
# Show the first 10 rows of the re-loaded data to ensure it was saved
repo_list_df.show(10, truncate=False)

## following_list

In [14]:
# Build one row per (user, followed account): keep users whose
# `following_list` is not null, then explode the array so that every id in
# it becomes its own `following_id` row.
following_list_df = (
    df.select("id", "login", "following_list")
    .where(col("following_list").isNotNull())
    .select(
        col("id").alias("user_id"),
        col("login").alias("user_login"),
        explode("following_list").alias("following_id"),
    )
)

In [15]:
# Print the schema to confirm the exploded column names and types
# (this table has no timestamp columns)
following_list_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- user_login: string (nullable = true)
 |-- following_id: long (nullable = true)



In [16]:
# Show the first 10 rows (untruncated) to check the data is correctly formatted
following_list_df.show(10, truncate=False)

[Stage 5:>                                                          (0 + 1) / 1]

+-------+----------+------------+
|user_id|user_login|following_id|
+-------+----------+------------+
|141210 |jlsuarezs |574         |
|141210 |jlsuarezs |613         |
|141210 |jlsuarezs |630         |
|141210 |jlsuarezs |2525        |
|141210 |jlsuarezs |3341        |
|141210 |jlsuarezs |5010        |
|141210 |jlsuarezs |5364        |
|141210 |jlsuarezs |6384        |
|141210 |jlsuarezs |6612        |
|141210 |jlsuarezs |6699        |
+-------+----------+------------+
only showing top 10 rows



                                                                                

In [None]:
# NOTE(review): this write is disabled. The read-back cell that follows only
# succeeds if the dataset already exists at this path from an earlier run;
# uncomment to (re)generate it.
# # Write data to Azure container
# following_list_df.write.mode("overwrite").parquet(
#     "wasbs://kaggle-datasets@matthewleffler1.blob.core.windows.net/clean_data/following_list_data"
# )


In [20]:
# Read the stored dataset back to ensure it was properly saved.
# The path is built from the configured container and account instead of a
# hard-coded account name, so it always targets the account whose key was
# registered with Spark.
# Note: this rebinds `following_list_df` to the stored copy.
following_list_df = spark.read.parquet(
    f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/clean_data/following_list_data"
)

In [None]:
# Show the first 10 rows of the re-loaded data to ensure it was saved
following_list_df.show(10, truncate=False)

## follower_list

In [21]:
# Extract non-null follower list from raw data
follower_list_df = df.select(['id', 'login', 'follower_list']).filter('follower_list IS NOT NULL')

# Explode the follower_list array so that every follower gets its own row.
# The column is referenced by its exact schema name (`follower_list`); the
# previous `follower_List` spelling only resolved because Spark's column
# resolution is case-insensitive by default.
# NOTE(review): the exploded values are long ids, yet the column is aliased
# `follower_login`. The alias is kept so the output schema does not change;
# consider renaming it to `follower_id` together with any downstream readers.
follower_list_df = follower_list_df.select(
    col("id").alias("user_id"),
    col("login").alias("user_login"),
    explode("follower_list").alias("follower_login")
)

In [22]:
# Print the schema to confirm the exploded column names and types
# (this table has no timestamp columns)
follower_list_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- user_login: string (nullable = true)
 |-- follower_login: long (nullable = true)



In [23]:
# Show the first 10 rows (untruncated) to check the data is correctly formatted
follower_list_df.show(10, truncate=False)

[Stage 7:>                                                          (0 + 1) / 1]

+-------+----------+--------------+
|user_id|user_login|follower_login|
+-------+----------+--------------+
|141210 |jlsuarezs |45214         |
|141210 |jlsuarezs |2398264       |
|141210 |jlsuarezs |7193921       |
|141210 |jlsuarezs |3604053       |
|141210 |jlsuarezs |5877145       |
|141210 |jlsuarezs |10236771      |
|141210 |jlsuarezs |17078242      |
|141210 |jlsuarezs |12826433      |
|141210 |jlsuarezs |13119017      |
|141210 |jlsuarezs |2439665       |
+-------+----------+--------------+
only showing top 10 rows



                                                                                

In [None]:
# NOTE(review): this write is disabled. The read-back cell that follows only
# succeeds if the dataset already exists at this path from an earlier run;
# uncomment to (re)generate it.
# # Write data to Azure container
# follower_list_df.write.mode("overwrite").parquet(
#     "wasbs://kaggle-datasets@matthewleffler1.blob.core.windows.net/clean_data/follower_list_data"
# )

In [24]:
# Read the stored dataset back to ensure it was properly saved.
# The path is built from the configured container and account instead of a
# hard-coded account name, so it always targets the account whose key was
# registered with Spark.
# Note: this rebinds `follower_list_df` to the stored copy.
follower_list_df = spark.read.parquet(
    f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/clean_data/follower_list_data"
)

In [None]:
# Show the first 10 rows of the re-loaded data to ensure it was saved
follower_list_df.show(10, truncate=False)

## commit_list

In [25]:
# Build one row per (user, commit):
#   1. explode `commit_list` so each commit struct gets its own row,
#   2. flatten every struct field into its own column (field names are kept),
#   3. keep the first 19 characters of each timestamp string (the
#      `yyyy-MM-dd HH:mm:ss`-length prefix, dropping any trailing suffix
#      such as a UTC offset) and parse the result into a timestamp type.
# NOTE(review): dropping the suffix does not shift the clock value; the parsed
# timestamps are presumably interpreted in the Spark session time zone -- confirm
# that this is the intended semantics.
commit_list_df = (
    df.select("id", "login", explode("commit_list").alias("commit"))
    .select(
        col("id").alias("user_id"),
        col("login").alias("user_login"),
        *[
            col(f"commit.{commit_field}").alias(commit_field)
            for commit_field in (
                "author_id",
                "commit_at",
                "committer_id",
                "generate_at",
                "message",
                "repo_description",
                "repo_id",
                "repo_name",
                "repo_owner_id",
            )
        ],
    )
    .withColumn("commit_at", to_timestamp(substring("commit_at", 1, 19)))
    .withColumn("generate_at", to_timestamp(substring("generate_at", 1, 19)))
)

In [26]:
# Print the schema to confirm commit_at and generate_at are now timestamps
commit_list_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- user_login: string (nullable = true)
 |-- author_id: long (nullable = true)
 |-- commit_at: timestamp (nullable = true)
 |-- committer_id: long (nullable = true)
 |-- generate_at: timestamp (nullable = true)
 |-- message: string (nullable = true)
 |-- repo_description: string (nullable = true)
 |-- repo_id: long (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- repo_owner_id: long (nullable = true)



In [27]:
# Show the first 10 rows (untruncated) to check the data is correctly
# formatted (nothing has been saved at this point)
commit_list_df.show(10, truncate=False)

[Stage 9:>                                                          (0 + 1) / 1]

+--------+----------+---------+-------------------+------------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------+--------+---------------------------------+-------------+
|user_id |user_login|author_id|commit_at          |committer_id|generate_at        |message                                                                                                                                                                                                      |repo_description                  |repo_id |repo_name                        |repo_owner_id|
+--------+----------+---------+-------------------+------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [None]:
# NOTE(review): this write is disabled. The read-back cell that follows only
# succeeds if the dataset already exists at this path from an earlier run;
# uncomment to (re)generate it.
# # Save commit_list_df
# commit_list_df.write.mode("overwrite").parquet(
#     "wasbs://kaggle-datasets@matthewleffler1.blob.core.windows.net/clean_data/commit_list_data"
# )

In [28]:
# Read the stored dataset back to ensure it was properly saved.
# The path is built from the configured container and account instead of a
# hard-coded account name, so it always targets the account whose key was
# registered with Spark.
# Note: this rebinds `commit_list_df` to the stored copy.
commit_list_df = spark.read.parquet(
    f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/clean_data/commit_list_data"
)

In [None]:
# Show the first 10 rows of the re-loaded commit data to ensure it was saved.
# (Previously this displayed `repo_list_df`, which verified the wrong dataset.)
commit_list_df.show(10, truncate=False)

## Non-List Data

In [29]:
# Names of every top-level column whose type is neither an array nor a struct
columns_to_keep = [
    schema_field.name
    for schema_field in df.schema.fields
    if not isinstance(schema_field.dataType, (ArrayType, StructType))
]

# Keep only those flat columns, then keep the first 19 characters of each
# timestamp string (the `yyyy-MM-dd HH:mm:ss`-length prefix, dropping any
# trailing suffix such as a UTC offset) and parse it into a timestamp type.
# NOTE(review): dropping the suffix does not shift the clock value; the parsed
# timestamps are presumably interpreted in the Spark session time zone -- confirm
# that this is the intended semantics.
non_list_df = (
    df.select(columns_to_keep)
    .withColumn("created_at", to_timestamp(substring("created_at", 1, 19)))
    .withColumn("updated_at", to_timestamp(substring("updated_at", 1, 19)))
)

In [30]:
# Print the schema to confirm created_at and updated_at are now timestamps
non_list_df.printSchema()

root
 |-- bio: string (nullable = true)
 |-- blog: string (nullable = true)
 |-- commits: long (nullable = true)
 |-- company: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- followers: long (nullable = true)
 |-- following: long (nullable = true)
 |-- hirable: boolean (nullable = true)
 |-- id: long (nullable = true)
 |-- is_suspicious: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- login: string (nullable = true)
 |-- name: string (nullable = true)
 |-- public_gists: long (nullable = true)
 |-- public_repos: long (nullable = true)
 |-- type: string (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [31]:
# Show the first 10 rows (untruncated) to check the data is correctly
# formatted (nothing has been saved at this point)
non_list_df.show(10, truncate=False)

[Stage 11:>                                                         (0 + 1) / 1]

+----+----------------+-------+----------+-------------------+-----+---------+---------+-------+--------+-------------+--------+----------------------+-----------+------------+------------+----+-------------------+
|bio |blog            |commits|company   |created_at         |email|followers|following|hirable|id      |is_suspicious|location|login                 |name       |public_gists|public_repos|type|updated_at         |
+----+----------------+-------+----------+-------------------+-----+---------+---------+-------+--------+-------------+--------+----------------------+-----------+------------+------------+----+-------------------+
|NULL|                |NULL   |NULL      |2015-09-20 19:52:29|NULL |0        |0        |NULL   |14413602|true         |NULL    |llciq992              |NULL       |0           |0           |User|2016-02-28 10:26:34|
|NULL|                |0      |NULL      |2014-10-05 10:46:27|NULL |0        |0        |NULL   |9025223 |false        |NULL    |cymssss45   

                                                                                

In [None]:
# NOTE(review): this write is disabled. The read-back cell that follows only
# succeeds if the dataset already exists at this path from an earlier run;
# uncomment to (re)generate it.
# # Save non_list_df
# non_list_df.write.mode("overwrite").parquet(
#     "wasbs://kaggle-datasets@matthewleffler1.blob.core.windows.net/clean_data/non_list_data"
# )

In [32]:
# Read the stored dataset back to ensure it was properly saved.
# The path is built from the configured container and account instead of a
# hard-coded account name, so it always targets the account whose key was
# registered with Spark.
# Note: this rebinds `non_list_df` to the stored copy.
non_list_df = spark.read.parquet(
    f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/clean_data/non_list_data"
)

In [None]:
# Show the first 10 rows of the re-loaded non-list data to ensure it was saved.
# (Previously this displayed `repo_list_df`, which verified the wrong dataset.)
non_list_df.show(10, truncate=False)