# Nexus Media | Your World, United

<img src="Nexus Project Media/Nexus Media Dark.png" style="width:450px; height:auto; display:block; margin:auto;" />

#### Welcome to Nexus Media, the newest social media platform designed to bring everyone together. Connect with family, friends, patrons, and coworkers—all in one place. Share your stories through blogs and vlogs, and discover new connections on your own terms. Join Nexus Media today and experience the future of social networking! 

The following is the process for visually mapping and creating a pipeline for the data from the Nexus Media platform. The information ingested will better help understand how customers are using the platform and help make decisions about what can be done to bring in more customers. The data compares four tables of information for this: Users, Posts, Engagement, and Conversions. Each of these help determine how to create a better product for our customers.

 We create the datasets to use with a python file, and have the data ready to ingest. Then we create functions within AWS to create s3 buckets, glue database, glue crawler, and connect everything with lake formation. We also run these functions to create the proper ETL pipeline. After confirming the processes, we will use redshift to extract the data and format it; making it ready for further analyzing. 

## Business Process Flow

 Below is the Business Process Flow

 <img src="Nexus Project Media/Nexus BPF.png" style="width:600px; height:auto; display:block; margin:auto;" />

## Entity Relational Diagram

Below is the ERD for the Social Media Platform

<img src="Nexus Project Media/Nexus ERD.png" style="width:600px; height:auto; display:block; margin:auto;" />

## Nexus Media Database DDL

### Connections to Source Database

### We are going to make the datasets for each table using the following code:

### users.py

In [1]:
import pandas as pd
import numpy as np
from faker import Faker
import random


# Initialize Faker library
fake = Faker()


# Number of records to generate
num_records = 1000


# Define user types
user_types = ['Personal', 'Business']
# Generate Users dataset
users_data = {
    'UserID': [fake.uuid4() for _ in range(num_records)],
    'UserType': [random.choice(user_types) for _ in range(num_records)],
    'Age': [random.randint(18, 65) for _ in range(num_records)],
    'Gender': [random.choice(['Male', 'Female', 'Other']) for _ in range(num_records)],
    'Location': [fake.city() for _ in range(num_records)],
    'AccountCreationDate': [fake.date_between(start_date='-2y', end_date='today') for _ in range(num_records)],
    'LastLoginDate': [fake.date_between(start_date='-1y', end_date='today') for _ in range(num_records)],
    'TotalLogins': [random.randint(1, 500) for _ in range(num_records)],
    'PreferredDevice': [random.choice(['Mobile', 'Desktop', 'Tablet']) for _ in range(num_records)],
    'ActiveStatus': [random.choice(['Active', 'Inactive']) for _ in range(num_records)]
}

# Create DataFrames
users_df = pd.DataFrame(users_data)

# Save to CSV
users_df.to_csv(f'C:/users_dataset.csv', index=False)

print("Datasets created successfully and saved as CSV files.")
print(users_df.head())



Datasets created successfully and saved as CSV files.
                                 UserID  UserType  Age  Gender  \
0  7fddf168-ca79-4f72-b421-5dbb5abfcce4  Business   52  Female   
1  1a4ba482-9001-4ed0-8a5e-9f78d228af4f  Personal   30  Female   
2  d54846c8-4e0b-4e85-9eb1-7bf1fed22c42  Business   51  Female   
3  c617c22a-337c-4fca-b661-964bf1d58d5b  Business   29  Female   
4  6aff9a88-cb4f-49d0-8065-c1506c11376f  Business   55  Female   

            Location AccountCreationDate LastLoginDate  TotalLogins  \
0      South Adamton          2023-05-24    2024-05-02           47   
1         Chapmanton          2024-07-10    2024-12-04          253   
2  West Michaelshire          2025-01-11    2024-07-28          370   
3           Lake Pam          2025-02-19    2024-05-29          444   
4        Williamston          2024-11-13    2024-11-25          388   

  PreferredDevice ActiveStatus  
0          Mobile       Active  
1         Desktop     Inactive  
2          Mobile      

### posts.py

In [2]:
import pandas as pd
import numpy as np
from faker import Faker
import random

# Initialize Faker library
fake = Faker()

# Number of records to generate
num_records = 1000

# Define post types
post_types = ['Personal Update', 'Business Update', 'Promotional Content']

# Generate Posts dataset
posts_data = {
    'PostID': [fake.uuid4() for _ in range(num_records)],
    'UserID': [random.choice(users_data['UserID']) for _ in range(num_records)],
    'PostType': [random.choice(post_types) for _ in range(num_records)],
    'TotalPosts': [random.randint(1, 1000) for _ in range(num_records)],
    'LikesReceived': [random.randint(0, 5000) for _ in range(num_records)],
    'CommentsReceived': [random.randint(0, 2000) for _ in range(num_records)],
    'SharesReceived': [random.randint(0, 1000) for _ in range(num_records)]
}

