Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding flock to JS examples #85

Merged
merged 7 commits into from Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/flock/Dockerfile
@@ -0,0 +1,16 @@
# Alpine base image with NodeJS (lts)
FROM node:lts-alpine

LABEL maintainer="Prashant Shahi<prashant@dgraph.io>"

# Setting work directory
WORKDIR /usr/src/app

# Copy the source code of app to docker daemon
COPY . ./

# Install npm dependencies
RUN npm install

# Run the node command
CMD ["npm", "start"]
97 changes: 97 additions & 0 deletions examples/flock/README.md
@@ -0,0 +1,97 @@
# Flock

Flock loads real Twitter streams into Dgraph to make use of graph traversals.

This example follows the [Flock using Go](https://github.com/dgraph-io/flock) closely.

Flock has two parts :
- [*Tweet loader*](./index.js) - It connects to [realtime Tweets][tweetsapi] via the
Twitter Developer API and loads a graph model of Twitter into Dgraph via mutations.
- [*Query client*](./client/index.js) - It runs interesting graph queries on the Tweets
data stored in Dgraph.

Here is the graph schema of Flock:

![Schema](./schema.png)

[tweetsapi]: https://developer.twitter.com/en/docs/tweets/sample-realtime/overview/GET_statuse_sample

# Running Flock

## Obtaining Twitter credentials

We need to create a Twitter developer account and an app to be able to fetch a stream
of Tweets using their APIs. Let's start with how to create a Twitter developer account.

- Apply for a Twitter developer account [here](https://developer.twitter.com/en/apply/user)
and follow the instructions. The series of steps would end with your email verification.
- Create a Twitter app from [this link](https://developer.twitter.com/en/apps/create).
All fields are `not` required.
- You'll be redirected to the App details tab after creating the app. Go to the
`Keys and tokens` tab and create new access and secret tokens.
![Twitter Developer account](./twitter-keys.png)
- Create a copy of the credentials template.
```sh
cp credentials-template.json credentials.json
```
- Open the `crendentials.json` file and replace the placeholders with the keys from
the Twitter app's `Keys and token` tab.

---
## Setup

- Clone the repository.
```sh
$ git clone https://github.com/dgraph-io/dgraph-js.git
$ cd dgraph-js/examples/flock
```

- Export the persistent data directory. Since Dgraph is run using Docker containers, it
is nice to mount a directory on the host machine to persist the data across multiple runs.
```sh
$ mkdir ~/dgraph
$ export DATA_DIR=~/dgraph
```

- If you're running Linux, you can add the current user to the `docker` group to use
Docker as a non-root user. The `newgrp` command creates a new terminal session.
It is necessary after the user modification to see the effects.

```
$ sudo usermod -aG docker $USER
$ newgrp docker
```

- Ensure that `credentials.json` with the valid Twitter credentials exist in
the root directory of Flock.

- Start the Dgraph servers and Ratel with Docker Compose. Visit http://localhost:8000
on your browser to view the UI.

```sh
$ docker-compose up
```

- On another terminal, start Flock:

```sh
$ docker-compose -f docker-compose-flock.yml up
```

Flock will begin printing out periodic log messages mentioning its
loading rate. You're good to go if you see the `Commit Rate` higher
than 0/sec, which means data has been successfully committed to
Dgraph.

A few minutes of running Flock is sufficient to get enough data for
some interesting queries. To stop running Flock, press Ctrl+C on the
terminal running Flock.

```sh
$ docker-compose -f docker-compose-flock.yml up
...
<Ctrl+C>
Killing flock ... done
```

---
6 changes: 6 additions & 0 deletions examples/flock/credentials-template.json
@@ -0,0 +1,6 @@
{
"access_token_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"access_token_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"consumer_key": "XXXXXXXXXXXXXXXXXXXXXXXXX",
"consumer_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
}
14 changes: 14 additions & 0 deletions examples/flock/docker-compose-flock.yml
@@ -0,0 +1,14 @@
version: "3.5"
services:
flock:
build: .
container_name: flock
labels:
cluster: flock-cluster
environment:
- ALPHA_ADDR=localhost:9080
- LOG_INTERVAL_TIME=2000
restart: always
network_mode: "host"
volumes:
- ./credentials.json:/usr/src/app/credentials.json
47 changes: 47 additions & 0 deletions examples/flock/docker-compose.yml
@@ -0,0 +1,47 @@
version: "3.2"
services:
zero:
image: dgraph/dgraph:v1.1.0
container_name: zero
labels:
cluster: flock-cluster
ports:
- 5080:5080
- 6080:6080
volumes:
- type: bind
source: $DATA_DIR
target: /dgraph
restart: on-failure
command: dgraph zero --my=zero:5080
alpha:
container_name: alpha
image: dgraph/dgraph:v1.1.0
labels:
cluster: flock-cluster
ports:
- 8080:8080
- 9080:9080
volumes:
- type: bind
source: $DATA_DIR
target: /dgraph
restart: on-failure
command: dgraph alpha --my=alpha:7080 --lru_mb=2048 --zero=zero:5080
ratel:
container_name: ratel
image: dgraph/dgraph:v1.1.0
labels:
cluster: flock-cluster
volumes:
- type: volume
source: dgraph
target: /dgraph
volume:
nocopy: true
ports:
- 8000:8000
command: dgraph-ratel

volumes:
dgraph:
209 changes: 209 additions & 0 deletions examples/flock/index.js
@@ -0,0 +1,209 @@
const dgraph = require('dgraph-js');
const grpc = require('grpc');
const twitter = require('twitter');

const creds = require('./credentials.json');
const client = new twitter(creds);

const ALPHA_ADDR = process.env.ALPHA_ADDR || "localhost:9080"
const LOG_INTERVAL_TIME = process.env.LOG_INTERVAL_TIME || 2000;
const startStatus = new Date().getTime();

let lastStatus = 0;
let retry = true;
let failureCount = 0;
let totalTweets = 0;
let oldTotalTweets = 0;
let retryCount = 0;
let errorCount = 0;

const dgraphClientStub = new dgraph.DgraphClientStub(ALPHA_ADDR, grpc.credentials.createInsecure());
const dgraphClient = new dgraph.DgraphClient(dgraphClientStub);

async function setSchema() {
const schema = `
type Tweet {
id_str: string
created_at: dateTime
message: string
urls: [string]
hashtags: [string]
author: [User]
mention: [User]
retweet: bool
}

type User {
user_id: string
user_name: string
screen_name: string
description: string
friends_count: int
verified: bool
profile_banner_url: string
profile_image_url: string
}

user_id: string @index(exact) .
user_name: string @index(hash) .
screen_name: string @index(term) .
id_str: string @index(exact) .
created_at: dateTime @index(hour) .
urls: [string] @index(term) .
hashtags: [string] @index(exact) .
mention: [uid] @count @reverse .
author: [uid] @count @reverse .
`;
const op = new dgraph.Operation();
op.setSchema(schema)
await dgraphClient.alter(op);
}

async function upsertData(jsonObj, query) {
try {
const mu = new dgraph.Mutation();
mu.setSetJson(jsonObj);

const req = new dgraph.Request();
req.setMutationsList([mu]);
req.setQuery(query);
req.setCommitNow(true);

await dgraphClient.newTxn().doRequest(req);
} catch (err) {
const errMsg = err.message;
if (errMsg.includes('connection refused')) {
// wait for alpha to restart
console.log('ERROR Connection refused... waiting a bit');
await wait(5000);
} else if (errMsg.includes('already been committed or discarded')) {
failureCount += 1;
} else if (retry && errMsg.includes('Please retry')) {
retryCount += 1;
await wait(100);
retry = false;
await upsertData(jsonObj, query);
} else {
errorCount += 1;
console.log(`ERROR Unable to commit.\n${err}\n`);
}
}
}

async function filterTweet(tweet) {
const userMentions = [];
const usersObject = [];
usersObject[tweet.user.id_str] = 'uid(u)';
tweet.entities.user_mentions.forEach((element, index) => {
let uid;
if (usersObject[element.id_str] != undefined) {
uid = usersObject[element.id_str];
} else {
uid = `uid(m${index+1})`;
usersObject[element.id_str] = uid;
}
userMentions.push({
'uid': uid,
'user_id': element.id_str,
'dgraph.type': 'User',
'user_name': element.name,
'screen_name': element.screen_name,
});
});
const hashtags = [];
tweet.entities.hashtags.forEach((element) => {
hashtags.push(element.text);
});
const author = {
'uid': `uid(u)`,
'user_id': tweet.user.id_str,
'dgraph.type': 'User',
'user_name': tweet.user.name,
'screen_name': tweet.user.screen_name,
'description': tweet.user.description,
'friends_count': tweet.user.friends_count,
'followers_count': tweet.user.followers_count,
'verified': tweet.user.verified,
'profile_banner_url': tweet.user.profile_banner_url,
'profile_image_url': tweet.user.profile_image_url,
};
const userObj = {
'uid': `uid(t)`,
'id_str': tweet.id_str,
'dgraph.type': 'Tweet',
'created_at': new Date(tweet.created_at),
'message': tweet.text,
'urls': tweet.urls,
'hashtags': hashtags,
'mention': userMentions,
'author': author,
};
return userObj;
}

async function buildQuery(tweet) {
const usersObject = [];
const query = [];

query.push(`t as var(func: eq(id_str, "${tweet.id_str}"))`);
query.push(`u as var(func: eq(user_id, "${tweet.author.user_id}"))`);
Comment on lines +146 to +149
Copy link
Contributor

@paulftw paulftw Oct 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  const query = [
    `t as var(...)`,
    `u as var(...)`,
  ];


usersObject[tweet.author.user_id] = 'u';

tweet.mention.forEach((element, index) => {
let name;
if (usersObject[element.user_id] != undefined) {
name = usersObject[element.user_id];
} else {
name = `m${index+1}`;
query.push(`${name} as var(func: eq(user_id, ${element.user_id}))`);
usersObject[element.user_id] = name;
}
});

const finalQuery = `query {${query.join('\n')}}`;
return finalQuery;
Comment on lines +164 to +165
Copy link
Contributor

@paulftw paulftw Oct 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  return `query {...}`;

}

function reportStats() {
const now = new Date().getTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paul client 𝝺 node
Welcome to Node.js v12.10.0.
Type ".help" for more information.
> new Date().getTime()
1570204761045
> Date.now()
1570204765965

Date.now() returns same value but shorter to type

// tslint:disable-next-line no-console
console.log(`STATS Tweets: ${totalTweets}, Failues: ${failureCount}, Retries: ${retryCount}, \
Errors: ${errorCount}, Commit Rate: ${(totalTweets-oldTotalTweets)/(LOG_INTERVAL_TIME/1000)} Total Time: ${now - startStatus} ms`);
oldTotalTweets = totalTweets;
}

async function wait(time) {
return new Promise((resolve) => {
const id = setTimeout(
() => {
clearTimeout(id);
resolve();
},
time,
);
});
}

Comment on lines +176 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is test code const wait = time => new Promise(resolve => setTimeout(resolve, time)) is good enough and shorter. const id isn't used anyway so no reason to keep it around or call clearTimeout

async function main() {
await setSchema();
setInterval(reportStats, LOG_INTERVAL_TIME);
client.stream('statuses/sample.json', function(stream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client is instantiated at the top of the file but only used once here. I think const client=... should be inside main

stream.on('data', async function(tweet) {
totalTweets += 1;
const tweetObj = await filterTweet(tweet);
const queries = await buildQuery(tweetObj);
retry = true;
await upsertData(tweetObj, queries);
});
stream.on('error', function(error) {
console.log(error);
});
});
}

main().then(() => {
console.log(`\nReporting stats every ${LOG_INTERVAL_TIME/1000} seconds\n`)
}).catch((e) => {
console.log(e);
});