diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml
index 38577b25..5cf9ef6a 100644
--- a/.github/workflows/gradle.yml
+++ b/.github/workflows/gradle.yml
@@ -23,5 +23,5 @@ jobs:
     - name: Upload build artifact
       uses: actions/upload-artifact@v1
       with:
-        name: kafka-dsf.jar
-        path: build/libs/kafka-dsf-all.jar
+        name: kafka-gitops.jar
+        path: build/libs/kafka-gitops-all.jar
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 63db95b3..41efce9c 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -26,7 +26,7 @@ jobs:
       - name: Build & Push Docker Image
         uses: mr-smithers-excellent/docker-build-push@v2
         with:
-          image: devshawn/kafka-dsf
+          image: devshawn/kafka-gitops
           tag: ${{ steps.get_version.outputs.VERSION }}
           registry: docker.io
           username: ${{ secrets.DOCKER_USERNAME }}
@@ -50,6 +50,6 @@ jobs:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         with:
           upload_url: ${{ steps.create_release.outputs.upload_url }} 
-          asset_path: ./build/distributions/kafka-dsf.zip
-          asset_name: kafka-dsf.zip
+          asset_path: ./build/distributions/kafka-gitops.zip
+          asset_name: kafka-gitops.zip
           asset_content_type: application/zip
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index ece5b242..5210573a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,6 +1,6 @@
 # Contributing
 
