# Connecting Social Media via Domain Names
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import modules.parsers as parsers
import glob
import pandas as pd
import os
import pathlib

In [2]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.postgresql:postgresql:42.1.1")\
                     .master("spark://pop-os.localdomain:7077")\
                     .enableHiveSupport()\
                     .getOrCreate()
sc = spark.sparkContext

22/04/19 14:34:15 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.2.99 instead (on interface eno1)
22/04/19 14:34:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/paul/.ivy2/cache
The jars for the packages stored in: /home/paul/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-21d9147e-c0df-49b7-9ddf-72d2d1e04f6b;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/paul/Projects/DataEngineering/Capstone/env/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.postgresql#postgresql;42.1.1 in central
:: resolution report :: resolve 65ms :: artifacts dl 2ms
	:: modules in use:
	org.postgresql#postgresql;42.1.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-21d9147e-c0df-49b7-9ddf-72d2d1e04f6b
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/6ms)
22/04/19 14:34:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Set

### Step 1: Scope the Project and Gather Data

## Who is going to use the data model, think of user persona that you think will benefit from this?

Startups are already harvesting email addresses from GitHub repositories in order to sell things to their owners. [Source](https://news.ycombinator.com/item?id=30977883) Connecting those email addresses with other social media data allows a better estimate of the value that can be extracted from each address.

Personally, I want to better understand the relationships between individuals and the groups they attach themselves to, and how those groups overlap. For example, I think a corporate entity that uses the same domain name for both its website and its email shows a level of competence worth noting.


## What are the types of questions this data model can help answer?

* Can we find the same domain name being shared for both email addresses and normal websites?
* Total commits per domain name, total commits per email address, total commits per git organization, most contributors per git organization, most contributors per repo
* Can we find people using the same username across email domains as well as Reddit accounts?

#### Scope 

The purpose of this project is to develop an ETL pipeline for social media information, in this case Reddit comments and git metadata. The common key across these datasets is the domain name: git metadata includes email addresses, which contain a domain name, and website URLs can be extracted from Reddit comments.

#### Describe and Gather Data 

Two datasets were used for this project:

* Reddit comment export from [pushshift.io](https://files.pushshift.io/reddit/comments/daily/), specifically the RC_2018-01-01 export
* A custom list of git repos with metadata extracted using a custom script

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

**Reddit Comment**

The main problem with the Reddit dataset is its size: each export is a JSON file of 1-5 GB. I split these files up to make ingestion easier.
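
The notebook does not show the split itself, so the following is only a minimal sketch of one way to do it, assuming the export is newline-delimited JSON (one comment per line). The function name `split_ndjson`, the chunk size, and the `part-00000.json` naming are all hypothetical.

```python
import itertools
import pathlib


def split_ndjson(src, out_dir, lines_per_chunk=100_000):
    """Split a newline-delimited JSON file into smaller numbered files."""
    out_dir = pathlib.Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    with open(src, "r", encoding="utf-8") as handle:
        for index in itertools.count():
            # islice reads at most lines_per_chunk lines, so the whole
            # file is never held in memory at once
            chunk = list(itertools.islice(handle, lines_per_chunk))
            if not chunk:
                break
            part = out_dir / "part-{:05d}.json".format(index)
            part.write_text("".join(chunk), encoding="utf-8")
            written.append(part)
    return written
```

Because every chunk still holds whole lines, each part remains valid newline-delimited JSON and can be read by Spark on its own.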

The queries I want the data to support are:

* Cross dataset comparisons
  * Join domain name of git email with domain name of URL in Reddit comment
  * Join username from git email with Reddit username
* Inner dataset queries
  * Reddit
    * Most comments per user
    * Most comments per subreddit
  * Git
    * Group by email address
    * Group by email address AND repo
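
To make the inner dataset queries concrete, here is a small sketch in plain Python. The records are made up; only the field names `author` and `subreddit` come from the Reddit export. In Spark the same idea would be a group-by followed by a count.

```python
from collections import Counter

# Made-up sample records; only the field names come from the Reddit export.
comments = [
    {"author": "alice", "subreddit": "python"},
    {"author": "bob", "subreddit": "python"},
    {"author": "alice", "subreddit": "dataengineering"},
]

# Count how often each user and each subreddit appears
comments_per_user = Counter(c["author"] for c in comments)
comments_per_subreddit = Counter(c["subreddit"] for c in comments)

print(comments_per_user.most_common(1))       # [('alice', 2)]
print(comments_per_subreddit.most_common(1))  # [('python', 2)]
```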

The domain names inside the comment bodies are not parsed yet, so I will have to extract them myself.

**Git data**

I surveyed many tools for extracting metadata from git repos and settled on [mergestat](https://github.com/mergestat/mergestat), a simple open source program that runs SQL queries against the data in a git repository. It also includes an NDJSON export, so the data can be ingested straight into pandas.
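
NDJSON is one JSON object per line, which is why it loads so easily. A minimal sketch using only the standard library follows; the rows are made up and the field names are taken from the git table in this notebook. In pandas, `pd.read_json(path, lines=True)` reads the same format.

```python
import io
import json

# Made-up rows; field names follow the git_repo_df table in this notebook.
sample = io.StringIO(
    '{"author_email": "dev@example.com", "author_name": "dev", "commits": 3}\n'
    '{"author_email": "ops@example.org", "author_name": "ops", "commits": 1}\n'
)

# Each non-empty line is a complete JSON document
rows = [json.loads(line) for line in sample if line.strip()]
print(rows[0]["author_email"])  # dev@example.com
```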

#### Cleaning Steps

**Reddit Comment**

Many comments are deleted posts that serve no purpose, so they can be dropped.
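
A minimal sketch of that filter, assuming deleted comments carry the placeholder bodies `[deleted]` or `[removed]`. Those marker values and the helper name `drop_deleted` are assumptions, not something this notebook confirms.

```python
# Assumed placeholder values for deleted or moderator-removed comments.
DELETED_MARKERS = {"[deleted]", "[removed]"}


def drop_deleted(comments):
    """Keep only comments whose body is not a deletion placeholder."""
    return [c for c in comments if c.get("body") not in DELETED_MARKERS]


# Made-up sample records
sample = [
    {"author": "alice", "body": "see https://example.com"},
    {"author": "[deleted]", "body": "[deleted]"},
]
print(drop_deleted(sample))  # keeps only alice's comment

# Hypothetical Spark equivalent for the DataFrame used in this notebook:
# reddit_df = reddit_df.filter(~F.col("body").isin("[deleted]", "[removed]"))
```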

**Git data**

mergestat had one limitation: there was no way to get the remote URL of the git repo from within a SQL query. To work around this, I extracted the remote URL myself and fed it into the SQL query as a hard-coded column. To see how I solved this, check out modules/gitindexer.py
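
Since modules/gitindexer.py is not shown here, the following is not the actual script, only a sketch of the idea under stated assumptions: the remote is named `origin`, and the query runs against a `commits` table with `author_email` and `author_name` columns. Both function names are hypothetical.

```python
import subprocess


def get_remote_url(repo_path):
    """Ask git for the URL of the 'origin' remote of a local clone."""
    result = subprocess.run(
        ["git", "-C", repo_path, "config", "--get", "remote.origin.url"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def build_commit_query(remote_url):
    """Embed the remote URL as a constant column in the SQL text."""
    escaped = remote_url.replace("'", "''")  # escape single quotes for SQL
    return (
        "SELECT author_email, author_name, count(*) AS commits, "
        "'{}' AS remote_url FROM commits "
        "GROUP BY author_email, author_name".format(escaped)
    )
```

Every row returned by such a query then carries the same `remote_url` value, which is what lets the commits from many repos be combined into one dataset later.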

In [3]:
# Performing cleaning tasks here
# Reddit data can be loaded into spark directly
p = pathlib.Path("./data/cloned-repos")
p.mkdir(parents=True, exist_ok=True)
remote_urls = pd.read_json("./data/remote_urls_out/part-00000-0e2c3003-35ff-4f6a-825a-6bf7d5966903-c000.json", lines=True)
# Clone every repo listed in the remote_urls export into ./data/cloned-repos
for _, row in remote_urls.iterrows():
    repo_url = row["remote_url"]
    print(repo_url)
    git_clone_command = "cd ./data/cloned-repos && git clone {}".format(repo_url)
    os.system(git_clone_command) # subprocess does not always work

In [4]:
reddit_df = spark.read.json("./data/RC_2018-01-01")

                                                                                

In [5]:
git_repo_df = spark.read.json("./data/git_out/commits/*/*.json")

                                                                                

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

I used a many-to-many schema. There are two fact tables, reddit_df and git_repo_df, which can be joined in two ways: by domain name and by username. Hence the two dimension tables, joined_users and joined_domains.

**reddit_df**

| Field Name             | Datatype | Constraint | Description                                            |
|------------------------|----------|------------|--------------------------------------------------------|
| author                 | text     | None       | Reddit username of who posted comment                  |
| author_cakeday         | bool     | None       | True if user created account on same day previous year |
| author_flair_css_class | text     | None       | CSS class to stylize username                          |
| author_flair_text      | text     | None       | Custom text displayed next to username in comment      |
| body                   | text     | None       | Contents of text                                       |
| controversiality       | int      | None       | Score used to rank comment                             |
| created_utc            | int      | None       | Timestamp when posted                                  |
| edited                 | int      | None       | Timestamp when edited                                  |
| id                     | text     | None       | Unique ID to relate to other comments                  |
| link_id                | text     | None       | Formatted ID to relate to other comments               |
| parent_id              | text     | None       | link_id that this comment was replying to              |
| permalink              | text     | None       | path to comment to share                               |
| subreddit              | text     | None       | Group comment was posted in                            |
| subreddit_type         | text     | None       | Type of subreddit                                      |
| url                    | text     | None       | URL extracted from body                                |
| domain_name            | text     | None       | Domain name extracted from URL                         |




**git_repo_df**

| Field Name     | Datatype | Constraint  | Description                             |
|----------------|----------|-------------|-----------------------------------------|
| author_email   | text     | None        | Email of person who committed to repo   |
| author_name    | text     | None        | Name of person who committed to repo    |
| commits        | int      | None        | Number of commits in this repo          |
| remote_url     | text     | None        | Remote URL to clone this repo           |
| email_username | text     | None        | Username parsed from author_email       |
| email_domain   | text     | None        | Domain name parsed from author_email    |

**joined_users**

| Field Name     | Datatype | Constraint | Description                                    |
|----------------|----------|------------|------------------------------------------------|
| author_email   | text     | None       | Email address from git repo                    |
| email_username | text     | None       | Matching username between reddit and git email |
| author         | text     | None       | Reddit username that matches email_username    |


**joined_domains**

| Field Name          | Datatype | Constraint | Description                                                     |
|---------------------|----------|------------|-----------------------------------------------------------------|
| domain_name         | text     | None       | Matching domain name between reddit and git repo email address |
| reddit_domain_count | int      | None       | Number of times this domain appeared in comments                |
| email_domain_count  | int      | None       | Number of times this domain appeared in email addresses         |


In [6]:
# Functions for UDFs
# Each function returns the string "NONE" when nothing can be extracted
# Source https://stackoverflow.com/questions/839994/extracting-a-url-in-python
def extract_urls(s):
    """Return the first http(s) URL found in s."""
    import re
    results = re.findall(r"(?P<url>https?://[^\s]+)", s)
    if not results:
        return "NONE"
    return results[0]
udf_extract_urls = F.udf(extract_urls, T.StringType())

def extract_domain_name(s):
    """Return the host part of a URL."""
    import re
    results = re.findall(r'^(?:http://|www\.|https://)([^/]+)', s)
    if not results:
        return "NONE"
    return results[0]
udf_extract_domain_name = F.udf(extract_domain_name, T.StringType())

def extract_user_email(s):
    """Return the part of an email address before the @."""
    import re
    try:
        results = re.findall(r'([^@]+)', s)
    except TypeError:  # s is None when author_email is missing
        return "NONE"
    if not results:
        return "NONE"
    return results[0]
udf_extract_user_email = F.udf(extract_user_email, T.StringType())

def extract_domain_name_email(s):
    """Return the part of an email address after the @."""
    import re
    try:
        results = re.findall(r'@(.*)', s)
    except TypeError:  # s is None when author_email is missing
        return "NONE"
    if not results:
        return "NONE"
    return results[0]
udf_extract_domain_name_email = F.udf(extract_domain_name_email, T.StringType())
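
The regular expressions can be checked outside Spark before they are wrapped in UDFs. The sketch below reuses the same patterns on made-up input; the constant names `URL_PATTERN` and `DOMAIN_PATTERN` are introduced only for this illustration.

```python
import re

URL_PATTERN = re.compile(r"https?://[^\s]+")
DOMAIN_PATTERN = re.compile(r"^(?:http://|www\.|https://)([^/]+)")

# Made-up comment body and email address
body = "Docs are at https://github.com/mergestat/mergestat if you need them"
email = "jsh@fb.com"

url = URL_PATTERN.findall(body)[0]
domain = DOMAIN_PATTERN.findall(url)[0]
username = re.findall(r"([^@]+)", email)[0]
email_domain = re.findall(r"@(.*)", email)[0]

print(url)                     # https://github.com/mergestat/mergestat
print(domain)                  # github.com
print(username, email_domain)  # jsh fb.com
```

One thing this kind of check surfaces: the domain pattern strips only one prefix, so `https://www.example.com/page` yields `www.example.com`, which would not match an email domain of `example.com` in the later join.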

In [7]:
reddit_df = reddit_df.withColumn("url", udf_extract_urls(reddit_df.body))
reddit_df = reddit_df.withColumn("domain_name", udf_extract_domain_name(reddit_df.url))

In [8]:
git_repo_df = git_repo_df.withColumn("email_username", udf_extract_user_email(git_repo_df.author_email))
git_repo_df = git_repo_df.withColumn("email_domain", udf_extract_domain_name_email(git_repo_df.author_email))

In [9]:
reddit_domains_grouped = reddit_df.filter( F.col("url") != "NONE" ).groupBy("domain_name").count()
reddit_domains_grouped = reddit_domains_grouped.withColumnRenamed("count", "reddit_domain_count")

In [10]:
git_domains_grouped = git_repo_df.groupBy("email_domain").count()
git_domains_grouped = git_domains_grouped.withColumnRenamed("count", "email_domain_count")

In [11]:
joined_domains = reddit_domains_grouped.alias("reddit")\
  .join(git_domains_grouped.alias("git"), F.col("git.email_domain") == F.col("reddit.domain_name"))
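
The join above uses Spark's default join type, an inner join, so only domains that appear in both datasets survive. A plain Python sketch of the same logic, with made-up counts:

```python
# Made-up per-domain counts standing in for the two grouped DataFrames
reddit_counts = {"github.com": 12, "youtube.com": 40}
email_counts = {"github.com": 3, "gmail.com": 25}

# Inner join: keep only the domains present on both sides
joined = [
    {
        "domain_name": d,
        "reddit_domain_count": reddit_counts[d],
        "email_domain_count": email_counts[d],
    }
    for d in sorted(reddit_counts.keys() & email_counts.keys())
]
print(joined)
# [{'domain_name': 'github.com', 'reddit_domain_count': 12, 'email_domain_count': 3}]
```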

In [12]:
git_domains_grouped = git_domains_grouped.withColumnRenamed("email_domain", "domain_name")

In [13]:
# Copied from Stack Overflow because unionByName in Spark 2.4 does not support allowMissingColumns
# reddit_domains_grouped.unionByName( git_domains_grouped,  allowMissingColumns=True ).show()
diff1 = [c for c in reddit_domains_grouped.columns if c not in git_domains_grouped.columns]
diff2 = [c for c in git_domains_grouped.columns if c not in reddit_domains_grouped.columns]
compare_domain_df = git_domains_grouped.select('*', *[F.lit(None).alias(c) for c in diff1]) \
    .unionByName(reddit_domains_grouped.select('*', *[F.lit(None).alias(c) for c in diff2]))

In [14]:
joined_users = reddit_df.alias("reddit")\
  .join(git_repo_df.alias("git"), F.col("git.email_username") == F.col("reddit.author"))

In [15]:
joined_users[["git.author_email", "git.email_username", "reddit.author"]].limit(5).toPandas()

                                                                                

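|   | author_email            | email_username | author    |
|---|-------------------------|----------------|-----------|
| 0 | jungans@gmail.com       | jungans        | jungans   |
| 1 | mqduck@mqduck.net       | mqduck         | mqduck    |
| 2 | mqduck@mqduck.net       | mqduck         | mqduck    |
| 3 | mithrandi@mithrandi.net | mithrandi      | mithrandi |
| 4 | jsh@fb.com              | jsh            | jsh       |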


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [16]:
def check_df_over_million_lines(df):
    # True when the DataFrame has at least one million rows
    return df.count() >= 1000000
def contains_n_distinct_in_column(n, df, col_name):
    # True when the column holds at least n distinct values
    return df[[col_name]].distinct().count() >= n

In [17]:
check_df_over_million_lines(reddit_df)

                                                                                

True

In [18]:
contains_n_distinct_in_column(100, reddit_df, "domain_name")

                                                                                

True

In [19]:
contains_n_distinct_in_column(100, git_repo_df, "email_domain")

True

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [22]:
"""
Contains one column, remote_url that is a git remote url that can be used to clone the git 
repos involved in this dataset
"""
os.system("rm -rf ./data/remote_urls_out")
git_repo_df[["remote_url"]].distinct().write.json("./data/remote_urls_out")

**joined_domains**

| Field Name          | Datatype | Constraint | Description                                                     |
|---------------------|----------|------------|-----------------------------------------------------------------|
| domain_name         | text     | None       | Matching domain name between reddit and git repo email address |
| reddit_domain_count | int      | None       | Number of times this domain appeared in comments                |
| email_domain_count  | int      | None       | Number of times this domain appeared in email addresses         |



In [23]:
joined_domains[["domain_name", "reddit_domain_count", "email_domain_count"]].write.parquet("./data/joined_domains.parquet", mode="overwrite")

                                                                                

**joined_users**

| Field Name     | Datatype | Constraint | Description                                    |
|----------------|----------|------------|------------------------------------------------|
| author_email   | text     | None       | Email address from git repo                    |
| email_username | text     | None       | Matching username between reddit and git email |
| author         | text     | None       | Reddit username that matches email_username    |


In [24]:
joined_users[["git.author_email", "git.email_username", "reddit.author"]].write.parquet("./data/joined_users.parquet", mode="overwrite")

                                                                                

**reddit_df**

| Field Name             | Datatype | Constraint | Description                                            |
|------------------------|----------|------------|--------------------------------------------------------|
| author                 | text     | None       | Reddit username of who posted comment                  |
| author_cakeday         | bool     | None       | True if user created account on same day previous year |
| author_flair_css_class | text     | None       | CSS class to stylize username                          |
| author_flair_text      | text     | None       | Custom text displayed next to username in comment      |
| body                   | text     | None       | Contents of text                                       |
| controversiality       | int      | None       | Score used to rank comment                             |
| created_utc            | int      | None       | Timestamp when posted                                  |
| edited                 | int      | None       | Timestamp when edited                                  |
| id                     | text     | None       | Unique ID to relate to other comments                  |
| link_id                | text     | None       | Formatted ID to relate to other comments               |
| parent_id              | text     | None       | link_id that this comment was replying to              |
| permalink              | text     | None       | path to comment to share                               |
| subreddit              | text     | None       | Group comment was posted in                            |
| subreddit_type         | text     | None       | Type of subreddit                                      |
| url                    | text     | None       | URL extracted from body                                |
| domain_name            | text     | None       | Domain name extracted from URL                         |


In [25]:
reddit_df.write.parquet("./data/reddit_df.parquet", mode="overwrite")

                                                                                

**git_metadata**

| Field Name     | Datatype | Constraint  | Description                             |
|----------------|----------|-------------|-----------------------------------------|
| author_email   | text     | None        | Email of person who committed to repo   |
| author_name    | text     | None        | Name of person who committed to repo    |
| commits        | int      | None        | Number of commits in this repo          |
| remote_url     | text     | None        | Remote URL to clone this repo           |
| email_username | text     | None        | Username parsed from author_email       |
| email_domain   | text     | None        | Domain name parsed from author_email    |


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  In the end I used Spark and my local file system, but I had run the same data through a variety of other data engineering tools before landing on this solution.
  
  Initially, Spark would hang when using the regexp_extract function on the 2.3 million rows of the Reddit dataset. Since my processing relied on regex, I next tried Redshift's [REGEXP_SUBSTR function](https://docs.aws.amazon.com/redshift/latest/dg/REGEXP_SUBSTR.html), with no success: the regex dialect Redshift supports did not have the features I needed to process the text. I then practiced loading and processing the data in Postgres, where I was successful using the regexp_matches function. With the entire dataset loaded, Postgres took 15-30 seconds to perform the regex transformation on the body column of the reddit_comments table, while Spark does it in about 3 seconds.

  I also had Spark set up to use S3 object storage, but over the course of troubleshooting I ran out of my 20,000 free requests on the AWS free tier.
* Propose how often the data should be updated and why.
  * The Reddit dataset could ideally be updated in real time, live from Reddit.
  * The git repo dataset should be updated with new git repos once a day, which can be done via an Airflow job.
  * The aggregation tables should be updated once a day, when new git repos and commits are added.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    * With 100x the data, I would break the parsing into chunks. The Pushshift dataset is partitioned by day, so separate Spark workers can each process one day's data into the data warehouse. The task of scraping, indexing, and updating git repos can be partitioned in a similar way.
    * With 100x the storage, I would need to use a data warehouse such as Redshift or [Snowflake](https://www.snowflake.com/).
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * Since both the reddit and git data are all timestamped
  * The database needed to be accessed by 100+ people.
    * Permissions and logging would need to be added to the database. One person should not be able to accidentally write an infinite loop that endlessly requests data from the database, and logging is important for detecting when this happens.
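
For the 100x scenario, the per-day partitioning can be sketched as a list of daily export names, each of which could be handed to a separate worker or job. The helper `daily_export_names` is hypothetical; the `RC_YYYY-MM-DD` naming follows the RC_2018-01-01 export used in this project.

```python
from datetime import date, timedelta


def daily_export_names(start, end):
    """Yield one export file name per day, start and end inclusive."""
    current = start
    while current <= end:
        yield "RC_{}".format(current.isoformat())
        current += timedelta(days=1)


names = list(daily_export_names(date(2018, 1, 1), date(2018, 1, 3)))
print(names)  # ['RC_2018-01-01', 'RC_2018-01-02', 'RC_2018-01-03']
```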