# Create DataFrames
posts_df = pd.DataFrame(posts_data)

# Save to CSV
posts_df.to_csv(f'C:/posts_dataset.csv', index=False)

print("Datasets created successfully and saved as CSV files.")
print(posts_df.head())


Datasets created successfully and saved as CSV files.
                                 PostID                                UserID  \
0  e66008e5-2b02-4198-949c-842a024e3bd7  94f39491-0097-4aef-8066-9b1cca7d0b30   
1  d7176723-ba6b-4f5e-9b27-4f56444144b2  bd28ae47-96c8-437b-9908-93f5752579a9   
2  16dfef67-b5b2-4aca-8e73-efd829059d42  6e065bb9-e857-479c-bd1d-face81a93730   
3  79ce3924-1703-49cb-95f5-416e7542b3d0  bd28ae47-96c8-437b-9908-93f5752579a9   
4  4c1a44aa-58eb-4a9e-9a83-a262b1f9378d  43451613-f318-49d9-a06b-802e28052619   

          PostType  TotalPosts  LikesReceived  CommentsReceived  \
0  Business Update         253            152              1358   
1  Business Update         588           3654               946   
2  Business Update          74           2225              1852   
3  Personal Update         935            748               751   
4  Business Update         279           3338               103   

   SharesReceived  
0             418  
1             46

### engagement.py

In [3]:
import pandas as pd
import numpy as np
from faker import Faker
import random

# Initialize Faker library
fake = Faker()

# Number of records to generate
num_records = 1000

# Generate Engagements dataset
engagements_data = {
    'EngagementID': [fake.uuid4() for _ in range(num_records)],
    'UserID': [random.choice(users_data['UserID']) for _ in range(num_records)],
    'FriendsConnectionsCount': [random.randint(1, 2000) for _ in range(num_records)],
    'GroupsJoined': [random.randint(0, 50) for _ in range(num_records)],
    'EventsAttended': [random.randint(0, 100) for _ in range(num_records)],
    'EngagementRate': [round(random.uniform(0, 1), 2) for _ in range(num_records)]
}

# Create DataFrames
engagements_df = pd.DataFrame(engagements_data)

# Save to CSV
engagements_df.to_csv(f'C:/engagements_dataset.csv', index=False)

print("Datasets created successfully and saved as CSV files.")
print(engagements_df.head())


Datasets created successfully and saved as CSV files.
                           EngagementID                                UserID  \
0  8951a921-5178-4010-ba5e-46298e95acdd  74c5cfb7-21aa-47e9-8a7f-588af8f6b50e   
1  c9ee0c83-2223-4f63-a74e-291906ab10fc  01c2e0a5-08a5-48a2-8c26-b6b9ff6cd01e   
2  ddd4d3e4-5de8-427d-b606-67c9e2ca1905  1cada54c-ffe2-463e-9227-0cec1593d5ce   
3  6abe2852-270e-4ca8-802d-de37f68d5076  bcd849fb-c2ee-40f0-b414-17bd01abd708   
4  a260ed7e-758b-497a-9dc2-c00b29e95e0e  bf62bd1b-0781-4cfb-957a-bbd5571d1e1e   

   FriendsConnectionsCount  GroupsJoined  EventsAttended  EngagementRate  
0                     1928            24              91            0.10  
1                      861            37              39            0.99  
2                      604             4              58            0.40  
3                     1589            50              21            0.29  
4                     1222            39              87            0.13  


### conversions.py

In [4]:
import pandas as pd
import numpy as np
from faker import Faker
import random

# Initialize Faker library
fake = Faker()

# Number of records to generate
num_records = 1000

# Generate Conversions dataset
conversions_data = {
    'ConversionID': [fake.uuid4() for _ in range(num_records)],
    'UserID': [random.choice(users_data['UserID']) for _ in range(num_records)],
    'ConversionStatus': [random.choice(['Converted', 'Not Converted']) for _ in range(num_records)]
}

# Create DataFrames
conversions_df = pd.DataFrame(conversions_data)

# Save to CSV
conversions_df.to_csv(f'C:/conversions_dataset.csv', index=False)

print("Datasets created successfully and saved as CSV files.")
print(conversions_df.head())


Datasets created successfully and saved as CSV files.
                           ConversionID                                UserID  \
0  83f3987e-3940-4f02-8529-ca2bebc5bb0e  5344521e-2e71-4512-8116-767b1a968779   
1  029c1568-a658-4d37-9cfe-47e0f2210d56  bcdf3a1f-ec5d-4451-9ef9-b104a0c349e3   
2  74441cdc-f605-491e-b615-834d8a3b1bd4  c8b497e1-3db8-4394-87fc-ba40987d284d   
3  07c7e982-998f-47a0-91b9-126427e0701d  0707a505-080e-46fd-9ae3-eea04b4479e8   
4  89bde51f-903a-4e4a-80b5-d008c0e74e37  d764e801-3a5d-41df-abb4-a682b75527e4   

  ConversionStatus  
