
Adding pyspark_pagerank as a module and to buildkite

Summary: See title.

Test Plan: Run tests

Reviewers: max, natekupp

Reviewed By: natekupp

Differential Revision: https://dagster.phacility.com/D110
schrockn committed May 13, 2019
1 parent 2464273 commit 04c6426da9d541cba2de3da529fe88e6b8959641
@@ -235,6 +235,9 @@ def airflow_tests():
steps += python_modules_tox_tests("libraries/dagster-snowflake")
steps += python_modules_tox_tests("libraries/dagster-spark")
steps += python_modules_tox_tests("../examples/toys", label='examples-toys')
+steps += python_modules_tox_tests(
+    "../examples/pyspark-pagerank", label='examples-pyspark-pagerank'
+)
steps += airline_demo_tests()
steps += events_demo_tests()
steps += airflow_tests()
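For context, python_modules_tox_tests builds the list of Buildkite step definitions for a package directory; its body is not part of this diff. A hypothetical sketch of the kind of step such a helper might emit (the dict keys and command string below are illustrative assumptions, not the repo's actual implementation):

    # Hypothetical sketch only, not the helper defined in .buildkite/pipeline.py.
    def python_modules_tox_tests(directory, label=None):
        # One Buildkite step: a label plus a command that runs tox in the package dir.
        return [
            {
                "label": label or directory,
                "command": "cd python_modules/{dir} && tox".format(dir=directory),
            }
        ]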
@@ -25,5 +25,5 @@ examples/airline-demo/airline_demo
examples/airline-demo/airline_demo_tests
examples/event-pipeline-demo/event_pipeline_demo
examples/event-pipeline-demo/event_pipeline_demo_tests
-examples/pyspark_pagerank
+examples/pyspark-pagerank/pyspark_pagerank
examples/toys/toys
@@ -41,6 +41,7 @@ install_dev_python_modules:
pip install -e examples/event-pipeline-demo
pip install -e examples/airline-demo
pip install -e examples/toys
+pip install -e examples/pyspark-pagerank

rebuild_dagit:
cd js_modules/dagit/; yarn install && yarn build-for-python
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -1,17 +1,10 @@
-import os
-import sys

from dagster import RepositoryDefinition
-from dagster.utils import script_relative_path

-sys.path.insert(0, os.path.abspath(script_relative_path('.')))
-# above confuses pylint
-# pylint: disable=wrong-import-position, import-error
-from pyspark_pagerank_pipeline import define_pipeline
-from steps.step_one import define_pyspark_pagerank_step_one
-from steps.step_two import define_pyspark_pagerank_step_two
-from steps.step_three import define_pyspark_pagerank_step_three
-from steps.step_four import define_pyspark_pagerank_step_four
+from .pyspark_pagerank_pipeline import define_pipeline
+from .steps.step_one import define_pyspark_pagerank_step_one
+from .steps.step_two import define_pyspark_pagerank_step_two
+from .steps.step_three import define_pyspark_pagerank_step_three
+from .steps.step_four import define_pyspark_pagerank_step_four


def define_repository():
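The body of define_repository is unchanged and outside this hunk. A rough sketch of how it plausibly reads, assuming the RepositoryDefinition API of this dagster version takes a name plus a dict of pipeline-definition callables (the repository name and dict keys are assumptions):

    def define_repository():
        # Sketch only: the name and keys below are illustrative.
        return RepositoryDefinition(
            name='pyspark_pagerank_repo',
            pipeline_dict={
                'pyspark_pagerank': define_pipeline,
                'pyspark_pagerank_step_one': define_pyspark_pagerank_step_one,
                'pyspark_pagerank_step_two': define_pyspark_pagerank_step_two,
                'pyspark_pagerank_step_three': define_pyspark_pagerank_step_three,
                'pyspark_pagerank_step_four': define_pyspark_pagerank_step_four,
            },
        )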
File renamed without changes.
No changes.
@@ -2,10 +2,7 @@
from operator import add

from dagster import (
Dict,
DependencyDefinition,
Field,
Int,
InputDefinition,
OutputDefinition,
Path,
@@ -14,7 +11,7 @@
solid,
)

-from dagster_framework.pyspark import spark_session_resource, SparkRDD
+from dagster_pyspark import spark_session_resource, SparkRDD


def parseNeighbors(urls):
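parseNeighbors itself is outside the hunk. The surrounding code tracks Apache Spark's example PageRank script, where this helper splits a whitespace-separated "url neighbor" pair; a sketch under that assumption:

    import re


    def parseNeighbors(urls):
        # Parses a 'url neighbor-url' pair string into a (url, neighbor) tuple.
        parts = re.split(r'\s+', urls)
        return parts[0], parts[1]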
@@ -10,7 +10,7 @@
PipelineDefinition,
solid,
)
-from dagster_framework.pyspark import spark_session_resource, SparkRDD
+from dagster_pyspark import spark_session_resource, SparkRDD


def computeContribs(urls, rank):
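computeContribs is likewise only referenced here. In the Spark example this code mirrors, it spreads a page's rank evenly across its outgoing links; a sketch under that assumption:

    def computeContribs(urls, rank):
        # Calculates URL contributions to the rank of other URLs.
        num_urls = len(urls)
        for url in urls:
            yield (url, rank / num_urls)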
@@ -42,7 +42,7 @@ def rest_of_pipeline(context, urls):
iterations = 2

# Calculates and updates URL ranks continuously using PageRank algorithm.
-for iteration in range(iterations):
+for _iteration in range(iterations):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(
lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1])
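The hunk ends inside the flatMap call. In the Spark example this pipeline follows, each iteration finishes by summing the contributions per URL and applying a 0.85 damping factor; a sketch of the full loop under that assumption (the constants are the Spark example's, not confirmed by this diff):

    for _iteration in range(iterations):
        # Calculates URL contributions to the rank of other URLs.
        contribs = links.join(ranks).flatMap(
            lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1])
        )
        # Re-calculates URL ranks from neighbor contributions, with 0.85 damping.
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)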
File renamed without changes.
@@ -2,7 +2,7 @@
from operator import add

from dagster import PipelineDefinition, solid, InputDefinition, Path, PipelineContextDefinition
-from dagster_framework.pyspark import spark_session_resource
+from dagster_pyspark import spark_session_resource


def computeContribs(urls, rank):
@@ -24,15 +24,15 @@ def whole_pipeline_solid_using_context(context, pagerank_data):
lines = context.resources.spark.read.text(pagerank_data).rdd.map(lambda r: r[0])

# Loads all URLs from input file and initialize their neighbors.
-links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
+links = lines.map(parseNeighbors).distinct().groupByKey().cache()

# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

iterations = 2

# Calculates and updates URL ranks continuously using PageRank algorithm.
-for iteration in range(iterations):
+for _iteration in range(iterations):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(
lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1])
@@ -28,15 +28,15 @@ def whole_pipeline_solid(context, pagerank_data):
lines = spark.read.text(pagerank_data).rdd.map(lambda r: r[0])

# Loads all URLs from input file and initialize their neighbors.
-links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
+links = lines.map(parseNeighbors).distinct().groupByKey().cache()

# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

iterations = 2

# Calculates and updates URL ranks continuously using PageRank algorithm.
-for iteration in range(iterations):
+for _iteration in range(iterations):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(
lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1])
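After the loop, the Spark example collects and prints the final ranks; the solid here presumably does something similar, though the hunk ends before that point. A sketch, assuming the solid logs through context.log:

    # Collect the final ranks back to the driver and log them.
    for (link, rank) in ranks.collect():
        context.log.info('{} has rank: {}.'.format(link, rank))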
No changes.
@@ -0,0 +1,7 @@
import pyspark_pagerank

from pyspark_pagerank.repository import define_repository


def test_pyspark_pagerank_repo():
assert define_repository().get_all_pipelines()
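The new test only asserts that the repository yields pipelines. A slightly stronger check that could sit in the same file, assuming define_pipeline is importable from the package as in repository.py above (illustrative, not part of this commit):

    from dagster import PipelineDefinition

    from pyspark_pagerank.pyspark_pagerank_pipeline import define_pipeline


    def test_define_pipeline_returns_pipeline():
        assert isinstance(define_pipeline(), PipelineDefinition)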
@@ -0,0 +1,25 @@
import sys

from setuptools import find_packages, setup

# pylint: disable=E0401, W0611
if sys.version_info[0] < 3:
import __builtin__ as builtins
else:
import builtins

setup(
name='pyspark-pagerank',
author='Elementl',
license='Apache-2.0',
description='Page rank on PySpark pipeline for dagster (do not publish).',
classifiers=[
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
],
packages=find_packages(exclude=['test']),
install_requires=['dagster-pyspark'],
)
@@ -0,0 +1,17 @@
[tox]
envlist = py37,py36,py35,py27

[testenv]
passenv = CIRCLECI CIRCLE_* CI_PULL_REQUEST COVERALLS_REPO_TOKEN AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID
deps =
-e ../../python_modules/dagster
-r ../../python_modules/dagster/dev-requirements.txt
-e ../../python_modules/libraries/dagster-pyspark
-e .

commands =
coverage erase
pytest -vv --junitxml=test_results.xml --cov=pyspark_pagerank --cov-append --cov-report=
coverage report --omit='.tox/*,**/test_*.py' --skip-covered
coverage html --omit='.tox/*,**/test_*.py'
coverage xml --omit='.tox/*,**/test_*.py'
@@ -17,9 +17,9 @@
)
from dagster.core.types.runtime import define_any_type

-from dagster_framework.spark.configs_spark import spark_config
+from dagster_spark.configs_spark import spark_config

-from dagster_framework.spark.utils import flatten_dict
+from dagster_spark.utils import flatten_dict


@input_selector_schema(
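flatten_dict is now imported from dagster_spark.utils; its implementation is not shown in this diff. As a hypothetical illustration of the kind of flattening such a utility typically performs (named flatten_config here to avoid implying it is dagster_spark's actual code), nested config can be collapsed into dotted Spark property keys:

    def flatten_config(nested, prefix=''):
        # Hypothetical: collapse {'spark': {'app': {'name': 'x'}}} into
        # {'spark.app.name': 'x'} so values map onto Spark property names.
        flat = {}
        for key, value in nested.items():
            full_key = '{}.{}'.format(prefix, key) if prefix else key
            if isinstance(value, dict):
                flat.update(flatten_config(value, full_key))
            else:
                flat[full_key] = value
        return flat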