-Contributions are very welcome! Any existing issues labeled ["help wanted"](https://github.com/devshawn/kafka-dsf/labels/help%20wanted) or ["good first issue"](https://github.com/devshawn/kafka-dsf/labels/good%20first%20issue) are free to be pursued.
+Contributions are very welcome! Any existing issues labeled ["help wanted"](https://github.com/devshawn/kafka-gitops/labels/help%20wanted) or ["good first issue"](https://github.com/devshawn/kafka-gitops/labels/good%20first%20issue) are free to be pursued.
 
 ## Feature Requests & Bug Reports
 For feature requests and bug reports, [submit an issue][issues].
@@ -17,5 +17,5 @@ The preferred way to contribute is to fork the [main repository][repository] on
 4. Once changes are completed, open a pull request for review against the master branch.
 
 
-[repository]: https://github.com/devshawn/kafka-dsf
-[issues]: https://github.com/devshawn/kafka-dsf/issues
+[repository]: https://github.com/devshawn/kafka-gitops
+[issues]: https://github.com/devshawn/kafka-gitops/issues
diff --git a/Dockerfile b/Dockerfile
index 756179db..1fced9eb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -2,6 +2,6 @@ FROM openjdk:8-slim
 
 RUN apt-get update && apt-get install -y python3 python3-pip curl
 
-COPY ./build/libs/kafka-dsf-all.jar /usr/local/bin/kafka-dsf-all.jar
-COPY ./kafka-dsf /usr/local/bin/kafka-dsf
+COPY ./build/libs/kafka-gitops-all.jar /usr/local/bin/kafka-gitops-all.jar
+COPY ./kafka-gitops /usr/local/bin/kafka-gitops
 
diff --git a/README.md b/README.md
index 5b6b8235..708ee704 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# kafka-dsf
+# kafka-gitops
 
 Manage Apache Kafka topics and ACLs through a desired state file.
 
@@ -8,22 +8,42 @@ Manage Apache Kafka topics and ACLs through a desired state file.
 
 ## Overview
 
-This project allows you to manage Kafka topics and ACLs through use of a desired state file, much like Terraform and similar infrastructure-as-code projects. 
+Kafka GitOps is an Apache Kafka resources-as-code tool which allows you to automate the management of your Apache Kafka topics and ACLs from version controlled code. It allows you to define topics and services through the use of a desired state file, much like Terraform and other infrastructure-as-code tools.
 
-Topics and ACLs get defined in a YAML file. When run, `kafka-dsf` compares your desired state to the actual state of the cluster and generates a plan to execute against the cluster. This will make your topics and ACLs match your desired state.
+Topics and services get defined in a YAML file. When run, `kafka-gitops` compares your desired state to the actual state of the cluster and generates a plan to execute against the cluster. This will make your topics and ACLs match your desired state.
+
+This tool also generates the needed ACLs for each type of application. There is no need to manually create a bunch of ACLs for Kafka Connect, Kafka Streams, etc. By defining your services, `kafka-gitops` will build the necessary ACLs.
+
+This tool supports self-hosted Kafka, managed Kafka, and Confluent Cloud clusters.
+
+## Features
+
+- 🚀  **Built For CI/CD**: Made for CI/CD pipelines to automate the management of topics & ACLs.
+- 🔥  **Configuration as code**: Describe your desired state and manage it from a version-controlled declarative file.
+- 👍  **Easy to use**: Deep knowledge of Kafka administration or ACL management is **NOT** required. 
+- ⚡️️  **Plan & Apply**: Generate and view a plan with or without executing it against your cluster.
+- 💻  **Portable**: Works across self-hosted clusters, managed clusters, and even Confluent Cloud clusters.
+- 🦄  **Idempotency**: Executing the same desired state file on an up-to-date cluster will yield the same result.
+- ☀️  **Continue from failures**: If a specific step fails during an apply, you can fix your desired state and re-run the command. You can execute `kafka-gitops` again without needing to rollback any partial successes.
+
+## Getting Started
+
+Documentation on how to install and use this tool can be found on our [documentation site][documentation].
 
 ## Usage
 
-Run `kafka-dsf` to view the help output.
+Run `kafka-gitops` to view the help output.
 
 ```bash
-Usage: kafka-dsf [-hvV] [-f=<file>] [COMMAND]
+Usage: kafka-gitops [-hvV] [--no-delete] [-f=<file>] [COMMAND]
 Manage Kafka resources with a desired state file.
   -f, --file=<file>   Specify the desired state file.
   -h, --help          Display this help message.
+      --no-delete     Disable the ability to delete resources.
   -v, --verbose       Show more detail during execution.
   -V, --version       Print the current version of this tool.
 Commands:
+  account   Create Confluent Cloud service accounts.
   apply     Apply changes to Kafka resources.
   plan      Generate an execution plan of changes to Kafka resources.
   validate  Validates the desired state file.
@@ -51,7 +71,7 @@ The following configuration is generated:
 
 ## State File
 
-By default, `kafka-dsf` looks for `state.yaml` in the current directory. You can also use `kafka-dsf -f` to pass a file.
+By default, `kafka-gitops` looks for `state.yaml` in the current directory. You can also use `kafka-gitops -f` to pass a file.
 
 An example desired state file:
 
@@ -63,15 +83,13 @@ topics:
     configs:
       cleanup.policy: compact
 
-acls:
-  example-topic-read-acl:
-    name: example-topic
-    type: TOPIC
-    pattern: LITERAL
-    principal: User:super.admin
-    host: "*"
-    operation: WRITE
-    permission: ALLOW
+services:
+  example-service:
+    type: application
+    produces:
+      - example-topic
+    consumes:
+      - example-topic
 ```
 
 ## Contributing
@@ -84,5 +102,6 @@ Copyright (c) 2020 Shawn Seymour.
 
 Licensed under the [Apache 2.0 license][license].
 
+[documentation]: https://kafkagitops.devshawn.com
 [contributing]: ./CONTRIBUTING.md
 [license]: ./LICENSE
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index f534f449..30e242dd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,6 @@
 plugins {
     id 'java'
+    id 'groovy'
     id 'application'
     id 'idea'
     id 'org.inferred.processors' version '1.2.10'
@@ -11,7 +12,7 @@ apply plugin: 'net.ltgt.apt-idea'
 
 group 'com.devshawn'
 
-mainClassName = 'com.devshawn.kafka.dsf.MainCommand'
+mainClassName = 'com.devshawn.kafka.gitops.MainCommand'
 
 sourceCompatibility = 1.8
 
@@ -33,20 +34,24 @@ dependencies {
     processor 'org.inferred:freebuilder:2.5.0'
 
     testCompile group: 'junit', name: 'junit', version: '4.12'
+    testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.4'
+    testCompile group: 'org.spockframework', name: 'spock-core', version: '1.0-groovy-2.4'
+    testCompile group: 'cglib', name: 'cglib-nodep', version: '2.2'
+
 }
 
 jar {
     manifest {
         attributes(
                 'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
-                'Main-Class': 'com.devshawn.kafka.dsf.MainCommand'
+                'Main-Class': 'com.devshawn.kafka.gitops.MainCommand'
         )
     }
 }
 
 task buildRelease(type: Zip, group: "build") {
     from("$buildDir/libs")
-    from("./kafka-dsf")
+    from("./kafka-gitops")
 }
 
 buildRelease.dependsOn shadowJar
\ No newline at end of file
diff --git a/docs/.nojekyll b/docs/.nojekyll
new file mode 100644
index 00000000..e69de29b
diff --git a/docs/_coverpage.md b/docs/_coverpage.md
new file mode 100644
index 00000000..13825c45
--- /dev/null
+++ b/docs/_coverpage.md
@@ -0,0 +1,9 @@
+
+# kafka-gitops
+
+> GitOps for Apache Kafka
+
+- Automated Topic & ACL Management
+- Manage topics, services, and ACLs with desired state files
+
+[Documentation](/documentation.md)
\ No newline at end of file
diff --git a/docs/_sidebar.md b/docs/_sidebar.md
new file mode 100644
index 00000000..9ac5356d
--- /dev/null
+++ b/docs/_sidebar.md
@@ -0,0 +1,10 @@
+- **Getting Started**
+- [Introduction](/documentation.md)
+- [Installation](/installation.md)
+- [Quick Start](/quick-start.md)
+- [Services](/services.md)
+- [Confluent Cloud](/confluent-cloud.md)
+- [Specification](/specification.md)
+- **Links**
+- [Contributing](https://github.com/devshawn/kafka-gitops/blob/master/CONTRIBUTING.md)
+- [GitHub](https://github.com/devshawn/kafka-gitops)
\ No newline at end of file
diff --git a/docs/confluent-cloud.md b/docs/confluent-cloud.md
new file mode 100644
index 00000000..7673c890
--- /dev/null
+++ b/docs/confluent-cloud.md
@@ -0,0 +1,115 @@
+# Confluent Cloud
+
+This tool was designed to work with Confluent Cloud. It can manage service accounts, topics, and ACLs for Confluent Cloud clusters.
+
+## Getting Started
+
+Ensure you have installed `kafka-gitops` or are using the `kafka-gitops` docker image as described in the [installation][installation] instructions.
+
+You must have the `ccloud` command line tools installed if you wish to auto-populate the `principal` fields on services.
+
+## Desired State File
+
+Create a basic desired state file, `state.yaml`, such as:
+
+```yaml
+settings:
+  ccloud:
+    enabled: true
+
+topics:
+  test-topic:
+    partitions: 6
+    replication: 3
+
+services:
+  test-service:
+    type: application
+    produces:
+      - test-topic
+```
+
+To give an overview, throughout this guide, this will create:
+
+- A topic named `test-topic`
+- A service account named `test-service`
+- A `WRITE` ACL for topic `test-topic` tied to the service account `test-service`
+
+## Configuration
+
+To use `kafka-gitops` with Confluent Cloud, you'll need to set a few environment variables.
+
+* `KAFKA_BOOTSTRAP_SERVERS`: Your Confluent Cloud cluster URL
+* `KAFKA_SASL_JAAS_USERNAME`: Your Confluent Cloud API key
+* `KAFKA_SASL_JAAS_PASSWORD`: Your Confluent Cloud API secret
+* `KAFKA_SECURITY_PROTOCOL`: `SASL_SSL`
+* `KAFKA_SASL_MECHANISM`: `PLAIN`
+* `KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM`: `HTTPS`
+
+Additionally, you'll need to login to the `ccloud` tool. You can automate this by setting the following environment variables:
+
+* `XX_CCLOUD_EMAIL`: Your Confluent Cloud administrator email
+* `XX_CCLOUD_PASSWORD`: Your Confluent Cloud administrator password
+
+Then, you can run `ccloud login` and it will run without a prompt. This is great for CI builds.
+
+## Validate
+
+First, validate your state file is correct by running:
+
+```bash
+kafka-gitops -f state.yaml validate
+```
+
+An example success message would look like:
+
+```text
+[VALID] Successfully validated the desired state file.
+```
+
+## Accounts
+
+Before generating an execution plan, you will need to create the service accounts. This can be done by running:
+
+```bash
+kafka-gitops -f state.yaml account
+```
+
+This currently only creates service accounts; it will not delete any.
+
+## Plan
+
+We're now ready to generate a plan to execute against the cluster. By using the plan command, we are **NOT** changing the cluster.
+
+Generate a plan by running:
+
+```bash
+kafka-gitops -f state.yaml plan -o plan.json
+```
+
+This will generate an execution plan to run against the cluster and save the output to `plan.json`. 
+
+The command will also pretty-print what changes it wants to make to the cluster. 
+
+## Apply
+
+To execute a plan against the cluster, we use the apply command.
+
+!> **WARNING**: This will apply changes to the cluster. This can be potentially destructive if you do not have all topics and ACLs defined.
+
+By default, the apply command will generate a plan and then apply it. For most situations, **you should output a plan file from the plan command and pass it to the apply command**.
+
+Example:
+
+```bash
+kafka-gitops -f state.yaml apply -p plan.json
+```
+
+Congrats! You're now using `kafka-gitops` to manage your Confluent Cloud cluster. Once you've practiced locally, you should commit your state file to a repository and create CI/CD pipelines to create plans and execute them against the cluster. 
+
+Welcome to GitOps! 
+
+
+
+
+[installation]: /installation.md
\ No newline at end of file
diff --git a/docs/documentation.md b/docs/documentation.md
new file mode 100644
index 00000000..6a082adb
--- /dev/null
+++ b/docs/documentation.md
@@ -0,0 +1,50 @@
+<div align="center">
+    <h1>GitOps for Apache Kafka</h1>
+    <img src="https://i.imgur.com/eAIAv0w.png"/>
+    <span>Manage Apache Kafka topics, services, and ACLs through a desired state file.</span>
+</div>
+
+## Overview
+
+Kafka GitOps is an Apache Kafka resources-as-code tool which allows you to automate the management of your Apache Kafka topics and ACLs from version controlled code. It allows you to define topics and services through the use of a desired state file, much like Terraform and other infrastructure-as-code tools.
+
+Topics and services get defined in a YAML file. When run, `kafka-gitops` compares your desired state to the actual state of the cluster and generates a plan to execute against the cluster. This will make your topics and ACLs match your desired state.
+
+This tool also generates the needed ACLs for each type of application. There is no need to manually create a bunch of ACLs for Kafka Connect, Kafka Streams, etc. By defining your services, `kafka-gitops` will build the necessary ACLs.
+
+## Features
+
+- 🚀  **Built For CI/CD**: Made for CI/CD pipelines to automate the management of topics & ACLs.
+- 🔥  **Configuration as code**: Describe your desired state and manage it from a version-controlled declarative file.
+- 👍  **Easy to use**: Deep knowledge of Kafka administration or ACL management is **NOT** required. 
+- ⚡️️  **Plan & Apply**: Generate and view a plan with or without executing it against your cluster.
+- 💻  **Portable**: Works across self-hosted clusters, managed clusters, and even Confluent Cloud clusters.
+- 🦄  **Idempotency**: Executing the same desired state file on an up-to-date cluster will yield the same result.
+- ☀️  **Continue from failures**: If a specific step fails during an apply, you can fix your desired state and re-run the command. You can execute `kafka-gitops` again without needing to rollback any partial successes.
+
+## Getting Started
+
+Check out the **[Quick Start](/quick-start.md)** documentation to install `kafka-gitops` and get started.
+
+## Ideology
+
+The idea behind this project is to manage Kafka topics and ACLs through desired state files. These files define what your cluster should look like and `kafka-gitops` modifies your cluster's actual state to match the desired state file. Typically, this looks like:
+
+- State files are stored in version control, such as a git repository.
+- Developers add, change, and remove topics & services from files and open a PR.
+- Operations teams validate and merge the PR.
+- A CI/CD pipeline generates a plan using `kafka-gitops`. 
+- A human then validates the plan and applies the changes to the cluster.
+
+## Contributing
+
+Contributions are very welcome. See [CONTRIBUTING.md][contributing] for details.
+
+## License
+
+Copyright (c) 2020 Shawn Seymour.
+
+Licensed under the [Apache 2.0 license][license].
+
+[contributing]: https://github.com/devshawn/kafka-gitops/blob/master/CONTRIBUTING.md
+[license]: https://github.com/devshawn/kafka-gitops/blob/master/LICENSE
diff --git a/docs/index.html b/docs/index.html
new file mode 100644
index 00000000..b0b02337
--- /dev/null
+++ b/docs/index.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <title>kafka-gitops</title>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"/>
+    <meta name="description" content="Description">
+    <meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
+    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/docsify-themeable@0/dist/css/theme-simple.css">
+</head>
+<body>
+<div id="app"></div>
+<script>
+    window.$docsify = {
+        name: "kafka-gitops",
+        repo: "devshawn/kafka-gitops",
+        homepage: "documentation.md",
+        formatUpdated: "{MM}/{DD} {HH}:{mm}",
+        auto2top: true,
+        maxLevel: 3,
+        subMaxLevel: 3,
+        coverpage: true,
+        loadSidebar: "_sidebar.md",
+        themeColor: "#1976d2",
+        // routerMode: "history"
+        search: {
+            depth: 3,
+            noData: "No results!",
+            placeholder: "Search..."
+        }
+    }
+</script>
+<script src="https://cdn.jsdelivr.net/npm/docsify@4"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify-themeable@0"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify-tabs@1"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify-copy-code@2"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify-tabs@1"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify-pagination@2/dist/docsify-pagination.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify@4/lib/plugins/external-script.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/docsify@4/lib/plugins/search.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/prismjs@1/components/prism-bash.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/prismjs@1/components/prism-java.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/prismjs@1/components/prism-json.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/prismjs@1/components/prism-yaml.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/prismjs@1/components/prism-docker.min.js"></script>
+<script src="https://cdn.jsdelivr.net/npm/prismjs@1/components/prism-properties.min.js"></script>
+</body>
+</html>
diff --git a/docs/installation.md b/docs/installation.md
new file mode 100644
index 00000000..3e0706b7
--- /dev/null
+++ b/docs/installation.md
@@ -0,0 +1,38 @@
+# Installation
+
+Installing `kafka-gitops` requires either **Java** or **Docker**.
+
+## Local
+
+Install `kafka-gitops` by downloading the zip file from our [releases][releases] page. Move the `kafka-gitops` file and the `kafka-gitops-all.jar` file to somewhere on your `$PATH`, such as `/usr/local/bin`. 
+
+
+Ensure the command is working by running:
+
+```bash
+kafka-gitops
+```
+
+You should see output similar to the following:
+
+```text
+Usage: kafka-gitops [-hvV] [--no-delete] [-f=<file>] [COMMAND]
+Manage Kafka resources with a desired state file.
+  -f, --file=<file>   Specify the desired state file.
+  -h, --help          Display this help message.
+      --no-delete     Disable the ability to delete resources.
+  -v, --verbose       Show more detail during execution.
+  -V, --version       Print the current version of this tool.
+Commands:
+  account   Create Confluent Cloud service accounts.
+  apply     Apply changes to Kafka resources.
+  plan      Generate an execution plan of changes to Kafka resources.
+  validate  Validates the desired state file.
+```
+
+## Docker
+
+We provide a public docker image: [devshawn/kafka-gitops][docker].
+
+[releases]: https://github.com/devshawn/kafka-gitops/releases
+[docker]: https://hub.docker.com/r/devshawn/kafka-gitops
\ No newline at end of file
diff --git a/docs/quick-start.md b/docs/quick-start.md
new file mode 100644
index 00000000..ec3319d6
--- /dev/null
+++ b/docs/quick-start.md
@@ -0,0 +1,145 @@
+# Quick Start
+
+Getting started with `kafka-gitops` is simple. For this tutorial, we will assume:
+
+- You have installed the `kafka-gitops` command as [described here](/installation.md).
+- You have a kafka cluster running on `localhost:9092`. 
+
+!> **NOTE**: If you desire to use this with Confluent Cloud, read our [Confluent Cloud page][ccloud].
+
+## Desired State File
+
+First, create a desired state file named `state.yaml`. In this file, we'll define a topic. To showcase how to set topic configs, we'll make it a compacted topic.
+
+```yaml
+topics:
+  my-example-topic:
+    partitions: 6
+    replication: 1
+    configs:
+      cleanup.policy: compact
+```
+
+## Configuration
+
+Currently, configuring `kafka-gitops` is done via environment variables. To configure properties, prefix them with `KAFKA_`. For example:
+
+* `KAFKA_BOOTSTRAP_SERVERS`: Injects as `bootstrap.servers`
+* `KAFKA_CLIENT_ID`: Injects as `client.id`
+
+For our quick start example, open a terminal where your `state.yaml` file is located and set the bootstrap servers:
+
+```bash
+export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
+```
+
+## Validate
+We can validate the desired state file conforms to the [specification][specification]. To do this, run:
+
+```bash
+kafka-gitops validate
+```
+
+By default, it will look for `state.yaml` in the current directory. To specify a different file name, run:
+
+```bash
+kafka-gitops -f my-state-file.yaml validate
+```
+
+An example success message would look like:
+
+```text
+[VALID] Successfully validated the desired state file.
+```
+
+## Plan
+
+We're now ready to generate a plan to execute against the cluster. By using the plan command, we are **NOT** changing the cluster.
+
+Generate a plan by running:
+
+```bash
+kafka-gitops plan
+```
+
+This will generate an execution plan that looks like:
+
+
+```text
+Generating execution plan...
+
+An execution plan has been generated and is shown below.
+Resource actions are indicated with the following symbols:
+  + create
+
+The following actions will be performed:
+
+Topics: 1 to create, 0 to update, 0 to delete.
+
++ [TOPIC] my-example-topic
+	+ cleanup.policy: compact
+
+
+ACLs: 0 to create, 0 to update, 0 to delete.
+
+Plan: 1 to create, 0 to update, 0 to delete.
+```
+
+In most cases, you will want to output the plan to a file which can then be passed to the apply command. Plan files are `JSON` files. This can be done by running:
+
+
+```bash
+kafka-gitops plan -o plan.json
+```
+
+## Apply
+
+To execute a plan against the cluster, we use the apply command.
+
+!> **WARNING**: This will apply changes to the cluster. This can be potentially destructive if you do not have all topics and ACLs defined.
+
+Run:
+
+```bash
+kafka-gitops apply
+```
+
+By default, the apply command will generate a plan and then apply it. For most situations, **you should output a plan file from the plan command and pass it to the apply command**.
+
+Example:
+
+```bash
+kafka-gitops apply -p plan.json
+```
+
+An output of a successful apply would look like:
+
+```text
+Executing apply...
+
+Applying: [CREATE]
+
++ [TOPIC] my-example-topic
+	+ cleanup.policy: compact
+
+Successfully applied.
+
+[SUCCESS] Apply complete! Resources: 1 created, 0 updated, 0 deleted.
+```
+
+If there is a partial failure, successes will not be rolled back. A failure error would look something like:
+
+```text
+[ERROR] Error thrown when attempting to create a Kafka topic:
+org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
+
+[ERROR] An error has occurred during the apply process.
+[ERROR] The apply process has stopped in place. There is no rollback.
+[ERROR] Fix the error, re-create a plan, and apply the new plan to continue.
+```
+
+Congrats! You've successfully started using GitOps strategies to manage your cluster. If you have security on your cluster, read the [services][services] page to start defining services. 
+
+[ccloud]: /confluent-cloud.md
+[specification]: /specification.md
+[services]: /services.md
\ No newline at end of file
diff --git a/docs/services.md b/docs/services.md
new file mode 100644
index 00000000..51b56859
--- /dev/null
+++ b/docs/services.md
@@ -0,0 +1,115 @@
+# Services
+
+If you have security on your cluster, you can use `kafka-gitops` to manage ACLs. Kafka GitOps automatically builds the necessary ACLs for most use cases. 
+
+## Application Example
+
+A basic example shown below defines one topic, `test-topic`, and one service, `my-application`. 
+
+The service `my-application` both consumes from and produces to `test-topic`. This will generate the necessary ACLs for `my-application` to do this.
+
+?> **Note**: If using Confluent Cloud, omit the principal field.
+
+```yaml
+topics:
+  test-topic:
+    partitions: 6
+    replication: 1
+
+services:
+  my-application:
+    principal: User:myapp
+    consumes:
+      - test-topic
+    produces:
+      - test-topic
+```
+
+Behind the scenes, this will generate three ACLs:
+
+- `READ` for topic `test-topic`
+- `WRITE` for topic `test-topic`
+- `READ` for consumer group `my-application`
+
+!> Currently, consumer `group.id` must match the service name.
+
+## Kafka Streams Example
+
+A basic example shown below defines one topic, `test-topic`, and one kafka streams application, `my-stream`. 
+
+The service `my-stream` both consumes from and produces to `test-topic`. This will generate the necessary ACLs for `my-stream` to do this.
+
+```yaml
+topics:
+  test-topic:
+    partitions: 6
+    replication: 1
+
+services:
+  my-stream:
+    principal: User:mystream
+    consumes:
+      - test-topic
+    produces:
+      - test-topic
+```
+
+Behind the scenes, this generates ACLs such as:
+
+- `READ` for topic `test-topic`
+- `WRITE` for topic `test-topic`
+- `READ` for consumer group `my-stream`
+- Various ACLs for Kafka streams internal topic management
+
+## Kafka Connect Example
+
+A basic example which defines one Kafka Connect cluster that has one connector running. 
+
+**Connect Topics**
+Currently, Kafka Connect's internal topics must be defined in the following format:
+
+- `connect-configs-{service-name}`
+- `connect-offsets-{service-name}`
+- `connect-status-{service-name}`
+
+In this example, the connect cluster `group.id` should be `my-connect-cluster`. 
+
+```yaml
+topics:
+  connect-configs-my-connect-cluster:
+    partitions: 1
+    replication: 1
+    configs:
+      cleanup.policy: compact
+
+  connect-offsets-my-connect-cluster:
+    partitions: 25
+    replication: 1
+    configs:
+      cleanup.policy: compact
+
+  connect-status-my-connect-cluster:
+    partitions: 5
+    replication: 1
+    configs:
+      cleanup.policy: compact
+      
+  rabbitmq-data:
+    partitions: 6
+    replication: 1
+
+services:
+  my-connect-cluster:
+    principal: User:mystream
+    connectors:
+      rabbitmq-sink:
+        consumes:
+          - rabbitmq-data
+```
+
+Behind the scenes, this generates ACLs such as:
+
+- `READ` for topic `rabbitmq-data`
+- `READ` for the consumer group `connect-rabbitmq-sink`
+- `READ` and `WRITE` for the internal kafka connect topics
+- `READ` for the consumer group `my-connect-cluster`
diff --git a/docs/specification.md b/docs/specification.md
new file mode 100644
index 00000000..8c454fc1
--- /dev/null
+++ b/docs/specification.md
@@ -0,0 +1,144 @@
+# Desired State Specification
+
+This document describes the specification for how to write your Kafka cluster's desired state file. This currently must be a `YAML` file. 
+
+?> Current version: `1`
+
+The desired state file consists of:
+
+- **Settings** [Optional]: Specific settings for configuring `kafka-gitops`.
+- **Topics** [Optional]: Topics and topic configuration definitions.
+- **Services** [Optional]: Service definitions for generating ACLs.
+- **Custom Service ACLs** [Optional]: Definitions for custom, non-generated ACLs.
+
+## Settings
+
+**Synopsis**: These are specific settings for configuring `kafka-gitops`. 
+
+**Options**:
+
+- **ccloud** [Optional]: An object which contains an `enabled` field. Set this to true if using a Confluent Cloud cluster. 
+- **topics** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
+
+**Example**:
+```yaml
+settings:
+  ccloud:
+    enabled: true
+  topics:
+    blacklist:
+      prefixed:
+        - _confluent
+```
+
+## Topics
+
+**Synopsis**: Define the topics you would like on your cluster and their configuration.
+
+?> Each topic is defined as a key-value pair, with the key being the topic name and the value being an object of settings.
+
+**Example**:
+
+```yaml
+topics:
+  my-topic-name:
+    partitions: 6
+    replication: 3
+    configs:
+      cleanup.policy: compact
+      segment.bytes: 1000000
+```
+
+## Services
+
+**Synopsis**: Define the services that will utilize your Kafka cluster. These service definitions allow `kafka-gitops` to generate ACLs for you. Yay!
+
+?> Each service has a `type`. This defines its structure.
+
+There are currently three service types:
+
+- `application`
+- `kafka-connect`
+- `kafka-streams`
+
+?> **NOTE**: If using Confluent Cloud, omit the `principal` fields.
+
+**Example application**:
+
+!> **NOTE**: Currently, the service name must match the consumer `group.id`.
+
+```yaml
+services:
+  my-application-name:
+    type: application
+    principal: User:my-application-principal
+    produces:
+      - topic-name-one
+    consumes:
+      - topic-name-two
+      - topic-name-three
+```
+
+**Example kafka connect cluster**:
+
+!> **NOTE**: Currently, the service name must match the connect cluster `group.id`.
+
+```yaml
+services:
+  my-kafka-connect-cluster-name:
+    type: kafka-connect
+    principal: User:my-connect-principal
+    connectors:
+      my-source-connector-name:
+        produces:
+          - topic-name-one
+      my-sink-connector-name:
+        consumes:
+          - topic-name-two
+```
+
+**Example kafka streams application**:
+
+!> **NOTE**: Currently, the service name must match the streams `application.id`.
+
+```yaml
+services:
+  my-kafka-streams-name:
+    type: kafka-streams
+    principal: User:my-streams-principal
+    produces:
+      - topic-name-one
+    consumes:
+      - topic-name-two
+```
+
+Under the covers, `kafka-gitops` generates ACLs based on these definitions.
+
+## Custom Service ACLs
+
+**Synopsis**: Define custom ACLs for a specific service. 
+
+For example, if a specific application needs to produce to all topics prefixed with `kafka.` and `service.`, you may not want to define them all in your desired state file. 
+
+If you have a service `my-test-service` defined, you can define custom ACLs as so:
+
+```yaml
+customServiceAcls:
+  my-test-service:
+    read-all-kafka:
+      name: kafka.
+      type: TOPIC
+      pattern: PREFIXED
+      host: "*"
+      principal: 
+      operation: READ
+      permission: ALLOW
+    read-all-service:
+      name: service.
+      type: TOPIC
+      pattern: PREFIXED
+      host: "*"
+      principal: 
+      operation: READ
+      permission: ALLOW
+```
diff --git a/kafka-dsf b/kafka-gitops
similarity index 59%
rename from kafka-dsf
rename to kafka-gitops
index de29a7f7..9d6e437c 100755
--- a/kafka-dsf
+++ b/kafka-gitops
@@ -4,7 +4,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 
 if [[ $# -eq 0 ]]
   then
-    java -jar "$DIR/kafka-dsf-all.jar"
+    java -jar "$DIR/kafka-gitops-all.jar"
   else
-    java -jar "$DIR/kafka-dsf-all.jar" "$@"
+    java -jar "$DIR/kafka-gitops-all.jar" "$@"
 fi
diff --git a/settings.gradle b/settings.gradle
index 73f33a71..ea56165a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1,2 @@
-rootProject.name = 'kafka-dsf'
+rootProject.name = 'kafka-gitops'
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/StateManager.java b/src/main/java/com/devshawn/kafka/dsf/StateManager.java
deleted file mode 100644
index b8a4dabe..00000000
--- a/src/main/java/com/devshawn/kafka/dsf/StateManager.java
+++ /dev/null
@@ -1,330 +0,0 @@
-package com.devshawn.kafka.dsf;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import com.devshawn.kafka.dsf.config.KafkaDsfConfig;
-import com.devshawn.kafka.dsf.config.KafkaDsfConfigLoader;
-import com.devshawn.kafka.dsf.domain.plan.*;
-import com.devshawn.kafka.dsf.domain.state.AclDetails;
-import com.devshawn.kafka.dsf.domain.state.DesiredState;
-import com.devshawn.kafka.dsf.domain.state.TopicDetails;
-import com.devshawn.kafka.dsf.enums.PlanAction;
-import com.devshawn.kafka.dsf.exception.ReadPlanInputException;
-import com.devshawn.kafka.dsf.exception.WritePlanOutputException;
-import com.devshawn.kafka.dsf.service.KafkaService;
-import com.devshawn.kafka.dsf.service.ParserService;
-import com.devshawn.kafka.dsf.util.LogUtil;
-import com.devshawn.kafka.dsf.util.PlanUtil;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import org.apache.kafka.clients.admin.AlterConfigOp;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.TopicListing;
-import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.config.ConfigResource;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class StateManager {
-
-    private static org.slf4j.Logger log = LoggerFactory.getLogger(StateManager.class);
-
-    private final ParserService parserService;
-    private final KafkaDsfConfig config;
-    private final KafkaService kafkaService;
-    private final ObjectMapper objectMapper;
-
-    private final boolean deleteDisabled;
-    private final File planFile;
-
-    public StateManager(boolean verbose, File file, boolean deleteDisabled, File planFile) {
-        initializeLogger(verbose);
-        this.config = KafkaDsfConfigLoader.load();
-        this.kafkaService = new KafkaService(config);
-        this.parserService = new ParserService(file);
-        this.objectMapper = initializeObjectMapper();
-        this.deleteDisabled = deleteDisabled;
-        this.planFile = planFile;
-    }
-
-    public void validate() {
-        parserService.parseStateFile();
-    }
-
-    public void validatePlanHasChanges(DesiredPlan desiredPlan) {
-        PlanOverview planOverview = PlanUtil.getOverview(desiredPlan);
-        if (planOverview.getAdd() == 0 && planOverview.getUpdate() == 0 && planOverview.getRemove() == 0) {
-            LogUtil.printNoChangesMessage();
-            System.exit(0);
-        }
-    }
-
-    public DesiredPlan plan() {
-        return generatePlan();
-    }
-
-    public DesiredPlan apply() {
-        DesiredPlan desiredPlan = readPlanFromFile();
-        if (desiredPlan == null) {
-            desiredPlan = generatePlan();
-        }
-
-        validatePlanHasChanges(desiredPlan);
-
-        applyTopics(desiredPlan);
-        applyAcls(desiredPlan);
-
-        return desiredPlan;
-    }
-
-    private DesiredPlan generatePlan() {
-        DesiredState desiredState = parserService.parseStateFile();
-        DesiredPlan.Builder desiredPlan = new DesiredPlan.Builder();
-
-        planTopics(desiredState, desiredPlan);
-        planAcls(desiredState, desiredPlan);
-
-        return desiredPlan.build();
-    }
-
-    private void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
-        List<TopicListing> topics = kafkaService.getTopics();
-        List<String> prefixesToIgnore = getPrefixedTopicsToIgnore(desiredState);
-
-        desiredState.getTopics().forEach((key, value) -> {
-            TopicPlan.Builder topicPlan = new TopicPlan.Builder()
-                    .setName(key)
-                    .setTopicDetails(value);
-
-            TopicDescription topicDescription = kafkaService.describeTopic(key);
-
-            if (topicDescription == null) {
-                log.info("[PLAN] Topic {} does not exist; it will be created.", key);
-                topicPlan.setAction(PlanAction.ADD);
-            } else {
-                log.info("[PLAN] Topic {} exists, it will not be created.", key);
-                topicPlan.setAction(PlanAction.NO_CHANGE);
-                planTopicConfigurations(key, value, topicPlan);
-            }
-
-            desiredPlan.addTopicPlans(topicPlan.build());
-        });
-
-        topics.forEach(currentTopic -> {
-            boolean shouldIgnore = prefixesToIgnore.stream().anyMatch(it -> currentTopic.name().startsWith(it));
-            if (shouldIgnore) {
-                log.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
-                return;
-            }
-
-            if (!deleteDisabled && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
-                TopicPlan topicPlan = new TopicPlan.Builder()
-                        .setName(currentTopic.name())
-                        .setAction(PlanAction.REMOVE)
-                        .build();
-
-                desiredPlan.addTopicPlans(topicPlan);
-            }
-        });
-    }
-
-    private void planTopicConfigurations(String topicName, TopicDetails topicDetails, TopicPlan.Builder topicPlan) {
-        Map<String, TopicConfigPlan> configPlans = new HashMap<>();
-        List<ConfigEntry> currentConfigs = kafkaService.describeTopicConfigs(topicName);
-        List<ConfigEntry> customConfigs = currentConfigs.stream()
-                .filter(it -> it.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
-                .collect(Collectors.toList());
-
-        customConfigs.forEach(currentConfig -> {
-            String newConfig = topicDetails.getConfigs().getOrDefault(currentConfig.name(), null);
-
-            TopicConfigPlan.Builder topicConfigPlan = new TopicConfigPlan.Builder()
-                    .setKey(currentConfig.name());
-
-            if (currentConfig.value().equals(newConfig)) {
-                topicConfigPlan.setAction(PlanAction.NO_CHANGE);
-                topicConfigPlan.setValue(currentConfig.value());
-                configPlans.put(currentConfig.name(), topicConfigPlan.build());
-            } else if (newConfig == null) {
-                topicConfigPlan.setAction(PlanAction.REMOVE);
-                configPlans.put(currentConfig.name(), topicConfigPlan.build());
-                topicPlan.setAction(PlanAction.UPDATE);
-            }
-        });
-
-        topicDetails.getConfigs().forEach((key, value) -> {
-            ConfigEntry currentConfig = customConfigs.stream().filter(it -> it.name().equals(key)).findFirst().orElse(null);
-
-            TopicConfigPlan.Builder topicConfigPlan = new TopicConfigPlan.Builder()
-                    .setKey(key)
-                    .setValue(value);
-
-            if (currentConfig == null) {
-                topicConfigPlan.setAction(PlanAction.ADD);
-                configPlans.put(key, topicConfigPlan.build());
-                topicPlan.setAction(PlanAction.UPDATE);
-            } else if (!currentConfig.value().equals(value)) {
-                topicConfigPlan.setAction(PlanAction.UPDATE);
-                configPlans.put(key, topicConfigPlan.build());
-                topicPlan.setAction(PlanAction.UPDATE);
-            }
-        });
-
-        configPlans.forEach((key, plan) -> {
-            log.info("[PLAN] Topic {} | [{}] {}", topicName, plan.getAction(), plan.getKey());
-            topicPlan.addTopicConfigPlans(plan);
-        });
-    }
-
-    private void planAcls(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
-        List<AclBinding> currentAcls = kafkaService.getAcls();
-
-        currentAcls.forEach(acl -> {
-            Map.Entry<String, AclDetails> detailsEntry = desiredState.getAcls().entrySet().stream()
-                    .filter(entry -> entry.getValue().equalsAclBinding(acl))
-                    .findFirst().orElse(null);
-
-            AclPlan.Builder aclPlan = new AclPlan.Builder();
-
-            if (detailsEntry != null) {
-                aclPlan.setName(detailsEntry.getKey());
-                aclPlan.setAclDetails(detailsEntry.getValue());
-                aclPlan.setAction(PlanAction.NO_CHANGE);
-                desiredPlan.addAclPlans(aclPlan.build());
-            } else {
-                aclPlan.setName("Unnamed ACL");
-                aclPlan.setAclDetails(AclDetails.fromAclBinding(acl));
-                aclPlan.setAction(PlanAction.REMOVE);
-
-                if (!deleteDisabled) {
-                    desiredPlan.addAclPlans(aclPlan.build());
-                }
-            }
-        });
-
-        desiredState.getAcls().forEach((key, value) -> {
-            AclBinding aclBinding = currentAcls.stream().filter(value::equalsAclBinding).findFirst().orElse(null);
-            if (aclBinding == null) {
-                AclPlan aclPlan = new AclPlan.Builder()
-                        .setName(key)
-                        .setAclDetails(value)
-                        .setAction(PlanAction.ADD)
-                        .build();
-
-                desiredPlan.addAclPlans(aclPlan);
-            }
-        });
-    }
-
-    private void applyTopics(DesiredPlan desiredPlan) {
-        desiredPlan.getTopicPlans().forEach(topicPlan -> {
-            if (topicPlan.getAction() == PlanAction.ADD) {
-                LogUtil.printTopicPreApply(topicPlan);
-                kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetails().get());
-                LogUtil.printPostApply();
-            } else if (topicPlan.getAction() == PlanAction.UPDATE) {
-                LogUtil.printTopicPreApply(topicPlan);
-                topicPlan.getTopicConfigPlans().forEach(topicConfigPlan -> applyTopicConfiguration(topicPlan, topicConfigPlan));
-                LogUtil.printPostApply();
-            } else if (topicPlan.getAction() == PlanAction.REMOVE && !deleteDisabled) {
-                LogUtil.printTopicPreApply(topicPlan);
-                kafkaService.deleteTopic(topicPlan.getName());
-                LogUtil.printPostApply();
-            }
-        });
-    }
-
-    private void applyTopicConfiguration(TopicPlan topicPlan, TopicConfigPlan topicConfigPlan) {
-        Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
-        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicPlan.getName());
-        List<AlterConfigOp> configOps = new ArrayList<>();
-
-        ConfigEntry configEntry = new ConfigEntry(topicConfigPlan.getKey(), topicConfigPlan.getValue().orElse(null));
-
-        // TODO: Make OpType work with append/subtract
-        if (topicConfigPlan.getAction() == PlanAction.ADD) {
-            configOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
-        } else if (topicConfigPlan.getAction() == PlanAction.UPDATE) {
-            configOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
-        } else if (topicConfigPlan.getAction() == PlanAction.REMOVE) {
-            configOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.DELETE));
-        }
-
-        configs.put(configResource, configOps);
-
-        kafkaService.updateTopicConfig(configs);
-    }
-
-    private void applyAcls(DesiredPlan desiredPlan) {
-        desiredPlan.getAclPlans().forEach(aclPlan -> {
-            if (aclPlan.getAction() == PlanAction.ADD) {
-                LogUtil.printAclPreApply(aclPlan);
-                kafkaService.createAcl(aclPlan.getAclDetails().toAclBinding());
-                LogUtil.printPostApply();
-            } else if (aclPlan.getAction() == PlanAction.REMOVE && !deleteDisabled) {
-                LogUtil.printAclPreApply(aclPlan);
-                kafkaService.deleteAcl(aclPlan.getAclDetails().toAclBinding());
-                LogUtil.printPostApply();
-            }
-        });
-    }
-
-    public void writePlanToFile(DesiredPlan desiredPlan) {
-        if (planFile != null) {
-            try {
-                planFile.createNewFile();
-                FileWriter writer = new FileWriter(planFile);
-                writer.write(objectMapper.writeValueAsString(desiredPlan));
-                writer.close();
-            } catch (IOException ex) {
-                throw new WritePlanOutputException(ex.getMessage());
-            }
-        }
-    }
-
-    private DesiredPlan readPlanFromFile() {
-        if (planFile == null || !planFile.exists()) {
-            return null;
-        }
-
-        try {
-            return objectMapper.readValue(planFile, DesiredPlan.class);
-        } catch (IOException ex) {
-            throw new ReadPlanInputException(ex.getMessage());
-        }
-    }
-
-    private List<String> getPrefixedTopicsToIgnore(DesiredState desiredState) {
-        try {
-            return desiredState.getSettings().get().getTopics().get().getBlacklist().get().getPrefixed();
-        } catch (NoSuchElementException ex) {
-            return Collections.emptyList();
-        }
-    }
-
-    private ObjectMapper initializeObjectMapper() {
-        ObjectMapper objectMapper = new ObjectMapper();
-        objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
-        objectMapper.registerModule(new Jdk8Module());
-        return objectMapper;
-    }
-
-    private void initializeLogger(boolean verbose) {
-        Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
-        Logger kafka = (Logger) LoggerFactory.getLogger("org.apache.kafka");
-        if (verbose) {
-            root.setLevel(Level.INFO);
-            kafka.setLevel(Level.WARN);
-        } else {
-            root.setLevel(Level.WARN);
-            kafka.setLevel(Level.OFF);
-        }
-    }
-}
diff --git a/src/main/java/com/devshawn/kafka/dsf/cli/ApplyCommand.java b/src/main/java/com/devshawn/kafka/dsf/cli/ApplyCommand.java
deleted file mode 100644
index c7ee5489..00000000
--- a/src/main/java/com/devshawn/kafka/dsf/cli/ApplyCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package com.devshawn.kafka.dsf.cli;
-
-import com.devshawn.kafka.dsf.MainCommand;
-import com.devshawn.kafka.dsf.StateManager;
-import com.devshawn.kafka.dsf.domain.plan.DesiredPlan;
-import com.devshawn.kafka.dsf.exception.KafkaExecutionException;
-import com.devshawn.kafka.dsf.exception.MissingConfigurationException;
-import com.devshawn.kafka.dsf.exception.ValidationException;
-import com.devshawn.kafka.dsf.util.LogUtil;
-import com.devshawn.kafka.dsf.util.PlanUtil;
-import picocli.CommandLine;
-
-import java.io.File;
-import java.util.concurrent.Callable;
-
-@CommandLine.Command(name = "apply", description = "Apply changes to Kafka resources.")
-public class ApplyCommand implements Callable<Integer> {
-
-    @CommandLine.Option(names = {"-p", "--plan"}, paramLabel = "<file>",
-            description = "Specify the plan file to use.")
-    private File planFile;
-
-    @CommandLine.ParentCommand
-    private MainCommand parent;
-
-    @Override
-    public Integer call() {
-        try {
-            System.out.println("Executing apply...\n");
-            DesiredPlan desiredPlan = new StateManager(parent.isVerboseRequested(), parent.getFile(), parent.isDeleteDisabled(), planFile).apply();
-            LogUtil.printApplyOverview(PlanUtil.getOverview(desiredPlan));
-            return 0;
-        } catch (MissingConfigurationException ex) {
-            LogUtil.printGenericError(ex, true);
-            return 2;
-        } catch (ValidationException ex) {
-            LogUtil.printValidationResult(ex.getMessage(), false);
-            return 2;
-        } catch (KafkaExecutionException ex) {
-            LogUtil.printKafkaExecutionError(ex, true);
-            return 2;
-        }
-    }
-}
\ No newline at end of file
diff --git a/src/main/java/com/devshawn/kafka/dsf/cli/ValidateCommand.java b/src/main/java/com/devshawn/kafka/dsf/cli/ValidateCommand.java
deleted file mode 100644
index 0605a2a9..00000000
--- a/src/main/java/com/devshawn/kafka/dsf/cli/ValidateCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.devshawn.kafka.dsf.cli;
-
-import com.devshawn.kafka.dsf.MainCommand;
-import com.devshawn.kafka.dsf.StateManager;
-import com.devshawn.kafka.dsf.exception.ValidationException;
-import com.devshawn.kafka.dsf.util.LogUtil;
-import picocli.CommandLine;
-import picocli.CommandLine.Command;
-
-import java.util.concurrent.Callable;
-
-
-@Command(name = "validate", description = "Validates the desired state file.")
-public class ValidateCommand implements Callable<Integer> {
-
-    @CommandLine.ParentCommand
-    private MainCommand parent;
-
-    @Override
-    public Integer call() {
-        try {
-            new StateManager(parent.isVerboseRequested(), parent.getFile(), parent.isDeleteDisabled(), null).validate();
-            LogUtil.printValidationResult("Successfully validated the desired state file.", true);
-            return 0;
-        } catch (ValidationException ex) {
-            LogUtil.printValidationResult(ex.getMessage(), false);
-            return 2;
-        }
-    }
-}
diff --git a/src/main/java/com/devshawn/kafka/dsf/config/KafkaDsfConfig.java b/src/main/java/com/devshawn/kafka/dsf/config/KafkaDsfConfig.java
deleted file mode 100644
index c1f78be4..00000000
--- a/src/main/java/com/devshawn/kafka/dsf/config/KafkaDsfConfig.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.devshawn.kafka.dsf.config;
-
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.inferred.freebuilder.FreeBuilder;
-
-import java.util.Map;
-
-@FreeBuilder
-@JsonDeserialize(builder = KafkaDsfConfig.Builder.class)
-public abstract class KafkaDsfConfig {
-
-    public abstract Map<String, Object> getConfig();
-
-    public static class Builder extends KafkaDsfConfig_Builder {
-
-    }
-
-}
diff --git a/src/main/java/com/devshawn/kafka/dsf/MainCommand.java b/src/main/java/com/devshawn/kafka/gitops/MainCommand.java
similarity index 85%
rename from src/main/java/com/devshawn/kafka/dsf/MainCommand.java
rename to src/main/java/com/devshawn/kafka/gitops/MainCommand.java
index 39e33736..0a1ac59f 100644
--- a/src/main/java/com/devshawn/kafka/dsf/MainCommand.java
+++ b/src/main/java/com/devshawn/kafka/gitops/MainCommand.java
@@ -1,8 +1,9 @@
-package com.devshawn.kafka.dsf;
+package com.devshawn.kafka.gitops;
 
-import com.devshawn.kafka.dsf.cli.ApplyCommand;
-import com.devshawn.kafka.dsf.cli.PlanCommand;
-import com.devshawn.kafka.dsf.cli.ValidateCommand;
+import com.devshawn.kafka.gitops.cli.AccountCommand;
+import com.devshawn.kafka.gitops.cli.ApplyCommand;
+import com.devshawn.kafka.gitops.cli.PlanCommand;
+import com.devshawn.kafka.gitops.cli.ValidateCommand;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
@@ -10,10 +11,11 @@
 import java.io.File;
 import java.util.concurrent.Callable;
 
-@Command(name = "kafka-dsf",
-        version = "0.1.0",
+@Command(name = "kafka-gitops",
+        version = "0.2.0-SNAPSHOT",
         exitCodeOnInvalidInput = 0,
         subcommands = {
+                AccountCommand.class,
                 ApplyCommand.class,
                 PlanCommand.class,
                 ValidateCommand.class
diff --git a/src/main/java/com/devshawn/kafka/gitops/StateManager.java b/src/main/java/com/devshawn/kafka/gitops/StateManager.java
new file mode 100644
index 00000000..bb514d0a
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/StateManager.java
@@ -0,0 +1,210 @@
+package com.devshawn.kafka.gitops;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import com.devshawn.kafka.gitops.config.KafkaGitopsConfigLoader;
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.domain.confluent.ServiceAccount;
+import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.domain.state.CustomAclDetails;
+import com.devshawn.kafka.gitops.domain.state.DesiredState;
+import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
+import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
+import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
+import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
+import com.devshawn.kafka.gitops.exception.ServiceAccountNotFoundException;
+import com.devshawn.kafka.gitops.manager.ApplyManager;
+import com.devshawn.kafka.gitops.manager.PlanManager;
+import com.devshawn.kafka.gitops.service.ConfluentCloudService;
+import com.devshawn.kafka.gitops.service.KafkaService;
+import com.devshawn.kafka.gitops.service.ParserService;
+import com.devshawn.kafka.gitops.util.LogUtil;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StateManager {
+
+    private static org.slf4j.Logger log = LoggerFactory.getLogger(StateManager.class);
+
+    private final ManagerConfig managerConfig;
+    private final ObjectMapper objectMapper;
+    private final ParserService parserService;
+    private final KafkaService kafkaService;
+    private final ConfluentCloudService confluentCloudService;
+
+    private PlanManager planManager;
+    private ApplyManager applyManager;
+
+    public StateManager(ManagerConfig managerConfig, ParserService parserService) {
+        initializeLogger(managerConfig.isVerboseRequested());
+        this.managerConfig = managerConfig;
+        this.objectMapper = initializeObjectMapper();
+        this.kafkaService = new KafkaService(KafkaGitopsConfigLoader.load());
+        this.parserService = parserService;
+        this.confluentCloudService = new ConfluentCloudService(objectMapper);
+        this.planManager = new PlanManager(managerConfig, kafkaService, objectMapper);
+        this.applyManager = new ApplyManager(managerConfig, kafkaService);
+    }
+
+    public void validate() {
+        parserService.parseStateFile();
+    }
+
+    public DesiredPlan plan() {
+        DesiredPlan desiredPlan = generatePlan();
+        planManager.validatePlanHasChanges(desiredPlan);
+        planManager.writePlanToFile(desiredPlan);
+        return desiredPlan;
+    }
+
+    private DesiredPlan generatePlan() {
+        DesiredState desiredState = getDesiredState();
+        DesiredPlan.Builder desiredPlan = new DesiredPlan.Builder();
+        planManager.planAcls(desiredState, desiredPlan);
+        planManager.planTopics(desiredState, desiredPlan);
+        return desiredPlan.build();
+    }
+
+    public DesiredPlan apply() {
+        DesiredPlan desiredPlan = planManager.readPlanFromFile();
+        if (desiredPlan == null) {
+            desiredPlan = generatePlan();
+        }
+
+        planManager.validatePlanHasChanges(desiredPlan);
+
+        applyManager.applyTopics(desiredPlan);
+        applyManager.applyAcls(desiredPlan);
+
+        return desiredPlan;
+    }
+
+    public void createServiceAccounts() {
+        DesiredStateFile desiredStateFile = parserService.parseStateFile();
+        List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
+        AtomicInteger count = new AtomicInteger();
+        if (isConfluentCloudEnabled(desiredStateFile)) {
+            desiredStateFile.getServices().forEach((name, service) -> {
+                if (serviceAccounts.stream().noneMatch(it -> it.getName().equals(name))) {
+                    confluentCloudService.createServiceAccount(name);
+                    LogUtil.printSimpleSuccess(String.format("Successfully created service account: %s", name));
+                    count.getAndIncrement();
+                }
+            });
+        } else {
+            throw new ConfluentCloudException("Confluent Cloud must be enabled in the state file to use this command.");
+        }
+
+        if (count.get() == 0) {
+            LogUtil.printSimpleSuccess("No service accounts were created as there are no new service accounts.");
+        }
+    }
+
+    private DesiredState getDesiredState() {
+        DesiredStateFile desiredStateFile = parserService.parseStateFile();
+        DesiredState.Builder desiredState = new DesiredState.Builder()
+                .addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
+                .putAllTopics(desiredStateFile.getTopics());
+
+        if (isConfluentCloudEnabled(desiredStateFile)) {
+            generateConfluentCloudServiceAcls(desiredState, desiredStateFile);
+        } else {
+            generateServiceAcls(desiredState, desiredStateFile);
+        }
+
+        return desiredState.build();
+    }
+
+    private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
+        List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
+        desiredStateFile.getServices().forEach((name, service) -> {
+            AtomicReference<Integer> index = new AtomicReference<>(0);
+
+            Optional<ServiceAccount> serviceAccount = serviceAccounts.stream().filter(it -> it.getName().equals(name)).findFirst();
+            String serviceAccountId = serviceAccount.orElseThrow(() -> new ServiceAccountNotFoundException(name)).getId();
+
+            service.getAcls(name).forEach(aclDetails -> {
+                aclDetails.setPrincipal(String.format("User:%s", serviceAccountId));
+                desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), aclDetails.build());
+            });
+
+            if (desiredStateFile.getCustomServiceAcls().containsKey(name)) {
+                Map<String, CustomAclDetails> customAcls = desiredStateFile.getCustomServiceAcls().get(name);
+                customAcls.forEach((aclName, customAcl) -> {
+                    AclDetails.Builder aclDetails = AclDetails.fromCustomAclDetails(customAcl);
+                    aclDetails.setPrincipal(String.format("User:%s", serviceAccountId));
+                    desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), aclDetails.build());
+                });
+            }
+        });
+    }
+
+    private void generateServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
+        desiredStateFile.getServices().forEach((name, service) -> {
+            AtomicReference<Integer> index = new AtomicReference<>(0);
+            service.getAcls(name).forEach(aclDetails -> {
+                desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), aclDetails.build());
+            });
+
+            if (desiredStateFile.getCustomServiceAcls().containsKey(name)) {
+                Map<String, CustomAclDetails> customAcls = desiredStateFile.getCustomServiceAcls().get(name);
+                customAcls.forEach((aclName, customAcl) -> {
+                    AclDetails.Builder aclDetails = AclDetails.fromCustomAclDetails(customAcl);
+                    aclDetails.setPrincipal(customAcl.getPrincipal().orElseThrow(() ->
+                            new MissingConfigurationException(String.format("Missing principal for custom ACL %s", aclName))));
+                    desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), aclDetails.build());
+                });
+            }
+        });
+    }
+
+    private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile) {
+        List<String> topics = new ArrayList<>();
+        try {
+            topics.addAll(desiredStateFile.getSettings().get().getTopics().get().getBlacklist().get().getPrefixed());
+        } catch (NoSuchElementException ex) {
+            // Do nothing, no blacklist exists
+        }
+        desiredStateFile.getServices().forEach((name, service) -> {
+            if (service instanceof KafkaStreamsService) {
+                topics.add(name);
+            }
+        });
+        return topics;
+    }
+
+    private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
+        if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getCcloud().isPresent()) {
+            return desiredStateFile.getSettings().get().getCcloud().get().isEnabled();
+        }
+        return false;
+    }
+
+    private ObjectMapper initializeObjectMapper() {
+        ObjectMapper objectMapper = new ObjectMapper();
+        objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        objectMapper.registerModule(new Jdk8Module());
+        return objectMapper;
+    }
+
+    private void initializeLogger(boolean verbose) {
+        Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+        Logger kafka = (Logger) LoggerFactory.getLogger("org.apache.kafka");
+        if (verbose) {
+            root.setLevel(Level.INFO);
+            kafka.setLevel(Level.WARN);
+        } else {
+            root.setLevel(Level.WARN);
+            kafka.setLevel(Level.OFF);
+        }
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/cli/AccountCommand.java b/src/main/java/com/devshawn/kafka/gitops/cli/AccountCommand.java
new file mode 100644
index 00000000..c0f1e5fa
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/cli/AccountCommand.java
@@ -0,0 +1,49 @@
+package com.devshawn.kafka.gitops.cli;
+
+import com.devshawn.kafka.gitops.MainCommand;
+import com.devshawn.kafka.gitops.StateManager;
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.exception.*;
+import com.devshawn.kafka.gitops.service.ParserService;
+import com.devshawn.kafka.gitops.util.LogUtil;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+@CommandLine.Command(name = "account", description = "Create Confluent Cloud service accounts.")
+public class AccountCommand implements Callable<Integer> {
+
+    @CommandLine.ParentCommand
+    private MainCommand parent;
+
+    @Override
+    public Integer call() {
+        try {
+            System.out.println("Creating service accounts...\n");
+            ParserService parserService = new ParserService(parent.getFile());
+            StateManager stateManager = new StateManager(generateStateManagerConfig(), parserService);
+            stateManager.createServiceAccounts();
+            return 0;
+        } catch (MissingConfigurationException | ConfluentCloudException ex) {
+            LogUtil.printSimpleError(ex.getMessage());
+            return 2;
+        } catch (ValidationException ex) {
+            LogUtil.printValidationResult(ex.getMessage(), false);
+            return 2;
+        } catch (KafkaExecutionException ex) {
+            LogUtil.printKafkaExecutionError(ex);
+            return 2;
+        } catch (WritePlanOutputException ex) {
+            LogUtil.printPlanOutputError(ex);
+            return 2;
+        }
+    }
+
+    private ManagerConfig generateStateManagerConfig() {
+        return new ManagerConfig.Builder()
+                .setVerboseRequested(parent.isVerboseRequested())
+                .setDeleteDisabled(parent.isDeleteDisabled())
+                .setStateFile(parent.getFile())
+                .build();
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/cli/ApplyCommand.java b/src/main/java/com/devshawn/kafka/gitops/cli/ApplyCommand.java
new file mode 100644
index 00000000..38820af9
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/cli/ApplyCommand.java
@@ -0,0 +1,57 @@
+package com.devshawn.kafka.gitops.cli;
+
+import com.devshawn.kafka.gitops.MainCommand;
+import com.devshawn.kafka.gitops.StateManager;
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
+import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
+import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
+import com.devshawn.kafka.gitops.exception.ValidationException;
+import com.devshawn.kafka.gitops.service.ParserService;
+import com.devshawn.kafka.gitops.util.LogUtil;
+import com.devshawn.kafka.gitops.util.PlanUtil;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.util.concurrent.Callable;
+
+@CommandLine.Command(name = "apply", description = "Apply changes to Kafka resources.")
+public class ApplyCommand implements Callable<Integer> {
+
+    @CommandLine.Option(names = {"-p", "--plan"}, paramLabel = "<file>",
+            description = "Specify the plan file to use.")
+    private File planFile;
+
+    @CommandLine.ParentCommand
+    private MainCommand parent;
+
+    @Override
+    public Integer call() {
+        try {
+            System.out.println("Executing apply...\n");
+            ParserService parserService = new ParserService(parent.getFile());
+            StateManager stateManager = new StateManager(generateStateManagerConfig(), parserService);
+            DesiredPlan desiredPlan = stateManager.apply();
+            LogUtil.printApplyOverview(PlanUtil.getOverview(desiredPlan));
+            return 0;
+        } catch (MissingConfigurationException ex) {
+            LogUtil.printGenericError(ex, true);
+            return 2;
+        } catch (ValidationException ex) {
+            LogUtil.printValidationResult(ex.getMessage(), false);
+            return 2;
+        } catch (KafkaExecutionException ex) {
+            LogUtil.printKafkaExecutionError(ex, true);
+            return 2;
+        }
+    }
+
+    private ManagerConfig generateStateManagerConfig() {
+        return new ManagerConfig.Builder()
+                .setVerboseRequested(parent.isVerboseRequested())
+                .setDeleteDisabled(parent.isDeleteDisabled())
+                .setStateFile(parent.getFile())
+                .setNullablePlanFile(planFile)
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/devshawn/kafka/dsf/cli/PlanCommand.java b/src/main/java/com/devshawn/kafka/gitops/cli/PlanCommand.java
similarity index 51%
rename from src/main/java/com/devshawn/kafka/dsf/cli/PlanCommand.java
rename to src/main/java/com/devshawn/kafka/gitops/cli/PlanCommand.java
index e8d1f3b9..b1aa062c 100644
--- a/src/main/java/com/devshawn/kafka/dsf/cli/PlanCommand.java
+++ b/src/main/java/com/devshawn/kafka/gitops/cli/PlanCommand.java
@@ -1,13 +1,15 @@
-package com.devshawn.kafka.dsf.cli;
+package com.devshawn.kafka.gitops.cli;
 
-import com.devshawn.kafka.dsf.MainCommand;
-import com.devshawn.kafka.dsf.StateManager;
-import com.devshawn.kafka.dsf.domain.plan.DesiredPlan;
-import com.devshawn.kafka.dsf.exception.KafkaExecutionException;
-import com.devshawn.kafka.dsf.exception.MissingConfigurationException;
-import com.devshawn.kafka.dsf.exception.ValidationException;
-import com.devshawn.kafka.dsf.exception.WritePlanOutputException;
-import com.devshawn.kafka.dsf.util.LogUtil;
+import com.devshawn.kafka.gitops.MainCommand;
+import com.devshawn.kafka.gitops.StateManager;
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
+import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
+import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
+import com.devshawn.kafka.gitops.exception.ValidationException;
+import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
+import com.devshawn.kafka.gitops.service.ParserService;
+import com.devshawn.kafka.gitops.util.LogUtil;
 import picocli.CommandLine;
 
 import java.io.File;
@@ -27,11 +29,10 @@ public class PlanCommand implements Callable<Integer> {
     public Integer call() {
         try {
             System.out.println("Generating execution plan...\n");
-            StateManager stateManager = new StateManager(parent.isVerboseRequested(), parent.getFile(), parent.isDeleteDisabled(), outputFile);
+            ParserService parserService = new ParserService(parent.getFile());
+            StateManager stateManager = new StateManager(generateStateManagerConfig(), parserService);
             DesiredPlan desiredPlan = stateManager.plan();
-            stateManager.validatePlanHasChanges(desiredPlan);
             LogUtil.printPlan(desiredPlan);
-            stateManager.writePlanToFile(desiredPlan);
             return 0;
         } catch (MissingConfigurationException ex) {
             LogUtil.printGenericError(ex);
@@ -47,4 +48,13 @@ public Integer call() {
             return 2;
         }
     }
+
+    private ManagerConfig generateStateManagerConfig() {
+        return new ManagerConfig.Builder()
+                .setVerboseRequested(parent.isVerboseRequested())
+                .setDeleteDisabled(parent.isDeleteDisabled())
+                .setStateFile(parent.getFile())
+                .setNullablePlanFile(outputFile)
+                .build();
+    }
 }
diff --git a/src/main/java/com/devshawn/kafka/gitops/cli/ValidateCommand.java b/src/main/java/com/devshawn/kafka/gitops/cli/ValidateCommand.java
new file mode 100644
index 00000000..9fc1b0d9
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/cli/ValidateCommand.java
@@ -0,0 +1,42 @@
+package com.devshawn.kafka.gitops.cli;
+
+import com.devshawn.kafka.gitops.MainCommand;
+import com.devshawn.kafka.gitops.StateManager;
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.exception.ValidationException;
+import com.devshawn.kafka.gitops.service.ParserService;
+import com.devshawn.kafka.gitops.util.LogUtil;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+import java.util.concurrent.Callable;
+
+
+@Command(name = "validate", description = "Validates the desired state file.")
+public class ValidateCommand implements Callable<Integer> {
+
+    @CommandLine.ParentCommand
+    private MainCommand parent;
+
+    @Override
+    public Integer call() {
+        try {
+            ParserService parserService = new ParserService(parent.getFile());
+            StateManager stateManager = new StateManager(generateStateManagerConfig(), parserService);
+            stateManager.validate();
+            LogUtil.printValidationResult("Successfully validated the desired state file.", true);
+            return 0;
+        } catch (ValidationException ex) {
+            LogUtil.printValidationResult(ex.getMessage(), false);
+            return 2;
+        }
+    }
+
+    private ManagerConfig generateStateManagerConfig() {
+        return new ManagerConfig.Builder()
+                .setVerboseRequested(parent.isVerboseRequested())
+                .setDeleteDisabled(parent.isDeleteDisabled())
+                .setStateFile(parent.getFile())
+                .build();
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/config/KafkaGitopsConfig.java b/src/main/java/com/devshawn/kafka/gitops/config/KafkaGitopsConfig.java
new file mode 100644
index 00000000..1b1ce175
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/config/KafkaGitopsConfig.java
@@ -0,0 +1,18 @@
+package com.devshawn.kafka.gitops.config;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.Map;
+
+@FreeBuilder
+@JsonDeserialize(builder = KafkaGitopsConfig.Builder.class)
+public interface KafkaGitopsConfig {
+
+    Map<String, Object> getConfig();
+
+    class Builder extends KafkaGitopsConfig_Builder {
+
+    }
+
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/config/KafkaDsfConfigLoader.java b/src/main/java/com/devshawn/kafka/gitops/config/KafkaGitopsConfigLoader.java
similarity index 85%
rename from src/main/java/com/devshawn/kafka/dsf/config/KafkaDsfConfigLoader.java
rename to src/main/java/com/devshawn/kafka/gitops/config/KafkaGitopsConfigLoader.java
index 16cff071..5221454c 100644
--- a/src/main/java/com/devshawn/kafka/dsf/config/KafkaDsfConfigLoader.java
+++ b/src/main/java/com/devshawn/kafka/gitops/config/KafkaGitopsConfigLoader.java
@@ -1,6 +1,6 @@
-package com.devshawn.kafka.dsf.config;
+package com.devshawn.kafka.gitops.config;
 
-import com.devshawn.kafka.dsf.exception.MissingConfigurationException;
+import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.slf4j.LoggerFactory;
 
@@ -8,17 +8,17 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class KafkaDsfConfigLoader {
+public class KafkaGitopsConfigLoader {
 
-    private static org.slf4j.Logger log = LoggerFactory.getLogger(KafkaDsfConfigLoader.class);
+    private static org.slf4j.Logger log = LoggerFactory.getLogger(KafkaGitopsConfigLoader.class);
 
-    public static KafkaDsfConfig load() {
-        KafkaDsfConfig.Builder builder = new KafkaDsfConfig.Builder();
+    public static KafkaGitopsConfig load() {
+        KafkaGitopsConfig.Builder builder = new KafkaGitopsConfig.Builder();
         setConfig(builder);
         return builder.build();
     }
 
-    private static void setConfig(KafkaDsfConfig.Builder builder) {
+    private static void setConfig(KafkaGitopsConfig.Builder builder) {
         Map<String, Object> config = new HashMap<>();
         AtomicReference<String> username = new AtomicReference<>();
         AtomicReference<String> password = new AtomicReference<>();
@@ -50,7 +50,7 @@ private static void handleDefaultConfig(Map<String, Object> config) {
         }
 
         if (!config.containsKey(CommonClientConfigs.CLIENT_ID_CONFIG)) {
-            config.put(CommonClientConfigs.CLIENT_ID_CONFIG, "kafka-dsf");
+            config.put(CommonClientConfigs.CLIENT_ID_CONFIG, "kafka-gitops");
         }
     }
 
diff --git a/src/main/java/com/devshawn/kafka/gitops/config/ManagerConfig.java b/src/main/java/com/devshawn/kafka/gitops/config/ManagerConfig.java
new file mode 100644
index 00000000..59e5fe8d
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/config/ManagerConfig.java
@@ -0,0 +1,23 @@
+package com.devshawn.kafka.gitops.config;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.io.File;
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = ManagerConfig.Builder.class)
+public interface ManagerConfig {
+
+    boolean isVerboseRequested();
+
+    boolean isDeleteDisabled();
+
+    File getStateFile();
+
+    Optional<File> getPlanFile();
+
+    class Builder extends ManagerConfig_Builder {
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/confluent/ServiceAccount.java b/src/main/java/com/devshawn/kafka/gitops/domain/confluent/ServiceAccount.java
new file mode 100644
index 00000000..84da143a
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/confluent/ServiceAccount.java
@@ -0,0 +1,18 @@
+package com.devshawn.kafka.gitops.domain.confluent;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+@FreeBuilder
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonDeserialize(builder = ServiceAccount.Builder.class)
+public interface ServiceAccount {
+
+    String getId();
+
+    String getName();
+
+    class Builder extends ServiceAccount_Builder {
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/plan/AclPlan.java b/src/main/java/com/devshawn/kafka/gitops/domain/plan/AclPlan.java
similarity index 68%
rename from src/main/java/com/devshawn/kafka/dsf/domain/plan/AclPlan.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/plan/AclPlan.java
index d104add1..e1ad8d80 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/plan/AclPlan.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/plan/AclPlan.java
@@ -1,7 +1,7 @@
-package com.devshawn.kafka.dsf.domain.plan;
+package com.devshawn.kafka.gitops.domain.plan;
 
-import com.devshawn.kafka.dsf.domain.state.AclDetails;
-import com.devshawn.kafka.dsf.enums.PlanAction;
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.enums.PlanAction;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/plan/DesiredPlan.java b/src/main/java/com/devshawn/kafka/gitops/domain/plan/DesiredPlan.java
similarity index 88%
rename from src/main/java/com/devshawn/kafka/dsf/domain/plan/DesiredPlan.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/plan/DesiredPlan.java
index 4b8c93ce..a8be34c9 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/plan/DesiredPlan.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/plan/DesiredPlan.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.plan;
+package com.devshawn.kafka.gitops.domain.plan;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/plan/PlanOverview.java b/src/main/java/com/devshawn/kafka/gitops/domain/plan/PlanOverview.java
similarity index 88%
rename from src/main/java/com/devshawn/kafka/dsf/domain/plan/PlanOverview.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/plan/PlanOverview.java
index d3918a28..aa0d5398 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/plan/PlanOverview.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/plan/PlanOverview.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.plan;
+package com.devshawn.kafka.gitops.domain.plan;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/plan/TopicConfigPlan.java b/src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicConfigPlan.java
similarity index 80%
rename from src/main/java/com/devshawn/kafka/dsf/domain/plan/TopicConfigPlan.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicConfigPlan.java
index ae276fb5..22fc24d9 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/plan/TopicConfigPlan.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicConfigPlan.java
@@ -1,6 +1,6 @@
-package com.devshawn.kafka.dsf.domain.plan;
+package com.devshawn.kafka.gitops.domain.plan;
 
-import com.devshawn.kafka.dsf.enums.PlanAction;
+import com.devshawn.kafka.gitops.enums.PlanAction;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/plan/TopicPlan.java b/src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicPlan.java
similarity index 74%
rename from src/main/java/com/devshawn/kafka/dsf/domain/plan/TopicPlan.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicPlan.java
index 269db912..a1220f0c 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/plan/TopicPlan.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicPlan.java
@@ -1,7 +1,7 @@
-package com.devshawn.kafka.dsf.domain.plan;
+package com.devshawn.kafka.gitops.domain.plan;
 
-import com.devshawn.kafka.dsf.domain.state.TopicDetails;
-import com.devshawn.kafka.dsf.enums.PlanAction;
+import com.devshawn.kafka.gitops.domain.state.TopicDetails;
+import com.devshawn.kafka.gitops.enums.PlanAction;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
 
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/AbstractService.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/AbstractService.java
new file mode 100644
index 00000000..b91b5214
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/AbstractService.java
@@ -0,0 +1,58 @@
+package com.devshawn.kafka.gitops.domain.state;
+
+import java.util.Optional;
+
+public abstract class AbstractService {
+
+    public AclDetails.Builder generateReadAcl(String topic, Optional<String> principal) {
+        AclDetails.Builder builder = new AclDetails.Builder()
+                .setHost("*")
+                .setName(topic)
+                .setOperation("READ")
+                .setPermission("ALLOW")
+                .setPattern("LITERAL")
+                .setType("TOPIC");
+
+        principal.ifPresent(builder::setPrincipal);
+        return builder;
+    }
+
+    public AclDetails.Builder generateWriteACL(String topic, Optional<String> principal) {
+        AclDetails.Builder builder = new AclDetails.Builder()
+                .setHost("*")
+                .setName(topic)
+                .setOperation("WRITE")
+                .setPermission("ALLOW")
+                .setPattern("LITERAL")
+                .setType("TOPIC");
+
+        principal.ifPresent(builder::setPrincipal);
+        return builder;
+    }
+
+    public AclDetails.Builder generatePrefixedTopicACL(String topic, Optional<String> principal, String operation) {
+        AclDetails.Builder builder = new AclDetails.Builder()
+                .setHost("*")
+                .setName(topic)
+                .setOperation(operation)
+                .setPermission("ALLOW")
+                .setPattern("PREFIXED")
+                .setType("TOPIC");
+
+        principal.ifPresent(builder::setPrincipal);
+        return builder;
+    }
+
+    public AclDetails.Builder generateConsumerGroupAcl(String consumerGroupId, Optional<String> principal, String operation) {
+        AclDetails.Builder builder = new AclDetails.Builder()
+                .setHost("*")
+                .setName(consumerGroupId)
+                .setOperation(operation)
+                .setPermission("ALLOW")
+                .setPattern("LITERAL")
+                .setType("GROUP");
+
+        principal.ifPresent(builder::setPrincipal);
+        return builder;
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/state/AclDetails.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/AclDetails.java
similarity index 82%
rename from src/main/java/com/devshawn/kafka/dsf/domain/state/AclDetails.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/state/AclDetails.java
index 9f988acc..09fb0260 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/state/AclDetails.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/AclDetails.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.state;
+package com.devshawn.kafka.gitops.domain.state;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.kafka.common.acl.AccessControlEntry;
@@ -40,6 +40,16 @@ public static AclDetails fromAclBinding(AclBinding aclBinding) {
         return aclDetails.build();
     }
 
+    public static AclDetails.Builder fromCustomAclDetails(CustomAclDetails customAclDetails) {
+        return new AclDetails.Builder()
+                .setName(customAclDetails.getName())
+                .setType(customAclDetails.getType())
+                .setPattern(customAclDetails.getPattern())
+                .setHost(customAclDetails.getHost())
+                .setOperation(customAclDetails.getOperation())
+                .setPermission(customAclDetails.getPermission());
+    }
+
     public boolean equalsAclBinding(AclBinding aclBinding) {
         if (aclBinding.pattern().name().equals(getName())
                 && aclBinding.pattern().patternType().name().equals(getPattern())
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/CustomAclDetails.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/CustomAclDetails.java
new file mode 100644
index 00000000..788332ad
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/CustomAclDetails.java
@@ -0,0 +1,29 @@
+package com.devshawn.kafka.gitops.domain.state;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = CustomAclDetails.Builder.class)
+public abstract class CustomAclDetails {
+
+    public abstract String getName();
+
+    public abstract String getType();
+
+    public abstract String getPattern();
+
+    public abstract Optional<String> getPrincipal();
+
+    public abstract String getHost();
+
+    public abstract String getOperation();
+
+    public abstract String getPermission();
+
+    public static class Builder extends CustomAclDetails_Builder {
+
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/state/DesiredState.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredState.java
similarity index 76%
rename from src/main/java/com/devshawn/kafka/dsf/domain/state/DesiredState.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredState.java
index df0d621f..d46d6d78 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/state/DesiredState.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredState.java
@@ -1,21 +1,21 @@
-package com.devshawn.kafka.dsf.domain.state;
+package com.devshawn.kafka.gitops.domain.state;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 @FreeBuilder
 @JsonDeserialize(builder = DesiredState.Builder.class)
 public interface DesiredState {
 
-    Optional<Settings> getSettings();
-
     Map<String, TopicDetails> getTopics();
 
     Map<String, AclDetails> getAcls();
 
+    List<String> getPrefixedTopicsToIgnore();
+
     class Builder extends DesiredState_Builder {
     }
 }
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredStateFile.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredStateFile.java
new file mode 100644
index 00000000..5974d951
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredStateFile.java
@@ -0,0 +1,25 @@
+package com.devshawn.kafka.gitops.domain.state;
+
+
+import com.devshawn.kafka.gitops.domain.state.settings.Settings;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.Map;
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = DesiredStateFile.Builder.class)
+public interface DesiredStateFile {
+
+    Optional<Settings> getSettings();
+
+    Map<String, ServiceDetails> getServices();
+
+    Map<String, TopicDetails> getTopics();
+
+    Map<String, Map<String, CustomAclDetails>> getCustomServiceAcls();
+
+    class Builder extends DesiredStateFile_Builder {
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/ServiceDetails.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/ServiceDetails.java
new file mode 100644
index 00000000..882f8f68
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/ServiceDetails.java
@@ -0,0 +1,24 @@
+package com.devshawn.kafka.gitops.domain.state;
+
+import com.devshawn.kafka.gitops.domain.state.service.ApplicationService;
+import com.devshawn.kafka.gitops.domain.state.service.KafkaConnectService;
+import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.List;
+
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = ApplicationService.class, name = "application"),
+        @JsonSubTypes.Type(value = KafkaConnectService.class, name = "kafka-connect"),
+        @JsonSubTypes.Type(value = KafkaStreamsService.class, name = "kafka-streams")
+})
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public abstract class ServiceDetails extends AbstractService {
+
+    public String type;
+
+    public List<AclDetails.Builder> getAcls(String serviceName) {
+        throw new UnsupportedOperationException("Method getAcls is not implemented.");
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/state/TopicDetails.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/TopicDetails.java
similarity index 89%
rename from src/main/java/com/devshawn/kafka/dsf/domain/state/TopicDetails.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/state/TopicDetails.java
index 7c032220..1c5dabad 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/state/TopicDetails.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/TopicDetails.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.state;
+package com.devshawn.kafka.gitops.domain.state;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/service/ApplicationService.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/ApplicationService.java
new file mode 100644
index 00000000..ccf4408f
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/ApplicationService.java
@@ -0,0 +1,36 @@
+package com.devshawn.kafka.gitops.domain.state.service;
+
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = ApplicationService.Builder.class)
+public abstract class ApplicationService extends ServiceDetails {
+
+    public abstract Optional<String> getPrincipal();
+
+    public abstract List<String> getProduces();
+
+    public abstract List<String> getConsumes();
+
+    @Override
+    public List<AclDetails.Builder> getAcls(String serviceName) {
+        List<AclDetails.Builder> acls = new ArrayList<>();
+        getProduces().forEach(topic -> acls.add(generateWriteACL(topic, getPrincipal())));
+        getConsumes().forEach(topic -> acls.add(generateReadAcl(topic, getPrincipal())));
+        if (!getConsumes().isEmpty()) {
+            acls.add(generateConsumerGroupAcl(serviceName, getPrincipal(), "READ"));
+        }
+        return acls;
+    }
+
+    public static class Builder extends ApplicationService_Builder {
+
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectService.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectService.java
new file mode 100644
index 00000000..00f9e41d
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectService.java
@@ -0,0 +1,48 @@
+package com.devshawn.kafka.gitops.domain.state.service;
+
+
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = KafkaConnectService.Builder.class)
+public abstract class KafkaConnectService extends ServiceDetails {
+
+    public abstract Optional<String> getPrincipal();
+
+    public abstract List<String> getProduces();
+
+    public abstract Map<String, KafkaConnectorDetails> getConnectors();
+
+    @Override
+    public List<AclDetails.Builder> getAcls(String serviceName) {
+        List<AclDetails.Builder> acls = new ArrayList<>();
+        getProduces().forEach(topic -> acls.add(generateWriteACL(topic, getPrincipal())));
+        acls.addAll(getConnectWorkerAcls(serviceName));
+        return acls;
+    }
+
+    private List<AclDetails.Builder> getConnectWorkerAcls(String serviceName) {
+        List<AclDetails.Builder> acls = new ArrayList<>();
+        acls.add(generateReadAcl(String.format("connect-configs-%s", serviceName), getPrincipal()));
+        acls.add(generateReadAcl(String.format("connect-offsets-%s", serviceName), getPrincipal()));
+        acls.add(generateReadAcl(String.format("connect-status-%s", serviceName), getPrincipal()));
+        acls.add(generateWriteACL(String.format("connect-configs-%s", serviceName), getPrincipal()));
+        acls.add(generateWriteACL(String.format("connect-offsets-%s", serviceName), getPrincipal()));
+        acls.add(generateWriteACL(String.format("connect-status-%s", serviceName), getPrincipal()));
+        acls.add(generateConsumerGroupAcl(serviceName, getPrincipal(), "READ"));
+        getConnectors().forEach((connectorName, connector) -> acls.addAll(connector.getAcls(connectorName, getPrincipal())));
+        return acls;
+    }
+
+    public static class Builder extends KafkaConnectService_Builder {
+
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectorDetails.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectorDetails.java
new file mode 100644
index 00000000..a8a3793f
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectorDetails.java
@@ -0,0 +1,33 @@
+package com.devshawn.kafka.gitops.domain.state.service;
+
+import com.devshawn.kafka.gitops.domain.state.AbstractService;
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = KafkaConnectorDetails.Builder.class)
+public abstract class KafkaConnectorDetails extends AbstractService {
+
+    public abstract List<String> getProduces();
+
+    public abstract List<String> getConsumes();
+
+    public List<AclDetails.Builder> getAcls(String connectorName, Optional<String> principal) {
+        List<AclDetails.Builder> acls = new ArrayList<>();
+        getProduces().forEach(topic -> acls.add(generateWriteACL(topic, principal)));
+        getConsumes().forEach(topic -> acls.add(generateReadAcl(topic, principal)));
+        if (!getConsumes().isEmpty()) {
+            acls.add(generateConsumerGroupAcl(String.format("connect-%s", connectorName), principal, "READ"));
+        }
+        return acls;
+    }
+
+    public static class Builder extends KafkaConnectorDetails_Builder {
+
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaStreamsService.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaStreamsService.java
new file mode 100644
index 00000000..b9fd5f2f
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaStreamsService.java
@@ -0,0 +1,48 @@
+package com.devshawn.kafka.gitops.domain.state.service;
+
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = KafkaStreamsService.Builder.class)
+public abstract class KafkaStreamsService extends ServiceDetails {
+
+    public abstract Optional<String> getPrincipal();
+
+    public abstract List<String> getProduces();
+
+    public abstract List<String> getConsumes();
+
+    @Override
+    public List<AclDetails.Builder> getAcls(String serviceName) {
+        List<AclDetails.Builder> acls = new ArrayList<>();
+        getProduces().forEach(topic -> acls.add(generateWriteACL(topic, getPrincipal())));
+        getConsumes().forEach(topic -> acls.add(generateReadAcl(topic, getPrincipal())));
+        acls.addAll(getInternalAcls(serviceName));
+        return acls;
+    }
+
+    private List<AclDetails.Builder> getInternalAcls(String serviceName) {
+        List<AclDetails.Builder> acls = new ArrayList<>();
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "READ"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "WRITE"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "DESCRIBE"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "DELETE"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "CREATE"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "ALTER"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "ALTER_CONFIGS"));
+        acls.add(generatePrefixedTopicACL(serviceName, getPrincipal(), "DESCRIBE_CONFIGS"));
+        acls.add(generateConsumerGroupAcl(serviceName, getPrincipal(), "READ"));
+        acls.add(generateConsumerGroupAcl(serviceName, getPrincipal(), "DESCRIBE"));
+        return acls;
+    }
+
+    public static class Builder extends KafkaStreamsService_Builder {
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/state/Settings.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/Settings.java
similarity index 76%
rename from src/main/java/com/devshawn/kafka/dsf/domain/state/Settings.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/state/settings/Settings.java
index 628ba09e..de75d478 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/state/Settings.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/Settings.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.state;
+package com.devshawn.kafka.gitops.domain.state.settings;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
@@ -9,6 +9,8 @@
 @JsonDeserialize(builder = Settings.Builder.class)
 public interface Settings {
 
+    Optional<SettingsCCloud> getCcloud();
+
     Optional<SettingsTopics> getTopics();
 
     class Builder extends Settings_Builder {
diff --git a/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsCCloud.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsCCloud.java
new file mode 100644
index 00000000..5428561b
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsCCloud.java
@@ -0,0 +1,14 @@
+package com.devshawn.kafka.gitops.domain.state.settings;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+@FreeBuilder
+@JsonDeserialize(builder = SettingsCCloud.Builder.class)
+public interface SettingsCCloud {
+
+    boolean isEnabled();
+
+    class Builder extends SettingsCCloud_Builder {
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/state/SettingsTopics.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopics.java
similarity index 86%
rename from src/main/java/com/devshawn/kafka/dsf/domain/state/SettingsTopics.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopics.java
index d1acbd84..6ac3811f 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/state/SettingsTopics.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopics.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.state;
+package com.devshawn.kafka.gitops.domain.state.settings;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
diff --git a/src/main/java/com/devshawn/kafka/dsf/domain/state/SettingsTopicsBlacklist.java b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopicsBlacklist.java
similarity index 86%
rename from src/main/java/com/devshawn/kafka/dsf/domain/state/SettingsTopicsBlacklist.java
rename to src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopicsBlacklist.java
index 911022c9..2dcbede4 100644
--- a/src/main/java/com/devshawn/kafka/dsf/domain/state/SettingsTopicsBlacklist.java
+++ b/src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopicsBlacklist.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.domain.state;
+package com.devshawn.kafka.gitops.domain.state.settings;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
diff --git a/src/main/java/com/devshawn/kafka/dsf/enums/PlanAction.java b/src/main/java/com/devshawn/kafka/gitops/enums/PlanAction.java
similarity index 64%
rename from src/main/java/com/devshawn/kafka/dsf/enums/PlanAction.java
rename to src/main/java/com/devshawn/kafka/gitops/enums/PlanAction.java
index 72503aff..98de2903 100644
--- a/src/main/java/com/devshawn/kafka/dsf/enums/PlanAction.java
+++ b/src/main/java/com/devshawn/kafka/gitops/enums/PlanAction.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.enums;
+package com.devshawn.kafka.gitops.enums;
 
 public enum PlanAction {
     ADD,
diff --git a/src/main/java/com/devshawn/kafka/gitops/exception/ConfluentCloudException.java b/src/main/java/com/devshawn/kafka/gitops/exception/ConfluentCloudException.java
new file mode 100644
index 00000000..56ee2077
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/ConfluentCloudException.java
@@ -0,0 +1,8 @@
+package com.devshawn.kafka.gitops.exception;
+
+public class ConfluentCloudException extends RuntimeException {
+
+    public ConfluentCloudException(String message) {
+        super(message);
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/exception/KafkaExecutionException.java b/src/main/java/com/devshawn/kafka/gitops/exception/KafkaExecutionException.java
similarity index 88%
rename from src/main/java/com/devshawn/kafka/dsf/exception/KafkaExecutionException.java
rename to src/main/java/com/devshawn/kafka/gitops/exception/KafkaExecutionException.java
index 16fb7588..149a7b1e 100644
--- a/src/main/java/com/devshawn/kafka/dsf/exception/KafkaExecutionException.java
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/KafkaExecutionException.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.exception;
+package com.devshawn.kafka.gitops.exception;
 
 public class KafkaExecutionException extends RuntimeException {
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/exception/MissingConfigurationException.java b/src/main/java/com/devshawn/kafka/gitops/exception/MissingConfigurationException.java
similarity index 84%
rename from src/main/java/com/devshawn/kafka/dsf/exception/MissingConfigurationException.java
rename to src/main/java/com/devshawn/kafka/gitops/exception/MissingConfigurationException.java
index 23dadfa5..d203911b 100644
--- a/src/main/java/com/devshawn/kafka/dsf/exception/MissingConfigurationException.java
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/MissingConfigurationException.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.exception;
+package com.devshawn.kafka.gitops.exception;
 
 public class MissingConfigurationException extends RuntimeException {
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/exception/ReadPlanInputException.java b/src/main/java/com/devshawn/kafka/gitops/exception/ReadPlanInputException.java
similarity index 82%
rename from src/main/java/com/devshawn/kafka/dsf/exception/ReadPlanInputException.java
rename to src/main/java/com/devshawn/kafka/gitops/exception/ReadPlanInputException.java
index b2c86d3b..57295419 100644
--- a/src/main/java/com/devshawn/kafka/dsf/exception/ReadPlanInputException.java
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/ReadPlanInputException.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.exception;
+package com.devshawn.kafka.gitops.exception;
 
 public class ReadPlanInputException extends RuntimeException {
 
diff --git a/src/main/java/com/devshawn/kafka/gitops/exception/ServiceAccountNotFoundException.java b/src/main/java/com/devshawn/kafka/gitops/exception/ServiceAccountNotFoundException.java
new file mode 100644
index 00000000..1366073e
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/ServiceAccountNotFoundException.java
@@ -0,0 +1,8 @@
+package com.devshawn.kafka.gitops.exception;
+
+public class ServiceAccountNotFoundException extends RuntimeException {
+
+    public ServiceAccountNotFoundException(String service) {
+        super(String.format("Service account not found for service: %s", service));
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/exception/UnknownFileException.java b/src/main/java/com/devshawn/kafka/gitops/exception/UnknownFileException.java
similarity index 80%
rename from src/main/java/com/devshawn/kafka/dsf/exception/UnknownFileException.java
rename to src/main/java/com/devshawn/kafka/gitops/exception/UnknownFileException.java
index 08be57d2..ee8f430f 100644
--- a/src/main/java/com/devshawn/kafka/dsf/exception/UnknownFileException.java
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/UnknownFileException.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.exception;
+package com.devshawn.kafka.gitops.exception;
 
 public class UnknownFileException extends RuntimeException {
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/exception/ValidationException.java b/src/main/java/com/devshawn/kafka/gitops/exception/ValidationException.java
similarity index 76%
rename from src/main/java/com/devshawn/kafka/dsf/exception/ValidationException.java
rename to src/main/java/com/devshawn/kafka/gitops/exception/ValidationException.java
index d509ec9e..781d3a7f 100644
--- a/src/main/java/com/devshawn/kafka/dsf/exception/ValidationException.java
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/ValidationException.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.exception;
+package com.devshawn.kafka.gitops.exception;
 
 public class ValidationException extends RuntimeException {
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/exception/WritePlanOutputException.java b/src/main/java/com/devshawn/kafka/gitops/exception/WritePlanOutputException.java
similarity index 82%
rename from src/main/java/com/devshawn/kafka/dsf/exception/WritePlanOutputException.java
rename to src/main/java/com/devshawn/kafka/gitops/exception/WritePlanOutputException.java
index 53fc64d9..9ca9a27c 100644
--- a/src/main/java/com/devshawn/kafka/dsf/exception/WritePlanOutputException.java
+++ b/src/main/java/com/devshawn/kafka/gitops/exception/WritePlanOutputException.java
@@ -1,4 +1,4 @@
-package com.devshawn.kafka.dsf.exception;
+package com.devshawn.kafka.gitops.exception;
 
 public class WritePlanOutputException extends RuntimeException {
 
diff --git a/src/main/java/com/devshawn/kafka/gitops/manager/ApplyManager.java b/src/main/java/com/devshawn/kafka/gitops/manager/ApplyManager.java
new file mode 100644
index 00000000..f32160ee
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/manager/ApplyManager.java
@@ -0,0 +1,78 @@
+package com.devshawn.kafka.gitops.manager;
+
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
+import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
+import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
+import com.devshawn.kafka.gitops.enums.PlanAction;
+import com.devshawn.kafka.gitops.service.KafkaService;
+import com.devshawn.kafka.gitops.util.LogUtil;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.*;
+
+public class ApplyManager {
+
+    private final ManagerConfig managerConfig;
+    private final KafkaService kafkaService;
+
+    public ApplyManager(ManagerConfig managerConfig, KafkaService kafkaService) {
+        this.managerConfig = managerConfig;
+        this.kafkaService = kafkaService;
+    }
+
+    public void applyTopics(DesiredPlan desiredPlan) {
+        desiredPlan.getTopicPlans().forEach(topicPlan -> {
+            if (topicPlan.getAction() == PlanAction.ADD) {
+                LogUtil.printTopicPreApply(topicPlan);
+                kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetails().get());
+                LogUtil.printPostApply();
+            } else if (topicPlan.getAction() == PlanAction.UPDATE) {
+                LogUtil.printTopicPreApply(topicPlan);
+                topicPlan.getTopicConfigPlans().forEach(topicConfigPlan -> applyTopicConfiguration(topicPlan, topicConfigPlan));
+                LogUtil.printPostApply();
+            } else if (topicPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled()) {
+                LogUtil.printTopicPreApply(topicPlan);
+                kafkaService.deleteTopic(topicPlan.getName());
+                LogUtil.printPostApply();
+            }
+        });
+    }
+
+    private void applyTopicConfiguration(TopicPlan topicPlan, TopicConfigPlan topicConfigPlan) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicPlan.getName());
+        List<AlterConfigOp> configOps = new ArrayList<>();
+
+        ConfigEntry configEntry = new ConfigEntry(topicConfigPlan.getKey(), topicConfigPlan.getValue().orElse(null));
+
+        // TODO: Make OpType work with append/subtract
+        if (topicConfigPlan.getAction() == PlanAction.ADD) {
+            configOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
+        } else if (topicConfigPlan.getAction() == PlanAction.UPDATE) {
+            configOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
+        } else if (topicConfigPlan.getAction() == PlanAction.REMOVE) {
+            configOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.DELETE));
+        }
+
+        configs.put(configResource, configOps);
+
+        kafkaService.updateTopicConfig(configs);
+    }
+
+    public void applyAcls(DesiredPlan desiredPlan) {
+        desiredPlan.getAclPlans().forEach(aclPlan -> {
+            if (aclPlan.getAction() == PlanAction.ADD) {
+                LogUtil.printAclPreApply(aclPlan);
+                kafkaService.createAcl(aclPlan.getAclDetails().toAclBinding());
+                LogUtil.printPostApply();
+            } else if (aclPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled()) {
+                LogUtil.printAclPreApply(aclPlan);
+                kafkaService.deleteAcl(aclPlan.getAclDetails().toAclBinding());
+                LogUtil.printPostApply();
+            }
+        });
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java b/src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java
new file mode 100644
index 00000000..6d3d9b5e
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java
@@ -0,0 +1,202 @@
+package com.devshawn.kafka.gitops.manager;
+
+import com.devshawn.kafka.gitops.config.ManagerConfig;
+import com.devshawn.kafka.gitops.domain.plan.*;
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.domain.state.DesiredState;
+import com.devshawn.kafka.gitops.domain.state.TopicDetails;
+import com.devshawn.kafka.gitops.enums.PlanAction;
+import com.devshawn.kafka.gitops.exception.ReadPlanInputException;
+import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
+import com.devshawn.kafka.gitops.service.KafkaService;
+import com.devshawn.kafka.gitops.util.LogUtil;
+import com.devshawn.kafka.gitops.util.PlanUtil;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.common.acl.AclBinding;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PlanManager {
+
+    private static org.slf4j.Logger log = LoggerFactory.getLogger(PlanManager.class);
+
+    private final ManagerConfig managerConfig;
+    private final KafkaService kafkaService;
+    private final ObjectMapper objectMapper;
+
+    public PlanManager(ManagerConfig managerConfig, KafkaService kafkaService, ObjectMapper objectMapper) {
+        this.managerConfig = managerConfig;
+        this.kafkaService = kafkaService;
+        this.objectMapper = objectMapper;
+    }
+
+    public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
+        List<TopicListing> topics = kafkaService.getTopics();
+
+        desiredState.getTopics().forEach((key, value) -> {
+            TopicPlan.Builder topicPlan = new TopicPlan.Builder()
+                    .setName(key)
+                    .setTopicDetails(value);
+
+            TopicDescription topicDescription = kafkaService.describeTopic(key);
+
+            if (topicDescription == null) {
+                log.info("[PLAN] Topic {} does not exist; it will be created.", key);
+                topicPlan.setAction(PlanAction.ADD);
+            } else {
+                log.info("[PLAN] Topic {} exists; it will not be created.", key);
+                topicPlan.setAction(PlanAction.NO_CHANGE);
+                planTopicConfigurations(key, value, topicPlan);
+            }
+
+            desiredPlan.addTopicPlans(topicPlan.build());
+        });
+
+        topics.forEach(currentTopic -> {
+            boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
+            if (shouldIgnore) {
+                log.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
+                return;
+            }
+
+            if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
+                TopicPlan topicPlan = new TopicPlan.Builder()
+                        .setName(currentTopic.name())
+                        .setAction(PlanAction.REMOVE)
+                        .build();
+
+                desiredPlan.addTopicPlans(topicPlan);
+            }
+        });
+    }
+
+    private void planTopicConfigurations(String topicName, TopicDetails topicDetails, TopicPlan.Builder topicPlan) {
+        Map<String, TopicConfigPlan> configPlans = new HashMap<>();
+        List<ConfigEntry> currentConfigs = kafkaService.describeTopicConfigs(topicName);
+        List<ConfigEntry> customConfigs = currentConfigs.stream()
+                .filter(it -> it.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
+                .collect(Collectors.toList());
+
+        customConfigs.forEach(currentConfig -> {
+            String newConfig = topicDetails.getConfigs().getOrDefault(currentConfig.name(), null);
+
+            TopicConfigPlan.Builder topicConfigPlan = new TopicConfigPlan.Builder()
+                    .setKey(currentConfig.name());
+
+            if (currentConfig.value().equals(newConfig)) {
+                topicConfigPlan.setAction(PlanAction.NO_CHANGE);
+                topicConfigPlan.setValue(currentConfig.value());
+                configPlans.put(currentConfig.name(), topicConfigPlan.build());
+            } else if (newConfig == null) {
+                topicConfigPlan.setAction(PlanAction.REMOVE);
+                configPlans.put(currentConfig.name(), topicConfigPlan.build());
+                topicPlan.setAction(PlanAction.UPDATE);
+            }
+        });
+
+        topicDetails.getConfigs().forEach((key, value) -> {
+            ConfigEntry currentConfig = customConfigs.stream().filter(it -> it.name().equals(key)).findFirst().orElse(null);
+
+            TopicConfigPlan.Builder topicConfigPlan = new TopicConfigPlan.Builder()
+                    .setKey(key)
+                    .setValue(value);
+
+            if (currentConfig == null) {
+                topicConfigPlan.setAction(PlanAction.ADD);
+                configPlans.put(key, topicConfigPlan.build());
+                topicPlan.setAction(PlanAction.UPDATE);
+            } else if (!currentConfig.value().equals(value)) {
+                topicConfigPlan.setAction(PlanAction.UPDATE);
+                configPlans.put(key, topicConfigPlan.build());
+                topicPlan.setAction(PlanAction.UPDATE);
+            }
+        });
+
+        configPlans.forEach((key, plan) -> {
+            log.info("[PLAN] Topic {} | [{}] {}", topicName, plan.getAction(), plan.getKey());
+            topicPlan.addTopicConfigPlans(plan);
+        });
+    }
+
+    public void planAcls(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
+        List<AclBinding> currentAcls = kafkaService.getAcls();
+
+        currentAcls.forEach(acl -> {
+            Map.Entry<String, AclDetails> detailsEntry = desiredState.getAcls().entrySet().stream()
+                    .filter(entry -> entry.getValue().equalsAclBinding(acl))
+                    .findFirst().orElse(null);
+
+            AclPlan.Builder aclPlan = new AclPlan.Builder();
+
+            if (detailsEntry != null) {
+                aclPlan.setName(detailsEntry.getKey());
+                aclPlan.setAclDetails(detailsEntry.getValue());
+                aclPlan.setAction(PlanAction.NO_CHANGE);
+                desiredPlan.addAclPlans(aclPlan.build());
+            } else {
+                aclPlan.setName("Unnamed ACL");
+                aclPlan.setAclDetails(AclDetails.fromAclBinding(acl));
+                aclPlan.setAction(PlanAction.REMOVE);
+
+                if (!managerConfig.isDeleteDisabled()) {
+                    desiredPlan.addAclPlans(aclPlan.build());
+                }
+            }
+        });
+
+        desiredState.getAcls().forEach((key, value) -> {
+            AclBinding aclBinding = currentAcls.stream().filter(value::equalsAclBinding).findFirst().orElse(null);
+            if (aclBinding == null) {
+                AclPlan aclPlan = new AclPlan.Builder()
+                        .setName(key)
+                        .setAclDetails(value)
+                        .setAction(PlanAction.ADD)
+                        .build();
+
+                desiredPlan.addAclPlans(aclPlan);
+            }
+        });
+    }
+
+    public void validatePlanHasChanges(DesiredPlan desiredPlan) {
+        PlanOverview planOverview = PlanUtil.getOverview(desiredPlan);
+        if (planOverview.getAdd() == 0 && planOverview.getUpdate() == 0 && planOverview.getRemove() == 0) {
+            LogUtil.printNoChangesMessage();
+            System.exit(0);
+        }
+    }
+
+    public DesiredPlan readPlanFromFile() {
+        if (!managerConfig.getPlanFile().isPresent() || !managerConfig.getPlanFile().get().exists()) {
+            return null;
+        }
+
+        try {
+            return objectMapper.readValue(managerConfig.getPlanFile().get(), DesiredPlan.class);
+        } catch (IOException ex) {
+            throw new ReadPlanInputException(ex.getMessage());
+        }
+    }
+
+    public void writePlanToFile(DesiredPlan desiredPlan) {
+        if (managerConfig.getPlanFile().isPresent()) {
+            try {
+                managerConfig.getPlanFile().get().createNewFile();
+                FileWriter writer = new FileWriter(managerConfig.getPlanFile().get());
+                writer.write(objectMapper.writeValueAsString(desiredPlan));
+                writer.close();
+            } catch (IOException ex) {
+                throw new WritePlanOutputException(ex.getMessage());
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/gitops/service/ConfluentCloudService.java b/src/main/java/com/devshawn/kafka/gitops/service/ConfluentCloudService.java
new file mode 100644
index 00000000..12aa5adc
--- /dev/null
+++ b/src/main/java/com/devshawn/kafka/gitops/service/ConfluentCloudService.java
@@ -0,0 +1,48 @@
+package com.devshawn.kafka.gitops.service;
+
+import com.devshawn.kafka.gitops.config.KafkaGitopsConfigLoader;
+import com.devshawn.kafka.gitops.domain.confluent.ServiceAccount;
+import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ConfluentCloudService {
+
+    private static org.slf4j.Logger log = LoggerFactory.getLogger(ConfluentCloudService.class);
+
+    private final ObjectMapper objectMapper;
+
+    public ConfluentCloudService(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+    }
+
+    public List<ServiceAccount> getServiceAccounts() {
+        log.info("Fetching service accounts from Confluent Cloud...");
+        try {
+            String result = execCmd(new String[]{"ccloud", "service-account", "list", "-o", "json"});
+            return objectMapper.readValue(result, new TypeReference<List<ServiceAccount>>() {
+            });
+        } catch (IOException ex) {
+            throw new ConfluentCloudException("There was an error listing Confluent Cloud service accounts. Are you logged in?");
+        }
+    }
+
+    public ServiceAccount createServiceAccount(String name) {
+        try {
+            String description = String.format("Service account: %s", name);
+            String result = execCmd(new String[]{"ccloud", "service-account", "create", name, "--description", description, "-o", "json"});
+            return objectMapper.readValue(result, ServiceAccount.class);
+        } catch (IOException ex) {
+            throw new ConfluentCloudException(String.format("There was an error creating Confluent Cloud service account: %s.", name));
+        }
+    }
+
+    public static String execCmd(String[] cmd) throws java.io.IOException {
+        java.util.Scanner s = new java.util.Scanner(Runtime.getRuntime().exec(cmd).getInputStream()).useDelimiter("\\A");
+        return s.hasNext() ? s.next() : "";
+    }
+}
diff --git a/src/main/java/com/devshawn/kafka/dsf/service/KafkaService.java b/src/main/java/com/devshawn/kafka/gitops/service/KafkaService.java
similarity index 94%
rename from src/main/java/com/devshawn/kafka/dsf/service/KafkaService.java
rename to src/main/java/com/devshawn/kafka/gitops/service/KafkaService.java
index e64db277..177d686c 100644
--- a/src/main/java/com/devshawn/kafka/dsf/service/KafkaService.java
+++ b/src/main/java/com/devshawn/kafka/gitops/service/KafkaService.java
@@ -1,8 +1,8 @@
-package com.devshawn.kafka.dsf.service;
+package com.devshawn.kafka.gitops.service;
 
-import com.devshawn.kafka.dsf.config.KafkaDsfConfig;
-import com.devshawn.kafka.dsf.domain.state.TopicDetails;
-import com.devshawn.kafka.dsf.exception.KafkaExecutionException;
+import com.devshawn.kafka.gitops.config.KafkaGitopsConfig;
+import com.devshawn.kafka.gitops.domain.state.TopicDetails;
+import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.acl.*;
@@ -17,9 +17,9 @@
 
 public class KafkaService {
 
-    private final KafkaDsfConfig config;
+    private final KafkaGitopsConfig config;
 
-    public KafkaService(KafkaDsfConfig config) {
+    public KafkaService(KafkaGitopsConfig config) {
         this.config = config;
     }
 
diff --git a/src/main/java/com/devshawn/kafka/dsf/service/ParserService.java b/src/main/java/com/devshawn/kafka/gitops/service/ParserService.java
similarity index 86%
rename from src/main/java/com/devshawn/kafka/dsf/service/ParserService.java
rename to src/main/java/com/devshawn/kafka/gitops/service/ParserService.java
index 059862ab..576ab6bf 100644
--- a/src/main/java/com/devshawn/kafka/dsf/service/ParserService.java
+++ b/src/main/java/com/devshawn/kafka/gitops/service/ParserService.java
@@ -1,7 +1,7 @@
-package com.devshawn.kafka.dsf.service;
+package com.devshawn.kafka.gitops.service;
 
-import com.devshawn.kafka.dsf.domain.state.DesiredState;
-import com.devshawn.kafka.dsf.exception.ValidationException;
+import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
+import com.devshawn.kafka.gitops.exception.ValidationException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonMappingException;
@@ -35,12 +35,11 @@ public ParserService(File file) {
         this.file = file;
     }
 
-    public DesiredState parseStateFile() {
-        // TODO: Read from file
+    public DesiredStateFile parseStateFile() {
         log.info("Parsing desired state file...");
 
         try {
-            return objectMapper.readValue(file, DesiredState.class);
+            return objectMapper.readValue(file, DesiredStateFile.class);
         } catch (ValueInstantiationException ex) {
             List<String> fields = getYamlFields(ex);
             String joinedFields = String.join(" -> ", fields);
@@ -57,8 +56,9 @@ public DesiredState parseStateFile() {
             throw new ValidationException(String.format("Value '%s' is not a valid format for: [%s] in state file definition: %s", value, propertyName, joinedFields));
         } catch (JsonMappingException ex) {
             List<String> fields = getYamlFields(ex);
+            String message = ex.getCause() != null ? ex.getCause().getMessage().split("\n")[0] : ex.getMessage().split("\n")[0];
             String joinedFields = String.join(" -> ", fields);
-            throw new ValidationException(String.format("%s in state file definition: %s", ex.getCause().getMessage().split("\n")[0], joinedFields));
+            throw new ValidationException(String.format("%s in state file definition: %s", message, joinedFields));
         } catch (IOException ex) {
             throw new ValidationException(String.format("Invalid state file. Unknown error: %s", ex.getMessage()));
         }
diff --git a/src/main/java/com/devshawn/kafka/dsf/util/LogUtil.java b/src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java
similarity index 93%
rename from src/main/java/com/devshawn/kafka/dsf/util/LogUtil.java
rename to src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java
index 16e74b60..5f9b804c 100644
--- a/src/main/java/com/devshawn/kafka/dsf/util/LogUtil.java
+++ b/src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java
@@ -1,11 +1,11 @@
-package com.devshawn.kafka.dsf.util;
-
-import com.devshawn.kafka.dsf.domain.plan.*;
-import com.devshawn.kafka.dsf.domain.state.AclDetails;
-import com.devshawn.kafka.dsf.domain.state.TopicDetails;
-import com.devshawn.kafka.dsf.enums.PlanAction;
-import com.devshawn.kafka.dsf.exception.KafkaExecutionException;
-import com.devshawn.kafka.dsf.exception.WritePlanOutputException;
+package com.devshawn.kafka.gitops.util;
+
+import com.devshawn.kafka.gitops.domain.plan.*;
+import com.devshawn.kafka.gitops.domain.state.AclDetails;
+import com.devshawn.kafka.gitops.domain.state.TopicDetails;
+import com.devshawn.kafka.gitops.enums.PlanAction;
+import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
+import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
 import picocli.CommandLine;
 
 public class LogUtil {
@@ -167,6 +167,14 @@ public static void printApplyOverview(PlanOverview planOverview) {
                 planOverview.getAdd(), planOverview.getUpdate(), planOverview.getRemove()));
     }
 
+    public static void printSimpleSuccess(String message) {
+        System.out.println(String.format("[%s] %s\n", green("SUCCESS"), message));
+    }
+
+    public static void printSimpleError(String message) {
+        System.out.println(String.format("[%s] %s\n", red("ERROR"), message));
+    }
+
     public static void printGenericError(RuntimeException ex) {
         printGenericError(ex, false);
     }
diff --git a/src/main/java/com/devshawn/kafka/dsf/util/PlanUtil.java b/src/main/java/com/devshawn/kafka/gitops/util/PlanUtil.java
similarity index 88%
rename from src/main/java/com/devshawn/kafka/dsf/util/PlanUtil.java
rename to src/main/java/com/devshawn/kafka/gitops/util/PlanUtil.java
index ff9c66ad..b86483ad 100644
--- a/src/main/java/com/devshawn/kafka/dsf/util/PlanUtil.java
+++ b/src/main/java/com/devshawn/kafka/gitops/util/PlanUtil.java
@@ -1,8 +1,8 @@
-package com.devshawn.kafka.dsf.util;
+package com.devshawn.kafka.gitops.util;
 
-import com.devshawn.kafka.dsf.domain.plan.DesiredPlan;
-import com.devshawn.kafka.dsf.domain.plan.PlanOverview;
-import com.devshawn.kafka.dsf.enums.PlanAction;
+import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
+import com.devshawn.kafka.gitops.domain.plan.PlanOverview;
+import com.devshawn.kafka.gitops.enums.PlanAction;
 
 import java.util.EnumMap;
 import java.util.EnumSet;
diff --git a/src/test/groovy/com/devshawn/kafka/gitops/domain/state/service/ApplicationServiceSpec.groovy b/src/test/groovy/com/devshawn/kafka/gitops/domain/state/service/ApplicationServiceSpec.groovy
new file mode 100644
index 00000000..d5ce3ae8
--- /dev/null
+++ b/src/test/groovy/com/devshawn/kafka/gitops/domain/state/service/ApplicationServiceSpec.groovy
@@ -0,0 +1,23 @@
+package com.devshawn.kafka.gitops.domain.state.service
+
+import com.devshawn.kafka.gitops.domain.state.AclDetails
+import spock.lang.Specification
+
+class ApplicationServiceSpec extends Specification {
+
+    void 'test consumer and producer ACLs'() {
+        setup:
+        ApplicationService sut = new ApplicationService.Builder()
+                .setPrincipal("principal")
+                .addConsumes("consumer-topic")
+                .addProduces("producer-topic")
+                .build()
+
+        when:
+        List<AclDetails.Builder> result = sut.getAcls("kafka-connect-cluster")
+
+        then:
+        result
+        result.size() == 3
+    }
+}