0        Converted  
1    Not Converted  
2    Not Converted  
3        Converted  
4        Converted  


## Create S3 bucket for all Raw and Verified Data.



First we will create a fuction to simplify the process of creating a bucket. Then we will run the fuction to create the seperate buckets that we are going to want.

Make another function to upload the chosen files to a specified bucket, then run the function to ingest the data.

### Create S3 Bucket function

In [5]:

import logging
import boto3

from botocore.exceptions import ClientError


def create_bucket(bucket_name, region=None):
    try:
        s3 = boto3.client('s3')
        s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': region
            }
        )
    except ClientError as e:
        logging.error(e)
        print(f"Error: {e}")
        return False
    return f"Bucket {bucket_name} created in {region} region!"

### Run Bucket function for raw and verified data

In [9]:

print(create_bucket("kh-nexus-raw", "us-west-2"))

print(create_bucket("kh-nexus-verified", "us-west-2"))

Bucket kh-nexus-raw created in us-west-2 region!
Bucket kh-nexus-verified created in us-west-2 region!


### Create function to upload files to S3 Bucket

In [10]:

def upload_file(file_name, bucket, object_name=None):
    if object_name is None:
        object_name = file_name
    s3 = boto3.client('s3')
    try:
        response = s3.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        print(f"Error: {e}")
        return False
    return f"File uploaded successfully to {bucket} bucket!"

### Run upload function for csv files

In [12]:
#call the function to upload CSV file
upload_file("users_dataset.csv", "kh-nexus-raw", "users_dataset.csv")
upload_file("posts_dataset.csv", "kh-nexus-raw", "posts_dataset.csv")
upload_file("engagements_dataset.csv", "kh-nexus-raw", "engagements_dataset.csv")
upload_file("conversions_dataset.csv", "kh-nexus-raw", "conversions_dataset.csv")


'File uploaded successfully to kh-nexus-raw bucket!'

## Create Glue Database and Lake Formation


Connect the lake formation to the S3 bucket and Glue database, to prepare for verifying the data.

### Create function to make Glue Database

In [17]:

def create_glue_database(database_name):
    glue = boto3.client('glue')
    try:
        glue.create_database(
            DatabaseInput={
                'Name': database_name
            }
        )
    except ClientError as e:
        logging.error(e)
        print(f"Error: {e}")
        return False
    return f"Database {database_name} created successfully!"

### Run Glue Database function

In [18]:

print(create_glue_database("NexusDatabase-kh"))

Database NexusDatabase-kh created successfully!


### Lake Formation Integration for S3 bucket and Glue database

In [None]:

def register_resource_with_lake_formation(bucket_name, iam_role_arn):
    try:
        lakeformation = boto3.client('lakeformation')
        resource_arn = f'arn:aws:s3:::{bucket_name}'

        # Register the S3 path with Lake Formation
        lakeformation.register_resource(
            ResourceArn=resource_arn,
            RoleArn=iam_role_arn
        )
        return f"Resource {resource_arn} registered successfully with Lake Formation!"
    except ClientError as e:
        logging.error(e)
        print(f"Error: {e}")
        return False

def grant_permissions_lake_formation(database_name, principal_arn):
    try:
        lakeformation = boto3.client('lakeformation')

        # Grant Lake Formation permissions
        lakeformation.grant_permissions(
            Principal={'DataLakePrincipalIdentifier': principal_arn},
            Resource={
                'Database': {
                    'Name': database_name
                }
            },
            Permissions=['ALL'],['DATA_LOCATION_ACCESS']
        )
        return f"Permissions granted to {principal_arn} for database {database_name}!"
    except ClientError as e:
        logging.error(e)
        print(f"Error: {e}")
        return False

iam_role_arn = "arn:aws:iam:::role/aws-service-role/lakeformation.amazonaws.com/AWSServiceRoleForLakeFormationDataAccess"
principal_arn = "arn:aws:iam:::role/service-role/AWSGlueServiceRole-s"

# Register the S3 resources with Lake Formation
print(register_resource_with_lake_formation("kh-nexus-raw", iam_role_arn))
print(register_resource_with_lake_formation("kh-nexus-verified", iam_role_arn))

# Grant permissions in Lake Formation
print(grant_permissions_lake_formation("NexusDatabase-kh", principal_arn))


ERROR:root:An error occurred (AlreadyExistsException) when calling the RegisterResource operation: Resource is already registered


Error: An error occurred (AlreadyExistsException) when calling the RegisterResource operation: Resource is already registered
False
Resource arn:aws:s3:::kh-nexus-verified registered successfully with Lake Formation!
Permissions granted to arn:aws:iam:::role/service-role/AWSGlueServiceRole-s for database NexusDatabase-kh!


## Create a Glue Crawler 

Create a Glue Crawler to Extract the data from the S3 bucket of raw data into the Glue Catalog.

### Create a function to run the glue crawler

In [37]:
import boto3
from botocore.exceptions import ClientError

def create_glue_crawler(crawler_name, role, database_name, s3_paths):
    client = boto3.client('glue')
    
    try:
        response = client.create_crawler(
            Name=crawler_name,
            Role=role,
            DatabaseName=database_name,
            Targets={
                'S3Targets': [{'Path': path} for path in s3_paths]
            },
            TablePrefix='',
            SchemaChangePolicy={
                'UpdateBehavior': 'UPDATE_IN_DATABASE',
                'DeleteBehavior': 'LOG'
            }
        )
        return f"Crawler {crawler_name} created successfully!"
    except ClientError as e:
        print(f"Error: {e}")
        return False

def start_glue_crawler(crawler_name):
    client = boto3.client('glue')
    
    try:
        response = client.start_crawler(Name=crawler_name)
        return f"Crawler {crawler_name} started successfully!"
    except ClientError as e:
        print(f"Error: {e}")
        return False


### Run the Glue Grawler function for extracting the data

In [38]:
# Configuration
crawler_name = 'nexus_crawler'
role = 'arn:aws:iam:::role/service-role/AWSGlueServiceRole-s'
database_name = 'NexusDatabase-kh'
s3_paths = [
    's3://kh-nexus-raw/users_dataset/',
    's3://kh-nexus-raw/engagements_dataset/',
    's3://kh-nexus-raw/posts_dataset/',
    's3://kh-nexus-raw/conversions_dataset/'
]

# Create and start the crawler
print(create_glue_crawler(crawler_name, role, database_name, s3_paths))
print(start_glue_crawler(crawler_name))


Error: An error occurred (AlreadyExistsException) when calling the CreateCrawler operation::nexus_crawler already exists
False
Crawler nexus_crawler started successfully!


## Transform and verify data

Glue ETL jobs to correct mapping mistakes and covnvert csv to parquet format. Following jobs were ran within the AWS Glue concole.

### Users verified

### Posts verified

### Engagement verified 

### Conversions verified 

## Datasets Verified

## Redshift Warehouse DDL

#### The last thing we are going to do is connect and create a database in Redshift, to prepare and allow for quering. The following processes were ran within the query editor.

##### Create the Schema within the database

##### Create tables and load the data into the database

-- Create Users table
CREATE EXTERNAL TABLE nexusCapstone.users (
    UserID VARCHAR(36),
    UserType VARCHAR(20),
    Age VARCHAR(5),
    Gender VARCHAR(20),
    Location VARCHAR(100),
    AccountCreationDate VARCHAR(10),
    LastLoginDate VARCHAR(10),
    TotalLogins VARCHAR(20),
    PreferredDevice VARCHAR(20),
    ActiveStatus VARCHAR(20)
)
STORED AS PARQUET
LOCATION 's3://kh-nexus-verified/Users/';


-- Create Posts table
CREATE EXTERNAL TABLE nexusCapstone.posts (
    PostID VARCHAR(36),
    UserID VARCHAR(36),
    PostType VARCHAR(50),
    TotalPosts VARCHAR(10),
    LikesReceived VARCHAR(10),
    CommentsReceived VARCHAR(50),
    SharesReceived VARCHAR(10)
)
STORED AS PARQUET
LOCATION 's3://kh-nexus-verified/Posts/';

-- Create Engagements table
CREATE EXTERNAL TABLE nexuscapstone.engagements (
    EngagementID VARCHAR(36),
    UserID VARCHAR(36),
    FriendsConnectionsCount VARCHAR(10),
    GroupsJoined VARCHAR(10),
    EventsAttended VARCHAR(10),
    EngagementRate VARCHAR(10)
)
STORED AS PARQUET
LOCATION 's3://kh-nexus-verified/Engagements/';

-- Create Conversions table
CREATE EXTERNAL TABLE nexusCapstone.conversions (
    ConversionID VARCHAR(36),
    UserID VARCHAR(36),
    ConversionStatus VARCHAR(20)
)
STORED AS PARQUET
LOCATION 's3://kh-nexus-verified/Conversions/';

-- Verify data
SELECT * FROM nexusCapstone.users;
SELECT * FROM nexusCapstone.posts;
SELECT * FROM nexusCapstone.engagements;
SELECT * FROM nexusCapstone.conversions;

##### Verify that data has transfered into the database correctly

##### Data is ingested, transformed, and ready for future queries.